Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(6)

Unified Diff: state/watcher.go

Issue 13386044: state: avoid panic in watchers
Patch Set: state: avoid panic in watchers Created 11 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « state/state_test.go ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: state/watcher.go
=== modified file 'state/watcher.go'
--- state/watcher.go 2013-08-21 15:14:20 +0000
+++ state/watcher.go 2013-08-29 15:44:21 +0000
@@ -23,19 +23,33 @@
var watchLogger = loggo.GetLogger("juju.state.watch")
+// Watcher is implemented by all watchers; the actual
+// changes channel is returned by a watcher-specific
+// Changes method.
+type Watcher interface {
+ // Kill asks the watcher to stop without waiting for it do so.
+ Kill()
+ // Wait waits for the watcher to die and returns any
+ // error encountered when it was running.
+ Wait() error
+ // Stop kills the watcher, then waits for it to die.
+ Stop() error
+ // Err returns any error encountered while the watcher
+ // has been running.
+ Err() error
+}
+
// NotifyWatcher generates signals when something changes, but it does not
// return any content for those changes
type NotifyWatcher interface {
- Stop() error
- Err() error
+ Watcher
Changes() <-chan struct{}
}
// StringsWatcher generates signals when something changes, returning
// the changes as a list of strings.
type StringsWatcher interface {
- Stop() error
- Err() error
+ Watcher
Changes() <-chan []string
}
@@ -48,7 +62,18 @@
// Stop stops the watcher, and returns any error encountered while running
// or shutting down.
func (w *commonWatcher) Stop() error {
+ w.Kill()
+ return w.Wait()
+}
+
+// Kill kills the watcher without waiting for it to shut down.
+func (w *commonWatcher) Kill() {
w.tomb.Kill(nil)
+}
+
+// Wait waits for the watcher to die and returns any
+// error encountered when it was running.
+func (w *commonWatcher) Wait() error {
return w.tomb.Wait()
}
@@ -94,6 +119,8 @@
return false
}
+var _ Watcher = (*lifecycleWatcher)(nil)
+
// lifecycleWatcher notifies about lifecycle changes for a set of entities of
// the same kind. The first event emitted will contain the ids of all non-Dead
// entities; subsequent events are emitted whenever one or more entities are
@@ -257,7 +284,23 @@
return nil
}
-func (w *lifecycleWatcher) loop() (err error) {
+// ErrStateClosed is returned from watchers if their underlying
+// state connection has been closed.
+var ErrStateClosed = fmt.Errorf("state has been closed")
+
+// stateWatcherDeadError processes the error received when the watcher
+// inside a state connection dies. If the State has been closed, the
+// watcher will have been stopped and error will be nil, so we ensure
+// that higher level watchers return a non-nil error in that case, as
+// watchers are not expected to die unexpectedly without an error.
+func stateWatcherDeadError(err error) error {
+ if err != nil {
+ return err
+ }
+ return ErrStateClosed
+}
+
+func (w *lifecycleWatcher) loop() error {
in := make(chan watcher.Change)
w.st.watcher.WatchCollectionWithFilter(w.coll.Name, in, w.filter)
defer w.st.watcher.UnwatchCollection(w.coll.Name, in)
@@ -271,7 +314,7 @@
case <-w.tomb.Dying():
return tomb.ErrDying
case <-w.st.watcher.Dead():
- return watcher.MustErr(w.st.watcher)
+ return stateWatcherDeadError(w.st.watcher.Err())
case ch := <-in:
updates, ok := collect(ch, in, w.tomb.Dying())
if !ok {
@@ -302,6 +345,8 @@
out chan []string
}
+var _ Watcher = (*minUnitsWatcher)(nil)
+
func newMinUnitsWatcher(st *State) StringsWatcher {
w := &minUnitsWatcher{
commonWatcher: commonWatcher{st: st},
@@ -363,10 +408,9 @@
select {
case <-w.tomb.Dying():
return tomb.ErrDying
- case change, ok := <-ch:
- if !ok {
- return watcher.MustErr(w.st.watcher)
- }
+ case <-w.st.watcher.Dead():
+ return stateWatcherDeadError(w.st.watcher.Err())
+ case change := <-ch:
if err = w.merge(serviceNames, change); err != nil {
return err
}
@@ -395,6 +439,8 @@
out chan *RelationScopeChange
}
+var _ Watcher = (*RelationScopeWatcher)(nil)
+
// RelationScopeChange contains information about units that have
// entered or left a particular scope.
type RelationScopeChange struct {
@@ -480,7 +526,7 @@
for {
select {
case <-w.st.watcher.Dead():
- return watcher.MustErr(w.st.watcher)
+ return stateWatcherDeadError(w.st.watcher.Err())
case <-w.tomb.Dying():
return tomb.ErrDying
case c := <-ch:
@@ -509,6 +555,8 @@
out chan RelationUnitsChange
}
+var _ Watcher = (*RelationUnitsWatcher)(nil)
+
// RelationUnitsChange holds notifications of units entering and leaving the
// scope of a RelationUnit, and changes to the settings of those units known
// to have entered.
@@ -629,7 +677,7 @@
for {
select {
case <-w.st.watcher.Dead():
- return watcher.MustErr(w.st.watcher)
+ return stateWatcherDeadError(w.st.watcher.Err())
case <-w.tomb.Dying():
return tomb.ErrDying
case c, ok := <-w.sw.Changes():
@@ -671,6 +719,8 @@
out chan []string
}
+var _ Watcher = (*unitsWatcher)(nil)
+
// WatchSubordinateUnits returns a StringsWatcher tracking the unit's subordinate units.
func (u *Unit) WatchSubordinateUnits() StringsWatcher {
u = &Unit{st: u.st, doc: u.doc}
@@ -834,7 +884,7 @@
for {
select {
case <-w.st.watcher.Dead():
- return watcher.MustErr(w.st.watcher)
+ return stateWatcherDeadError(w.st.watcher.Err())
case <-w.tomb.Dying():
return tomb.ErrDying
case c := <-w.in:
@@ -865,6 +915,8 @@
out chan *config.Config
}
+var _ Watcher = (*EnvironConfigWatcher)(nil)
+
// WatchEnvironConfig returns a watcher for observing changes
// to the environment configuration.
func (s *State) WatchEnvironConfig() *EnvironConfigWatcher {
@@ -900,7 +952,7 @@
for {
select {
case <-w.st.watcher.Dead():
- return watcher.MustErr(w.st.watcher)
+ return stateWatcherDeadError(w.st.watcher.Err())
case <-w.tomb.Dying():
return tomb.ErrDying
case settings, ok := <-sw.Changes():
@@ -925,6 +977,8 @@
out chan *Settings
}
+var _ Watcher = (*settingsWatcher)(nil)
+
// watchSettings creates a watcher for observing changes to settings.
func (s *State) watchSettings(key string) *settingsWatcher {
return newSettingsWatcher(s, key)
@@ -967,7 +1021,7 @@
for {
select {
case <-w.st.watcher.Dead():
- return watcher.MustErr(w.st.watcher)
+ return stateWatcherDeadError(w.st.watcher.Err())
case <-w.tomb.Dying():
return tomb.ErrDying
case <-ch:
@@ -989,6 +1043,8 @@
out chan struct{}
}
+var _ Watcher = (*entityWatcher)(nil)
+
// WatchHardwareCharacteristics returns a watcher for observing changes to a machine's hardware characteristics.
func (m *Machine) WatchHardwareCharacteristics() NotifyWatcher {
return newEntityWatcher(m.st, m.st.instanceData, m.doc.Id)
@@ -1067,7 +1123,7 @@
case <-w.tomb.Dying():
return tomb.ErrDying
case <-w.st.watcher.Dead():
- return watcher.MustErr(w.st.watcher)
+ return stateWatcherDeadError(w.st.watcher.Err())
case ch := <-in:
if _, ok := collect(ch, in, w.tomb.Dying()); !ok {
return tomb.ErrDying
@@ -1098,6 +1154,8 @@
known map[string]Life
}
+var _ Watcher = (*machineUnitsWatcher)(nil)
+
// WatchUnits returns a new StringsWatcher watching m's units.
func (m *Machine) WatchUnits() StringsWatcher {
return newMachineUnitsWatcher(m)
@@ -1202,7 +1260,7 @@
for {
select {
case <-w.st.watcher.Dead():
- return watcher.MustErr(w.st.watcher)
+ return stateWatcherDeadError(w.st.watcher.Err())
case <-w.tomb.Dying():
return tomb.ErrDying
case <-machineCh:
@@ -1234,6 +1292,8 @@
out chan struct{}
}
+var _ Watcher = (*cleanupWatcher)(nil)
+
// WatchCleanups starts and returns a CleanupWatcher.
func (st *State) WatchCleanups() NotifyWatcher {
return newCleanupWatcher(st)
@@ -1269,7 +1329,7 @@
case <-w.tomb.Dying():
return tomb.ErrDying
case <-w.st.watcher.Dead():
- return watcher.MustErr(w.st.watcher)
+ return stateWatcherDeadError(w.st.watcher.Err())
case ch := <-in:
if _, ok := collect(ch, in, w.tomb.Dying()); !ok {
return tomb.ErrDying
« no previous file with comments | « state/state_test.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b