| Left: | ||
| Right: |
| OLD | NEW |
|---|---|
| (Empty) | |
| 1 package watcher | |
| 2 | |
| 3 import ( | |
| 4 "launchpad.net/gozk/zookeeper" | |
| 5 "launchpad.net/tomb" | |
| 6 "sort" | |
| 7 ) | |
| 8 | |
| 9 // updater defines the interface that has to be implemented | |
| 10 // by the concrete watchers to handle and deliver node· | |
| 11 // updates individually. | |
|
niemeyer
2012/03/28 00:28:59
There's some mix up going on which would be good t
TheMue
2012/03/28 11:12:57
Done.
| |
| 12 type updater interface { | |
| 13 // update checks if the observed data has changed. Only in case· | |
| 14 // of a change the new data will be sent to a receiver. The methods | |
| 15 // returns the next watch and a continuation flag to the watcher. | |
| 16 // The latter will be set to false in case of an error or the | |
| 17 // receiving of a stop signal. | |
| 18 update(eventType zookeeper.Event) (nextWatch <-chan zookeeper.Event, con t bool) | |
| 19 // close is called if the watcher ends its work. | |
| 20 close() | |
| 21 } | |
| 22 | |
| 23 // watcher provides the backend goroutine handling for the | |
| 24 // observation of ZooKeeper node changes. The concrete handling | |
| 25 // has to be done by the updater. | |
| 26 type watcher struct { | |
| 27 tomb tomb.Tomb | |
| 28 updater updater | |
| 29 } | |
| 30 | |
| 31 // init assigns the updater and then starts the watching loop. | |
| 32 func (w *watcher) init(updater updater, event zookeeper.Event) { | |
| 33 w.updater = updater | |
| 34 go w.loop(event) | |
| 35 } | |
| 36 | |
| 37 // Stop ends the watching. | |
| 38 func (w *watcher) Stop() error { | |
| 39 w.tomb.Kill(nil) | |
| 40 return w.tomb.Wait() | |
| 41 } | |
| 42 | |
| 43 // loop is the backend for watching. | |
| 44 func (w *watcher) loop(event zookeeper.Event) { | |
| 45 defer w.tomb.Done() | |
| 46 defer w.updater.close() | |
| 47 | |
| 48 watch, cont := w.updater.update(event) | |
| 49 if !cont { | |
| 50 return | |
| 51 } | |
| 52 // Fire an initial event. | |
| 53 // watch := func() <-chan zookeeper.Event { | |
|
niemeyer
2012/03/28 00:28:59
The dead code may be removed.
TheMue
2012/03/28 11:12:57
Done.
| |
| 54 // eventChan := make(chan zookeeper.Event, 1) | |
| 55 // eventChan <- zookeeper.Event{ | |
| 56 // State: zookeeper.STATE_CONNECTED, | |
| 57 // Type: zookeeper.EVENT_CHANGED, | |
| 58 // } | |
| 59 // return eventChan | |
| 60 // }() | |
| 61 | |
| 62 for { | |
| 63 select { | |
| 64 case <-w.tomb.Dying(): | |
| 65 return | |
| 66 case evt := <-watch: | |
| 67 if !evt.Ok() { | |
| 68 w.tomb.Killf("watcher: critical session event: % v", evt) | |
| 69 return | |
| 70 } | |
| 71 watch, cont = w.updater.update(event) | |
| 72 if !cont { | |
| 73 return | |
| 74 } | |
| 75 } | |
| 76 } | |
| 77 } | |
| 78 | |
| 79 // ContentWatcher observes a ZooKeeper node for changes of the content | |
| 80 // and delivers those via the Change() method. | |
|
niemeyer
2012/03/28 00:28:59
// ContentWatcher observes a ZooKeeper node and de
TheMue
2012/03/28 11:12:57
Done.
| |
| 81 type ContentWatcher struct { | |
| 82 zk *zookeeper.Conn | |
| 83 path string | |
| 84 changeChan chan string | |
| 85 content string | |
| 86 watcher | |
| 87 } | |
| 88 | |
| 89 // NewContentWatcher creates a new content watcher. | |
|
niemeyer
2012/03/28 00:28:59
This comment may be dropped.
TheMue
2012/03/28 11:12:57
Done.
| |
| 90 func NewContentWatcher(zk *zookeeper.Conn, path string) (*ContentWatcher, error) { | |
| 91 w := &ContentWatcher{ | |
| 92 zk: zk, | |
| 93 path: path, | |
| 94 changeChan: make(chan string), | |
| 95 } | |
| 96 evt := zookeeper.Event{ | |
| 97 State: zookeeper.STATE_CONNECTED, | |
| 98 Type: zookeeper.EVENT_CHANGED, | |
| 99 } | |
| 100 w.watcher.init(w, evt) | |
| 101 return w, nil | |
|
niemeyer
2012/03/28 00:28:59
If there are no possibilities of error, we don't n
TheMue
2012/03/28 11:12:57
Done.
| |
| 102 } | |
| 103 | |
| 104 // Changes emits the content of the node on the | |
| 105 // returned channel each time it changes. | |
|
niemeyer
2012/03/28 00:28:59
// Changes returns a channel that will receive the
TheMue
2012/03/28 11:12:57
Done.
| |
| 106 func (w *ContentWatcher) Changes() <-chan string { | |
| 107 return w.changeChan | |
| 108 } | |
| 109 | |
| 110 // update is documented in the updater interface above. For the | |
| 111 // ContentWatcher it retrieves the nodes content as string and | |
| 112 // emits changed content via the changeChan. | |
|
niemeyer
2012/03/28 00:28:59
Needs a new comment, and the comment should not me
TheMue
2012/03/28 11:12:57
Done.
| |
| 113 func (w *ContentWatcher) update(event zookeeper.Event) (nextWatch <-chan zookeep er.Event, cont bool) { | |
|
niemeyer
2012/03/28 00:28:59
The use of "cont" in those methods feels a bit non
TheMue
2012/03/28 11:12:57
Done.
| |
| 114 if event.Type == zookeeper.EVENT_DELETED { | |
| 115 return nil, false | |
| 116 } | |
| 117 content, _, watch, err := w.zk.GetW(w.path) | |
| 118 if err != nil { | |
| 119 w.tomb.Kill(err) | |
| 120 return nil, false | |
| 121 } | |
| 122 if content == w.content { | |
| 123 return watch, true | |
| 124 } | |
| 125 w.content = content | |
| 126 select { | |
| 127 case <-w.tomb.Dying(): | |
| 128 return nil, false | |
| 129 case w.changeChan <- w.content: | |
| 130 } | |
| 131 return watch, true | |
| 132 } | |
| 133 | |
| 134 // close is documented in the updater interface above. For the | |
| 135 // ContentWatcher it just closes the changeChan. | |
|
niemeyer
2012/03/28 00:28:59
Observe:
// For the ContentWatcher it just clos
TheMue
2012/03/28 11:12:57
Done.
| |
| 136 func (w *ContentWatcher) close() { | |
| 137 close(w.changeChan) | |
| 138 } | |
| 139 | |
| 140 // ChildrenChange contains information about | |
| 141 // children that have been created or deleted. | |
| 142 type ChildrenChange struct { | |
| 143 // Del holds names of children that have been deleted. | |
| 144 Del []string | |
| 145 // New holds names of children that have been created. | |
| 146 New []string | |
|
niemeyer
2012/03/28 00:28:59
s/Del/Removed/
s/New/Added/
And drop comments ple
TheMue
2012/03/28 11:12:57
Done.
| |
| 147 } | |
| 148 | |
| 149 // ChildrenWatcher observes a ZooKeeper node for changes of the | |
| 150 // children and delivers those via the Change() method. | |
|
niemeyer
2012/03/28 00:28:59
// ChildrenWatcher observes a ZooKeeper node and d
TheMue
2012/03/28 11:12:57
Done.
| |
| 151 type ChildrenWatcher struct { | |
| 152 zk *zookeeper.Conn | |
| 153 path string | |
| 154 changeChan chan ChildrenChange | |
| 155 children map[string]bool | |
| 156 watcher | |
| 157 } | |
| 158 | |
| 159 // NewWatcher creates a new watcher. | |
|
niemeyer
2012/03/28 00:28:59
Comment is wrong, but may actually be dropped.
TheMue
2012/03/28 11:12:57
Done.
| |
| 160 func NewChildrenWatcher(zk *zookeeper.Conn, path string) (*ChildrenWatcher, erro r) { | |
| 161 w := &ChildrenWatcher{ | |
| 162 zk: zk, | |
| 163 path: path, | |
| 164 changeChan: make(chan ChildrenChange), | |
| 165 children: make(map[string]bool), | |
| 166 } | |
| 167 evt := zookeeper.Event{ | |
| 168 State: zookeeper.STATE_CONNECTED, | |
| 169 Type: zookeeper.EVENT_CHILD, | |
| 170 } | |
| 171 w.watcher.init(w, evt) | |
| 172 return w, nil | |
|
niemeyer
2012/03/28 00:28:59
If there are no errors, no need for an error resul
TheMue
2012/03/28 11:12:57
Done.
| |
| 173 } | |
| 174 | |
| 175 // Changes emits the deleted and the new created children of· | |
| 176 // the node on the returned channel each time they are changing. | |
|
niemeyer
2012/03/28 00:28:59
// Changes returns a channel that will receive the
TheMue
2012/03/28 11:12:57
Done.
| |
| 177 func (w *ChildrenWatcher) Changes() <-chan ChildrenChange { | |
| 178 return w.changeChan | |
| 179 } | |
| 180 | |
| 181 // update is documented in the updater interface above. For the | |
| 182 // ChildrenWatcher it retrieves the nodes children, checks which | |
| 183 // are added or deleted and emits these changes via the changeChan. | |
|
niemeyer
2012/03/28 00:28:59
Needs a new comment, and the comment should not me
TheMue
2012/03/28 11:12:57
Done.
| |
| 184 func (w *ChildrenWatcher) update(event zookeeper.Event) (nextWatch <-chan zookee per.Event, cont bool) { | |
|
niemeyer
2012/03/28 00:28:59
Please return error rather than bool, and fix appr
TheMue
2012/03/28 11:12:57
Done.
| |
| 185 if event.Type == zookeeper.EVENT_DELETED { | |
| 186 return nil, false | |
| 187 } | |
| 188 retrievedChildren, _, watch, err := w.zk.ChildrenW(w.path) | |
| 189 if err != nil { | |
| 190 w.tomb.Kill(err) | |
| 191 return nil, false | |
| 192 } | |
| 193 children := make(map[string]bool) | |
| 194 for _, child := range retrievedChildren { | |
| 195 children[child] = true | |
| 196 } | |
| 197 var change ChildrenChange | |
| 198 for child, _ := range w.children { | |
| 199 if !children[child] { | |
| 200 change.Del = append(change.Del, child) | |
| 201 delete(w.children, child) | |
| 202 } | |
| 203 } | |
| 204 for child, _ := range children { | |
| 205 if !w.children[child] { | |
| 206 change.New = append(change.New, child) | |
| 207 w.children[child] = true | |
| 208 } | |
| 209 } | |
| 210 if len(change.Del) == 0 && len(change.New) == 0 { | |
| 211 return watch, true | |
| 212 } | |
| 213 sort.Strings(change.Del) | |
| 214 sort.Strings(change.New) | |
| 215 select { | |
| 216 case <-w.tomb.Dying(): | |
| 217 return nil, false | |
| 218 case w.changeChan <- change: | |
| 219 } | |
| 220 return watch, true | |
| 221 } | |
| 222 | |
| 223 // close is documented in the updater interface above. For the | |
| 224 // ChildrenWatcher it just closes the changeChan. | |
|
niemeyer
2012/03/28 00:28:59
Comment needs love.
TheMue
2012/03/28 11:12:57
Done.
| |
| 225 func (w *ChildrenWatcher) close() { | |
| 226 close(w.changeChan) | |
| 227 } | |
| OLD | NEW |