LEFT | RIGHT |
1 // Copyright 2012, 2013 Canonical Ltd. | 1 // Copyright 2012, 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 // The state package enables reading, observing, and changing | 4 // The state package enables reading, observing, and changing |
5 // the state stored in MongoDB of a whole environment | 5 // the state stored in MongoDB of a whole environment |
6 // managed by juju. | 6 // managed by juju. |
7 package state | 7 package state |
8 | 8 |
9 import ( | 9 import ( |
10 "fmt" | 10 "fmt" |
11 "net/url" | 11 "net/url" |
12 "regexp" | 12 "regexp" |
13 "sort" | 13 "sort" |
14 "strconv" | 14 "strconv" |
15 "strings" | 15 "strings" |
16 "sync" | 16 "sync" |
17 | 17 |
| 18 "github.com/loggo/loggo" |
18 "labix.org/v2/mgo" | 19 "labix.org/v2/mgo" |
19 "labix.org/v2/mgo/bson" | 20 "labix.org/v2/mgo/bson" |
20 "labix.org/v2/mgo/txn" | 21 "labix.org/v2/mgo/txn" |
21 "launchpad.net/loggo" | |
22 | 22 |
23 "launchpad.net/juju-core/charm" | 23 "launchpad.net/juju-core/charm" |
24 "launchpad.net/juju-core/constraints" | 24 "launchpad.net/juju-core/constraints" |
25 "launchpad.net/juju-core/environs/config" | 25 "launchpad.net/juju-core/environs/config" |
26 "launchpad.net/juju-core/errors" | 26 "launchpad.net/juju-core/errors" |
27 "launchpad.net/juju-core/names" | 27 "launchpad.net/juju-core/names" |
28 "launchpad.net/juju-core/state/multiwatcher" | 28 "launchpad.net/juju-core/state/multiwatcher" |
29 "launchpad.net/juju-core/state/presence" | 29 "launchpad.net/juju-core/state/presence" |
30 "launchpad.net/juju-core/state/watcher" | 30 "launchpad.net/juju-core/state/watcher" |
31 "launchpad.net/juju-core/utils" | 31 "launchpad.net/juju-core/utils" |
(...skipping 431 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
463 BundleSha256: bundleSha256, | 463 BundleSha256: bundleSha256, |
464 } | 464 } |
465 err = st.charms.Insert(cdoc) | 465 err = st.charms.Insert(cdoc) |
466 if err != nil { | 466 if err != nil { |
467 return nil, fmt.Errorf("cannot add charm %q: %v", curl,
err) | 467 return nil, fmt.Errorf("cannot add charm %q: %v", curl,
err) |
468 } | 468 } |
469 return newCharm(st, cdoc) | 469 return newCharm(st, cdoc) |
470 } else if err != nil { | 470 } else if err != nil { |
471 return nil, err | 471 return nil, err |
472 } | 472 } |
473 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, StillPlaceho
lder) | 473 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, stillPlaceho
lder) |
474 } | 474 } |
475 | 475 |
476 // Charm returns the charm with the given URL. Charms pending upload | 476 // Charm returns the charm with the given URL. Charms pending upload |
477 // to storage and placeholders are never returned. | 477 // to storage and placeholders are never returned. |
478 func (st *State) Charm(curl *charm.URL) (*Charm, error) { | 478 func (st *State) Charm(curl *charm.URL) (*Charm, error) { |
479 cdoc := &charmDoc{} | 479 cdoc := &charmDoc{} |
480 » what := D{{"_id", curl}, {"placeholder", false}, {"pendingupload", false
}} | 480 » what := D{ |
| 481 » » {"_id", curl}, |
| 482 » » {"placeholder", D{{"$ne", true}}}, |
| 483 » » {"pendingupload", D{{"$ne", true}}}, |
| 484 » } |
481 err := st.charms.Find(what).One(&cdoc) | 485 err := st.charms.Find(what).One(&cdoc) |
482 if err == mgo.ErrNotFound { | 486 if err == mgo.ErrNotFound { |
483 return nil, errors.NotFoundf("charm %q", curl) | 487 return nil, errors.NotFoundf("charm %q", curl) |
484 } | 488 } |
485 if err != nil { | 489 if err != nil { |
486 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err) | 490 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err) |
487 } | 491 } |
488 if err := cdoc.Meta.Check(); err != nil { | 492 if err := cdoc.Meta.Check(); err != nil { |
489 return nil, fmt.Errorf("malformed charm metadata found in state:
%v", err) | 493 return nil, fmt.Errorf("malformed charm metadata found in state:
%v", err) |
490 } | 494 } |
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
596 if curl.Revision < 0 { | 600 if curl.Revision < 0 { |
597 return nil, fmt.Errorf("expected charm URL with revision, got %q
", curl) | 601 return nil, fmt.Errorf("expected charm URL with revision, got %q
", curl) |
598 } | 602 } |
599 | 603 |
600 var ( | 604 var ( |
601 uploadedCharm charmDoc | 605 uploadedCharm charmDoc |
602 err error | 606 err error |
603 ) | 607 ) |
604 for attempt := 0; attempt < 3; attempt++ { | 608 for attempt := 0; attempt < 3; attempt++ { |
605 // Find an uploaded or pending charm with the given exact curl. | 609 // Find an uploaded or pending charm with the given exact curl. |
606 » » err = st.charms.Find(D{{"_id", curl}, {"placeholder", false}}).O
ne(&uploadedCharm) | 610 » » err = st.charms.FindId(curl).One(&uploadedCharm) |
607 if err != nil && err != mgo.ErrNotFound { | 611 if err != nil && err != mgo.ErrNotFound { |
608 return nil, err | 612 return nil, err |
609 » » } else if err == nil { | 613 » » } else if err == nil && !uploadedCharm.Placeholder { |
610 » » » // The charm exists and it's either uploaded or not. | 614 » » » // The charm exists and it's either uploaded or still |
611 » » » // In either case, we just return what we got. | 615 » » » // pending, but it's not a placeholder. In any case, we |
612 » » » break | 616 » » » // just return what we got. |
613 » » } | 617 » » » return newCharm(st, &uploadedCharm) |
614 | 618 » » } else if err == mgo.ErrNotFound { |
615 » » // No charm found, so create it now as pending. | 619 » » » // Prepare the pending charm document for insertion. |
616 » » uploadedCharm = charmDoc{ | 620 » » » uploadedCharm = charmDoc{ |
617 » » » URL: curl, | 621 » » » » URL: curl, |
618 » » » PendingUpload: true, | 622 » » » » PendingUpload: true, |
619 » » } | 623 » » » » Placeholder: false, |
620 » » ops := []txn.Op{{ | 624 » » » } |
621 » » » C: st.charms.Name, | 625 » » } |
622 » » » Id: curl, | 626 |
623 » » » Assert: txn.DocMissing, | 627 » » var ops []txn.Op |
624 » » » Insert: uploadedCharm, | 628 » » if uploadedCharm.Placeholder { |
625 » » }} | 629 » » » // Convert the placeholder to a pending charm, while |
| 630 » » » // asserting the fields updated after an upload have not |
| 631 » » » // changed yet. |
| 632 » » » ops = []txn.Op{{ |
| 633 » » » » C: st.charms.Name, |
| 634 » » » » Id: curl, |
| 635 » » » » Assert: D{ |
| 636 » » » » » {"bundlesha256", ""}, |
| 637 » » » » » {"pendingupload", false}, |
| 638 » » » » » {"placeholder", true}, |
| 639 » » » » }, |
| 640 » » » » Update: D{{"$set", D{ |
| 641 » » » » » {"pendingupload", true}, |
| 642 » » » » » {"placeholder", false}, |
| 643 » » » » }}}, |
| 644 » » » }} |
| 645 » » » // Update the fields of the document we're returning. |
| 646 » » » uploadedCharm.PendingUpload = true |
| 647 » » » uploadedCharm.Placeholder = false |
| 648 » » } else { |
| 649 » » » // No charm document with this curl yet, insert it. |
| 650 » » » ops = []txn.Op{{ |
| 651 » » » » C: st.charms.Name, |
| 652 » » » » Id: curl, |
| 653 » » » » Assert: txn.DocMissing, |
| 654 » » » » Insert: uploadedCharm, |
| 655 » » » }} |
| 656 » » } |
| 657 |
626 // Run the transaction, and retry on abort. | 658 // Run the transaction, and retry on abort. |
627 » » if err = st.runTransaction(ops); err == txn.ErrAborted { | 659 » » err = st.runTransaction(ops) |
| 660 » » if err == txn.ErrAborted { |
628 continue | 661 continue |
629 } else if err != nil { | 662 } else if err != nil { |
630 return nil, err | 663 return nil, err |
631 » » } | 664 » » } else if err == nil { |
632 » » break | 665 » » » return newCharm(st, &uploadedCharm) |
633 » } | 666 » » } |
634 » if err != nil { | 667 » } |
635 » » return nil, ErrExcessiveContention | 668 » return nil, ErrExcessiveContention |
636 » } | |
637 » return newCharm(st, &uploadedCharm) | |
638 } | 669 } |
639 | 670 |
640 var ( | 671 var ( |
641 » StillPending = D{{"pendingupload", true}} | 672 » stillPending = D{{"pendingupload", true}} |
642 » StillPlaceholder = D{{"placeholder", true}} | 673 » stillPlaceholder = D{{"placeholder", true}} |
643 ) | 674 ) |
644 | 675 |
645 // AddStoreCharmPlaceholder creates a charm document in state for the given char
m URL which | 676 // AddStoreCharmPlaceholder creates a charm document in state for the given char
m URL which |
646 // must reference a charm from the store. The charm document is marked as a plac
eholder which | 677 // must reference a charm from the store. The charm document is marked as a plac
eholder which |
647 // means that if the charm is to be deployed, it will need to first be uploaded
to env storage. | 678 // means that if the charm is to be deployed, it will need to first be uploaded
to env storage. |
648 func (st *State) AddStoreCharmPlaceholder(curl *charm.URL) (err error) { | 679 func (st *State) AddStoreCharmPlaceholder(curl *charm.URL) (err error) { |
649 // Perform sanity checks first. | 680 // Perform sanity checks first. |
650 if curl.Schema != "cs" { | 681 if curl.Schema != "cs" { |
651 return fmt.Errorf("expected charm URL with cs schema, got %q", c
url) | 682 return fmt.Errorf("expected charm URL with cs schema, got %q", c
url) |
652 } | 683 } |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
710 return nil, err | 741 return nil, err |
711 } | 742 } |
712 var ops []txn.Op | 743 var ops []txn.Op |
713 for _, doc := range docs { | 744 for _, doc := range docs { |
714 if doc.URL.Revision >= curl.Revision { | 745 if doc.URL.Revision >= curl.Revision { |
715 continue | 746 continue |
716 } | 747 } |
717 ops = append(ops, txn.Op{ | 748 ops = append(ops, txn.Op{ |
718 C: st.charms.Name, | 749 C: st.charms.Name, |
719 Id: doc.URL.String(), | 750 Id: doc.URL.String(), |
720 » » » Assert: StillPlaceholder, | 751 » » » Assert: stillPlaceholder, |
721 Remove: true, | 752 Remove: true, |
722 }) | 753 }) |
723 } | 754 } |
724 return ops, nil | 755 return ops, nil |
725 } | 756 } |
| 757 |
| 758 // ErrCharmAlreadyUploaded is returned by UpdateUploadedCharm() when |
| 759 // the given charm is already uploaded and marked as not pending in |
| 760 // state. |
| 761 type ErrCharmAlreadyUploaded struct { |
| 762 curl *charm.URL |
| 763 } |
| 764 |
| 765 func (e *ErrCharmAlreadyUploaded) Error() string { |
| 766 return fmt.Sprintf("charm %q already uploaded", e.curl) |
| 767 } |
| 768 |
| 769 // IsCharmAlreadyUploadedError returns if the given error is |
| 770 // ErrCharmAlreadyUploaded. |
| 771 func IsCharmAlreadyUploadedError(err interface{}) bool { |
| 772 if err == nil { |
| 773 return false |
| 774 } |
| 775 _, ok := err.(*ErrCharmAlreadyUploaded) |
| 776 return ok |
| 777 } |
| 778 |
| 779 // ErrCharmRevisionAlreadyModified is returned when a pending or |
| 780 // placeholder charm is no longer pending or a placeholder, signaling |
| 781 // the charm is available in state with its full information. |
| 782 var ErrCharmRevisionAlreadyModified = fmt.Errorf("charm revision already modifie
d") |
726 | 783 |
727 // UpdateUploadedCharm marks the given charm URL as uploaded and | 784 // UpdateUploadedCharm marks the given charm URL as uploaded and |
728 // updates the rest of its data, returning it as *state.Charm. | 785 // updates the rest of its data, returning it as *state.Charm. |
729 func (st *State) UpdateUploadedCharm(ch charm.Charm, curl *charm.URL, bundleURL
*url.URL, bundleSha256 string) (*Charm, error) { | 786 func (st *State) UpdateUploadedCharm(ch charm.Charm, curl *charm.URL, bundleURL
*url.URL, bundleSha256 string) (*Charm, error) { |
730 doc := &charmDoc{} | 787 doc := &charmDoc{} |
731 err := st.charms.FindId(curl).One(&doc) | 788 err := st.charms.FindId(curl).One(&doc) |
732 if err == mgo.ErrNotFound { | 789 if err == mgo.ErrNotFound { |
733 return nil, errors.NotFoundf("charm %q", curl) | 790 return nil, errors.NotFoundf("charm %q", curl) |
734 } | 791 } |
735 if err != nil { | 792 if err != nil { |
736 return nil, err | 793 return nil, err |
737 } | 794 } |
738 if !doc.PendingUpload { | 795 if !doc.PendingUpload { |
739 » » return nil, fmt.Errorf("charm %q already uploaded", curl) | 796 » » return nil, &ErrCharmAlreadyUploaded{curl} |
740 » } | 797 » } |
741 | 798 |
742 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, StillPending
) | 799 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, stillPending
) |
743 } | 800 } |
744 | |
745 var ErrCharmRevisionAlreadyModified = fmt.Errorf("charm revision already modifie
d") | |
746 | 801 |
747 // updateCharmDoc updates the charm with specified URL with the given | 802 // updateCharmDoc updates the charm with specified URL with the given |
748 // data, and resets the placeholder and pendingupdate flags. If the | 803 // data, and resets the placeholder and pendingupdate flags. If the |
749 // charm is no longer a placeholder or pending (depending on preReq), | 804 // charm is no longer a placeholder or pending (depending on preReq), |
750 // it returns ErrCharmRevisionAlreadyModified. | 805 // it returns ErrCharmRevisionAlreadyModified. |
751 func (st *State) updateCharmDoc( | 806 func (st *State) updateCharmDoc( |
752 ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string
, preReq interface{}) (*Charm, error) { | 807 ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string
, preReq interface{}) (*Charm, error) { |
753 | 808 |
754 updateFields := D{{"$set", D{ | 809 updateFields := D{{"$set", D{ |
755 {"meta", ch.Meta()}, | 810 {"meta", ch.Meta()}, |
(...skipping 553 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1309 func tagForGlobalKey(key string) (string, bool) { | 1364 func tagForGlobalKey(key string) (string, bool) { |
1310 if len(key) < 3 || key[1] != '#' { | 1365 if len(key) < 3 || key[1] != '#' { |
1311 return "", false | 1366 return "", false |
1312 } | 1367 } |
1313 p, ok := tagPrefix[key[0]] | 1368 p, ok := tagPrefix[key[0]] |
1314 if !ok { | 1369 if !ok { |
1315 return "", false | 1370 return "", false |
1316 } | 1371 } |
1317 return p + key[2:], true | 1372 return p + key[2:], true |
1318 } | 1373 } |
LEFT | RIGHT |