Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(3)

Delta Between Two Patch Sets: state/watcher/watcher.go

Issue 5901058: cmd/juju: working bootstrap and destroy commands
Left Patch Set: Created 13 years ago
Right Patch Set: cmd/juju: working bootstrap and destroy commands Created 12 years, 11 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Right: Side by side diff | Download
« no previous file with change/comment | « state/watch_test.go ('k') | state/watcher/watcher_test.go » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
(no file at all)
1 package watcher
2
3 import (
4 "fmt"
5 "launchpad.net/gozk/zookeeper"
6 "launchpad.net/tomb"
7 )
8
9 // ContentChange holds information on the existence
10 // and contents of a node. Content will be empty when the
11 // node does not exist.
12 type ContentChange struct {
13 Exists bool
14 Content string
15 }
16
17 // ContentWatcher observes a ZooKeeper node and delivers a
18 // notification when a content change is detected.
19 type ContentWatcher struct {
20 zk *zookeeper.Conn
21 path string
22 tomb tomb.Tomb
23 changeChan chan ContentChange
24 content ContentChange
25 }
26
27 // NewContentWatcher creates a ContentWatcher observing
28 // the ZooKeeper node at watchedPath.
29 func NewContentWatcher(zk *zookeeper.Conn, watchedPath string) *ContentWatcher {
30 w := &ContentWatcher{
31 zk: zk,
32 path: watchedPath,
33 changeChan: make(chan ContentChange),
34 }
35 go w.loop()
36 return w
37 }
38
39 // Changes returns a channel that will receive the new node
40 // content when a change is detected. Note that multiple
41 // changes may be observed as a single event in the channel.
42 func (w *ContentWatcher) Changes() <-chan ContentChange {
43 return w.changeChan
44 }
45
46 // Dying returns a channel that is closed when the
47 // watcher has stopped or is about to stop.
48 func (w *ContentWatcher) Dying() <-chan struct{} {
49 return w.tomb.Dying()
50 }
51
52 // Stop stops the watch and returns any error encountered
53 // while watching. This method should always be called before
54 // discarding the watcher.
55 func (w *ContentWatcher) Stop() error {
56 w.tomb.Kill(nil)
57 return w.tomb.Wait()
58 }
59
60 // loop is the backend for watching.
61 func (w *ContentWatcher) loop() {
62 defer w.tomb.Done()
63 defer close(w.changeChan)
64
65 watch, err := w.update()
66 if err != nil {
67 w.tomb.Kill(err)
68 return
69 }
70
71 for {
72 select {
73 case <-w.tomb.Dying():
74 return
75 case evt := <-watch:
76 if !evt.Ok() {
77 w.tomb.Killf("watcher: critical session event: % v", evt)
78 return
79 }
80 watch, err = w.update()
81 if err != nil {
82 w.tomb.Kill(err)
83 return
84 }
85 }
86 }
87 }
88
89 // update retrieves the node content and emits it as well as an existence
90 // flag to the change channel if it has changed. It returns the next watch.
91 func (w *ContentWatcher) update() (nextWatch <-chan zookeeper.Event, err error) {
92 var content string
93 var stat *zookeeper.Stat
94 // Repeat until we have a valid watch or an error.
95 for {
96 content, stat, nextWatch, err = w.zk.GetW(w.path)
97 if err == nil {
98 // Node exists, so leave the loop.
99 break
100 }
101 if zookeeper.IsError(err, zookeeper.ZNONODE) {
102 // Need a new watch to receive a signal when the node is created.
103 stat, nextWatch, err = w.zk.ExistsW(w.path)
104 if stat != nil {
105 // Node has been created just before ExistsW(),
106 // so call GetW() with new loop run again.
107 continue
108 }
109 if err == nil {
110 // Got a valid watch, so leave loop.
111 break
112 }
113 }
114 // Any other error during GetW() or ExistsW().
115 return nil, fmt.Errorf("watcher: can't get content of node %q: % v", w.path, err)
116 }
117 if stat != nil {
118 if w.content.Exists && content == w.content.Content {
119 return nextWatch, nil
120 }
121 w.content.Exists = true
122 w.content.Content = content
123 } else {
124 if !w.content.Exists {
125 return nextWatch, nil
126 }
127 w.content.Exists = false
128 w.content.Content = ""
129 }
130 select {
131 case <-w.tomb.Dying():
132 return nil, tomb.ErrDying
133 case w.changeChan <- w.content:
134 }
135 return nextWatch, nil
136 }
137
138 // ChildrenChange contains information about
139 // children that have been created or deleted.
140 type ChildrenChange struct {
141 Added []string
142 Deleted []string
143 }
144
145 // ChildrenWatcher observes a ZooKeeper node and delivers a
146 // notification when child nodes are added or removed.
147 type ChildrenWatcher struct {
148 zk *zookeeper.Conn
149 path string
150 tomb tomb.Tomb
151 changeChan chan ChildrenChange
152 children map[string]bool
153 }
154
155 // NewChildrenWatcher creates a ChildrenWatcher observing
156 // the ZooKeeper node at watchedPath.
157 func NewChildrenWatcher(zk *zookeeper.Conn, watchedPath string) *ChildrenWatcher {
158 w := &ChildrenWatcher{
159 zk: zk,
160 path: watchedPath,
161 changeChan: make(chan ChildrenChange),
162 children: make(map[string]bool),
163 }
164 go w.loop()
165 return w
166 }
167
168 // Changes returns a channel that will receive the changes
169 // performed to the set of children of the watched node.
170 // Note that multiple changes may be observed as a single
171 // event in the channel.
172 func (w *ChildrenWatcher) Changes() <-chan ChildrenChange {
173 return w.changeChan
174 }
175
176 // Dying returns a channel that is closed when the
177 // watcher has stopped or is about to stop.
178 func (w *ChildrenWatcher) Dying() <-chan struct{} {
179 return w.tomb.Dying()
180 }
181
182 // Stop stops the watch and returns any error encountered
183 // while watching. This method should always be called before
184 // discarding the watcher.
185 func (w *ChildrenWatcher) Stop() error {
186 w.tomb.Kill(nil)
187 return w.tomb.Wait()
188 }
189
190 // loop is the backend for watching.
191 func (w *ChildrenWatcher) loop() {
192 defer w.tomb.Done()
193 defer close(w.changeChan)
194
195 watch, err := w.update(zookeeper.EVENT_CHILD)
196 if err != nil {
197 w.tomb.Kill(err)
198 return
199 }
200
201 for {
202 select {
203 case <-w.tomb.Dying():
204 return
205 case evt := <-watch:
206 if !evt.Ok() {
207 w.tomb.Killf("watcher: critical session event: % v", evt)
208 return
209 }
210 watch, err = w.update(evt.Type)
211 if err != nil {
212 w.tomb.Kill(err)
213 return
214 }
215 }
216 }
217 }
218
219 // update retrieves the node children and emits the added or deleted children to·······
220 // the change channel if it has changed. It returns the next watch.
221 func (w *ChildrenWatcher) update(eventType int) (nextWatch <-chan zookeeper.Even t, err error) {
222 if eventType == zookeeper.EVENT_DELETED {
223 return nil, fmt.Errorf("watcher: node %q has been deleted", w.pa th)
224 }
225 retrievedChildren, _, watch, err := w.zk.ChildrenW(w.path)
226 if err != nil {
227 return nil, fmt.Errorf("watcher: can't get children of node %q: %v", w.path, err)
228 }
229 children := make(map[string]bool)
230 for _, child := range retrievedChildren {
231 children[child] = true
232 }
233 var change ChildrenChange
234 for child, _ := range w.children {
235 if !children[child] {
236 change.Deleted = append(change.Deleted, child)
237 delete(w.children, child)
238 }
239 }
240 for child, _ := range children {
241 if !w.children[child] {
242 change.Added = append(change.Added, child)
243 w.children[child] = true
244 }
245 }
246 if len(change.Deleted) == 0 && len(change.Added) == 0 {
247 return watch, nil
248 }
249 select {
250 case <-w.tomb.Dying():
251 return nil, tomb.ErrDying
252 case w.changeChan <- change:
253 }
254 return watch, nil
255 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b