Index: state/watcher.go |
=== modified file 'state/watcher.go' |
--- state/watcher.go 2013-08-21 15:14:20 +0000 |
+++ state/watcher.go 2013-08-29 15:44:21 +0000 |
@@ -23,19 +23,33 @@ |
var watchLogger = loggo.GetLogger("juju.state.watch") |
+// Watcher is implemented by all watchers; the actual |
+// changes channel is returned by a watcher-specific |
+// Changes method. |
+type Watcher interface { |
+ // Kill asks the watcher to stop without waiting for it do so. |
+ Kill() |
+ // Wait waits for the watcher to die and returns any |
+ // error encountered when it was running. |
+ Wait() error |
+ // Stop kills the watcher, then waits for it to die. |
+ Stop() error |
+ // Err returns any error encountered while the watcher |
+ // has been running. |
+ Err() error |
+} |
+ |
// NotifyWatcher generates signals when something changes, but it does not |
// return any content for those changes |
type NotifyWatcher interface { |
- Stop() error |
- Err() error |
+ Watcher |
Changes() <-chan struct{} |
} |
// StringsWatcher generates signals when something changes, returning |
// the changes as a list of strings. |
type StringsWatcher interface { |
- Stop() error |
- Err() error |
+ Watcher |
Changes() <-chan []string |
} |
@@ -48,7 +62,18 @@ |
// Stop stops the watcher, and returns any error encountered while running |
// or shutting down. |
func (w *commonWatcher) Stop() error { |
+ w.Kill() |
+ return w.Wait() |
+} |
+ |
+// Kill kills the watcher without waiting for it to shut down. |
+func (w *commonWatcher) Kill() { |
w.tomb.Kill(nil) |
+} |
+ |
+// Wait waits for the watcher to die and returns any |
+// error encountered when it was running. |
+func (w *commonWatcher) Wait() error { |
return w.tomb.Wait() |
} |
@@ -94,6 +119,8 @@ |
return false |
} |
+var _ Watcher = (*lifecycleWatcher)(nil) |
+ |
// lifecycleWatcher notifies about lifecycle changes for a set of entities of |
// the same kind. The first event emitted will contain the ids of all non-Dead |
// entities; subsequent events are emitted whenever one or more entities are |
@@ -257,7 +284,23 @@ |
return nil |
} |
-func (w *lifecycleWatcher) loop() (err error) { |
+// ErrStateClosed is returned from watchers if their underlying |
+// state connection has been closed. |
+var ErrStateClosed = fmt.Errorf("state has been closed") |
+ |
+// stateWatcherDeadError processes the error received when the watcher |
+// inside a state connection dies. If the State has been closed, the |
+// watcher will have been stopped and error will be nil, so we ensure |
+// that higher level watchers return a non-nil error in that case, as |
+// watchers are not expected to die unexpectedly without an error. |
+func stateWatcherDeadError(err error) error { |
+ if err != nil { |
+ return err |
+ } |
+ return ErrStateClosed |
+} |
+ |
+func (w *lifecycleWatcher) loop() error { |
in := make(chan watcher.Change) |
w.st.watcher.WatchCollectionWithFilter(w.coll.Name, in, w.filter) |
defer w.st.watcher.UnwatchCollection(w.coll.Name, in) |
@@ -271,7 +314,7 @@ |
case <-w.tomb.Dying(): |
return tomb.ErrDying |
case <-w.st.watcher.Dead(): |
- return watcher.MustErr(w.st.watcher) |
+ return stateWatcherDeadError(w.st.watcher.Err()) |
case ch := <-in: |
updates, ok := collect(ch, in, w.tomb.Dying()) |
if !ok { |
@@ -302,6 +345,8 @@ |
out chan []string |
} |
+var _ Watcher = (*minUnitsWatcher)(nil) |
+ |
func newMinUnitsWatcher(st *State) StringsWatcher { |
w := &minUnitsWatcher{ |
commonWatcher: commonWatcher{st: st}, |
@@ -363,10 +408,9 @@ |
select { |
case <-w.tomb.Dying(): |
return tomb.ErrDying |
- case change, ok := <-ch: |
- if !ok { |
- return watcher.MustErr(w.st.watcher) |
- } |
+ case <-w.st.watcher.Dead(): |
+ return stateWatcherDeadError(w.st.watcher.Err()) |
+ case change := <-ch: |
if err = w.merge(serviceNames, change); err != nil { |
return err |
} |
@@ -395,6 +439,8 @@ |
out chan *RelationScopeChange |
} |
+var _ Watcher = (*RelationScopeWatcher)(nil) |
+ |
// RelationScopeChange contains information about units that have |
// entered or left a particular scope. |
type RelationScopeChange struct { |
@@ -480,7 +526,7 @@ |
for { |
select { |
case <-w.st.watcher.Dead(): |
- return watcher.MustErr(w.st.watcher) |
+ return stateWatcherDeadError(w.st.watcher.Err()) |
case <-w.tomb.Dying(): |
return tomb.ErrDying |
case c := <-ch: |
@@ -509,6 +555,8 @@ |
out chan RelationUnitsChange |
} |
+var _ Watcher = (*RelationUnitsWatcher)(nil) |
+ |
// RelationUnitsChange holds notifications of units entering and leaving the |
// scope of a RelationUnit, and changes to the settings of those units known |
// to have entered. |
@@ -629,7 +677,7 @@ |
for { |
select { |
case <-w.st.watcher.Dead(): |
- return watcher.MustErr(w.st.watcher) |
+ return stateWatcherDeadError(w.st.watcher.Err()) |
case <-w.tomb.Dying(): |
return tomb.ErrDying |
case c, ok := <-w.sw.Changes(): |
@@ -671,6 +719,8 @@ |
out chan []string |
} |
+var _ Watcher = (*unitsWatcher)(nil) |
+ |
// WatchSubordinateUnits returns a StringsWatcher tracking the unit's subordinate units. |
func (u *Unit) WatchSubordinateUnits() StringsWatcher { |
u = &Unit{st: u.st, doc: u.doc} |
@@ -834,7 +884,7 @@ |
for { |
select { |
case <-w.st.watcher.Dead(): |
- return watcher.MustErr(w.st.watcher) |
+ return stateWatcherDeadError(w.st.watcher.Err()) |
case <-w.tomb.Dying(): |
return tomb.ErrDying |
case c := <-w.in: |
@@ -865,6 +915,8 @@ |
out chan *config.Config |
} |
+var _ Watcher = (*EnvironConfigWatcher)(nil) |
+ |
// WatchEnvironConfig returns a watcher for observing changes |
// to the environment configuration. |
func (s *State) WatchEnvironConfig() *EnvironConfigWatcher { |
@@ -900,7 +952,7 @@ |
for { |
select { |
case <-w.st.watcher.Dead(): |
- return watcher.MustErr(w.st.watcher) |
+ return stateWatcherDeadError(w.st.watcher.Err()) |
case <-w.tomb.Dying(): |
return tomb.ErrDying |
case settings, ok := <-sw.Changes(): |
@@ -925,6 +977,8 @@ |
out chan *Settings |
} |
+var _ Watcher = (*settingsWatcher)(nil) |
+ |
// watchSettings creates a watcher for observing changes to settings. |
func (s *State) watchSettings(key string) *settingsWatcher { |
return newSettingsWatcher(s, key) |
@@ -967,7 +1021,7 @@ |
for { |
select { |
case <-w.st.watcher.Dead(): |
- return watcher.MustErr(w.st.watcher) |
+ return stateWatcherDeadError(w.st.watcher.Err()) |
case <-w.tomb.Dying(): |
return tomb.ErrDying |
case <-ch: |
@@ -989,6 +1043,8 @@ |
out chan struct{} |
} |
+var _ Watcher = (*entityWatcher)(nil) |
+ |
// WatchHardwareCharacteristics returns a watcher for observing changes to a machine's hardware characteristics. |
func (m *Machine) WatchHardwareCharacteristics() NotifyWatcher { |
return newEntityWatcher(m.st, m.st.instanceData, m.doc.Id) |
@@ -1067,7 +1123,7 @@ |
case <-w.tomb.Dying(): |
return tomb.ErrDying |
case <-w.st.watcher.Dead(): |
- return watcher.MustErr(w.st.watcher) |
+ return stateWatcherDeadError(w.st.watcher.Err()) |
case ch := <-in: |
if _, ok := collect(ch, in, w.tomb.Dying()); !ok { |
return tomb.ErrDying |
@@ -1098,6 +1154,8 @@ |
known map[string]Life |
} |
+var _ Watcher = (*machineUnitsWatcher)(nil) |
+ |
// WatchUnits returns a new StringsWatcher watching m's units. |
func (m *Machine) WatchUnits() StringsWatcher { |
return newMachineUnitsWatcher(m) |
@@ -1202,7 +1260,7 @@ |
for { |
select { |
case <-w.st.watcher.Dead(): |
- return watcher.MustErr(w.st.watcher) |
+ return stateWatcherDeadError(w.st.watcher.Err()) |
case <-w.tomb.Dying(): |
return tomb.ErrDying |
case <-machineCh: |
@@ -1234,6 +1292,8 @@ |
out chan struct{} |
} |
+var _ Watcher = (*cleanupWatcher)(nil) |
+ |
// WatchCleanups starts and returns a CleanupWatcher. |
func (st *State) WatchCleanups() NotifyWatcher { |
return newCleanupWatcher(st) |
@@ -1269,7 +1329,7 @@ |
case <-w.tomb.Dying(): |
return tomb.ErrDying |
case <-w.st.watcher.Dead(): |
- return watcher.MustErr(w.st.watcher) |
+ return stateWatcherDeadError(w.st.watcher.Err()) |
case ch := <-in: |
if _, ok := collect(ch, in, w.tomb.Dying()); !ok { |
return tomb.ErrDying |