Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(564)

Side by Side Diff: worker/firewaller/firewaller.go

Issue 6811091: state: simplify entity watchers
Patch Set: Created 5 years, 2 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
OLDNEW
1 package firewaller 1 package firewaller
2 2
3 import ( 3 import (
4 "fmt" 4 "fmt"
5 "launchpad.net/juju-core/environs" 5 "launchpad.net/juju-core/environs"
6 "launchpad.net/juju-core/environs/config" 6 "launchpad.net/juju-core/environs/config"
7 "launchpad.net/juju-core/log" 7 "launchpad.net/juju-core/log"
8 "launchpad.net/juju-core/state" 8 "launchpad.net/juju-core/state"
9 "launchpad.net/juju-core/state/watcher" 9 "launchpad.net/juju-core/state/watcher"
10 "launchpad.net/juju-core/worker" 10 "launchpad.net/juju-core/worker"
(...skipping 483 matching lines...) Expand 10 before | Expand all | Expand 10 after
494 type portsChange struct { 494 type portsChange struct {
495 unitd *unitData 495 unitd *unitData
496 ports []state.Port 496 ports []state.Port
497 } 497 }
498 498
499 // unitData holds unit details and watches port changes. 499 // unitData holds unit details and watches port changes.
500 type unitData struct { 500 type unitData struct {
501 tomb tomb.Tomb 501 tomb tomb.Tomb
502 fw *Firewaller 502 fw *Firewaller
503 unit *state.Unit 503 unit *state.Unit
504 watcher *state.UnitWatcher
505 serviced *serviceData 504 serviced *serviceData
506 machined *machineData 505 machined *machineData
507 ports []state.Port 506 ports []state.Port
508 } 507 }
509 508
510 // newUnitData returns a new data value for tracking details of the unit, 509 // newUnitData returns a new data value for tracking details of the unit,
511 // and starts watching the unit for port changes. 510 // and starts watching the unit for port changes.
512 func newUnitData(unit *state.Unit, fw *Firewaller) *unitData { 511 func newUnitData(unit *state.Unit, fw *Firewaller) *unitData {
513 ud := &unitData{ 512 ud := &unitData{
514 » » fw: fw, 513 » » fw: fw,
515 » » unit: unit, 514 » » unit: unit,
516 » » watcher: unit.Watch(), 515 » » ports: make([]state.Port, 0),
517 » » ports: make([]state.Port, 0),
518 } 516 }
519 go ud.watchLoop() 517 go ud.watchLoop()
520 return ud 518 return ud
521 } 519 }
522 520
523 // watchLoop watches the unit for port changes. 521 // watchLoop watches the unit for port changes.
524 func (ud *unitData) watchLoop() { 522 func (ud *unitData) watchLoop() {
525 defer ud.tomb.Done() 523 defer ud.tomb.Done()
526 » defer ud.watcher.Stop() 524 » w := ud.unit.Watch()
525 » defer watcher.Stop(w, &ud.tomb)
527 var ports []state.Port 526 var ports []state.Port
528 for { 527 for {
529 select { 528 select {
530 case <-ud.tomb.Dying(): 529 case <-ud.tomb.Dying():
531 return 530 return
532 » » case unit, ok := <-ud.watcher.Changes(): 531 » » case _, ok := <-w.Changes():
533 if !ok { 532 if !ok {
534 » » » » // TODO(niemeyer): Unit watcher shouldn't return a unit. 533 » » » » ud.fw.tomb.Kill(watcher.MustErr(w))
535 » » » » err := watcher.MustErr(ud.watcher) 534 » » » » return
535 » » » }
536 » » » if err := ud.unit.Refresh(); err != nil {
536 if !state.IsNotFound(err) { 537 if !state.IsNotFound(err) {
537 ud.fw.tomb.Kill(err) 538 ud.fw.tomb.Kill(err)
538 } 539 }
539 return 540 return
540 } 541 }
541 » » » change := unit.OpenedPorts() 542 » » » change := ud.unit.OpenedPorts()
542 if samePorts(change, ports) { 543 if samePorts(change, ports) {
543 continue 544 continue
544 } 545 }
545 ports = append([]state.Port(nil), change...) 546 ports = append([]state.Port(nil), change...)
rog 2012/11/09 10:48:58 s/[]state.Port(nil)/ports[:0]/ no need for unnece
fwereade 2012/11/19 09:07:20 Done.
546 select { 547 select {
547 case ud.fw.portsChange <- &portsChange{ud, change}: 548 case ud.fw.portsChange <- &portsChange{ud, change}:
548 case <-ud.tomb.Dying(): 549 case <-ud.tomb.Dying():
549 return 550 return
550 } 551 }
551 } 552 }
552 } 553 }
553 } 554 }
554 555
555 // samePorts returns whether old and new contain the same set of ports. 556 // samePorts returns whether old and new contain the same set of ports.
(...skipping 20 matching lines...) Expand all
576 type exposedChange struct { 577 type exposedChange struct {
577 serviced *serviceData 578 serviced *serviceData
578 exposed bool 579 exposed bool
579 } 580 }
580 581
581 // serviceData holds service details and watches exposure changes. 582 // serviceData holds service details and watches exposure changes.
582 type serviceData struct { 583 type serviceData struct {
583 tomb tomb.Tomb 584 tomb tomb.Tomb
584 fw *Firewaller 585 fw *Firewaller
585 service *state.Service 586 service *state.Service
586 watcher *state.ServiceWatcher
587 exposed bool 587 exposed bool
588 unitds map[string]*unitData 588 unitds map[string]*unitData
589 } 589 }
590 590
591 // newServiceData returns a new data value for tracking details of the 591 // newServiceData returns a new data value for tracking details of the
592 // service, and starts watching the service for exposure changes. 592 // service, and starts watching the service for exposure changes.
593 func newServiceData(service *state.Service, fw *Firewaller) *serviceData { 593 func newServiceData(service *state.Service, fw *Firewaller) *serviceData {
594 sd := &serviceData{ 594 sd := &serviceData{
595 fw: fw, 595 fw: fw,
596 service: service, 596 service: service,
597 watcher: service.Watch(),
598 unitds: make(map[string]*unitData), 597 unitds: make(map[string]*unitData),
599 } 598 }
600 sd.exposed = service.IsExposed() 599 sd.exposed = service.IsExposed()
601 go sd.watchLoop(sd.exposed) 600 go sd.watchLoop(sd.exposed)
rog 2012/11/09 10:48:58 perhaps we could consider using the pattern that's
fwereade 2012/11/19 09:07:20 I'm not sure the pattern quite fits here -- we don
602 return sd 601 return sd
603 } 602 }
604 603
605 // watchLoop watches the service's exposed flag for changes. 604 // watchLoop watches the service's exposed flag for changes.
606 func (sd *serviceData) watchLoop(exposed bool) { 605 func (sd *serviceData) watchLoop(exposed bool) {
607 defer sd.tomb.Done() 606 defer sd.tomb.Done()
608 » defer sd.watcher.Stop() 607 » w := sd.service.Watch()
608 » defer watcher.Stop(w, &sd.tomb)
609 for { 609 for {
610 select { 610 select {
611 case <-sd.tomb.Dying(): 611 case <-sd.tomb.Dying():
612 return 612 return
613 » » case service, ok := <-sd.watcher.Changes(): 613 » » case _, ok := <-w.Changes():
614 if !ok { 614 if !ok {
615 » » » » // TODO(niemeyer): Service watcher shouldn't ret urn a service. 615 » » » » sd.fw.tomb.Kill(watcher.MustErr(w))
616 » » » » err := watcher.MustErr(sd.watcher) 616 » » » » return
617 » » » }
618 » » » if err := sd.service.Refresh(); err != nil {
617 if !state.IsNotFound(err) { 619 if !state.IsNotFound(err) {
618 sd.fw.tomb.Kill(err) 620 sd.fw.tomb.Kill(err)
619 } 621 }
620 return 622 return
621 } 623 }
622 » » » change := service.IsExposed() 624 » » » change := sd.service.IsExposed()
623 if change == exposed { 625 if change == exposed {
624 continue 626 continue
625 } 627 }
626 exposed = change 628 exposed = change
627 select { 629 select {
628 case sd.fw.exposedChange <- &exposedChange{sd, change}: 630 case sd.fw.exposedChange <- &exposedChange{sd, change}:
629 case <-sd.tomb.Dying(): 631 case <-sd.tomb.Dying():
630 return 632 return
631 } 633 }
632 } 634 }
(...skipping 12 matching lines...) Expand all
645 for _, a := range A { 647 for _, a := range A {
646 for _, b := range B { 648 for _, b := range B {
647 if a == b { 649 if a == b {
648 continue next 650 continue next
649 } 651 }
650 } 652 }
651 missing = append(missing, a) 653 missing = append(missing, a)
652 } 654 }
653 return 655 return
654 } 656 }
OLDNEW
« state/watcher.go ('K') | « state/watcher.go ('k') | worker/uniter/filter.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld 204d58d