Left: | ||
Right: |
OLD | NEW |
---|---|
1 // The presence package is intended as a replacement for zookeeper ephemeral | 1 // The presence package is intended as a replacement for zookeeper ephemeral |
2 // nodes; the primary difference is that node timeout is unrelated to session | 2 // nodes; the primary difference is that node timeout is unrelated to session |
3 // timeout, and this allows us to restart a presence-enabled process "silently" | 3 // timeout, and this allows us to restart a presence-enabled process "silently" |
4 // (from the perspective of the rest of the system) without dealing with the | 4 // (from the perspective of the rest of the system) without dealing with the |
5 // complication of session re-establishment. | 5 // complication of session re-establishment. |
6 | 6 |
7 package presence | 7 package presence |
8 | 8 |
9 import ( | 9 import ( |
10 "fmt" | 10 "fmt" |
11 zk "launchpad.net/gozk/zookeeper" | 11 zk "launchpad.net/gozk/zookeeper" |
12 "launchpad.net/juju-core/log" | 12 "launchpad.net/juju-core/log" |
13 "launchpad.net/juju-core/state/watcher" | |
13 "launchpad.net/tomb" | 14 "launchpad.net/tomb" |
14 "time" | 15 "time" |
15 ) | 16 ) |
16 | 17 |
17 // changeNode wraps a zookeeper node and can induce watches on that node to fire. | 18 // changeNode wraps a zookeeper node and can induce watches on that node to fire. |
18 type changeNode struct { | 19 type changeNode struct { |
19 conn *zk.Conn | 20 conn *zk.Conn |
20 path string | 21 path string |
21 content string | 22 content string |
22 } | 23 } |
(...skipping 312 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
335 return fmt.Errorf("presence: channel closed while waitin g") | 336 return fmt.Errorf("presence: channel closed while waitin g") |
336 } | 337 } |
337 if !alive { | 338 if !alive { |
338 return fmt.Errorf("presence: alive watch misbehaved whil e waiting") | 339 return fmt.Errorf("presence: alive watch misbehaved whil e waiting") |
339 } | 340 } |
340 case <-time.After(timeout): | 341 case <-time.After(timeout): |
341 return fmt.Errorf("presence: still not alive after timeout") | 342 return fmt.Errorf("presence: still not alive after timeout") |
342 } | 343 } |
343 return nil | 344 return nil |
344 } | 345 } |
346 | |
347 // ChildrenWatcher mimics state/watcher's ChildrenWatcher, but treats nodes that | |
348 // do not have active Pingers as nonexistent. | |
349 type ChildrenWatcher struct { | |
350 conn *zk.Conn | |
351 tomb tomb.Tomb | |
352 path string | |
353 alive map[string]bool | |
354 stops map[string]chan bool | |
355 updates chan aliveChange | |
356 changes chan watcher.ChildrenChange | |
357 } | |
358 | |
// aliveChange is the message a childLoop goroutine sends to the main loop
// goroutine of a ChildrenWatcher when the presence status of its node changes.
type aliveChange struct {
	// key is the name of the child node, relative to the watched path.
	key string
	// alive is the new status of the node.
	alive bool
}
365 | |
366 // NewChildrenWatcher returns a ChildrenWatcher that notifies of the | |
367 // presence and absence of Pingers in the direct child nodes of path. | |
368 func NewChildrenWatcher(conn *zk.Conn, path string) *ChildrenWatcher { | |
369 w := &ChildrenWatcher{ | |
370 conn: conn, | |
371 path: path, | |
372 alive: make(map[string]bool), | |
373 stops: make(map[string]chan bool), | |
374 updates: make(chan aliveChange), | |
375 changes: make(chan watcher.ChildrenChange), | |
376 } | |
377 go w.loop() | |
378 return w | |
379 } | |
380 | |
381 // Changes returns a channel on which presence changes can be received. | |
382 // The first event returns the current set of children on which Pingers | |
383 // are active. | |
384 func (w *ChildrenWatcher) Changes() <-chan watcher.ChildrenChange { | |
385 return w.changes | |
386 } | |
387 | |
388 // Stop terminates the watcher and returns any error encountered while watching. | |
389 func (w *ChildrenWatcher) Stop() error { | |
390 w.tomb.Kill(nil) | |
391 return w.tomb.Wait() | |
392 } | |
393 | |
394 // Err returns the error that stopped the watcher, or | |
395 // tomb.ErrStillAlive if the watcher is still running. | |
396 func (w *ChildrenWatcher) Err() error { | |
397 return w.tomb.Err() | |
398 } | |
399 | |
400 func (w *ChildrenWatcher) loop() { | |
401 defer w.finish() | |
402 cw := watcher.NewChildrenWatcher(w.conn, w.path) | |
403 defer watcher.Stop(cw, &w.tomb) | |
404 emittedValue := false | |
405 for { | |
406 var change watcher.ChildrenChange | |
407 select { | |
408 case <-w.tomb.Dying(): | |
409 return | |
410 case ch, ok := <-cw.Changes(): | |
411 var err error | |
412 if !ok { | |
413 err = watcher.MustErr(cw) | |
414 } else { | |
415 change, err = w.changeWatches(ch) | |
416 } | |
417 if err != nil { | |
418 w.tomb.Kill(err) | |
419 return | |
420 } | |
421 if emittedValue && len(change.Added) == 0 && len(change. Removed) == 0 { | |
422 continue | |
423 } | |
424 case ch, ok := <-w.updates: | |
425 if !ok { | |
426 panic("updates channel closed") | |
427 } | |
428 if ch.alive { | |
429 w.alive[ch.key] = true | |
430 change = watcher.ChildrenChange{Added: []string{ ch.key}} | |
431 } else if w.alive[ch.key] { | |
432 delete(w.alive, ch.key) | |
433 change = watcher.ChildrenChange{Removed: []strin g{ch.key}} | |
434 } else { | |
435 // The node is already known to be dead, as a re sult of a | |
436 // child removal detected by cw, and no further action need | |
437 // be taken. | |
438 continue | |
439 } | |
440 } | |
441 select { | |
442 case <-w.tomb.Dying(): | |
443 return | |
444 case w.changes <- change: | |
445 emittedValue = true | |
446 } | |
447 } | |
448 } | |
449 | |
450 // finish tidies up any active watches on the child nodes, closes | |
451 // channels, and marks the watcher as finished. | |
452 func (w *ChildrenWatcher) finish() { | |
453 for _, stop := range w.stops { | |
454 stop <- true | |
455 } | |
456 close(w.updates) | |
niemeyer
2012/07/19 00:03:26
Why was this added? There's apparently no benefit
fwereade
2012/07/19 07:17:02
The benefits is nebulous, philosophical, and argua
| |
457 close(w.changes) | |
458 w.tomb.Done() | |
459 } | |
460 | |
461 // changeWatches starts new presence watches on newly-added candidates, and | |
462 // stops them on deleted ones. It returns a ChildrenChange representing only | |
463 // those changes that correspond to the presence or hitherto-undetected | |
464 // absence of a Pinger on a candidate node. | |
465 func (w *ChildrenWatcher) changeWatches(ch watcher.ChildrenChange) (watcher.Chil drenChange, error) { | |
466 change := watcher.ChildrenChange{} | |
467 for _, key := range ch.Removed { | |
468 stop := w.stops[key] | |
469 delete(w.stops, key) | |
470 stop <- true | |
471 // The node might not already be known to be dead. | |
472 if w.alive[key] { | |
473 delete(w.alive, key) | |
474 change.Removed = append(change.Removed, key) | |
475 } | |
476 } | |
477 for _, key := range ch.Added { | |
478 path := w.path + "/" + key | |
479 alive, aliveW, err := AliveW(w.conn, path) | |
480 if err != nil { | |
481 return watcher.ChildrenChange{}, err | |
482 } | |
483 stop := make(chan bool) | |
484 w.stops[key] = stop | |
485 go w.childLoop(key, path, aliveW, stop) | |
486 if alive { | |
487 w.alive[key] = true | |
488 change.Added = append(change.Added, key) | |
489 } | |
490 } | |
491 return change, nil | |
492 } | |
493 | |
494 // childLoop sends aliveChange events to w.updates, in response to presence | |
495 // changes received from watch (which is refreshed internally as required), | |
496 // until its stop chan is closed. | |
497 func (w *ChildrenWatcher) childLoop(key, path string, watch <-chan bool, stop <- chan bool) { | |
498 for { | |
499 select { | |
500 case <-stop: | |
501 return | |
502 case alive, ok := <-watch: | |
503 if !ok { | |
504 w.tomb.Killf("presence watch on %q failed", path ) | |
505 return | |
506 } | |
507 // We definitely need to watch again; do so early, so we can verify | |
508 // that the state has changed an odd number of times sin ce we last | |
509 // notified, and thereby only send notifications on real changes. | |
niemeyer
2012/07/19 00:03:26
Thanks for the comment.
| |
510 aliveNow, newWatch, err := AliveW(w.conn, path) | |
511 if err != nil { | |
512 w.tomb.Kill(err) | |
513 return | |
514 } | |
515 watch = newWatch | |
516 if aliveNow != alive { | |
517 // State changed an odd number of times since th e watch fired; | |
518 // thus, it changed an even number of times sinc e we last | |
519 // notified; thus, there is no externally observ able change, | |
520 // and we should remain silent and just wait for the watch. | |
niemeyer
2012/07/19 00:03:26
This seems a bit confusing, and is basically resta
fwereade
2012/07/19 07:17:02
Done.
| |
521 continue | |
522 } | |
523 select { | |
524 case <-stop: | |
525 return | |
526 case w.updates <- aliveChange{key, aliveNow}: | |
527 } | |
528 } | |
529 } | |
530 } | |
OLD | NEW |