Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1075)

Side by Side Diff: worker/peergrouper/worker.go

Issue 68990043: System SSH key upgrader (Closed)
Patch Set: System SSH key upgrader Created 11 years, 1 month ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « worker/peergrouper/shim.go ('k') | worker/peergrouper/worker_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details.
3
4 package peergrouper
5
6 import (
7 "fmt"
8 "sync"
9 "time"
10
11 "launchpad.net/tomb"
12
13 "launchpad.net/juju-core/errors"
14 "launchpad.net/juju-core/replicaset"
15 "launchpad.net/juju-core/state"
16 "launchpad.net/juju-core/worker"
17 )
18
// stateInterface abstracts the *state.State operations used by the
// peergrouper worker so that tests can supply a fake implementation
// (see the shim that adapts the real State to this interface).
type stateInterface interface {
	// Machine returns the state machine with the given id.
	Machine(id string) (stateMachine, error)
	// WatchStateServerInfo returns a watcher that notifies of
	// changes to the state server information.
	WatchStateServerInfo() state.NotifyWatcher
	// StateServerInfo returns information about the current
	// state server machines.
	StateServerInfo() (*state.StateServerInfo, error)
	// MongoSession returns a session to the mongo replica set.
	MongoSession() mongoSession
}
25
// stateMachine abstracts the state machine operations used by the
// worker, for testing purposes.
type stateMachine interface {
	Id() string
	Refresh() error
	Watch() state.NotifyWatcher
	// WantsVote reports whether the machine should have a vote
	// in the mongo replica set.
	WantsVote() bool
	// HasVote reports whether the machine is currently recorded
	// as having a vote in the replica set.
	HasVote() bool
	// SetHasVote records whether the machine has a vote.
	SetHasVote(hasVote bool) error
	// StateHostPort returns the address to use for this machine
	// as a replica set member.
	StateHostPort() string
}
35
// mongoSession abstracts the replica-set operations performed on a
// mongo session, for testing purposes.
type mongoSession interface {
	// CurrentStatus returns the status of the replica set.
	CurrentStatus() (*replicaset.Status, error)
	// CurrentMembers returns the current replica set members.
	CurrentMembers() ([]replicaset.Member, error)
	// Set changes the replica set to have the given members.
	Set([]replicaset.Member) error
}
41
// notifyFunc holds a function that is sent to the main worker loop
// to fetch new information when something changes. It is always run
// inside the main loop goroutine, so no mutex is required to access
// worker state from it. It reports whether the information has
// actually changed (and by implication whether the replica set may
// need to be changed).
type notifyFunc func() (bool, error)
48
// These are declared as variables rather than constants,
// presumably so that tests can shorten them — TODO confirm.
var (
	// retryInterval holds the interval at which we retry setting
	// the mongo replica set members after a failed attempt.
	retryInterval = 2 * time.Second

	// pollInterval holds the interval at which the replica set
	// members will be updated even in the absence of changes
	// to State. This enables us to make changes to members
	// that are triggered by changes to member status.
	//
	// 10 seconds is the default time interval used by
	// mongo to keep its replicas up to date.
	pollInterval = 10 * time.Second
)
63
// pgWorker holds all the mutable state that we are watching.
// The only goroutine that is allowed to modify this
// is worker.loop - other watchers modify the
// current state by calling worker.notify instead of
// modifying it directly.
type pgWorker struct {
	// tomb manages the worker's lifecycle.
	tomb tomb.Tomb

	// wg represents all the currently running goroutines.
	// The worker main loop waits for all of these to exit
	// before finishing.
	wg sync.WaitGroup

	// st represents the State. It is an interface for testing
	// purposes only.
	st stateInterface

	// notifyCh carries functions sent by watcher goroutines when
	// something changes that might affect the peer group
	// membership. Each function is run inside the main worker
	// goroutine to mutate the state. It reports whether
	// the state has actually changed.
	notifyCh chan notifyFunc

	// machines holds the set of machines we are currently
	// watching (all the state server machines). Each one has an
	// associated goroutine that
	// watches attributes of that machine.
	machines map[string]*machine
}
94
95 // New returns a new worker that maintains the mongo replica set
96 // with respect to the given state.
97 func New(st *state.State) (worker.Worker, error) {
98 cfg, err := st.EnvironConfig()
99 if err != nil {
100 return nil, err
101 }
102 return newWorker(&stateShim{
103 State: st,
104 mongoPort: cfg.StatePort(),
105 }), nil
106 }
107
108 func newWorker(st stateInterface) worker.Worker {
109 w := &pgWorker{
110 st: st,
111 notifyCh: make(chan notifyFunc),
112 machines: make(map[string]*machine),
113 }
114 go func() {
115 defer w.tomb.Done()
116 if err := w.loop(); err != nil {
117 logger.Errorf("peergrouper loop terminated: %v", err)
118 w.tomb.Kill(err)
119 }
120 // Wait for the various goroutines to be killed.
121 // N.B. we don't defer this call because
122 // if we do and a bug causes a panic, Wait will deadlock
123 // waiting for the unkilled goroutines to exit.
124 w.wg.Wait()
125 }()
126 return w
127 }
128
// Kill asks the worker to stop without waiting for it to do so.
// It is part of the worker.Worker interface.
func (w *pgWorker) Kill() {
	w.tomb.Kill(nil)
}
132
// Wait waits for the worker to terminate and returns the error,
// if any, with which it did so. It is part of the worker.Worker
// interface.
func (w *pgWorker) Wait() error {
	return w.tomb.Wait()
}
136
137 func (w *pgWorker) loop() error {
138 infow := w.watchStateServerInfo()
139 defer infow.stop()
140
141 retry := time.NewTimer(0)
142 retry.Stop()
143 for {
144 select {
145 case f := <-w.notifyCh:
146 // Update our current view of the state of affairs.
147 changed, err := f()
148 if err != nil {
149 return err
150 }
151 if !changed {
152 break
153 }
154 // Try to update the replica set immediately.
155 retry.Reset(0)
156 case <-retry.C:
157 if err := w.updateReplicaset(); err != nil {
158 if _, isReplicaSetError := err.(*replicaSetError ); !isReplicaSetError {
159 return err
160 }
161 logger.Errorf("cannot set replicaset: %v", err)
162 retry.Reset(retryInterval)
163 break
164 }
165
166 // Update the replica set members occasionally
167 // to keep them up to date with the current
168 // replica set member statuses.
169 retry.Reset(pollInterval)
170 case <-w.tomb.Dying():
171 return tomb.ErrDying
172 }
173 }
174 }
175
// notify sends the given notification function to
// the worker main loop to be executed. It reports whether
// the function was delivered; it returns false if the
// worker is shutting down.
func (w *pgWorker) notify(f notifyFunc) bool {
	select {
	case w.notifyCh <- f:
		return true
	case <-w.tomb.Dying():
		return false
	}
}
186
187 // getPeerGroupInfo collates current session information about the
188 // mongo peer group with information from state machines.
189 func (w *pgWorker) peerGroupInfo() (*peerGroupInfo, error) {
190 session := w.st.MongoSession()
191 info := &peerGroupInfo{}
192 var err error
193 status, err := session.CurrentStatus()
194 if err != nil {
195 return nil, fmt.Errorf("cannot get replica set status: %v", err)
196 }
197 info.statuses = status.Members
198 info.members, err = session.CurrentMembers()
199 if err != nil {
200 return nil, fmt.Errorf("cannot get replica set members: %v", err )
201 }
202 info.machines = w.machines
203 return info, nil
204 }
205
// replicaSetError holds an error returned as a result
// of calling replicaset.Set. As this is expected to fail
// in the normal course of things, it needs special treatment:
// the worker logs and retries rather than terminating.
type replicaSetError struct {
	error
}
212
// updateReplicaset sets the current replica set members, and applies the
// given voting status to machines in the state. It returns a
// *replicaSetError if the replica set change itself failed (a
// retryable condition), and any other error otherwise.
func (w *pgWorker) updateReplicaset() error {
	info, err := w.peerGroupInfo()
	if err != nil {
		return err
	}
	members, voting, err := desiredPeerGroup(info)
	if err != nil {
		return fmt.Errorf("cannot compute desired peer group: %v", err)
	}
	if members == nil {
		logger.Debugf("no change in desired peer group")
		return nil
	}
	logger.Debugf("desired peer group members: %#v", members)
	// We cannot change the HasVote flag of a machine in state at exactly
	// the same moment as changing its voting status in the replica set.
	//
	// Thus we need to be careful that a machine which is actually a voting
	// member is not seen to not have a vote, because otherwise
	// there is nothing to prevent the machine being removed.
	//
	// To avoid this happening, we make sure when we call SetReplicaSet,
	// that the voting status of machines is the union of both old
	// and new voting machines - that is the set of HasVote machines
	// is a superset of all the actual voting machines.
	//
	// Only after the call has taken place do we reset the voting status
	// of the machines that have lost their vote.
	//
	// If there's a crash, the voting status may not reflect the
	// actual voting status for a while, but when things come
	// back on line, it will be sorted out, as desiredReplicaSet
	// will return the actual voting status.

	// Partition machines into those gaining and those losing a vote,
	// comparing the desired status against what state records.
	var added, removed []*machine
	for m, hasVote := range voting {
		switch {
		case hasVote && !m.stm.HasVote():
			added = append(added, m)
		case !hasVote && m.stm.HasVote():
			removed = append(removed, m)
		}
	}
	// Mark new voters before the replica set change (see the union
	// invariant described above).
	if err := setHasVote(added, true); err != nil {
		return err
	}
	if err := w.st.MongoSession().Set(members); err != nil {
		// We've failed to set the replica set, so revert back
		// to the previous settings.
		if err1 := setHasVote(added, false); err1 != nil {
			logger.Errorf("cannot revert machine voting after failure to change replica set: %v", err1)
		}
		return &replicaSetError{err}
	}
	logger.Infof("successfully changed replica set to %#v", members)
	// Only now is it safe to clear HasVote on the machines that
	// lost their vote.
	if err := setHasVote(removed, false); err != nil {
		return err
	}
	return nil
}
275
// start runs the given loop function until it returns, in a new
// goroutine tracked by w.wg (so the main loop can wait for it at
// shutdown). If the function returns a non-nil error, the receiving
// pgWorker is killed with that error.
func (w *pgWorker) start(loop func() error) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		if err := loop(); err != nil {
			w.tomb.Kill(err)
		}
	}()
}
288
289 // setHasVote sets the HasVote status of all the given
290 // machines to hasVote.
291 func setHasVote(ms []*machine, hasVote bool) error {
292
293 for _, m := range ms {
294 if err := m.stm.SetHasVote(hasVote); err != nil {
295 return fmt.Errorf("cannot set voting status of %q to %v: %v", m.id, hasVote, err)
296 }
297 }
298 return nil
299 }
300
// serverInfoWatcher watches the state server info and
// notifies the worker when it changes.
type serverInfoWatcher struct {
	// worker is the pgWorker to notify of changes.
	worker *pgWorker
	// watcher is the underlying state server info watcher.
	watcher state.NotifyWatcher
}
307
// watchStateServerInfo starts watching the state server info,
// running the watcher's loop in a worker-tracked goroutine.
// The caller is responsible for calling stop on the result.
func (w *pgWorker) watchStateServerInfo() *serverInfoWatcher {
	infow := &serverInfoWatcher{
		worker:  w,
		watcher: w.st.WatchStateServerInfo(),
	}
	w.start(infow.loop)
	return infow
}
316
// loop runs until the watcher's channel is closed or the worker
// is dying, sending updateMachines to the worker's main loop
// whenever the state server info changes.
func (infow *serverInfoWatcher) loop() error {
	for {
		select {
		case _, ok := <-infow.watcher.Changes():
			if !ok {
				// The channel was closed; report the watcher's
				// terminal error, if any.
				return infow.watcher.Err()
			}
			infow.worker.notify(infow.updateMachines)
		case <-infow.worker.tomb.Dying():
			return tomb.ErrDying
		}
	}
}
330
// stop stops the state server info watcher.
// NOTE(review): Stop's result is discarded here — presumably the
// watcher's terminal error is surfaced via Err in loop; confirm.
func (infow *serverInfoWatcher) stop() {
	infow.watcher.Stop()
}
334
// updateMachines is a notifyFunc that updates the current
// machines when the state server info has changed. It reports
// whether the set of watched machines actually changed.
func (infow *serverInfoWatcher) updateMachines() (bool, error) {
	info, err := infow.worker.st.StateServerInfo()
	if err != nil {
		return false, fmt.Errorf("cannot get state server info: %v", err)
	}
	changed := false
	// Stop machine goroutines that no longer correspond to state server
	// machines. (Deleting map entries while ranging over the map is
	// well-defined in Go.)
	for _, m := range infow.worker.machines {
		if !inStrings(m.id, info.MachineIds) {
			m.stop()
			delete(infow.worker.machines, m.id)
			changed = true
		}
	}
	// Start machines with no watcher.
	for _, id := range info.MachineIds {
		if _, ok := infow.worker.machines[id]; ok {
			continue
		}
		logger.Debugf("found new machine %q", id)
		stm, err := infow.worker.st.Machine(id)
		if err != nil {
			if errors.IsNotFoundError(err) {
				// If the machine isn't found, it must have been
				// removed and will soon enough be removed
				// from the state server list. This will probably
				// never happen, but we'll code defensively anyway.
				logger.Warningf("machine %q from state server list not found", id)
				continue
			}
			return false, fmt.Errorf("cannot get machine %q: %v", id, err)
		}
		infow.worker.machines[id] = infow.worker.newMachine(stm)
		changed = true
	}
	return changed, nil
}
375
// machine represents a machine in State.
type machine struct {
	// id is the machine's state id.
	id string
	// wantsVote caches the last observed value of stm.WantsVote().
	wantsVote bool
	// hostPort caches the last observed value of stm.StateHostPort().
	hostPort string

	// worker is the pgWorker that owns this machine entry.
	worker *pgWorker
	// stm is the underlying state machine.
	stm stateMachine
	// machineWatcher is the watcher obtained from stm.Watch(),
	// notifying of changes to the machine.
	machineWatcher state.NotifyWatcher
}
386
// String implements fmt.Stringer, returning the machine's id.
func (m *machine) String() string {
	return m.id
}
390
// GoString implements fmt.GoStringer, giving %#v formatting
// (used in the worker's debug logging) a readable representation.
func (m *machine) GoString() string {
	return fmt.Sprintf("&peergrouper.machine{id: %q, wantsVote: %v, hostPort: %q}", m.id, m.wantsVote, m.hostPort)
}
394
// newMachine creates a machine entry tracking the given state
// machine, caching its current attributes, and starts a
// worker-tracked goroutine that watches it for changes.
func (w *pgWorker) newMachine(stm stateMachine) *machine {
	m := &machine{
		worker:         w,
		id:             stm.Id(),
		stm:            stm,
		hostPort:       stm.StateHostPort(),
		wantsVote:      stm.WantsVote(),
		machineWatcher: stm.Watch(),
	}
	w.start(m.loop)
	return m
}
407
// loop runs until the machine watcher's channel is closed or the
// worker is dying, sending refresh to the worker's main loop
// whenever the machine changes.
func (m *machine) loop() error {
	for {
		select {
		case _, ok := <-m.machineWatcher.Changes():
			if !ok {
				return m.machineWatcher.Err()
			}
			m.worker.notify(m.refresh)
		case <-m.worker.tomb.Dying():
			// NOTE(review): this returns nil rather than
			// tomb.ErrDying (unlike serverInfoWatcher.loop) —
			// presumably harmless since the worker is already
			// dying; confirm the inconsistency is intentional.
			return nil
		}
	}
}
421
// stop stops the watcher associated with the machine.
func (m *machine) stop() {
	m.machineWatcher.Stop()
}
425
426 func (m *machine) refresh() (bool, error) {
427 if err := m.stm.Refresh(); err != nil {
428 if errors.IsNotFoundError(err) {
429 // We want to be robust when the machine
430 // state is out of date with respect to the
431 // state server info, so if the machine
432 // has been removed, just assume that
433 // no change has happened - the machine
434 // loop will be stopped very soon anyway.
435 return false, nil
436 }
437 return false, err
438 }
439 changed := false
440 if wantsVote := m.stm.WantsVote(); wantsVote != m.wantsVote {
441 m.wantsVote = wantsVote
442 changed = true
443 }
444 if hostPort := m.stm.StateHostPort(); hostPort != m.hostPort {
445 m.hostPort = hostPort
446 changed = true
447 }
448 return changed, nil
449 }
450
// inStrings reports whether t is an element of ss.
func inStrings(t string, ss []string) bool {
	for i := range ss {
		if ss[i] == t {
			return true
		}
	}
	return false
}
OLDNEW
« no previous file with comments | « worker/peergrouper/shim.go ('k') | worker/peergrouper/worker_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b