OLD | NEW |
1 // The state package enables reading, observing, and changing | 1 // The state package enables reading, observing, and changing |
2 // the state stored in MongoDB of a whole environment | 2 // the state stored in MongoDB of a whole environment |
3 // managed by juju. | 3 // managed by juju. |
4 package state | 4 package state |
5 | 5 |
6 import ( | 6 import ( |
7 "fmt" | 7 "fmt" |
8 "labix.org/v2/mgo" | 8 "labix.org/v2/mgo" |
9 "labix.org/v2/mgo/bson" | 9 "labix.org/v2/mgo/bson" |
10 "labix.org/v2/mgo/txn" | 10 "labix.org/v2/mgo/txn" |
11 "launchpad.net/juju-core/charm" | 11 "launchpad.net/juju-core/charm" |
12 "launchpad.net/juju-core/environs/config" | 12 "launchpad.net/juju-core/environs/config" |
| 13 "launchpad.net/juju-core/log" |
13 "launchpad.net/juju-core/state/presence" | 14 "launchpad.net/juju-core/state/presence" |
14 "launchpad.net/juju-core/state/watcher" | 15 "launchpad.net/juju-core/state/watcher" |
15 "launchpad.net/juju-core/trivial" | 16 "launchpad.net/juju-core/trivial" |
16 "launchpad.net/juju-core/version" | 17 "launchpad.net/juju-core/version" |
17 "net/url" | 18 "net/url" |
18 "regexp" | 19 "regexp" |
19 ) | 20 ) |
20 | 21 |
21 // TODO(niemeyer): This must not be exported. | 22 // TODO(niemeyer): This must not be exported. |
22 type D []bson.DocElem | 23 type D []bson.DocElem |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
91 type State struct { | 92 type State struct { |
92 db *mgo.Database | 93 db *mgo.Database |
93 charms *mgo.Collection | 94 charms *mgo.Collection |
94 machines *mgo.Collection | 95 machines *mgo.Collection |
95 relations *mgo.Collection | 96 relations *mgo.Collection |
96 relationScopes *mgo.Collection | 97 relationScopes *mgo.Collection |
97 services *mgo.Collection | 98 services *mgo.Collection |
98 settings *mgo.Collection | 99 settings *mgo.Collection |
99 units *mgo.Collection | 100 units *mgo.Collection |
100 presence *mgo.Collection | 101 presence *mgo.Collection |
| 102 cleanups *mgo.Collection |
101 runner *txn.Runner | 103 runner *txn.Runner |
102 watcher *watcher.Watcher | 104 watcher *watcher.Watcher |
103 pwatcher *presence.Watcher | 105 pwatcher *presence.Watcher |
104 fwd *sshForwarder | 106 fwd *sshForwarder |
105 } | 107 } |
106 | 108 |
107 func (s *State) EnvironConfig() (*config.Config, error) { | 109 func (s *State) EnvironConfig() (*config.Config, error) { |
108 settings, err := readSettings(s, "e") | 110 settings, err := readSettings(s, "e") |
109 if err != nil { | 111 if err != nil { |
110 return nil, err | 112 return nil, err |
(...skipping 189 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
300 if err := s.runner.Run(ops, "", nil); err != nil { | 302 if err := s.runner.Run(ops, "", nil); err != nil { |
301 return nil, fmt.Errorf("cannot add service %q: %v", name, onAbor
t(err, fmt.Errorf("duplicate service name"))) | 303 return nil, fmt.Errorf("cannot add service %q: %v", name, onAbor
t(err, fmt.Errorf("duplicate service name"))) |
302 } | 304 } |
303 // Refresh to pick the txn-revno. | 305 // Refresh to pick the txn-revno. |
304 if err = svc.Refresh(); err != nil { | 306 if err = svc.Refresh(); err != nil { |
305 return nil, err | 307 return nil, err |
306 } | 308 } |
307 return svc, nil | 309 return svc, nil |
308 } | 310 } |
309 | 311 |
310 // RemoveService removes a service from the state. It will also remove all | 312 // RemoveService removes a service from the state. |
311 // its units and break any of its existing relations. | |
312 func (s *State) RemoveService(svc *Service) (err error) { | 313 func (s *State) RemoveService(svc *Service) (err error) { |
313 // TODO Do lifecycle properly. | |
314 // Removing relations and units here is wrong. They need to monitor | |
315 // their own parent and set themselves to dying. | |
316 defer trivial.ErrorContextf(&err, "cannot remove service %q", svc) | 314 defer trivial.ErrorContextf(&err, "cannot remove service %q", svc) |
317 | |
318 if svc.doc.Life != Dead { | 315 if svc.doc.Life != Dead { |
319 return fmt.Errorf("service is not dead") | 316 return fmt.Errorf("service is not dead") |
320 } | 317 } |
321 rels, err := svc.Relations() | |
322 if err != nil { | |
323 return err | |
324 } | |
325 for _, rel := range rels { | |
326 err = rel.EnsureDead() | |
327 if err != nil { | |
328 return err | |
329 } | |
330 err = s.RemoveRelation(rel) | |
331 if err != nil { | |
332 return err | |
333 } | |
334 } | |
335 units, err := svc.AllUnits() | |
336 if err != nil { | |
337 return err | |
338 } | |
339 for _, unit := range units { | |
340 err = unit.EnsureDead() | |
341 if err != nil { | |
342 return err | |
343 } | |
344 if err = svc.RemoveUnit(unit); err != nil { | |
345 return err | |
346 } | |
347 } | |
348 ops := []txn.Op{{ | 318 ops := []txn.Op{{ |
349 C: s.services.Name, | 319 C: s.services.Name, |
350 Id: svc.doc.Name, | 320 Id: svc.doc.Name, |
351 Assert: D{{"life", Dead}}, | 321 Assert: D{{"life", Dead}}, |
352 Remove: true, | 322 Remove: true, |
353 }, { | 323 }, { |
354 C: s.settings.Name, | 324 C: s.settings.Name, |
355 Id: svc.globalKey(), | 325 Id: svc.globalKey(), |
356 Remove: true, | 326 Remove: true, |
357 }} | 327 }} |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
401 return nil, fmt.Errorf("single endpoint must be a peer r
elation") | 371 return nil, fmt.Errorf("single endpoint must be a peer r
elation") |
402 } | 372 } |
403 case 2: | 373 case 2: |
404 if !endpoints[0].CanRelateTo(&endpoints[1]) { | 374 if !endpoints[0].CanRelateTo(&endpoints[1]) { |
405 return nil, fmt.Errorf("endpoints do not relate") | 375 return nil, fmt.Errorf("endpoints do not relate") |
406 } | 376 } |
407 default: | 377 default: |
408 return nil, fmt.Errorf("cannot relate %d endpoints", len(endpoin
ts)) | 378 return nil, fmt.Errorf("cannot relate %d endpoints", len(endpoin
ts)) |
409 } | 379 } |
410 | 380 |
| 381 ops := []txn.Op{} |
411 var scope charm.RelationScope | 382 var scope charm.RelationScope |
412 for _, v := range endpoints { | 383 for _, v := range endpoints { |
413 if v.RelationScope == charm.ScopeContainer { | 384 if v.RelationScope == charm.ScopeContainer { |
414 scope = charm.ScopeContainer | 385 scope = charm.ScopeContainer |
415 } | 386 } |
416 » » // BUG(aram): potential race in the time between getting the ser
vice | 387 » » ops = append(ops, txn.Op{ |
417 » » // to validate the endpoint and actually writting the relation | 388 » » » C: s.services.Name, |
418 » » // into MongoDB; the service might have disappeared. | 389 » » » Id: v.ServiceName, |
419 » » _, err = s.Service(v.ServiceName) | 390 » » » Assert: isAlive, |
420 » » if err != nil { | 391 » » » Update: D{{"$inc", D{{"relationcount", 1}}}}, |
421 » » » return nil, err | 392 » » }) |
422 » » } | |
423 } | 393 } |
424 if scope == charm.ScopeContainer { | 394 if scope == charm.ScopeContainer { |
425 for i := range endpoints { | 395 for i := range endpoints { |
426 endpoints[i].RelationScope = scope | 396 endpoints[i].RelationScope = scope |
427 } | 397 } |
428 } | 398 } |
429 id, err := s.sequence("relation") | 399 id, err := s.sequence("relation") |
430 if err != nil { | 400 if err != nil { |
431 return nil, err | 401 return nil, err |
432 } | 402 } |
433 doc := relationDoc{ | 403 doc := relationDoc{ |
434 Key: relationKey(endpoints), | 404 Key: relationKey(endpoints), |
435 Id: id, | 405 Id: id, |
436 Endpoints: endpoints, | 406 Endpoints: endpoints, |
437 Life: Alive, | 407 Life: Alive, |
438 } | 408 } |
439 » ops := []txn.Op{{ | 409 » ops = append(ops, txn.Op{ |
440 C: s.relations.Name, | 410 C: s.relations.Name, |
441 Id: doc.Key, | 411 Id: doc.Key, |
442 Assert: txn.DocMissing, | 412 Assert: txn.DocMissing, |
443 Insert: doc, | 413 Insert: doc, |
444 » }} | 414 » }) |
445 err = s.runner.Run(ops, "", nil) | 415 err = s.runner.Run(ops, "", nil) |
446 » if err != nil { | 416 » if err == txn.ErrAborted { |
| 417 » » for _, ep := range endpoints { |
| 418 » » » svc, err := s.Service(ep.ServiceName) |
| 419 » » » if IsNotFound(err) || svc.Life() != Alive { |
| 420 » » » » return nil, fmt.Errorf("service %q is not alive"
, ep.ServiceName) |
| 421 » » » } else if err != nil { |
| 422 » » » » return nil, err |
| 423 » » » } |
| 424 » » } |
| 425 » » return nil, fmt.Errorf("relation already exists") |
| 426 » } else if err != nil { |
447 return nil, err | 427 return nil, err |
448 } | 428 } |
449 return newRelation(s, &doc), nil | 429 return newRelation(s, &doc), nil |
450 } | 430 } |
451 | 431 |
452 // EndpointsRelation returns the existing relation with the given endpoints. | 432 // EndpointsRelation returns the existing relation with the given endpoints. |
453 func (s *State) EndpointsRelation(endpoints ...RelationEndpoint) (*Relation, err
or) { | 433 func (s *State) EndpointsRelation(endpoints ...RelationEndpoint) (*Relation, err
or) { |
454 doc := relationDoc{} | 434 doc := relationDoc{} |
455 key := relationKey(endpoints) | 435 key := relationKey(endpoints) |
456 err := s.relations.Find(D{{"_id", key}}).One(&doc) | 436 err := s.relations.Find(D{{"_id", key}}).One(&doc) |
(...skipping 12 matching lines...) Expand all Loading... |
469 err := s.relations.Find(D{{"id", id}}).One(&doc) | 449 err := s.relations.Find(D{{"id", id}}).One(&doc) |
470 if err == mgo.ErrNotFound { | 450 if err == mgo.ErrNotFound { |
471 return nil, notFound("relation %d", id) | 451 return nil, notFound("relation %d", id) |
472 } | 452 } |
473 if err != nil { | 453 if err != nil { |
474 return nil, fmt.Errorf("cannot get relation %d: %v", id, err) | 454 return nil, fmt.Errorf("cannot get relation %d: %v", id, err) |
475 } | 455 } |
476 return newRelation(s, &doc), nil | 456 return newRelation(s, &doc), nil |
477 } | 457 } |
478 | 458 |
479 // RemoveRelation removes the supplied relation. | 459 // RemoveRelation removes the supplied relation and all its unit settings. |
480 func (s *State) RemoveRelation(r *Relation) (err error) { | 460 func (s *State) RemoveRelation(r *Relation) (err error) { |
481 defer trivial.ErrorContextf(&err, "cannot remove relation %q", r.doc.Key
) | 461 defer trivial.ErrorContextf(&err, "cannot remove relation %q", r.doc.Key
) |
482 if r.doc.Life != Dead { | 462 if r.doc.Life != Dead { |
483 return fmt.Errorf("relation is not dead") | 463 return fmt.Errorf("relation is not dead") |
484 } | 464 } |
| 465 cDoc := &cleanupDoc{ |
| 466 Id: bson.NewObjectId(), |
| 467 Kind: "settings", |
| 468 Prefix: fmt.Sprintf("r#%d#", r.Id()), |
| 469 } |
485 ops := []txn.Op{{ | 470 ops := []txn.Op{{ |
| 471 C: s.cleanups.Name, |
| 472 Id: cDoc.Id, |
| 473 Insert: cDoc, |
| 474 }, { |
486 C: s.relations.Name, | 475 C: s.relations.Name, |
487 Id: r.doc.Key, | 476 Id: r.doc.Key, |
488 » » Assert: D{{"life", Dead}}, | 477 » » Assert: D{{"life", Dead}, {"id", r.Id()}}, |
489 Remove: true, | 478 Remove: true, |
490 }} | 479 }} |
| 480 for _, ep := range r.doc.Endpoints { |
| 481 ops = append(ops, txn.Op{ |
| 482 C: s.services.Name, |
| 483 Id: ep.ServiceName, |
| 484 Assert: D{{"relationcount", D{{"$gt", 0}}}}, |
| 485 Update: D{{"$inc", D{{"relationcount", -1}}}}, |
| 486 }) |
| 487 } |
491 if err := s.runner.Run(ops, "", nil); err != nil { | 488 if err := s.runner.Run(ops, "", nil); err != nil { |
492 // If aborted, the relation is either dead or recreated. | |
493 return onAbort(err, nil) | 489 return onAbort(err, nil) |
494 } | 490 } |
495 return nil | 491 return nil |
496 } | 492 } |
497 | 493 |
498 // Unit returns a unit by name. | 494 // Unit returns a unit by name. |
499 func (s *State) Unit(name string) (*Unit, error) { | 495 func (s *State) Unit(name string) (*Unit, error) { |
500 if !IsUnitName(name) { | 496 if !IsUnitName(name) { |
501 return nil, fmt.Errorf("%q is not a valid unit name", name) | 497 return nil, fmt.Errorf("%q is not a valid unit name", name) |
502 } | 498 } |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
588 | 584 |
589 func (s *State) setPassword(name, password string) error { | 585 func (s *State) setPassword(name, password string) error { |
590 if err := s.db.AddUser(name, password, false); err != nil { | 586 if err := s.db.AddUser(name, password, false); err != nil { |
591 return fmt.Errorf("cannot set password in juju db for %q: %v", n
ame, err) | 587 return fmt.Errorf("cannot set password in juju db for %q: %v", n
ame, err) |
592 } | 588 } |
593 if err := s.db.Session.DB("presence").AddUser(name, password, false); er
r != nil { | 589 if err := s.db.Session.DB("presence").AddUser(name, password, false); er
r != nil { |
594 return fmt.Errorf("cannot set password in presence db for %q: %v
", name, err) | 590 return fmt.Errorf("cannot set password in presence db for %q: %v
", name, err) |
595 } | 591 } |
596 return nil | 592 return nil |
597 } | 593 } |
| 594 |
| 595 // cleanupDoc represents a potentially large set of documents that should be |
| 596 // removed. |
| 597 type cleanupDoc struct { |
| 598 Id bson.ObjectId `bson:"_id"` |
| 599 Kind string |
| 600 Prefix string |
| 601 } |
| 602 |
| 603 // Cleanup removes all documents that were previously marked for removal, if |
| 604 // any such exist. It should be called periodically by at least one element |
| 605 // of the system. |
| 606 func (s *State) Cleanup() error { |
| 607 doc := cleanupDoc{} |
| 608 iter := s.cleanups.Find(nil).Iter() |
| 609 for iter.Next(&doc) { |
| 610 var c *mgo.Collection |
| 611 var sel interface{} |
| 612 switch doc.Kind { |
| 613 case "settings": |
| 614 c = s.settings |
| 615 sel = D{{"_id", D{{"$regex", "^" + doc.Prefix}}}} |
| 616 default: |
| 617 log.Printf("state: WARNING: ignoring unknown cleanup kin
d %q", doc.Kind) |
| 618 continue |
| 619 } |
| 620 if count, err := c.Find(sel).Count(); err != nil { |
| 621 return fmt.Errorf("cannot detect cleanup targets: %v", e
rr) |
| 622 } else if count != 0 { |
| 623 // Documents marked for cleanup are not otherwise refere
nced in the |
| 624 // system, and will not be under watch, and are therefor
e safe to |
| 625 // delete directly. |
| 626 if _, err := c.RemoveAll(sel); err != nil { |
| 627 return fmt.Errorf("cannot remove documents marke
d for cleanup: %v", err) |
| 628 } |
| 629 } |
| 630 ops := []txn.Op{{ |
| 631 C: s.cleanups.Name, |
| 632 Id: doc.Id, |
| 633 Remove: true, |
| 634 }} |
| 635 if err := s.runner.Run(ops, "", nil); err != nil { |
| 636 return fmt.Errorf("cannot remove empty cleanup document:
%v", err) |
| 637 } |
| 638 } |
| 639 if err := iter.Err(); err != nil { |
| 640 return fmt.Errorf("cannot read cleanup document: %v", err) |
| 641 } |
| 642 return nil |
| 643 } |
OLD | NEW |