Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(634)

Side by Side Diff: state/presence/presence.go

Issue 6354045: add presence.ChildrenWatcher
Patch Set: add presence.ChildrenWatcher Created 12 years, 8 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « [revision details] ('k') | state/presence/presence_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // The presence package is intended as a replacement for zookeeper ephemeral 1 // The presence package is intended as a replacement for zookeeper ephemeral
2 // nodes; the primary difference is that node timeout is unrelated to session 2 // nodes; the primary difference is that node timeout is unrelated to session
3 // timeout, and this allows us to restart a presence-enabled process "silently" 3 // timeout, and this allows us to restart a presence-enabled process "silently"
4 // (from the perspective of the rest of the system) without dealing with the 4 // (from the perspective of the rest of the system) without dealing with the
5 // complication of session re-establishment. 5 // complication of session re-establishment.
6 6
7 package presence 7 package presence
8 8
9 import ( 9 import (
10 "fmt" 10 "fmt"
11 zk "launchpad.net/gozk/zookeeper" 11 zk "launchpad.net/gozk/zookeeper"
12 "launchpad.net/juju-core/log" 12 "launchpad.net/juju-core/log"
13 "launchpad.net/juju-core/state/watcher"
13 "launchpad.net/tomb" 14 "launchpad.net/tomb"
14 "time" 15 "time"
15 ) 16 )
16 17
17 // changeNode wraps a zookeeper node and can induce watches on that node to fire . 18 // changeNode wraps a zookeeper node and can induce watches on that node to fire .
18 type changeNode struct { 19 type changeNode struct {
19 conn *zk.Conn 20 conn *zk.Conn
20 path string 21 path string
21 content string 22 content string
22 } 23 }
(...skipping 312 matching lines...) Expand 10 before | Expand all | Expand 10 after
335 return fmt.Errorf("presence: channel closed while waitin g") 336 return fmt.Errorf("presence: channel closed while waitin g")
336 } 337 }
337 if !alive { 338 if !alive {
338 return fmt.Errorf("presence: alive watch misbehaved whil e waiting") 339 return fmt.Errorf("presence: alive watch misbehaved whil e waiting")
339 } 340 }
340 case <-time.After(timeout): 341 case <-time.After(timeout):
341 return fmt.Errorf("presence: still not alive after timeout") 342 return fmt.Errorf("presence: still not alive after timeout")
342 } 343 }
343 return nil 344 return nil
344 } 345 }
346
347 // ChildrenWatcher mimics state/watcher's ChildrenWatcher, but treats nodes that
348 // do not have active Pingers as nonexistent.
349 type ChildrenWatcher struct {
350 conn *zk.Conn
351 tomb tomb.Tomb
352 path string
353 alive map[string]bool
354 stops map[string]chan bool
355 updates chan aliveChange
356 changes chan watcher.ChildrenChange
357 }
358
359 // aliveChange is used internally by ChildrenWatcher to communicate node
360 // status changes from childLoop goroutines to the main loop goroutine.
361 type aliveChange struct {
362 key string
363 alive bool
364 }
365
366 // NewChildrenWatcher returns a ChildrenWatcher that notifies of the
367 // presence and absence of Pingers in the direct child nodes of path.
368 func NewChildrenWatcher(conn *zk.Conn, path string) *ChildrenWatcher {
369 w := &ChildrenWatcher{
370 conn: conn,
371 path: path,
372 alive: make(map[string]bool),
373 stops: make(map[string]chan bool),
374 updates: make(chan aliveChange),
375 changes: make(chan watcher.ChildrenChange),
376 }
377 go w.loop()
378 return w
379 }
380
381 // Changes returns a channel on which presence changes can be received.
382 // The first event returns the current set of children on which Pingers
383 // are active.
384 func (w *ChildrenWatcher) Changes() <-chan watcher.ChildrenChange {
385 return w.changes
386 }
387
388 // Stop terminates the watcher and returns any error encountered while watching.
389 func (w *ChildrenWatcher) Stop() error {
390 w.tomb.Kill(nil)
391 return w.tomb.Wait()
392 }
393
394 // Err returns the error that stopped the watcher, or
395 // tomb.ErrStillAlive if the watcher is still running.
396 func (w *ChildrenWatcher) Err() error {
397 return w.tomb.Err()
398 }
399
400 func (w *ChildrenWatcher) loop() {
401 defer w.finish()
402 cw := watcher.NewChildrenWatcher(w.conn, w.path)
403 defer watcher.Stop(cw, &w.tomb)
404 emittedValue := false
405 for {
406 var change watcher.ChildrenChange
407 select {
408 case <-w.tomb.Dying():
409 return
410 case ch, ok := <-cw.Changes():
411 var err error
412 if !ok {
413 err = watcher.MustErr(cw)
414 } else {
415 change, err = w.changeWatches(ch)
416 }
417 if err != nil {
418 w.tomb.Kill(err)
419 return
420 }
421 if emittedValue && len(change.Added) == 0 && len(change. Removed) == 0 {
422 continue
423 }
424 case ch, ok := <-w.updates:
425 if !ok {
426 panic("updates channel closed")
427 }
428 if ch.alive {
429 w.alive[ch.key] = true
430 change = watcher.ChildrenChange{Added: []string{ ch.key}}
431 } else if w.alive[ch.key] {
432 delete(w.alive, ch.key)
433 change = watcher.ChildrenChange{Removed: []strin g{ch.key}}
434 } else {
435 // The node is already known to be dead, as a re sult of a
436 // child removal detected by cw, and no further action need
437 // be taken.
438 continue
439 }
440 }
441 select {
442 case <-w.tomb.Dying():
443 return
444 case w.changes <- change:
445 emittedValue = true
446 }
447 }
448 }
449
450 // finish tidies up any active watches on the child nodes, closes
451 // channels, and marks the watcher as finished.
452 func (w *ChildrenWatcher) finish() {
453 for _, stop := range w.stops {
454 stop <- true
455 }
456 close(w.updates)
niemeyer 2012/07/19 00:03:26 Why was this added? There's apparently no benefit
fwereade 2012/07/19 07:17:02 The benefits is nebulous, philosophical, and argua
457 close(w.changes)
458 w.tomb.Done()
459 }
460
461 // changeWatches starts new presence watches on newly-added candidates, and
462 // stops them on deleted ones. It returns a ChildrenChange representing only
463 // those changes that correspond to the presence or hitherto-undetected
464 // absence of a Pinger on a candidate node.
465 func (w *ChildrenWatcher) changeWatches(ch watcher.ChildrenChange) (watcher.Chil drenChange, error) {
466 change := watcher.ChildrenChange{}
467 for _, key := range ch.Removed {
468 stop := w.stops[key]
469 delete(w.stops, key)
470 stop <- true
471 // The node might not already be known to be dead.
472 if w.alive[key] {
473 delete(w.alive, key)
474 change.Removed = append(change.Removed, key)
475 }
476 }
477 for _, key := range ch.Added {
478 path := w.path + "/" + key
479 alive, aliveW, err := AliveW(w.conn, path)
480 if err != nil {
481 return watcher.ChildrenChange{}, err
482 }
483 stop := make(chan bool)
484 w.stops[key] = stop
485 go w.childLoop(key, path, aliveW, stop)
486 if alive {
487 w.alive[key] = true
488 change.Added = append(change.Added, key)
489 }
490 }
491 return change, nil
492 }
493
494 // childLoop sends aliveChange events to w.updates, in response to presence
495 // changes received from watch (which is refreshed internally as required),
496 // until its stop chan is closed.
497 func (w *ChildrenWatcher) childLoop(key, path string, watch <-chan bool, stop <- chan bool) {
498 for {
499 select {
500 case <-stop:
501 return
502 case alive, ok := <-watch:
503 if !ok {
504 w.tomb.Killf("presence watch on %q failed", path )
505 return
506 }
507 // We definitely need to watch again; do so early, so we can verify
508 // that the state has changed an odd number of times sin ce we last
509 // notified, and thereby only send notifications on real changes.
niemeyer 2012/07/19 00:03:26 Thanks for the comment.
510 aliveNow, newWatch, err := AliveW(w.conn, path)
511 if err != nil {
512 w.tomb.Kill(err)
513 return
514 }
515 watch = newWatch
516 if aliveNow != alive {
517 // State changed an odd number of times since th e watch fired;
518 // thus, it changed an even number of times sinc e we last
519 // notified; thus, there is no externally observ able change,
520 // and we should remain silent and just wait for the watch.
niemeyer 2012/07/19 00:03:26 This seems a bit confusing, and is basically resta
fwereade 2012/07/19 07:17:02 Done.
521 continue
522 }
523 select {
524 case <-stop:
525 return
526 case w.updates <- aliveChange{key, aliveNow}:
527 }
528 }
529 }
530 }
OLDNEW
« no previous file with comments | « [revision details] ('k') | state/presence/presence_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b