| Left: | ||
| Right: |
| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // The watcher package provides an interface for observing changes | |
| 2 // to arbitrary MongoDB documents that are maintained via the | |
| 3 // mgo/txn transaction package. | |
| 4 package watcher | |
| 5 | |
| 6 import ( | |
| 7 "fmt" | |
| 8 "labix.org/v2/mgo" | |
| 9 "labix.org/v2/mgo/bson" | |
| 10 "launchpad.net/juju-core/log" | |
| 11 "launchpad.net/tomb" | |
| 12 "time" | |
| 13 ) | |
| 14 | |
| 15 // A Watcher can watch any number of collections and documents for changes. | |
| 16 type Watcher struct { | |
| 17 tomb tomb.Tomb | |
| 18 log *mgo.Collection | |
| 19 | |
| 20 // watches holds the observers managed by Watch/Unwatch. | |
| 21 watches map[watchKey][]watchInfo | |
| 22 | |
| 23 // current holds the current txn-revno values for all the observed | |
| 24 // documents known to exist. Documents not observed or deleted are | |
| 25 // omitted from this map and are considered to have revno -1. | |
| 26 current map[watchKey]int64 | |
| 27 | |
| 28 // syncEvents and requestEvents contain the events to be | |
| 29 // dispatched to the watcher channels. They're queued during | |
| 30 // processing and flushed at the end to simplify the algorithm. | |
| 31 // The two queues are separated because events from sync are | |
| 32 // handled in reverse order due to the way the algorithm works. | |
| 33 syncEvents, requestEvents []event | |
| 34 | |
| 35 // request is used to deliver requests from the public API into | |
| 36 // the the goroutine loop. | |
| 37 request chan interface{} | |
| 38 | |
| 39 // syncDone contains pending done channels from sync requests. | |
| 40 syncDone []chan bool | |
| 41 | |
| 42 // lastId is the most recent transaction id observed by a sync. | |
| 43 lastId interface{} | |
| 44 | |
| 45 // next will dispatch when it's time to sync the database | |
| 46 // knowledge. It's maintained here so that Sync and StartSync | |
| 47 // can manipulate it to force a sync sooner. | |
| 48 next <-chan time.Time | |
| 49 } | |
| 50 | |
| 51 // A Change holds information about a document change. | |
| 52 type Change struct { | |
| 53 // C and Id hold the collection name and document _id field value. | |
| 54 C string | |
| 55 Id interface{} | |
| 56 | |
| 57 // Revno is the latest known value for the document's txn-revno | |
| 58 // field, or -1 if the document was deleted. | |
| 59 Revno int64 | |
| 60 } | |
| 61 | |
| 62 type watchKey struct { | |
| 63 c string | |
| 64 id interface{} // nil when watching collection | |
| 65 } | |
| 66 | |
| 67 type watchInfo struct { | |
| 68 ch chan<- Change | |
| 69 revno int64 | |
| 70 } | |
| 71 | |
| 72 type event struct { | |
| 73 ch chan<- Change | |
| 74 key watchKey | |
| 75 revno int64 | |
| 76 } | |
| 77 | |
| 78 // New returns a new Watcher observing the changelog collection, | |
| 79 // which must be a capped collection maintained by mgo/txn. | |
| 80 func New(changelog *mgo.Collection) *Watcher { | |
| 81 w := &Watcher{ | |
| 82 log: changelog, | |
| 83 watches: make(map[watchKey][]watchInfo), | |
| 84 current: make(map[watchKey]int64), | |
| 85 request: make(chan interface{}), | |
| 86 } | |
| 87 go func() { | |
| 88 w.tomb.Kill(w.loop()) | |
| 89 w.tomb.Done() | |
| 90 }() | |
| 91 return w | |
| 92 } | |
| 93 | |
| 94 // Stop stops all the watcher activities. | |
| 95 func (w *Watcher) Stop() error { | |
| 96 w.tomb.Kill(nil) | |
| 97 return w.tomb.Wait() | |
| 98 } | |
| 99 | |
| 100 type reqWatch struct { | |
| 101 key watchKey | |
| 102 info watchInfo | |
| 103 } | |
| 104 | |
| 105 type reqUnwatch struct { | |
| 106 key watchKey | |
| 107 ch chan<- Change | |
| 108 } | |
| 109 | |
| 110 type reqSync struct { | |
| 111 done chan bool | |
| 112 } | |
| 113 | |
| 114 // Watch starts watching the given collection and document id. | |
| 115 // An event will be sent onto ch whenever a matching document's txn-revno | |
| 116 // field is observed to change after a transaction is applied. The revno | |
| 117 // parameter informs the currently known revision number for the document. | |
| 118 // Non-existing documents are represented by a -1 revno. | |
| 119 func (w *Watcher) Watch(collection string, id interface{}, revno int64, ch chan< - Change) { | |
| 120 if id == nil { | |
| 121 panic("watcher: cannot watch a document with nil id") | |
| 122 } | |
| 123 select { | |
| 124 case w.request <- reqWatch{watchKey{collection, id}, watchInfo{ch, revno }}: | |
| 125 case <-w.tomb.Dying(): | |
| 126 } | |
| 127 } | |
| 128 | |
| 129 // WatchCollection starts watching the given collection. | |
| 130 // An event will be sent onto ch whenever the txn-revno field is observed | |
| 131 // to change after a transaction is applied for any document in the collection. | |
| 132 func (w *Watcher) WatchCollection(collection string, ch chan<- Change) { | |
| 133 select { | |
| 134 case w.request <- reqWatch{watchKey{collection, nil}, watchInfo{ch, 0}}: | |
| 135 case <-w.tomb.Dying(): | |
| 136 } | |
| 137 } | |
| 138 | |
| 139 // Unwatch stops watching the given collection and document id via ch. | |
| 140 func (w *Watcher) Unwatch(collection string, id interface{}, ch chan<- Change) { | |
| 141 if id == nil { | |
| 142 panic("watcher: cannot unwatch a document with nil id") | |
| 143 } | |
| 144 select { | |
| 145 case w.request <- reqUnwatch{watchKey{collection, id}, ch}: | |
| 146 case <-w.tomb.Dying(): | |
| 147 } | |
| 148 } | |
| 149 | |
| 150 // UnwatchCollection stops watching the given collection via ch. | |
| 151 func (w *Watcher) UnwatchCollection(collection string, ch chan<- Change) { | |
| 152 select { | |
| 153 case w.request <- reqUnwatch{watchKey{collection, nil}, ch}: | |
| 154 case <-w.tomb.Dying(): | |
| 155 } | |
| 156 } | |
| 157 | |
| 158 // StartSync forces the watcher to load new events from the database. | |
| 159 func (w *Watcher) StartSync() { | |
| 160 select { | |
| 161 case w.request <- reqSync{nil}: | |
| 162 case <-w.tomb.Dying(): | |
| 163 } | |
| 164 } | |
| 165 | |
| 166 // Sync forces the watcher to load new events from the database and blocks | |
| 167 // until all events have been dispatched. | |
| 168 func (w *Watcher) Sync() { | |
|
aram
2012/09/10 12:47:13
I don't understand the purpose of this function. I
niemeyer
2012/09/10 12:58:53
Testing is its main purpose, and its semantics see
aram
2012/09/10 13:02:31
Ah, sure, testing is fine, I thought it's a regula
niemeyer
2012/09/10 13:04:38
I believe I fixed up the spurious error you were s
| |
| 169 done := make(chan bool) | |
| 170 select { | |
| 171 case w.request <- reqSync{done}: | |
| 172 case <-w.tomb.Dying(): | |
| 173 } | |
| 174 select { | |
| 175 case <-done: | |
| 176 case <-w.tomb.Dying(): | |
| 177 } | |
| 178 } | |
| 179 | |
| 180 // period is the delay between each sync. | |
| 181 var period time.Duration = 5 * time.Second | |
| 182 | |
| 183 // loop implements the main watcher loop. | |
| 184 func (w *Watcher) loop() error { | |
| 185 w.next = time.After(0) | |
| 186 if err := w.initLastId(); err != nil { | |
| 187 return err | |
| 188 } | |
| 189 for { | |
| 190 select { | |
| 191 case <-w.tomb.Dying(): | |
| 192 return tomb.ErrDying | |
| 193 case <-w.next: | |
| 194 w.next = time.After(period) | |
| 195 syncDone := w.syncDone | |
| 196 w.syncDone = nil | |
| 197 if err := w.sync(); err != nil { | |
| 198 return err | |
| 199 } | |
| 200 w.flush() | |
| 201 for _, done := range syncDone { | |
| 202 close(done) | |
| 203 } | |
| 204 case req := <-w.request: | |
| 205 w.handle(req) | |
| 206 w.flush() | |
| 207 } | |
| 208 } | |
| 209 return nil | |
| 210 } | |
| 211 | |
| 212 // flush sends all pending events to their respective channels. | |
| 213 func (w *Watcher) flush() { | |
| 214 // refreshEvents are stored newest first. | |
| 215 for i := len(w.syncEvents)-1; i >= 0; i-- { | |
| 216 e := &w.syncEvents[i] | |
| 217 for e.ch != nil { | |
| 218 select { | |
| 219 case <-w.tomb.Dying(): | |
| 220 return | |
| 221 case req := <-w.request: | |
| 222 w.handle(req) | |
| 223 continue | |
| 224 case e.ch <- Change{e.key.c, e.key.id, e.revno}: | |
| 225 } | |
| 226 break | |
| 227 } | |
| 228 } | |
| 229 // requestEvents are stored oldest first, and | |
| 230 // may grow during the loop. | |
| 231 for i := 0; i < len(w.requestEvents); i++ { | |
| 232 e := &w.requestEvents[i] | |
| 233 for e.ch != nil { | |
| 234 select { | |
| 235 case <-w.tomb.Dying(): | |
| 236 return | |
| 237 case req := <-w.request: | |
| 238 w.handle(req) | |
| 239 continue | |
| 240 case e.ch <- Change{e.key.c, e.key.id, e.revno}: | |
| 241 } | |
| 242 break | |
| 243 } | |
| 244 } | |
| 245 w.syncEvents = w.syncEvents[:0] | |
| 246 w.requestEvents = w.requestEvents[:0] | |
| 247 } | |
| 248 | |
| 249 // handle deals with requests delivered by the public API | |
| 250 // onto the background watcher goroutine. | |
| 251 func (w *Watcher) handle(req interface{}) { | |
| 252 log.Debugf("watcher: got request: %#v", req) | |
| 253 switch r := req.(type) { | |
| 254 case reqSync: | |
| 255 w.next = time.After(0) | |
| 256 if r.done != nil { | |
| 257 w.syncDone = append(w.syncDone, r.done) | |
| 258 } | |
| 259 case reqWatch: | |
| 260 for _, info := range w.watches[r.key] { | |
| 261 if info.ch == r.info.ch { | |
| 262 panic("adding channel twice for the same collect ion/document") | |
| 263 } | |
| 264 } | |
| 265 if revno, ok := w.current[r.key]; ok && (revno > r.info.revno || revno == -1 && r.info.revno >= 0) { | |
| 266 r.info.revno = revno | |
| 267 w.requestEvents = append(w.requestEvents, event{r.info.c h, r.key, revno}) | |
| 268 } | |
| 269 w.watches[r.key] = append(w.watches[r.key], r.info) | |
| 270 case reqUnwatch: | |
| 271 watches := w.watches[r.key] | |
| 272 for i, info := range watches { | |
| 273 if info.ch == r.ch { | |
| 274 watches[i] = watches[len(watches)-1] | |
| 275 w.watches[r.key] = watches[:len(watches)-1] | |
| 276 break | |
| 277 } | |
| 278 } | |
| 279 for i := range w.requestEvents { | |
| 280 e := &w.requestEvents[i] | |
| 281 if e.key == r.key && e.ch == r.ch { | |
| 282 e.ch = nil | |
| 283 } | |
| 284 } | |
| 285 for i := range w.syncEvents { | |
| 286 e := &w.syncEvents[i] | |
| 287 if e.key == r.key && e.ch == r.ch { | |
| 288 e.ch = nil | |
| 289 } | |
| 290 } | |
| 291 default: | |
| 292 panic(fmt.Errorf("unknown request: %T", req)) | |
| 293 } | |
| 294 } | |
| 295 | |
| 296 type logInfo struct { | |
| 297 Docs []interface{} `bson:"d"` | |
| 298 Revnos []int64 `bson:"r"` | |
| 299 } | |
| 300 | |
| 301 // initLastId reads the most recent changelog document and initializes | |
| 302 // lastId with it. This causes all history that precedes the creation | |
| 303 // of the watcher to be ignored. | |
| 304 func (w *Watcher) initLastId() error { | |
| 305 log.Debugf("watcher: reading most recent document to ignore past history ...") | |
| 306 var entry struct { | |
| 307 Id interface{} "_id" | |
| 308 } | |
| 309 err := w.log.Find(nil).Sort("-$natural").One(&entry) | |
| 310 if err != nil && err != mgo.ErrNotFound { | |
| 311 return err | |
| 312 } | |
| 313 w.lastId = entry.Id | |
| 314 return nil | |
| 315 } | |
| 316 | |
| 317 // sync updates the watcher knowledge from the database, and | |
| 318 // queues events to observing channels. | |
| 319 func (w *Watcher) sync() error { | |
| 320 log.Debugf("watcher: loading new events from changelog collection...") | |
| 321 iter := w.log.Find(nil).Batch(10).Sort("-$natural").Iter() | |
| 322 seen := make(map[watchKey]bool) | |
| 323 first := true | |
| 324 lastId := w.lastId | |
| 325 var entry bson.D | |
| 326 for iter.Next(&entry) { | |
| 327 if len(entry) == 0 { | |
| 328 log.Debugf("watcher: got empty changelog document") | |
| 329 continue | |
| 330 } | |
| 331 id := entry[0] | |
| 332 if id.Name != "_id" { | |
| 333 panic("watcher: _id field isn't first entry") | |
| 334 } | |
| 335 if first { | |
| 336 w.lastId = id.Value | |
| 337 first = false | |
| 338 } | |
| 339 if id.Value == lastId { | |
| 340 break | |
| 341 } | |
| 342 log.Debugf("watcher: got changelog document: %#v", entry) | |
| 343 for _, c := range entry[1:] { | |
| 344 // See txn's Runner.ChangeLog for the structure of log e ntries. | |
| 345 var d, r []interface{} | |
| 346 dr, _ := c.Value.(bson.D) | |
| 347 for _, item := range dr { | |
| 348 switch item.Name { | |
| 349 case "d": | |
| 350 d, _ = item.Value.([]interface{}) | |
| 351 case "r": | |
| 352 r, _ = item.Value.([]interface{}) | |
| 353 } | |
| 354 } | |
| 355 if len(d) == 0 || len(d) != len(r) { | |
| 356 log.Printf("watcher: changelog has invalid colle ction document: %#v", c) | |
| 357 continue | |
| 358 } | |
| 359 for i := len(d) - 1; i >= 0; i-- { | |
| 360 key := watchKey{c.Name, d[i]} | |
| 361 if seen[key] { | |
| 362 continue | |
| 363 } | |
| 364 seen[key] = true | |
| 365 revno, ok := r[i].(int64) | |
| 366 if !ok { | |
| 367 log.Printf("watcher: changelog has revno with type %T: %#v", r[i], r[i]) | |
| 368 continue | |
| 369 } | |
| 370 if revno < 0 { | |
| 371 revno = -1 | |
| 372 } | |
| 373 if w.current[key] == revno { | |
| 374 continue | |
| 375 } | |
| 376 w.current[key] = revno | |
| 377 // Queue notifications for per-collection watche s. | |
| 378 for _, info := range w.watches[watchKey{c.Name, nil}] { | |
| 379 w.syncEvents = append(w.syncEvents, even t{info.ch, key, revno}) | |
| 380 } | |
| 381 // Queue notifications for per-document watches. | |
| 382 infos := w.watches[key] | |
| 383 for i, info := range infos { | |
| 384 if revno > info.revno || revno < 0 && in fo.revno >= 0 { | |
| 385 infos[i].revno = revno | |
| 386 w.syncEvents = append(w.syncEven ts, event{info.ch, key, revno}) | |
| 387 } | |
| 388 } | |
| 389 } | |
| 390 } | |
| 391 } | |
| 392 if iter.Err() != nil { | |
| 393 return fmt.Errorf("watcher iteration error: %v", iter.Err()) | |
| 394 } | |
| 395 return nil | |
| 396 } | |
| OLD | NEW |