Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(153)

Side by Side Diff: state/presence/presence.go

Issue 6335057: add presence.ChildrenWatcher
Patch Set: add presence.ChildrenWatcher Created 5 years, 6 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « [revision details] ('k') | state/presence/presence_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // The presence package is intended as a replacement for zookeeper ephemeral 1 // The presence package is intended as a replacement for zookeeper ephemeral
2 // nodes; the primary difference is that node timeout is unrelated to session 2 // nodes; the primary difference is that node timeout is unrelated to session
3 // timeout, and this allows us to restart a presence-enabled process "silently" 3 // timeout, and this allows us to restart a presence-enabled process "silently"
4 // (from the perspective of the rest of the system) without dealing with the 4 // (from the perspective of the rest of the system) without dealing with the
5 // complication of session re-establishment. 5 // complication of session re-establishment.
6 6
7 package presence 7 package presence
8 8
9 import ( 9 import (
10 "fmt" 10 "fmt"
11 zk "launchpad.net/gozk/zookeeper" 11 zk "launchpad.net/gozk/zookeeper"
12 "launchpad.net/juju-core/state/watcher"
12 "launchpad.net/tomb" 13 "launchpad.net/tomb"
14 "sync"
13 "time" 15 "time"
14 ) 16 )
15 17
16 // changeNode wraps a zookeeper node and can induce watches on that node to fire . 18 // changeNode wraps a zookeeper node and can induce watches on that node to fire .
17 type changeNode struct { 19 type changeNode struct {
18 conn *zk.Conn 20 conn *zk.Conn
19 path string 21 path string
20 content string 22 content string
21 } 23 }
22 24
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after
272 return fmt.Errorf("presence: channel closed while waitin g") 274 return fmt.Errorf("presence: channel closed while waitin g")
273 } 275 }
274 if !alive { 276 if !alive {
275 return fmt.Errorf("presence: alive watch misbehaved whil e waiting") 277 return fmt.Errorf("presence: alive watch misbehaved whil e waiting")
276 } 278 }
277 case <-time.After(timeout): 279 case <-time.After(timeout):
278 return fmt.Errorf("presence: still not alive after timeout") 280 return fmt.Errorf("presence: still not alive after timeout")
279 } 281 }
280 return nil 282 return nil
281 } 283 }
284
285 // ChildrenWatcher mimics state.watcher.ChildrenWatcher, but treats nodes that
niemeyer 2012/06/27 05:41:57 state.watcher.ChildrenWatcher => state/watcher's C
fwereade 2012/06/27 14:52:51 Good point :). Done.
286 // do not have active Pingers as nonexistent.
287 type ChildrenWatcher struct {
288 conn *zk.Conn
289 tomb tomb.Tomb
290 path string
291 alive map[string]bool
292 stops map[string]chan struct{}
293 watcher *watcher.ChildrenWatcher
294 updates chan aliveChange
295 changes chan watcher.ChildrenChange
296 wg sync.WaitGroup
297 }
298
299 // aliveChange is used internally by ChildrenWatcher to communicate node
300 // status changes.
301 type aliveChange struct {
302 key string
303 alive bool
304 }
305
306 // NewChildrenWatcher returns a ChildrenWatcher that notifies of the
307 // presence and absence of Pingers in the direct child nodes of path.
308 func NewChildrenWatcher(conn *zk.Conn, path string) *ChildrenWatcher {
309 w := &ChildrenWatcher{
310 conn: conn,
311 path: path,
312 alive: make(map[string]bool),
313 stops: make(map[string]chan struct{}),
314 watcher: watcher.NewChildrenWatcher(conn, path),
315 updates: make(chan aliveChange),
316 changes: make(chan watcher.ChildrenChange),
317 }
318 go w.loop()
319 return w
320 }
321
322 // Changes returns a channel on which presence changes can be received.
323 // The first event returns the current set of children on which Pingers
324 // are active.
325 func (w *ChildrenWatcher) Changes() <-chan watcher.ChildrenChange {
326 return w.changes
327 }
328
329 // Stop terminates the watcher and returns any error encountered while watching.
330 func (w *ChildrenWatcher) Stop() error {
331 w.tomb.Kill(nil)
332 return w.tomb.Wait()
333 }
334
335 func (w *ChildrenWatcher) loop() {
336 defer func() {
niemeyer 2012/06/27 05:41:57 There's no reason for this to be a closure. defer
fwereade 2012/06/27 14:52:51 Changed enough that Doneness is moot :).
337 if err := w.watcher.Stop(); err != nil {
338 w.tomb.Kill(err)
339 }
340 for _, stop := range w.stops {
341 close(stop)
342 }
343 // Wait for the child loops to terminate before
344 // closing the channel on which they send.
345 w.wg.Wait()
346 close(w.updates)
347 close(w.changes)
niemeyer 2012/06/27 05:41:57 Closing these channels seems to just create extra
fwereade 2012/06/27 14:52:51 Done.
348 w.tomb.Done()
349 }()
350 emittedValue := false
351 for {
352 var change watcher.ChildrenChange
353 select {
354 case <-w.tomb.Dying():
355 return
356 case ch, ok := <-w.watcher.Changes():
357 var err error
358 if !ok {
359 if err = w.watcher.Err(); err == nil {
360 panic("candidates channel closed unexpec tedly")
361 }
362 }
363 if err == nil {
364 change, err = w.updateWatches(ch)
365 }
366 if err != nil {
367 w.tomb.Kill(err)
368 return
369 }
370 case ch, ok := <-w.updates:
371 if !ok {
372 panic("updates channel closed unexpectedly")
niemeyer 2012/06/27 05:41:57 If closing it can only lead to panics, we don't ne
fwereade 2012/06/27 14:52:51 Done.
373 }
374 change = w.updatePresence(ch)
375 }
376 if emittedValue && len(change.Added) == 0 && len(change.Deleted) == 0 {
377 continue
378 }
379 select {
380 case <-w.tomb.Dying():
381 return
382 case w.changes <- change:
383 emittedValue = true
384 }
385 }
386 }
387
388 // updateWatches stops existing presence watches on deleted candidates, and
389 // starts new presence watches on newly-added candidates. It returns a
390 // ChildrenChange representing only those changes that correspond to the
391 // presence or absence of a Pinger on a candidate node.
392 func (w *ChildrenWatcher) updateWatches(ch watcher.ChildrenChange) (watcher.Chil drenChange, error) {
393 change := watcher.ChildrenChange{}
394 for _, key := range ch.Deleted {
395 stop := w.stops[key]
396 delete(w.stops, key)
397 close(stop)
niemeyer 2012/06/27 05:41:57 What if we discarded delete notification for nodes
fwereade 2012/06/27 14:52:51 Very neat. /me removes hat. Done.
398 if w.alive[key] {
399 // Race: a node deletion might have already been noted b y
400 // the child loop, and already passed on in a previous c hange.
401 delete(w.alive, key)
402 change.Deleted = append(change.Deleted, key)
403 }
404 }
405 for _, key := range ch.Added {
406 alive, aliveW, err := AliveW(w.conn, w.path+"/"+key)
407 if err != nil {
408 return watcher.ChildrenChange{}, err
409 }
410 if alive {
411 w.alive[key] = true
412 change.Added = append(change.Added, key)
413 }
414 w.stops[key] = make(chan struct{})
415 w.wg.Add(1)
416 go w.childLoop(key, aliveW, w.stops[key])
417 }
418 return change, nil
419 }
420
421 // childLoop sends aliveChange events to w.updates, in response to presence
422 // changes received from watch, until it is stopped.
423 func (w *ChildrenWatcher) childLoop(key string, watch <-chan bool, stop <-chan s truct{}) {
424 defer w.wg.Done()
425 for {
426 select {
427 case <-stop:
niemeyer 2012/06/27 05:41:57 If the above suggestion works, it doesn't feel lik
fwereade 2012/06/27 14:52:51 I don't think we even need wg any more :).
428 return
429 case alive, ok := <-watch:
430 if !ok {
431 w.tomb.Killf("presence watch closed unexpectedly ")
niemeyer 2012/06/27 05:41:57 Watchers can close channels due to a number of rea
fwereade 2012/06/27 14:52:51 Tweaked wording. The proper solution, I think, is
432 }
433 select {
434 case <-w.tomb.Dying():
435 return
436 case w.updates <- aliveChange{key, alive}:
437 }
438 }
439 }
440 }
441
442 // updatePresence interprets an aliveChange event and converts it to
443 // a ChildrenChange event.
444 func (w *ChildrenWatcher) updatePresence(ch aliveChange) watcher.ChildrenChange {
445 if ch.alive {
446 w.alive[ch.key] = true
447 return watcher.ChildrenChange{Added: []string{ch.key}}
448 }
449 if w.alive[ch.key] {
450 // Race: w.watcher might have processed the node deletion *after *
451 // the childLoop detected the change, but *before* it sent it to
452 // the updates channel (which send we are currently handling).
453 delete(w.alive, ch.key)
454 return watcher.ChildrenChange{Deleted: []string{ch.key}}
455 }
456 return watcher.ChildrenChange{}
457 }
OLDNEW
« no previous file with comments | « [revision details] ('k') | state/presence/presence_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld 204d58d