Left: | ||
Right: |
OLD | NEW |
---|---|
(Empty) | |
1 package firewaller | |
2 | |
3 import ( | |
4 "launchpad.net/juju-core/log" | |
5 "launchpad.net/juju-core/state" | |
6 "launchpad.net/juju-core/state/watcher" | |
7 "launchpad.net/tomb" | |
8 ) | |
9 | |
10 // Firewaller watches the state for ports opened or closed | |
11 // and reflects those changes onto the backing environment. | |
12 type Firewaller struct { | |
13 st *state.State | |
14 tomb tomb.Tomb | |
15 machinesWatcher *state.MachinesWatcher | |
16 machines map[int]*machineTracker | |
17 machineUnitsChanges chan *machineUnitsChange | |
18 } | |
19 | |
20 // NewFirewaller returns a new Firewaller. | |
21 func NewFirewaller(st *state.State) (*Firewaller, error) { | |
niemeyer
2012/07/19 00:16:16
Nice to see how this was simplified by moving logi
| |
22 fw := &Firewaller{ | |
23 st: st, | |
24 machinesWatcher: st.WatchMachines(), | |
25 machines: make(map[int]*machineTracker), | |
26 machineUnitsChanges: make(chan *machineUnitsChange), | |
27 } | |
28 go fw.loop() | |
29 return fw, nil | |
30 } | |
31 | |
32 func (fw *Firewaller) loop() { | |
33 defer fw.finish() | |
niemeyer
2012/07/19 00:16:16
Nice cleanup as well!
| |
34 for { | |
35 select { | |
36 case <-fw.tomb.Dying(): | |
37 return | |
38 case change, ok := <-fw.machinesWatcher.Changes(): | |
39 if !ok { | |
40 return | |
41 } | |
42 for _, removedMachine := range change.Removed { | |
43 m, ok := fw.machines[removedMachine.Id()] | |
44 if !ok { | |
45 panic("trying to remove machine that was n't added") | |
46 } | |
47 delete(fw.machines, removedMachine.Id()) | |
48 if err := m.stop(); err != nil { | |
49 log.Printf("machine tracker %d returned error when stopping: %v", removedMachine.Id(), err) | |
50 } | |
51 log.Debugf("firewaller: remove-machine %v", remo vedMachine.Id()) | |
niemeyer
2012/07/19 00:16:16
Either this should be dropped, or clarified to sta
niemeyer
2012/07/19 00:20:19
Oops. I now realize you're using this to test the
TheMue
2012/07/19 11:18:13
Done.
| |
52 } | |
53 for _, addedMachine := range change.Added { | |
54 m := newMachineTracker(addedMachine, fw) | |
55 fw.machines[addedMachine.Id()] = m | |
56 log.Debugf("firewaller: add-machine %v", m.id) | |
niemeyer
2012/07/19 00:16:16
Ditto.
TheMue
2012/07/19 11:18:13
Done.
| |
57 } | |
58 case <-fw.machineUnitsChanges: | |
59 // TODO(mue) fill with life. | |
60 } | |
61 } | |
62 } | |
63 | |
64 // finishes cleans up when the firewaller is stopping. | |
65 func (fw *Firewaller) finish() { | |
66 watcher.Stop(fw.machinesWatcher, &fw.tomb) | |
67 for _, m := range fw.machines { | |
68 fw.tomb.Kill(m.stop()) | |
69 } | |
70 fw.tomb.Done() | |
71 } | |
72 | |
73 // Wait waits for the Firewaller to exit. | |
74 func (fw *Firewaller) Wait() error { | |
75 return fw.tomb.Wait() | |
76 } | |
77 | |
78 // Stop stops the Firewaller and returns any error encountered while stopping. | |
79 func (fw *Firewaller) Stop() error { | |
80 fw.tomb.Kill(nil) | |
81 return fw.tomb.Wait() | |
82 } | |
83 | |
84 // machineUnitsChange contains the changed units for one specific machine.· | |
85 type machineUnitsChange struct { | |
86 machine *machineTracker | |
87 change *state.MachineUnitsChange | |
88 } | |
89 | |
90 // machineTracker keeps track of the unit changes of a machine. | |
91 type machineTracker struct { | |
92 tomb tomb.Tomb | |
93 firewaller *Firewaller | |
94 id int | |
95 watcher *state.MachineUnitsWatcher | |
96 ports map[state.Port]*unitTracker | |
97 } | |
98 | |
99 // newMachineTracker creates a new machine tracker keeping track of | |
100 // unit changes of the passed machine. | |
101 func newMachineTracker(mst *state.Machine, fw *Firewaller) *machineTracker { | |
102 mt := &machineTracker{ | |
103 firewaller: fw, | |
104 id: mst.Id(), | |
105 watcher: mst.WatchUnits(), | |
106 ports: make(map[state.Port]*unitTracker), | |
107 } | |
108 go mt.loop() | |
109 return mt | |
110 } | |
111 | |
112 // loop is the backend watching for machine units changes. | |
113 func (mt *machineTracker) loop() { | |
114 defer mt.tomb.Done() | |
115 defer mt.watcher.Stop() | |
116 for { | |
117 select { | |
118 case <-mt.firewaller.tomb.Dying(): | |
niemeyer
2012/07/19 00:16:16
Why do we need this here? If the firewaller dies,
TheMue
2012/07/19 11:18:13
Done.
| |
119 return | |
120 case <-mt.tomb.Dying(): | |
121 return | |
122 case change, ok := <-mt.watcher.Changes(): | |
123 // Send change or nil. | |
124 select { | |
125 case mt.firewaller.machineUnitsChanges <- &machineUnitsC hange{mt, change}: | |
126 case <-mt.firewaller.tomb.Dying(): | |
niemeyer
2012/07/19 00:16:16
Ditto.
TheMue
2012/07/19 11:18:13
Done.
| |
127 return | |
128 case <-mt.tomb.Dying(): | |
129 return | |
130 } | |
131 // The watcher terminated prematurely, so end the loop. | |
132 if !ok { | |
133 mt.firewaller.tomb.Kill(watcher.MustErr(mt.watch er)) | |
134 return | |
135 } | |
136 } | |
137 } | |
138 } | |
139 | |
140 // stop stops the machine tracker. | |
141 func (mt *machineTracker) stop() error { | |
142 mt.tomb.Kill(nil) | |
143 return mt.tomb.Wait() | |
144 } | |
145 | |
146 type unitTracker struct { | |
147 service *serviceTracker | |
148 id string | |
149 ports []state.Port | |
150 } | |
151 | |
152 type serviceTracker struct { | |
153 exposed bool | |
154 } | |
OLD | NEW |