| OLD | NEW |
| 1 package state | 1 package state |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "launchpad.net/goyaml" | 4 "launchpad.net/goyaml" |
| 5 "launchpad.net/juju-core/log" | 5 "launchpad.net/juju-core/log" |
| 6 "launchpad.net/juju-core/state/watcher" | 6 "launchpad.net/juju-core/state/watcher" |
| 7 "launchpad.net/tomb" | |
| 8 ) | 7 ) |
| 9 | 8 |
| 10 // contentWatcher holds behaviour common to all ContentWatcher clients in | 9 // contentWatcher holds behaviour common to all ContentWatcher clients in |
| 11 // the state package. | 10 // the state package. |
| 12 type contentWatcher struct { | 11 type contentWatcher struct { |
| 13 st *State | 12 st *State |
| 14 tomb tomb.Tomb | |
| 15 path string | 13 path string |
| 16 updated bool | 14 updated bool |
| 15 err error |
| 16 handler contentHandler |
| 17 stop <-chan struct{} |
| 17 } | 18 } |
| 18 | 19 |
| 19 func newContentWatcher(st *State, path string) contentWatcher { | 20 func newContentWatcher(st *State, path string, stop <-chan struct{}) contentWatc
her { |
| 20 » return contentWatcher{st: st, path: path} | 21 » return contentWatcher{ |
| 22 » » st: st, |
| 23 » » path: path, |
| 24 » » stop: stop, |
| 25 » » err: watcher.ErrNotStopped, |
| 26 » } |
| 21 } | 27 } |
| 22 | 28 |
| 23 // contentHandler must be implemented by watchers that intend to make use | 29 // contentHandler must be implemented by watchers that intend to make use |
| 24 // of contentWatcher. | 30 // of contentWatcher. |
| 25 type contentHandler interface { | 31 type contentHandler interface { |
| 32 // update is called whenever the content changes. |
| 26 update(watcher.ContentChange) error | 33 update(watcher.ContentChange) error |
| 34 |
| 35 // wait reads values from the change channel until it is closed. |
| 36 wait() |
| 37 |
| 38 // done cleans up and closes the change channel. |
| 27 done() | 39 done() |
| 28 } | 40 } |
| 29 | 41 |
| 30 // loop handles the common tasks of receiving changes from a watcher.ContentWatc
her, | 42 // watcher handles the common tasks of receiving changes from a watcher.ContentW
atcher, |
| 31 // and dispatching them to the contentHandler's update method. | 43 // and dispatching them to the contentHandler's update method. |
| 32 func (w *contentWatcher) loop(handler contentHandler) { | 44 func (w *contentWatcher) runWatcher(handler contentHandler) { |
| 33 » defer w.tomb.Done() | 45 » w.handler = handler |
| 34 » defer handler.done() | 46 » cw := watcher.NewContentWatcher(w.st.zk, w.path, w.stop) |
| 35 » cw := watcher.NewContentWatcher(w.st.zk, w.path) | 47 » go func() { |
| 36 » defer watcher.Stop(cw, &w.tomb) | 48 » » defer handler.done() |
| 37 » for { | 49 » » for ch := range cw.Changes() { |
| 38 » » select { | |
| 39 » » case <-w.tomb.Dying(): | |
| 40 » » » return | |
| 41 » » case ch, ok := <-cw.Changes(): | |
| 42 » » » if !ok { | |
| 43 » » » » w.tomb.Kill(watcher.MustErr(cw)) | |
| 44 » » » » return | |
| 45 » » » } | |
| 46 if err := handler.update(ch); err != nil { | 50 if err := handler.update(ch); err != nil { |
| 47 » » » » w.tomb.Kill(err) | 51 » » » » w.err = err |
| 48 return | 52 return |
| 49 } | 53 } |
| 50 w.updated = true | 54 w.updated = true |
| 51 } | 55 } |
| 52 » } | 56 » » w.err = cw.Wait() |
| 57 » }() |
| 53 } | 58 } |
| 54 | 59 |
| 55 // Stop stops the watcher and returns any errors encountered while watching. | 60 // Wait reads from the Changes channel until it's closed, |
| 56 func (w *contentWatcher) Stop() error { | 61 // then returns any error encountered while the watcher |
| 57 » w.tomb.Kill(nil) | 62 // was running. |
| 58 » return w.tomb.Wait() | 63 func (w *contentWatcher) Wait() error { |
| 59 } | 64 » w.handler.wait() |
| 60 | 65 » return w.err |
| 61 // Err returns any error encountered while stopping the watcher, or | |
| 62 // tome.ErrStillAlive if the watcher is still running. | |
| 63 func (w *contentWatcher) Err() error { | |
| 64 » return w.tomb.Err() | |
| 65 } | 66 } |
| 66 | 67 |
| 67 // ConfigWatcher observes changes to any configuration node. | 68 // ConfigWatcher observes changes to any configuration node. |
| 68 type ConfigWatcher struct { | 69 type ConfigWatcher struct { |
| 69 contentWatcher | 70 contentWatcher |
| 70 changeChan chan *ConfigNode | 71 changeChan chan *ConfigNode |
| 71 } | 72 } |
| 72 | 73 |
| 73 // newConfigWatcher creates and starts a new config watcher for | 74 // newConfigWatcher creates and starts a new config watcher for |
| 74 // the given path. | 75 // the given path. After stop is closed, the watcher will terminate and |
| 75 func newConfigWatcher(st *State, path string) *ConfigWatcher { | 76 // close the Changes channel. |
| 77 func newConfigWatcher(st *State, path string, stop <-chan struct{}) *ConfigWatch
er { |
| 76 w := &ConfigWatcher{ | 78 w := &ConfigWatcher{ |
| 77 » » contentWatcher: newContentWatcher(st, path), | 79 » » contentWatcher: newContentWatcher(st, path, stop), |
| 78 changeChan: make(chan *ConfigNode), | 80 changeChan: make(chan *ConfigNode), |
| 79 } | 81 } |
| 80 » go w.loop(w) | 82 » w.runWatcher(w) |
| 81 return w | 83 return w |
| 82 } | 84 } |
| 83 | 85 |
| 84 // Changes returns a channel that will receive the new | 86 // Changes returns a channel that will receive the new |
| 85 // *ConfigNode when a change is detected. Note that multiple | 87 // *ConfigNode when a change is detected. Note that multiple |
| 86 // changes may be observed as a single event in the channel. | 88 // changes may be observed as a single event in the channel. |
| 87 // The first event on the channel holds the initial state | 89 // The first event on the channel holds the initial state |
| 88 // as returned by Service.Config. | 90 // as returned by Service.Config. |
| 89 func (w *ConfigWatcher) Changes() <-chan *ConfigNode { | 91 func (w *ConfigWatcher) Changes() <-chan *ConfigNode { |
| 90 return w.changeChan | 92 return w.changeChan |
| 91 } | 93 } |
| 92 | 94 |
| 93 func (w *ConfigWatcher) update(change watcher.ContentChange) error { | 95 func (w *ConfigWatcher) update(change watcher.ContentChange) error { |
| 94 // A non-existent node is treated as an empty node. | 96 // A non-existent node is treated as an empty node. |
| 95 configNode, err := parseConfigNode(w.st.zk, w.path, change.Content) | 97 configNode, err := parseConfigNode(w.st.zk, w.path, change.Content) |
| 96 if err != nil { | 98 if err != nil { |
| 97 return err | 99 return err |
| 98 } | 100 } |
| 99 » select { | 101 » w.changeChan <- configNode |
| 100 » case <-w.tomb.Dying(): | 102 » return nil |
| 101 » » return tomb.ErrDying | 103 } |
| 102 » case w.changeChan <- configNode: | 104 |
| 105 func (w *ConfigWatcher) wait() { |
| 106 » for _ = range w.changeChan { |
| 103 } | 107 } |
| 104 return nil | |
| 105 } | 108 } |
| 106 | 109 |
| 107 func (w *ConfigWatcher) done() { | 110 func (w *ConfigWatcher) done() { |
| 108 close(w.changeChan) | 111 close(w.changeChan) |
| 109 } | 112 } |
| 110 | 113 |
| 111 // FlagWatcher observes whether a given flag is on or off. | 114 // FlagWatcher observes whether a given flag is on or off. |
| 112 type FlagWatcher struct { | 115 type FlagWatcher struct { |
| 113 contentWatcher | 116 contentWatcher |
| 114 changeChan chan bool | 117 changeChan chan bool |
| 115 exists bool | 118 exists bool |
| 116 } | 119 } |
| 117 | 120 |
| 118 // newFlagWatcher creates and starts a new flag watcher for | 121 // newFlagWatcher creates and starts a new flag watcher for |
| 119 // the given path. | 122 // the given path. |
| 120 func newFlagWatcher(st *State, path string) *FlagWatcher { | 123 func newFlagWatcher(st *State, path string, stop <-chan struct{}) *FlagWatcher { |
| 121 w := &FlagWatcher{ | 124 w := &FlagWatcher{ |
| 122 » » contentWatcher: newContentWatcher(st, path), | 125 » » contentWatcher: newContentWatcher(st, path, stop), |
| 123 changeChan: make(chan bool), | 126 changeChan: make(chan bool), |
| 124 } | 127 } |
| 125 » go w.loop(w) | 128 » w.runWatcher(w) |
| 126 return w | 129 return w |
| 127 } | 130 } |
| 128 | 131 |
| 129 // Changes returns a channel that will receive true when a | 132 // Changes returns a channel that will receive true when a |
| 130 // flag is set and false if it is cleared. Note that multiple | 133 // flag is set and false if it is cleared. Note that multiple |
| 131 // changes may be observed as a single event in the channel. | 134 // changes may be observed as a single event in the channel. |
| 132 // The first event on the channel holds the initial state. | 135 // The first event on the channel holds the initial state. |
| 133 func (w *FlagWatcher) Changes() <-chan bool { | 136 func (w *FlagWatcher) Changes() <-chan bool { |
| 134 return w.changeChan | 137 return w.changeChan |
| 135 } | 138 } |
| 136 | 139 |
| 137 func (w *FlagWatcher) update(change watcher.ContentChange) error { | 140 func (w *FlagWatcher) update(change watcher.ContentChange) error { |
| 138 if w.updated && change.Exists == w.exists { | 141 if w.updated && change.Exists == w.exists { |
| 139 return nil | 142 return nil |
| 140 } | 143 } |
| 141 » select { | 144 » w.changeChan <- change.Exists |
| 142 » case <-w.tomb.Dying(): | 145 » w.exists = change.Exists |
| 143 » » return tomb.ErrDying | 146 » return nil |
| 144 » case w.changeChan <- change.Exists: | 147 } |
| 145 » » w.exists = change.Exists | 148 |
| 149 func (w *FlagWatcher) wait() { |
| 150 » for _ = range w.changeChan { |
| 146 } | 151 } |
| 147 return nil | |
| 148 } | 152 } |
| 149 | 153 |
| 150 func (w *FlagWatcher) done() { | 154 func (w *FlagWatcher) done() { |
| 151 close(w.changeChan) | 155 close(w.changeChan) |
| 152 } | 156 } |
| 153 | 157 |
| 154 // NeedsUpgradeWatcher observes changes to a unit's upgrade flag. | 158 // NeedsUpgradeWatcher observes changes to a unit's upgrade flag. |
| 155 type NeedsUpgradeWatcher struct { | 159 type NeedsUpgradeWatcher struct { |
| 156 contentWatcher | 160 contentWatcher |
| 157 changeChan chan NeedsUpgrade | 161 changeChan chan NeedsUpgrade |
| 158 } | 162 } |
| 159 | 163 |
| 160 // newNeedsUpgradeWatcher creates and starts a new resolved flag node | 164 // newNeedsUpgradeWatcher creates and starts a new resolved flag node |
| 161 // watcher for the given path. | 165 // watcher for the given path. |
| 162 func newNeedsUpgradeWatcher(st *State, path string) *NeedsUpgradeWatcher { | 166 func newNeedsUpgradeWatcher(st *State, path string, stop <-chan struct{}) *Needs
UpgradeWatcher { |
| 163 w := &NeedsUpgradeWatcher{ | 167 w := &NeedsUpgradeWatcher{ |
| 164 » » contentWatcher: newContentWatcher(st, path), | 168 » » contentWatcher: newContentWatcher(st, path, stop), |
| 165 changeChan: make(chan NeedsUpgrade), | 169 changeChan: make(chan NeedsUpgrade), |
| 166 } | 170 } |
| 167 » go w.loop(w) | 171 » w.runWatcher(w) |
| 168 return w | 172 return w |
| 169 } | 173 } |
| 170 | 174 |
| 171 // Changes returns a channel that will receive notifications | 175 // Changes returns a channel that will receive notifications |
| 172 // about upgrades for the unit. Note that multiple changes | 176 // about upgrades for the unit. Note that multiple changes |
| 173 // may be observed as a single event in the channel. | 177 // may be observed as a single event in the channel. |
| 174 // The first event on the channel holds the initial | 178 // The first event on the channel holds the initial |
| 175 // state as returned by Unit.NeedsUpgrade. | 179 // state as returned by Unit.NeedsUpgrade. |
| 176 func (w *NeedsUpgradeWatcher) Changes() <-chan NeedsUpgrade { | 180 func (w *NeedsUpgradeWatcher) Changes() <-chan NeedsUpgrade { |
| 177 return w.changeChan | 181 return w.changeChan |
| 178 } | 182 } |
| 179 | 183 |
| 180 func (w *NeedsUpgradeWatcher) update(change watcher.ContentChange) error { | 184 func (w *NeedsUpgradeWatcher) update(change watcher.ContentChange) error { |
| 181 var needsUpgrade NeedsUpgrade | 185 var needsUpgrade NeedsUpgrade |
| 182 if change.Exists { | 186 if change.Exists { |
| 183 needsUpgrade.Upgrade = true | 187 needsUpgrade.Upgrade = true |
| 184 var setting needsUpgradeNode | 188 var setting needsUpgradeNode |
| 185 if err := goyaml.Unmarshal([]byte(change.Content), &setting); er
r != nil { | 189 if err := goyaml.Unmarshal([]byte(change.Content), &setting); er
r != nil { |
| 186 return err | 190 return err |
| 187 } | 191 } |
| 188 needsUpgrade.Force = setting.Force | 192 needsUpgrade.Force = setting.Force |
| 189 } | 193 } |
| 190 » select { | 194 » w.changeChan <- needsUpgrade |
| 191 » case <-w.tomb.Dying(): | 195 » return nil |
| 192 » » return tomb.ErrDying | 196 } |
| 193 » case w.changeChan <- needsUpgrade: | 197 |
| 198 func (w *NeedsUpgradeWatcher) wait() { |
| 199 » for _ = range w.changeChan { |
| 194 } | 200 } |
| 195 return nil | |
| 196 } | 201 } |
| 197 | 202 |
| 198 func (w *NeedsUpgradeWatcher) done() { | 203 func (w *NeedsUpgradeWatcher) done() { |
| 199 close(w.changeChan) | 204 close(w.changeChan) |
| 200 } | 205 } |
| 201 | 206 |
| 202 // ResolvedWatcher observes changes to a unit's resolved | 207 // ResolvedWatcher observes changes to a unit's resolved |
| 203 // mode. See SetResolved for details. | 208 // mode. See SetResolved for details. |
| 204 type ResolvedWatcher struct { | 209 type ResolvedWatcher struct { |
| 205 contentWatcher | 210 contentWatcher |
| 206 changeChan chan ResolvedMode | 211 changeChan chan ResolvedMode |
| 207 } | 212 } |
| 208 | 213 |
| 209 // newResolvedWatcher returns a new ResolvedWatcher watching path. | 214 // newResolvedWatcher returns a new ResolvedWatcher watching path. |
| 210 func newResolvedWatcher(st *State, path string) *ResolvedWatcher { | 215 func newResolvedWatcher(st *State, path string, stop <-chan struct{}) *ResolvedW
atcher { |
| 211 w := &ResolvedWatcher{ | 216 w := &ResolvedWatcher{ |
| 212 » » contentWatcher: newContentWatcher(st, path), | 217 » » contentWatcher: newContentWatcher(st, path, stop), |
| 213 changeChan: make(chan ResolvedMode), | 218 changeChan: make(chan ResolvedMode), |
| 214 } | 219 } |
| 215 » go w.loop(w) | 220 » w.runWatcher(w) |
| 216 return w | 221 return w |
| 217 } | 222 } |
| 218 | 223 |
| 219 // Changes returns a channel that will receive the new | 224 // Changes returns a channel that will receive the new |
| 220 // resolved mode when a change is detected. Note that multiple | 225 // resolved mode when a change is detected. Note that multiple |
| 221 // changes may be observed as a single event in the channel. | 226 // changes may be observed as a single event in the channel. |
| 222 // The first event on the channel holds the initial | 227 // The first event on the channel holds the initial |
| 223 // state as returned by Unit.Resolved. | 228 // state as returned by Unit.Resolved. |
| 224 func (w *ResolvedWatcher) Changes() <-chan ResolvedMode { | 229 func (w *ResolvedWatcher) Changes() <-chan ResolvedMode { |
| 225 return w.changeChan | 230 return w.changeChan |
| 226 } | 231 } |
| 227 | 232 |
| 228 func (w *ResolvedWatcher) update(change watcher.ContentChange) error { | 233 func (w *ResolvedWatcher) update(change watcher.ContentChange) error { |
| 229 mode := ResolvedNone | 234 mode := ResolvedNone |
| 230 if change.Exists { | 235 if change.Exists { |
| 231 var err error | 236 var err error |
| 232 mode, err = parseResolvedMode(change.Content) | 237 mode, err = parseResolvedMode(change.Content) |
| 233 if err != nil { | 238 if err != nil { |
| 234 return err | 239 return err |
| 235 } | 240 } |
| 236 } | 241 } |
| 237 » select { | 242 » w.changeChan <- mode |
| 238 » case <-w.tomb.Dying(): | 243 » return nil |
| 239 » » return tomb.ErrDying | 244 } |
| 240 » case w.changeChan <- mode: | 245 |
| 246 func (w *ResolvedWatcher) wait() { |
| 247 » for _ = range w.changeChan { |
| 241 } | 248 } |
| 242 return nil | |
| 243 } | 249 } |
| 244 | 250 |
| 245 func (w *ResolvedWatcher) done() { | 251 func (w *ResolvedWatcher) done() { |
| 246 close(w.changeChan) | 252 close(w.changeChan) |
| 247 } | 253 } |
| 248 | 254 |
| 249 // PortsWatcher observes changes to a unit's open ports. | 255 // PortsWatcher observes changes to a unit's open ports. |
| 250 // See OpenPort for details. | 256 // See OpenPort for details. |
| 251 type PortsWatcher struct { | 257 type PortsWatcher struct { |
| 252 contentWatcher | 258 contentWatcher |
| 253 changeChan chan []Port | 259 changeChan chan []Port |
| 254 } | 260 } |
| 255 | 261 |
| 256 // newPortsWatcher creates and starts a new ports node | 262 // newPortsWatcher creates and starts a new ports node |
| 257 // watcher for the given path. | 263 // watcher for the given path. |
| 258 func newPortsWatcher(st *State, path string) *PortsWatcher { | 264 func newPortsWatcher(st *State, path string, stop <-chan struct{}) *PortsWatcher
{ |
| 259 w := &PortsWatcher{ | 265 w := &PortsWatcher{ |
| 260 » » contentWatcher: newContentWatcher(st, path), | 266 » » contentWatcher: newContentWatcher(st, path, stop), |
| 261 changeChan: make(chan []Port), | 267 changeChan: make(chan []Port), |
| 262 } | 268 } |
| 263 » go w.loop(w) | 269 » w.runWatcher(w) |
| 264 return w | 270 return w |
| 265 } | 271 } |
| 266 | 272 |
| 267 // Changes returns a channel that will receive the actual | 273 // Changes returns a channel that will receive the actual |
| 268 // open ports when a change is detected. Note that multiple | 274 // open ports when a change is detected. Note that multiple |
| 269 // changes may be observed as a single event in the channel. | 275 // changes may be observed as a single event in the channel. |
| 270 // The first event on the channel holds the initial | 276 // The first event on the channel holds the initial |
| 271 // state as returned by Unit.OpenPorts. | 277 // state as returned by Unit.OpenPorts. |
| 272 func (w *PortsWatcher) Changes() <-chan []Port { | 278 func (w *PortsWatcher) Changes() <-chan []Port { |
| 273 return w.changeChan | 279 return w.changeChan |
| 274 } | 280 } |
| 275 | 281 |
| 276 func (w *PortsWatcher) update(change watcher.ContentChange) error { | 282 func (w *PortsWatcher) update(change watcher.ContentChange) error { |
| 277 var ports openPortsNode | 283 var ports openPortsNode |
| 278 if err := goyaml.Unmarshal([]byte(change.Content), &ports); err != nil { | 284 if err := goyaml.Unmarshal([]byte(change.Content), &ports); err != nil { |
| 279 return err | 285 return err |
| 280 } | 286 } |
| 281 » select { | 287 » w.changeChan <- ports.Open |
| 282 » case <-w.tomb.Dying(): | 288 » return nil |
| 283 » » return tomb.ErrDying | 289 } |
| 284 » case w.changeChan <- ports.Open: | 290 |
| 291 func (w *PortsWatcher) wait() { |
| 292 » for _ = range w.changeChan { |
| 285 } | 293 } |
| 286 return nil | |
| 287 } | 294 } |
| 288 | 295 |
| 289 func (w *PortsWatcher) done() { | 296 func (w *PortsWatcher) done() { |
| 290 close(w.changeChan) | 297 close(w.changeChan) |
| 291 } | 298 } |
| 292 | 299 |
| 293 // MachinesWatcher notifies about machines being added or removed | 300 // MachinesWatcher notifies about machines being added or removed |
| 294 // from the environment. | 301 // from the environment. |
| 295 type MachinesWatcher struct { | 302 type MachinesWatcher struct { |
| 296 contentWatcher | 303 contentWatcher |
| 297 changeChan chan *MachinesChange | 304 changeChan chan *MachinesChange |
| 298 watcher *watcher.ContentWatcher | 305 watcher *watcher.ContentWatcher |
| 299 knownMachineKeys []string | 306 knownMachineKeys []string |
| 300 } | 307 } |
| 301 | 308 |
| 302 // MachinesChange contains information about | 309 // MachinesChange contains information about |
| 303 // machines that have been added or deleted. | 310 // machines that have been added or deleted. |
| 304 type MachinesChange struct { | 311 type MachinesChange struct { |
| 305 Added []*Machine | 312 Added []*Machine |
| 306 Removed []*Machine | 313 Removed []*Machine |
| 307 } | 314 } |
| 308 | 315 |
| 309 // newMachinesWatcher creates and starts a new watcher for changes to | 316 // newMachinesWatcher creates and starts a new watcher for changes to |
| 310 // the set of machines known to the topology. | 317 // the set of machines known to the topology. |
| 311 func newMachinesWatcher(st *State) *MachinesWatcher { | 318 func newMachinesWatcher(st *State, stop <-chan struct{}) *MachinesWatcher { |
| 312 w := &MachinesWatcher{ | 319 w := &MachinesWatcher{ |
| 313 » » contentWatcher: newContentWatcher(st, zkTopologyPath), | 320 » » contentWatcher: newContentWatcher(st, zkTopologyPath, stop), |
| 314 changeChan: make(chan *MachinesChange), | 321 changeChan: make(chan *MachinesChange), |
| 315 } | 322 } |
| 316 » go w.loop(w) | 323 » w.runWatcher(w) |
| 317 return w | 324 return w |
| 318 } | 325 } |
| 319 | 326 |
| 320 // Changes returns a channel that will receive changes when machines are | 327 // Changes returns a channel that will receive changes when machines are |
| 321 // added or deleted. The Added field in the first event on the channel | 328 // added or deleted. The Added field in the first event on the channel |
| 322 // holds the initial state as returned by State.AllMachines. | 329 // holds the initial state as returned by State.AllMachines. |
| 323 func (w *MachinesWatcher) Changes() <-chan *MachinesChange { | 330 func (w *MachinesWatcher) Changes() <-chan *MachinesChange { |
| 324 return w.changeChan | 331 return w.changeChan |
| 325 } | 332 } |
| 326 | 333 |
| 327 func (w *MachinesWatcher) update(change watcher.ContentChange) error { | 334 func (w *MachinesWatcher) update(change watcher.ContentChange) error { |
| 328 topology, err := parseTopology(change.Content) | 335 topology, err := parseTopology(change.Content) |
| 329 if err != nil { | 336 if err != nil { |
| 330 return err | 337 return err |
| 331 } | 338 } |
| 332 currentMachineKeys := topology.MachineKeys() | 339 currentMachineKeys := topology.MachineKeys() |
| 333 added := diff(currentMachineKeys, w.knownMachineKeys) | 340 added := diff(currentMachineKeys, w.knownMachineKeys) |
| 334 removed := diff(w.knownMachineKeys, currentMachineKeys) | 341 removed := diff(w.knownMachineKeys, currentMachineKeys) |
| 335 w.knownMachineKeys = currentMachineKeys | 342 w.knownMachineKeys = currentMachineKeys |
| 336 if w.updated && len(added) == 0 && len(removed) == 0 { | 343 if w.updated && len(added) == 0 && len(removed) == 0 { |
| 337 return nil | 344 return nil |
| 338 } | 345 } |
| 339 mc := &MachinesChange{} | 346 mc := &MachinesChange{} |
| 340 for _, m := range added { | 347 for _, m := range added { |
| 341 mc.Added = append(mc.Added, &Machine{w.st, m}) | 348 mc.Added = append(mc.Added, &Machine{w.st, m}) |
| 342 } | 349 } |
| 343 for _, m := range removed { | 350 for _, m := range removed { |
| 344 mc.Removed = append(mc.Removed, &Machine{w.st, m}) | 351 mc.Removed = append(mc.Removed, &Machine{w.st, m}) |
| 345 } | 352 } |
| 346 » select { | 353 » w.changeChan <- mc |
| 347 » case <-w.tomb.Dying(): | 354 » return nil |
| 348 » » return tomb.ErrDying | 355 } |
| 349 » case w.changeChan <- mc: | 356 |
| 357 func (w *MachinesWatcher) wait() { |
| 358 » for _ = range w.changeChan { |
| 350 } | 359 } |
| 351 return nil | |
| 352 } | 360 } |
| 353 | 361 |
| 354 func (w *MachinesWatcher) done() { | 362 func (w *MachinesWatcher) done() { |
| 355 close(w.changeChan) | 363 close(w.changeChan) |
| 356 } | 364 } |
| 357 | 365 |
| 358 type MachineUnitsWatcher struct { | 366 type MachineUnitsWatcher struct { |
| 359 contentWatcher | 367 contentWatcher |
| 360 machine *Machine | 368 machine *Machine |
| 361 changeChan chan *MachineUnitsChange | 369 changeChan chan *MachineUnitsChange |
| 362 knownUnitKeys []string | 370 knownUnitKeys []string |
| 363 knownUnits map[string]*Unit | 371 knownUnits map[string]*Unit |
| 364 } | 372 } |
| 365 | 373 |
| 366 type MachineUnitsChange struct { | 374 type MachineUnitsChange struct { |
| 367 Added []*Unit | 375 Added []*Unit |
| 368 Removed []*Unit | 376 Removed []*Unit |
| 369 } | 377 } |
| 370 | 378 |
| 371 // newMachineUnitsWatcher creates and starts a new machine units watcher. | 379 // newMachineUnitsWatcher creates and starts a new machine units watcher. |
| 372 func newMachineUnitsWatcher(m *Machine) *MachineUnitsWatcher { | 380 func newMachineUnitsWatcher(m *Machine, stop <-chan struct{}) *MachineUnitsWatch
er { |
| 373 w := &MachineUnitsWatcher{ | 381 w := &MachineUnitsWatcher{ |
| 374 » » contentWatcher: newContentWatcher(m.st, zkTopologyPath), | 382 » » contentWatcher: newContentWatcher(m.st, zkTopologyPath, stop), |
| 375 machine: m, | 383 machine: m, |
| 376 changeChan: make(chan *MachineUnitsChange), | 384 changeChan: make(chan *MachineUnitsChange), |
| 377 knownUnits: make(map[string]*Unit), | 385 knownUnits: make(map[string]*Unit), |
| 378 } | 386 } |
| 379 » go w.loop(w) | 387 » w.runWatcher(w) |
| 380 return w | 388 return w |
| 381 } | 389 } |
| 382 | 390 |
| 383 // Changes returns a channel that will receive changes when | 391 // Changes returns a channel that will receive changes when |
| 384 // units are assigned or unassigned from a machine. | 392 // units are assigned or unassigned from a machine. |
| 385 // The Added field in the first event on the channel holds the initial | 393 // The Added field in the first event on the channel holds the initial |
| 386 // state as returned by machine.Units. | 394 // state as returned by machine.Units. |
| 387 func (w *MachineUnitsWatcher) Changes() <-chan *MachineUnitsChange { | 395 func (w *MachineUnitsWatcher) Changes() <-chan *MachineUnitsChange { |
| 388 return w.changeChan | 396 return w.changeChan |
| 389 } | 397 } |
| (...skipping 21 matching lines...) Expand all Loading... |
| 411 } | 419 } |
| 412 for _, ukey := range added { | 420 for _, ukey := range added { |
| 413 unit, err := w.st.unitFromKey(topology, ukey) | 421 unit, err := w.st.unitFromKey(topology, ukey) |
| 414 if err != nil { | 422 if err != nil { |
| 415 log.Printf("inconsistent topology: %v", err) | 423 log.Printf("inconsistent topology: %v", err) |
| 416 continue | 424 continue |
| 417 } | 425 } |
| 418 w.knownUnits[ukey] = unit | 426 w.knownUnits[ukey] = unit |
| 419 uc.Added = append(uc.Added, unit) | 427 uc.Added = append(uc.Added, unit) |
| 420 } | 428 } |
| 421 » select { | 429 » w.changeChan <- uc |
| 422 » case <-w.tomb.Dying(): | 430 » return nil |
| 423 » » return tomb.ErrDying | 431 } |
| 424 » case w.changeChan <- uc: | 432 |
| 433 func (w *MachineUnitsWatcher) wait() { |
| 434 » for _ = range w.changeChan { |
| 425 } | 435 } |
| 426 return nil | |
| 427 } | 436 } |
| 428 | 437 |
| 429 func (w *MachineUnitsWatcher) done() { | 438 func (w *MachineUnitsWatcher) done() { |
| 430 close(w.changeChan) | 439 close(w.changeChan) |
| 431 } | 440 } |
| 432 | 441 |
| 433 // ServicesWatcher observes the addition and removal of services. | 442 // ServicesWatcher observes the addition and removal of services. |
| 434 type ServicesWatcher struct { | 443 type ServicesWatcher struct { |
| 435 contentWatcher | 444 contentWatcher |
| 436 knownServices map[string]*Service | 445 knownServices map[string]*Service |
| 437 knownServiceKeys []string | 446 knownServiceKeys []string |
| 438 changeChan chan *ServicesChange | 447 changeChan chan *ServicesChange |
| 439 } | 448 } |
| 440 | 449 |
| 441 // ServicesChange holds services that were added or removed | 450 // ServicesChange holds services that were added or removed |
| 442 // from the environment. | 451 // from the environment. |
| 443 type ServicesChange struct { | 452 type ServicesChange struct { |
| 444 Added []*Service | 453 Added []*Service |
| 445 Removed []*Service | 454 Removed []*Service |
| 446 } | 455 } |
| 447 | 456 |
| 448 // newServicesWatcher returns a new ServicesWatcher. | 457 // newServicesWatcher returns a new ServicesWatcher. |
| 449 func newServicesWatcher(st *State) *ServicesWatcher { | 458 func newServicesWatcher(st *State, stop <-chan struct{}) *ServicesWatcher { |
| 450 w := &ServicesWatcher{ | 459 w := &ServicesWatcher{ |
| 451 » » contentWatcher: newContentWatcher(st, zkTopologyPath), | 460 » » contentWatcher: newContentWatcher(st, zkTopologyPath, stop), |
| 452 knownServices: make(map[string]*Service), | 461 knownServices: make(map[string]*Service), |
| 453 changeChan: make(chan *ServicesChange), | 462 changeChan: make(chan *ServicesChange), |
| 454 } | 463 } |
| 455 » go w.loop(w) | 464 » w.runWatcher(w) |
| 456 return w | 465 return w |
| 457 } | 466 } |
| 458 | 467 |
| 459 // Changes returns a channel that will receive a notification when services | 468 // Changes returns a channel that will receive a notification when services |
| 460 // are added to or removed from the state. The Added field in | 469 // are added to or removed from the state. The Added field in |
| 461 // the first event on the channel holds the initial state as would be· | 470 // the first event on the channel holds the initial state as would be· |
| 462 // returned by Service.AllServices. | 471 // returned by Service.AllServices. |
| 463 func (w *ServicesWatcher) Changes() <-chan *ServicesChange { | 472 func (w *ServicesWatcher) Changes() <-chan *ServicesChange { |
| 464 return w.changeChan | 473 return w.changeChan |
| 465 } | 474 } |
| (...skipping 23 matching lines...) Expand all Loading... |
| 489 continue | 498 continue |
| 490 } | 499 } |
| 491 service, err := w.st.Service(serviceName) | 500 service, err := w.st.Service(serviceName) |
| 492 if err != nil { | 501 if err != nil { |
| 493 log.Printf("can't read service %q: %v", serviceName, err
) | 502 log.Printf("can't read service %q: %v", serviceName, err
) |
| 494 continue | 503 continue |
| 495 } | 504 } |
| 496 w.knownServices[serviceKey] = service | 505 w.knownServices[serviceKey] = service |
| 497 servicesChange.Added = append(servicesChange.Added, service) | 506 servicesChange.Added = append(servicesChange.Added, service) |
| 498 } | 507 } |
| 499 » select { | 508 » w.changeChan <- servicesChange |
| 500 » case <-w.tomb.Dying(): | 509 » return nil |
| 501 » » return tomb.ErrDying | 510 } |
| 502 » case w.changeChan <- servicesChange: | 511 |
| 512 func (w *ServicesWatcher) wait() { |
| 513 » for _ = range w.changeChan { |
| 503 } | 514 } |
| 504 return nil | |
| 505 } | 515 } |
| 506 | 516 |
| 507 func (w *ServicesWatcher) done() { | 517 func (w *ServicesWatcher) done() { |
| 508 close(w.changeChan) | 518 close(w.changeChan) |
| 509 } | 519 } |
| 510 | 520 |
| 511 // ServiceUnitsWatcher observes the addition and removal | 521 // ServiceUnitsWatcher observes the addition and removal |
| 512 // of units to and from a service. | 522 // of units to and from a service. |
| 513 type ServiceUnitsWatcher struct { | 523 type ServiceUnitsWatcher struct { |
| 514 contentWatcher | 524 contentWatcher |
| 515 serviceKey string | 525 serviceKey string |
| 516 knownUnits map[string]*Unit | 526 knownUnits map[string]*Unit |
| 517 knownUnitKeys []string | 527 knownUnitKeys []string |
| 518 changeChan chan *ServiceUnitsChange | 528 changeChan chan *ServiceUnitsChange |
| 519 } | 529 } |
| 520 | 530 |
| 521 // ServiceUnitsChange contains information about | 531 // ServiceUnitsChange contains information about |
| 522 // units that have been added to or removed from | 532 // units that have been added to or removed from |
| 523 // services. | 533 // services. |
| 524 type ServiceUnitsChange struct { | 534 type ServiceUnitsChange struct { |
| 525 Added []*Unit | 535 Added []*Unit |
| 526 Removed []*Unit | 536 Removed []*Unit |
| 527 } | 537 } |
| 528 | 538 |
| 529 // newServiceUnitsWatcher creates and starts a new watcher | 539 // newServiceUnitsWatcher creates and starts a new watcher |
| 530 // for service unit changes. | 540 // for service unit changes. |
| 531 func newServiceUnitsWatcher(service *Service) *ServiceUnitsWatcher { | 541 func newServiceUnitsWatcher(service *Service, stop <-chan struct{}) *ServiceUnit
sWatcher { |
| 532 w := &ServiceUnitsWatcher{ | 542 w := &ServiceUnitsWatcher{ |
| 533 » » contentWatcher: newContentWatcher(service.st, zkTopologyPath), | 543 » » contentWatcher: newContentWatcher(service.st, zkTopologyPath, st
op), |
| 534 serviceKey: service.key, | 544 serviceKey: service.key, |
| 535 knownUnits: make(map[string]*Unit), | 545 knownUnits: make(map[string]*Unit), |
| 536 changeChan: make(chan *ServiceUnitsChange), | 546 changeChan: make(chan *ServiceUnitsChange), |
| 537 } | 547 } |
| 538 » go w.loop(w) | 548 » w.runWatcher(w) |
| 539 return w | 549 return w |
| 540 } | 550 } |
| 541 | 551 |
| 542 // Changes returns a channel that will receive changes when units | 552 // Changes returns a channel that will receive changes when units |
| 543 // are added to or removed from the service. The Added field in | 553 // are added to or removed from the service. The Added field in |
| 544 // the first event on the channel holds the initial state as returned | 554 // the first event on the channel holds the initial state as returned |
| 545 // by Service.AllUnits. | 555 // by Service.AllUnits. |
| 546 func (w *ServiceUnitsWatcher) Changes() <-chan *ServiceUnitsChange { | 556 func (w *ServiceUnitsWatcher) Changes() <-chan *ServiceUnitsChange { |
| 547 return w.changeChan | 557 return w.changeChan |
| 548 } | 558 } |
| (...skipping 21 matching lines...) Expand all Loading... |
| 570 } | 580 } |
| 571 for _, unitKey := range added { | 581 for _, unitKey := range added { |
| 572 unit, err := w.st.unitFromKey(topology, unitKey) | 582 unit, err := w.st.unitFromKey(topology, unitKey) |
| 573 if err != nil { | 583 if err != nil { |
| 574 log.Printf("can't read unit %q: %v", unitKey, err) | 584 log.Printf("can't read unit %q: %v", unitKey, err) |
| 575 continue | 585 continue |
| 576 } | 586 } |
| 577 w.knownUnits[unitKey] = unit | 587 w.knownUnits[unitKey] = unit |
| 578 serviceUnitsChange.Added = append(serviceUnitsChange.Added, unit
) | 588 serviceUnitsChange.Added = append(serviceUnitsChange.Added, unit
) |
| 579 } | 589 } |
| 580 » select { | 590 » w.changeChan <- serviceUnitsChange |
| 581 » case <-w.tomb.Dying(): | 591 » return nil |
| 582 » » return tomb.ErrDying | 592 } |
| 583 » case w.changeChan <- serviceUnitsChange: | 593 |
| 594 func (w *ServiceUnitsWatcher) wait() { |
| 595 » for _ = range w.changeChan { |
| 584 } | 596 } |
| 585 return nil | |
| 586 } | 597 } |
| 587 | 598 |
| 588 func (w *ServiceUnitsWatcher) done() { | 599 func (w *ServiceUnitsWatcher) done() { |
| 589 close(w.changeChan) | 600 close(w.changeChan) |
| 590 } | 601 } |
| 591 | 602 |
| 592 // ServiceRelationsWatcher notifies of changes to a service's relations. | 603 // ServiceRelationsWatcher notifies of changes to a service's relations. |
| 593 type ServiceRelationsWatcher struct { | 604 type ServiceRelationsWatcher struct { |
| 594 contentWatcher | 605 contentWatcher |
| 595 changeChan chan RelationsChange | 606 changeChan chan RelationsChange |
| 596 service *Service | 607 service *Service |
| 597 current map[string]*Relation | 608 current map[string]*Relation |
| 598 } | 609 } |
| 599 | 610 |
| 600 type RelationsChange struct { | 611 type RelationsChange struct { |
| 601 Added, Removed []*Relation | 612 Added, Removed []*Relation |
| 602 } | 613 } |
| 603 | 614 |
| 604 // newServiceRelationsWatcher creates and starts a new service relations watcher
. | 615 // newServiceRelationsWatcher creates and starts a new service relations watcher
. |
| 605 func newServiceRelationsWatcher(s *Service) *ServiceRelationsWatcher { | 616 func newServiceRelationsWatcher(s *Service, stop <-chan struct{}) *ServiceRelati
onsWatcher { |
| 606 w := &ServiceRelationsWatcher{ | 617 w := &ServiceRelationsWatcher{ |
| 607 » » contentWatcher: newContentWatcher(s.st, zkTopologyPath), | 618 » » contentWatcher: newContentWatcher(s.st, zkTopologyPath, stop), |
| 608 changeChan: make(chan RelationsChange), | 619 changeChan: make(chan RelationsChange), |
| 609 service: s, | 620 service: s, |
| 610 current: make(map[string]*Relation), | 621 current: make(map[string]*Relation), |
| 611 } | 622 } |
| 612 » go w.loop(w) | 623 » w.runWatcher(w) |
| 613 return w | 624 return w |
| 614 } | 625 } |
| 615 | 626 |
| 616 // Changes returns a channel that will receive changes when | 627 // Changes returns a channel that will receive changes when |
| 617 // the service enters and leaves relations. | 628 // the service enters and leaves relations. |
| 618 // The Added field in the first event on the channel holds the initial | 629 // The Added field in the first event on the channel holds the initial |
| 619 // state, corresponding to that returned by service.Relations. | 630 // state, corresponding to that returned by service.Relations. |
| 620 func (w *ServiceRelationsWatcher) Changes() <-chan RelationsChange { | 631 func (w *ServiceRelationsWatcher) Changes() <-chan RelationsChange { |
| 621 return w.changeChan | 632 return w.changeChan |
| 622 } | 633 } |
| (...skipping 18 matching lines...) Expand all Loading... |
| 641 } | 652 } |
| 642 } | 653 } |
| 643 for key, rel := range w.current { | 654 for key, rel := range w.current { |
| 644 if latest[key] == nil { | 655 if latest[key] == nil { |
| 645 ch.Removed = append(ch.Removed, rel) | 656 ch.Removed = append(ch.Removed, rel) |
| 646 } | 657 } |
| 647 } | 658 } |
| 648 if w.updated && len(ch.Added) == 0 && len(ch.Removed) == 0 { | 659 if w.updated && len(ch.Added) == 0 && len(ch.Removed) == 0 { |
| 649 return nil | 660 return nil |
| 650 } | 661 } |
| 651 » select { | 662 » w.changeChan <- ch |
| 652 » case <-w.tomb.Dying(): | 663 » w.current = latest |
| 653 » » return tomb.ErrDying | 664 » return nil |
| 654 » case w.changeChan <- ch: | 665 } |
| 655 » » w.current = latest | 666 |
| 667 func (w *ServiceRelationsWatcher) wait() { |
| 668 » for _ = range w.changeChan { |
| 656 } | 669 } |
| 657 return nil | |
| 658 } | 670 } |
| 659 | 671 |
| 660 func (w *ServiceRelationsWatcher) done() { | 672 func (w *ServiceRelationsWatcher) done() { |
| 661 close(w.changeChan) | 673 close(w.changeChan) |
| 662 } | 674 } |
| OLD | NEW |