Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(97)

Delta Between Two Patch Sets: store/store.go

Issue 5901058: cmd/juju: working bootstrap and destroy commands
Left Patch Set: cmd/juju: working bootstrap and destroy commands Created 13 years ago
Right Patch Set: cmd/juju: working bootstrap and destroy commands Created 12 years, 11 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Right: Side by side diff | Download
« no previous file with change/comment | « store/server_test.go ('k') | store/store_test.go » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
(no file at all)
1 // The store package is capable of storing and updating charms in a MongoDB 1 // The store package is capable of storing and updating charms in a MongoDB
2 // database, as well as maintaining further information about them such as 2 // database, as well as maintaining further information about them such as
3 // the VCS revision the charm was loaded from and the URLs for the charms. 3 // the VCS revision the charm was loaded from and the URLs for the charms.
4 package store 4 package store
5 5
6 import ( 6 import (
7 "crypto/sha256" 7 "crypto/sha256"
8 "encoding/hex" 8 "encoding/hex"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 "hash" 11 "hash"
12 "io" 12 "io"
13 "launchpad.net/juju/go/charm" 13 "launchpad.net/juju/go/charm"
14 "launchpad.net/juju/go/log" 14 "launchpad.net/juju/go/log"
15 "launchpad.net/mgo" 15 "launchpad.net/mgo"
16 "launchpad.net/mgo/bson" 16 "launchpad.net/mgo/bson"
17 "sort" 17 "sort"
18 "strconv"
19 "sync"
18 "time" 20 "time"
19 ) 21 )
20 22
21 // The following MongoDB collections are currently used: 23 // The following MongoDB collections are currently used:
22 // 24 //
23 // juju.events - Log of events relating to the lifecycle of charms 25 // juju.events - Log of events relating to the lifecycle of charms
24 // juju.charms - Information about the stored charms 26 // juju.charms - Information about the stored charms
25 // juju.charmfs.* - GridFS with the charm files 27 // juju.charmfs.* - GridFS with the charm files
26 // juju.locks - Has unique keys with url of updating charms 28 // juju.locks - Has unique keys with url of updating charms
27 29
28 var ( 30 var (
29 ErrUpdateConflict = errors.New("charm update in progress") 31 ErrUpdateConflict = errors.New("charm update in progress")
30 ErrRedundantUpdate = errors.New("charm is up-to-date") 32 ErrRedundantUpdate = errors.New("charm is up-to-date")
31 ErrNotFound = errors.New("entry not found") 33 ErrNotFound = errors.New("entry not found")
32 ) 34 )
33 35
34 const ( 36 const (
35 UpdateTimeout = 600e9 37 UpdateTimeout = 600e9
36 ) 38 )
37 39
38 // Store holds a connection to a charm store. 40 // Store holds a connection to a charm store.
39 type Store struct { 41 type Store struct {
40 session *storeSession 42 session *storeSession
43
44 // Cache for statistics key words (two generations).
45 cacheMu sync.RWMutex
46 statsTokenNew map[string]int
47 statsTokenOld map[string]int
41 } 48 }
42 49
43 // Open creates a new session with the store. It connects to the MongoDB 50 // Open creates a new session with the store. It connects to the MongoDB
44 // server at the given address (as expected by the Mongo function in the 51 // server at the given address (as expected by the Mongo function in the
45 // launchpad.net/mgo package). 52 // launchpad.net/mgo package).
46 func Open(mongoAddr string) (store *Store, err error) { 53 func Open(mongoAddr string) (store *Store, err error) {
47 log.Printf("Store opened. Connecting to: %s", mongoAddr) 54 log.Printf("Store opened. Connecting to: %s", mongoAddr)
48 store = &Store{} 55 store = &Store{}
49 session, err := mgo.Dial(mongoAddr) 56 session, err := mgo.Dial(mongoAddr)
50 if err != nil { 57 if err != nil {
51 log.Printf("Error connecting to MongoDB: %v", err) 58 log.Printf("Error connecting to MongoDB: %v", err)
52 return nil, err 59 return nil, err
53 } 60 }
54 » store = &Store{&storeSession{session}} 61
62 » store = &Store{session: &storeSession{session}}
63
64 » // Ignore error. It'll always fail after created.
65 » // TODO Check the error once mgo hands it to us.
66 » _ = store.session.DB("juju").Run(bson.D{{"create", "stat.counters"}, {"autoIndexId", false}}, nil)
67
68 » counters := store.session.StatCounters()
69 » err = counters.EnsureIndex(mgo.Index{Key: []string{"k", "t"}, Unique: true})
70 » if err != nil {
71 » » log.Printf("Error ensuring stat.counters index: %v", err)
72 » » session.Close()
73 » » return nil, err
74 » }
75
55 charms := store.session.Charms() 76 charms := store.session.Charms()
56 err = charms.EnsureIndex(mgo.Index{Key: []string{"urls", "revision"}, Unique: true}) 77 err = charms.EnsureIndex(mgo.Index{Key: []string{"urls", "revision"}, Unique: true})
57 if err != nil { 78 if err != nil {
58 log.Printf("Error ensuring charms index: %v", err) 79 log.Printf("Error ensuring charms index: %v", err)
59 session.Close() 80 session.Close()
60 return nil, err 81 return nil, err
61 } 82 }
83
62 events := store.session.Events() 84 events := store.session.Events()
63 err = events.EnsureIndex(mgo.Index{Key: []string{"urls", "digest"}}) 85 err = events.EnsureIndex(mgo.Index{Key: []string{"urls", "digest"}})
64 if err != nil { 86 if err != nil {
65 log.Printf("Error ensuring events index: %v", err) 87 log.Printf("Error ensuring events index: %v", err)
66 session.Close() 88 session.Close()
67 return nil, err 89 return nil, err
68 } 90 }
69 » // Put the socket we used on EnsureIndex back in the pool. 91
92 » // Put the socket we used back in the pool.
70 session.Refresh() 93 session.Refresh()
71 return store, nil 94 return store, nil
72 } 95 }
73 96
74 // Close terminates the connection with the store. 97 // Close terminates the connection with the store.
75 func (s *Store) Close() { 98 func (s *Store) Close() {
76 s.session.Close() 99 s.session.Close()
100 }
101
102 // statsKey returns the compound statistics identifier that represents key.
103 // If write is true, the identifier will be created if necessary.
104 // Identifiers have a form similar to "ab:c:def:", where each section is a
105 // base-32 number that represents the respective word in key. This form
106 // allows efficiently indexing and searching for prefixes, while detaching
107 // the key content and size from the actual words used in key.
108 func (s *Store) statsKey(session *storeSession, key []string, write bool) (string, error) {
109 if len(key) == 0 {
110 return "", fmt.Errorf("store: empty statistics key")
111 }
112 tokens := session.StatTokens()
113 skey := make([]byte, 0, len(key)*4)
114 // Retry limit is mainly to prevent infinite recursion in edge cases,
115 // such as if the database is ever run in read-only mode.
116 // The logic below should deterministically stop in normal scenarios.
117 var err error
118 for i, retry := 0, 30; i < len(key) && retry > 0; retry-- {
119 id, found := s.statsTokenId(key[i])
120 if !found {
121 var t struct {
122 Id int "_id"
123 Token string "t"
124 }
125 err = tokens.Find(bson.D{{"t", key[i]}}).One(&t)
126 if err == mgo.NotFound {
127 if !write {
128 return "", ErrNotFound
129 }
130 t.Id, err = tokens.Count()
131 if err != nil {
132 continue
133 }
134 t.Id++
135 t.Token = key[i]
136 err = tokens.Insert(&t)
137 }
138 if err != nil {
139 continue
140 }
141 s.cacheStatsTokenId(t.Token, t.Id)
142 id = t.Id
143 }
144 skey = strconv.AppendInt(skey, int64(id), 32)
145 skey = append(skey, ':')
146 i++
147 }
148 if err != nil {
149 return "", err
150 }
151 return string(skey), nil
152 }
153
154 const statsTokenCacheSize = 512
155
156 // cacheStatsTokenId adds the id for token into the cache.
157 // The cache has two generations so that the least frequently used
158 // tokens are evicted regularly.
159 func (s *Store) cacheStatsTokenId(token string, id int) {
160 s.cacheMu.Lock()
161 defer s.cacheMu.Unlock()
162 // Can't possibly be >, but reviews want it for defensiveness.
163 if len(s.statsTokenNew) >= statsTokenCacheSize {
164 s.statsTokenOld = s.statsTokenNew
165 s.statsTokenNew = nil
166 }
167 if s.statsTokenNew == nil {
168 s.statsTokenNew = make(map[string]int, statsTokenCacheSize)
169 }
170 s.statsTokenNew[token] = id
171 }
172
173 // statsTokenId returns the id for token from the cache, if found.
174 func (s *Store) statsTokenId(token string) (id int, found bool) {
175 s.cacheMu.RLock()
176 id, found = s.statsTokenNew[token]
177 if found {
178 s.cacheMu.RUnlock()
179 return
180 }
181 id, found = s.statsTokenOld[token]
182 s.cacheMu.RUnlock()
183 if found {
184 s.cacheStatsTokenId(token, id)
185 }
186 return
187 }
188
189 var counterEpoch = time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
190
191 // IncCounter increases by one the counter associated with the composed key.
192 func (s *Store) IncCounter(key []string) error {
193 session := s.session.Copy()
194 defer session.Close()
195
196 skey, err := s.statsKey(session, key, true)
197 if err != nil {
198 return err
199 }
200
201 t := time.Now().UTC()
202 // Round to the start of the minute so we get one document per minute at most.
203 t = t.Add(-time.Duration(t.Second()) * time.Second)
204 counters := session.StatCounters()
205 _, err = counters.Upsert(bson.D{{"k", skey}, {"t", int32(t.Unix() - counterEpoch)}}, bson.D{{"$inc", bson.D{{"c", 1}}}})
206 return err
207 }
208
209 // SumCounter returns the sum of all the counters that exactly match key,
210 // or that are prefixed by it if prefix is true.
211 func (s *Store) SumCounter(key []string, prefix bool) (count int64, err error) {
212 session := s.session.Copy()
213 defer session.Close()
214
215 skey, err := s.statsKey(session, key, false)
216 if err == ErrNotFound {
217 return 0, nil
218 }
219 if err != nil {
220 return 0, err
221 }
222
223 var regex string
224 if prefix {
225 regex = "^" + skey
226 } else {
227 regex = "^" + skey + "$"
228 }
229
230 job := mgo.MapReduce{
231 Map: "function() { emit('count', this.c); }",
232 Reduce: "function(key, values) { return Array.sum(values); }",
233 }
234 var result []struct{ Value int64 }
235 counters := session.StatCounters()
236 _, err = counters.Find(bson.D{{"k", bson.D{{"$regex", regex}}}}).MapReduce(job, &result)
237 if len(result) > 0 {
238 return result[0].Value, err
239 }
240 return 0, err
77 } 241 }
78 242
79 // A CharmPublisher is responsible for importing a charm dir onto the store. 243 // A CharmPublisher is responsible for importing a charm dir onto the store.
80 type CharmPublisher struct { 244 type CharmPublisher struct {
81 revision int 245 revision int
82 w *charmWriter 246 w *charmWriter
83 } 247 }
84 248
85 // Revision returns the revision that will be assigned to the published charm. 249 // Revision returns the revision that will be assigned to the published charm.
86 func (p *CharmPublisher) Revision() int { 250 func (p *CharmPublisher) Revision() int {
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after
218 } 382 }
219 383
220 // finish completes the charm writing process and inserts the final metadata. 384 // finish completes the charm writing process and inserts the final metadata.
221 // After it completes the charm will be available for consumption. 385 // After it completes the charm will be available for consumption.
222 func (w *charmWriter) finish() error { 386 func (w *charmWriter) finish() error {
223 if w.file == nil { 387 if w.file == nil {
224 return nil 388 return nil
225 } 389 }
226 defer w.session.Close() 390 defer w.session.Close()
227 id := w.file.Id() 391 id := w.file.Id()
392 size := w.file.Size()
228 err := w.file.Close() 393 err := w.file.Close()
229 if err != nil { 394 if err != nil {
230 log.Printf("Failed to close GridFS file: %v", err) 395 log.Printf("Failed to close GridFS file: %v", err)
231 return err 396 return err
232 } 397 }
233 charms := w.session.Charms() 398 charms := w.session.Charms()
234 sha256 := hex.EncodeToString(w.sha256.Sum(nil)) 399 sha256 := hex.EncodeToString(w.sha256.Sum(nil))
235 charm := charmDoc{ 400 charm := charmDoc{
236 w.urls, 401 w.urls,
237 w.revision, 402 w.revision,
238 w.digest, 403 w.digest,
239 sha256, 404 sha256,
405 size,
240 id.(bson.ObjectId), 406 id.(bson.ObjectId),
241 w.charm.Meta(), 407 w.charm.Meta(),
242 w.charm.Config(), 408 w.charm.Config(),
243 } 409 }
244 if err = charms.Insert(&charm); err != nil { 410 if err = charms.Insert(&charm); err != nil {
245 err = maybeConflict(err) 411 err = maybeConflict(err)
246 log.Printf("Failed to insert new revision of charm %v: %v", w.urls, err) 412 log.Printf("Failed to insert new revision of charm %v: %v", w.urls, err)
247 return err 413 return err
248 } 414 }
249 return nil 415 return nil
250 } 416 }
251 417
252 type CharmInfo struct { 418 type CharmInfo struct {
253 revision int 419 revision int
254 digest string 420 digest string
255 sha256 string 421 sha256 string
422 size int64
256 fileId bson.ObjectId 423 fileId bson.ObjectId
257 meta *charm.Meta 424 meta *charm.Meta
258 config *charm.Config 425 config *charm.Config
259 } 426 }
260 427
261 // Statically ensure CharmInfo is a charm.Charm. 428 // Statically ensure CharmInfo is a charm.Charm.
262 var _ charm.Charm = (*CharmInfo)(nil) 429 var _ charm.Charm = (*CharmInfo)(nil)
263 430
264 // Revision returns the store charm's revision. 431 // Revision returns the store charm's revision.
265 func (ci *CharmInfo) Revision() int { 432 func (ci *CharmInfo) Revision() int {
266 return ci.revision 433 return ci.revision
267 } 434 }
268 435
269 // BundleSha256 returns the sha256 checksum for the stored charm bundle. 436 // BundleSha256 returns the sha256 checksum for the stored charm bundle.
270 func (ci *CharmInfo) BundleSha256() string { 437 func (ci *CharmInfo) BundleSha256() string {
271 return ci.sha256 438 return ci.sha256
439 }
440
441 // BundleSize returns the size for the stored charm bundle.
442 func (ci *CharmInfo) BundleSize() int64 {
443 return ci.size
272 } 444 }
273 445
274 // Digest returns the unique identifier that represents the charm 446 // Digest returns the unique identifier that represents the charm
275 // data imported. This is typically set to the VCS revision digest. 447 // data imported. This is typically set to the VCS revision digest.
276 func (ci *CharmInfo) Digest() string { 448 func (ci *CharmInfo) Digest() string {
277 return ci.digest 449 return ci.digest
278 } 450 }
279 451
280 // Meta returns the charm.Meta details for the stored charm. 452 // Meta returns the charm.Meta details for the stored charm.
281 func (ci *CharmInfo) Meta() *charm.Meta { 453 func (ci *CharmInfo) Meta() *charm.Meta {
(...skipping 20 matching lines...) Expand all
302 if rev == -1 { 474 if rev == -1 {
303 qdoc = bson.D{{"urls", url}} 475 qdoc = bson.D{{"urls", url}}
304 } else { 476 } else {
305 qdoc = bson.D{{"urls", url}, {"revision", rev}} 477 qdoc = bson.D{{"urls", url}, {"revision", rev}}
306 } 478 }
307 err = charms.Find(qdoc).Sort(bson.D{{"revision", -1}}).One(&cdoc) 479 err = charms.Find(qdoc).Sort(bson.D{{"revision", -1}}).One(&cdoc)
308 if err != nil { 480 if err != nil {
309 log.Printf("Failed to find charm %s: %v", url, err) 481 log.Printf("Failed to find charm %s: %v", url, err)
310 return nil, ErrNotFound 482 return nil, ErrNotFound
311 } 483 }
312 » return &CharmInfo{cdoc.Revision, cdoc.Digest, cdoc.Sha256, cdoc.FileId, cdoc.Meta, cdoc.Config}, nil 484 » info = &CharmInfo{
485 » » cdoc.Revision,
486 » » cdoc.Digest,
487 » » cdoc.Sha256,
488 » » cdoc.Size,
489 » » cdoc.FileId,
490 » » cdoc.Meta,
491 » » cdoc.Config,
492 » }
493 » return info, nil
313 } 494 }
314 495
315 // OpenCharm opens for reading via rc the charm currently available at url. 496 // OpenCharm opens for reading via rc the charm currently available at url.
316 // rc must be closed after dealing with it or resources will leak. 497 // rc must be closed after dealing with it or resources will leak.
317 func (s *Store) OpenCharm(url *charm.URL) (info *CharmInfo, rc io.ReadCloser, err error) { 498 func (s *Store) OpenCharm(url *charm.URL) (info *CharmInfo, rc io.ReadCloser, err error) {
318 log.Debugf("Opening charm %s", url) 499 log.Debugf("Opening charm %s", url)
319 info, err = s.CharmInfo(url) 500 info, err = s.CharmInfo(url)
320 if err != nil { 501 if err != nil {
321 return nil, nil, err 502 return nil, nil, err
322 } 503 }
(...skipping 24 matching lines...) Expand all
347 r.session.Close() 528 r.session.Close()
348 return err 529 return err
349 } 530 }
350 531
351 // charmDoc represents the document stored in MongoDB for a charm. 532 // charmDoc represents the document stored in MongoDB for a charm.
352 type charmDoc struct { 533 type charmDoc struct {
353 URLs []*charm.URL 534 URLs []*charm.URL
354 Revision int 535 Revision int
355 Digest string 536 Digest string
356 Sha256 string 537 Sha256 string
538 Size int64
357 FileId bson.ObjectId 539 FileId bson.ObjectId
358 Meta *charm.Meta 540 Meta *charm.Meta
359 Config *charm.Config 541 Config *charm.Config
360 } 542 }
361 543
362 // LockUpdates acquires a server-side lock for updating a single charm 544 // LockUpdates acquires a server-side lock for updating a single charm
363 // that is supposed to be made available in all of the provided urls. 545 // that is supposed to be made available in all of the provided urls.
364 // If the lock can't be acquired in any of the urls, an error will be 546 // If the lock can't be acquired in any of the urls, an error will be
365 // immediately returned. 547 // immediately returned.
366 // In the usual case, any locking done is undone when an error happens, 548 // In the usual case, any locking done is undone when an error happens,
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
474 // Events returns the mongo collection where charm events are stored. 656 // Events returns the mongo collection where charm events are stored.
475 func (s *storeSession) Events() *mgo.Collection { 657 func (s *storeSession) Events() *mgo.Collection {
476 return s.DB("juju").C("events") 658 return s.DB("juju").C("events")
477 } 659 }
478 660
479 // Locks returns the mongo collection where charm locks are stored. 661 // Locks returns the mongo collection where charm locks are stored.
480 func (s *storeSession) Locks() *mgo.Collection { 662 func (s *storeSession) Locks() *mgo.Collection {
481 return s.DB("juju").C("locks") 663 return s.DB("juju").C("locks")
482 } 664 }
483 665
666 // StatTokens returns the mongo collection for storing key tokens
667 // for statistics collection.
668 func (s *storeSession) StatTokens() *mgo.Collection {
669 return s.DB("juju").C("stat.tokens")
670 }
671
672 // StatCounters returns the mongo collection for counter values.
673 func (s *storeSession) StatCounters() *mgo.Collection {
674 return s.DB("juju").C("stat.counters")
675 }
676
484 type CharmEventKind int 677 type CharmEventKind int
485 678
486 const ( 679 const (
487 EventPublished CharmEventKind = iota + 1 680 EventPublished CharmEventKind = iota + 1
488 EventPublishError 681 EventPublishError
489 682
490 EventKindCount 683 EventKindCount
491 ) 684 )
492 685
493 func (k CharmEventKind) String() string { 686 func (k CharmEventKind) String() string {
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 func mustLackRevision(context string, urls ...*charm.URL) error { 753 func mustLackRevision(context string, urls ...*charm.URL) error {
561 for _, url := range urls { 754 for _, url := range urls {
562 if url.Revision != -1 { 755 if url.Revision != -1 {
563 err := fmt.Errorf("%s: got charm URL with revision: %s", context, url) 756 err := fmt.Errorf("%s: got charm URL with revision: %s", context, url)
564 log.Printf("%v", err) 757 log.Printf("%v", err)
565 return err 758 return err
566 } 759 }
567 } 760 }
568 return nil 761 return nil
569 } 762 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b