Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(165)

Side by Side Diff: state/state.go

Issue 53210044: state;apiserver: Fix a race - lp bug #1067979 (Closed)
Patch Set: state;apiserver: Fix a race - lp bug #1067979 Created 11 years, 1 month ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2012, 2013 Canonical Ltd. 1 // Copyright 2012, 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details. 2 // Licensed under the AGPLv3, see LICENCE file for details.
3 3
4 // The state package enables reading, observing, and changing 4 // The state package enables reading, observing, and changing
5 // the state stored in MongoDB of a whole environment 5 // the state stored in MongoDB of a whole environment
6 // managed by juju. 6 // managed by juju.
7 package state 7 package state
8 8
9 import ( 9 import (
10 "fmt" 10 "fmt"
(...skipping 427 matching lines...) Expand 10 before | Expand all | Expand 10 after
438 case names.RelationTagKind: 438 case names.RelationTagKind:
439 coll = st.relations.Name 439 coll = st.relations.Name
440 case names.EnvironTagKind: 440 case names.EnvironTagKind:
441 coll = st.environments.Name 441 coll = st.environments.Name
442 default: 442 default:
443 return "", "", fmt.Errorf("%q is not a valid collection tag", ta g) 443 return "", "", fmt.Errorf("%q is not a valid collection tag", ta g)
444 } 444 }
445 return coll, id, nil 445 return coll, id, nil
446 } 446 }
447 447
448 // AddCharm adds the ch charm with curl to the state. bundleUrl must be 448 // AddCharm adds the ch charm with curl to the state. bundleURL must
449 // set to a URL where the bundle for ch may be downloaded from. 449 // be set to a URL where the bundle for ch may be downloaded from. On
450 // On success the newly added charm state is returned. 450 // success the newly added charm state is returned.
451 func (st *State) AddCharm(ch charm.Charm, curl *charm.URL, bundleURL *url.URL, b undleSha256 string) (stch *Charm, err error) { 451 func (st *State) AddCharm(ch charm.Charm, curl *charm.URL, bundleURL *url.URL, b undleSha256 string) (stch *Charm, err error) {
452 » // The charm may already exist in state as a placeholder, so we check fo r that situation and update the 452 » // The charm may already exist in state as a placeholder, so we
453 » // existing charm record if necessary, otherwise add a new record. 453 » // check for that situation and update the existing charm record
454 » // if necessary, otherwise add a new record.
454 var existing charmDoc 455 var existing charmDoc
455 err = st.charms.Find(D{{"_id", curl.String()}, {"placeholder", true}}).O ne(&existing) 456 err = st.charms.Find(D{{"_id", curl.String()}, {"placeholder", true}}).O ne(&existing)
456 if err == mgo.ErrNotFound { 457 if err == mgo.ErrNotFound {
457 cdoc := &charmDoc{ 458 cdoc := &charmDoc{
458 URL: curl, 459 URL: curl,
459 Meta: ch.Meta(), 460 Meta: ch.Meta(),
460 Config: ch.Config(), 461 Config: ch.Config(),
461 BundleURL: bundleURL, 462 BundleURL: bundleURL,
462 BundleSha256: bundleSha256, 463 BundleSha256: bundleSha256,
463 } 464 }
464 err = st.charms.Insert(cdoc) 465 err = st.charms.Insert(cdoc)
465 if err != nil { 466 if err != nil {
466 return nil, fmt.Errorf("cannot add charm %q: %v", curl, err) 467 return nil, fmt.Errorf("cannot add charm %q: %v", curl, err)
467 } 468 }
468 return newCharm(st, cdoc) 469 return newCharm(st, cdoc)
469 } else if err != nil { 470 } else if err != nil {
470 return nil, err 471 return nil, err
471 } 472 }
472 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, StillPlaceho lder) 473 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, stillPlaceho lder)
473 } 474 }
474 475
475 // Charm returns the charm with the given URL. 476 // Charm returns the charm with the given URL. Charms pending upload
477 // to storage and placeholders are never returned.
476 func (st *State) Charm(curl *charm.URL) (*Charm, error) { 478 func (st *State) Charm(curl *charm.URL) (*Charm, error) {
477 cdoc := &charmDoc{} 479 cdoc := &charmDoc{}
478 » err := st.charms.Find(D{{"_id", curl}, {"pendingupload", false}, {"place holder", false}}).One(cdoc) 480 » what := D{
481 » » {"_id", curl},
482 » » {"placeholder", D{{"$ne", true}}},
483 » » {"pendingupload", D{{"$ne", true}}},
484 » }
485 » err := st.charms.Find(what).One(&cdoc)
479 if err == mgo.ErrNotFound { 486 if err == mgo.ErrNotFound {
480 return nil, errors.NotFoundf("charm %q", curl) 487 return nil, errors.NotFoundf("charm %q", curl)
481 } 488 }
482 if err != nil { 489 if err != nil {
483 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err) 490 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err)
484 } 491 }
485 if err := cdoc.Meta.Check(); err != nil { 492 if err := cdoc.Meta.Check(); err != nil {
486 return nil, fmt.Errorf("malformed charm metadata found in state: %v", err) 493 return nil, fmt.Errorf("malformed charm metadata found in state: %v", err)
487 } 494 }
488 return newCharm(st, cdoc) 495 return newCharm(st, cdoc)
489 } 496 }
490 497
491 // LatestPlaceholderCharm returns the latest charm described by the given URL bu t which is not yet deployed. 498 // LatestPlaceholderCharm returns the latest charm described by the
499 // given URL but which is not yet deployed.
492 func (st *State) LatestPlaceholderCharm(curl *charm.URL) (*Charm, error) { 500 func (st *State) LatestPlaceholderCharm(curl *charm.URL) (*Charm, error) {
493 noRevURL := curl.WithRevision(-1) 501 noRevURL := curl.WithRevision(-1)
494 curlRegex := "^" + regexp.QuoteMeta(noRevURL.String()) 502 curlRegex := "^" + regexp.QuoteMeta(noRevURL.String())
495 var docs []charmDoc 503 var docs []charmDoc
496 err := st.charms.Find(D{{"_id", D{{"$regex", curlRegex}}}, {"placeholder ", true}}).All(&docs) 504 err := st.charms.Find(D{{"_id", D{{"$regex", curlRegex}}}, {"placeholder ", true}}).All(&docs)
497 if err != nil { 505 if err != nil {
498 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err) 506 return nil, fmt.Errorf("cannot get charm %q: %v", curl, err)
499 } 507 }
500 // Find the highest revision. 508 // Find the highest revision.
501 var latest charmDoc 509 var latest charmDoc
502 for _, doc := range docs { 510 for _, doc := range docs {
503 if latest.URL == nil || doc.URL.Revision > latest.URL.Revision { 511 if latest.URL == nil || doc.URL.Revision > latest.URL.Revision {
504 latest = doc 512 latest = doc
505 } 513 }
506 } 514 }
507 if latest.URL == nil { 515 if latest.URL == nil {
508 return nil, errors.NotFoundf("placeholder charm %q", noRevURL) 516 return nil, errors.NotFoundf("placeholder charm %q", noRevURL)
509 } 517 }
510 return newCharm(st, &latest) 518 return newCharm(st, &latest)
511 } 519 }
512 520
513 // PrepareLocalCharmUpload must be called before a charm is uploaded 521 // PrepareLocalCharmUpload must be called before a local charm is
514 // to the provider storage in order to create a charm document in 522 // uploaded to the provider storage in order to create a charm
515 // state. It returns a new Charm with no metadata and a unique 523 // document in state. It returns the chosen unique charm URL reserved
516 // revision number. 524 // in state for the charm.
517 // 525 //
518 // The url's schema must be "local" and it must include a revision. 526 // The url's schema must be "local" and it must include a revision.
519 func (st *State) PrepareLocalCharmUpload(curl *charm.URL) (chosenUrl *charm.URL, err error) { 527 func (st *State) PrepareLocalCharmUpload(curl *charm.URL) (chosenUrl *charm.URL, err error) {
520 // Perform a few sanity checks first. 528 // Perform a few sanity checks first.
521 if curl.Schema != "local" { 529 if curl.Schema != "local" {
522 return nil, fmt.Errorf("expected charm URL with local schema, go t %q", curl) 530 return nil, fmt.Errorf("expected charm URL with local schema, go t %q", curl)
523 } 531 }
524 if curl.Revision < 0 { 532 if curl.Revision < 0 {
525 return nil, fmt.Errorf("expected charm URL with revision, got %q ", curl) 533 return nil, fmt.Errorf("expected charm URL with revision, got %q ", curl)
526 } 534 }
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
568 return nil, err 576 return nil, err
569 } 577 }
570 break 578 break
571 } 579 }
572 if err != nil { 580 if err != nil {
573 return nil, ErrExcessiveContention 581 return nil, ErrExcessiveContention
574 } 582 }
575 return chosenUrl, nil 583 return chosenUrl, nil
576 } 584 }
577 585
578 var StillPending = D{{"pendingupload", true}} 586 // PrepareStoreCharmUpload must be called before a charm store charm
579 var StillPlaceholder = D{{"placeholder", true}} 587 // is uploaded to the provider storage in order to create a charm
588 // document in state. If a charm with the same URL is already in
589 // state, it will be returned as a *state.Charm (is can be still
590 // pending or already uploaded). Otherwise, a new charm document is
591 // added in state with just the given charm URL and
592 // PendingUpload=true, which is then returned as a *state.Charm.
593 //
594 // The url's schema must be "cs" and it must include a revision.
595 func (st *State) PrepareStoreCharmUpload(curl *charm.URL) (*Charm, error) {
596 » // Perform a few sanity checks first.
597 » if curl.Schema != "cs" {
598 » » return nil, fmt.Errorf("expected charm URL with cs schema, got % q", curl)
599 » }
600 » if curl.Revision < 0 {
601 » » return nil, fmt.Errorf("expected charm URL with revision, got %q ", curl)
602 » }
603
604 » var (
605 » » uploadedCharm charmDoc
606 » » err error
607 » )
608 » for attempt := 0; attempt < 3; attempt++ {
609 » » // Find an uploaded or pending charm with the given exact curl.
610 » » err = st.charms.FindId(curl).One(&uploadedCharm)
611 » » if err != nil && err != mgo.ErrNotFound {
612 » » » return nil, err
613 » » } else if err == nil && !uploadedCharm.Placeholder {
614 » » » // The charm exists and it's either uploaded or still
615 » » » // pending, but it's not a placeholder. In any case, we
616 » » » // just return what we got.
617 » » » break
618 » » } else if err == mgo.ErrNotFound {
619 » » » // Prepare the pending charm document for insertion.
620 » » » uploadedCharm = charmDoc{
621 » » » » URL: curl,
622 » » » » PendingUpload: true,
623 » » » » Placeholder: false,
624 » » » }
625 » » }
626
627 » » var ops []txn.Op
628 » » if uploadedCharm.Placeholder {
629 » » » // Convert the placeholder to a pending charm.
630 » » » ops = []txn.Op{{
631 » » » » C: st.charms.Name,
632 » » » » Id: curl,
633 » » » » Assert: txn.DocExists,
fwereade 2014/01/29 16:01:22 hmm, should be be asserting that it hasn't had arc
dimitern 2014/01/29 20:59:13 Done.
634 » » » » Update: D{{"$set", D{
635 » » » » » {"pendingupload", true},
636 » » » » » {"placeholder", false},
637 » » » » }}},
638 » » » }}
639 » » » // Update the fields of the document we're returning.
640 » » » uploadedCharm.PendingUpload = true
641 » » » uploadedCharm.Placeholder = false
642 » » } else {
643 » » » // No charm document with this curl yet, insert it.
644 » » » ops = []txn.Op{{
645 » » » » C: st.charms.Name,
646 » » » » Id: curl,
647 » » » » Assert: txn.DocMissing,
648 » » » » Insert: uploadedCharm,
649 » » » }}
650 » » }
651
652 » » // Run the transaction, and retry on abort.
653 » » err = st.runTransaction(ops)
654 » » if err == txn.ErrAborted {
655 » » » continue
656 » » } else if err != nil {
657 » » » return nil, err
658 » » }
659 » » break
660 » }
661 » if err != nil {
fwereade 2014/01/29 16:01:22 this makes me twitch a little -- AFAICT the only p
dimitern 2014/01/29 20:59:13 Done.
662 » » return nil, ErrExcessiveContention
663 » }
664 » return newCharm(st, &uploadedCharm)
665 }
666
667 var (
668 » stillPending = D{{"pendingupload", true}}
669 » stillPlaceholder = D{{"placeholder", true}}
670 )
580 671
581 // AddStoreCharmPlaceholder creates a charm document in state for the given char m URL which 672 // AddStoreCharmPlaceholder creates a charm document in state for the given char m URL which
582 // must reference a charm from the store. The charm document is marked as a plac eholder which 673 // must reference a charm from the store. The charm document is marked as a plac eholder which
583 // means that if the charm is to be deployed, it will need to first be uploaded to env storage. 674 // means that if the charm is to be deployed, it will need to first be uploaded to env storage.
584 func (st *State) AddStoreCharmPlaceholder(curl *charm.URL) (err error) { 675 func (st *State) AddStoreCharmPlaceholder(curl *charm.URL) (err error) {
585 // Perform sanity checks first. 676 // Perform sanity checks first.
586 if curl.Schema != "cs" { 677 if curl.Schema != "cs" {
587 return fmt.Errorf("expected charm URL with cs schema, got %q", c url) 678 return fmt.Errorf("expected charm URL with cs schema, got %q", c url)
588 } 679 }
589 if curl.Revision < 0 { 680 if curl.Revision < 0 {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
646 return nil, err 737 return nil, err
647 } 738 }
648 var ops []txn.Op 739 var ops []txn.Op
649 for _, doc := range docs { 740 for _, doc := range docs {
650 if doc.URL.Revision >= curl.Revision { 741 if doc.URL.Revision >= curl.Revision {
651 continue 742 continue
652 } 743 }
653 ops = append(ops, txn.Op{ 744 ops = append(ops, txn.Op{
654 C: st.charms.Name, 745 C: st.charms.Name,
655 Id: doc.URL.String(), 746 Id: doc.URL.String(),
656 » » » Assert: StillPlaceholder, 747 » » » Assert: stillPlaceholder,
657 Remove: true, 748 Remove: true,
658 }) 749 })
659 } 750 }
660 return ops, nil 751 return ops, nil
661 } 752 }
662 753
754 // ErrCharmAlreadyUploaded is returned by UpdateUploadedCharm() when
755 // the given charm is already uploaded and marked as not pending in
756 // state.
757 type ErrCharmAlreadyUploaded struct {
758 curl *charm.URL
759 }
760
761 func (e *ErrCharmAlreadyUploaded) Error() string {
762 return fmt.Sprintf("charm %q already uploaded", e.curl)
763 }
764
765 // IsCharmAlreadyUploadedError returns if the given error is
766 // ErrCharmAlreadyUploaded.
767 func IsCharmAlreadyUploadedError(err interface{}) bool {
768 if err == nil {
769 return false
770 }
771 _, ok := err.(*ErrCharmAlreadyUploaded)
772 return ok
773 }
774
775 // ErrCharmRevisionAlreadyModified is returned when a pending or
776 // placeholder charm is no longer pending or a placeholder, signaling
777 // the charm is available in state with its full information.
778 var ErrCharmRevisionAlreadyModified = fmt.Errorf("charm revision already modifie d")
779
663 // UpdateUploadedCharm marks the given charm URL as uploaded and 780 // UpdateUploadedCharm marks the given charm URL as uploaded and
664 // updates the rest of its data, returning it as *state.Charm. 781 // updates the rest of its data, returning it as *state.Charm.
665 func (st *State) UpdateUploadedCharm(ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string) (*Charm, error) { 782 func (st *State) UpdateUploadedCharm(ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string) (*Charm, error) {
666 doc := &charmDoc{} 783 doc := &charmDoc{}
667 err := st.charms.FindId(curl).One(&doc) 784 err := st.charms.FindId(curl).One(&doc)
668 if err == mgo.ErrNotFound { 785 if err == mgo.ErrNotFound {
669 return nil, errors.NotFoundf("charm %q", curl) 786 return nil, errors.NotFoundf("charm %q", curl)
670 } 787 }
671 if err != nil { 788 if err != nil {
672 return nil, err 789 return nil, err
673 } 790 }
674 if !doc.PendingUpload { 791 if !doc.PendingUpload {
675 » » return nil, fmt.Errorf("charm %q already uploaded", curl) 792 » » return nil, &ErrCharmAlreadyUploaded{curl}
676 } 793 }
677 794
678 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, StillPending ) 795 » return st.updateCharmDoc(ch, curl, bundleURL, bundleSha256, stillPending )
679 } 796 }
680 797
681 // updateCharmDoc updates the charm with specified URL with the given data, and resets the placeholder 798 // updateCharmDoc updates the charm with specified URL with the given
682 // and pendingupdate flags. 799 // data, and resets the placeholder and pendingupdate flags. If the
800 // charm is no longer a placeholder or pending (depending on preReq),
801 // it returns ErrCharmRevisionAlreadyModified.
683 func (st *State) updateCharmDoc( 802 func (st *State) updateCharmDoc(
684 ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string , preReq interface{}) (*Charm, error) { 803 ch charm.Charm, curl *charm.URL, bundleURL *url.URL, bundleSha256 string , preReq interface{}) (*Charm, error) {
685 804
686 updateFields := D{{"$set", D{ 805 updateFields := D{{"$set", D{
687 {"meta", ch.Meta()}, 806 {"meta", ch.Meta()},
688 {"config", ch.Config()}, 807 {"config", ch.Config()},
689 {"bundleurl", bundleURL}, 808 {"bundleurl", bundleURL},
690 {"bundlesha256", bundleSha256}, 809 {"bundlesha256", bundleSha256},
691 {"pendingupload", false}, 810 {"pendingupload", false},
692 {"placeholder", false}, 811 {"placeholder", false},
693 }}} 812 }}}
694 ops := []txn.Op{{ 813 ops := []txn.Op{{
695 C: st.charms.Name, 814 C: st.charms.Name,
696 Id: curl, 815 Id: curl,
697 Assert: preReq, 816 Assert: preReq,
698 Update: updateFields, 817 Update: updateFields,
699 }} 818 }}
700 if err := st.runTransaction(ops); err != nil { 819 if err := st.runTransaction(ops); err != nil {
701 » » return nil, onAbort(err, fmt.Errorf("charm revision already modi fied")) 820 » » return nil, onAbort(err, ErrCharmRevisionAlreadyModified)
702 } 821 }
703 return st.Charm(curl) 822 return st.Charm(curl)
704 } 823 }
705 824
706 // addPeerRelationsOps returns the operations necessary to add the 825 // addPeerRelationsOps returns the operations necessary to add the
707 // specified service peer relations to the state. 826 // specified service peer relations to the state.
708 func (st *State) addPeerRelationsOps(serviceName string, peers map[string]charm. Relation) ([]txn.Op, error) { 827 func (st *State) addPeerRelationsOps(serviceName string, peers map[string]charm. Relation) ([]txn.Op, error) {
709 var ops []txn.Op 828 var ops []txn.Op
710 for _, rel := range peers { 829 for _, rel := range peers {
711 relId, err := st.sequence("relation") 830 relId, err := st.sequence("relation")
(...skipping 529 matching lines...) Expand 10 before | Expand all | Expand 10 after
1241 func tagForGlobalKey(key string) (string, bool) { 1360 func tagForGlobalKey(key string) (string, bool) {
1242 if len(key) < 3 || key[1] != '#' { 1361 if len(key) < 3 || key[1] != '#' {
1243 return "", false 1362 return "", false
1244 } 1363 }
1245 p, ok := tagPrefix[key[0]] 1364 p, ok := tagPrefix[key[0]]
1246 if !ok { 1365 if !ok {
1247 return "", false 1366 return "", false
1248 } 1367 }
1249 return p + key[2:], true 1368 return p + key[2:], true
1250 } 1369 }
OLDNEW

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b