Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(314)

Side by Side Diff: worker/runner.go

Issue 9770043: runner: fix possible shutdown deadlocks
Patch Set: runner: fix possible shutdown deadlocks Created 11 years, 10 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « [revision details] ('k') | worker/runner_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2012, 2013 Canonical Ltd. 1 // Copyright 2012, 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details. 2 // Licensed under the AGPLv3, see LICENCE file for details.
3 3
4 package worker 4 package worker
5 5
6 import ( 6 import (
7 "errors" 7 "errors"
8 "launchpad.net/juju-core/log" 8 "launchpad.net/juju-core/log"
9 "launchpad.net/tomb" 9 "launchpad.net/tomb"
10 "local/runtime/debug"
10 "time" 11 "time"
11 ) 12 )
12 13
13 // RestartDelay holds the length of time that a worker 14 // RestartDelay holds the length of time that a worker
14 // will wait between exiting and restarting. 15 // will wait between exiting and restarting.
15 var RestartDelay = 3 * time.Second 16 var RestartDelay = 3 * time.Second
16 17
17 // Worker is implemented by a running worker. 18 // Worker is implemented by a running worker.
18 type Worker interface { 19 type Worker interface {
19 // Kill asks the worker to stop without necessarily 20 // Kill asks the worker to stop without necessarily
(...skipping 26 matching lines...) Expand all
46 worker Worker 47 worker Worker
47 } 48 }
48 49
49 type doneInfo struct { 50 type doneInfo struct {
50 id string 51 id string
51 err error 52 err error
52 } 53 }
53 54
54 // NewRunner creates a new Runner. When a worker finishes, if its error 55 // NewRunner creates a new Runner. When a worker finishes, if its error
55 // is deemed fatal (determined by calling isFatal), all the other workers 56 // is deemed fatal (determined by calling isFatal), all the other workers
56 // will be stopped and the runner itself will finish. Of all the errors 57 // will be stopped and the runner itself will finish. Of all the fatal errors
57 // returned by the stopped workers, only the most important one 58 // returned by the stopped workers, only the most important one,
58 // (determined by calling moreImportant) will be returned from 59 // determined by calling moreImportant, will be returned from
59 // Runner.Wait. 60 // Runner.Wait (non-fatal errors will not be returned from Wait).
fwereade 2013/05/27 20:35:53 This deserves a sentence, not parens, imo
60 // 61 //
61 // The function isFatal(err) returns whether err is a fatal error. The 62 // The function isFatal(err) returns whether err is a fatal error. The
62 // function moreImportant(err0, err1) returns whether err0 is considered 63 // function moreImportant(err0, err1) returns whether err0 is considered
63 // more important than err1.. 64 // more important than err1.
64 func NewRunner(isFatal func(error) bool, moreImportant func(err0, err1 error) bool) *Runner { 65 func NewRunner(isFatal func(error) bool, moreImportant func(err0, err1 error) bool) *Runner {
65 runner := &Runner{ 66 runner := &Runner{
66 startc: make(chan startReq), 67 startc: make(chan startReq),
67 stopc: make(chan string), 68 stopc: make(chan string),
68 donec: make(chan doneInfo), 69 donec: make(chan doneInfo),
69 startedc: make(chan startInfo), 70 startedc: make(chan startInfo),
70 isFatal: isFatal, 71 isFatal: isFatal,
71 moreImportant: moreImportant, 72 moreImportant: moreImportant,
72 } 73 }
73 go func() { 74 go func() {
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
107 case <-runner.tomb.Dead(): 108 case <-runner.tomb.Dead():
108 } 109 }
109 return ErrDead 110 return ErrDead
110 } 111 }
111 112
112 func (runner *Runner) Wait() error { 113 func (runner *Runner) Wait() error {
113 return runner.tomb.Wait() 114 return runner.tomb.Wait()
114 } 115 }
115 116
116 func (runner *Runner) Kill() { 117 func (runner *Runner) Kill() {
118 log.Debugf("worker: killing runner %p %s", runner, debug.Callers(0, 20))
117 runner.tomb.Kill(nil) 119 runner.tomb.Kill(nil)
118 } 120 }
119 121
120 // Stop kills the given worker and waits for it to exit. 122 // Stop kills the given worker and waits for it to exit.
121 func Stop(worker Worker) error { 123 func Stop(worker Worker) error {
122 worker.Kill() 124 worker.Kill()
123 return worker.Wait() 125 return worker.Wait()
124 } 126 }
125 127
128 type workerInfo struct {
129 start func() (Worker, error)
130 worker Worker
131 restartDelay time.Duration
132 stopping bool
133 }
134
126 func (runner *Runner) run() error { 135 func (runner *Runner) run() error {
127 type workerInfo struct {
128 start func() (Worker, error)
129 worker Worker
130 restartDelay time.Duration
131 stopping bool
132 }
133 // workers holds the current set of workers. All workers with a 136 // workers holds the current set of workers. All workers with a
134 // running goroutine have an entry here. 137 // running goroutine have an entry here.
135 workers := make(map[string]*workerInfo) 138 workers := make(map[string]*workerInfo)
136 var finalError error 139 var finalError error
137 loop: 140
141 » // isDying holds whether the runner is currently dying. When it
142 » // is dying (whether as a result of being killed or due to a
143 » // fatal error), all existing workers are killed, no new workers
144 » // will be started, and the loop will exit when all existing
145 » // workers have stopped.
146 » isDying := false
147 » tombDying := runner.tomb.Dying()
138 for { 148 for {
149 if isDying && len(workers) == 0 {
150 return finalError
151 }
139 select { 152 select {
140 » » case <-runner.tomb.Dying(): 153 » » case <-tombDying:
141 » » » break loop 154 » » » log.Infof("worker: runner is dying")
thumper 2013/06/06 04:37:44 FYI, you could create a package level logger now u
fwereade 2013/06/06 16:36:15 +100 :)
rog 2013/06/19 08:19:43 i think that probably deserves another CL. we coul
155 » » » isDying = true
156 » » » killAll(workers)
157 » » » tombDying = nil
142 case req := <-runner.startc: 158 case req := <-runner.startc:
159 if isDying {
160 log.Infof("worker: ignoring start request for %q when dying", req.id)
161 break
162 }
143 info := workers[req.id] 163 info := workers[req.id]
144 if info == nil { 164 if info == nil {
145 workers[req.id] = &workerInfo{ 165 workers[req.id] = &workerInfo{
146 start: req.start, 166 start: req.start,
147 restartDelay: RestartDelay, 167 restartDelay: RestartDelay,
148 } 168 }
149 go runner.runWorker(0, req.id, req.start) 169 go runner.runWorker(0, req.id, req.start)
150 break 170 break
151 } 171 }
152 if !info.stopping { 172 if !info.stopping {
153 // The worker is already running, so leave it alone 173 // The worker is already running, so leave it alone
154 break 174 break
155 } 175 }
156 » » » // The worker previously existed and is currently 176 » » » // The worker previously existed and is
157 » » » // being stopped. When it eventually does stop, 177 » » » // currently being stopped. When it eventually
158 » » » // we'll restart it immediately with the new 178 » » » // does stop, we'll restart it immediately with
159 » » » // start function. 179 » » » // the new start function.
160 info.start = req.start 180 info.start = req.start
161 info.restartDelay = 0 181 info.restartDelay = 0
162 case id := <-runner.stopc: 182 case id := <-runner.stopc:
163 » » » info := workers[id] 183 » » » if info := workers[id]; info != nil {
164 » » » if info == nil { 184 » » » » killWorker(id, info)
165 » » » » // The worker doesn't exist so nothing to do.
166 » » » » break
167 } 185 }
168 » » » if info.worker != nil { 186 » » case info := <-runner.startedc:
169 » » » » info.worker.Kill() 187 » » » workerInfo := workers[info.id]
170 » » » » info.worker = nil 188 » » » workerInfo.worker = info.worker
189 » » » if isDying {
190 » » » » killWorker(info.id, workerInfo)
171 } 191 }
172 info.stopping = true
173 info.start = nil
174 case info := <-runner.startedc:
175 workers[info.id].worker = info.worker
176 case info := <-runner.donec: 192 case info := <-runner.donec:
177 workerInfo := workers[info.id] 193 workerInfo := workers[info.id]
178 if !workerInfo.stopping && info.err == nil { 194 if !workerInfo.stopping && info.err == nil {
179 info.err = errors.New("unexpected quit") 195 info.err = errors.New("unexpected quit")
180 } 196 }
181 if info.err != nil { 197 if info.err != nil {
182 log.Errorf("worker: worker %q: %v", info.id, info.err)
183 if runner.isFatal(info.err) { 198 if runner.isFatal(info.err) {
184 » » » » » finalError = info.err 199 » » » » » log.Errorf("worker: fatal %q: %v", info.id, info.err)
200 » » » » » if finalError == nil || runner.moreImportant(info.err, finalError) {
201 » » » » » » finalError = info.err
202 » » » » » }
185 delete(workers, info.id) 203 delete(workers, info.id)
186 » » » » » break loop 204 » » » » » if !isDying {
205 » » » » » » isDying = true
206 » » » » » » killAll(workers)
207 » » » » » }
208 » » » » » break
209 » » » » } else {
210 » » » » » log.Errorf("worker: exited %q: %v", info.id, info.err)
187 } 211 }
188 } 212 }
189 if workerInfo.start == nil { 213 if workerInfo.start == nil {
190 // The worker has been deliberately stopped; 214 // The worker has been deliberately stopped;
191 // we can now remove it from the list of workers. 215 // we can now remove it from the list of workers.
192 delete(workers, info.id) 216 delete(workers, info.id)
193 break 217 break
194 } 218 }
195 go runner.runWorker(workerInfo.restartDelay, info.id, workerInfo.start) 219 go runner.runWorker(workerInfo.restartDelay, info.id, workerInfo.start)
196 workerInfo.restartDelay = RestartDelay 220 workerInfo.restartDelay = RestartDelay
197 } 221 }
198 } 222 }
199 » for _, info := range workers { 223 » panic("unreachable")
200 » » if info.worker != nil { 224 }
201 » » » info.worker.Kill() 225
202 » » » info.worker = nil 226 func killAll(workers map[string]*workerInfo) {
203 » » } 227 » for id, info := range workers {
228 » » killWorker(id, info)
204 } 229 }
205 » for len(workers) > 0 { 230 }
206 » » info := <-runner.donec 231
207 » » if runner.moreImportant(info.err, finalError) { 232 func killWorker(id string, info *workerInfo) {
208 » » » finalError = info.err 233 » if info.worker != nil {
209 » » } 234 » » log.Debugf("worker: killing %q", id)
210 » » if info.err != nil { 235 » » info.worker.Kill()
211 » » » log.Errorf("worker: worker %q: %v", info.id, info.err) 236 » » info.worker = nil
212 » » }
213 » » delete(workers, info.id)
214 } 237 }
215 » return finalError 238 » info.stopping = true
239 » info.start = nil
216 } 240 }
217 241
218 // runWorker starts the given worker after waiting for the given delay. 242 // runWorker starts the given worker after waiting for the given delay.
219 func (runner *Runner) runWorker(delay time.Duration, id string, start func() (Worker, error)) { 243 func (runner *Runner) runWorker(delay time.Duration, id string, start func() (Worker, error)) {
220 if delay > 0 { 244 if delay > 0 {
245 log.Infof("worker: restarting %q in %v", id, delay)
221 select { 246 select {
222 case <-runner.tomb.Dying(): 247 case <-runner.tomb.Dying():
223 runner.donec <- doneInfo{id, nil} 248 runner.donec <- doneInfo{id, nil}
224 return 249 return
225 case <-time.After(delay): 250 case <-time.After(delay):
226 } 251 }
227 } 252 }
253 log.Infof("worker: start %q", id)
228 worker, err := start() 254 worker, err := start()
229 if err == nil { 255 if err == nil {
230 runner.startedc <- startInfo{id, worker} 256 runner.startedc <- startInfo{id, worker}
231 err = worker.Wait() 257 err = worker.Wait()
232 } 258 }
233 runner.donec <- doneInfo{id, err} 259 runner.donec <- doneInfo{id, err}
234 } 260 }
OLDNEW
« no previous file with comments | « [revision details] ('k') | worker/runner_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b