LEFT | RIGHT |
(no file at all) | |
1 // The store package is capable of storing and updating charms in a MongoDB | 1 // The store package is capable of storing and updating charms in a MongoDB |
2 // database, as well as maintaining further information about them such as | 2 // database, as well as maintaining further information about them such as |
3 // the VCS revision the charm was loaded from and the URLs for the charms. | 3 // the VCS revision the charm was loaded from and the URLs for the charms. |
4 package store | 4 package store |
5 | 5 |
6 import ( | 6 import ( |
7 "crypto/sha256" | 7 "crypto/sha256" |
8 "encoding/hex" | 8 "encoding/hex" |
9 "errors" | 9 "errors" |
10 "fmt" | 10 "fmt" |
11 "hash" | 11 "hash" |
12 "io" | 12 "io" |
13 "launchpad.net/juju/go/charm" | 13 "launchpad.net/juju/go/charm" |
14 "launchpad.net/juju/go/log" | 14 "launchpad.net/juju/go/log" |
15 "launchpad.net/mgo" | 15 "launchpad.net/mgo" |
16 "launchpad.net/mgo/bson" | 16 "launchpad.net/mgo/bson" |
17 "sort" | 17 "sort" |
| 18 "strconv" |
| 19 "sync" |
18 "time" | 20 "time" |
19 ) | 21 ) |
20 | 22 |
21 // The following MongoDB collections are currently used: | 23 // The following MongoDB collections are currently used: |
22 // | 24 // |
23 // juju.events - Log of events relating to the lifecycle of charms | 25 // juju.events - Log of events relating to the lifecycle of charms |
24 // juju.charms - Information about the stored charms | 26 // juju.charms - Information about the stored charms |
25 // juju.charmfs.* - GridFS with the charm files | 27 // juju.charmfs.* - GridFS with the charm files |
26 // juju.locks - Has unique keys with url of updating charms | 28 // juju.locks - Has unique keys with url of updating charms |
27 | 29 |
28 var ( | 30 var ( |
29 ErrUpdateConflict = errors.New("charm update in progress") | 31 ErrUpdateConflict = errors.New("charm update in progress") |
30 ErrRedundantUpdate = errors.New("charm is up-to-date") | 32 ErrRedundantUpdate = errors.New("charm is up-to-date") |
31 ErrNotFound = errors.New("entry not found") | 33 ErrNotFound = errors.New("entry not found") |
32 ) | 34 ) |
33 | 35 |
34 const ( | 36 const ( |
35 UpdateTimeout = 600e9 | 37 UpdateTimeout = 600e9 |
36 ) | 38 ) |
37 | 39 |
38 // Store holds a connection to a charm store. | 40 // Store holds a connection to a charm store. |
39 type Store struct { | 41 type Store struct { |
40 session *storeSession | 42 session *storeSession |
| 43 |
| 44 // Cache for statistics key words (two generations). |
| 45 cacheMu sync.RWMutex |
| 46 statsTokenNew map[string]int |
| 47 statsTokenOld map[string]int |
41 } | 48 } |
42 | 49 |
43 // Open creates a new session with the store. It connects to the MongoDB | 50 // Open creates a new session with the store. It connects to the MongoDB |
44 // server at the given address (as expected by the Mongo function in the | 51 // server at the given address (as expected by the Mongo function in the |
45 // launchpad.net/mgo package). | 52 // launchpad.net/mgo package). |
46 func Open(mongoAddr string) (store *Store, err error) { | 53 func Open(mongoAddr string) (store *Store, err error) { |
47 log.Printf("Store opened. Connecting to: %s", mongoAddr) | 54 log.Printf("Store opened. Connecting to: %s", mongoAddr) |
48 store = &Store{} | 55 store = &Store{} |
49 session, err := mgo.Dial(mongoAddr) | 56 session, err := mgo.Dial(mongoAddr) |
50 if err != nil { | 57 if err != nil { |
51 log.Printf("Error connecting to MongoDB: %v", err) | 58 log.Printf("Error connecting to MongoDB: %v", err) |
52 return nil, err | 59 return nil, err |
53 } | 60 } |
54 » store = &Store{&storeSession{session}} | 61 |
| 62 » store = &Store{session: &storeSession{session}} |
| 63 |
| 64 » // Ignore error. It'll always fail once the collection has been created. |
| 65 » // TODO Check the error once mgo hands it to us. |
| 66 » _ = store.session.DB("juju").Run(bson.D{{"create", "stat.counters"}, {"a
utoIndexId", false}}, nil) |
| 67 |
| 68 » counters := store.session.StatCounters() |
| 69 » err = counters.EnsureIndex(mgo.Index{Key: []string{"k", "t"}, Unique: tr
ue}) |
| 70 » if err != nil { |
| 71 » » log.Printf("Error ensuring stat.counters index: %v", err) |
| 72 » » session.Close() |
| 73 » » return nil, err |
| 74 » } |
| 75 |
55 charms := store.session.Charms() | 76 charms := store.session.Charms() |
56 err = charms.EnsureIndex(mgo.Index{Key: []string{"urls", "revision"}, Un
ique: true}) | 77 err = charms.EnsureIndex(mgo.Index{Key: []string{"urls", "revision"}, Un
ique: true}) |
57 if err != nil { | 78 if err != nil { |
58 log.Printf("Error ensuring charms index: %v", err) | 79 log.Printf("Error ensuring charms index: %v", err) |
59 session.Close() | 80 session.Close() |
60 return nil, err | 81 return nil, err |
61 } | 82 } |
| 83 |
62 events := store.session.Events() | 84 events := store.session.Events() |
63 err = events.EnsureIndex(mgo.Index{Key: []string{"urls", "digest"}}) | 85 err = events.EnsureIndex(mgo.Index{Key: []string{"urls", "digest"}}) |
64 if err != nil { | 86 if err != nil { |
65 log.Printf("Error ensuring events index: %v", err) | 87 log.Printf("Error ensuring events index: %v", err) |
66 session.Close() | 88 session.Close() |
67 return nil, err | 89 return nil, err |
68 } | 90 } |
69 » // Put the socket we used on EnsureIndex back in the pool. | 91 |
| 92 » // Put the socket we used back in the pool. |
70 session.Refresh() | 93 session.Refresh() |
71 return store, nil | 94 return store, nil |
72 } | 95 } |
73 | 96 |
74 // Close terminates the connection with the store. | 97 // Close terminates the connection with the store. |
75 func (s *Store) Close() { | 98 func (s *Store) Close() { |
76 s.session.Close() | 99 s.session.Close() |
| 100 } |
| 101 |
| 102 // statsKey returns the compound statistics identifier that represents key. |
| 103 // If write is true, the identifier will be created if necessary. |
| 104 // Identifiers have a form similar to "ab:c:def:", where each section is a |
| 105 // base-32 number that represents the respective word in key. This form |
| 106 // allows efficiently indexing and searching for prefixes, while detaching |
| 107 // the key content and size from the actual words used in key. |
| 108 func (s *Store) statsKey(session *storeSession, key []string, write bool) (strin
g, error) { |
| 109 if len(key) == 0 { |
| 110 return "", fmt.Errorf("store: empty statistics key") |
| 111 } |
| 112 tokens := session.StatTokens() |
| 113 skey := make([]byte, 0, len(key)*4) |
| 114 // Retry limit is mainly to prevent infinite recursion in edge cases, |
| 115 // such as if the database is ever run in read-only mode. |
| 116 // The logic below should deterministically stop in normal scenarios. |
| 117 var err error |
| 118 for i, retry := 0, 30; i < len(key) && retry > 0; retry-- { |
| 119 id, found := s.statsTokenId(key[i]) |
| 120 if !found { |
| 121 var t struct { |
| 122 Id int "_id" |
| 123 Token string "t" |
| 124 } |
| 125 err = tokens.Find(bson.D{{"t", key[i]}}).One(&t) |
| 126 if err == mgo.NotFound { |
| 127 if !write { |
| 128 return "", ErrNotFound |
| 129 } |
| 130 t.Id, err = tokens.Count() |
| 131 if err != nil { |
| 132 continue |
| 133 } |
| 134 t.Id++ |
| 135 t.Token = key[i] |
| 136 err = tokens.Insert(&t) |
| 137 } |
| 138 if err != nil { |
| 139 continue |
| 140 } |
| 141 s.cacheStatsTokenId(t.Token, t.Id) |
| 142 id = t.Id |
| 143 } |
| 144 skey = strconv.AppendInt(skey, int64(id), 32) |
| 145 skey = append(skey, ':') |
| 146 i++ |
| 147 } |
| 148 if err != nil { |
| 149 return "", err |
| 150 } |
| 151 return string(skey), nil |
| 152 } |
| 153 |
| 154 const statsTokenCacheSize = 512 |
| 155 |
| 156 // cacheStatsTokenId adds the id for token into the cache. |
| 157 // The cache has two generations so that the least frequently used |
| 158 // tokens are evicted regularly. |
| 159 func (s *Store) cacheStatsTokenId(token string, id int) { |
| 160 s.cacheMu.Lock() |
| 161 defer s.cacheMu.Unlock() |
| 162 // Can't possibly be >, but reviews want it for defensiveness. |
| 163 if len(s.statsTokenNew) >= statsTokenCacheSize { |
| 164 s.statsTokenOld = s.statsTokenNew |
| 165 s.statsTokenNew = nil |
| 166 } |
| 167 if s.statsTokenNew == nil { |
| 168 s.statsTokenNew = make(map[string]int, statsTokenCacheSize) |
| 169 } |
| 170 s.statsTokenNew[token] = id |
| 171 } |
| 172 |
| 173 // statsTokenId returns the id for token from the cache, if found. |
| 174 func (s *Store) statsTokenId(token string) (id int, found bool) { |
| 175 s.cacheMu.RLock() |
| 176 id, found = s.statsTokenNew[token] |
| 177 if found { |
| 178 s.cacheMu.RUnlock() |
| 179 return |
| 180 } |
| 181 id, found = s.statsTokenOld[token] |
| 182 s.cacheMu.RUnlock() |
| 183 if found { |
| 184 s.cacheStatsTokenId(token, id) |
| 185 } |
| 186 return |
| 187 } |
| 188 |
| 189 var counterEpoch = time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Unix() |
| 190 |
| 191 // IncCounter increases by one the counter associated with the composed key. |
| 192 func (s *Store) IncCounter(key []string) error { |
| 193 session := s.session.Copy() |
| 194 defer session.Close() |
| 195 |
| 196 skey, err := s.statsKey(session, key, true) |
| 197 if err != nil { |
| 198 return err |
| 199 } |
| 200 |
| 201 t := time.Now().UTC() |
| 202 // Round to the start of the minute so we get one document per minute at
most. |
| 203 t = t.Add(-time.Duration(t.Second()) * time.Second) |
| 204 counters := session.StatCounters() |
| 205 _, err = counters.Upsert(bson.D{{"k", skey}, {"t", int32(t.Unix() - coun
terEpoch)}}, bson.D{{"$inc", bson.D{{"c", 1}}}}) |
| 206 return err |
| 207 } |
| 208 |
| 209 // SumCounter returns the sum of all the counters that exactly match key, |
| 210 // or that are prefixed by it if prefix is true. |
| 211 func (s *Store) SumCounter(key []string, prefix bool) (count int64, err error) { |
| 212 session := s.session.Copy() |
| 213 defer session.Close() |
| 214 |
| 215 skey, err := s.statsKey(session, key, false) |
| 216 if err == ErrNotFound { |
| 217 return 0, nil |
| 218 } |
| 219 if err != nil { |
| 220 return 0, err |
| 221 } |
| 222 |
| 223 var regex string |
| 224 if prefix { |
| 225 regex = "^" + skey |
| 226 } else { |
| 227 regex = "^" + skey + "$" |
| 228 } |
| 229 |
| 230 job := mgo.MapReduce{ |
| 231 Map: "function() { emit('count', this.c); }", |
| 232 Reduce: "function(key, values) { return Array.sum(values); }", |
| 233 } |
| 234 var result []struct{ Value int64 } |
| 235 counters := session.StatCounters() |
| 236 _, err = counters.Find(bson.D{{"k", bson.D{{"$regex", regex}}}}).MapRedu
ce(job, &result) |
| 237 if len(result) > 0 { |
| 238 return result[0].Value, err |
| 239 } |
| 240 return 0, err |
77 } | 241 } |
78 | 242 |
79 // A CharmPublisher is responsible for importing a charm dir onto the store. | 243 // A CharmPublisher is responsible for importing a charm dir onto the store. |
80 type CharmPublisher struct { | 244 type CharmPublisher struct { |
81 revision int | 245 revision int |
82 w *charmWriter | 246 w *charmWriter |
83 } | 247 } |
84 | 248 |
85 // Revision returns the revision that will be assigned to the published charm. | 249 // Revision returns the revision that will be assigned to the published charm. |
86 func (p *CharmPublisher) Revision() int { | 250 func (p *CharmPublisher) Revision() int { |
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
218 } | 382 } |
219 | 383 |
220 // finish completes the charm writing process and inserts the final metadata. | 384 // finish completes the charm writing process and inserts the final metadata. |
221 // After it completes the charm will be available for consumption. | 385 // After it completes the charm will be available for consumption. |
222 func (w *charmWriter) finish() error { | 386 func (w *charmWriter) finish() error { |
223 if w.file == nil { | 387 if w.file == nil { |
224 return nil | 388 return nil |
225 } | 389 } |
226 defer w.session.Close() | 390 defer w.session.Close() |
227 id := w.file.Id() | 391 id := w.file.Id() |
| 392 size := w.file.Size() |
228 err := w.file.Close() | 393 err := w.file.Close() |
229 if err != nil { | 394 if err != nil { |
230 log.Printf("Failed to close GridFS file: %v", err) | 395 log.Printf("Failed to close GridFS file: %v", err) |
231 return err | 396 return err |
232 } | 397 } |
233 charms := w.session.Charms() | 398 charms := w.session.Charms() |
234 sha256 := hex.EncodeToString(w.sha256.Sum(nil)) | 399 sha256 := hex.EncodeToString(w.sha256.Sum(nil)) |
235 charm := charmDoc{ | 400 charm := charmDoc{ |
236 w.urls, | 401 w.urls, |
237 w.revision, | 402 w.revision, |
238 w.digest, | 403 w.digest, |
239 sha256, | 404 sha256, |
| 405 size, |
240 id.(bson.ObjectId), | 406 id.(bson.ObjectId), |
241 w.charm.Meta(), | 407 w.charm.Meta(), |
242 w.charm.Config(), | 408 w.charm.Config(), |
243 } | 409 } |
244 if err = charms.Insert(&charm); err != nil { | 410 if err = charms.Insert(&charm); err != nil { |
245 err = maybeConflict(err) | 411 err = maybeConflict(err) |
246 log.Printf("Failed to insert new revision of charm %v: %v", w.ur
ls, err) | 412 log.Printf("Failed to insert new revision of charm %v: %v", w.ur
ls, err) |
247 return err | 413 return err |
248 } | 414 } |
249 return nil | 415 return nil |
250 } | 416 } |
251 | 417 |
252 type CharmInfo struct { | 418 type CharmInfo struct { |
253 revision int | 419 revision int |
254 digest string | 420 digest string |
255 sha256 string | 421 sha256 string |
| 422 size int64 |
256 fileId bson.ObjectId | 423 fileId bson.ObjectId |
257 meta *charm.Meta | 424 meta *charm.Meta |
258 config *charm.Config | 425 config *charm.Config |
259 } | 426 } |
260 | 427 |
261 // Statically ensure CharmInfo is a charm.Charm. | 428 // Statically ensure CharmInfo is a charm.Charm. |
262 var _ charm.Charm = (*CharmInfo)(nil) | 429 var _ charm.Charm = (*CharmInfo)(nil) |
263 | 430 |
264 // Revision returns the store charm's revision. | 431 // Revision returns the store charm's revision. |
265 func (ci *CharmInfo) Revision() int { | 432 func (ci *CharmInfo) Revision() int { |
266 return ci.revision | 433 return ci.revision |
267 } | 434 } |
268 | 435 |
269 // BundleSha256 returns the sha256 checksum for the stored charm bundle. | 436 // BundleSha256 returns the sha256 checksum for the stored charm bundle. |
270 func (ci *CharmInfo) BundleSha256() string { | 437 func (ci *CharmInfo) BundleSha256() string { |
271 return ci.sha256 | 438 return ci.sha256 |
| 439 } |
| 440 |
| 441 // BundleSize returns the size for the stored charm bundle. |
| 442 func (ci *CharmInfo) BundleSize() int64 { |
| 443 return ci.size |
272 } | 444 } |
273 | 445 |
274 // Digest returns the unique identifier that represents the charm | 446 // Digest returns the unique identifier that represents the charm |
275 // data imported. This is typically set to the VCS revision digest. | 447 // data imported. This is typically set to the VCS revision digest. |
276 func (ci *CharmInfo) Digest() string { | 448 func (ci *CharmInfo) Digest() string { |
277 return ci.digest | 449 return ci.digest |
278 } | 450 } |
279 | 451 |
280 // Meta returns the charm.Meta details for the stored charm. | 452 // Meta returns the charm.Meta details for the stored charm. |
281 func (ci *CharmInfo) Meta() *charm.Meta { | 453 func (ci *CharmInfo) Meta() *charm.Meta { |
(...skipping 20 matching lines...) Expand all Loading... |
302 if rev == -1 { | 474 if rev == -1 { |
303 qdoc = bson.D{{"urls", url}} | 475 qdoc = bson.D{{"urls", url}} |
304 } else { | 476 } else { |
305 qdoc = bson.D{{"urls", url}, {"revision", rev}} | 477 qdoc = bson.D{{"urls", url}, {"revision", rev}} |
306 } | 478 } |
307 err = charms.Find(qdoc).Sort(bson.D{{"revision", -1}}).One(&cdoc) | 479 err = charms.Find(qdoc).Sort(bson.D{{"revision", -1}}).One(&cdoc) |
308 if err != nil { | 480 if err != nil { |
309 log.Printf("Failed to find charm %s: %v", url, err) | 481 log.Printf("Failed to find charm %s: %v", url, err) |
310 return nil, ErrNotFound | 482 return nil, ErrNotFound |
311 } | 483 } |
312 » return &CharmInfo{cdoc.Revision, cdoc.Digest, cdoc.Sha256, cdoc.FileId,
cdoc.Meta, cdoc.Config}, nil | 484 » info = &CharmInfo{ |
| 485 » » cdoc.Revision, |
| 486 » » cdoc.Digest, |
| 487 » » cdoc.Sha256, |
| 488 » » cdoc.Size, |
| 489 » » cdoc.FileId, |
| 490 » » cdoc.Meta, |
| 491 » » cdoc.Config, |
| 492 » } |
| 493 » return info, nil |
313 } | 494 } |
314 | 495 |
315 // OpenCharm opens for reading via rc the charm currently available at url. | 496 // OpenCharm opens for reading via rc the charm currently available at url. |
316 // rc must be closed after dealing with it or resources will leak. | 497 // rc must be closed after dealing with it or resources will leak. |
317 func (s *Store) OpenCharm(url *charm.URL) (info *CharmInfo, rc io.ReadCloser, er
r error) { | 498 func (s *Store) OpenCharm(url *charm.URL) (info *CharmInfo, rc io.ReadCloser, er
r error) { |
318 log.Debugf("Opening charm %s", url) | 499 log.Debugf("Opening charm %s", url) |
319 info, err = s.CharmInfo(url) | 500 info, err = s.CharmInfo(url) |
320 if err != nil { | 501 if err != nil { |
321 return nil, nil, err | 502 return nil, nil, err |
322 } | 503 } |
(...skipping 24 matching lines...) Expand all Loading... |
347 r.session.Close() | 528 r.session.Close() |
348 return err | 529 return err |
349 } | 530 } |
350 | 531 |
351 // charmDoc represents the document stored in MongoDB for a charm. | 532 // charmDoc represents the document stored in MongoDB for a charm. |
352 type charmDoc struct { | 533 type charmDoc struct { |
353 URLs []*charm.URL | 534 URLs []*charm.URL |
354 Revision int | 535 Revision int |
355 Digest string | 536 Digest string |
356 Sha256 string | 537 Sha256 string |
| 538 Size int64 |
357 FileId bson.ObjectId | 539 FileId bson.ObjectId |
358 Meta *charm.Meta | 540 Meta *charm.Meta |
359 Config *charm.Config | 541 Config *charm.Config |
360 } | 542 } |
361 | 543 |
362 // LockUpdates acquires a server-side lock for updating a single charm | 544 // LockUpdates acquires a server-side lock for updating a single charm |
363 // that is supposed to be made available in all of the provided urls. | 545 // that is supposed to be made available in all of the provided urls. |
364 // If the lock can't be acquired in any of the urls, an error will be | 546 // If the lock can't be acquired in any of the urls, an error will be |
365 // immediately returned. | 547 // immediately returned. |
366 // In the usual case, any locking done is undone when an error happens, | 548 // In the usual case, any locking done is undone when an error happens, |
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
474 // Events returns the mongo collection where charm events are stored. | 656 // Events returns the mongo collection where charm events are stored. |
475 func (s *storeSession) Events() *mgo.Collection { | 657 func (s *storeSession) Events() *mgo.Collection { |
476 return s.DB("juju").C("events") | 658 return s.DB("juju").C("events") |
477 } | 659 } |
478 | 660 |
479 // Locks returns the mongo collection where charm locks are stored. | 661 // Locks returns the mongo collection where charm locks are stored. |
480 func (s *storeSession) Locks() *mgo.Collection { | 662 func (s *storeSession) Locks() *mgo.Collection { |
481 return s.DB("juju").C("locks") | 663 return s.DB("juju").C("locks") |
482 } | 664 } |
483 | 665 |
| 666 // StatTokens returns the mongo collection for storing key tokens |
| 667 // for statistics collection. |
| 668 func (s *storeSession) StatTokens() *mgo.Collection { |
| 669 return s.DB("juju").C("stat.tokens") |
| 670 } |
| 671 |
| 672 // StatCounters returns the mongo collection for counter values. |
| 673 func (s *storeSession) StatCounters() *mgo.Collection { |
| 674 return s.DB("juju").C("stat.counters") |
| 675 } |
| 676 |
484 type CharmEventKind int | 677 type CharmEventKind int |
485 | 678 |
486 const ( | 679 const ( |
487 EventPublished CharmEventKind = iota + 1 | 680 EventPublished CharmEventKind = iota + 1 |
488 EventPublishError | 681 EventPublishError |
489 | 682 |
490 EventKindCount | 683 EventKindCount |
491 ) | 684 ) |
492 | 685 |
493 func (k CharmEventKind) String() string { | 686 func (k CharmEventKind) String() string { |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
560 func mustLackRevision(context string, urls ...*charm.URL) error { | 753 func mustLackRevision(context string, urls ...*charm.URL) error { |
561 for _, url := range urls { | 754 for _, url := range urls { |
562 if url.Revision != -1 { | 755 if url.Revision != -1 { |
563 err := fmt.Errorf("%s: got charm URL with revision: %s",
context, url) | 756 err := fmt.Errorf("%s: got charm URL with revision: %s",
context, url) |
564 log.Printf("%v", err) | 757 log.Printf("%v", err) |
565 return err | 758 return err |
566 } | 759 } |
567 } | 760 } |
568 return nil | 761 return nil |
569 } | 762 } |
LEFT | RIGHT |