Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(454)

Side by Side Diff: mstate/watcher/watcher.go

Issue 6503086: mstate/watcher: new foundation for watchers
Patch Set: mstate/watcher: new foundation for watchers Created 5 years, 4 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mstate/watcher/export_test.go ('k') | mstate/watcher/watcher_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // The watcher package provides an interface for observing changes
2 // to arbitrary MongoDB documents that are maintained via the
3 // mgo/txn transaction package.
4 package watcher
5
6 import (
7 "fmt"
8 "labix.org/v2/mgo"
9 "labix.org/v2/mgo/bson"
10 "launchpad.net/juju-core/log"
11 "launchpad.net/tomb"
12 "time"
13 )
14
15 // A Watcher can watch any number of collections and documents for changes.
16 type Watcher struct {
17 tomb tomb.Tomb
18 log *mgo.Collection
19
20 // watches holds the observers managed by Watch/Unwatch.
21 watches map[watchKey][]watchInfo
22
23 // current holds the current txn-revno values for all the observed
24 // documents known to exist. Documents not observed or deleted are
25 // omitted from this map and are considered to have revno -1.
26 current map[watchKey]int64
27
28 // syncEvents and requestEvents contain the events to be
29 // dispatched to the watcher channels. They're queued during
30 // processing and flushed at the end to simplify the algorithm.
31 // The two queues are separated because events from sync are
32 // handled in reverse order due to the way the algorithm works.
33 syncEvents, requestEvents []event
34
35 // request is used to deliver requests from the public API into
36 // the the goroutine loop.
37 request chan interface{}
38
39 // syncDone contains pending done channels from sync requests.
40 syncDone []chan bool
41
42 // lastId is the most recent transaction id observed by a sync.
43 lastId interface{}
44
45 // next will dispatch when it's time to sync the database
46 // knowledge. It's maintained here so that Sync and StartSync
47 // can manipulate it to force a sync sooner.
48 next <-chan time.Time
49 }
50
51 // A Change holds information about a document change.
52 type Change struct {
53 // C and Id hold the collection name and document _id field value.
54 C string
55 Id interface{}
56
57 // Revno is the latest known value for the document's txn-revno
58 // field, or -1 if the document was deleted.
59 Revno int64
60 }
61
62 type watchKey struct {
63 c string
64 id interface{} // nil when watching collection
65 }
66
67 type watchInfo struct {
68 ch chan<- Change
69 revno int64
70 }
71
72 type event struct {
73 ch chan<- Change
74 key watchKey
75 revno int64
76 }
77
78 // New returns a new Watcher observing the changelog collection,
79 // which must be a capped collection maintained by mgo/txn.
80 func New(changelog *mgo.Collection) *Watcher {
81 w := &Watcher{
82 log: changelog,
83 watches: make(map[watchKey][]watchInfo),
84 current: make(map[watchKey]int64),
85 request: make(chan interface{}),
86 }
87 go func() {
88 w.tomb.Kill(w.loop())
89 w.tomb.Done()
90 }()
91 return w
92 }
93
94 // Stop stops all the watcher activities.
95 func (w *Watcher) Stop() error {
96 w.tomb.Kill(nil)
97 return w.tomb.Wait()
98 }
99
100 type reqWatch struct {
101 key watchKey
102 info watchInfo
103 }
104
105 type reqUnwatch struct {
106 key watchKey
107 ch chan<- Change
108 }
109
110 type reqSync struct {
111 done chan bool
112 }
113
114 // Watch starts watching the given collection and document id.
115 // An event will be sent onto ch whenever a matching document's txn-revno
116 // field is observed to change after a transaction is applied. The revno
117 // parameter informs the currently known revision number for the document.
118 // Non-existing documents are represented by a -1 revno.
119 func (w *Watcher) Watch(collection string, id interface{}, revno int64, ch chan< - Change) {
120 if id == nil {
121 panic("watcher: cannot watch a document with nil id")
122 }
123 select {
124 case w.request <- reqWatch{watchKey{collection, id}, watchInfo{ch, revno }}:
125 case <-w.tomb.Dying():
126 }
127 }
128
129 // WatchCollection starts watching the given collection.
130 // An event will be sent onto ch whenever the txn-revno field is observed
131 // to change after a transaction is applied for any document in the collection.
132 func (w *Watcher) WatchCollection(collection string, ch chan<- Change) {
133 select {
134 case w.request <- reqWatch{watchKey{collection, nil}, watchInfo{ch, 0}}:
135 case <-w.tomb.Dying():
136 }
137 }
138
139 // Unwatch stops watching the given collection and document id via ch.
140 func (w *Watcher) Unwatch(collection string, id interface{}, ch chan<- Change) {
141 if id == nil {
142 panic("watcher: cannot unwatch a document with nil id")
143 }
144 select {
145 case w.request <- reqUnwatch{watchKey{collection, id}, ch}:
146 case <-w.tomb.Dying():
147 }
148 }
149
150 // UnwatchCollection stops watching the given collection via ch.
151 func (w *Watcher) UnwatchCollection(collection string, ch chan<- Change) {
152 select {
153 case w.request <- reqUnwatch{watchKey{collection, nil}, ch}:
154 case <-w.tomb.Dying():
155 }
156 }
157
158 // StartSync forces the watcher to load new events from the database.
159 func (w *Watcher) StartSync() {
160 select {
161 case w.request <- reqSync{nil}:
162 case <-w.tomb.Dying():
163 }
164 }
165
166 // Sync forces the watcher to load new events from the database and blocks
167 // until all events have been dispatched.
168 func (w *Watcher) Sync() {
aram 2012/09/10 12:47:13 I don't understand the purpose of this function. I
niemeyer 2012/09/10 12:58:53 Testing is its main purpose, and its semantics see
aram 2012/09/10 13:02:31 Ah, sure, testing is fine, I thought it's a regula
niemeyer 2012/09/10 13:04:38 I believe I fixed up the spurious error you were s
169 done := make(chan bool)
170 select {
171 case w.request <- reqSync{done}:
172 case <-w.tomb.Dying():
173 }
174 select {
175 case <-done:
176 case <-w.tomb.Dying():
177 }
178 }
179
180 // period is the delay between each sync.
181 var period time.Duration = 5 * time.Second
182
183 // loop implements the main watcher loop.
184 func (w *Watcher) loop() error {
185 w.next = time.After(0)
186 if err := w.initLastId(); err != nil {
187 return err
188 }
189 for {
190 select {
191 case <-w.tomb.Dying():
192 return tomb.ErrDying
193 case <-w.next:
194 w.next = time.After(period)
195 syncDone := w.syncDone
196 w.syncDone = nil
197 if err := w.sync(); err != nil {
198 return err
199 }
200 w.flush()
201 for _, done := range syncDone {
202 close(done)
203 }
204 case req := <-w.request:
205 w.handle(req)
206 w.flush()
207 }
208 }
209 return nil
210 }
211
212 // flush sends all pending events to their respective channels.
213 func (w *Watcher) flush() {
214 // refreshEvents are stored newest first.
215 for i := len(w.syncEvents)-1; i >= 0; i-- {
216 e := &w.syncEvents[i]
217 for e.ch != nil {
218 select {
219 case <-w.tomb.Dying():
220 return
221 case req := <-w.request:
222 w.handle(req)
223 continue
224 case e.ch <- Change{e.key.c, e.key.id, e.revno}:
225 }
226 break
227 }
228 }
229 // requestEvents are stored oldest first, and
230 // may grow during the loop.
231 for i := 0; i < len(w.requestEvents); i++ {
232 e := &w.requestEvents[i]
233 for e.ch != nil {
234 select {
235 case <-w.tomb.Dying():
236 return
237 case req := <-w.request:
238 w.handle(req)
239 continue
240 case e.ch <- Change{e.key.c, e.key.id, e.revno}:
241 }
242 break
243 }
244 }
245 w.syncEvents = w.syncEvents[:0]
246 w.requestEvents = w.requestEvents[:0]
247 }
248
249 // handle deals with requests delivered by the public API
250 // onto the background watcher goroutine.
251 func (w *Watcher) handle(req interface{}) {
252 log.Debugf("watcher: got request: %#v", req)
253 switch r := req.(type) {
254 case reqSync:
255 w.next = time.After(0)
256 if r.done != nil {
257 w.syncDone = append(w.syncDone, r.done)
258 }
259 case reqWatch:
260 for _, info := range w.watches[r.key] {
261 if info.ch == r.info.ch {
262 panic("adding channel twice for the same collect ion/document")
263 }
264 }
265 if revno, ok := w.current[r.key]; ok && (revno > r.info.revno || revno == -1 && r.info.revno >= 0) {
266 r.info.revno = revno
267 w.requestEvents = append(w.requestEvents, event{r.info.c h, r.key, revno})
268 }
269 w.watches[r.key] = append(w.watches[r.key], r.info)
270 case reqUnwatch:
271 watches := w.watches[r.key]
272 for i, info := range watches {
273 if info.ch == r.ch {
274 watches[i] = watches[len(watches)-1]
275 w.watches[r.key] = watches[:len(watches)-1]
276 break
277 }
278 }
279 for i := range w.requestEvents {
280 e := &w.requestEvents[i]
281 if e.key == r.key && e.ch == r.ch {
282 e.ch = nil
283 }
284 }
285 for i := range w.syncEvents {
286 e := &w.syncEvents[i]
287 if e.key == r.key && e.ch == r.ch {
288 e.ch = nil
289 }
290 }
291 default:
292 panic(fmt.Errorf("unknown request: %T", req))
293 }
294 }
295
296 type logInfo struct {
297 Docs []interface{} `bson:"d"`
298 Revnos []int64 `bson:"r"`
299 }
300
301 // initLastId reads the most recent changelog document and initializes
302 // lastId with it. This causes all history that precedes the creation
303 // of the watcher to be ignored.
304 func (w *Watcher) initLastId() error {
305 log.Debugf("watcher: reading most recent document to ignore past history ...")
306 var entry struct {
307 Id interface{} "_id"
308 }
309 err := w.log.Find(nil).Sort("-$natural").One(&entry)
310 if err != nil && err != mgo.ErrNotFound {
311 return err
312 }
313 w.lastId = entry.Id
314 return nil
315 }
316
317 // sync updates the watcher knowledge from the database, and
318 // queues events to observing channels.
319 func (w *Watcher) sync() error {
320 log.Debugf("watcher: loading new events from changelog collection...")
321 iter := w.log.Find(nil).Batch(10).Sort("-$natural").Iter()
322 seen := make(map[watchKey]bool)
323 first := true
324 lastId := w.lastId
325 var entry bson.D
326 for iter.Next(&entry) {
327 if len(entry) == 0 {
328 log.Debugf("watcher: got empty changelog document")
329 continue
330 }
331 id := entry[0]
332 if id.Name != "_id" {
333 panic("watcher: _id field isn't first entry")
334 }
335 if first {
336 w.lastId = id.Value
337 first = false
338 }
339 if id.Value == lastId {
340 break
341 }
342 log.Debugf("watcher: got changelog document: %#v", entry)
343 for _, c := range entry[1:] {
344 // See txn's Runner.ChangeLog for the structure of log e ntries.
345 var d, r []interface{}
346 dr, _ := c.Value.(bson.D)
347 for _, item := range dr {
348 switch item.Name {
349 case "d":
350 d, _ = item.Value.([]interface{})
351 case "r":
352 r, _ = item.Value.([]interface{})
353 }
354 }
355 if len(d) == 0 || len(d) != len(r) {
356 log.Printf("watcher: changelog has invalid colle ction document: %#v", c)
357 continue
358 }
359 for i := len(d) - 1; i >= 0; i-- {
360 key := watchKey{c.Name, d[i]}
361 if seen[key] {
362 continue
363 }
364 seen[key] = true
365 revno, ok := r[i].(int64)
366 if !ok {
367 log.Printf("watcher: changelog has revno with type %T: %#v", r[i], r[i])
368 continue
369 }
370 if revno < 0 {
371 revno = -1
372 }
373 if w.current[key] == revno {
374 continue
375 }
376 w.current[key] = revno
377 // Queue notifications for per-collection watche s.
378 for _, info := range w.watches[watchKey{c.Name, nil}] {
379 w.syncEvents = append(w.syncEvents, even t{info.ch, key, revno})
380 }
381 // Queue notifications for per-document watches.
382 infos := w.watches[key]
383 for i, info := range infos {
384 if revno > info.revno || revno < 0 && in fo.revno >= 0 {
385 infos[i].revno = revno
386 w.syncEvents = append(w.syncEven ts, event{info.ch, key, revno})
387 }
388 }
389 }
390 }
391 }
392 if iter.Err() != nil {
393 return fmt.Errorf("watcher iteration error: %v", iter.Err())
394 }
395 return nil
396 }
OLDNEW
« no previous file with comments | « mstate/watcher/export_test.go ('k') | mstate/watcher/watcher_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld 204d58d