| OLD | NEW |
| 1 package watcher | 1 package watcher |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "fmt" | 4 "fmt" |
| 5 "launchpad.net/gozk/zookeeper" | 5 "launchpad.net/gozk/zookeeper" |
| 6 "launchpad.net/tomb" | 6 "launchpad.net/tomb" |
| 7 ) | 7 ) |
| 8 | 8 |
| 9 // ErrStopper is implemented by all watchers. | 9 // ErrStopper is implemented by all watchers. |
| 10 type ErrStopper interface { | 10 type ErrStopper interface { |
| (...skipping 28 matching lines...) Expand all Loading... |
| 39 Exists bool | 39 Exists bool |
| 40 Version int | 40 Version int |
| 41 Content string | 41 Content string |
| 42 } | 42 } |
| 43 | 43 |
| 44 // ContentWatcher observes a ZooKeeper node and delivers a | 44 // ContentWatcher observes a ZooKeeper node and delivers a |
| 45 // notification when a content change is detected. | 45 // notification when a content change is detected. |
| 46 type ContentWatcher struct { | 46 type ContentWatcher struct { |
| 47 zk *zookeeper.Conn | 47 zk *zookeeper.Conn |
| 48 path string | 48 path string |
| 49 tomb tomb.Tomb | |
| 50 changeChan chan ContentChange | 49 changeChan chan ContentChange |
| 51 emittedValue bool | 50 emittedValue bool |
| 52 content ContentChange | 51 content ContentChange |
| 52 err error |
| 53 stop <-chan struct{} |
| 53 } | 54 } |
| 54 | 55 |
| 55 // NewContentWatcher creates a ContentWatcher observing | 56 // NewContentWatcher creates a ContentWatcher observing |
| 56 // the ZooKeeper node at watchedPath. | 57 // the ZooKeeper node at watchedPath. |
| 57 func NewContentWatcher(zk *zookeeper.Conn, watchedPath string) *ContentWatcher { | 58 func NewContentWatcher(zk *zookeeper.Conn, watchedPath string, stop <-chan struc
t{}) *ContentWatcher { |
| 58 w := &ContentWatcher{ | 59 w := &ContentWatcher{ |
| 59 zk: zk, | 60 zk: zk, |
| 60 path: watchedPath, | 61 path: watchedPath, |
| 61 changeChan: make(chan ContentChange), | 62 changeChan: make(chan ContentChange), |
| 63 stop: stop, |
| 62 } | 64 } |
| 63 go w.loop() | 65 go w.loop() |
| 64 return w | 66 return w |
| 65 } | 67 } |
| 66 | 68 |
| 67 // Changes returns a channel that will receive the new node | 69 // Changes returns a channel that will receive the new node |
| 68 // content when a change is detected. Note that multiple | 70 // content when a change is detected. Note that multiple |
| 69 // changes may be observed as a single event in the channel. | 71 // changes may be observed as a single event in the channel. |
| 70 // The first event on the channel holds the initial state. | 72 // The first event on the channel holds the initial state. |
| 71 func (w *ContentWatcher) Changes() <-chan ContentChange { | 73 func (w *ContentWatcher) Changes() <-chan ContentChange { |
| 72 return w.changeChan | 74 return w.changeChan |
| 73 } | 75 } |
| 74 | 76 |
| 75 // Dying returns a channel that is closed when the | 77 // Wait reads all remaining changes and returns |
| 76 // watcher has stopped or is about to stop. | 78 // any error the watcher encounted while running. |
| 77 func (w *ContentWatcher) Dying() <-chan struct{} { | 79 func (w *ContentWatcher) Wait() error { |
| 78 » return w.tomb.Dying() | 80 » for _ = range w.changeChan { |
| 79 } | 81 » } |
| 80 | 82 » return w.err |
| 81 // Err returns the error that stopped the watcher, or | |
| 82 // tomb.ErrStillAlive if the watcher is still running. | |
| 83 func (w *ContentWatcher) Err() error { | |
| 84 » return w.tomb.Err() | |
| 85 } | |
| 86 | |
| 87 // Stop stops the watch and returns any error encountered | |
| 88 // while watching. This method should always be called before | |
| 89 // discarding the watcher. | |
| 90 func (w *ContentWatcher) Stop() error { | |
| 91 » w.tomb.Kill(nil) | |
| 92 » return w.tomb.Wait() | |
| 93 } | 83 } |
| 94 | 84 |
| 95 // loop is the backend for watching. | 85 // loop is the backend for watching. |
| 96 func (w *ContentWatcher) loop() { | 86 func (w *ContentWatcher) loop() { |
| 97 defer w.tomb.Done() | |
| 98 defer close(w.changeChan) | 87 defer close(w.changeChan) |
| 99 | 88 |
| 100 watch, err := w.update() | 89 watch, err := w.update() |
| 101 if err != nil { | 90 if err != nil { |
| 102 » » w.tomb.Kill(err) | 91 » » w.err = err |
| 103 return | 92 return |
| 104 } | 93 } |
| 105 | 94 |
| 106 for { | 95 for { |
| 107 select { | 96 select { |
| 108 » » case <-w.tomb.Dying(): | 97 » » case <-w.stop: |
| 109 return | 98 return |
| 110 case evt := <-watch: | 99 case evt := <-watch: |
| 111 if !evt.Ok() { | 100 if !evt.Ok() { |
| 112 » » » » w.tomb.Killf("watcher: critical session event: %
v", evt) | 101 » » » » w.err = fmt.Errorf("watcher: critical session ev
ent: %v", evt) |
| 113 return | 102 return |
| 114 } | 103 } |
| 115 watch, err = w.update() | 104 watch, err = w.update() |
| 116 if err != nil { | 105 if err != nil { |
| 117 » » » » w.tomb.Kill(err) | 106 » » » » w.err = err |
| 118 return | 107 return |
| 119 } | 108 } |
| 120 } | 109 } |
| 121 } | 110 } |
| 122 } | 111 } |
| 123 | 112 |
| 124 // update retrieves the node content and emits it as well as an existence | 113 // update retrieves the node content and emits it as well as an existence |
| 125 // flag to the change channel if it has changed. It returns the next watch. | 114 // flag to the change channel if it has changed. It returns the next watch. |
| 126 func (w *ContentWatcher) update() (nextWatch <-chan zookeeper.Event, err error)
{ | 115 func (w *ContentWatcher) update() (nextWatch <-chan zookeeper.Event, err error)
{ |
| 127 var content string | 116 var content string |
| (...skipping 24 matching lines...) Expand all Loading... |
| 152 newContent := ContentChange{} | 141 newContent := ContentChange{} |
| 153 if stat != nil { | 142 if stat != nil { |
| 154 newContent.Exists = true | 143 newContent.Exists = true |
| 155 newContent.Version = stat.Version() | 144 newContent.Version = stat.Version() |
| 156 newContent.Content = content | 145 newContent.Content = content |
| 157 } | 146 } |
| 158 if w.emittedValue && newContent == w.content { | 147 if w.emittedValue && newContent == w.content { |
| 159 return nextWatch, nil | 148 return nextWatch, nil |
| 160 } | 149 } |
| 161 w.content = newContent | 150 w.content = newContent |
| 162 » select { | 151 » w.changeChan <- w.content |
| 163 » case <-w.tomb.Dying(): | 152 » w.emittedValue = true |
| 164 » » return nil, tomb.ErrDying | |
| 165 » case w.changeChan <- w.content: | |
| 166 » » w.emittedValue = true | |
| 167 » } | |
| 168 return nextWatch, nil | 153 return nextWatch, nil |
| 169 } | 154 } |
| 170 | 155 |
| 171 // ChildrenChange contains information about | 156 // ChildrenChange contains information about |
| 172 // children that have been added or removed. | 157 // children that have been added or removed. |
| 173 type ChildrenChange struct { | 158 type ChildrenChange struct { |
| 174 Added []string | 159 Added []string |
| 175 Removed []string | 160 Removed []string |
| 176 } | 161 } |
| 177 | 162 |
| (...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 302 return | 287 return |
| 303 } | 288 } |
| 304 select { | 289 select { |
| 305 case <-w.tomb.Dying(): | 290 case <-w.tomb.Dying(): |
| 306 return nil, tomb.ErrDying | 291 return nil, tomb.ErrDying |
| 307 case w.changeChan <- change: | 292 case w.changeChan <- change: |
| 308 w.emittedValue = true | 293 w.emittedValue = true |
| 309 } | 294 } |
| 310 return | 295 return |
| 311 } | 296 } |
| OLD | NEW |