OLD | NEW |
(Empty) | |
| 1 // Copyright 2014 Canonical Ltd. |
| 2 // Licensed under the AGPLv3, see LICENCE file for details. |
| 3 |
| 4 package peergrouper |
| 5 |
| 6 import ( |
| 7 "fmt" |
| 8 "sync" |
| 9 "time" |
| 10 |
| 11 "launchpad.net/tomb" |
| 12 |
| 13 "launchpad.net/juju-core/errors" |
| 14 "launchpad.net/juju-core/replicaset" |
| 15 "launchpad.net/juju-core/state" |
| 16 "launchpad.net/juju-core/worker" |
| 17 ) |
| 18 |
// stateInterface mirrors the parts of *state.State that the peer
// group worker uses, so tests can substitute a fake implementation
// (see the st field of pgWorker).
type stateInterface interface {
	// Machine returns the machine with the given id.
	Machine(id string) (stateMachine, error)
	// WatchStateServerInfo returns a watcher that notifies of
	// changes to the state server info.
	WatchStateServerInfo() state.NotifyWatcher
	// StateServerInfo returns information on the current
	// state server machines.
	StateServerInfo() (*state.StateServerInfo, error)
	// MongoSession returns the session used to manipulate
	// the mongo replica set.
	MongoSession() mongoSession
}
| 25 |
// stateMachine mirrors the parts of *state.Machine that the worker
// uses, allowing a fake machine to be supplied in tests.
type stateMachine interface {
	Id() string
	Refresh() error
	Watch() state.NotifyWatcher
	// WantsVote reports whether the machine should have a vote
	// in the replica set.
	WantsVote() bool
	// HasVote reports whether the machine is currently recorded
	// as a voting member.
	HasVote() bool
	SetHasVote(hasVote bool) error
	// StateHostPort returns the address used to reach the
	// machine's state (mongo) server.
	StateHostPort() string
}
| 35 |
// mongoSession holds the replica set operations the worker needs
// from a mongo session.
type mongoSession interface {
	// CurrentStatus returns the status of the replica set members.
	CurrentStatus() (*replicaset.Status, error)
	// CurrentMembers returns the current replica set membership.
	CurrentMembers() ([]replicaset.Member, error)
	// Set changes the replica set to have the given members.
	Set([]replicaset.Member) error
}
| 41 |
// notifyFunc holds a function that is sent
// to the main worker loop to fetch new information
// when something changes. It reports whether
// the information has actually changed (and by implication
// whether the replica set may need to be changed).
// It is run inside the main loop goroutine, so it may safely
// mutate the worker's state (see the pgWorker doc comment).
type notifyFunc func() (bool, error)
| 48 |
// These are vars rather than consts, presumably so tests can
// shorten them — TODO confirm.
var (
	// If we fail to set the mongo replica set members,
	// we retry at the following interval until we succeed.
	retryInterval = 2 * time.Second

	// pollInterval holds the interval at which the replica set
	// members will be updated even in the absence of changes
	// to State. This enables us to make changes to members
	// that are triggered by changes to member status.
	//
	// 10 seconds is the default time interval used by
	// mongo to keep its replicas up to date.
	pollInterval = 10 * time.Second
)
| 63 |
// pgWorker holds all the mutable state that we are watching.
// The only goroutine that is allowed to modify this
// is worker.loop - other watchers modify the
// current state by calling worker.notify instead of
// modifying it directly.
type pgWorker struct {
	tomb tomb.Tomb

	// wg represents all the currently running goroutines.
	// The worker main loop waits for all of these to exit
	// before finishing.
	wg sync.WaitGroup

	// st represents the State. It is an interface for testing
	// purposes only.
	st stateInterface

	// When something changes that might affect
	// the peer group membership, it sends a function
	// on notifyCh that is run inside the main worker
	// goroutine to mutate the state. It reports whether
	// the state has actually changed.
	notifyCh chan notifyFunc

	// machines holds the set of machines we are currently
	// watching (all the state server machines). Each one has an
	// associated goroutine that
	// watches attributes of that machine.
	machines map[string]*machine
}
| 94 |
| 95 // New returns a new worker that maintains the mongo replica set |
| 96 // with respect to the given state. |
| 97 func New(st *state.State) (worker.Worker, error) { |
| 98 cfg, err := st.EnvironConfig() |
| 99 if err != nil { |
| 100 return nil, err |
| 101 } |
| 102 return newWorker(&stateShim{ |
| 103 State: st, |
| 104 mongoPort: cfg.StatePort(), |
| 105 }), nil |
| 106 } |
| 107 |
// newWorker starts the worker's main loop in a background goroutine
// and returns it. It is separate from New so that tests can supply
// a fake stateInterface.
func newWorker(st stateInterface) worker.Worker {
	w := &pgWorker{
		st:       st,
		notifyCh: make(chan notifyFunc),
		machines: make(map[string]*machine),
	}
	go func() {
		// Done must run last so that tomb.Wait does not return
		// until the WaitGroup below has drained.
		defer w.tomb.Done()
		if err := w.loop(); err != nil {
			logger.Errorf("peergrouper loop terminated: %v", err)
			w.tomb.Kill(err)
		}
		// Wait for the various goroutines to be killed.
		// N.B. we don't defer this call because
		// if we do and a bug causes a panic, Wait will deadlock
		// waiting for the unkilled goroutines to exit.
		w.wg.Wait()
	}()
	return w
}
| 128 |
// Kill asks the worker to terminate, implementing worker.Worker.
func (w *pgWorker) Kill() {
	w.tomb.Kill(nil)
}
| 132 |
// Wait waits for the worker to terminate and returns any error
// from its main loop, implementing worker.Worker.
func (w *pgWorker) Wait() error {
	return w.tomb.Wait()
}
| 136 |
// loop is the worker's main loop. It reacts to notification
// functions sent by the watcher goroutines, and retries or
// periodically refreshes the mongo replica set membership.
func (w *pgWorker) loop() error {
	infow := w.watchStateServerInfo()
	defer infow.stop()

	// retry serves double duty: it retries after a failed replica
	// set update (retryInterval) and drives the regular refresh
	// (pollInterval). It starts stopped; a Reset(0) fires it
	// immediately.
	//
	// NOTE(review): Reset/Stop on an undrained timer can leave a
	// stale value in retry.C (see time.Timer docs); presumably a
	// spurious early update is harmless here — confirm.
	retry := time.NewTimer(0)
	retry.Stop()
	for {
		select {
		case f := <-w.notifyCh:
			// Update our current view of the state of affairs.
			changed, err := f()
			if err != nil {
				return err
			}
			if !changed {
				// break leaves the select only; the for loop
				// continues to wait for the next event.
				break
			}
			// Try to update the replica set immediately.
			retry.Reset(0)
		case <-retry.C:
			if err := w.updateReplicaset(); err != nil {
				if _, isReplicaSetError := err.(*replicaSetError); !isReplicaSetError {
					return err
				}
				// Setting the replica set is expected to fail from
				// time to time, so log and retry rather than dying.
				logger.Errorf("cannot set replicaset: %v", err)
				retry.Reset(retryInterval)
				break
			}

			// Update the replica set members occasionally
			// to keep them up to date with the current
			// replica set member statuses.
			retry.Reset(pollInterval)
		case <-w.tomb.Dying():
			return tomb.ErrDying
		}
	}
}
| 175 |
// notify sends the given notification function to
// the worker main loop to be executed. It reports whether
// the function was delivered; it returns false if the
// worker is shutting down.
func (w *pgWorker) notify(f notifyFunc) bool {
	select {
	case w.notifyCh <- f:
		return true
	case <-w.tomb.Dying():
		return false
	}
}
| 186 |
| 187 // getPeerGroupInfo collates current session information about the |
| 188 // mongo peer group with information from state machines. |
| 189 func (w *pgWorker) peerGroupInfo() (*peerGroupInfo, error) { |
| 190 session := w.st.MongoSession() |
| 191 info := &peerGroupInfo{} |
| 192 var err error |
| 193 status, err := session.CurrentStatus() |
| 194 if err != nil { |
| 195 return nil, fmt.Errorf("cannot get replica set status: %v", err) |
| 196 } |
| 197 info.statuses = status.Members |
| 198 info.members, err = session.CurrentMembers() |
| 199 if err != nil { |
| 200 return nil, fmt.Errorf("cannot get replica set members: %v", err
) |
| 201 } |
| 202 info.machines = w.machines |
| 203 return info, nil |
| 204 } |
| 205 |
// replicaSetError holds an error returned as a result
// of calling replicaset.Set. As this is expected to fail
// in the normal course of things, it needs special treatment:
// the main loop retries rather than terminating (see loop).
type replicaSetError struct {
	error
}
| 212 |
// updateReplicaset sets the current replica set members, and applies the
// given voting status to machines in the state. It returns a
// *replicaSetError if the replicaset.Set call itself fails, so the
// caller can treat that case as retryable.
func (w *pgWorker) updateReplicaset() error {
	info, err := w.peerGroupInfo()
	if err != nil {
		return err
	}
	members, voting, err := desiredPeerGroup(info)
	if err != nil {
		return fmt.Errorf("cannot compute desired peer group: %v", err)
	}
	if members == nil {
		logger.Debugf("no change in desired peer group")
		return nil
	}
	logger.Debugf("desired peer group members: %#v", members)
	// We cannot change the HasVote flag of a machine in state at exactly
	// the same moment as changing its voting status in the replica set.
	//
	// Thus we need to be careful that a machine which is actually a voting
	// member is not seen to not have a vote, because otherwise
	// there is nothing to prevent the machine being removed.
	//
	// To avoid this happening, we make sure when we call SetReplicaSet,
	// that the voting status of machines is the union of both old
	// and new voting machines - that is the set of HasVote machines
	// is a superset of all the actual voting machines.
	//
	// Only after the call has taken place do we reset the voting status
	// of the machines that have lost their vote.
	//
	// If there's a crash, the voting status may not reflect the
	// actual voting status for a while, but when things come
	// back on line, it will be sorted out, as desiredReplicaSet
	// will return the actual voting status.

	// added holds machines that are gaining the vote; removed holds
	// machines that are losing it.
	var added, removed []*machine
	for m, hasVote := range voting {
		switch {
		case hasVote && !m.stm.HasVote():
			added = append(added, m)
		case !hasVote && m.stm.HasVote():
			removed = append(removed, m)
		}
	}
	// Grant votes first so HasVote stays a superset of the actual
	// voting members (see comment above).
	if err := setHasVote(added, true); err != nil {
		return err
	}
	if err := w.st.MongoSession().Set(members); err != nil {
		// We've failed to set the replica set, so revert back
		// to the previous settings.
		if err1 := setHasVote(added, false); err1 != nil {
			logger.Errorf("cannot revert machine voting after failure to change replica set: %v", err1)
		}
		return &replicaSetError{err}
	}
	logger.Infof("successfully changed replica set to %#v", members)
	// Only now is it safe to clear HasVote on the losers.
	if err := setHasVote(removed, false); err != nil {
		return err
	}
	return nil
}
| 275 |
// start runs the given loop function until it returns.
// When it returns, the receiving pgWorker is killed with
// the returned error. The goroutine is registered with w.wg
// so the main worker goroutine can wait for it to finish.
func (w *pgWorker) start(loop func() error) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		if err := loop(); err != nil {
			w.tomb.Kill(err)
		}
	}()
}
| 288 |
| 289 // setHasVote sets the HasVote status of all the given |
| 290 // machines to hasVote. |
| 291 func setHasVote(ms []*machine, hasVote bool) error { |
| 292 |
| 293 for _, m := range ms { |
| 294 if err := m.stm.SetHasVote(hasVote); err != nil { |
| 295 return fmt.Errorf("cannot set voting status of %q to %v:
%v", m.id, hasVote, err) |
| 296 } |
| 297 } |
| 298 return nil |
| 299 } |
| 300 |
// serverInfoWatcher watches the state server info and
// notifies the worker when it changes.
type serverInfoWatcher struct {
	// worker is the pgWorker to notify of changes.
	worker *pgWorker
	// watcher is the underlying state server info watcher.
	watcher state.NotifyWatcher
}
| 307 |
// watchStateServerInfo starts watching the state server info in a
// worker-managed goroutine and returns the watcher. The caller is
// responsible for calling stop on the result.
func (w *pgWorker) watchStateServerInfo() *serverInfoWatcher {
	infow := &serverInfoWatcher{
		worker:  w,
		watcher: w.st.WatchStateServerInfo(),
	}
	w.start(infow.loop)
	return infow
}
| 316 |
// loop forwards state server info change events to the worker's
// main loop (as updateMachines notifications) until the watcher
// is stopped or the worker is dying.
func (infow *serverInfoWatcher) loop() error {
	for {
		select {
		case _, ok := <-infow.watcher.Changes():
			if !ok {
				// The channel has closed: the watcher has been
				// stopped or failed; report why.
				return infow.watcher.Err()
			}
			infow.worker.notify(infow.updateMachines)
		case <-infow.worker.tomb.Dying():
			return tomb.ErrDying
		}
	}
}
| 330 |
// stop stops the underlying watcher. The Stop error is ignored
// here; presumably any failure surfaces through watcher.Err in
// loop — confirm.
func (infow *serverInfoWatcher) stop() {
	infow.watcher.Stop()
}
| 334 |
// updateMachines is a notifyFunc that updates the current
// machines when the state server info has changed. It reports
// whether the set of watched machines changed. It runs in the
// main loop goroutine, so it may mutate worker.machines directly.
func (infow *serverInfoWatcher) updateMachines() (bool, error) {
	info, err := infow.worker.st.StateServerInfo()
	if err != nil {
		return false, fmt.Errorf("cannot get state server info: %v", err)
	}
	changed := false
	// Stop machine goroutines that no longer correspond to state server
	// machines. (Deleting map entries while ranging is safe in Go.)
	for _, m := range infow.worker.machines {
		if !inStrings(m.id, info.MachineIds) {
			m.stop()
			delete(infow.worker.machines, m.id)
			changed = true
		}
	}
	// Start machines with no watcher
	for _, id := range info.MachineIds {
		if _, ok := infow.worker.machines[id]; ok {
			continue
		}
		logger.Debugf("found new machine %q", id)
		stm, err := infow.worker.st.Machine(id)
		if err != nil {
			if errors.IsNotFoundError(err) {
				// If the machine isn't found, it must have been
				// removed and will soon enough be removed
				// from the state server list. This will probably
				// never happen, but we'll code defensively anyway.
				logger.Warningf("machine %q from state server list not found", id)
				continue
			}
			return false, fmt.Errorf("cannot get machine %q: %v", id, err)
		}
		infow.worker.machines[id] = infow.worker.newMachine(stm)
		changed = true
	}
	return changed, nil
}
| 375 |
// machine represents a machine in State.
type machine struct {
	// id is the machine's id in state.
	id string
	// wantsVote holds the last known value of stm.WantsVote.
	wantsVote bool
	// hostPort holds the last known value of stm.StateHostPort.
	hostPort string

	worker *pgWorker
	// stm is the underlying state machine (an interface for
	// testing purposes).
	stm stateMachine
	// machineWatcher notifies of changes to the machine document.
	machineWatcher state.NotifyWatcher
}
| 386 |
// String implements fmt.Stringer, returning the machine id.
func (m *machine) String() string {
	return m.id
}
| 390 |
// GoString implements fmt.GoStringer, giving a more informative
// representation than the default %#v for logging/debugging.
func (m *machine) GoString() string {
	return fmt.Sprintf("&peergrouper.machine{id: %q, wantsVote: %v, hostPort: %q}", m.id, m.wantsVote, m.hostPort)
}
| 394 |
// newMachine returns a machine tracking the given state machine,
// snapshotting its current attributes and starting a worker-managed
// goroutine (machine.loop) that watches it for changes.
func (w *pgWorker) newMachine(stm stateMachine) *machine {
	m := &machine{
		worker:         w,
		id:             stm.Id(),
		stm:            stm,
		hostPort:       stm.StateHostPort(),
		wantsVote:      stm.WantsVote(),
		machineWatcher: stm.Watch(),
	}
	w.start(m.loop)
	return m
}
| 407 |
// loop forwards machine change events to the worker's main loop
// (as refresh notifications) until the machine watcher is stopped
// or the worker is dying. Note that it returns nil (not
// tomb.ErrDying) on worker shutdown.
func (m *machine) loop() error {
	for {
		select {
		case _, ok := <-m.machineWatcher.Changes():
			if !ok {
				// The channel has closed: the watcher has been
				// stopped or failed; report why.
				return m.machineWatcher.Err()
			}
			m.worker.notify(m.refresh)
		case <-m.worker.tomb.Dying():
			return nil
		}
	}
}
| 421 |
// stop stops the machine's watcher, which causes the machine's
// loop goroutine to exit. The Stop error is ignored here;
// presumably any failure surfaces through machineWatcher.Err in
// loop — confirm.
func (m *machine) stop() {
	m.machineWatcher.Stop()
}
| 425 |
| 426 func (m *machine) refresh() (bool, error) { |
| 427 if err := m.stm.Refresh(); err != nil { |
| 428 if errors.IsNotFoundError(err) { |
| 429 // We want to be robust when the machine |
| 430 // state is out of date with respect to the |
| 431 // state server info, so if the machine |
| 432 // has been removed, just assume that |
| 433 // no change has happened - the machine |
| 434 // loop will be stopped very soon anyway. |
| 435 return false, nil |
| 436 } |
| 437 return false, err |
| 438 } |
| 439 changed := false |
| 440 if wantsVote := m.stm.WantsVote(); wantsVote != m.wantsVote { |
| 441 m.wantsVote = wantsVote |
| 442 changed = true |
| 443 } |
| 444 if hostPort := m.stm.StateHostPort(); hostPort != m.hostPort { |
| 445 m.hostPort = hostPort |
| 446 changed = true |
| 447 } |
| 448 return changed, nil |
| 449 } |
| 450 |
// inStrings reports whether t is an element of ss.
func inStrings(t string, ss []string) bool {
	for i := range ss {
		if ss[i] == t {
			return true
		}
	}
	return false
}
OLD | NEW |