Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(5)

Unified Diff: mstate/presence/presence.go

Issue 6501114: mstate/presence: bring it in line with mstate/watcher
Patch Set: mstate/presence: bring it in line with mstate/watcher Created 11 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mstate/open.go ('k') | mstate/presence/presence_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mstate/presence/presence.go
=== modified file 'mstate/presence/presence.go'
--- mstate/presence/presence.go 2012-09-06 22:12:07 +0000
+++ mstate/presence/presence.go 2012-09-10 19:08:22 +0000
@@ -58,7 +58,7 @@
beingKey map[int64]string
beingSeq map[string]int64
- // watches has the per-key observer channels from Add/Remove.
+ // watches has the per-key observer channels from Watch/Unwatch.
watches map[string][]chan<- Change
// pending contains all the events to be dispatched to the watcher
@@ -70,13 +70,12 @@
// the the gorotuine loop.
request chan interface{}
- // refreshed contains pending ForceRefresh done channels
- // that are waiting for the completion notice.
- refreshed []chan bool
+ // syncDone contains pending done channels from sync requests.
+ syncDone []chan bool
- // next will dispatch when it's time to refresh the database
+ // next will dispatch when it's time to sync the database
// knowledge. It's maintained here so that ForceRefresh
- // can manipulate it to force a refresh sooner.
+ // can manipulate it to force a sync sooner.
next <-chan time.Time
}
@@ -116,17 +115,31 @@
return w.tomb.Wait()
}
-type reqAdd struct {
- key string
- ch chan<- Change
-}
-
-type reqRemove struct {
- key string
- ch chan<- Change
-}
-
-type reqRefresh struct {
+// Dying returns a channel that is closed when the watcher is stopping
+// due to an error or because Stop was called explicitly.
+func (w *Watcher) Dying() <-chan struct{} {
+ return w.tomb.Dying()
+}
+
+// Err returns the error with which the watcher stopped.
+// It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive
+// if the watcher is still running properly, or the respective error
+// if the watcher is terminating or has terminated with an error.
+func (w *Watcher) Err() error {
+ return w.tomb.Err()
+}
+
+type reqWatch struct {
+ key string
+ ch chan<- Change
+}
+
+type reqUnwatch struct {
+ key string
+ ch chan<- Change
+}
+
+type reqSync struct {
done chan bool
}
@@ -135,55 +148,55 @@
result chan bool
}
-// Add includes key into w for liveness monitoring. An event will
+func (w *Watcher) sendReq(req interface{}) {
+ select {
+ case w.request <- req:
+ case <-w.tomb.Dying():
+ }
+}
+
+// Watch starts watching the liveness of key. An event will
// be sent onto ch to report the initial status for the key, and
// from then on a new event will be sent whenever a change is
// detected. Change values sent to the channel must be consumed,
// or the whole watcher will blocked.
-func (w *Watcher) Add(key string, ch chan<- Change) {
- select {
- case w.request <- reqAdd{key, ch}:
- case <-w.tomb.Dying():
- }
-}
-
-// Remove removes key and ch from liveness monitoring.
-func (w *Watcher) Remove(key string, ch chan<- Change) {
- select {
- case w.request <- reqRemove{key, ch}:
- case <-w.tomb.Dying():
- }
-}
-
-// ForceRefresh forces a synchronous refresh of the watcher knowledge.
-// It blocks until the database state has been loaded and the events
-// have been prepared, but unblocks before changes are sent onto the
-// registered channels.
-func (w *Watcher) ForceRefresh() {
+func (w *Watcher) Watch(key string, ch chan<- Change) {
+ w.sendReq(reqWatch{key, ch})
+}
+
+// Unwatch stops watching the liveness of key via ch.
+func (w *Watcher) Unwatch(key string, ch chan<- Change) {
+ w.sendReq(reqUnwatch{key, ch})
+}
+
+// StartSync forces the watcher to load new events from the database.
+func (w *Watcher) StartSync() {
+ w.sendReq(reqSync{nil})
+}
+
+// Sync forces the watcher to load new events from the database and blocks
+// until all events have been dispatched.
+func (w *Watcher) Sync() {
done := make(chan bool)
- select {
- case w.request <- reqRefresh{done}:
- case <-w.tomb.Dying():
- }
+ w.sendReq(reqSync{done})
select {
case <-done:
case <-w.tomb.Dying():
}
}
-// Alive returns whether the key is currently considered alive by w.
-func (w *Watcher) Alive(key string) bool {
+// Alive returns whether the key is currently considered alive by w,
+// or an error in case the watcher is dying.
+func (w *Watcher) Alive(key string) (bool, error) {
result := make(chan bool, 1)
- select {
- case w.request <- reqAlive{key, result}:
- case <-w.tomb.Dying():
- }
+ w.sendReq(reqAlive{key, result})
var alive bool
select {
case alive = <-result:
case <-w.tomb.Dying():
+ return false, fmt.Errorf("cannot check liveness: watcher is dying")
}
- return alive
+ return alive, nil
}
// period is the length of each time slot in seconds.
@@ -205,18 +218,19 @@
return tomb.ErrDying
case <-w.next:
w.next = time.After(time.Duration(period) * time.Second)
- refreshed := w.refreshed
- w.refreshed = nil
- if err := w.refresh(); err != nil {
+ syncDone := w.syncDone
+ w.syncDone = nil
+ if err := w.sync(); err != nil {
return err
}
- for _, done := range refreshed {
+ w.flush()
+ for _, done := range syncDone {
close(done)
}
case req := <-w.request:
w.handle(req)
+ w.flush()
}
- w.flush()
}
return nil
}
@@ -224,20 +238,18 @@
// flush sends all pending events to their respective channels.
func (w *Watcher) flush() {
// w.pending may get new requests as we handle other requests.
- i := 0
- for i < len(w.pending) {
+ for i := 0; i < len(w.pending); i++ {
e := &w.pending[i]
- if e.ch == nil {
- i++ // Removed meanwhile.
- continue
- }
- select {
- case <-w.tomb.Dying():
- return
- case req := <-w.request:
- w.handle(req)
- case e.ch <- Change{e.key, e.alive}:
- i++
+ for e.ch != nil {
+ select {
+ case <-w.tomb.Dying():
+ return
+ case req := <-w.request:
+ w.handle(req)
+ continue
+ case e.ch <- Change{e.key, e.alive}:
+ }
+ break
}
}
w.pending = w.pending[:0]
@@ -248,10 +260,12 @@
func (w *Watcher) handle(req interface{}) {
log.Debugf("presence: got request: %#v", req)
switch r := req.(type) {
- case reqRefresh:
+ case reqSync:
w.next = time.After(0)
- w.refreshed = append(w.refreshed, r.done)
- case reqAdd:
+ if r.done != nil {
+ w.syncDone = append(w.syncDone, r.done)
+ }
+ case reqWatch:
for _, ch := range w.watches[r.key] {
if ch == r.ch {
panic("adding channel twice for same key")
@@ -260,7 +274,7 @@
w.watches[r.key] = append(w.watches[r.key], r.ch)
_, alive := w.beingSeq[r.key]
w.pending = append(w.pending, event{r.ch, r.key, alive})
- case reqRemove:
+ case reqUnwatch:
watches := w.watches[r.key]
for i, ch := range watches {
if ch == r.ch {
@@ -294,11 +308,11 @@
Dead map[string]int64 ",omitempty"
}
-// refresh updates the watcher knowledge from the database, and
+// sync updates the watcher knowledge from the database, and
// queues events to observing channels. It fetches the last two time
// slots and compares the union of both to the in-memory state.
-func (w *Watcher) refresh() error {
- log.Debugf("presence: refreshing watcher knowledge from database...")
+func (w *Watcher) sync() error {
+ log.Debugf("presence: synchronizing watcher knowledge with database...")
slot := timeSlot(time.Now(), w.delta)
var ping []pingInfo
err := w.pings.Find(bson.D{{"$or", []pingInfo{{Slot: slot}, {Slot: slot - period}}}}).All(&ping)
« no previous file with comments | « mstate/open.go ('k') | mstate/presence/presence_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b