OLD | NEW |
1 // Copyright 2012, 2013 Canonical Ltd. | 1 // Copyright 2012, 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package firewaller | 4 package firewaller |
5 | 5 |
6 import ( | 6 import ( |
7 "fmt" | 7 "fmt" |
8 "launchpad.net/juju-core/environs" | 8 "launchpad.net/juju-core/environs" |
9 "launchpad.net/juju-core/environs/config" | 9 "launchpad.net/juju-core/environs/config" |
10 "launchpad.net/juju-core/errors" | 10 "launchpad.net/juju-core/errors" |
| 11 "launchpad.net/juju-core/instance" |
11 "launchpad.net/juju-core/log" | 12 "launchpad.net/juju-core/log" |
12 "launchpad.net/juju-core/state" | 13 "launchpad.net/juju-core/state" |
13 "launchpad.net/juju-core/state/api/params" | |
14 "launchpad.net/juju-core/state/watcher" | 14 "launchpad.net/juju-core/state/watcher" |
15 "launchpad.net/juju-core/worker" | 15 "launchpad.net/juju-core/worker" |
16 "launchpad.net/tomb" | 16 "launchpad.net/tomb" |
17 ) | 17 ) |
18 | 18 |
19 // Firewaller watches the state for ports opened or closed | 19 // Firewaller watches the state for ports opened or closed |
20 // and reflects those changes onto the backing environment. | 20 // and reflects those changes onto the backing environment. |
21 type Firewaller struct { | 21 type Firewaller struct { |
22 tomb tomb.Tomb | 22 tomb tomb.Tomb |
23 st *state.State | 23 st *state.State |
24 environ environs.Environ | 24 environ environs.Environ |
25 environWatcher *state.EnvironConfigWatcher | 25 environWatcher *state.EnvironConfigWatcher |
26 machinesWatcher *state.LifecycleWatcher | 26 machinesWatcher *state.LifecycleWatcher |
27 machineds map[string]*machineData | 27 machineds map[string]*machineData |
28 unitsChange chan *unitsChange | 28 unitsChange chan *unitsChange |
29 unitds map[string]*unitData | 29 unitds map[string]*unitData |
30 portsChange chan *portsChange | 30 portsChange chan *portsChange |
31 serviceds map[string]*serviceData | 31 serviceds map[string]*serviceData |
32 exposedChange chan *exposedChange | 32 exposedChange chan *exposedChange |
33 globalMode bool | 33 globalMode bool |
34 » globalPortRef map[params.Port]int | 34 » globalPortRef map[instance.Port]int |
35 } | 35 } |
36 | 36 |
37 // NewFirewaller returns a new Firewaller. | 37 // NewFirewaller returns a new Firewaller. |
38 func NewFirewaller(st *state.State) *Firewaller { | 38 func NewFirewaller(st *state.State) *Firewaller { |
39 fw := &Firewaller{ | 39 fw := &Firewaller{ |
40 st: st, | 40 st: st, |
41 environWatcher: st.WatchEnvironConfig(), | 41 environWatcher: st.WatchEnvironConfig(), |
42 machinesWatcher: st.WatchEnvironMachines(), | 42 machinesWatcher: st.WatchEnvironMachines(), |
43 machineds: make(map[string]*machineData), | 43 machineds: make(map[string]*machineData), |
44 unitsChange: make(chan *unitsChange), | 44 unitsChange: make(chan *unitsChange), |
(...skipping 14 matching lines...) Expand all Loading... |
59 | 59 |
60 var err error | 60 var err error |
61 var reconciled bool | 61 var reconciled bool |
62 | 62 |
63 fw.environ, err = worker.WaitForEnviron(fw.environWatcher, fw.tomb.Dying
()) | 63 fw.environ, err = worker.WaitForEnviron(fw.environWatcher, fw.tomb.Dying
()) |
64 if err != nil { | 64 if err != nil { |
65 return err | 65 return err |
66 } | 66 } |
67 if fw.environ.Config().FirewallMode() == config.FwGlobal { | 67 if fw.environ.Config().FirewallMode() == config.FwGlobal { |
68 fw.globalMode = true | 68 fw.globalMode = true |
69 » » fw.globalPortRef = make(map[params.Port]int) | 69 » » fw.globalPortRef = make(map[instance.Port]int) |
70 } | 70 } |
71 for { | 71 for { |
72 select { | 72 select { |
73 case <-fw.tomb.Dying(): | 73 case <-fw.tomb.Dying(): |
74 return tomb.ErrDying | 74 return tomb.ErrDying |
75 case change, ok := <-fw.environWatcher.Changes(): | 75 case change, ok := <-fw.environWatcher.Changes(): |
76 if !ok { | 76 if !ok { |
77 return watcher.MustErr(fw.environWatcher) | 77 return watcher.MustErr(fw.environWatcher) |
78 } | 78 } |
79 if err := fw.environ.SetConfig(change); err != nil { | 79 if err := fw.environ.SetConfig(change); err != nil { |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
128 } | 128 } |
129 } | 129 } |
130 | 130 |
131 // startMachine creates a new data value for tracking details of the | 131 // startMachine creates a new data value for tracking details of the |
132 // machine and starts watching the machine for units added or removed. | 132 // machine and starts watching the machine for units added or removed. |
133 func (fw *Firewaller) startMachine(id string) error { | 133 func (fw *Firewaller) startMachine(id string) error { |
134 machined := &machineData{ | 134 machined := &machineData{ |
135 fw: fw, | 135 fw: fw, |
136 id: id, | 136 id: id, |
137 unitds: make(map[string]*unitData), | 137 unitds: make(map[string]*unitData), |
138 » » ports: make([]params.Port, 0), | 138 » » ports: make([]instance.Port, 0), |
139 } | 139 } |
140 m, err := machined.machine() | 140 m, err := machined.machine() |
141 if errors.IsNotFoundError(err) { | 141 if errors.IsNotFoundError(err) { |
142 return nil | 142 return nil |
143 } else if err != nil { | 143 } else if err != nil { |
144 return fmt.Errorf("worker/firewaller: cannot watch machine units
: %v", err) | 144 return fmt.Errorf("worker/firewaller: cannot watch machine units
: %v", err) |
145 } | 145 } |
146 unitw := m.WatchUnits() | 146 unitw := m.WatchUnits() |
147 select { | 147 select { |
148 case <-fw.tomb.Dying(): | 148 case <-fw.tomb.Dying(): |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
187 if fw.serviceds[serviceName] == nil { | 187 if fw.serviceds[serviceName] == nil { |
188 err := fw.startService(service) | 188 err := fw.startService(service) |
189 if err != nil { | 189 if err != nil { |
190 delete(fw.unitds, unitName) | 190 delete(fw.unitds, unitName) |
191 return err | 191 return err |
192 } | 192 } |
193 } | 193 } |
194 unitd.serviced = fw.serviceds[serviceName] | 194 unitd.serviced = fw.serviceds[serviceName] |
195 unitd.serviced.unitds[unitName] = unitd | 195 unitd.serviced.unitds[unitName] = unitd |
196 | 196 |
197 » ports := make([]params.Port, len(unitd.ports)) | 197 » ports := make([]instance.Port, len(unitd.ports)) |
198 copy(ports, unitd.ports) | 198 copy(ports, unitd.ports) |
199 | 199 |
200 go unitd.watchLoop(ports) | 200 go unitd.watchLoop(ports) |
201 return nil | 201 return nil |
202 } | 202 } |
203 | 203 |
204 // startService creates a new data value for tracking details of the | 204 // startService creates a new data value for tracking details of the |
205 // service and starts watching the service for exposure changes. | 205 // service and starts watching the service for exposure changes. |
206 func (fw *Firewaller) startService(service *state.Service) error { | 206 func (fw *Firewaller) startService(service *state.Service) error { |
207 serviced := &serviceData{ | 207 serviced := &serviceData{ |
208 fw: fw, | 208 fw: fw, |
209 service: service, | 209 service: service, |
210 exposed: service.IsExposed(), | 210 exposed: service.IsExposed(), |
211 unitds: make(map[string]*unitData), | 211 unitds: make(map[string]*unitData), |
212 } | 212 } |
213 fw.serviceds[service.Name()] = serviced | 213 fw.serviceds[service.Name()] = serviced |
214 go serviced.watchLoop(serviced.exposed) | 214 go serviced.watchLoop(serviced.exposed) |
215 return nil | 215 return nil |
216 } | 216 } |
217 | 217 |
218 // reconcileGlobal compares the initially started watcher for machines, | 218 // reconcileGlobal compares the initially started watcher for machines, |
219 // units and services with the opened and closed ports globally and | 219 // units and services with the opened and closed ports globally and |
220 // opens and closes the appropriate ports for the whole environment. | 220 // opens and closes the appropriate ports for the whole environment. |
221 func (fw *Firewaller) reconcileGlobal() error { | 221 func (fw *Firewaller) reconcileGlobal() error { |
222 initialPorts, err := fw.environ.Ports() | 222 initialPorts, err := fw.environ.Ports() |
223 if err != nil { | 223 if err != nil { |
224 return err | 224 return err |
225 } | 225 } |
226 » collector := make(map[params.Port]bool) | 226 » collector := make(map[instance.Port]bool) |
227 for _, unitd := range fw.unitds { | 227 for _, unitd := range fw.unitds { |
228 if unitd.serviced.exposed { | 228 if unitd.serviced.exposed { |
229 for _, port := range unitd.ports { | 229 for _, port := range unitd.ports { |
230 collector[port] = true | 230 collector[port] = true |
231 } | 231 } |
232 } | 232 } |
233 } | 233 } |
234 » wantedPorts := []params.Port{} | 234 » wantedPorts := []instance.Port{} |
235 for port := range collector { | 235 for port := range collector { |
236 wantedPorts = append(wantedPorts, port) | 236 wantedPorts = append(wantedPorts, port) |
237 } | 237 } |
238 // Check which ports to open or to close. | 238 // Check which ports to open or to close. |
239 toOpen := diff(wantedPorts, initialPorts) | 239 toOpen := diff(wantedPorts, initialPorts) |
240 toClose := diff(initialPorts, wantedPorts) | 240 toClose := diff(initialPorts, wantedPorts) |
241 if len(toOpen) > 0 { | 241 if len(toOpen) > 0 { |
242 log.Infof("worker/firewaller: opening global ports %v", toOpen) | 242 log.Infof("worker/firewaller: opening global ports %v", toOpen) |
243 if err := fw.environ.OpenPorts(toOpen); err != nil { | 243 if err := fw.environ.OpenPorts(toOpen); err != nil { |
244 return err | 244 return err |
(...skipping 21 matching lines...) Expand all Loading... |
266 return err | 266 return err |
267 } | 267 } |
268 continue | 268 continue |
269 } else if err != nil { | 269 } else if err != nil { |
270 return err | 270 return err |
271 } | 271 } |
272 instanceId, ok := m.InstanceId() | 272 instanceId, ok := m.InstanceId() |
273 if !ok { | 273 if !ok { |
274 return errors.NotFoundf("instance id for %v", m) | 274 return errors.NotFoundf("instance id for %v", m) |
275 } | 275 } |
276 » » instances, err := fw.environ.Instances([]state.InstanceId{instan
ceId}) | 276 » » instances, err := fw.environ.Instances([]instance.Id{instanceId}
) |
277 if err == environs.ErrNoInstances { | 277 if err == environs.ErrNoInstances { |
278 return nil | 278 return nil |
279 } else if err != nil { | 279 } else if err != nil { |
280 return err | 280 return err |
281 } | 281 } |
282 initialPorts, err := instances[0].Ports(machined.id) | 282 initialPorts, err := instances[0].Ports(machined.id) |
283 if err != nil { | 283 if err != nil { |
284 return err | 284 return err |
285 } | 285 } |
286 // Check which ports to open or to close. | 286 // Check which ports to open or to close. |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
357 if err := fw.flushMachine(machined); err != nil { | 357 if err := fw.flushMachine(machined); err != nil { |
358 return err | 358 return err |
359 } | 359 } |
360 } | 360 } |
361 return nil | 361 return nil |
362 } | 362 } |
363 | 363 |
364 // flushMachine opens and closes ports for the passed machine. | 364 // flushMachine opens and closes ports for the passed machine. |
365 func (fw *Firewaller) flushMachine(machined *machineData) error { | 365 func (fw *Firewaller) flushMachine(machined *machineData) error { |
366 // Gather ports to open and close. | 366 // Gather ports to open and close. |
367 » ports := map[params.Port]bool{} | 367 » ports := map[instance.Port]bool{} |
368 for _, unitd := range machined.unitds { | 368 for _, unitd := range machined.unitds { |
369 if unitd.serviced.exposed { | 369 if unitd.serviced.exposed { |
370 for _, port := range unitd.ports { | 370 for _, port := range unitd.ports { |
371 ports[port] = true | 371 ports[port] = true |
372 } | 372 } |
373 } | 373 } |
374 } | 374 } |
375 » want := []params.Port{} | 375 » want := []instance.Port{} |
376 for port := range ports { | 376 for port := range ports { |
377 want = append(want, port) | 377 want = append(want, port) |
378 } | 378 } |
379 toOpen := diff(want, machined.ports) | 379 toOpen := diff(want, machined.ports) |
380 toClose := diff(machined.ports, want) | 380 toClose := diff(machined.ports, want) |
381 machined.ports = want | 381 machined.ports = want |
382 if fw.globalMode { | 382 if fw.globalMode { |
383 return fw.flushGlobalPorts(toOpen, toClose) | 383 return fw.flushGlobalPorts(toOpen, toClose) |
384 } | 384 } |
385 return fw.flushInstancePorts(machined, toOpen, toClose) | 385 return fw.flushInstancePorts(machined, toOpen, toClose) |
386 } | 386 } |
387 | 387 |
388 // flushGlobalPorts opens and closes global ports in the environment. | 388 // flushGlobalPorts opens and closes global ports in the environment. |
389 // It keeps a reference count for ports so that only 0-to-1 and 1-to-0 events | 389 // It keeps a reference count for ports so that only 0-to-1 and 1-to-0 events |
390 // modify the environment. | 390 // modify the environment. |
391 func (fw *Firewaller) flushGlobalPorts(rawOpen, rawClose []params.Port) error { | 391 func (fw *Firewaller) flushGlobalPorts(rawOpen, rawClose []instance.Port) error
{ |
392 // Filter which ports are really to open or close. | 392 // Filter which ports are really to open or close. |
393 » var toOpen, toClose []params.Port | 393 » var toOpen, toClose []instance.Port |
394 for _, port := range rawOpen { | 394 for _, port := range rawOpen { |
395 if fw.globalPortRef[port] == 0 { | 395 if fw.globalPortRef[port] == 0 { |
396 toOpen = append(toOpen, port) | 396 toOpen = append(toOpen, port) |
397 } | 397 } |
398 fw.globalPortRef[port]++ | 398 fw.globalPortRef[port]++ |
399 } | 399 } |
400 for _, port := range rawClose { | 400 for _, port := range rawClose { |
401 fw.globalPortRef[port]-- | 401 fw.globalPortRef[port]-- |
402 if fw.globalPortRef[port] == 0 { | 402 if fw.globalPortRef[port] == 0 { |
403 toClose = append(toClose, port) | 403 toClose = append(toClose, port) |
(...skipping 14 matching lines...) Expand all Loading... |
418 // TODO(mue) Add local retry logic. | 418 // TODO(mue) Add local retry logic. |
419 return err | 419 return err |
420 } | 420 } |
421 state.SortPorts(toClose) | 421 state.SortPorts(toClose) |
422 log.Infof("worker/firewaller: closed ports %v in environment", t
oClose) | 422 log.Infof("worker/firewaller: closed ports %v in environment", t
oClose) |
423 } | 423 } |
424 return nil | 424 return nil |
425 } | 425 } |
426 | 426 |
427 // flushGlobalPorts opens and closes ports global on the machine. | 427 // flushGlobalPorts opens and closes ports global on the machine. |
428 func (fw *Firewaller) flushInstancePorts(machined *machineData, toOpen, toClose
[]params.Port) error { | 428 func (fw *Firewaller) flushInstancePorts(machined *machineData, toOpen, toClose
[]instance.Port) error { |
429 // If there's nothing to do, do nothing. | 429 // If there's nothing to do, do nothing. |
430 // This is important because when a machine is first created, | 430 // This is important because when a machine is first created, |
431 // it will have no instance id but also no open ports - | 431 // it will have no instance id but also no open ports - |
432 // InstanceId will fail but we don't care. | 432 // InstanceId will fail but we don't care. |
433 if len(toOpen) == 0 && len(toClose) == 0 { | 433 if len(toOpen) == 0 && len(toClose) == 0 { |
434 return nil | 434 return nil |
435 } | 435 } |
436 m, err := machined.machine() | 436 m, err := machined.machine() |
437 if errors.IsNotFoundError(err) { | 437 if errors.IsNotFoundError(err) { |
438 return nil | 438 return nil |
439 } | 439 } |
440 if err != nil { | 440 if err != nil { |
441 return err | 441 return err |
442 } | 442 } |
443 instanceId, ok := m.InstanceId() | 443 instanceId, ok := m.InstanceId() |
444 if !ok { | 444 if !ok { |
445 return errors.NotFoundf("instance id for %v", m) | 445 return errors.NotFoundf("instance id for %v", m) |
446 } | 446 } |
447 » instances, err := fw.environ.Instances([]state.InstanceId{instanceId}) | 447 » instances, err := fw.environ.Instances([]instance.Id{instanceId}) |
448 if err != nil { | 448 if err != nil { |
449 return err | 449 return err |
450 } | 450 } |
451 // Open and close the ports. | 451 // Open and close the ports. |
452 if len(toOpen) > 0 { | 452 if len(toOpen) > 0 { |
453 if err := instances[0].OpenPorts(machined.id, toOpen); err != ni
l { | 453 if err := instances[0].OpenPorts(machined.id, toOpen); err != ni
l { |
454 // TODO(mue) Add local retry logic. | 454 // TODO(mue) Add local retry logic. |
455 return err | 455 return err |
456 } | 456 } |
457 state.SortPorts(toOpen) | 457 state.SortPorts(toOpen) |
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
575 machined *machineData | 575 machined *machineData |
576 units []string | 576 units []string |
577 } | 577 } |
578 | 578 |
579 // machineData holds machine details and watches units added or removed. | 579 // machineData holds machine details and watches units added or removed. |
580 type machineData struct { | 580 type machineData struct { |
581 tomb tomb.Tomb | 581 tomb tomb.Tomb |
582 fw *Firewaller | 582 fw *Firewaller |
583 id string | 583 id string |
584 unitds map[string]*unitData | 584 unitds map[string]*unitData |
585 » ports []params.Port | 585 » ports []instance.Port |
586 } | 586 } |
587 | 587 |
588 func (md *machineData) machine() (*state.Machine, error) { | 588 func (md *machineData) machine() (*state.Machine, error) { |
589 return md.fw.st.Machine(md.id) | 589 return md.fw.st.Machine(md.id) |
590 } | 590 } |
591 | 591 |
592 // watchLoop watches the machine for units added or removed. | 592 // watchLoop watches the machine for units added or removed. |
593 func (md *machineData) watchLoop(unitw *state.MachineUnitsWatcher) { | 593 func (md *machineData) watchLoop(unitw *state.MachineUnitsWatcher) { |
594 defer md.tomb.Done() | 594 defer md.tomb.Done() |
595 defer watcher.Stop(unitw, &md.tomb) | 595 defer watcher.Stop(unitw, &md.tomb) |
(...skipping 20 matching lines...) Expand all Loading... |
616 | 616 |
617 // stopWatch stops the machine watching. | 617 // stopWatch stops the machine watching. |
618 func (md *machineData) Stop() error { | 618 func (md *machineData) Stop() error { |
619 md.tomb.Kill(nil) | 619 md.tomb.Kill(nil) |
620 return md.tomb.Wait() | 620 return md.tomb.Wait() |
621 } | 621 } |
622 | 622 |
623 // portsChange contains the changed ports for one specific unit. | 623 // portsChange contains the changed ports for one specific unit. |
624 type portsChange struct { | 624 type portsChange struct { |
625 unitd *unitData | 625 unitd *unitData |
626 » ports []params.Port | 626 » ports []instance.Port |
627 } | 627 } |
628 | 628 |
629 // unitData holds unit details and watches port changes. | 629 // unitData holds unit details and watches port changes. |
630 type unitData struct { | 630 type unitData struct { |
631 tomb tomb.Tomb | 631 tomb tomb.Tomb |
632 fw *Firewaller | 632 fw *Firewaller |
633 unit *state.Unit | 633 unit *state.Unit |
634 serviced *serviceData | 634 serviced *serviceData |
635 machined *machineData | 635 machined *machineData |
636 » ports []params.Port | 636 » ports []instance.Port |
637 } | 637 } |
638 | 638 |
639 // watchLoop watches the unit for port changes. | 639 // watchLoop watches the unit for port changes. |
640 func (ud *unitData) watchLoop(latestPorts []params.Port) { | 640 func (ud *unitData) watchLoop(latestPorts []instance.Port) { |
641 defer ud.tomb.Done() | 641 defer ud.tomb.Done() |
642 w := ud.unit.Watch() | 642 w := ud.unit.Watch() |
643 defer watcher.Stop(w, &ud.tomb) | 643 defer watcher.Stop(w, &ud.tomb) |
644 for { | 644 for { |
645 select { | 645 select { |
646 case <-ud.tomb.Dying(): | 646 case <-ud.tomb.Dying(): |
647 return | 647 return |
648 case _, ok := <-w.Changes(): | 648 case _, ok := <-w.Changes(): |
649 if !ok { | 649 if !ok { |
650 ud.fw.tomb.Kill(watcher.MustErr(w)) | 650 ud.fw.tomb.Kill(watcher.MustErr(w)) |
(...skipping 14 matching lines...) Expand all Loading... |
665 case ud.fw.portsChange <- &portsChange{ud, change}: | 665 case ud.fw.portsChange <- &portsChange{ud, change}: |
666 case <-ud.tomb.Dying(): | 666 case <-ud.tomb.Dying(): |
667 return | 667 return |
668 } | 668 } |
669 } | 669 } |
670 } | 670 } |
671 } | 671 } |
672 | 672 |
673 // samePorts returns whether old and new contain the same set of ports. | 673 // samePorts returns whether old and new contain the same set of ports. |
674 // Both old and new must be sorted. | 674 // Both old and new must be sorted. |
675 func samePorts(old, new []params.Port) bool { | 675 func samePorts(old, new []instance.Port) bool { |
676 if len(old) != len(new) { | 676 if len(old) != len(new) { |
677 return false | 677 return false |
678 } | 678 } |
679 for i, p := range old { | 679 for i, p := range old { |
680 if new[i] != p { | 680 if new[i] != p { |
681 return false | 681 return false |
682 } | 682 } |
683 } | 683 } |
684 return true | 684 return true |
685 } | 685 } |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
739 } | 739 } |
740 } | 740 } |
741 | 741 |
742 // Stop stops the service watching. | 742 // Stop stops the service watching. |
743 func (sd *serviceData) Stop() error { | 743 func (sd *serviceData) Stop() error { |
744 sd.tomb.Kill(nil) | 744 sd.tomb.Kill(nil) |
745 return sd.tomb.Wait() | 745 return sd.tomb.Wait() |
746 } | 746 } |
747 | 747 |
748 // diff returns all the ports that exist in A but not B. | 748 // diff returns all the ports that exist in A but not B. |
749 func diff(A, B []params.Port) (missing []params.Port) { | 749 func diff(A, B []instance.Port) (missing []instance.Port) { |
750 next: | 750 next: |
751 for _, a := range A { | 751 for _, a := range A { |
752 for _, b := range B { | 752 for _, b := range B { |
753 if a == b { | 753 if a == b { |
754 continue next | 754 continue next |
755 } | 755 } |
756 } | 756 } |
757 missing = append(missing, a) | 757 missing = append(missing, a) |
758 } | 758 } |
759 return | 759 return |
760 } | 760 } |
OLD | NEW |