Left: | ||
Right: |
OLD | NEW |
---|---|
1 // Copyright 2012, 2013 Canonical Ltd. | 1 // Copyright 2012, 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package worker | 4 package worker |
5 | 5 |
6 import ( | 6 import ( |
7 "errors" | 7 "errors" |
8 "launchpad.net/juju-core/log" | 8 "launchpad.net/juju-core/log" |
9 "launchpad.net/tomb" | 9 "launchpad.net/tomb" |
10 "local/runtime/debug" | |
10 "time" | 11 "time" |
11 ) | 12 ) |
12 | 13 |
13 // RestartDelay holds the length of time that a worker | 14 // RestartDelay holds the length of time that a worker |
14 // will wait between exiting and restarting. | 15 // will wait between exiting and restarting. |
15 var RestartDelay = 3 * time.Second | 16 var RestartDelay = 3 * time.Second |
16 | 17 |
17 // Worker is implemented by a running worker. | 18 // Worker is implemented by a running worker. |
18 type Worker interface { | 19 type Worker interface { |
19 // Kill asks the worker to stop without necessarily | 20 // Kill asks the worker to stop without necessarily |
(...skipping 26 matching lines...) Expand all Loading... | |
46 worker Worker | 47 worker Worker |
47 } | 48 } |
48 | 49 |
49 type doneInfo struct { | 50 type doneInfo struct { |
50 id string | 51 id string |
51 err error | 52 err error |
52 } | 53 } |
53 | 54 |
54 // NewRunner creates a new Runner. When a worker finishes, if its error | 55 // NewRunner creates a new Runner. When a worker finishes, if its error |
55 // is deemed fatal (determined by calling isFatal), all the other workers | 56 // is deemed fatal (determined by calling isFatal), all the other workers |
56 // will be stopped and the runner itself will finish. Of all the errors | 57 // will be stopped and the runner itself will finish. Of all the fatal errors |
57 // returned by the stopped workers, only the most important one | 58 // returned by the stopped workers, only the most important one, |
58 // (determined by calling moreImportant) will be returned from | 59 // determined by calling moreImportant, will be returned from |
59 // Runner.Wait. | 60 // Runner.Wait (non-fatal errors will not be returned from Wait). |
fwereade
2013/05/27 20:35:53
This deserves a sentence, not parens, imo
| |
60 // | 61 // |
61 // The function isFatal(err) returns whether err is a fatal error. The | 62 // The function isFatal(err) returns whether err is a fatal error. The |
62 // function moreImportant(err0, err1) returns whether err0 is considered | 63 // function moreImportant(err0, err1) returns whether err0 is considered |
63 // more important than err1.. | 64 // more important than err1. |
64 func NewRunner(isFatal func(error) bool, moreImportant func(err0, err1 error) bo ol) *Runner { | 65 func NewRunner(isFatal func(error) bool, moreImportant func(err0, err1 error) bo ol) *Runner { |
65 runner := &Runner{ | 66 runner := &Runner{ |
66 startc: make(chan startReq), | 67 startc: make(chan startReq), |
67 stopc: make(chan string), | 68 stopc: make(chan string), |
68 donec: make(chan doneInfo), | 69 donec: make(chan doneInfo), |
69 startedc: make(chan startInfo), | 70 startedc: make(chan startInfo), |
70 isFatal: isFatal, | 71 isFatal: isFatal, |
71 moreImportant: moreImportant, | 72 moreImportant: moreImportant, |
72 } | 73 } |
73 go func() { | 74 go func() { |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
107 case <-runner.tomb.Dead(): | 108 case <-runner.tomb.Dead(): |
108 } | 109 } |
109 return ErrDead | 110 return ErrDead |
110 } | 111 } |
111 | 112 |
112 func (runner *Runner) Wait() error { | 113 func (runner *Runner) Wait() error { |
113 return runner.tomb.Wait() | 114 return runner.tomb.Wait() |
114 } | 115 } |
115 | 116 |
116 func (runner *Runner) Kill() { | 117 func (runner *Runner) Kill() { |
118 log.Debugf("worker: killing runner %p %s", runner, debug.Callers(0, 20)) | |
117 runner.tomb.Kill(nil) | 119 runner.tomb.Kill(nil) |
118 } | 120 } |
119 | 121 |
120 // Stop kills the given worker and waits for it to exit. | 122 // Stop kills the given worker and waits for it to exit. |
121 func Stop(worker Worker) error { | 123 func Stop(worker Worker) error { |
122 worker.Kill() | 124 worker.Kill() |
123 return worker.Wait() | 125 return worker.Wait() |
124 } | 126 } |
125 | 127 |
128 type workerInfo struct { | |
129 start func() (Worker, error) | |
130 worker Worker | |
131 restartDelay time.Duration | |
132 stopping bool | |
133 } | |
134 | |
126 func (runner *Runner) run() error { | 135 func (runner *Runner) run() error { |
127 type workerInfo struct { | |
128 start func() (Worker, error) | |
129 worker Worker | |
130 restartDelay time.Duration | |
131 stopping bool | |
132 } | |
133 // workers holds the current set of workers. All workers with a | 136 // workers holds the current set of workers. All workers with a |
134 // running goroutine have an entry here. | 137 // running goroutine have an entry here. |
135 workers := make(map[string]*workerInfo) | 138 workers := make(map[string]*workerInfo) |
136 var finalError error | 139 var finalError error |
137 loop: | 140 |
141 » // isDying holds whether the runner is currently dying. When it | |
142 » // is dying (whether as a result of being killed or due to a | |
143 » // fatal error), all existing workers are killed, no new workers | |
144 » // will be started, and the loop will exit when all existing | |
145 » // workers have stopped. | |
146 » isDying := false | |
147 » tombDying := runner.tomb.Dying() | |
138 for { | 148 for { |
149 if isDying && len(workers) == 0 { | |
150 return finalError | |
151 } | |
139 select { | 152 select { |
140 » » case <-runner.tomb.Dying(): | 153 » » case <-tombDying: |
141 » » » break loop | 154 » » » log.Infof("worker: runner is dying") |
thumper
2013/06/06 04:37:44
FYI, you could create a package level logger now u
fwereade
2013/06/06 16:36:15
+100 :)
rog
2013/06/19 08:19:43
i think that probably deserves another CL. we coul
| |
155 » » » isDying = true | |
156 » » » killAll(workers) | |
157 » » » tombDying = nil | |
142 case req := <-runner.startc: | 158 case req := <-runner.startc: |
159 if isDying { | |
160 log.Infof("worker: ignoring start request for %q when dying", req.id) | |
161 break | |
162 } | |
143 info := workers[req.id] | 163 info := workers[req.id] |
144 if info == nil { | 164 if info == nil { |
145 workers[req.id] = &workerInfo{ | 165 workers[req.id] = &workerInfo{ |
146 start: req.start, | 166 start: req.start, |
147 restartDelay: RestartDelay, | 167 restartDelay: RestartDelay, |
148 } | 168 } |
149 go runner.runWorker(0, req.id, req.start) | 169 go runner.runWorker(0, req.id, req.start) |
150 break | 170 break |
151 } | 171 } |
152 if !info.stopping { | 172 if !info.stopping { |
153 // The worker is already running, so leave it al one | 173 // The worker is already running, so leave it al one |
154 break | 174 break |
155 } | 175 } |
156 » » » // The worker previously existed and is currently | 176 » » » // The worker previously existed and is |
157 » » » // being stopped. When it eventually does stop, | 177 » » » // currently being stopped. When it eventually |
158 » » » // we'll restart it immediately with the new | 178 » » » // does stop, we'll restart it immediately with |
159 » » » // start function. | 179 » » » // the new start function. |
160 info.start = req.start | 180 info.start = req.start |
161 info.restartDelay = 0 | 181 info.restartDelay = 0 |
162 case id := <-runner.stopc: | 182 case id := <-runner.stopc: |
163 » » » info := workers[id] | 183 » » » if info := workers[id]; info != nil { |
164 » » » if info == nil { | 184 » » » » killWorker(id, info) |
165 » » » » // The worker doesn't exist so nothing to do. | |
166 » » » » break | |
167 } | 185 } |
168 » » » if info.worker != nil { | 186 » » case info := <-runner.startedc: |
169 » » » » info.worker.Kill() | 187 » » » workerInfo := workers[info.id] |
170 » » » » info.worker = nil | 188 » » » workerInfo.worker = info.worker |
189 » » » if isDying { | |
190 » » » » killWorker(info.id, workerInfo) | |
171 } | 191 } |
172 info.stopping = true | |
173 info.start = nil | |
174 case info := <-runner.startedc: | |
175 workers[info.id].worker = info.worker | |
176 case info := <-runner.donec: | 192 case info := <-runner.donec: |
177 workerInfo := workers[info.id] | 193 workerInfo := workers[info.id] |
178 if !workerInfo.stopping && info.err == nil { | 194 if !workerInfo.stopping && info.err == nil { |
179 info.err = errors.New("unexpected quit") | 195 info.err = errors.New("unexpected quit") |
180 } | 196 } |
181 if info.err != nil { | 197 if info.err != nil { |
182 log.Errorf("worker: worker %q: %v", info.id, inf o.err) | |
183 if runner.isFatal(info.err) { | 198 if runner.isFatal(info.err) { |
184 » » » » » finalError = info.err | 199 » » » » » log.Errorf("worker: fatal %q: %v", info. id, info.err) |
200 » » » » » if finalError == nil || runner.moreImpor tant(info.err, finalError) { | |
201 » » » » » » finalError = info.err | |
202 » » » » » } | |
185 delete(workers, info.id) | 203 delete(workers, info.id) |
186 » » » » » break loop | 204 » » » » » if !isDying { |
205 » » » » » » isDying = true | |
206 » » » » » » killAll(workers) | |
207 » » » » » } | |
208 » » » » » break | |
209 » » » » } else { | |
210 » » » » » log.Errorf("worker: exited %q: %v", info .id, info.err) | |
187 } | 211 } |
188 } | 212 } |
189 if workerInfo.start == nil { | 213 if workerInfo.start == nil { |
190 // The worker has been deliberately stopped; | 214 // The worker has been deliberately stopped; |
191 // we can now remove it from the list of workers . | 215 // we can now remove it from the list of workers . |
192 delete(workers, info.id) | 216 delete(workers, info.id) |
193 break | 217 break |
194 } | 218 } |
195 go runner.runWorker(workerInfo.restartDelay, info.id, wo rkerInfo.start) | 219 go runner.runWorker(workerInfo.restartDelay, info.id, wo rkerInfo.start) |
196 workerInfo.restartDelay = RestartDelay | 220 workerInfo.restartDelay = RestartDelay |
197 } | 221 } |
198 } | 222 } |
199 » for _, info := range workers { | 223 » panic("unreachable") |
200 » » if info.worker != nil { | 224 } |
201 » » » info.worker.Kill() | 225 |
202 » » » info.worker = nil | 226 func killAll(workers map[string]*workerInfo) { |
203 » » } | 227 » for id, info := range workers { |
228 » » killWorker(id, info) | |
204 } | 229 } |
205 » for len(workers) > 0 { | 230 } |
206 » » info := <-runner.donec | 231 |
207 » » if runner.moreImportant(info.err, finalError) { | 232 func killWorker(id string, info *workerInfo) { |
208 » » » finalError = info.err | 233 » if info.worker != nil { |
209 » » } | 234 » » log.Debugf("worker: killing %q", id) |
210 » » if info.err != nil { | 235 » » info.worker.Kill() |
211 » » » log.Errorf("worker: worker %q: %v", info.id, info.err) | 236 » » info.worker = nil |
212 » » } | |
213 » » delete(workers, info.id) | |
214 } | 237 } |
215 » return finalError | 238 » info.stopping = true |
239 » info.start = nil | |
216 } | 240 } |
217 | 241 |
218 // runWorker starts the given worker after waiting for the given delay. | 242 // runWorker starts the given worker after waiting for the given delay. |
219 func (runner *Runner) runWorker(delay time.Duration, id string, start func() (Wo rker, error)) { | 243 func (runner *Runner) runWorker(delay time.Duration, id string, start func() (Wo rker, error)) { |
220 if delay > 0 { | 244 if delay > 0 { |
245 log.Infof("worker: restarting %q in %v", id, delay) | |
221 select { | 246 select { |
222 case <-runner.tomb.Dying(): | 247 case <-runner.tomb.Dying(): |
223 runner.donec <- doneInfo{id, nil} | 248 runner.donec <- doneInfo{id, nil} |
224 return | 249 return |
225 case <-time.After(delay): | 250 case <-time.After(delay): |
226 } | 251 } |
227 } | 252 } |
253 log.Infof("worker: start %q", id) | |
228 worker, err := start() | 254 worker, err := start() |
229 if err == nil { | 255 if err == nil { |
230 runner.startedc <- startInfo{id, worker} | 256 runner.startedc <- startInfo{id, worker} |
231 err = worker.Wait() | 257 err = worker.Wait() |
232 } | 258 } |
233 runner.donec <- doneInfo{id, err} | 259 runner.donec <- doneInfo{id, err} |
234 } | 260 } |
OLD | NEW |