Left: | ||
Right: |
OLD | NEW |
---|---|
1 // Copyright 2012, 2013 Canonical Ltd. | 1 // Copyright 2012, 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 // The state package enables reading, observing, and changing | 4 // The state package enables reading, observing, and changing |
5 // the state stored in MongoDB of a whole environment | 5 // the state stored in MongoDB of a whole environment |
6 // managed by juju. | 6 // managed by juju. |
7 package state | 7 package state |
8 | 8 |
9 import ( | 9 import ( |
10 "fmt" | 10 "fmt" |
(...skipping 427 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
438 case names.RelationTagKind: | 438 case names.RelationTagKind: |
439 coll = st.relations.Name | 439 coll = st.relations.Name |
440 case names.EnvironTagKind: | 440 case names.EnvironTagKind: |
441 coll = st.environments.Name | 441 coll = st.environments.Name |
442 default: | 442 default: |
443 return "", "", fmt.Errorf("%q is not a valid collection tag", ta g) | 443 return "", "", fmt.Errorf("%q is not a valid collection tag", ta g) |
444 } | 444 } |
445 return coll, id, nil | 445 return coll, id, nil |
446 } | 446 } |
447 | 447 |
448 // AddCharm adds the ch charm with curl to the state. bundleUrl must be | 448 // AddCharm adds the ch charm with curl to the state. bundleURL must |
449 // set to a URL where the bundle for ch may be downloaded from. | 449 // be set to a URL where the bundle for ch may be downloaded from. On |
450 // On success the newly added charm state is returned. | 450 // success the newly added charm state is returned. |
451 func (st *State) AddCharm(ch charm.Charm, curl *charm.URL, bundleURL *url.URL, b undleSha256 string) (stch *Charm, err error) { | 451 func (st *State) AddCharm(ch charm.Charm, curl *charm.URL, bundleURL *url.URL, b undleSha256 string) (stch *Charm, err error) { |
452 » // The charm may already exist in state as a placeholder, so we check fo r that situation and update the | 452 » // The charm may already exist in state as a placeholder, so we |
453 » // existing charm record if necessary, otherwise add a new record. | 453 » // check for that situation and update the existing charm record |
454 » // if necessary, otherwise add a new record. | |
454 var existing charmDoc | 455 var existing charmDoc |
455 err = st.charms.Find(D{{"_id", curl.String()}, {"placeholder", true}}).O ne(&existing) | 456 err = st.charms.Find(D{{"_id", curl.String()}, {"placeholder", true}}).O ne(&existing) |
456 if err == mgo.ErrNotFound { | 457 if err == mgo.ErrNotFound { |
457 cdoc := &charmDoc{ | 458 cdoc := &charmDoc{ |
458 URL: curl, | 459 URL: curl, |
459 Meta: ch.Meta(), | 460 Meta: ch.Meta(), |
460 Config: ch.Config(), | 461 Config: ch.Config(), |
461 BundleURL: bundleURL, | 462 BundleURL: bundleURL, |
462 BundleSha256: bundleSha256, | 463 BundleSha256: bundleSha256, |
463 } | 464 } |
464 err = st.charms.Insert(cdoc) | 465 err = st.charms.Insert(cdoc) |
465 if err != nil { | 466 if err != nil { |
466 return nil, fmt.Errorf("cannot add charm %q: %v", curl, err) | 467 return nil, fmt.Errorf("cannot add charm %q: %v", curl, err) |
467 } | 468 } |
468 return newCharm(st, cdoc) | 469 return newCharm(st, cdoc) |
469 } else if err != nil { | 470 } else if err != nil { |
470 return nil, err | 471 return nil, err |
471 } | 472 } |
472 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, StillPlaceho lder) | 473 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, stillPlaceho lder) |
473 } | 474 } |
474 | 475 |
475 // Charm returns the charm with the given URL. | 476 // Charm returns the charm with the given URL. Charms pending upload |
477 // to storage and placeholders are never returned. | |
476 func (st *State) Charm(curl *charm.URL) (*Charm, error) { | 478 func (st *State) Charm(curl *charm.URL) (*Charm, error) { |
477 cdoc := &charmDoc{} | 479 cdoc := &charmDoc{} |
478 » err := st.charms.Find(D{{"_id", curl}, {"pendingupload", false}, {"place holder", false}}).One(cdoc) | 480 » what := D{ |
481 » » {"_id", curl}, | |
482 » » {"placeholder", D{{"$ne", true}}}, | |
483 » » {"pendingupload", D{{"$ne", true}}}, | |
484 » } | |
485 » err := st.charms.Find(what).One(&cdoc) | |
479 if err == mgo.ErrNotFound { | 486 if err == mgo.ErrNotFound { |
480 return nil, errors.NotFoundf("charm %q", curl) | 487 return nil, errors.NotFoundf("charm %q", curl) |
481 } | 488 } |
482 if err != nil { | 489 if err != nil { |
483 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err) | 490 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err) |
484 } | 491 } |
485 if err := cdoc.Meta.Check(); err != nil { | 492 if err := cdoc.Meta.Check(); err != nil { |
486 return nil, fmt.Errorf("malformed charm metadata found in state: %v", err) | 493 return nil, fmt.Errorf("malformed charm metadata found in state: %v", err) |
487 } | 494 } |
488 return newCharm(st, cdoc) | 495 return newCharm(st, cdoc) |
489 } | 496 } |
490 | 497 |
491 // LatestPlaceholderCharm returns the latest charm described by the given URL bu t which is not yet deployed. | 498 // LatestPlaceholderCharm returns the latest charm described by the |
499 // given URL but which is not yet deployed. | |
492 func (st *State) LatestPlaceholderCharm(curl *charm.URL) (*Charm, error) { | 500 func (st *State) LatestPlaceholderCharm(curl *charm.URL) (*Charm, error) { |
493 noRevURL := curl.WithRevision(-1) | 501 noRevURL := curl.WithRevision(-1) |
494 curlRegex := "^" + regexp.QuoteMeta(noRevURL.String()) | 502 curlRegex := "^" + regexp.QuoteMeta(noRevURL.String()) |
495 var docs []charmDoc | 503 var docs []charmDoc |
496 err := st.charms.Find(D{{"_id", D{{"$regex", curlRegex}}}, {"placeholder ", true}}).All(&docs) | 504 err := st.charms.Find(D{{"_id", D{{"$regex", curlRegex}}}, {"placeholder ", true}}).All(&docs) |
497 if err != nil { | 505 if err != nil { |
498 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err) | 506 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err) |
499 } | 507 } |
500 // Find the highest revision. | 508 // Find the highest revision. |
501 var latest charmDoc | 509 var latest charmDoc |
502 for _, doc := range docs { | 510 for _, doc := range docs { |
503 if latest.URL == nil || doc.URL.Revision > latest.URL.Revision { | 511 if latest.URL == nil || doc.URL.Revision > latest.URL.Revision { |
504 latest = doc | 512 latest = doc |
505 } | 513 } |
506 } | 514 } |
507 if latest.URL == nil { | 515 if latest.URL == nil { |
508 return nil, errors.NotFoundf("placeholder charm %q", noRevURL) | 516 return nil, errors.NotFoundf("placeholder charm %q", noRevURL) |
509 } | 517 } |
510 return newCharm(st, &latest) | 518 return newCharm(st, &latest) |
511 } | 519 } |
512 | 520 |
513 // PrepareLocalCharmUpload must be called before a charm is uploaded | 521 // PrepareLocalCharmUpload must be called before a local charm is |
514 // to the provider storage in order to create a charm document in | 522 // uploaded to the provider storage in order to create a charm |
515 // state. It returns a new Charm with no metadata and a unique | 523 // document in state. It returns the chosen unique charm URL reserved |
516 // revision number. | 524 // in state for the charm. |
517 // | 525 // |
518 // The url's schema must be "local" and it must include a revision. | 526 // The url's schema must be "local" and it must include a revision. |
519 func (st *State) PrepareLocalCharmUpload(curl *charm.URL) (chosenUrl *charm.URL, err error) { | 527 func (st *State) PrepareLocalCharmUpload(curl *charm.URL) (chosenUrl *charm.URL, err error) { |
520 // Perform a few sanity checks first. | 528 // Perform a few sanity checks first. |
521 if curl.Schema != "local" { | 529 if curl.Schema != "local" { |
522 return nil, fmt.Errorf("expected charm URL with local schema, go t %q", curl) | 530 return nil, fmt.Errorf("expected charm URL with local schema, go t %q", curl) |
523 } | 531 } |
524 if curl.Revision < 0 { | 532 if curl.Revision < 0 { |
525 return nil, fmt.Errorf("expected charm URL with revision, got %q ", curl) | 533 return nil, fmt.Errorf("expected charm URL with revision, got %q ", curl) |
526 } | 534 } |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
568 return nil, err | 576 return nil, err |
569 } | 577 } |
570 break | 578 break |
571 } | 579 } |
572 if err != nil { | 580 if err != nil { |
573 return nil, ErrExcessiveContention | 581 return nil, ErrExcessiveContention |
574 } | 582 } |
575 return chosenUrl, nil | 583 return chosenUrl, nil |
576 } | 584 } |
577 | 585 |
578 var StillPending = D{{"pendingupload", true}} | 586 // PrepareStoreCharmUpload must be called before a charm store charm |
579 var StillPlaceholder = D{{"placeholder", true}} | 587 // is uploaded to the provider storage in order to create a charm |
588 // document in state. If a charm with the same URL is already in | |
589 // state, it will be returned as a *state.Charm (is can be still | |
590 // pending or already uploaded). Otherwise, a new charm document is | |
591 // added in state with just the given charm URL and | |
592 // PendingUpload=true, which is then returned as a *state.Charm. | |
593 // | |
594 // The url's schema must be "cs" and it must include a revision. | |
595 func (st *State) PrepareStoreCharmUpload(curl *charm.URL) (*Charm, error) { | |
596 » // Perform a few sanity checks first. | |
597 » if curl.Schema != "cs" { | |
598 » » return nil, fmt.Errorf("expected charm URL with cs schema, got % q", curl) | |
599 » } | |
600 » if curl.Revision < 0 { | |
601 » » return nil, fmt.Errorf("expected charm URL with revision, got %q ", curl) | |
602 » } | |
603 | |
604 » var ( | |
605 » » uploadedCharm charmDoc | |
606 » » err error | |
607 » ) | |
608 » for attempt := 0; attempt < 3; attempt++ { | |
609 » » // Find an uploaded or pending charm with the given exact curl. | |
610 » » err = st.charms.FindId(curl).One(&uploadedCharm) | |
611 » » if err != nil && err != mgo.ErrNotFound { | |
612 » » » return nil, err | |
613 » » } else if err == nil && !uploadedCharm.Placeholder { | |
614 » » » // The charm exists and it's either uploaded or still | |
615 » » » // pending, but it's not a placeholder. In any case, we | |
616 » » » // just return what we got. | |
617 » » » break | |
618 » » } else if err == mgo.ErrNotFound { | |
619 » » » // Prepare the pending charm document for insertion. | |
620 » » » uploadedCharm = charmDoc{ | |
621 » » » » URL: curl, | |
622 » » » » PendingUpload: true, | |
623 » » » » Placeholder: false, | |
624 » » » } | |
625 » » } | |
626 | |
627 » » var ops []txn.Op | |
628 » » if uploadedCharm.Placeholder { | |
629 » » » // Convert the placeholder to a pending charm. | |
630 » » » ops = []txn.Op{{ | |
631 » » » » C: st.charms.Name, | |
632 » » » » Id: curl, | |
633 » » » » Assert: txn.DocExists, | |
fwereade
2014/01/29 16:01:22
hmm, should be be asserting that it hasn't had arc
dimitern
2014/01/29 20:59:13
Done.
| |
634 » » » » Update: D{{"$set", D{ | |
635 » » » » » {"pendingupload", true}, | |
636 » » » » » {"placeholder", false}, | |
637 » » » » }}}, | |
638 » » » }} | |
639 » » » // Update the fields of the document we're returning. | |
640 » » » uploadedCharm.PendingUpload = true | |
641 » » » uploadedCharm.Placeholder = false | |
642 » » } else { | |
643 » » » // No charm document with this curl yet, insert it. | |
644 » » » ops = []txn.Op{{ | |
645 » » » » C: st.charms.Name, | |
646 » » » » Id: curl, | |
647 » » » » Assert: txn.DocMissing, | |
648 » » » » Insert: uploadedCharm, | |
649 » » » }} | |
650 » » } | |
651 | |
652 » » // Run the transaction, and retry on abort. | |
653 » » err = st.runTransaction(ops) | |
654 » » if err == txn.ErrAborted { | |
655 » » » continue | |
656 » » } else if err != nil { | |
657 » » » return nil, err | |
658 » » } | |
659 » » break | |
660 » } | |
661 » if err != nil { | |
fwereade
2014/01/29 16:01:22
this makes me twitch a little -- AFAICT the only p
dimitern
2014/01/29 20:59:13
Done.
| |
662 » » return nil, ErrExcessiveContention | |
663 » } | |
664 » return newCharm(st, &uploadedCharm) | |
665 } | |
666 | |
667 var ( | |
668 » stillPending = D{{"pendingupload", true}} | |
669 » stillPlaceholder = D{{"placeholder", true}} | |
670 ) | |
580 | 671 |
581 // AddStoreCharmPlaceholder creates a charm document in state for the given char m URL which | 672 // AddStoreCharmPlaceholder creates a charm document in state for the given char m URL which |
582 // must reference a charm from the store. The charm document is marked as a plac eholder which | 673 // must reference a charm from the store. The charm document is marked as a plac eholder which |
583 // means that if the charm is to be deployed, it will need to first be uploaded to env storage. | 674 // means that if the charm is to be deployed, it will need to first be uploaded to env storage. |
584 func (st *State) AddStoreCharmPlaceholder(curl *charm.URL) (err error) { | 675 func (st *State) AddStoreCharmPlaceholder(curl *charm.URL) (err error) { |
585 // Perform sanity checks first. | 676 // Perform sanity checks first. |
586 if curl.Schema != "cs" { | 677 if curl.Schema != "cs" { |
587 return fmt.Errorf("expected charm URL with cs schema, got %q", c url) | 678 return fmt.Errorf("expected charm URL with cs schema, got %q", c url) |
588 } | 679 } |
589 if curl.Revision < 0 { | 680 if curl.Revision < 0 { |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
646 return nil, err | 737 return nil, err |
647 } | 738 } |
648 var ops []txn.Op | 739 var ops []txn.Op |
649 for _, doc := range docs { | 740 for _, doc := range docs { |
650 if doc.URL.Revision >= curl.Revision { | 741 if doc.URL.Revision >= curl.Revision { |
651 continue | 742 continue |
652 } | 743 } |
653 ops = append(ops, txn.Op{ | 744 ops = append(ops, txn.Op{ |
654 C: st.charms.Name, | 745 C: st.charms.Name, |
655 Id: doc.URL.String(), | 746 Id: doc.URL.String(), |
656 » » » Assert: StillPlaceholder, | 747 » » » Assert: stillPlaceholder, |
657 Remove: true, | 748 Remove: true, |
658 }) | 749 }) |
659 } | 750 } |
660 return ops, nil | 751 return ops, nil |
661 } | 752 } |
662 | 753 |
754 // ErrCharmAlreadyUploaded is returned by UpdateUploadedCharm() when | |
755 // the given charm is already uploaded and marked as not pending in | |
756 // state. | |
757 type ErrCharmAlreadyUploaded struct { | |
758 curl *charm.URL | |
759 } | |
760 | |
761 func (e *ErrCharmAlreadyUploaded) Error() string { | |
762 return fmt.Sprintf("charm %q already uploaded", e.curl) | |
763 } | |
764 | |
765 // IsCharmAlreadyUploadedError returns if the given error is | |
766 // ErrCharmAlreadyUploaded. | |
767 func IsCharmAlreadyUploadedError(err interface{}) bool { | |
768 if err == nil { | |
769 return false | |
770 } | |
771 _, ok := err.(*ErrCharmAlreadyUploaded) | |
772 return ok | |
773 } | |
774 | |
775 // ErrCharmRevisionAlreadyModified is returned when a pending or | |
776 // placeholder charm is no longer pending or a placeholder, signaling | |
777 // the charm is available in state with its full information. | |
778 var ErrCharmRevisionAlreadyModified = fmt.Errorf("charm revision already modifie d") | |
779 | |
663 // UpdateUploadedCharm marks the given charm URL as uploaded and | 780 // UpdateUploadedCharm marks the given charm URL as uploaded and |
664 // updates the rest of its data, returning it as *state.Charm. | 781 // updates the rest of its data, returning it as *state.Charm. |
665 func (st *State) UpdateUploadedCharm(ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string) (*Charm, error) { | 782 func (st *State) UpdateUploadedCharm(ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string) (*Charm, error) { |
666 doc := &charmDoc{} | 783 doc := &charmDoc{} |
667 err := st.charms.FindId(curl).One(&doc) | 784 err := st.charms.FindId(curl).One(&doc) |
668 if err == mgo.ErrNotFound { | 785 if err == mgo.ErrNotFound { |
669 return nil, errors.NotFoundf("charm %q", curl) | 786 return nil, errors.NotFoundf("charm %q", curl) |
670 } | 787 } |
671 if err != nil { | 788 if err != nil { |
672 return nil, err | 789 return nil, err |
673 } | 790 } |
674 if !doc.PendingUpload { | 791 if !doc.PendingUpload { |
675 » » return nil, fmt.Errorf("charm %q already uploaded", curl) | 792 » » return nil, &ErrCharmAlreadyUploaded{curl} |
676 } | 793 } |
677 | 794 |
678 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, StillPending ) | 795 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, stillPending ) |
679 } | 796 } |
680 | 797 |
681 // updateCharmDoc updates the charm with specified URL with the given data, and resets the placeholder | 798 // updateCharmDoc updates the charm with specified URL with the given |
682 // and pendingupdate flags. | 799 // data, and resets the placeholder and pendingupdate flags. If the |
800 // charm is no longer a placeholder or pending (depending on preReq), | |
801 // it returns ErrCharmRevisionAlreadyModified. | |
683 func (st *State) updateCharmDoc( | 802 func (st *State) updateCharmDoc( |
684 ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string , preReq interface{}) (*Charm, error) { | 803 ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string , preReq interface{}) (*Charm, error) { |
685 | 804 |
686 updateFields := D{{"$set", D{ | 805 updateFields := D{{"$set", D{ |
687 {"meta", ch.Meta()}, | 806 {"meta", ch.Meta()}, |
688 {"config", ch.Config()}, | 807 {"config", ch.Config()}, |
689 {"bundleurl", bundleURL}, | 808 {"bundleurl", bundleURL}, |
690 {"bundlesha256", bundleSha256}, | 809 {"bundlesha256", bundleSha256}, |
691 {"pendingupload", false}, | 810 {"pendingupload", false}, |
692 {"placeholder", false}, | 811 {"placeholder", false}, |
693 }}} | 812 }}} |
694 ops := []txn.Op{{ | 813 ops := []txn.Op{{ |
695 C: st.charms.Name, | 814 C: st.charms.Name, |
696 Id: curl, | 815 Id: curl, |
697 Assert: preReq, | 816 Assert: preReq, |
698 Update: updateFields, | 817 Update: updateFields, |
699 }} | 818 }} |
700 if err := st.runTransaction(ops); err != nil { | 819 if err := st.runTransaction(ops); err != nil { |
701 » » return nil, onAbort(err, fmt.Errorf("charm revision already modi fied")) | 820 » » return nil, onAbort(err, ErrCharmRevisionAlreadyModified) |
702 } | 821 } |
703 return st.Charm(curl) | 822 return st.Charm(curl) |
704 } | 823 } |
705 | 824 |
706 // addPeerRelationsOps returns the operations necessary to add the | 825 // addPeerRelationsOps returns the operations necessary to add the |
707 // specified service peer relations to the state. | 826 // specified service peer relations to the state. |
708 func (st *State) addPeerRelationsOps(serviceName string, peers map[string]charm. Relation) ([]txn.Op, error) { | 827 func (st *State) addPeerRelationsOps(serviceName string, peers map[string]charm. Relation) ([]txn.Op, error) { |
709 var ops []txn.Op | 828 var ops []txn.Op |
710 for _, rel := range peers { | 829 for _, rel := range peers { |
711 relId, err := st.sequence("relation") | 830 relId, err := st.sequence("relation") |
(...skipping 529 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1241 func tagForGlobalKey(key string) (string, bool) { | 1360 func tagForGlobalKey(key string) (string, bool) { |
1242 if len(key) < 3 || key[1] != '#' { | 1361 if len(key) < 3 || key[1] != '#' { |
1243 return "", false | 1362 return "", false |
1244 } | 1363 } |
1245 p, ok := tagPrefix[key[0]] | 1364 p, ok := tagPrefix[key[0]] |
1246 if !ok { | 1365 if !ok { |
1247 return "", false | 1366 return "", false |
1248 } | 1367 } |
1249 return p + key[2:], true | 1368 return p + key[2:], true |
1250 } | 1369 } |
OLD | NEW |