Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(497)

Side by Side Diff: worker/peergrouper/worker.go

Issue 77600048: worker/peergrouper: publish API addresses
Patch Set: Created 11 years ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « worker/peergrouper/shim.go ('k') | worker/peergrouper/worker_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 Canonical Ltd. 1 // Copyright 2014 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details. 2 // Licensed under the AGPLv3, see LICENCE file for details.
3 3
4 package peergrouper 4 package peergrouper
5 5
6 import ( 6 import (
7 "fmt" 7 "fmt"
8 "sync" 8 "sync"
9 "time" 9 "time"
10 10
11 "launchpad.net/tomb" 11 "launchpad.net/tomb"
12 12
13 "launchpad.net/juju-core/errors" 13 "launchpad.net/juju-core/errors"
14 "launchpad.net/juju-core/instance"
14 "launchpad.net/juju-core/replicaset" 15 "launchpad.net/juju-core/replicaset"
15 "launchpad.net/juju-core/state" 16 "launchpad.net/juju-core/state"
16 "launchpad.net/juju-core/worker" 17 "launchpad.net/juju-core/worker"
17 ) 18 )
18 19
19 type stateInterface interface { 20 type stateInterface interface {
20 Machine(id string) (stateMachine, error) 21 Machine(id string) (stateMachine, error)
21 WatchStateServerInfo() state.NotifyWatcher 22 WatchStateServerInfo() state.NotifyWatcher
22 StateServerInfo() (*state.StateServerInfo, error) 23 StateServerInfo() (*state.StateServerInfo, error)
23 MongoSession() mongoSession 24 MongoSession() mongoSession
24 } 25 }
25 26
26 type stateMachine interface { 27 type stateMachine interface {
27 Id() string 28 Id() string
28 Refresh() error 29 Refresh() error
29 Watch() state.NotifyWatcher 30 Watch() state.NotifyWatcher
30 WantsVote() bool 31 WantsVote() bool
31 HasVote() bool 32 HasVote() bool
32 SetHasVote(hasVote bool) error 33 SetHasVote(hasVote bool) error
33 » StateHostPort() string 34 » APIHostPorts() []instance.HostPort
35 » MongoHostPorts() []instance.HostPort
34 } 36 }
35 37
36 type mongoSession interface { 38 type mongoSession interface {
37 CurrentStatus() (*replicaset.Status, error) 39 CurrentStatus() (*replicaset.Status, error)
38 CurrentMembers() ([]replicaset.Member, error) 40 CurrentMembers() ([]replicaset.Member, error)
39 Set([]replicaset.Member) error 41 Set([]replicaset.Member) error
40 } 42 }
41 43
44 type publisherInterface interface {
45 // publish publishes information about the given state servers
mfoord 2014/03/19 17:36:41 Should be"publishAPIServers publishes..."
46 // to whomsoever it may concern. When it is called there
47 // is no guarantee that any of the information has actually changed.
48 publishAPIServers(apiServers [][]instance.HostPort) error
49 }
50
42 // notifyFunc holds a function that is sent 51 // notifyFunc holds a function that is sent
43 // to the main worker loop to fetch new information 52 // to the main worker loop to fetch new information
44 // when something changes. It reports whether 53 // when something changes. It reports whether
45 // the information has actually changed (and by implication 54 // the information has actually changed (and by implication
46 // whether the replica set may need to be changed). 55 // whether the replica set may need to be changed).
47 type notifyFunc func() (bool, error) 56 type notifyFunc func() (changed bool, err error)
48 57
49 var ( 58 var (
50 // If we fail to set the mongo replica set members, 59 // If we fail to set the mongo replica set members,
51 // we retry at the following interval until we succeed. 60 // we retry at the following interval until we succeed.
52 retryInterval = 2 * time.Second 61 retryInterval = 2 * time.Second
53 62
54 // pollInterval holds the interval at which the replica set 63 // pollInterval holds the interval at which the replica set
55 // members will be updated even in the absence of changes 64 // members will be updated even in the absence of changes
56 // to State. This enables us to make changes to members 65 // to State. This enables us to make changes to members
57 // that are triggered by changes to member status. 66 // that are triggered by changes to member status.
58 // 67 //
59 // 10 seconds is the default time interval used by 68 // 10 seconds is the default time interval used by
60 // mongo to keep its replicas up to date. 69 // mongo to keep its replicas up to date.
61 pollInterval = 10 * time.Second 70 pollInterval = 10 * time.Second
62 ) 71 )
63 72
64 // pgWorker holds all the mutable state that we are watching. 73 // pgWorker holds all the mutable state that we are watching.
65 // The only goroutine that is allowed to modify this 74 // The only goroutine that is allowed to modify this
66 // is worker.loop - other watchers modify the 75 // is worker.loop - other watchers modify the
67 // current state by calling worker.notify instead of 76 // current state by calling worker.notify instead of
68 // modifying it directly. 77 // modifying it directly.
69 type pgWorker struct { 78 type pgWorker struct {
70 tomb tomb.Tomb 79 tomb tomb.Tomb
71 80
72 // wg represents all the currently running goroutines. 81 // wg represents all the currently running goroutines.
73 // The worker main loop waits for all of these to exit 82 // The worker main loop waits for all of these to exit
74 // before finishing. 83 // before finishing.
75 wg sync.WaitGroup 84 wg sync.WaitGroup
76 85
77 » // st represents the State. It is an interface for testing 86 » // st represents the State. It is an interface so we can swap
78 » // purposes only. 87 » // out the implementation during testing.
79 st stateInterface 88 st stateInterface
80 89
81 » // When something changes that might might affect 90 » // When something changes that might affect
82 // the peer group membership, it sends a function 91 // the peer group membership, it sends a function
83 // on notifyCh that is run inside the main worker 92 // on notifyCh that is run inside the main worker
84 // goroutine to mutate the state. It reports whether 93 // goroutine to mutate the state. It reports whether
85 // the state has actually changed. 94 // the state has actually changed.
86 notifyCh chan notifyFunc 95 notifyCh chan notifyFunc
87 96
88 // machines holds the set of machines we are currently 97 // machines holds the set of machines we are currently
89 // watching (all the state server machines). Each one has an 98 // watching (all the state server machines). Each one has an
90 // associated goroutine that 99 // associated goroutine that
91 // watches attributes of that machine. 100 // watches attributes of that machine.
92 machines map[string]*machine 101 machines map[string]*machine
102
103 // publisher holds the implementation of the API
104 // address publisher.
105 publisher publisherInterface
93 } 106 }
94 107
95 // New returns a new worker that maintains the mongo replica set 108 // New returns a new worker that maintains the mongo replica set
96 // with respect to the given state. 109 // with respect to the given state.
97 func New(st *state.State) (worker.Worker, error) { 110 func New(st *state.State) (worker.Worker, error) {
98 cfg, err := st.EnvironConfig() 111 cfg, err := st.EnvironConfig()
99 if err != nil { 112 if err != nil {
100 return nil, err 113 return nil, err
101 } 114 }
102 return newWorker(&stateShim{ 115 return newWorker(&stateShim{
103 State: st, 116 State: st,
104 mongoPort: cfg.StatePort(), 117 mongoPort: cfg.StatePort(),
105 » }), nil 118 » » apiPort: cfg.APIPort(),
119 » }, newPublisher(st)), nil
106 } 120 }
107 121
108 func newWorker(st stateInterface) worker.Worker { 122 func newWorker(st stateInterface, pub publisherInterface) worker.Worker {
109 w := &pgWorker{ 123 w := &pgWorker{
110 » » st: st, 124 » » st: st,
111 » » notifyCh: make(chan notifyFunc), 125 » » notifyCh: make(chan notifyFunc),
112 » » machines: make(map[string]*machine), 126 » » machines: make(map[string]*machine),
127 » » publisher: pub,
113 } 128 }
129 logger.Infof("worker starting")
114 go func() { 130 go func() {
115 defer w.tomb.Done() 131 defer w.tomb.Done()
116 if err := w.loop(); err != nil { 132 if err := w.loop(); err != nil {
117 logger.Errorf("peergrouper loop terminated: %v", err) 133 logger.Errorf("peergrouper loop terminated: %v", err)
118 w.tomb.Kill(err) 134 w.tomb.Kill(err)
119 } 135 }
120 // Wait for the various goroutines to be killed. 136 // Wait for the various goroutines to be killed.
121 // N.B. we don't defer this call because 137 // N.B. we don't defer this call because
122 // if we do and a bug causes a panic, Wait will deadlock 138 // if we do and a bug causes a panic, Wait will deadlock
123 // waiting for the unkilled goroutines to exit. 139 // waiting for the unkilled goroutines to exit.
(...skipping 23 matching lines...) Expand all
147 changed, err := f() 163 changed, err := f()
148 if err != nil { 164 if err != nil {
149 return err 165 return err
150 } 166 }
151 if !changed { 167 if !changed {
152 break 168 break
153 } 169 }
154 // Try to update the replica set immediately. 170 // Try to update the replica set immediately.
155 retry.Reset(0) 171 retry.Reset(0)
156 case <-retry.C: 172 case <-retry.C:
173 ok := true
174 if err := w.publisher.publishAPIServers(w.apiHostPorts() ); err != nil {
175 logger.Errorf("cannot publish state server addre sses: %v", err)
176 ok = false
177 }
157 if err := w.updateReplicaset(); err != nil { 178 if err := w.updateReplicaset(); err != nil {
158 if _, isReplicaSetError := err.(*replicaSetError ); !isReplicaSetError { 179 if _, isReplicaSetError := err.(*replicaSetError ); !isReplicaSetError {
159 return err 180 return err
160 } 181 }
161 logger.Errorf("cannot set replicaset: %v", err) 182 logger.Errorf("cannot set replicaset: %v", err)
183 ok = false
184 }
185 if ok {
186 logger.Infof("polling after %v", pollInterval)
187 // Update the replica set members occasionally
188 // to keep them up to date with the current
189 // replica set member statuses.
190 retry.Reset(pollInterval)
191 } else {
192 logger.Infof("retrying after %v", retryInterval)
162 retry.Reset(retryInterval) 193 retry.Reset(retryInterval)
163 break
164 } 194 }
165 195
166 // Update the replica set members occasionally
167 // to keep them up to date with the current
168 // replica set member statuses.
169 retry.Reset(pollInterval)
170 case <-w.tomb.Dying(): 196 case <-w.tomb.Dying():
171 return tomb.ErrDying 197 return tomb.ErrDying
172 } 198 }
173 } 199 }
174 } 200 }
175 201
202 func (w *pgWorker) apiHostPorts() [][]instance.HostPort {
203 servers := make([][]instance.HostPort, 0, len(w.machines))
204 for _, m := range w.machines {
205 if len(m.apiHostPorts) > 0 {
206 servers = append(servers, m.apiHostPorts)
207 }
208 }
209 return servers
210 }
211
176 // notify sends the given notification function to 212 // notify sends the given notification function to
177 // the worker main loop to be executed. 213 // the worker main loop to be executed.
178 func (w *pgWorker) notify(f notifyFunc) bool { 214 func (w *pgWorker) notify(f notifyFunc) bool {
179 select { 215 select {
180 case w.notifyCh <- f: 216 case w.notifyCh <- f:
181 return true 217 return true
182 case <-w.tomb.Dying(): 218 case <-w.tomb.Dying():
183 return false 219 return false
184 } 220 }
185 } 221 }
186 222
187 // getPeerGroupInfo collates current session information about the 223 // peerGroupInfo collates current session information about the
188 // mongo peer group with information from state machines. 224 // mongo peer group with information from state machines.
189 func (w *pgWorker) peerGroupInfo() (*peerGroupInfo, error) { 225 func (w *pgWorker) peerGroupInfo() (*peerGroupInfo, error) {
190 session := w.st.MongoSession() 226 session := w.st.MongoSession()
191 info := &peerGroupInfo{} 227 info := &peerGroupInfo{}
192 var err error 228 var err error
193 status, err := session.CurrentStatus() 229 status, err := session.CurrentStatus()
194 if err != nil { 230 if err != nil {
195 return nil, fmt.Errorf("cannot get replica set status: %v", err) 231 return nil, fmt.Errorf("cannot get replica set status: %v", err)
196 } 232 }
197 info.statuses = status.Members 233 info.statuses = status.Members
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
282 defer w.wg.Done() 318 defer w.wg.Done()
283 if err := loop(); err != nil { 319 if err := loop(); err != nil {
284 w.tomb.Kill(err) 320 w.tomb.Kill(err)
285 } 321 }
286 }() 322 }()
287 } 323 }
288 324
289 // setHasVote sets the HasVote status of all the given 325 // setHasVote sets the HasVote status of all the given
290 // machines to hasVote. 326 // machines to hasVote.
291 func setHasVote(ms []*machine, hasVote bool) error { 327 func setHasVote(ms []*machine, hasVote bool) error {
292
293 for _, m := range ms { 328 for _, m := range ms {
294 if err := m.stm.SetHasVote(hasVote); err != nil { 329 if err := m.stm.SetHasVote(hasVote); err != nil {
295 return fmt.Errorf("cannot set voting status of %q to %v: %v", m.id, hasVote, err) 330 return fmt.Errorf("cannot set voting status of %q to %v: %v", m.id, hasVote, err)
296 } 331 }
297 } 332 }
298 return nil 333 return nil
299 } 334 }
300 335
301 // serverInfoWatcher watches the state server info and 336 // serverInfoWatcher watches the state server info and
302 // notifies the worker when it changes. 337 // notifies the worker when it changes.
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
368 return false, fmt.Errorf("cannot get machine %q: %v", id , err) 403 return false, fmt.Errorf("cannot get machine %q: %v", id , err)
369 } 404 }
370 infow.worker.machines[id] = infow.worker.newMachine(stm) 405 infow.worker.machines[id] = infow.worker.newMachine(stm)
371 changed = true 406 changed = true
372 } 407 }
373 return changed, nil 408 return changed, nil
374 } 409 }
375 410
376 // machine represents a machine in State. 411 // machine represents a machine in State.
377 type machine struct { 412 type machine struct {
378 » id string 413 » id string
379 » wantsVote bool 414 » wantsVote bool
380 » hostPort string 415 » apiHostPorts []instance.HostPort
416 » mongoHostPorts []instance.HostPort
381 417
382 worker *pgWorker 418 worker *pgWorker
383 stm stateMachine 419 stm stateMachine
384 machineWatcher state.NotifyWatcher 420 machineWatcher state.NotifyWatcher
385 } 421 }
386 422
423 func (m *machine) mongoHostPort() string {
424 return instance.SelectInternalHostPort(m.mongoHostPorts, false)
425 }
426
387 func (m *machine) String() string { 427 func (m *machine) String() string {
388 return m.id 428 return m.id
389 } 429 }
390 430
391 func (m *machine) GoString() string { 431 func (m *machine) GoString() string {
392 » return fmt.Sprintf("&peergrouper.machine{id: %q, wantsVote: %v, hostPort : %q}", m.id, m.wantsVote, m.hostPort) 432 » return fmt.Sprintf("&peergrouper.machine{id: %q, wantsVote: %v, hostPort : %q}", m.id, m.wantsVote, m.mongoHostPort())
393 } 433 }
394 434
395 func (w *pgWorker) newMachine(stm stateMachine) *machine { 435 func (w *pgWorker) newMachine(stm stateMachine) *machine {
396 m := &machine{ 436 m := &machine{
397 worker: w, 437 worker: w,
398 id: stm.Id(), 438 id: stm.Id(),
399 stm: stm, 439 stm: stm,
400 » » hostPort: stm.StateHostPort(), 440 » » apiHostPorts: stm.APIHostPorts(),
441 » » mongoHostPorts: stm.MongoHostPorts(),
401 wantsVote: stm.WantsVote(), 442 wantsVote: stm.WantsVote(),
402 machineWatcher: stm.Watch(), 443 machineWatcher: stm.Watch(),
403 } 444 }
404 w.start(m.loop) 445 w.start(m.loop)
405 return m 446 return m
406 } 447 }
407 448
408 func (m *machine) loop() error { 449 func (m *machine) loop() error {
409 for { 450 for {
410 select { 451 select {
(...skipping 23 matching lines...) Expand all
434 // loop will be stopped very soon anyway. 475 // loop will be stopped very soon anyway.
435 return false, nil 476 return false, nil
436 } 477 }
437 return false, err 478 return false, err
438 } 479 }
439 changed := false 480 changed := false
440 if wantsVote := m.stm.WantsVote(); wantsVote != m.wantsVote { 481 if wantsVote := m.stm.WantsVote(); wantsVote != m.wantsVote {
441 m.wantsVote = wantsVote 482 m.wantsVote = wantsVote
442 changed = true 483 changed = true
443 } 484 }
444 » if hostPort := m.stm.StateHostPort(); hostPort != m.hostPort { 485 » if hps := m.stm.MongoHostPorts(); !hostPortsEqual(hps, m.mongoHostPorts) {
445 » » m.hostPort = hostPort 486 » » m.mongoHostPorts = hps
487 » » changed = true
488 » }
489 » if hps := m.stm.APIHostPorts(); !hostPortsEqual(hps, m.apiHostPorts) {
490 » » m.apiHostPorts = hps
446 changed = true 491 changed = true
447 } 492 }
448 return changed, nil 493 return changed, nil
449 } 494 }
450 495
496 func hostPortsEqual(hps1, hps2 []instance.HostPort) bool {
497 if len(hps1) != len(hps2) {
498 return false
499 }
500 for i := range hps1 {
501
502 if hps1[i] != hps2[i] {
503 return false
504 }
505 }
506 return true
507 }
508
451 func inStrings(t string, ss []string) bool { 509 func inStrings(t string, ss []string) bool {
452 for _, s := range ss { 510 for _, s := range ss {
453 if s == t { 511 if s == t {
454 return true 512 return true
455 } 513 }
456 } 514 }
457 return false 515 return false
458 } 516 }
OLDNEW
« no previous file with comments | « worker/peergrouper/shim.go ('k') | worker/peergrouper/worker_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b