Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(571)

Side by Side Diff: state/state.go

Issue 6678046: state: integrate service relations with life
Patch Set: state: integrate service relations with life Created 12 years, 5 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « state/service_test.go ('k') | worker/uniter/context_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // The state package enables reading, observing, and changing 1 // The state package enables reading, observing, and changing
2 // the state stored in MongoDB of a whole environment 2 // the state stored in MongoDB of a whole environment
3 // managed by juju. 3 // managed by juju.
4 package state 4 package state
5 5
6 import ( 6 import (
7 "fmt" 7 "fmt"
8 "labix.org/v2/mgo" 8 "labix.org/v2/mgo"
9 "labix.org/v2/mgo/bson" 9 "labix.org/v2/mgo/bson"
10 "labix.org/v2/mgo/txn" 10 "labix.org/v2/mgo/txn"
11 "launchpad.net/juju-core/charm" 11 "launchpad.net/juju-core/charm"
12 "launchpad.net/juju-core/environs/config" 12 "launchpad.net/juju-core/environs/config"
13 "launchpad.net/juju-core/log"
13 "launchpad.net/juju-core/state/presence" 14 "launchpad.net/juju-core/state/presence"
14 "launchpad.net/juju-core/state/watcher" 15 "launchpad.net/juju-core/state/watcher"
15 "launchpad.net/juju-core/trivial" 16 "launchpad.net/juju-core/trivial"
16 "launchpad.net/juju-core/version" 17 "launchpad.net/juju-core/version"
17 "net/url" 18 "net/url"
18 "regexp" 19 "regexp"
19 ) 20 )
20 21
21 // TODO(niemeyer): This must not be exported. 22 // TODO(niemeyer): This must not be exported.
22 type D []bson.DocElem 23 type D []bson.DocElem
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
91 type State struct { 92 type State struct {
92 db *mgo.Database 93 db *mgo.Database
93 charms *mgo.Collection 94 charms *mgo.Collection
94 machines *mgo.Collection 95 machines *mgo.Collection
95 relations *mgo.Collection 96 relations *mgo.Collection
96 relationScopes *mgo.Collection 97 relationScopes *mgo.Collection
97 services *mgo.Collection 98 services *mgo.Collection
98 settings *mgo.Collection 99 settings *mgo.Collection
99 units *mgo.Collection 100 units *mgo.Collection
100 presence *mgo.Collection 101 presence *mgo.Collection
102 cleanups *mgo.Collection
101 runner *txn.Runner 103 runner *txn.Runner
102 watcher *watcher.Watcher 104 watcher *watcher.Watcher
103 pwatcher *presence.Watcher 105 pwatcher *presence.Watcher
104 fwd *sshForwarder 106 fwd *sshForwarder
105 } 107 }
106 108
107 func (s *State) EnvironConfig() (*config.Config, error) { 109 func (s *State) EnvironConfig() (*config.Config, error) {
108 settings, err := readSettings(s, "e") 110 settings, err := readSettings(s, "e")
109 if err != nil { 111 if err != nil {
110 return nil, err 112 return nil, err
(...skipping 189 matching lines...) Expand 10 before | Expand all | Expand 10 after
300 if err := s.runner.Run(ops, "", nil); err != nil { 302 if err := s.runner.Run(ops, "", nil); err != nil {
301 return nil, fmt.Errorf("cannot add service %q: %v", name, onAbor t(err, fmt.Errorf("duplicate service name"))) 303 return nil, fmt.Errorf("cannot add service %q: %v", name, onAbor t(err, fmt.Errorf("duplicate service name")))
302 } 304 }
303 // Refresh to pick the txn-revno. 305 // Refresh to pick the txn-revno.
304 if err = svc.Refresh(); err != nil { 306 if err = svc.Refresh(); err != nil {
305 return nil, err 307 return nil, err
306 } 308 }
307 return svc, nil 309 return svc, nil
308 } 310 }
309 311
310 // RemoveService removes a service from the state. It will also remove all 312 // RemoveService removes a service from the state.
311 // its units and break any of its existing relations.
312 func (s *State) RemoveService(svc *Service) (err error) { 313 func (s *State) RemoveService(svc *Service) (err error) {
313 // TODO Do lifecycle properly.
314 // Removing relations and units here is wrong. They need to monitor
315 // their own parent and set themselves to dying.
316 defer trivial.ErrorContextf(&err, "cannot remove service %q", svc) 314 defer trivial.ErrorContextf(&err, "cannot remove service %q", svc)
317
318 if svc.doc.Life != Dead { 315 if svc.doc.Life != Dead {
319 return fmt.Errorf("service is not dead") 316 return fmt.Errorf("service is not dead")
320 } 317 }
321 rels, err := svc.Relations()
322 if err != nil {
323 return err
324 }
325 for _, rel := range rels {
326 err = rel.EnsureDead()
327 if err != nil {
328 return err
329 }
330 err = s.RemoveRelation(rel)
331 if err != nil {
332 return err
333 }
334 }
335 units, err := svc.AllUnits()
336 if err != nil {
337 return err
338 }
339 for _, unit := range units {
340 err = unit.EnsureDead()
341 if err != nil {
342 return err
343 }
344 if err = svc.RemoveUnit(unit); err != nil {
345 return err
346 }
347 }
348 ops := []txn.Op{{ 318 ops := []txn.Op{{
349 C: s.services.Name, 319 C: s.services.Name,
350 Id: svc.doc.Name, 320 Id: svc.doc.Name,
351 Assert: D{{"life", Dead}}, 321 Assert: D{{"life", Dead}},
352 Remove: true, 322 Remove: true,
353 }, { 323 }, {
354 C: s.settings.Name, 324 C: s.settings.Name,
355 Id: svc.globalKey(), 325 Id: svc.globalKey(),
356 Remove: true, 326 Remove: true,
357 }} 327 }}
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
401 return nil, fmt.Errorf("single endpoint must be a peer r elation") 371 return nil, fmt.Errorf("single endpoint must be a peer r elation")
402 } 372 }
403 case 2: 373 case 2:
404 if !endpoints[0].CanRelateTo(&endpoints[1]) { 374 if !endpoints[0].CanRelateTo(&endpoints[1]) {
405 return nil, fmt.Errorf("endpoints do not relate") 375 return nil, fmt.Errorf("endpoints do not relate")
406 } 376 }
407 default: 377 default:
408 return nil, fmt.Errorf("cannot relate %d endpoints", len(endpoin ts)) 378 return nil, fmt.Errorf("cannot relate %d endpoints", len(endpoin ts))
409 } 379 }
410 380
381 ops := []txn.Op{}
411 var scope charm.RelationScope 382 var scope charm.RelationScope
412 for _, v := range endpoints { 383 for _, v := range endpoints {
413 if v.RelationScope == charm.ScopeContainer { 384 if v.RelationScope == charm.ScopeContainer {
414 scope = charm.ScopeContainer 385 scope = charm.ScopeContainer
415 } 386 }
416 » » // BUG(aram): potential race in the time between getting the ser vice 387 » » ops = append(ops, txn.Op{
417 » » // to validate the endpoint and actually writting the relation 388 » » » C: s.services.Name,
418 » » // into MongoDB; the service might have disappeared. 389 » » » Id: v.ServiceName,
419 » » _, err = s.Service(v.ServiceName) 390 » » » Assert: isAlive,
420 » » if err != nil { 391 » » » Update: D{{"$inc", D{{"relationcount", 1}}}},
421 » » » return nil, err 392 » » })
422 » » }
423 } 393 }
424 if scope == charm.ScopeContainer { 394 if scope == charm.ScopeContainer {
425 for i := range endpoints { 395 for i := range endpoints {
426 endpoints[i].RelationScope = scope 396 endpoints[i].RelationScope = scope
427 } 397 }
428 } 398 }
429 id, err := s.sequence("relation") 399 id, err := s.sequence("relation")
430 if err != nil { 400 if err != nil {
431 return nil, err 401 return nil, err
432 } 402 }
433 doc := relationDoc{ 403 doc := relationDoc{
434 Key: relationKey(endpoints), 404 Key: relationKey(endpoints),
435 Id: id, 405 Id: id,
436 Endpoints: endpoints, 406 Endpoints: endpoints,
437 Life: Alive, 407 Life: Alive,
438 } 408 }
439 » ops := []txn.Op{{ 409 » ops = append(ops, txn.Op{
440 C: s.relations.Name, 410 C: s.relations.Name,
441 Id: doc.Key, 411 Id: doc.Key,
442 Assert: txn.DocMissing, 412 Assert: txn.DocMissing,
443 Insert: doc, 413 Insert: doc,
444 » }} 414 » })
445 err = s.runner.Run(ops, "", nil) 415 err = s.runner.Run(ops, "", nil)
446 » if err != nil { 416 » if err == txn.ErrAborted {
417 » » for _, ep := range endpoints {
418 » » » svc, err := s.Service(ep.ServiceName)
419 » » » if IsNotFound(err) || svc.Life() != Alive {
420 » » » » return nil, fmt.Errorf("service %q is not alive" , ep.ServiceName)
421 » » » } else if err != nil {
422 » » » » return nil, err
423 » » » }
424 » » }
425 » » return nil, fmt.Errorf("relation already exists")
426 » } else if err != nil {
447 return nil, err 427 return nil, err
448 } 428 }
449 return newRelation(s, &doc), nil 429 return newRelation(s, &doc), nil
450 } 430 }
451 431
452 // EndpointsRelation returns the existing relation with the given endpoints. 432 // EndpointsRelation returns the existing relation with the given endpoints.
453 func (s *State) EndpointsRelation(endpoints ...RelationEndpoint) (*Relation, err or) { 433 func (s *State) EndpointsRelation(endpoints ...RelationEndpoint) (*Relation, err or) {
454 doc := relationDoc{} 434 doc := relationDoc{}
455 key := relationKey(endpoints) 435 key := relationKey(endpoints)
456 err := s.relations.Find(D{{"_id", key}}).One(&doc) 436 err := s.relations.Find(D{{"_id", key}}).One(&doc)
(...skipping 12 matching lines...) Expand all
469 err := s.relations.Find(D{{"id", id}}).One(&doc) 449 err := s.relations.Find(D{{"id", id}}).One(&doc)
470 if err == mgo.ErrNotFound { 450 if err == mgo.ErrNotFound {
471 return nil, notFound("relation %d", id) 451 return nil, notFound("relation %d", id)
472 } 452 }
473 if err != nil { 453 if err != nil {
474 return nil, fmt.Errorf("cannot get relation %d: %v", id, err) 454 return nil, fmt.Errorf("cannot get relation %d: %v", id, err)
475 } 455 }
476 return newRelation(s, &doc), nil 456 return newRelation(s, &doc), nil
477 } 457 }
478 458
479 // RemoveRelation removes the supplied relation. 459 // RemoveRelation removes the supplied relation and all its unit settings.
480 func (s *State) RemoveRelation(r *Relation) (err error) { 460 func (s *State) RemoveRelation(r *Relation) (err error) {
481 defer trivial.ErrorContextf(&err, "cannot remove relation %q", r.doc.Key ) 461 defer trivial.ErrorContextf(&err, "cannot remove relation %q", r.doc.Key )
482 if r.doc.Life != Dead { 462 if r.doc.Life != Dead {
483 return fmt.Errorf("relation is not dead") 463 return fmt.Errorf("relation is not dead")
484 } 464 }
465 cDoc := &cleanupDoc{
466 Id: bson.NewObjectId(),
467 Kind: "settings",
468 Prefix: fmt.Sprintf("r#%d#", r.Id()),
469 }
485 ops := []txn.Op{{ 470 ops := []txn.Op{{
471 C: s.cleanups.Name,
472 Id: cDoc.Id,
473 Insert: cDoc,
474 }, {
486 C: s.relations.Name, 475 C: s.relations.Name,
487 Id: r.doc.Key, 476 Id: r.doc.Key,
488 » » Assert: D{{"life", Dead}}, 477 » » Assert: D{{"life", Dead}, {"id", r.Id()}},
489 Remove: true, 478 Remove: true,
490 }} 479 }}
480 for _, ep := range r.doc.Endpoints {
481 ops = append(ops, txn.Op{
482 C: s.services.Name,
483 Id: ep.ServiceName,
484 Assert: D{{"relationcount", D{{"$gt", 0}}}},
485 Update: D{{"$inc", D{{"relationcount", -1}}}},
486 })
487 }
491 if err := s.runner.Run(ops, "", nil); err != nil { 488 if err := s.runner.Run(ops, "", nil); err != nil {
492 // If aborted, the relation is either dead or recreated.
493 return onAbort(err, nil) 489 return onAbort(err, nil)
494 } 490 }
495 return nil 491 return nil
496 } 492 }
497 493
498 // Unit returns a unit by name. 494 // Unit returns a unit by name.
499 func (s *State) Unit(name string) (*Unit, error) { 495 func (s *State) Unit(name string) (*Unit, error) {
500 if !IsUnitName(name) { 496 if !IsUnitName(name) {
501 return nil, fmt.Errorf("%q is not a valid unit name", name) 497 return nil, fmt.Errorf("%q is not a valid unit name", name)
502 } 498 }
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
588 584
589 func (s *State) setPassword(name, password string) error { 585 func (s *State) setPassword(name, password string) error {
590 if err := s.db.AddUser(name, password, false); err != nil { 586 if err := s.db.AddUser(name, password, false); err != nil {
591 return fmt.Errorf("cannot set password in juju db for %q: %v", n ame, err) 587 return fmt.Errorf("cannot set password in juju db for %q: %v", n ame, err)
592 } 588 }
593 if err := s.db.Session.DB("presence").AddUser(name, password, false); er r != nil { 589 if err := s.db.Session.DB("presence").AddUser(name, password, false); er r != nil {
594 return fmt.Errorf("cannot set password in presence db for %q: %v ", name, err) 590 return fmt.Errorf("cannot set password in presence db for %q: %v ", name, err)
595 } 591 }
596 return nil 592 return nil
597 } 593 }
594
595 // cleanupDoc represents a potentially large set of documents that should be
596 // removed.
597 type cleanupDoc struct {
598 Id bson.ObjectId `bson:"_id"`
599 Kind string
600 Prefix string
601 }
602
603 // Cleanup removes all documents that were previously marked for removal, if
604 // any such exist. It should be called periodically by at least one element
605 // of the system.
606 func (s *State) Cleanup() error {
607 doc := cleanupDoc{}
608 iter := s.cleanups.Find(nil).Iter()
609 for iter.Next(&doc) {
610 var c *mgo.Collection
611 var sel interface{}
612 switch doc.Kind {
613 case "settings":
614 c = s.settings
615 sel = D{{"_id", D{{"$regex", "^" + doc.Prefix}}}}
616 default:
617 log.Printf("state: WARNING: ignoring unknown cleanup kin d %q", doc.Kind)
618 continue
619 }
620 if count, err := c.Find(sel).Count(); err != nil {
621 return fmt.Errorf("cannot detect cleanup targets: %v", e rr)
622 } else if count != 0 {
623 // Documents marked for cleanup are not otherwise refere nced in the
624 // system, and will not be under watch, and are therefor e safe to
625 // delete directly.
626 if _, err := c.RemoveAll(sel); err != nil {
627 return fmt.Errorf("cannot remove documents marke d for cleanup: %v", err)
628 }
629 }
630 ops := []txn.Op{{
631 C: s.cleanups.Name,
632 Id: doc.Id,
633 Remove: true,
634 }}
635 if err := s.runner.Run(ops, "", nil); err != nil {
636 return fmt.Errorf("cannot remove empty cleanup document: %v", err)
637 }
638 }
639 if err := iter.Err(); err != nil {
640 return fmt.Errorf("cannot read cleanup document: %v", err)
641 }
642 return nil
643 }
OLDNEW
« no previous file with comments | « state/service_test.go ('k') | worker/uniter/context_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b