Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(75)

Side by Side Diff: state/watcher/watcher.go

Issue 6373048: environs/state: simplify watcher implementation
Patch Set: Created 5 years, 6 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « [revision details] ('k') | state/watcher/watcher_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package watcher 1 package watcher
2 2
3 import ( 3 import (
4 "fmt" 4 "fmt"
5 "launchpad.net/gozk/zookeeper" 5 "launchpad.net/gozk/zookeeper"
6 "launchpad.net/tomb" 6 "launchpad.net/tomb"
7 ) 7 )
8 8
9 // ErrStopper is implemented by all watchers. 9 // ErrStopper is implemented by all watchers.
10 type ErrStopper interface { 10 type ErrStopper interface {
(...skipping 28 matching lines...) Expand all
39 Exists bool 39 Exists bool
40 Version int 40 Version int
41 Content string 41 Content string
42 } 42 }
43 43
44 // ContentWatcher observes a ZooKeeper node and delivers a 44 // ContentWatcher observes a ZooKeeper node and delivers a
45 // notification when a content change is detected. 45 // notification when a content change is detected.
46 type ContentWatcher struct { 46 type ContentWatcher struct {
47 zk *zookeeper.Conn 47 zk *zookeeper.Conn
48 path string 48 path string
49 tomb tomb.Tomb
50 changeChan chan ContentChange 49 changeChan chan ContentChange
51 emittedValue bool 50 emittedValue bool
52 content ContentChange 51 content ContentChange
52 err error
53 stop <-chan struct{}
53 } 54 }
54 55
55 // NewContentWatcher creates a ContentWatcher observing 56 // NewContentWatcher creates a ContentWatcher observing
56 // the ZooKeeper node at watchedPath. 57 // the ZooKeeper node at watchedPath.
57 func NewContentWatcher(zk *zookeeper.Conn, watchedPath string) *ContentWatcher { 58 func NewContentWatcher(zk *zookeeper.Conn, watchedPath string, stop <-chan struc t{}) *ContentWatcher {
58 w := &ContentWatcher{ 59 w := &ContentWatcher{
59 zk: zk, 60 zk: zk,
60 path: watchedPath, 61 path: watchedPath,
61 changeChan: make(chan ContentChange), 62 changeChan: make(chan ContentChange),
63 stop: stop,
62 } 64 }
63 go w.loop() 65 go w.loop()
64 return w 66 return w
65 } 67 }
66 68
67 // Changes returns a channel that will receive the new node 69 // Changes returns a channel that will receive the new node
68 // content when a change is detected. Note that multiple 70 // content when a change is detected. Note that multiple
69 // changes may be observed as a single event in the channel. 71 // changes may be observed as a single event in the channel.
70 // The first event on the channel holds the initial state. 72 // The first event on the channel holds the initial state.
71 func (w *ContentWatcher) Changes() <-chan ContentChange { 73 func (w *ContentWatcher) Changes() <-chan ContentChange {
72 return w.changeChan 74 return w.changeChan
73 } 75 }
74 76
75 // Dying returns a channel that is closed when the 77 // Wait reads all remaining changes and returns
76 // watcher has stopped or is about to stop. 78 // any error the watcher encounted while running.
77 func (w *ContentWatcher) Dying() <-chan struct{} { 79 func (w *ContentWatcher) Wait() error {
78 » return w.tomb.Dying() 80 » for _ = range w.changeChan {
79 } 81 » }
80 82 » return w.err
81 // Err returns the error that stopped the watcher, or
82 // tomb.ErrStillAlive if the watcher is still running.
83 func (w *ContentWatcher) Err() error {
84 » return w.tomb.Err()
85 }
86
87 // Stop stops the watch and returns any error encountered
88 // while watching. This method should always be called before
89 // discarding the watcher.
90 func (w *ContentWatcher) Stop() error {
91 » w.tomb.Kill(nil)
92 » return w.tomb.Wait()
93 } 83 }
94 84
95 // loop is the backend for watching. 85 // loop is the backend for watching.
96 func (w *ContentWatcher) loop() { 86 func (w *ContentWatcher) loop() {
97 defer w.tomb.Done()
98 defer close(w.changeChan) 87 defer close(w.changeChan)
99 88
100 watch, err := w.update() 89 watch, err := w.update()
101 if err != nil { 90 if err != nil {
102 » » w.tomb.Kill(err) 91 » » w.err = err
103 return 92 return
104 } 93 }
105 94
106 for { 95 for {
107 select { 96 select {
108 » » case <-w.tomb.Dying(): 97 » » case <-w.stop:
109 return 98 return
110 case evt := <-watch: 99 case evt := <-watch:
111 if !evt.Ok() { 100 if !evt.Ok() {
112 » » » » w.tomb.Killf("watcher: critical session event: % v", evt) 101 » » » » w.err = fmt.Errorf("watcher: critical session ev ent: %v", evt)
113 return 102 return
114 } 103 }
115 watch, err = w.update() 104 watch, err = w.update()
116 if err != nil { 105 if err != nil {
117 » » » » w.tomb.Kill(err) 106 » » » » w.err = err
118 return 107 return
119 } 108 }
120 } 109 }
121 } 110 }
122 } 111 }
123 112
124 // update retrieves the node content and emits it as well as an existence 113 // update retrieves the node content and emits it as well as an existence
125 // flag to the change channel if it has changed. It returns the next watch. 114 // flag to the change channel if it has changed. It returns the next watch.
126 func (w *ContentWatcher) update() (nextWatch <-chan zookeeper.Event, err error) { 115 func (w *ContentWatcher) update() (nextWatch <-chan zookeeper.Event, err error) {
127 var content string 116 var content string
(...skipping 24 matching lines...) Expand all
152 newContent := ContentChange{} 141 newContent := ContentChange{}
153 if stat != nil { 142 if stat != nil {
154 newContent.Exists = true 143 newContent.Exists = true
155 newContent.Version = stat.Version() 144 newContent.Version = stat.Version()
156 newContent.Content = content 145 newContent.Content = content
157 } 146 }
158 if w.emittedValue && newContent == w.content { 147 if w.emittedValue && newContent == w.content {
159 return nextWatch, nil 148 return nextWatch, nil
160 } 149 }
161 w.content = newContent 150 w.content = newContent
162 » select { 151 » w.changeChan <- w.content
163 » case <-w.tomb.Dying(): 152 » w.emittedValue = true
164 » » return nil, tomb.ErrDying
165 » case w.changeChan <- w.content:
166 » » w.emittedValue = true
167 » }
168 return nextWatch, nil 153 return nextWatch, nil
169 } 154 }
170 155
171 // ChildrenChange contains information about 156 // ChildrenChange contains information about
172 // children that have been added or removed. 157 // children that have been added or removed.
173 type ChildrenChange struct { 158 type ChildrenChange struct {
174 Added []string 159 Added []string
175 Removed []string 160 Removed []string
176 } 161 }
177 162
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
302 return 287 return
303 } 288 }
304 select { 289 select {
305 case <-w.tomb.Dying(): 290 case <-w.tomb.Dying():
306 return nil, tomb.ErrDying 291 return nil, tomb.ErrDying
307 case w.changeChan <- change: 292 case w.changeChan <- change:
308 w.emittedValue = true 293 w.emittedValue = true
309 } 294 }
310 return 295 return
311 } 296 }
OLDNEW
« no previous file with comments | « [revision details] ('k') | state/watcher/watcher_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld 204d58d