Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1060)

Side by Side Diff: state/watcher/watcher.go

Issue 5905064: Watcher package as base for all state watcher. (Closed)
Patch Set: Watcher package as base for all state watcher. Created 5 years, 9 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « [revision details] ('k') | state/watcher/watcher_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 package watcher
2
3 import (
4 "launchpad.net/gozk/zookeeper"
5 "launchpad.net/tomb"
6 "sort"
7 )
8
9 // updater defines the interface that has to be implemented
10 // by the concrete watchers to handle and deliver node·
11 // updates individually.
niemeyer 2012/03/28 00:28:59 There's some mix up going on which would be good t
TheMue 2012/03/28 11:12:57 Done.
12 type updater interface {
13 // update checks if the observed data has changed. Only in case·
14 // of a change the new data will be sent to a receiver. The methods
15 // returns the next watch and a continuation flag to the watcher.
16 // The latter will be set to false in case of an error or the
17 // receiving of a stop signal.
18 update(eventType zookeeper.Event) (nextWatch <-chan zookeeper.Event, con t bool)
19 // close is called if the watcher ends its work.
20 close()
21 }
22
23 // watcher provides the backend goroutine handling for the
24 // observation of ZooKeeper node changes. The concrete handling
25 // has to be done by the updater.
26 type watcher struct {
27 tomb tomb.Tomb
28 updater updater
29 }
30
31 // init assigns the updater and then starts the watching loop.
32 func (w *watcher) init(updater updater, event zookeeper.Event) {
33 w.updater = updater
34 go w.loop(event)
35 }
36
37 // Stop ends the watching.
38 func (w *watcher) Stop() error {
39 w.tomb.Kill(nil)
40 return w.tomb.Wait()
41 }
42
43 // loop is the backend for watching.
44 func (w *watcher) loop(event zookeeper.Event) {
45 defer w.tomb.Done()
46 defer w.updater.close()
47
48 watch, cont := w.updater.update(event)
49 if !cont {
50 return
51 }
52 // Fire an initial event.
53 // watch := func() <-chan zookeeper.Event {
niemeyer 2012/03/28 00:28:59 The dead code may be removed.
TheMue 2012/03/28 11:12:57 Done.
54 // eventChan := make(chan zookeeper.Event, 1)
55 // eventChan <- zookeeper.Event{
56 // State: zookeeper.STATE_CONNECTED,
57 // Type: zookeeper.EVENT_CHANGED,
58 // }
59 // return eventChan
60 // }()
61
62 for {
63 select {
64 case <-w.tomb.Dying():
65 return
66 case evt := <-watch:
67 if !evt.Ok() {
68 w.tomb.Killf("watcher: critical session event: % v", evt)
69 return
70 }
71 watch, cont = w.updater.update(event)
72 if !cont {
73 return
74 }
75 }
76 }
77 }
78
79 // ContentWatcher observes a ZooKeeper node for changes of the content
80 // and delivers those via the Change() method.
niemeyer 2012/03/28 00:28:59 // ContentWatcher observes a ZooKeeper node and de
TheMue 2012/03/28 11:12:57 Done.
81 type ContentWatcher struct {
82 zk *zookeeper.Conn
83 path string
84 changeChan chan string
85 content string
86 watcher
87 }
88
89 // NewContentWatcher creates a new content watcher.
niemeyer 2012/03/28 00:28:59 This comment may be dropped.
TheMue 2012/03/28 11:12:57 Done.
90 func NewContentWatcher(zk *zookeeper.Conn, path string) (*ContentWatcher, error) {
91 w := &ContentWatcher{
92 zk: zk,
93 path: path,
94 changeChan: make(chan string),
95 }
96 evt := zookeeper.Event{
97 State: zookeeper.STATE_CONNECTED,
98 Type: zookeeper.EVENT_CHANGED,
99 }
100 w.watcher.init(w, evt)
101 return w, nil
niemeyer 2012/03/28 00:28:59 If there are no possibilities of error, we don't n
TheMue 2012/03/28 11:12:57 Done.
102 }
103
104 // Changes emits the content of the node on the
105 // returned channel each time it changes.
niemeyer 2012/03/28 00:28:59 // Changes returns a channel that will receive the
TheMue 2012/03/28 11:12:57 Done.
106 func (w *ContentWatcher) Changes() <-chan string {
107 return w.changeChan
108 }
109
110 // update is documented in the updater interface above. For the
111 // ContentWatcher it retrieves the nodes content as string and
112 // emits changed content via the changeChan.
niemeyer 2012/03/28 00:28:59 Needs a new comment, and the comment should not me
TheMue 2012/03/28 11:12:57 Done.
113 func (w *ContentWatcher) update(event zookeeper.Event) (nextWatch <-chan zookeep er.Event, cont bool) {
niemeyer 2012/03/28 00:28:59 The use of "cont" in those methods feels a bit non
TheMue 2012/03/28 11:12:57 Done.
114 if event.Type == zookeeper.EVENT_DELETED {
115 return nil, false
116 }
117 content, _, watch, err := w.zk.GetW(w.path)
118 if err != nil {
119 w.tomb.Kill(err)
120 return nil, false
121 }
122 if content == w.content {
123 return watch, true
124 }
125 w.content = content
126 select {
127 case <-w.tomb.Dying():
128 return nil, false
129 case w.changeChan <- w.content:
130 }
131 return watch, true
132 }
133
134 // close is documented in the updater interface above. For the
135 // ContentWatcher it just closes the changeChan.
niemeyer 2012/03/28 00:28:59 Observe: // For the ContentWatcher it just clos
TheMue 2012/03/28 11:12:57 Done.
136 func (w *ContentWatcher) close() {
137 close(w.changeChan)
138 }
139
140 // ChildrenChange contains information about
141 // children that have been created or deleted.
142 type ChildrenChange struct {
143 // Del holds names of children that have been deleted.
144 Del []string
145 // New holds names of children that have been created.
146 New []string
niemeyer 2012/03/28 00:28:59 s/Del/Removed/ s/New/Added/ And drop comments ple
TheMue 2012/03/28 11:12:57 Done.
147 }
148
149 // ChildrenWatcher observes a ZooKeeper node for changes of the
150 // children and delivers those via the Change() method.
niemeyer 2012/03/28 00:28:59 // ChildrenWatcher observes a ZooKeeper node and d
TheMue 2012/03/28 11:12:57 Done.
151 type ChildrenWatcher struct {
152 zk *zookeeper.Conn
153 path string
154 changeChan chan ChildrenChange
155 children map[string]bool
156 watcher
157 }
158
159 // NewWatcher creates a new watcher.
niemeyer 2012/03/28 00:28:59 Comment is wrong, but may actually be dropped.
TheMue 2012/03/28 11:12:57 Done.
160 func NewChildrenWatcher(zk *zookeeper.Conn, path string) (*ChildrenWatcher, erro r) {
161 w := &ChildrenWatcher{
162 zk: zk,
163 path: path,
164 changeChan: make(chan ChildrenChange),
165 children: make(map[string]bool),
166 }
167 evt := zookeeper.Event{
168 State: zookeeper.STATE_CONNECTED,
169 Type: zookeeper.EVENT_CHILD,
170 }
171 w.watcher.init(w, evt)
172 return w, nil
niemeyer 2012/03/28 00:28:59 If there are no errors, no need for an error resul
TheMue 2012/03/28 11:12:57 Done.
173 }
174
175 // Changes emits the deleted and the new created children of·
176 // the node on the returned channel each time they are changing.
niemeyer 2012/03/28 00:28:59 // Changes returns a channel that will receive the
TheMue 2012/03/28 11:12:57 Done.
177 func (w *ChildrenWatcher) Changes() <-chan ChildrenChange {
178 return w.changeChan
179 }
180
181 // update is documented in the updater interface above. For the
182 // ChildrenWatcher it retrieves the nodes children, checks which
183 // are added or deleted and emits these changes via the changeChan.
niemeyer 2012/03/28 00:28:59 Needs a new comment, and the comment should not me
TheMue 2012/03/28 11:12:57 Done.
184 func (w *ChildrenWatcher) update(event zookeeper.Event) (nextWatch <-chan zookee per.Event, cont bool) {
niemeyer 2012/03/28 00:28:59 Please return error rather than bool, and fix appr
TheMue 2012/03/28 11:12:57 Done.
185 if event.Type == zookeeper.EVENT_DELETED {
186 return nil, false
187 }
188 retrievedChildren, _, watch, err := w.zk.ChildrenW(w.path)
189 if err != nil {
190 w.tomb.Kill(err)
191 return nil, false
192 }
193 children := make(map[string]bool)
194 for _, child := range retrievedChildren {
195 children[child] = true
196 }
197 var change ChildrenChange
198 for child, _ := range w.children {
199 if !children[child] {
200 change.Del = append(change.Del, child)
201 delete(w.children, child)
202 }
203 }
204 for child, _ := range children {
205 if !w.children[child] {
206 change.New = append(change.New, child)
207 w.children[child] = true
208 }
209 }
210 if len(change.Del) == 0 && len(change.New) == 0 {
211 return watch, true
212 }
213 sort.Strings(change.Del)
214 sort.Strings(change.New)
215 select {
216 case <-w.tomb.Dying():
217 return nil, false
218 case w.changeChan <- change:
219 }
220 return watch, true
221 }
222
223 // close is documented in the updater interface above. For the
224 // ChildrenWatcher it just closes the changeChan.
niemeyer 2012/03/28 00:28:59 Comment needs love.
TheMue 2012/03/28 11:12:57 Done.
225 func (w *ChildrenWatcher) close() {
226 close(w.changeChan)
227 }
OLDNEW
« no previous file with comments | « [revision details] ('k') | state/watcher/watcher_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld 204d58d