Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(784)

Delta Between Two Patch Sets: state/state.go

Issue 53210044: state;apiserver: Fix a race - lp bug #1067979 (Closed)
Left Patch Set: state;apiserver: Fix a race - lp bug #1067979 Created 11 years, 1 month ago
Right Patch Set: state;apiserver: Fix a race - lp bug #1067979 Created 11 years, 1 month ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « state/apiserver/export_test.go ('k') | state/state_test.go » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 // Copyright 2012, 2013 Canonical Ltd. 1 // Copyright 2012, 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details. 2 // Licensed under the AGPLv3, see LICENCE file for details.
3 3
4 // The state package enables reading, observing, and changing 4 // The state package enables reading, observing, and changing
5 // the state stored in MongoDB of a whole environment 5 // the state stored in MongoDB of a whole environment
6 // managed by juju. 6 // managed by juju.
7 package state 7 package state
8 8
9 import ( 9 import (
10 "fmt" 10 "fmt"
11 "net/url" 11 "net/url"
12 "regexp" 12 "regexp"
13 "sort" 13 "sort"
14 "strconv" 14 "strconv"
15 "strings" 15 "strings"
16 "sync" 16 "sync"
17 17
18 "github.com/loggo/loggo"
18 "labix.org/v2/mgo" 19 "labix.org/v2/mgo"
19 "labix.org/v2/mgo/bson" 20 "labix.org/v2/mgo/bson"
20 "labix.org/v2/mgo/txn" 21 "labix.org/v2/mgo/txn"
21 "launchpad.net/loggo"
22 22
23 "launchpad.net/juju-core/charm" 23 "launchpad.net/juju-core/charm"
24 "launchpad.net/juju-core/constraints" 24 "launchpad.net/juju-core/constraints"
25 "launchpad.net/juju-core/environs/config" 25 "launchpad.net/juju-core/environs/config"
26 "launchpad.net/juju-core/errors" 26 "launchpad.net/juju-core/errors"
27 "launchpad.net/juju-core/names" 27 "launchpad.net/juju-core/names"
28 "launchpad.net/juju-core/state/multiwatcher" 28 "launchpad.net/juju-core/state/multiwatcher"
29 "launchpad.net/juju-core/state/presence" 29 "launchpad.net/juju-core/state/presence"
30 "launchpad.net/juju-core/state/watcher" 30 "launchpad.net/juju-core/state/watcher"
31 "launchpad.net/juju-core/utils" 31 "launchpad.net/juju-core/utils"
(...skipping 431 matching lines...) Expand 10 before | Expand all | Expand 10 after
463 BundleSha256: bundleSha256, 463 BundleSha256: bundleSha256,
464 } 464 }
465 err = st.charms.Insert(cdoc) 465 err = st.charms.Insert(cdoc)
466 if err != nil { 466 if err != nil {
467 return nil, fmt.Errorf("cannot add charm %q: %v", curl, err) 467 return nil, fmt.Errorf("cannot add charm %q: %v", curl, err)
468 } 468 }
469 return newCharm(st, cdoc) 469 return newCharm(st, cdoc)
470 } else if err != nil { 470 } else if err != nil {
471 return nil, err 471 return nil, err
472 } 472 }
473 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, StillPlaceholder) 473 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, stillPlaceholder)
474 } 474 }
475 475
476 // Charm returns the charm with the given URL. Charms pending upload 476 // Charm returns the charm with the given URL. Charms pending upload
477 // to storage and placeholders are never returned. 477 // to storage and placeholders are never returned.
478 func (st *State) Charm(curl *charm.URL) (*Charm, error) { 478 func (st *State) Charm(curl *charm.URL) (*Charm, error) {
479 cdoc := &charmDoc{} 479 cdoc := &charmDoc{}
480 » what := D{{"_id", curl}, {"placeholder", false}, {"pendingupload", false}} 480 » what := D{
481 » » {"_id", curl},
482 » » {"placeholder", D{{"$ne", true}}},
483 » » {"pendingupload", D{{"$ne", true}}},
484 » }
481 err := st.charms.Find(what).One(&cdoc) 485 err := st.charms.Find(what).One(&cdoc)
482 if err == mgo.ErrNotFound { 486 if err == mgo.ErrNotFound {
483 return nil, errors.NotFoundf("charm %q", curl) 487 return nil, errors.NotFoundf("charm %q", curl)
484 } 488 }
485 if err != nil { 489 if err != nil {
486 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err) 490 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err)
487 } 491 }
488 if err := cdoc.Meta.Check(); err != nil { 492 if err := cdoc.Meta.Check(); err != nil {
489 return nil, fmt.Errorf("malformed charm metadata found in state: %v", err) 493 return nil, fmt.Errorf("malformed charm metadata found in state: %v", err)
490 } 494 }
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after
596 if curl.Revision < 0 { 600 if curl.Revision < 0 {
597 return nil, fmt.Errorf("expected charm URL with revision, got %q", curl) 601 return nil, fmt.Errorf("expected charm URL with revision, got %q", curl)
598 } 602 }
599 603
600 var ( 604 var (
601 uploadedCharm charmDoc 605 uploadedCharm charmDoc
602 err error 606 err error
603 ) 607 )
604 for attempt := 0; attempt < 3; attempt++ { 608 for attempt := 0; attempt < 3; attempt++ {
605 // Find an uploaded or pending charm with the given exact curl. 609 // Find an uploaded or pending charm with the given exact curl.
606 » » err = st.charms.Find(D{{"_id", curl}, {"placeholder", false}}).One(&uploadedCharm) 610 » » err = st.charms.FindId(curl).One(&uploadedCharm)
607 if err != nil && err != mgo.ErrNotFound { 611 if err != nil && err != mgo.ErrNotFound {
608 return nil, err 612 return nil, err
609 » » } else if err == nil { 613 » » } else if err == nil && !uploadedCharm.Placeholder {
610 » » » // The charm exists and it's either uploaded or not. 614 » » » // The charm exists and it's either uploaded or still
611 » » » // In either case, we just return what we got. 615 » » » // pending, but it's not a placeholder. In any case, we
612 » » » break 616 » » » // just return what we got.
613 » » } 617 » » » return newCharm(st, &uploadedCharm)
614 618 » » } else if err == mgo.ErrNotFound {
615 » » // No charm found, so create it now as pending. 619 » » » // Prepare the pending charm document for insertion.
616 » » uploadedCharm = charmDoc{ 620 » » » uploadedCharm = charmDoc{
617 » » » URL: curl, 621 » » » » URL: curl,
618 » » » PendingUpload: true, 622 » » » » PendingUpload: true,
619 » » } 623 » » » » Placeholder: false,
620 » » ops := []txn.Op{{ 624 » » » }
621 » » » C: st.charms.Name, 625 » » }
622 » » » Id: curl, 626
623 » » » Assert: txn.DocMissing, 627 » » var ops []txn.Op
624 » » » Insert: uploadedCharm, 628 » » if uploadedCharm.Placeholder {
625 » » }} 629 » » » // Convert the placeholder to a pending charm, while
630 » » » // asserting the fields updated after an upload have not
631 » » » // changed yet.
632 » » » ops = []txn.Op{{
633 » » » » C: st.charms.Name,
634 » » » » Id: curl,
635 » » » » Assert: D{
636 » » » » » {"bundlesha256", ""},
637 » » » » » {"pendingupload", false},
638 » » » » » {"placeholder", true},
639 » » » » },
640 » » » » Update: D{{"$set", D{
641 » » » » » {"pendingupload", true},
642 » » » » » {"placeholder", false},
643 » » » » }}},
644 » » » }}
645 » » » // Update the fields of the document we're returning.
646 » » » uploadedCharm.PendingUpload = true
647 » » » uploadedCharm.Placeholder = false
648 » » } else {
649 » » » // No charm document with this curl yet, insert it.
650 » » » ops = []txn.Op{{
651 » » » » C: st.charms.Name,
652 » » » » Id: curl,
653 » » » » Assert: txn.DocMissing,
654 » » » » Insert: uploadedCharm,
655 » » » }}
656 » » }
657
626 // Run the transaction, and retry on abort. 658 // Run the transaction, and retry on abort.
627 » » if err = st.runTransaction(ops); err == txn.ErrAborted { 659 » » err = st.runTransaction(ops)
660 » » if err == txn.ErrAborted {
628 continue 661 continue
629 } else if err != nil { 662 } else if err != nil {
630 return nil, err 663 return nil, err
631 » » } 664 » » } else if err == nil {
632 » » break 665 » » » return newCharm(st, &uploadedCharm)
633 » } 666 » » }
634 » if err != nil { 667 » }
635 » » return nil, ErrExcessiveContention 668 » return nil, ErrExcessiveContention
636 » }
637 » return newCharm(st, &uploadedCharm)
638 } 669 }
639 670
640 var ( 671 var (
641 » StillPending = D{{"pendingupload", true}} 672 » stillPending = D{{"pendingupload", true}}
642 » StillPlaceholder = D{{"placeholder", true}} 673 » stillPlaceholder = D{{"placeholder", true}}
643 ) 674 )
644 675
645 // AddStoreCharmPlaceholder creates a charm document in state for the given charm URL which 676 // AddStoreCharmPlaceholder creates a charm document in state for the given charm URL which
646 // must reference a charm from the store. The charm document is marked as a placeholder which 677 // must reference a charm from the store. The charm document is marked as a placeholder which
647 // means that if the charm is to be deployed, it will need to first be uploaded to env storage. 678 // means that if the charm is to be deployed, it will need to first be uploaded to env storage.
648 func (st *State) AddStoreCharmPlaceholder(curl *charm.URL) (err error) { 679 func (st *State) AddStoreCharmPlaceholder(curl *charm.URL) (err error) {
649 // Perform sanity checks first. 680 // Perform sanity checks first.
650 if curl.Schema != "cs" { 681 if curl.Schema != "cs" {
651 return fmt.Errorf("expected charm URL with cs schema, got %q", curl) 682 return fmt.Errorf("expected charm URL with cs schema, got %q", curl)
652 } 683 }
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
710 return nil, err 741 return nil, err
711 } 742 }
712 var ops []txn.Op 743 var ops []txn.Op
713 for _, doc := range docs { 744 for _, doc := range docs {
714 if doc.URL.Revision >= curl.Revision { 745 if doc.URL.Revision >= curl.Revision {
715 continue 746 continue
716 } 747 }
717 ops = append(ops, txn.Op{ 748 ops = append(ops, txn.Op{
718 C: st.charms.Name, 749 C: st.charms.Name,
719 Id: doc.URL.String(), 750 Id: doc.URL.String(),
720 » » » Assert: StillPlaceholder, 751 » » » Assert: stillPlaceholder,
721 Remove: true, 752 Remove: true,
722 }) 753 })
723 } 754 }
724 return ops, nil 755 return ops, nil
725 } 756 }
757
758 // ErrCharmAlreadyUploaded is returned by UpdateUploadedCharm() when
759 // the given charm is already uploaded and marked as not pending in
760 // state.
761 type ErrCharmAlreadyUploaded struct {
762 curl *charm.URL
763 }
764
765 func (e *ErrCharmAlreadyUploaded) Error() string {
766 return fmt.Sprintf("charm %q already uploaded", e.curl)
767 }
768
769 // IsCharmAlreadyUploadedError returns if the given error is
770 // ErrCharmAlreadyUploaded.
771 func IsCharmAlreadyUploadedError(err interface{}) bool {
772 if err == nil {
773 return false
774 }
775 _, ok := err.(*ErrCharmAlreadyUploaded)
776 return ok
777 }
778
779 // ErrCharmRevisionAlreadyModified is returned when a pending or
780 // placeholder charm is no longer pending or a placeholder, signaling
781 // the charm is available in state with its full information.
782 var ErrCharmRevisionAlreadyModified = fmt.Errorf("charm revision already modified")
726 783
727 // UpdateUploadedCharm marks the given charm URL as uploaded and 784 // UpdateUploadedCharm marks the given charm URL as uploaded and
728 // updates the rest of its data, returning it as *state.Charm. 785 // updates the rest of its data, returning it as *state.Charm.
729 func (st *State) UpdateUploadedCharm(ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string) (*Charm, error) { 786 func (st *State) UpdateUploadedCharm(ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string) (*Charm, error) {
730 doc := &charmDoc{} 787 doc := &charmDoc{}
731 err := st.charms.FindId(curl).One(&doc) 788 err := st.charms.FindId(curl).One(&doc)
732 if err == mgo.ErrNotFound { 789 if err == mgo.ErrNotFound {
733 return nil, errors.NotFoundf("charm %q", curl) 790 return nil, errors.NotFoundf("charm %q", curl)
734 } 791 }
735 if err != nil { 792 if err != nil {
736 return nil, err 793 return nil, err
737 } 794 }
738 if !doc.PendingUpload { 795 if !doc.PendingUpload {
739 » » return nil, fmt.Errorf("charm %q already uploaded", curl) 796 » » return nil, &ErrCharmAlreadyUploaded{curl}
740 » } 797 » }
741 798
742 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, StillPending) 799 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, stillPending)
743 } 800 }
744
745 var ErrCharmRevisionAlreadyModified = fmt.Errorf("charm revision already modified")
746 801
747 // updateCharmDoc updates the charm with specified URL with the given 802 // updateCharmDoc updates the charm with specified URL with the given
748 // data, and resets the placeholder and pendingupdate flags. If the 803 // data, and resets the placeholder and pendingupdate flags. If the
749 // charm is no longer a placeholder or pending (depending on preReq), 804 // charm is no longer a placeholder or pending (depending on preReq),
750 // it returns ErrCharmRevisionAlreadyModified. 805 // it returns ErrCharmRevisionAlreadyModified.
751 func (st *State) updateCharmDoc( 806 func (st *State) updateCharmDoc(
752 ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string, preReq interface{}) (*Charm, error) { 807 ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string, preReq interface{}) (*Charm, error) {
753 808
754 updateFields := D{{"$set", D{ 809 updateFields := D{{"$set", D{
755 {"meta", ch.Meta()}, 810 {"meta", ch.Meta()},
(...skipping 553 matching lines...) Expand 10 before | Expand all | Expand 10 after
1309 func tagForGlobalKey(key string) (string, bool) { 1364 func tagForGlobalKey(key string) (string, bool) {
1310 if len(key) < 3 || key[1] != '#' { 1365 if len(key) < 3 || key[1] != '#' {
1311 return "", false 1366 return "", false
1312 } 1367 }
1313 p, ok := tagPrefix[key[0]] 1368 p, ok := tagPrefix[key[0]]
1314 if !ok { 1369 if !ok {
1315 return "", false 1370 return "", false
1316 } 1371 }
1317 return p + key[2:], true 1372 return p + key[2:], true
1318 } 1373 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b