Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(2739)

Delta Between Two Patch Sets: worker/firewaller/firewaller.go

Issue 9738043: cmd/jujud: do not change password
Left Patch Set: cmd/jujud: do not change password Created 11 years, 10 months ago
Right Patch Set: cmd/jujud: do not change password Created 11 years, 9 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Right: Side by side diff | Download
« no previous file with change/comment | « worker/deployer/deployer_test.go ('k') | worker/firewaller/firewaller_test.go » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
(no file at all)
1 // Copyright 2012, 2013 Canonical Ltd. 1 // Copyright 2012, 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details. 2 // Licensed under the AGPLv3, see LICENCE file for details.
3 3
4 package firewaller 4 package firewaller
5 5
6 import ( 6 import (
7 "fmt" 7 "fmt"
8 "launchpad.net/juju-core/environs" 8 "launchpad.net/juju-core/environs"
9 "launchpad.net/juju-core/environs/config" 9 "launchpad.net/juju-core/environs/config"
10 "launchpad.net/juju-core/errors" 10 "launchpad.net/juju-core/errors"
11 "launchpad.net/juju-core/instance"
11 "launchpad.net/juju-core/log" 12 "launchpad.net/juju-core/log"
12 "launchpad.net/juju-core/state" 13 "launchpad.net/juju-core/state"
13 "launchpad.net/juju-core/state/api/params"
14 "launchpad.net/juju-core/state/watcher" 14 "launchpad.net/juju-core/state/watcher"
15 "launchpad.net/juju-core/worker" 15 "launchpad.net/juju-core/worker"
16 "launchpad.net/tomb" 16 "launchpad.net/tomb"
17 ) 17 )
18 18
19 // Firewaller watches the state for ports opened or closed 19 // Firewaller watches the state for ports opened or closed
20 // and reflects those changes onto the backing environment. 20 // and reflects those changes onto the backing environment.
21 type Firewaller struct { 21 type Firewaller struct {
22 tomb tomb.Tomb 22 tomb tomb.Tomb
23 st *state.State 23 st *state.State
24 environ environs.Environ 24 environ environs.Environ
25 environWatcher *state.EnvironConfigWatcher 25 environWatcher *state.EnvironConfigWatcher
26 machinesWatcher *state.LifecycleWatcher 26 machinesWatcher *state.LifecycleWatcher
27 machineds map[string]*machineData 27 machineds map[string]*machineData
28 unitsChange chan *unitsChange 28 unitsChange chan *unitsChange
29 unitds map[string]*unitData 29 unitds map[string]*unitData
30 portsChange chan *portsChange 30 portsChange chan *portsChange
31 serviceds map[string]*serviceData 31 serviceds map[string]*serviceData
32 exposedChange chan *exposedChange 32 exposedChange chan *exposedChange
33 globalMode bool 33 globalMode bool
34 » globalPortRef map[params.Port]int 34 » globalPortRef map[instance.Port]int
35 } 35 }
36 36
37 // NewFirewaller returns a new Firewaller. 37 // NewFirewaller returns a new Firewaller.
38 func NewFirewaller(st *state.State) *Firewaller { 38 func NewFirewaller(st *state.State) *Firewaller {
39 fw := &Firewaller{ 39 fw := &Firewaller{
40 st: st, 40 st: st,
41 environWatcher: st.WatchEnvironConfig(), 41 environWatcher: st.WatchEnvironConfig(),
42 machinesWatcher: st.WatchEnvironMachines(), 42 machinesWatcher: st.WatchEnvironMachines(),
43 machineds: make(map[string]*machineData), 43 machineds: make(map[string]*machineData),
44 unitsChange: make(chan *unitsChange), 44 unitsChange: make(chan *unitsChange),
(...skipping 14 matching lines...) Expand all
59 59
60 var err error 60 var err error
61 var reconciled bool 61 var reconciled bool
62 62
63 fw.environ, err = worker.WaitForEnviron(fw.environWatcher, fw.tomb.Dying ()) 63 fw.environ, err = worker.WaitForEnviron(fw.environWatcher, fw.tomb.Dying ())
64 if err != nil { 64 if err != nil {
65 return err 65 return err
66 } 66 }
67 if fw.environ.Config().FirewallMode() == config.FwGlobal { 67 if fw.environ.Config().FirewallMode() == config.FwGlobal {
68 fw.globalMode = true 68 fw.globalMode = true
69 » » fw.globalPortRef = make(map[params.Port]int) 69 » » fw.globalPortRef = make(map[instance.Port]int)
70 } 70 }
71 for { 71 for {
72 select { 72 select {
73 case <-fw.tomb.Dying(): 73 case <-fw.tomb.Dying():
74 return tomb.ErrDying 74 return tomb.ErrDying
75 case change, ok := <-fw.environWatcher.Changes(): 75 case change, ok := <-fw.environWatcher.Changes():
76 if !ok { 76 if !ok {
77 return watcher.MustErr(fw.environWatcher) 77 return watcher.MustErr(fw.environWatcher)
78 } 78 }
79 if err := fw.environ.SetConfig(change); err != nil { 79 if err := fw.environ.SetConfig(change); err != nil {
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
128 } 128 }
129 } 129 }
130 130
131 // startMachine creates a new data value for tracking details of the 131 // startMachine creates a new data value for tracking details of the
132 // machine and starts watching the machine for units added or removed. 132 // machine and starts watching the machine for units added or removed.
133 func (fw *Firewaller) startMachine(id string) error { 133 func (fw *Firewaller) startMachine(id string) error {
134 machined := &machineData{ 134 machined := &machineData{
135 fw: fw, 135 fw: fw,
136 id: id, 136 id: id,
137 unitds: make(map[string]*unitData), 137 unitds: make(map[string]*unitData),
138 » » ports: make([]params.Port, 0), 138 » » ports: make([]instance.Port, 0),
139 } 139 }
140 m, err := machined.machine() 140 m, err := machined.machine()
141 if errors.IsNotFoundError(err) { 141 if errors.IsNotFoundError(err) {
142 return nil 142 return nil
143 } else if err != nil { 143 } else if err != nil {
144 return fmt.Errorf("worker/firewaller: cannot watch machine units : %v", err) 144 return fmt.Errorf("worker/firewaller: cannot watch machine units : %v", err)
145 } 145 }
146 unitw := m.WatchUnits() 146 unitw := m.WatchUnits()
147 select { 147 select {
148 case <-fw.tomb.Dying(): 148 case <-fw.tomb.Dying():
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
187 if fw.serviceds[serviceName] == nil { 187 if fw.serviceds[serviceName] == nil {
188 err := fw.startService(service) 188 err := fw.startService(service)
189 if err != nil { 189 if err != nil {
190 delete(fw.unitds, unitName) 190 delete(fw.unitds, unitName)
191 return err 191 return err
192 } 192 }
193 } 193 }
194 unitd.serviced = fw.serviceds[serviceName] 194 unitd.serviced = fw.serviceds[serviceName]
195 unitd.serviced.unitds[unitName] = unitd 195 unitd.serviced.unitds[unitName] = unitd
196 196
197 » ports := make([]params.Port, len(unitd.ports)) 197 » ports := make([]instance.Port, len(unitd.ports))
198 copy(ports, unitd.ports) 198 copy(ports, unitd.ports)
199 199
200 go unitd.watchLoop(ports) 200 go unitd.watchLoop(ports)
201 return nil 201 return nil
202 } 202 }
203 203
204 // startService creates a new data value for tracking details of the 204 // startService creates a new data value for tracking details of the
205 // service and starts watching the service for exposure changes. 205 // service and starts watching the service for exposure changes.
206 func (fw *Firewaller) startService(service *state.Service) error { 206 func (fw *Firewaller) startService(service *state.Service) error {
207 serviced := &serviceData{ 207 serviced := &serviceData{
208 fw: fw, 208 fw: fw,
209 service: service, 209 service: service,
210 exposed: service.IsExposed(), 210 exposed: service.IsExposed(),
211 unitds: make(map[string]*unitData), 211 unitds: make(map[string]*unitData),
212 } 212 }
213 fw.serviceds[service.Name()] = serviced 213 fw.serviceds[service.Name()] = serviced
214 go serviced.watchLoop(serviced.exposed) 214 go serviced.watchLoop(serviced.exposed)
215 return nil 215 return nil
216 } 216 }
217 217
218 // reconcileGlobal compares the initially started watcher for machines, 218 // reconcileGlobal compares the initially started watcher for machines,
219 // units and services with the opened and closed ports globally and 219 // units and services with the opened and closed ports globally and
220 // opens and closes the appropriate ports for the whole environment. 220 // opens and closes the appropriate ports for the whole environment.
221 func (fw *Firewaller) reconcileGlobal() error { 221 func (fw *Firewaller) reconcileGlobal() error {
222 initialPorts, err := fw.environ.Ports() 222 initialPorts, err := fw.environ.Ports()
223 if err != nil { 223 if err != nil {
224 return err 224 return err
225 } 225 }
226 » collector := make(map[params.Port]bool) 226 » collector := make(map[instance.Port]bool)
227 for _, unitd := range fw.unitds { 227 for _, unitd := range fw.unitds {
228 if unitd.serviced.exposed { 228 if unitd.serviced.exposed {
229 for _, port := range unitd.ports { 229 for _, port := range unitd.ports {
230 collector[port] = true 230 collector[port] = true
231 } 231 }
232 } 232 }
233 } 233 }
234 » wantedPorts := []params.Port{} 234 » wantedPorts := []instance.Port{}
235 for port := range collector { 235 for port := range collector {
236 wantedPorts = append(wantedPorts, port) 236 wantedPorts = append(wantedPorts, port)
237 } 237 }
238 // Check which ports to open or to close. 238 // Check which ports to open or to close.
239 toOpen := diff(wantedPorts, initialPorts) 239 toOpen := diff(wantedPorts, initialPorts)
240 toClose := diff(initialPorts, wantedPorts) 240 toClose := diff(initialPorts, wantedPorts)
241 if len(toOpen) > 0 { 241 if len(toOpen) > 0 {
242 log.Infof("worker/firewaller: opening global ports %v", toOpen) 242 log.Infof("worker/firewaller: opening global ports %v", toOpen)
243 if err := fw.environ.OpenPorts(toOpen); err != nil { 243 if err := fw.environ.OpenPorts(toOpen); err != nil {
244 return err 244 return err
(...skipping 21 matching lines...) Expand all
266 return err 266 return err
267 } 267 }
268 continue 268 continue
269 } else if err != nil { 269 } else if err != nil {
270 return err 270 return err
271 } 271 }
272 instanceId, ok := m.InstanceId() 272 instanceId, ok := m.InstanceId()
273 if !ok { 273 if !ok {
274 return errors.NotFoundf("instance id for %v", m) 274 return errors.NotFoundf("instance id for %v", m)
275 } 275 }
276 » » instances, err := fw.environ.Instances([]state.InstanceId{instan ceId}) 276 » » instances, err := fw.environ.Instances([]instance.Id{instanceId} )
277 if err == environs.ErrNoInstances { 277 if err == environs.ErrNoInstances {
278 return nil 278 return nil
279 } else if err != nil { 279 } else if err != nil {
280 return err 280 return err
281 } 281 }
282 initialPorts, err := instances[0].Ports(machined.id) 282 initialPorts, err := instances[0].Ports(machined.id)
283 if err != nil { 283 if err != nil {
284 return err 284 return err
285 } 285 }
286 // Check which ports to open or to close. 286 // Check which ports to open or to close.
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
357 if err := fw.flushMachine(machined); err != nil { 357 if err := fw.flushMachine(machined); err != nil {
358 return err 358 return err
359 } 359 }
360 } 360 }
361 return nil 361 return nil
362 } 362 }
363 363
364 // flushMachine opens and closes ports for the passed machine. 364 // flushMachine opens and closes ports for the passed machine.
365 func (fw *Firewaller) flushMachine(machined *machineData) error { 365 func (fw *Firewaller) flushMachine(machined *machineData) error {
366 // Gather ports to open and close. 366 // Gather ports to open and close.
367 » ports := map[params.Port]bool{} 367 » ports := map[instance.Port]bool{}
368 for _, unitd := range machined.unitds { 368 for _, unitd := range machined.unitds {
369 if unitd.serviced.exposed { 369 if unitd.serviced.exposed {
370 for _, port := range unitd.ports { 370 for _, port := range unitd.ports {
371 ports[port] = true 371 ports[port] = true
372 } 372 }
373 } 373 }
374 } 374 }
375 » want := []params.Port{} 375 » want := []instance.Port{}
376 for port := range ports { 376 for port := range ports {
377 want = append(want, port) 377 want = append(want, port)
378 } 378 }
379 toOpen := diff(want, machined.ports) 379 toOpen := diff(want, machined.ports)
380 toClose := diff(machined.ports, want) 380 toClose := diff(machined.ports, want)
381 machined.ports = want 381 machined.ports = want
382 if fw.globalMode { 382 if fw.globalMode {
383 return fw.flushGlobalPorts(toOpen, toClose) 383 return fw.flushGlobalPorts(toOpen, toClose)
384 } 384 }
385 return fw.flushInstancePorts(machined, toOpen, toClose) 385 return fw.flushInstancePorts(machined, toOpen, toClose)
386 } 386 }
387 387
388 // flushGlobalPorts opens and closes global ports in the environment. 388 // flushGlobalPorts opens and closes global ports in the environment.
389 // It keeps a reference count for ports so that only 0-to-1 and 1-to-0 events 389 // It keeps a reference count for ports so that only 0-to-1 and 1-to-0 events
390 // modify the environment. 390 // modify the environment.
391 func (fw *Firewaller) flushGlobalPorts(rawOpen, rawClose []params.Port) error { 391 func (fw *Firewaller) flushGlobalPorts(rawOpen, rawClose []instance.Port) error {
392 // Filter which ports are really to open or close. 392 // Filter which ports are really to open or close.
393 » var toOpen, toClose []params.Port 393 » var toOpen, toClose []instance.Port
394 for _, port := range rawOpen { 394 for _, port := range rawOpen {
395 if fw.globalPortRef[port] == 0 { 395 if fw.globalPortRef[port] == 0 {
396 toOpen = append(toOpen, port) 396 toOpen = append(toOpen, port)
397 } 397 }
398 fw.globalPortRef[port]++ 398 fw.globalPortRef[port]++
399 } 399 }
400 for _, port := range rawClose { 400 for _, port := range rawClose {
401 fw.globalPortRef[port]-- 401 fw.globalPortRef[port]--
402 if fw.globalPortRef[port] == 0 { 402 if fw.globalPortRef[port] == 0 {
403 toClose = append(toClose, port) 403 toClose = append(toClose, port)
(...skipping 14 matching lines...) Expand all
418 // TODO(mue) Add local retry logic. 418 // TODO(mue) Add local retry logic.
419 return err 419 return err
420 } 420 }
421 state.SortPorts(toClose) 421 state.SortPorts(toClose)
422 log.Infof("worker/firewaller: closed ports %v in environment", t oClose) 422 log.Infof("worker/firewaller: closed ports %v in environment", t oClose)
423 } 423 }
424 return nil 424 return nil
425 } 425 }
426 426
427 // flushGlobalPorts opens and closes ports global on the machine. 427 // flushGlobalPorts opens and closes ports global on the machine.
428 func (fw *Firewaller) flushInstancePorts(machined *machineData, toOpen, toClose []params.Port) error { 428 func (fw *Firewaller) flushInstancePorts(machined *machineData, toOpen, toClose []instance.Port) error {
429 // If there's nothing to do, do nothing. 429 // If there's nothing to do, do nothing.
430 // This is important because when a machine is first created, 430 // This is important because when a machine is first created,
431 // it will have no instance id but also no open ports - 431 // it will have no instance id but also no open ports -
432 // InstanceId will fail but we don't care. 432 // InstanceId will fail but we don't care.
433 if len(toOpen) == 0 && len(toClose) == 0 { 433 if len(toOpen) == 0 && len(toClose) == 0 {
434 return nil 434 return nil
435 } 435 }
436 m, err := machined.machine() 436 m, err := machined.machine()
437 if errors.IsNotFoundError(err) { 437 if errors.IsNotFoundError(err) {
438 return nil 438 return nil
439 } 439 }
440 if err != nil { 440 if err != nil {
441 return err 441 return err
442 } 442 }
443 instanceId, ok := m.InstanceId() 443 instanceId, ok := m.InstanceId()
444 if !ok { 444 if !ok {
445 return errors.NotFoundf("instance id for %v", m) 445 return errors.NotFoundf("instance id for %v", m)
446 } 446 }
447 » instances, err := fw.environ.Instances([]state.InstanceId{instanceId}) 447 » instances, err := fw.environ.Instances([]instance.Id{instanceId})
448 if err != nil { 448 if err != nil {
449 return err 449 return err
450 } 450 }
451 // Open and close the ports. 451 // Open and close the ports.
452 if len(toOpen) > 0 { 452 if len(toOpen) > 0 {
453 if err := instances[0].OpenPorts(machined.id, toOpen); err != ni l { 453 if err := instances[0].OpenPorts(machined.id, toOpen); err != ni l {
454 // TODO(mue) Add local retry logic. 454 // TODO(mue) Add local retry logic.
455 return err 455 return err
456 } 456 }
457 state.SortPorts(toOpen) 457 state.SortPorts(toOpen)
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after
575 machined *machineData 575 machined *machineData
576 units []string 576 units []string
577 } 577 }
578 578
579 // machineData holds machine details and watches units added or removed. 579 // machineData holds machine details and watches units added or removed.
580 type machineData struct { 580 type machineData struct {
581 tomb tomb.Tomb 581 tomb tomb.Tomb
582 fw *Firewaller 582 fw *Firewaller
583 id string 583 id string
584 unitds map[string]*unitData 584 unitds map[string]*unitData
585 » ports []params.Port 585 » ports []instance.Port
586 } 586 }
587 587
588 func (md *machineData) machine() (*state.Machine, error) { 588 func (md *machineData) machine() (*state.Machine, error) {
589 return md.fw.st.Machine(md.id) 589 return md.fw.st.Machine(md.id)
590 } 590 }
591 591
592 // watchLoop watches the machine for units added or removed. 592 // watchLoop watches the machine for units added or removed.
593 func (md *machineData) watchLoop(unitw *state.MachineUnitsWatcher) { 593 func (md *machineData) watchLoop(unitw *state.MachineUnitsWatcher) {
594 defer md.tomb.Done() 594 defer md.tomb.Done()
595 defer watcher.Stop(unitw, &md.tomb) 595 defer watcher.Stop(unitw, &md.tomb)
(...skipping 20 matching lines...) Expand all
616 616
617 // stopWatch stops the machine watching. 617 // stopWatch stops the machine watching.
618 func (md *machineData) Stop() error { 618 func (md *machineData) Stop() error {
619 md.tomb.Kill(nil) 619 md.tomb.Kill(nil)
620 return md.tomb.Wait() 620 return md.tomb.Wait()
621 } 621 }
622 622
623 // portsChange contains the changed ports for one specific unit. 623 // portsChange contains the changed ports for one specific unit.
624 type portsChange struct { 624 type portsChange struct {
625 unitd *unitData 625 unitd *unitData
626 » ports []params.Port 626 » ports []instance.Port
627 } 627 }
628 628
629 // unitData holds unit details and watches port changes. 629 // unitData holds unit details and watches port changes.
630 type unitData struct { 630 type unitData struct {
631 tomb tomb.Tomb 631 tomb tomb.Tomb
632 fw *Firewaller 632 fw *Firewaller
633 unit *state.Unit 633 unit *state.Unit
634 serviced *serviceData 634 serviced *serviceData
635 machined *machineData 635 machined *machineData
636 » ports []params.Port 636 » ports []instance.Port
637 } 637 }
638 638
639 // watchLoop watches the unit for port changes. 639 // watchLoop watches the unit for port changes.
640 func (ud *unitData) watchLoop(latestPorts []params.Port) { 640 func (ud *unitData) watchLoop(latestPorts []instance.Port) {
641 defer ud.tomb.Done() 641 defer ud.tomb.Done()
642 w := ud.unit.Watch() 642 w := ud.unit.Watch()
643 defer watcher.Stop(w, &ud.tomb) 643 defer watcher.Stop(w, &ud.tomb)
644 for { 644 for {
645 select { 645 select {
646 case <-ud.tomb.Dying(): 646 case <-ud.tomb.Dying():
647 return 647 return
648 case _, ok := <-w.Changes(): 648 case _, ok := <-w.Changes():
649 if !ok { 649 if !ok {
650 ud.fw.tomb.Kill(watcher.MustErr(w)) 650 ud.fw.tomb.Kill(watcher.MustErr(w))
(...skipping 14 matching lines...) Expand all
665 case ud.fw.portsChange <- &portsChange{ud, change}: 665 case ud.fw.portsChange <- &portsChange{ud, change}:
666 case <-ud.tomb.Dying(): 666 case <-ud.tomb.Dying():
667 return 667 return
668 } 668 }
669 } 669 }
670 } 670 }
671 } 671 }
672 672
673 // samePorts returns whether old and new contain the same set of ports. 673 // samePorts returns whether old and new contain the same set of ports.
674 // Both old and new must be sorted. 674 // Both old and new must be sorted.
675 func samePorts(old, new []params.Port) bool { 675 func samePorts(old, new []instance.Port) bool {
676 if len(old) != len(new) { 676 if len(old) != len(new) {
677 return false 677 return false
678 } 678 }
679 for i, p := range old { 679 for i, p := range old {
680 if new[i] != p { 680 if new[i] != p {
681 return false 681 return false
682 } 682 }
683 } 683 }
684 return true 684 return true
685 } 685 }
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
739 } 739 }
740 } 740 }
741 741
742 // Stop stops the service watching. 742 // Stop stops the service watching.
743 func (sd *serviceData) Stop() error { 743 func (sd *serviceData) Stop() error {
744 sd.tomb.Kill(nil) 744 sd.tomb.Kill(nil)
745 return sd.tomb.Wait() 745 return sd.tomb.Wait()
746 } 746 }
747 747
748 // diff returns all the ports that exist in A but not B. 748 // diff returns all the ports that exist in A but not B.
749 func diff(A, B []params.Port) (missing []params.Port) { 749 func diff(A, B []instance.Port) (missing []instance.Port) {
750 next: 750 next:
751 for _, a := range A { 751 for _, a := range A {
752 for _, b := range B { 752 for _, b := range B {
753 if a == b { 753 if a == b {
754 continue next 754 continue next
755 } 755 }
756 } 756 }
757 missing = append(missing, a) 757 missing = append(missing, a)
758 } 758 }
759 return 759 return
760 } 760 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b