Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(7)

Side by Side Diff: state/watcher.go

Issue 6399052: bogus CL just to demonstrate some changes
Patch Set: bogus CL just to demonstrate some changes Created 5 years, 6 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « [revision details] ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package state 1 package state
2 2
3 import ( 3 import (
4 "launchpad.net/goyaml" 4 "launchpad.net/goyaml"
5 "launchpad.net/juju-core/log" 5 "launchpad.net/juju-core/log"
6 "launchpad.net/juju-core/state/presence" 6 "launchpad.net/juju-core/state/presence"
7 "launchpad.net/juju-core/state/watcher" 7 "launchpad.net/juju-core/state/watcher"
8 "launchpad.net/tomb" 8 "launchpad.net/tomb"
9 ) 9 )
10 10
(...skipping 644 matching lines...) Expand 10 before | Expand all | Expand 10 after
655 case w.changeChan <- ch: 655 case w.changeChan <- ch:
656 w.current = latest 656 w.current = latest
657 } 657 }
658 return nil 658 return nil
659 } 659 }
660 660
661 func (w *ServiceRelationsWatcher) done() { 661 func (w *ServiceRelationsWatcher) done() {
662 close(w.changeChan) 662 close(w.changeChan)
663 } 663 }
664 664
665 type relationUnit struct {
666 rw *relationUnitsWatcher
667 key string
668 name string
669 stop chan struct{}
670 stopped bool
671 w *watcher.ContentWatcher
672 }
673
665 // relationUnitsWatcher watches the presence and settings of units 674 // relationUnitsWatcher watches the presence and settings of units
666 // playing a particular role in a particular scope of a relation, 675 // playing a particular role in a particular scope of a relation,
667 // on behalf of another relation unit (which can potentially be in 676 // on behalf of another relation unit (which can potentially be in
668 // that scope/role, and will if so be exluded from reported events). 677 // that scope/role, and will if so be exluded from reported events).
669 type relationUnitsWatcher struct { 678 type relationUnitsWatcher struct {
670 st *State 679 st *State
671 » tomb tomb.Tomb 680 » stop <-chan struct{}
681 » err error
672 role RelationRole 682 role RelationRole
673 scope unitScopePath 683 scope unitScopePath
674 ignore string 684 ignore string
675 updates chan unitSettingsChange 685 updates chan unitSettingsChange
676 » unitTombs map[string]*tomb.Tomb 686 » units map[string]*relationUnit
677 » names map[string]string
678 changes chan RelationUnitsChange 687 changes chan RelationUnitsChange
679 } 688 }
680 689
681 // RelationUnitsChange holds settings information for newly-added and -changed 690 // RelationUnitsChange holds settings information for newly-added and -changed
682 // units, and the names of those newly departed from the relation. 691 // units, and the names of those newly departed from the relation.
683 type RelationUnitsChange struct { 692 type RelationUnitsChange struct {
684 Changed map[string]UnitSettings 693 Changed map[string]UnitSettings
685 Departed []string 694 Departed []string
686 } 695 }
687 696
688 // UnitSettings holds information about a service unit's settings within a 697 // UnitSettings holds information about a service unit's settings within a
689 // relation. 698 // relation.
690 type UnitSettings struct { 699 type UnitSettings struct {
691 Version int 700 Version int
692 Settings map[string]interface{} 701 Settings map[string]interface{}
693 } 702 }
694 703
695 // unitSettingsChange is used internally by relationUnitsWatcher to communicate 704 // unitSettingsChange is used internally by relationUnitsWatcher to communicate
696 // information about a particular unit's settings within a relation. 705 // information about a particular unit's settings within a relation.
697 type unitSettingsChange struct { 706 type unitSettingsChange struct {
698 » name string 707 » unit *relationUnit
699 settings UnitSettings 708 settings UnitSettings
709 eof bool
710 err error
700 } 711 }
701 712
702 // newRelationUnitsWatcher returns a relationUnitsWatcher which notifies of 713 // newRelationUnitsWatcher returns a relationUnitsWatcher which notifies of
703 // all presence and settings changes to units playing role within scope, 714 // all presence and settings changes to units playing role within scope,
704 // excluding the given unit. 715 // excluding the given unit.
705 func newRelationUnitsWatcher(scope unitScopePath, role RelationRole, u *Unit) *r elationUnitsWatcher { 716 func newRelationUnitsWatcher(scope unitScopePath, role RelationRole, u *Unit, st op <-chan struct{}) *relationUnitsWatcher {
706 w := &relationUnitsWatcher{ 717 w := &relationUnitsWatcher{
707 st: u.st, 718 st: u.st,
708 role: role, 719 role: role,
709 scope: scope, 720 scope: scope,
710 ignore: u.key, 721 ignore: u.key,
711 » » names: make(map[string]string), 722 » » units: make(map[string]*relationUnit),
712 » » unitTombs: make(map[string]*tomb.Tomb),
713 updates: make(chan unitSettingsChange), 723 updates: make(chan unitSettingsChange),
714 changes: make(chan RelationUnitsChange), 724 changes: make(chan RelationUnitsChange),
725 stop: stop,
715 } 726 }
716 go w.loop() 727 go w.loop()
717 return w 728 return w
718 } 729 }
719 730
720 func (w *relationUnitsWatcher) loop() { 731 func (w *relationUnitsWatcher) loop() {
721 defer w.finish() 732 defer w.finish()
722 » roleWatcher := presence.NewChildrenWatcher(w.st.zk, w.scope.presencePath (w.role, "")) 733 » roleStop := make(chan struct{})
723 » defer watcher.Stop(roleWatcher, &w.tomb) 734 » roleWatcher := presence.NewChildrenWatcher(w.st.zk, w.scope.presencePath (w.role, ""), roleStop)
735 » defer func() {
736 » » close(roleStop)
737 » » if err := roleWatcher.Wait(); err != nil && w.err == nil {
738 » » » w.err = err
739 » » }
740 » }()
724 emittedValue := false 741 emittedValue := false
725 for { 742 for {
726 var err error
727 var change RelationUnitsChange 743 var change RelationUnitsChange
728 select { 744 select {
729 » » case <-w.tomb.Dying(): 745 » » case <-w.stop:
730 return 746 return
731 case ch, ok := <-roleWatcher.Changes(): 747 case ch, ok := <-roleWatcher.Changes():
748 var err error
732 if !ok { 749 if !ok {
733 err = watcher.MustErr(roleWatcher) 750 err = watcher.MustErr(roleWatcher)
734 } else { 751 } else {
735 change, err = w.updateWatches(ch) 752 change, err = w.updateWatches(ch)
736 } 753 }
737 if err != nil { 754 if err != nil {
738 » » » » w.tomb.Kill(err) 755 » » » » w.err = err
739 return 756 return
740 } 757 }
741 if emittedValue && len(change.Changed) == 0 && len(chang e.Departed) == 0 { 758 if emittedValue && len(change.Changed) == 0 && len(chang e.Departed) == 0 {
742 continue 759 continue
743 } 760 }
744 » » case ch, ok := <-w.updates: 761 » » case ch := <-w.updates:
745 » » » if !ok { 762 » » » // If the unit has been stopped, it may yet produce an e vent,
746 » » » » panic("updates channel closed") 763 » » » // which we ignore, as we've already sent the Departed c hange.
764 » » » if ch.unit.stopped {
765 » » » » continue
747 } 766 }
748 change = RelationUnitsChange{ 767 change = RelationUnitsChange{
749 » » » » Changed: map[string]UnitSettings{ch.name: ch.set tings}, 768 » » » » Changed: map[string]UnitSettings{ch.unit.name: c h.settings},
750 } 769 }
751 } 770 }
752 » » select { 771 » » w.changes <- change
753 » » case <-w.tomb.Dying(): 772 » » emittedValue = true
754 » » » return
755 » » case w.changes <- change:
756 » » » emittedValue = true
757 » » }
758 } 773 }
759 } 774 }
760 775
761 func (w *relationUnitsWatcher) finish() { 776 func (w *relationUnitsWatcher) finish() {
762 » for _, t := range w.unitTombs { 777 » for _, u := range w.units {
763 » » t.Kill(nil) 778 » » close(u.stop)
764 » » w.tomb.Kill(t.Wait())
765 } 779 }
766 » close(w.updates) 780 » for len(w.units) > 0 {
781 » » ch := <-w.updates
782 » » if !ch.eof {
783 » » » continue
784 » » }
785 » » if ch.err != nil && w.err == nil {
786 » » » w.err = ch.err
787 » » }
788 » » delete(w.units, ch.unit.key)
789 » }
767 close(w.changes) 790 close(w.changes)
768 w.tomb.Done()
769 } 791 }
770 792
771 // Stop stops the watcher and returns any errors encountered while watching. 793 // Wait consumes all remaining changes and
772 func (w *relationUnitsWatcher) Stop() error { 794 // returns any error encountered while the watcher
773 » w.tomb.Kill(nil) 795 // was running.
774 » return w.tomb.Wait() 796 func (w *relationUnitsWatcher) Wait() error {
775 } 797 » for _ = range w.changes {
776 798 » }
777 // Dying returns a channel that is closed when the 799 » return w.err
778 // watcher has stopped or is about to stop.
779 func (w *relationUnitsWatcher) Dying() <-chan struct{} {
780 » return w.tomb.Dying()
781 }
782
783 // Err returns any error encountered while stopping the watcher, or
784 // tome.ErrStillAlive if the watcher is still running.
785 func (w *relationUnitsWatcher) Err() error {
786 » return w.tomb.Err()
787 } 800 }
788 801
789 // Changes returns a channel that will receive the changes to 802 // Changes returns a channel that will receive the changes to
790 // the relation when detected. 803 // the relation when detected.
791 // The first event on the channel holds the initial state of the 804 // The first event on the channel holds the initial state of the
792 // relation in its Changed field. 805 // relation in its Changed field.
793 func (w *relationUnitsWatcher) Changes() <-chan RelationUnitsChange { 806 func (w *relationUnitsWatcher) Changes() <-chan RelationUnitsChange {
794 return w.changes 807 return w.changes
795 } 808 }
796 809
797 // updateWatches starts or stops watches on the settings of the relation 810 // updateWatches starts or stops watches on the settings of the relation
798 // units declared present or absent by ch, and returns a RelationUnitsChange 811 // units declared present or absent by ch, and returns a RelationUnitsChange
799 // event expressing those changes. 812 // event expressing those changes.
800 func (w *relationUnitsWatcher) updateWatches(ch watcher.ChildrenChange) (change RelationUnitsChange, err error) { 813 func (w *relationUnitsWatcher) updateWatches(ch watcher.ChildrenChange) (change RelationUnitsChange, err error) {
801 for _, key := range ch.Removed { 814 for _, key := range ch.Removed {
802 if key == w.ignore { 815 if key == w.ignore {
803 continue 816 continue
804 } 817 }
805 » » // When we stop a unit settings watcher, we have to wait for its tomb, 818 » » u := w.units[key]
806 » » // lest its latest change (potentially waiting to be sent on the updates 819 » » close(u.stop)
807 » » // channel) be received and sent on as a RelationUnitsChange eve nt *after* 820 » » u.stopped = true
808 » » // we notify of its departure in the event we are currently prep aring. 821 » » change.Departed = append(change.Departed, u.name)
809 » » t := w.unitTombs[key]
810 » » delete(w.unitTombs, key)
811 » » t.Kill(nil)
812 » » if err := t.Wait(); err != nil {
813 » » » return RelationUnitsChange{}, err
814 » » }
815 » » name := w.names[key]
816 » » delete(w.names, key)
817 » » change.Departed = append(change.Departed, name)
818 } 822 }
819 var topo *topology 823 var topo *topology
820 for _, key := range ch.Added { 824 for _, key := range ch.Added {
821 if key == w.ignore { 825 if key == w.ignore {
822 continue 826 continue
823 } 827 }
824 if topo == nil { 828 if topo == nil {
825 // Create topology lazily; no sense reading it N times f or 829 // Create topology lazily; no sense reading it N times f or
826 // N added presence nodes where N != 1. 830 // N added presence nodes where N != 1.
827 if topo, err = readTopology(w.st.zk); err != nil { 831 if topo, err = readTopology(w.st.zk); err != nil {
828 return RelationUnitsChange{}, err 832 return RelationUnitsChange{}, err
829 } 833 }
830 } 834 }
831 name, err := topo.UnitName(key) 835 name, err := topo.UnitName(key)
832 if err != nil { 836 if err != nil {
833 return RelationUnitsChange{}, err 837 return RelationUnitsChange{}, err
834 } 838 }
835 // Start watching unit settings, and consume initial event to ge t 839 // Start watching unit settings, and consume initial event to ge t
836 // initial settings for the event we're preparing; subsequent 840 // initial settings for the event we're preparing; subsequent
837 // changes will be received on the unitLoop goroutine and sent t o 841 // changes will be received on the unitLoop goroutine and sent t o
838 // this one via w.updates. 842 // this one via w.updates.
839 » » w.names[key] = name 843 » » u := &relationUnit {
840 » » uw := watcher.NewContentWatcher(w.st.zk, w.scope.settingsPath(ke y)) 844 » » » rw: w,
841 » » select { 845 » » » key: key,
842 » » case <-w.tomb.Dying(): 846 » » » name: name,
843 » » » return RelationUnitsChange{}, tomb.ErrDying 847 » » » stop: make(chan struct{}),
844 » » case cch, ok := <-uw.Changes(): 848 » » » w: watcher.NewContentWatcher(w.st.zk, w.scope.settingsPa th(key)),
845 » » » if !ok {
846 » » » » return RelationUnitsChange{}, watcher.MustErr(uw )
847 » » » }
848 » » » us := UnitSettings{Version: cch.Version}
849 » » » if err = goyaml.Unmarshal([]byte(cch.Content), &us.Setti ngs); err != nil {
850 » » » » return RelationUnitsChange{}, err
851 » » » }
852 » » » if change.Changed == nil {
853 » » » » change.Changed = map[string]UnitSettings{}
854 » » » }
855 » » » change.Changed[name] = us
856 » » » t := &tomb.Tomb{}
857 » » » w.unitTombs[key] = t
858 » » » go w.unitLoop(name, uw, t)
859 } 849 }
850 cch, ok := <-u.w.Changes()
851 if !ok {
852 return RelationUnitsChange{}, watcher.MustErr(u.w)
853 }
854 us := UnitSettings{Version: cch.Version}
855 if err = goyaml.Unmarshal([]byte(cch.Content), &us.Settings); er r != nil {
856 return RelationUnitsChange{}, err
857 }
858 if change.Changed == nil {
859 change.Changed = map[string]UnitSettings{}
860 }
861 change.Changed[name] = us
862 w.units[key] = u
863 go u.loop()
860 } 864 }
861 return change, nil 865 return change, nil
862 } 866 }
863 867
864 // unitLoop sends a unitSettingsChange event on w.updates for each ContentChange 868 // unitLoop sends a unitSettingsChange event on w.updates for each ContentChange
865 // event received from uw. 869 // event received from uw.
866 func (w *relationUnitsWatcher) unitLoop(name string, uw *watcher.ContentWatcher, t *tomb.Tomb) { 870 func (u *relationUnit) loop() {
867 » defer t.Done() 871 » var err error
868 » defer uw.Stop() 872 » defer func() {
869 » for { 873 » » u.rw.updates <- unitSettingsChange{unit: u, eof: true, err: err}
870 » » select { 874 » }()
871 » » case <-t.Dying(): 875 » defer u.w.Stop()
876 » for ch := range u.w.Changes() {
877 » » us := UnitSettings{Version: ch.Version}
878 » » if e := goyaml.Unmarshal([]byte(ch.Content), &us.Settings); e != nil {
879 » » » err = e
872 return 880 return
873 case ch, ok := <-uw.Changes():
874 if !ok {
875 w.tomb.Kill(watcher.MustErr(uw))
876 return
877 }
878 us := UnitSettings{Version: ch.Version}
879 if err := goyaml.Unmarshal([]byte(ch.Content), &us.Setti ngs); err != nil {
880 w.tomb.Kill(err)
881 return
882 }
883 select {
884 case <-t.Dying():
885 return
886 case w.updates <- unitSettingsChange{name, us}:
887 }
888 } 881 }
882 u.rw.updates <- unitSettingsChange{unit: u, settings: us}
889 } 883 }
884 err = watcher.MustErr(u.w)
890 } 885 }
OLDNEW
« no previous file with comments | « [revision details] ('k') | no next file » | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld 204d58d