OLD | NEW |
1 package state | 1 package state |
2 | 2 |
3 import ( | 3 import ( |
4 "launchpad.net/goyaml" | 4 "launchpad.net/goyaml" |
5 "launchpad.net/juju-core/juju/state/watcher" | 5 "launchpad.net/juju-core/juju/state/watcher" |
| 6 "launchpad.net/juju-core/juju/log" |
6 "launchpad.net/tomb" | 7 "launchpad.net/tomb" |
7 ) | 8 ) |
8 | 9 |
9 // watcherStopper allows us to call Stop on a watcher without | 10 // watcherStopper allows us to call Stop on a watcher without |
10 // caring which watcher type it actually is. | 11 // caring which watcher type it actually is. |
11 type watcherStopper interface { | 12 type watcherStopper interface { |
12 Stop() error | 13 Stop() error |
13 } | 14 } |
14 | 15 |
15 // stopWatcher stops a watcher and propagates | 16 // stopWatcher stops a watcher and propagates |
(...skipping 352 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
368 } | 369 } |
369 select { | 370 select { |
370 case <-w.tomb.Dying(): | 371 case <-w.tomb.Dying(): |
371 return | 372 return |
372 case w.changeChan <- mc: | 373 case w.changeChan <- mc: |
373 } | 374 } |
374 } | 375 } |
375 } | 376 } |
376 } | 377 } |
377 | 378 |
| 379 type MachineUnitsWatcher struct { |
| 380 st *State |
| 381 machine *Machine |
| 382 tomb tomb.Tomb |
| 383 changeChan chan *MachineUnitsChange |
| 384 watcher *watcher.ContentWatcher |
| 385 } |
| 386 |
| 387 type MachineUnitsChange struct { |
| 388 Added, Deleted []*Unit |
| 389 } |
| 390 |
| 391 // newMachinesWatcher creates and starts a new machine watcher. |
| 392 func newMachineUnitsWatcher(m *Machine) *MachineUnitsWatcher { |
| 393 w := &MachineUnitsWatcher{ |
| 394 st: m.st, |
| 395 machine: m, |
| 396 changeChan: make(chan *MachineUnitsChange), |
| 397 watcher: watcher.NewContentWatcher(m.st.zk, zkTopologyPath), |
| 398 } |
| 399 go w.loop() |
| 400 return w |
| 401 } |
| 402 |
| 403 // Changes returns a channel that will receive changes when |
| 404 // units are assigned or unassigned from a machine. |
| 405 // The Added field in the first event on the channel holds the initial |
| 406 // state as returned by State.AllMachines. |
| 407 func (w *MachineUnitsWatcher) Changes() <-chan *MachineUnitsChange { |
| 408 return w.changeChan |
| 409 } |
| 410 |
| 411 // Stop stops the watch and returns any error encountered |
| 412 // while watching. This method should always be called |
| 413 // before discarding the watcher. |
| 414 func (w *MachineUnitsWatcher) Stop() error { |
| 415 w.tomb.Kill(nil) |
| 416 if err := w.watcher.Stop(); err != nil { |
| 417 w.tomb.Wait() |
| 418 return err |
| 419 } |
| 420 return w.tomb.Wait() |
| 421 } |
| 422 |
| 423 // loop is the backend for watching the ports node. |
| 424 func (w *MachineUnitsWatcher) loop() { |
| 425 defer w.tomb.Done() |
| 426 defer close(w.changeChan) |
| 427 defer stopWatcher(w.watcher, &w.tomb) |
| 428 |
| 429 // knownUnits keeps track of the current units because |
| 430 // when a unit is deleted, we can't create a *Unit from |
| 431 // a key alone. |
| 432 knownUnits := make(map[string]*Unit) |
| 433 var knownUnitKeys []string |
| 434 |
| 435 for { |
| 436 select { |
| 437 case <-w.tomb.Dying(): |
| 438 return |
| 439 case change, ok := <-w.watcher.Changes(): |
| 440 if !ok { |
| 441 w.tomb.Killf("content change channel closed unex
pectedly") |
| 442 return |
| 443 } |
| 444 topology, err := parseTopology(change.Content) |
| 445 if err != nil { |
| 446 w.tomb.Kill(err) |
| 447 return |
| 448 } |
| 449 currentUnitKeys := topology.UnitsForMachine(w.machine.ke
y) |
| 450 added, deleted := diff(currentUnitKeys, knownUnitKeys),
diff(knownUnitKeys, currentUnitKeys) |
| 451 knownUnitKeys = currentUnitKeys |
| 452 if len(added) == 0 && len(deleted) == 0 { |
| 453 // The change was not relevant to this watcher. |
| 454 continue |
| 455 } |
| 456 uc := new(MachineUnitsChange) |
| 457 for _, ukey := range deleted { |
| 458 unit := knownUnits[ukey] |
| 459 if unit == nil { |
| 460 panic("unknown unit deleted: " + ukey) |
| 461 } |
| 462 delete(knownUnits, ukey) |
| 463 uc.Deleted = append(uc.Deleted, unit) |
| 464 } |
| 465 for _, ukey := range added { |
| 466 unit, err := w.st.unitFromKey(topology, ukey) |
| 467 if err != nil { |
| 468 log.Printf("inconsistent topology: %v",
err) |
| 469 continue |
| 470 } |
| 471 knownUnits[ukey] = unit |
| 472 uc.Added = append(uc.Added, unit) |
| 473 } |
| 474 select { |
| 475 case <-w.tomb.Dying(): |
| 476 return |
| 477 case w.changeChan <- uc: |
| 478 } |
| 479 } |
| 480 } |
| 481 } |
| 482 |
378 // diff returns all the elements that exist in A but not B. | 483 // diff returns all the elements that exist in A but not B. |
379 func diff(A, B []string) (missing []string) { | 484 func diff(A, B []string) (missing []string) { |
380 next: | 485 next: |
381 for _, a := range A { | 486 for _, a := range A { |
382 for _, b := range B { | 487 for _, b := range B { |
383 if a == b { | 488 if a == b { |
384 continue next | 489 continue next |
385 } | 490 } |
386 } | 491 } |
387 missing = append(missing, a) | 492 missing = append(missing, a) |
388 } | 493 } |
389 return | 494 return |
390 } | 495 } |
OLD | NEW |