| OLD | NEW |
| 1 package state | 1 package state |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "launchpad.net/goyaml" | 4 "launchpad.net/goyaml" |
| 5 "launchpad.net/juju-core/log" | 5 "launchpad.net/juju-core/log" |
| 6 "launchpad.net/juju-core/state/presence" | 6 "launchpad.net/juju-core/state/presence" |
| 7 "launchpad.net/juju-core/state/watcher" | 7 "launchpad.net/juju-core/state/watcher" |
| 8 "launchpad.net/tomb" | 8 "launchpad.net/tomb" |
| 9 ) | 9 ) |
| 10 | 10 |
| (...skipping 644 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 655 case w.changeChan <- ch: | 655 case w.changeChan <- ch: |
| 656 w.current = latest | 656 w.current = latest |
| 657 } | 657 } |
| 658 return nil | 658 return nil |
| 659 } | 659 } |
| 660 | 660 |
| 661 func (w *ServiceRelationsWatcher) done() { | 661 func (w *ServiceRelationsWatcher) done() { |
| 662 close(w.changeChan) | 662 close(w.changeChan) |
| 663 } | 663 } |
| 664 | 664 |
| 665 type relationUnit struct { |
| 666 rw *relationUnitsWatcher |
| 667 key string |
| 668 name string |
| 669 stop chan struct{} |
| 670 stopped bool |
| 671 w *watcher.ContentWatcher |
| 672 } |
| 673 |
| 665 // relationUnitsWatcher watches the presence and settings of units | 674 // relationUnitsWatcher watches the presence and settings of units |
| 666 // playing a particular role in a particular scope of a relation, | 675 // playing a particular role in a particular scope of a relation, |
| 667 // on behalf of another relation unit (which can potentially be in | 676 // on behalf of another relation unit (which can potentially be in |
| 668 // that scope/role, and will if so be exluded from reported events). | 677 // that scope/role, and will if so be exluded from reported events). |
| 669 type relationUnitsWatcher struct { | 678 type relationUnitsWatcher struct { |
| 670 st *State | 679 st *State |
| 671 » tomb tomb.Tomb | 680 » stop <-chan struct{} |
| 681 » err error |
| 672 role RelationRole | 682 role RelationRole |
| 673 scope unitScopePath | 683 scope unitScopePath |
| 674 ignore string | 684 ignore string |
| 675 updates chan unitSettingsChange | 685 updates chan unitSettingsChange |
| 676 » unitTombs map[string]*tomb.Tomb | 686 » units map[string]*relationUnit |
| 677 » names map[string]string | |
| 678 changes chan RelationUnitsChange | 687 changes chan RelationUnitsChange |
| 679 } | 688 } |
| 680 | 689 |
| 681 // RelationUnitsChange holds settings information for newly-added and -changed | 690 // RelationUnitsChange holds settings information for newly-added and -changed |
| 682 // units, and the names of those newly departed from the relation. | 691 // units, and the names of those newly departed from the relation. |
| 683 type RelationUnitsChange struct { | 692 type RelationUnitsChange struct { |
| 684 Changed map[string]UnitSettings | 693 Changed map[string]UnitSettings |
| 685 Departed []string | 694 Departed []string |
| 686 } | 695 } |
| 687 | 696 |
| 688 // UnitSettings holds information about a service unit's settings within a | 697 // UnitSettings holds information about a service unit's settings within a |
| 689 // relation. | 698 // relation. |
| 690 type UnitSettings struct { | 699 type UnitSettings struct { |
| 691 Version int | 700 Version int |
| 692 Settings map[string]interface{} | 701 Settings map[string]interface{} |
| 693 } | 702 } |
| 694 | 703 |
| 695 // unitSettingsChange is used internally by relationUnitsWatcher to communicate | 704 // unitSettingsChange is used internally by relationUnitsWatcher to communicate |
| 696 // information about a particular unit's settings within a relation. | 705 // information about a particular unit's settings within a relation. |
| 697 type unitSettingsChange struct { | 706 type unitSettingsChange struct { |
| 698 » name string | 707 » unit *relationUnit |
| 699 settings UnitSettings | 708 settings UnitSettings |
| 709 eof bool |
| 710 err error |
| 700 } | 711 } |
| 701 | 712 |
| 702 // newRelationUnitsWatcher returns a relationUnitsWatcher which notifies of | 713 // newRelationUnitsWatcher returns a relationUnitsWatcher which notifies of |
| 703 // all presence and settings changes to units playing role within scope, | 714 // all presence and settings changes to units playing role within scope, |
| 704 // excluding the given unit. | 715 // excluding the given unit. |
| 705 func newRelationUnitsWatcher(scope unitScopePath, role RelationRole, u *Unit) *r
elationUnitsWatcher { | 716 func newRelationUnitsWatcher(scope unitScopePath, role RelationRole, u *Unit, st
op <-chan struct{}) *relationUnitsWatcher { |
| 706 w := &relationUnitsWatcher{ | 717 w := &relationUnitsWatcher{ |
| 707 st: u.st, | 718 st: u.st, |
| 708 role: role, | 719 role: role, |
| 709 scope: scope, | 720 scope: scope, |
| 710 ignore: u.key, | 721 ignore: u.key, |
| 711 » » names: make(map[string]string), | 722 » » units: make(map[string]*relationUnit), |
| 712 » » unitTombs: make(map[string]*tomb.Tomb), | |
| 713 updates: make(chan unitSettingsChange), | 723 updates: make(chan unitSettingsChange), |
| 714 changes: make(chan RelationUnitsChange), | 724 changes: make(chan RelationUnitsChange), |
| 725 stop: stop, |
| 715 } | 726 } |
| 716 go w.loop() | 727 go w.loop() |
| 717 return w | 728 return w |
| 718 } | 729 } |
| 719 | 730 |
| 720 func (w *relationUnitsWatcher) loop() { | 731 func (w *relationUnitsWatcher) loop() { |
| 721 defer w.finish() | 732 defer w.finish() |
| 722 » roleWatcher := presence.NewChildrenWatcher(w.st.zk, w.scope.presencePath
(w.role, "")) | 733 » roleStop := make(chan struct{}) |
| 723 » defer watcher.Stop(roleWatcher, &w.tomb) | 734 » roleWatcher := presence.NewChildrenWatcher(w.st.zk, w.scope.presencePath
(w.role, ""), roleStop) |
| 735 » defer func() { |
| 736 » » close(roleStop) |
| 737 » » if err := roleWatcher.Wait(); err != nil && w.err == nil { |
| 738 » » » w.err = err |
| 739 » » } |
| 740 » }() |
| 724 emittedValue := false | 741 emittedValue := false |
| 725 for { | 742 for { |
| 726 var err error | |
| 727 var change RelationUnitsChange | 743 var change RelationUnitsChange |
| 728 select { | 744 select { |
| 729 » » case <-w.tomb.Dying(): | 745 » » case <-w.stop: |
| 730 return | 746 return |
| 731 case ch, ok := <-roleWatcher.Changes(): | 747 case ch, ok := <-roleWatcher.Changes(): |
| 748 var err error |
| 732 if !ok { | 749 if !ok { |
| 733 err = watcher.MustErr(roleWatcher) | 750 err = watcher.MustErr(roleWatcher) |
| 734 } else { | 751 } else { |
| 735 change, err = w.updateWatches(ch) | 752 change, err = w.updateWatches(ch) |
| 736 } | 753 } |
| 737 if err != nil { | 754 if err != nil { |
| 738 » » » » w.tomb.Kill(err) | 755 » » » » w.err = err |
| 739 return | 756 return |
| 740 } | 757 } |
| 741 if emittedValue && len(change.Changed) == 0 && len(chang
e.Departed) == 0 { | 758 if emittedValue && len(change.Changed) == 0 && len(chang
e.Departed) == 0 { |
| 742 continue | 759 continue |
| 743 } | 760 } |
| 744 » » case ch, ok := <-w.updates: | 761 » » case ch := <-w.updates: |
| 745 » » » if !ok { | 762 » » » // If the unit has been stopped, it may yet produce an e
vent, |
| 746 » » » » panic("updates channel closed") | 763 » » » // which we ignore, as we've already sent the Departed c
hange. |
| 764 » » » if ch.unit.stopped { |
| 765 » » » » continue |
| 747 } | 766 } |
| 748 change = RelationUnitsChange{ | 767 change = RelationUnitsChange{ |
| 749 » » » » Changed: map[string]UnitSettings{ch.name: ch.set
tings}, | 768 » » » » Changed: map[string]UnitSettings{ch.unit.name: c
h.settings}, |
| 750 } | 769 } |
| 751 } | 770 } |
| 752 » » select { | 771 » » w.changes <- change |
| 753 » » case <-w.tomb.Dying(): | 772 » » emittedValue = true |
| 754 » » » return | |
| 755 » » case w.changes <- change: | |
| 756 » » » emittedValue = true | |
| 757 » » } | |
| 758 } | 773 } |
| 759 } | 774 } |
| 760 | 775 |
| 761 func (w *relationUnitsWatcher) finish() { | 776 func (w *relationUnitsWatcher) finish() { |
| 762 » for _, t := range w.unitTombs { | 777 » for _, u := range w.units { |
| 763 » » t.Kill(nil) | 778 » » close(u.stop) |
| 764 » » w.tomb.Kill(t.Wait()) | |
| 765 } | 779 } |
| 766 » close(w.updates) | 780 » for len(w.units) > 0 { |
| 781 » » ch := <-w.updates |
| 782 » » if !ch.eof { |
| 783 » » » continue |
| 784 » » } |
| 785 » » if ch.err != nil && w.err == nil { |
| 786 » » » w.err = ch.err |
| 787 » » } |
| 788 » » delete(w.units, ch.unit.key) |
| 789 » } |
| 767 close(w.changes) | 790 close(w.changes) |
| 768 w.tomb.Done() | |
| 769 } | 791 } |
| 770 | 792 |
| 771 // Stop stops the watcher and returns any errors encountered while watching. | 793 // Wait consumes all remaining changes and |
| 772 func (w *relationUnitsWatcher) Stop() error { | 794 // returns any error encountered while the watcher |
| 773 » w.tomb.Kill(nil) | 795 // was running. |
| 774 » return w.tomb.Wait() | 796 func (w *relationUnitsWatcher) Wait() error { |
| 775 } | 797 » for _ = range w.changes { |
| 776 | 798 » } |
| 777 // Dying returns a channel that is closed when the | 799 » return w.err |
| 778 // watcher has stopped or is about to stop. | |
| 779 func (w *relationUnitsWatcher) Dying() <-chan struct{} { | |
| 780 » return w.tomb.Dying() | |
| 781 } | |
| 782 | |
| 783 // Err returns any error encountered while stopping the watcher, or | |
| 784 // tome.ErrStillAlive if the watcher is still running. | |
| 785 func (w *relationUnitsWatcher) Err() error { | |
| 786 » return w.tomb.Err() | |
| 787 } | 800 } |
| 788 | 801 |
| 789 // Changes returns a channel that will receive the changes to | 802 // Changes returns a channel that will receive the changes to |
| 790 // the relation when detected. | 803 // the relation when detected. |
| 791 // The first event on the channel holds the initial state of the | 804 // The first event on the channel holds the initial state of the |
| 792 // relation in its Changed field. | 805 // relation in its Changed field. |
| 793 func (w *relationUnitsWatcher) Changes() <-chan RelationUnitsChange { | 806 func (w *relationUnitsWatcher) Changes() <-chan RelationUnitsChange { |
| 794 return w.changes | 807 return w.changes |
| 795 } | 808 } |
| 796 | 809 |
| 797 // updateWatches starts or stops watches on the settings of the relation | 810 // updateWatches starts or stops watches on the settings of the relation |
| 798 // units declared present or absent by ch, and returns a RelationUnitsChange | 811 // units declared present or absent by ch, and returns a RelationUnitsChange |
| 799 // event expressing those changes. | 812 // event expressing those changes. |
| 800 func (w *relationUnitsWatcher) updateWatches(ch watcher.ChildrenChange) (change
RelationUnitsChange, err error) { | 813 func (w *relationUnitsWatcher) updateWatches(ch watcher.ChildrenChange) (change
RelationUnitsChange, err error) { |
| 801 for _, key := range ch.Removed { | 814 for _, key := range ch.Removed { |
| 802 if key == w.ignore { | 815 if key == w.ignore { |
| 803 continue | 816 continue |
| 804 } | 817 } |
| 805 » » // When we stop a unit settings watcher, we have to wait for its
tomb, | 818 » » u := w.units[key] |
| 806 » » // lest its latest change (potentially waiting to be sent on the
updates | 819 » » close(u.stop) |
| 807 » » // channel) be received and sent on as a RelationUnitsChange eve
nt *after* | 820 » » u.stopped = true |
| 808 » » // we notify of its departure in the event we are currently prep
aring. | 821 » » change.Departed = append(change.Departed, u.name) |
| 809 » » t := w.unitTombs[key] | |
| 810 » » delete(w.unitTombs, key) | |
| 811 » » t.Kill(nil) | |
| 812 » » if err := t.Wait(); err != nil { | |
| 813 » » » return RelationUnitsChange{}, err | |
| 814 » » } | |
| 815 » » name := w.names[key] | |
| 816 » » delete(w.names, key) | |
| 817 » » change.Departed = append(change.Departed, name) | |
| 818 } | 822 } |
| 819 var topo *topology | 823 var topo *topology |
| 820 for _, key := range ch.Added { | 824 for _, key := range ch.Added { |
| 821 if key == w.ignore { | 825 if key == w.ignore { |
| 822 continue | 826 continue |
| 823 } | 827 } |
| 824 if topo == nil { | 828 if topo == nil { |
| 825 // Create topology lazily; no sense reading it N times f
or | 829 // Create topology lazily; no sense reading it N times f
or |
| 826 // N added presence nodes where N != 1. | 830 // N added presence nodes where N != 1. |
| 827 if topo, err = readTopology(w.st.zk); err != nil { | 831 if topo, err = readTopology(w.st.zk); err != nil { |
| 828 return RelationUnitsChange{}, err | 832 return RelationUnitsChange{}, err |
| 829 } | 833 } |
| 830 } | 834 } |
| 831 name, err := topo.UnitName(key) | 835 name, err := topo.UnitName(key) |
| 832 if err != nil { | 836 if err != nil { |
| 833 return RelationUnitsChange{}, err | 837 return RelationUnitsChange{}, err |
| 834 } | 838 } |
| 835 // Start watching unit settings, and consume initial event to ge
t | 839 // Start watching unit settings, and consume initial event to ge
t |
| 836 // initial settings for the event we're preparing; subsequent | 840 // initial settings for the event we're preparing; subsequent |
| 837 // changes will be received on the unitLoop goroutine and sent t
o | 841 // changes will be received on the unitLoop goroutine and sent t
o |
| 838 // this one via w.updates. | 842 // this one via w.updates. |
| 839 » » w.names[key] = name | 843 » » u := &relationUnit { |
| 840 » » uw := watcher.NewContentWatcher(w.st.zk, w.scope.settingsPath(ke
y)) | 844 » » » rw: w, |
| 841 » » select { | 845 » » » key: key, |
| 842 » » case <-w.tomb.Dying(): | 846 » » » name: name, |
| 843 » » » return RelationUnitsChange{}, tomb.ErrDying | 847 » » » stop: make(chan struct{}), |
| 844 » » case cch, ok := <-uw.Changes(): | 848 » » » w: watcher.NewContentWatcher(w.st.zk, w.scope.settingsPa
th(key)), |
| 845 » » » if !ok { | |
| 846 » » » » return RelationUnitsChange{}, watcher.MustErr(uw
) | |
| 847 » » » } | |
| 848 » » » us := UnitSettings{Version: cch.Version} | |
| 849 » » » if err = goyaml.Unmarshal([]byte(cch.Content), &us.Setti
ngs); err != nil { | |
| 850 » » » » return RelationUnitsChange{}, err | |
| 851 » » » } | |
| 852 » » » if change.Changed == nil { | |
| 853 » » » » change.Changed = map[string]UnitSettings{} | |
| 854 » » » } | |
| 855 » » » change.Changed[name] = us | |
| 856 » » » t := &tomb.Tomb{} | |
| 857 » » » w.unitTombs[key] = t | |
| 858 » » » go w.unitLoop(name, uw, t) | |
| 859 } | 849 } |
| 850 cch, ok := <-u.w.Changes() |
| 851 if !ok { |
| 852 return RelationUnitsChange{}, watcher.MustErr(u.w) |
| 853 } |
| 854 us := UnitSettings{Version: cch.Version} |
| 855 if err = goyaml.Unmarshal([]byte(cch.Content), &us.Settings); er
r != nil { |
| 856 return RelationUnitsChange{}, err |
| 857 } |
| 858 if change.Changed == nil { |
| 859 change.Changed = map[string]UnitSettings{} |
| 860 } |
| 861 change.Changed[name] = us |
| 862 w.units[key] = u |
| 863 go u.loop() |
| 860 } | 864 } |
| 861 return change, nil | 865 return change, nil |
| 862 } | 866 } |
| 863 | 867 |
| 864 // unitLoop sends a unitSettingsChange event on w.updates for each ContentChange | 868 // unitLoop sends a unitSettingsChange event on w.updates for each ContentChange |
| 865 // event received from uw. | 869 // event received from uw. |
| 866 func (w *relationUnitsWatcher) unitLoop(name string, uw *watcher.ContentWatcher,
t *tomb.Tomb) { | 870 func (u *relationUnit) loop() { |
| 867 » defer t.Done() | 871 » var err error |
| 868 » defer uw.Stop() | 872 » defer func() { |
| 869 » for { | 873 » » u.rw.updates <- unitSettingsChange{unit: u, eof: true, err: err} |
| 870 » » select { | 874 » }() |
| 871 » » case <-t.Dying(): | 875 » defer u.w.Stop() |
| 876 » for ch := range u.w.Changes() { |
| 877 » » us := UnitSettings{Version: ch.Version} |
| 878 » » if e := goyaml.Unmarshal([]byte(ch.Content), &us.Settings); e !=
nil { |
| 879 » » » err = e |
| 872 return | 880 return |
| 873 case ch, ok := <-uw.Changes(): | |
| 874 if !ok { | |
| 875 w.tomb.Kill(watcher.MustErr(uw)) | |
| 876 return | |
| 877 } | |
| 878 us := UnitSettings{Version: ch.Version} | |
| 879 if err := goyaml.Unmarshal([]byte(ch.Content), &us.Setti
ngs); err != nil { | |
| 880 w.tomb.Kill(err) | |
| 881 return | |
| 882 } | |
| 883 select { | |
| 884 case <-t.Dying(): | |
| 885 return | |
| 886 case w.updates <- unitSettingsChange{name, us}: | |
| 887 } | |
| 888 } | 881 } |
| 882 u.rw.updates <- unitSettingsChange{unit: u, settings: us} |
| 889 } | 883 } |
| 884 err = watcher.MustErr(u.w) |
| 890 } | 885 } |
| OLD | NEW |