Index: mstate/presence/presence.go |
=== modified file 'mstate/presence/presence.go' |
--- mstate/presence/presence.go 2012-09-06 22:12:07 +0000 |
+++ mstate/presence/presence.go 2012-09-10 19:08:22 +0000 |
@@ -58,7 +58,7 @@ |
beingKey map[int64]string |
beingSeq map[string]int64 |
- // watches has the per-key observer channels from Add/Remove. |
+ // watches has the per-key observer channels from Watch/Unwatch. |
watches map[string][]chan<- Change |
// pending contains all the events to be dispatched to the watcher |
@@ -70,13 +70,12 @@ |
// the the gorotuine loop. |
request chan interface{} |
- // refreshed contains pending ForceRefresh done channels |
- // that are waiting for the completion notice. |
- refreshed []chan bool |
+ // syncDone contains pending done channels from sync requests. |
+ syncDone []chan bool |
- // next will dispatch when it's time to refresh the database |
+ // next will dispatch when it's time to sync the database |
// knowledge. It's maintained here so that ForceRefresh |
- // can manipulate it to force a refresh sooner. |
+ // can manipulate it to force a sync sooner. |
next <-chan time.Time |
} |
@@ -116,17 +115,31 @@ |
return w.tomb.Wait() |
} |
-type reqAdd struct { |
- key string |
- ch chan<- Change |
-} |
- |
-type reqRemove struct { |
- key string |
- ch chan<- Change |
-} |
- |
-type reqRefresh struct { |
+// Dying returns a channel that is closed when the watcher is stopping |
+// due to an error or because Stop was called explicitly. |
+func (w *Watcher) Dying() <-chan struct{} { |
+ return w.tomb.Dying() |
+} |
+ |
+// Err returns the error with which the watcher stopped. |
+// It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive |
+// if the watcher is still running properly, or the respective error |
+// if the watcher is terminating or has terminated with an error. |
+func (w *Watcher) Err() error { |
+ return w.tomb.Err() |
+} |
+ |
+type reqWatch struct { |
+ key string |
+ ch chan<- Change |
+} |
+ |
+type reqUnwatch struct { |
+ key string |
+ ch chan<- Change |
+} |
+ |
+type reqSync struct { |
done chan bool |
} |
@@ -135,55 +148,55 @@ |
result chan bool |
} |
-// Add includes key into w for liveness monitoring. An event will |
+func (w *Watcher) sendReq(req interface{}) { |
+ select { |
+ case w.request <- req: |
+ case <-w.tomb.Dying(): |
+ } |
+} |
+ |
+// Watch starts watching the liveness of key. An event will |
// be sent onto ch to report the initial status for the key, and |
// from then on a new event will be sent whenever a change is |
// detected. Change values sent to the channel must be consumed, |
// or the whole watcher will blocked. |
-func (w *Watcher) Add(key string, ch chan<- Change) { |
- select { |
- case w.request <- reqAdd{key, ch}: |
- case <-w.tomb.Dying(): |
- } |
-} |
- |
-// Remove removes key and ch from liveness monitoring. |
-func (w *Watcher) Remove(key string, ch chan<- Change) { |
- select { |
- case w.request <- reqRemove{key, ch}: |
- case <-w.tomb.Dying(): |
- } |
-} |
- |
-// ForceRefresh forces a synchronous refresh of the watcher knowledge. |
-// It blocks until the database state has been loaded and the events |
-// have been prepared, but unblocks before changes are sent onto the |
-// registered channels. |
-func (w *Watcher) ForceRefresh() { |
+func (w *Watcher) Watch(key string, ch chan<- Change) { |
+ w.sendReq(reqWatch{key, ch}) |
+} |
+ |
+// Unwatch stops watching the liveness of key via ch. |
+func (w *Watcher) Unwatch(key string, ch chan<- Change) { |
+ w.sendReq(reqUnwatch{key, ch}) |
+} |
+ |
+// StartSync forces the watcher to load new events from the database. |
+func (w *Watcher) StartSync() { |
+ w.sendReq(reqSync{nil}) |
+} |
+ |
+// Sync forces the watcher to load new events from the database and blocks |
+// until all events have been dispatched. |
+func (w *Watcher) Sync() { |
done := make(chan bool) |
- select { |
- case w.request <- reqRefresh{done}: |
- case <-w.tomb.Dying(): |
- } |
+ w.sendReq(reqSync{done}) |
select { |
case <-done: |
case <-w.tomb.Dying(): |
} |
} |
-// Alive returns whether the key is currently considered alive by w. |
-func (w *Watcher) Alive(key string) bool { |
+// Alive returns whether the key is currently considered alive by w, |
+// or an error in case the watcher is dying. |
+func (w *Watcher) Alive(key string) (bool, error) { |
result := make(chan bool, 1) |
- select { |
- case w.request <- reqAlive{key, result}: |
- case <-w.tomb.Dying(): |
- } |
+ w.sendReq(reqAlive{key, result}) |
var alive bool |
select { |
case alive = <-result: |
case <-w.tomb.Dying(): |
+ return false, fmt.Errorf("cannot check liveness: watcher is dying") |
} |
- return alive |
+ return alive, nil |
} |
// period is the length of each time slot in seconds. |
@@ -205,18 +218,19 @@ |
return tomb.ErrDying |
case <-w.next: |
w.next = time.After(time.Duration(period) * time.Second) |
- refreshed := w.refreshed |
- w.refreshed = nil |
- if err := w.refresh(); err != nil { |
+ syncDone := w.syncDone |
+ w.syncDone = nil |
+ if err := w.sync(); err != nil { |
return err |
} |
- for _, done := range refreshed { |
+ w.flush() |
+ for _, done := range syncDone { |
close(done) |
} |
case req := <-w.request: |
w.handle(req) |
+ w.flush() |
} |
- w.flush() |
} |
return nil |
} |
@@ -224,20 +238,18 @@ |
// flush sends all pending events to their respective channels. |
func (w *Watcher) flush() { |
// w.pending may get new requests as we handle other requests. |
- i := 0 |
- for i < len(w.pending) { |
+ for i := 0; i < len(w.pending); i++ { |
e := &w.pending[i] |
- if e.ch == nil { |
- i++ // Removed meanwhile. |
- continue |
- } |
- select { |
- case <-w.tomb.Dying(): |
- return |
- case req := <-w.request: |
- w.handle(req) |
- case e.ch <- Change{e.key, e.alive}: |
- i++ |
+ for e.ch != nil { |
+ select { |
+ case <-w.tomb.Dying(): |
+ return |
+ case req := <-w.request: |
+ w.handle(req) |
+ continue |
+ case e.ch <- Change{e.key, e.alive}: |
+ } |
+ break |
} |
} |
w.pending = w.pending[:0] |
@@ -248,10 +260,12 @@ |
func (w *Watcher) handle(req interface{}) { |
log.Debugf("presence: got request: %#v", req) |
switch r := req.(type) { |
- case reqRefresh: |
+ case reqSync: |
w.next = time.After(0) |
- w.refreshed = append(w.refreshed, r.done) |
- case reqAdd: |
+ if r.done != nil { |
+ w.syncDone = append(w.syncDone, r.done) |
+ } |
+ case reqWatch: |
for _, ch := range w.watches[r.key] { |
if ch == r.ch { |
panic("adding channel twice for same key") |
@@ -260,7 +274,7 @@ |
w.watches[r.key] = append(w.watches[r.key], r.ch) |
_, alive := w.beingSeq[r.key] |
w.pending = append(w.pending, event{r.ch, r.key, alive}) |
- case reqRemove: |
+ case reqUnwatch: |
watches := w.watches[r.key] |
for i, ch := range watches { |
if ch == r.ch { |
@@ -294,11 +308,11 @@ |
Dead map[string]int64 ",omitempty" |
} |
-// refresh updates the watcher knowledge from the database, and |
+// sync updates the watcher knowledge from the database, and |
// queues events to observing channels. It fetches the last two time |
// slots and compares the union of both to the in-memory state. |
-func (w *Watcher) refresh() error { |
- log.Debugf("presence: refreshing watcher knowledge from database...") |
+func (w *Watcher) sync() error { |
+ log.Debugf("presence: synchronizing watcher knowledge with database...") |
slot := timeSlot(time.Now(), w.delta) |
var ping []pingInfo |
err := w.pings.Find(bson.D{{"$or", []pingInfo{{Slot: slot}, {Slot: slot - period}}}}).All(&ping) |