Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(6)

Side by Side Diff: state/watcher.go

Issue 6299082: state: implement Machine.WatchUnits
Patch Set: state: implement Machine.WatchUnits Created 12 years, 9 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « state/machine.go ('k') | state/watcher_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package state 1 package state
2 2
3 import ( 3 import (
4 "launchpad.net/goyaml" 4 "launchpad.net/goyaml"
5 "launchpad.net/juju-core/juju/state/watcher" 5 "launchpad.net/juju-core/juju/state/watcher"
6 "launchpad.net/juju-core/juju/log"
6 "launchpad.net/tomb" 7 "launchpad.net/tomb"
7 ) 8 )
8 9
9 // watcherStopper allows us to call Stop on a watcher without 10 // watcherStopper allows us to call Stop on a watcher without
10 // caring which watcher type it actually is. 11 // caring which watcher type it actually is.
11 type watcherStopper interface { 12 type watcherStopper interface {
12 Stop() error 13 Stop() error
13 } 14 }
14 15
15 // stopWatcher stops a watcher and propagates 16 // stopWatcher stops a watcher and propagates
(...skipping 352 matching lines...) Expand 10 before | Expand all | Expand 10 after
368 } 369 }
369 select { 370 select {
370 case <-w.tomb.Dying(): 371 case <-w.tomb.Dying():
371 return 372 return
372 case w.changeChan <- mc: 373 case w.changeChan <- mc:
373 } 374 }
374 } 375 }
375 } 376 }
376 } 377 }
377 378
379 type MachineUnitsWatcher struct {
380 st *State
381 machine *Machine
382 tomb tomb.Tomb
383 changeChan chan *MachineUnitsChange
384 watcher *watcher.ContentWatcher
385 }
386
387 type MachineUnitsChange struct {
388 Added, Deleted []*Unit
389 }
390
391 // newMachinesWatcher creates and starts a new machine watcher.
392 func newMachineUnitsWatcher(m *Machine) *MachineUnitsWatcher {
393 w := &MachineUnitsWatcher{
394 st: m.st,
395 machine: m,
396 changeChan: make(chan *MachineUnitsChange),
397 watcher: watcher.NewContentWatcher(m.st.zk, zkTopologyPath),
398 }
399 go w.loop()
400 return w
401 }
402
403 // Changes returns a channel that will receive changes when
404 // units are assigned or unassigned from a machine.
405 // The Added field in the first event on the channel holds the initial
406 // state as returned by State.AllMachines.
407 func (w *MachineUnitsWatcher) Changes() <-chan *MachineUnitsChange {
408 return w.changeChan
409 }
410
411 // Stop stops the watch and returns any error encountered
412 // while watching. This method should always be called
413 // before discarding the watcher.
414 func (w *MachineUnitsWatcher) Stop() error {
415 w.tomb.Kill(nil)
416 if err := w.watcher.Stop(); err != nil {
417 w.tomb.Wait()
418 return err
419 }
420 return w.tomb.Wait()
421 }
422
423 // loop is the backend for watching the ports node.
424 func (w *MachineUnitsWatcher) loop() {
425 defer w.tomb.Done()
426 defer close(w.changeChan)
427 defer stopWatcher(w.watcher, &w.tomb)
428
429 // knownUnits keeps track of the current units because
430 // when a unit is deleted, we can't create a *Unit from
431 // a key alone.
432 knownUnits := make(map[string]*Unit)
433 var knownUnitKeys []string
434
435 for {
436 select {
437 case <-w.tomb.Dying():
438 return
439 case change, ok := <-w.watcher.Changes():
440 if !ok {
441 w.tomb.Killf("content change channel closed unex pectedly")
442 return
443 }
444 topology, err := parseTopology(change.Content)
445 if err != nil {
446 w.tomb.Kill(err)
447 return
448 }
449 currentUnitKeys := topology.UnitsForMachine(w.machine.ke y)
450 added, deleted := diff(currentUnitKeys, knownUnitKeys), diff(knownUnitKeys, currentUnitKeys)
451 knownUnitKeys = currentUnitKeys
452 if len(added) == 0 && len(deleted) == 0 {
453 // The change was not relevant to this watcher.
454 continue
455 }
456 uc := new(MachineUnitsChange)
457 for _, ukey := range deleted {
458 unit := knownUnits[ukey]
459 if unit == nil {
460 panic("unknown unit deleted: " + ukey)
461 }
462 delete(knownUnits, ukey)
463 uc.Deleted = append(uc.Deleted, unit)
464 }
465 for _, ukey := range added {
466 unit, err := w.st.unitFromKey(topology, ukey)
467 if err != nil {
468 log.Printf("inconsistent topology: %v", err)
469 continue
470 }
471 knownUnits[ukey] = unit
472 uc.Added = append(uc.Added, unit)
473 }
474 select {
475 case <-w.tomb.Dying():
476 return
477 case w.changeChan <- uc:
478 }
479 }
480 }
481 }
482
378 // diff returns all the elements that exist in A but not B. 483 // diff returns all the elements that exist in A but not B.
379 func diff(A, B []string) (missing []string) { 484 func diff(A, B []string) (missing []string) {
380 next: 485 next:
381 for _, a := range A { 486 for _, a := range A {
382 for _, b := range B { 487 for _, b := range B {
383 if a == b { 488 if a == b {
384 continue next 489 continue next
385 } 490 }
386 } 491 }
387 missing = append(missing, a) 492 missing = append(missing, a)
388 } 493 }
389 return 494 return
390 } 495 }
OLDNEW
« no previous file with comments | « state/machine.go ('k') | state/watcher_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b