OLD | NEW |
1 // Copyright 2014 Canonical Ltd. | 1 // Copyright 2014 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package peergrouper | 4 package peergrouper |
5 | 5 |
6 import ( | 6 import ( |
7 "fmt" | 7 "fmt" |
8 "sync" | 8 "sync" |
9 "time" | 9 "time" |
10 | 10 |
11 "launchpad.net/tomb" | 11 "launchpad.net/tomb" |
12 | 12 |
13 "launchpad.net/juju-core/errors" | 13 "launchpad.net/juju-core/errors" |
| 14 "launchpad.net/juju-core/instance" |
14 "launchpad.net/juju-core/replicaset" | 15 "launchpad.net/juju-core/replicaset" |
15 "launchpad.net/juju-core/state" | 16 "launchpad.net/juju-core/state" |
16 "launchpad.net/juju-core/worker" | 17 "launchpad.net/juju-core/worker" |
17 ) | 18 ) |
18 | 19 |
// stateInterface is the subset of the *state.State API used by the
// peer-group worker. It is an interface so that tests can substitute
// a fake implementation.
type stateInterface interface {
	Machine(id string) (stateMachine, error)
	WatchStateServerInfo() state.NotifyWatcher
	StateServerInfo() (*state.StateServerInfo, error)
	MongoSession() mongoSession
}
25 | 26 |
// stateMachine is the subset of the *state.Machine API used by the
// peer-group worker, abstracted so tests can substitute a fake.
type stateMachine interface {
	Id() string
	Refresh() error
	Watch() state.NotifyWatcher
	WantsVote() bool
	HasVote() bool
	SetHasVote(hasVote bool) error
	// APIHostPorts and MongoHostPorts return the addresses on
	// which the machine's API server and mongo instance can be
	// contacted, respectively.
	APIHostPorts() []instance.HostPort
	MongoHostPorts() []instance.HostPort
}
35 | 37 |
// mongoSession is the subset of the mongo replica-set session API
// used by the worker, abstracted for testing.
type mongoSession interface {
	CurrentStatus() (*replicaset.Status, error)
	CurrentMembers() ([]replicaset.Member, error)
	Set([]replicaset.Member) error
}
41 | 43 |
// publisherInterface is implemented by values that can publish
// the current set of API server addresses.
type publisherInterface interface {
	// publishAPIServers publishes information about the given state servers
	// to whomsoever it may concern. When it is called there
	// is no guarantee that any of the information has actually changed.
	publishAPIServers(apiServers [][]instance.HostPort) error
}
| 50 |
// notifyFunc holds a function that is sent
// to the main worker loop to fetch new information
// when something changes. It reports whether
// the information has actually changed (and by implication
// whether the replica set may need to be changed).
type notifyFunc func() (changed bool, err error)
48 | 57 |
var (
	// If we fail to set the mongo replica set members,
	// we retry at the following interval until we succeed.
	retryInterval = 2 * time.Second

	// pollInterval holds the interval at which the replica set
	// members will be updated even in the absence of changes
	// to State. This enables us to make changes to members
	// that are triggered by changes to member status.
	//
	// 10 seconds is the default time interval used by
	// mongo to keep its replicas up to date.
	pollInterval = 10 * time.Second
)
63 | 72 |
// pgWorker holds all the mutable state that we are watching.
// The only goroutine that is allowed to modify this
// is worker.loop - other watchers modify the
// current state by calling worker.notify instead of
// modifying it directly.
type pgWorker struct {
	tomb tomb.Tomb

	// wg represents all the currently running goroutines.
	// The worker main loop waits for all of these to exit
	// before finishing.
	wg sync.WaitGroup

	// st represents the State. It is an interface so we can swap
	// out the implementation during testing.
	st stateInterface

	// When something changes that might affect
	// the peer group membership, it sends a function
	// on notifyCh that is run inside the main worker
	// goroutine to mutate the state. It reports whether
	// the state has actually changed.
	notifyCh chan notifyFunc

	// machines holds the set of machines we are currently
	// watching (all the state server machines). Each one has an
	// associated goroutine that
	// watches attributes of that machine.
	machines map[string]*machine

	// publisher holds the implementation of the API
	// address publisher.
	publisher publisherInterface
}
94 | 107 |
95 // New returns a new worker that maintains the mongo replica set | 108 // New returns a new worker that maintains the mongo replica set |
96 // with respect to the given state. | 109 // with respect to the given state. |
97 func New(st *state.State) (worker.Worker, error) { | 110 func New(st *state.State) (worker.Worker, error) { |
98 cfg, err := st.EnvironConfig() | 111 cfg, err := st.EnvironConfig() |
99 if err != nil { | 112 if err != nil { |
100 return nil, err | 113 return nil, err |
101 } | 114 } |
102 return newWorker(&stateShim{ | 115 return newWorker(&stateShim{ |
103 State: st, | 116 State: st, |
104 mongoPort: cfg.StatePort(), | 117 mongoPort: cfg.StatePort(), |
105 » }), nil | 118 » » apiPort: cfg.APIPort(), |
| 119 » }, newPublisher(st)), nil |
106 } | 120 } |
107 | 121 |
108 func newWorker(st stateInterface) worker.Worker { | 122 func newWorker(st stateInterface, pub publisherInterface) worker.Worker { |
109 w := &pgWorker{ | 123 w := &pgWorker{ |
110 » » st: st, | 124 » » st: st, |
111 » » notifyCh: make(chan notifyFunc), | 125 » » notifyCh: make(chan notifyFunc), |
112 » » machines: make(map[string]*machine), | 126 » » machines: make(map[string]*machine), |
| 127 » » publisher: pub, |
113 } | 128 } |
| 129 logger.Infof("worker starting") |
114 go func() { | 130 go func() { |
115 defer w.tomb.Done() | 131 defer w.tomb.Done() |
116 if err := w.loop(); err != nil { | 132 if err := w.loop(); err != nil { |
117 logger.Errorf("peergrouper loop terminated: %v", err) | 133 logger.Errorf("peergrouper loop terminated: %v", err) |
118 w.tomb.Kill(err) | 134 w.tomb.Kill(err) |
119 } | 135 } |
120 // Wait for the various goroutines to be killed. | 136 // Wait for the various goroutines to be killed. |
121 // N.B. we don't defer this call because | 137 // N.B. we don't defer this call because |
122 // if we do and a bug causes a panic, Wait will deadlock | 138 // if we do and a bug causes a panic, Wait will deadlock |
123 // waiting for the unkilled goroutines to exit. | 139 // waiting for the unkilled goroutines to exit. |
(...skipping 23 matching lines...) Expand all Loading... |
147 changed, err := f() | 163 changed, err := f() |
148 if err != nil { | 164 if err != nil { |
149 return err | 165 return err |
150 } | 166 } |
151 if !changed { | 167 if !changed { |
152 break | 168 break |
153 } | 169 } |
154 // Try to update the replica set immediately. | 170 // Try to update the replica set immediately. |
155 retry.Reset(0) | 171 retry.Reset(0) |
156 case <-retry.C: | 172 case <-retry.C: |
| 173 ok := true |
| 174 if err := w.publisher.publishAPIServers(w.apiHostPorts()
); err != nil { |
| 175 logger.Errorf("cannot publish state server addre
sses: %v", err) |
| 176 ok = false |
| 177 } |
157 if err := w.updateReplicaset(); err != nil { | 178 if err := w.updateReplicaset(); err != nil { |
158 if _, isReplicaSetError := err.(*replicaSetError
); !isReplicaSetError { | 179 if _, isReplicaSetError := err.(*replicaSetError
); !isReplicaSetError { |
159 return err | 180 return err |
160 } | 181 } |
161 logger.Errorf("cannot set replicaset: %v", err) | 182 logger.Errorf("cannot set replicaset: %v", err) |
| 183 ok = false |
| 184 } |
| 185 if ok { |
| 186 logger.Infof("polling after %v", pollInterval) |
| 187 // Update the replica set members occasionally |
| 188 // to keep them up to date with the current |
| 189 // replica set member statuses. |
| 190 retry.Reset(pollInterval) |
| 191 } else { |
| 192 logger.Infof("retrying after %v", retryInterval) |
162 retry.Reset(retryInterval) | 193 retry.Reset(retryInterval) |
163 break | |
164 } | 194 } |
165 | 195 |
166 // Update the replica set members occasionally | |
167 // to keep them up to date with the current | |
168 // replica set member statuses. | |
169 retry.Reset(pollInterval) | |
170 case <-w.tomb.Dying(): | 196 case <-w.tomb.Dying(): |
171 return tomb.ErrDying | 197 return tomb.ErrDying |
172 } | 198 } |
173 } | 199 } |
174 } | 200 } |
175 | 201 |
| 202 func (w *pgWorker) apiHostPorts() [][]instance.HostPort { |
| 203 servers := make([][]instance.HostPort, 0, len(w.machines)) |
| 204 for _, m := range w.machines { |
| 205 if len(m.apiHostPorts) > 0 { |
| 206 servers = append(servers, m.apiHostPorts) |
| 207 } |
| 208 } |
| 209 return servers |
| 210 } |
| 211 |
176 // notify sends the given notification function to | 212 // notify sends the given notification function to |
177 // the worker main loop to be executed. | 213 // the worker main loop to be executed. |
178 func (w *pgWorker) notify(f notifyFunc) bool { | 214 func (w *pgWorker) notify(f notifyFunc) bool { |
179 select { | 215 select { |
180 case w.notifyCh <- f: | 216 case w.notifyCh <- f: |
181 return true | 217 return true |
182 case <-w.tomb.Dying(): | 218 case <-w.tomb.Dying(): |
183 return false | 219 return false |
184 } | 220 } |
185 } | 221 } |
186 | 222 |
187 // getPeerGroupInfo collates current session information about the | 223 // peerGroupInfo collates current session information about the |
188 // mongo peer group with information from state machines. | 224 // mongo peer group with information from state machines. |
189 func (w *pgWorker) peerGroupInfo() (*peerGroupInfo, error) { | 225 func (w *pgWorker) peerGroupInfo() (*peerGroupInfo, error) { |
190 session := w.st.MongoSession() | 226 session := w.st.MongoSession() |
191 info := &peerGroupInfo{} | 227 info := &peerGroupInfo{} |
192 var err error | 228 var err error |
193 status, err := session.CurrentStatus() | 229 status, err := session.CurrentStatus() |
194 if err != nil { | 230 if err != nil { |
195 return nil, fmt.Errorf("cannot get replica set status: %v", err) | 231 return nil, fmt.Errorf("cannot get replica set status: %v", err) |
196 } | 232 } |
197 info.statuses = status.Members | 233 info.statuses = status.Members |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
282 defer w.wg.Done() | 318 defer w.wg.Done() |
283 if err := loop(); err != nil { | 319 if err := loop(); err != nil { |
284 w.tomb.Kill(err) | 320 w.tomb.Kill(err) |
285 } | 321 } |
286 }() | 322 }() |
287 } | 323 } |
288 | 324 |
289 // setHasVote sets the HasVote status of all the given | 325 // setHasVote sets the HasVote status of all the given |
290 // machines to hasVote. | 326 // machines to hasVote. |
291 func setHasVote(ms []*machine, hasVote bool) error { | 327 func setHasVote(ms []*machine, hasVote bool) error { |
292 | |
293 for _, m := range ms { | 328 for _, m := range ms { |
294 if err := m.stm.SetHasVote(hasVote); err != nil { | 329 if err := m.stm.SetHasVote(hasVote); err != nil { |
295 return fmt.Errorf("cannot set voting status of %q to %v:
%v", m.id, hasVote, err) | 330 return fmt.Errorf("cannot set voting status of %q to %v:
%v", m.id, hasVote, err) |
296 } | 331 } |
297 } | 332 } |
298 return nil | 333 return nil |
299 } | 334 } |
300 | 335 |
301 // serverInfoWatcher watches the state server info and | 336 // serverInfoWatcher watches the state server info and |
302 // notifies the worker when it changes. | 337 // notifies the worker when it changes. |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
368 return false, fmt.Errorf("cannot get machine %q: %v", id
, err) | 403 return false, fmt.Errorf("cannot get machine %q: %v", id
, err) |
369 } | 404 } |
370 infow.worker.machines[id] = infow.worker.newMachine(stm) | 405 infow.worker.machines[id] = infow.worker.newMachine(stm) |
371 changed = true | 406 changed = true |
372 } | 407 } |
373 return changed, nil | 408 return changed, nil |
374 } | 409 } |
375 | 410 |
// machine represents a machine in State.
type machine struct {
	id        string
	wantsVote bool
	// apiHostPorts and mongoHostPorts hold the most recently
	// observed API and mongo addresses for the machine, copied
	// from the underlying state machine by newMachine/refresh.
	apiHostPorts   []instance.HostPort
	mongoHostPorts []instance.HostPort

	worker         *pgWorker
	stm            stateMachine
	machineWatcher state.NotifyWatcher
}
386 | 422 |
// mongoHostPort returns a single address, selected from
// m.mongoHostPorts, on which the machine's mongo instance can be
// contacted. NOTE(review): the second argument presumably selects
// machine-local addresses — confirm against instance.SelectInternalHostPort.
func (m *machine) mongoHostPort() string {
	return instance.SelectInternalHostPort(m.mongoHostPorts, false)
}
| 426 |
// String implements fmt.Stringer by returning the machine's id.
func (m *machine) String() string {
	return m.id
}
390 | 430 |
391 func (m *machine) GoString() string { | 431 func (m *machine) GoString() string { |
392 » return fmt.Sprintf("&peergrouper.machine{id: %q, wantsVote: %v, hostPort
: %q}", m.id, m.wantsVote, m.hostPort) | 432 » return fmt.Sprintf("&peergrouper.machine{id: %q, wantsVote: %v, hostPort
: %q}", m.id, m.wantsVote, m.mongoHostPort()) |
393 } | 433 } |
394 | 434 |
395 func (w *pgWorker) newMachine(stm stateMachine) *machine { | 435 func (w *pgWorker) newMachine(stm stateMachine) *machine { |
396 m := &machine{ | 436 m := &machine{ |
397 worker: w, | 437 worker: w, |
398 id: stm.Id(), | 438 id: stm.Id(), |
399 stm: stm, | 439 stm: stm, |
400 » » hostPort: stm.StateHostPort(), | 440 » » apiHostPorts: stm.APIHostPorts(), |
| 441 » » mongoHostPorts: stm.MongoHostPorts(), |
401 wantsVote: stm.WantsVote(), | 442 wantsVote: stm.WantsVote(), |
402 machineWatcher: stm.Watch(), | 443 machineWatcher: stm.Watch(), |
403 } | 444 } |
404 w.start(m.loop) | 445 w.start(m.loop) |
405 return m | 446 return m |
406 } | 447 } |
407 | 448 |
408 func (m *machine) loop() error { | 449 func (m *machine) loop() error { |
409 for { | 450 for { |
410 select { | 451 select { |
(...skipping 23 matching lines...) Expand all Loading... |
434 // loop will be stopped very soon anyway. | 475 // loop will be stopped very soon anyway. |
435 return false, nil | 476 return false, nil |
436 } | 477 } |
437 return false, err | 478 return false, err |
438 } | 479 } |
439 changed := false | 480 changed := false |
440 if wantsVote := m.stm.WantsVote(); wantsVote != m.wantsVote { | 481 if wantsVote := m.stm.WantsVote(); wantsVote != m.wantsVote { |
441 m.wantsVote = wantsVote | 482 m.wantsVote = wantsVote |
442 changed = true | 483 changed = true |
443 } | 484 } |
444 » if hostPort := m.stm.StateHostPort(); hostPort != m.hostPort { | 485 » if hps := m.stm.MongoHostPorts(); !hostPortsEqual(hps, m.mongoHostPorts)
{ |
445 » » m.hostPort = hostPort | 486 » » m.mongoHostPorts = hps |
| 487 » » changed = true |
| 488 » } |
| 489 » if hps := m.stm.APIHostPorts(); !hostPortsEqual(hps, m.apiHostPorts) { |
| 490 » » m.apiHostPorts = hps |
446 changed = true | 491 changed = true |
447 } | 492 } |
448 return changed, nil | 493 return changed, nil |
449 } | 494 } |
450 | 495 |
| 496 func hostPortsEqual(hps1, hps2 []instance.HostPort) bool { |
| 497 if len(hps1) != len(hps2) { |
| 498 return false |
| 499 } |
| 500 for i := range hps1 { |
| 501 |
| 502 if hps1[i] != hps2[i] { |
| 503 return false |
| 504 } |
| 505 } |
| 506 return true |
| 507 } |
| 508 |
// inStrings reports whether t is an element of ss.
func inStrings(t string, ss []string) bool {
	for i := range ss {
		if ss[i] == t {
			return true
		}
	}
	return false
}
OLD | NEW |