| Left: | ||
| Right: |
| OLD | NEW |
|---|---|
| 1 // The presence package is intended as a replacement for zookeeper ephemeral | 1 // The presence package is intended as a replacement for zookeeper ephemeral |
| 2 // nodes; the primary difference is that node timeout is unrelated to session | 2 // nodes; the primary difference is that node timeout is unrelated to session |
| 3 // timeout, and this allows us to restart a presence-enabled process "silently" | 3 // timeout, and this allows us to restart a presence-enabled process "silently" |
| 4 // (from the perspective of the rest of the system) without dealing with the | 4 // (from the perspective of the rest of the system) without dealing with the |
| 5 // complication of session re-establishment. | 5 // complication of session re-establishment. |
| 6 | 6 |
| 7 package presence | 7 package presence |
| 8 | 8 |
| 9 import ( | 9 import ( |
| 10 "fmt" | 10 "fmt" |
| 11 zk "launchpad.net/gozk/zookeeper" | 11 zk "launchpad.net/gozk/zookeeper" |
| 12 "launchpad.net/juju-core/state/watcher" | |
| 12 "launchpad.net/tomb" | 13 "launchpad.net/tomb" |
| 14 "sync" | |
| 13 "time" | 15 "time" |
| 14 ) | 16 ) |
| 15 | 17 |
| 16 // changeNode wraps a zookeeper node and can induce watches on that node to fire . | 18 // changeNode wraps a zookeeper node and can induce watches on that node to fire . |
| 17 type changeNode struct { | 19 type changeNode struct { |
| 18 conn *zk.Conn | 20 conn *zk.Conn |
| 19 path string | 21 path string |
| 20 content string | 22 content string |
| 21 } | 23 } |
| 22 | 24 |
| (...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 272 return fmt.Errorf("presence: channel closed while waitin g") | 274 return fmt.Errorf("presence: channel closed while waitin g") |
| 273 } | 275 } |
| 274 if !alive { | 276 if !alive { |
| 275 return fmt.Errorf("presence: alive watch misbehaved whil e waiting") | 277 return fmt.Errorf("presence: alive watch misbehaved whil e waiting") |
| 276 } | 278 } |
| 277 case <-time.After(timeout): | 279 case <-time.After(timeout): |
| 278 return fmt.Errorf("presence: still not alive after timeout") | 280 return fmt.Errorf("presence: still not alive after timeout") |
| 279 } | 281 } |
| 280 return nil | 282 return nil |
| 281 } | 283 } |
| 284 | |
| 285 // ChildrenWatcher mimics state.watcher.ChildrenWatcher, but treats nodes that | |
|
niemeyer
2012/06/27 05:41:57
state.watcher.ChildrenWatcher => state/watcher's C
fwereade
2012/06/27 14:52:51
Good point :). Done.
| |
| 286 // do not have active Pingers as nonexistent. | |
| 287 type ChildrenWatcher struct { | |
| 288 conn *zk.Conn | |
| 289 tomb tomb.Tomb | |
| 290 path string | |
| 291 alive map[string]bool | |
| 292 stops map[string]chan struct{} | |
| 293 watcher *watcher.ChildrenWatcher | |
| 294 updates chan aliveChange | |
| 295 changes chan watcher.ChildrenChange | |
| 296 wg sync.WaitGroup | |
| 297 } | |
| 298 | |
| 299 // aliveChange is used internally by ChildrenWatcher to communicate node | |
| 300 // status changes. | |
| 301 type aliveChange struct { | |
| 302 key string | |
| 303 alive bool | |
| 304 } | |
| 305 | |
| 306 // NewChildrenWatcher returns a ChildrenWatcher that notifies of the | |
| 307 // presence and absence of Pingers in the direct child nodes of path. | |
| 308 func NewChildrenWatcher(conn *zk.Conn, path string) *ChildrenWatcher { | |
| 309 w := &ChildrenWatcher{ | |
| 310 conn: conn, | |
| 311 path: path, | |
| 312 alive: make(map[string]bool), | |
| 313 stops: make(map[string]chan struct{}), | |
| 314 watcher: watcher.NewChildrenWatcher(conn, path), | |
| 315 updates: make(chan aliveChange), | |
| 316 changes: make(chan watcher.ChildrenChange), | |
| 317 } | |
| 318 go w.loop() | |
| 319 return w | |
| 320 } | |
| 321 | |
| 322 // Changes returns a channel on which presence changes can be received. | |
| 323 // The first event returns the current set of children on which Pingers | |
| 324 // are active. | |
| 325 func (w *ChildrenWatcher) Changes() <-chan watcher.ChildrenChange { | |
| 326 return w.changes | |
| 327 } | |
| 328 | |
| 329 // Stop terminates the watcher and returns any error encountered while watching. | |
| 330 func (w *ChildrenWatcher) Stop() error { | |
| 331 w.tomb.Kill(nil) | |
| 332 return w.tomb.Wait() | |
| 333 } | |
| 334 | |
| 335 func (w *ChildrenWatcher) loop() { | |
| 336 defer func() { | |
|
niemeyer
2012/06/27 05:41:57
There's no reason for this to be a closure. defer
fwereade
2012/06/27 14:52:51
Changed enough that Doneness is moot :).
| |
| 337 if err := w.watcher.Stop(); err != nil { | |
| 338 w.tomb.Kill(err) | |
| 339 } | |
| 340 for _, stop := range w.stops { | |
| 341 close(stop) | |
| 342 } | |
| 343 // Wait for the child loops to terminate before | |
| 344 // closing the channel on which they send. | |
| 345 w.wg.Wait() | |
| 346 close(w.updates) | |
| 347 close(w.changes) | |
|
niemeyer
2012/06/27 05:41:57
Closing these channels seems to just create extra
fwereade
2012/06/27 14:52:51
Done.
| |
| 348 w.tomb.Done() | |
| 349 }() | |
| 350 emittedValue := false | |
| 351 for { | |
| 352 var change watcher.ChildrenChange | |
| 353 select { | |
| 354 case <-w.tomb.Dying(): | |
| 355 return | |
| 356 case ch, ok := <-w.watcher.Changes(): | |
| 357 var err error | |
| 358 if !ok { | |
| 359 if err = w.watcher.Err(); err == nil { | |
| 360 panic("candidates channel closed unexpec tedly") | |
| 361 } | |
| 362 } | |
| 363 if err == nil { | |
| 364 change, err = w.updateWatches(ch) | |
| 365 } | |
| 366 if err != nil { | |
| 367 w.tomb.Kill(err) | |
| 368 return | |
| 369 } | |
| 370 case ch, ok := <-w.updates: | |
| 371 if !ok { | |
| 372 panic("updates channel closed unexpectedly") | |
|
niemeyer
2012/06/27 05:41:57
If closing it can only lead to panics, we don't ne
fwereade
2012/06/27 14:52:51
Done.
| |
| 373 } | |
| 374 change = w.updatePresence(ch) | |
| 375 } | |
| 376 if emittedValue && len(change.Added) == 0 && len(change.Deleted) == 0 { | |
| 377 continue | |
| 378 } | |
| 379 select { | |
| 380 case <-w.tomb.Dying(): | |
| 381 return | |
| 382 case w.changes <- change: | |
| 383 emittedValue = true | |
| 384 } | |
| 385 } | |
| 386 } | |
| 387 | |
| 388 // updateWatches stops existing presence watches on deleted candidates, and | |
| 389 // starts new presence watches on newly-added candidates. It returns a | |
| 390 // ChildrenChange representing only those changes that correspond to the | |
| 391 // presence or absence of a Pinger on a candidate node. | |
| 392 func (w *ChildrenWatcher) updateWatches(ch watcher.ChildrenChange) (watcher.Chil drenChange, error) { | |
| 393 change := watcher.ChildrenChange{} | |
| 394 for _, key := range ch.Deleted { | |
| 395 stop := w.stops[key] | |
| 396 delete(w.stops, key) | |
| 397 close(stop) | |
|
niemeyer
2012/06/27 05:41:57
What if we discarded delete notification for nodes
fwereade
2012/06/27 14:52:51
Very neat.
/me removes hat.
Done.
| |
| 398 if w.alive[key] { | |
| 399 // Race: a node deletion might have already been noted b y | |
| 400 // the child loop, and already passed on in a previous c hange. | |
| 401 delete(w.alive, key) | |
| 402 change.Deleted = append(change.Deleted, key) | |
| 403 } | |
| 404 } | |
| 405 for _, key := range ch.Added { | |
| 406 alive, aliveW, err := AliveW(w.conn, w.path+"/"+key) | |
| 407 if err != nil { | |
| 408 return watcher.ChildrenChange{}, err | |
| 409 } | |
| 410 if alive { | |
| 411 w.alive[key] = true | |
| 412 change.Added = append(change.Added, key) | |
| 413 } | |
| 414 w.stops[key] = make(chan struct{}) | |
| 415 w.wg.Add(1) | |
| 416 go w.childLoop(key, aliveW, w.stops[key]) | |
| 417 } | |
| 418 return change, nil | |
| 419 } | |
| 420 | |
| 421 // childLoop sends aliveChange events to w.updates, in response to presence | |
| 422 // changes received from watch, until it is stopped. | |
| 423 func (w *ChildrenWatcher) childLoop(key string, watch <-chan bool, stop <-chan s truct{}) { | |
| 424 defer w.wg.Done() | |
| 425 for { | |
| 426 select { | |
| 427 case <-stop: | |
|
niemeyer
2012/06/27 05:41:57
If the above suggestion works, it doesn't feel lik
fwereade
2012/06/27 14:52:51
I don't think we even need wg any more :).
| |
| 428 return | |
| 429 case alive, ok := <-watch: | |
| 430 if !ok { | |
| 431 w.tomb.Killf("presence watch closed unexpectedly ") | |
|
niemeyer
2012/06/27 05:41:57
Watchers can close channels due to a number of rea
fwereade
2012/06/27 14:52:51
Tweaked wording. The proper solution, I think, is
| |
| 432 } | |
| 433 select { | |
| 434 case <-w.tomb.Dying(): | |
| 435 return | |
| 436 case w.updates <- aliveChange{key, alive}: | |
| 437 } | |
| 438 } | |
| 439 } | |
| 440 } | |
| 441 | |
| 442 // updatePresence interprets an aliveChange event and converts it to | |
| 443 // a ChildrenChange event. | |
| 444 func (w *ChildrenWatcher) updatePresence(ch aliveChange) watcher.ChildrenChange { | |
| 445 if ch.alive { | |
| 446 w.alive[ch.key] = true | |
| 447 return watcher.ChildrenChange{Added: []string{ch.key}} | |
| 448 } | |
| 449 if w.alive[ch.key] { | |
| 450 // Race: w.watcher might have processed the node deletion *after * | |
| 451 // the childLoop detected the change, but *before* it sent it to | |
| 452 // the updates channel (which send we are currently handling). | |
| 453 delete(w.alive, ch.key) | |
| 454 return watcher.ChildrenChange{Deleted: []string{ch.key}} | |
| 455 } | |
| 456 return watcher.ChildrenChange{} | |
| 457 } | |
| OLD | NEW |