Left: | ||
Right: |
OLD | NEW |
---|---|
(Empty) | |
1 package firewaller | |
2 | |
3 import ( | |
4 "launchpad.net/juju-core/environs" | |
5 "launchpad.net/juju-core/log" | |
6 "launchpad.net/juju-core/state" | |
7 "launchpad.net/juju-core/state/watcher" | |
8 "launchpad.net/tomb" | |
9 ) | |
10 | |
11 // Firewaller watches the state for ports open or closed | |
niemeyer
2012/07/18 13:56:01
Sorry, my typo: s/open/opened/
TheMue
2012/07/18 16:26:40
Done.
| |
12 // and reflects those changes onto the backing environment. | |
13 type Firewaller struct { | |
14 st *state.State | |
15 environ environs.Environ | |
16 tomb tomb.Tomb | |
17 machines map[int]*machineTracker | |
18 } | |
19 | |
20 // NewFirewaller returns a new Firewaller. | |
21 func NewFirewaller(environ environs.Environ) (*Firewaller, error) { | |
22 info, err := environ.StateInfo() | |
23 if err != nil { | |
24 return nil, err | |
25 } | |
26 st, err := state.Open(info) | |
27 if err != nil { | |
28 return nil, err | |
29 } | |
30 fw := &Firewaller{ | |
31 st: st, | |
32 environ: environ, | |
33 machines: make(map[int]*machineTracker), | |
34 } | |
35 go fw.loop() | |
36 return fw, nil | |
37 } | |
38 | |
39 func (fw *Firewaller) loop() { | |
40 defer fw.finish() | |
41 // Set up channels and watchers. | |
42 machineUnitsChanges := make(chan *machineUnitsChange) | |
43 defer close(machineUnitsChanges) | |
44 machinesWatcher := fw.st.WatchMachines() | |
45 defer watcher.Stop(machinesWatcher, &fw.tomb) | |
46 for { | |
47 select { | |
48 case <-fw.tomb.Dying(): | |
49 return | |
50 case change, ok := <-machinesWatcher.Changes(): | |
51 if !ok { | |
52 return | |
53 } | |
54 for _, removedMachine := range change.Removed { | |
55 m, ok := fw.machines[removedMachine.Id()] | |
56 if !ok { | |
57 panic("trying to remove machine that was n't added") | |
58 } | |
59 if err := m.stop(); err != nil { | |
60 log.Printf("can't stop machine tracker: %v", err) | |
61 continue | |
62 } | |
63 delete(fw.machines, removedMachine.Id()) | |
niemeyer
2012/07/18 14:11:25
This should also be moved above the if block.
TheMue
2012/07/18 16:26:40
Done.
| |
64 } | |
65 for _, addedMachine := range change.Added { | |
66 m := newMachineTracker(addedMachine, fw, machine UnitsChanges) | |
67 fw.machines[addedMachine.Id()] = m | |
68 log.Debugf("Added machine %v", m.id) | |
69 } | |
70 case <-machineUnitsChanges: | |
71 // TODO(mue) fill with life. | |
72 } | |
73 } | |
74 } | |
75 | |
76 // finishes cleans up when the firewaller is stopping. | |
77 func (fw *Firewaller) finish() { | |
78 for _, m := range fw.machines { | |
79 fw.tomb.Kill(m.stop()) | |
80 } | |
81 fw.st.Close() | |
82 fw.tomb.Done() | |
83 } | |
84 | |
85 // Wait waits for the Firewaller to exit. | |
86 func (fw *Firewaller) Wait() error { | |
87 return fw.tomb.Wait() | |
88 } | |
89 | |
90 // Stop stops the Firewaller and returns any error encountered while stopping. | |
91 func (fw *Firewaller) Stop() error { | |
92 fw.tomb.Kill(nil) | |
93 return fw.tomb.Wait() | |
94 } | |
95 | |
96 // machineUnitsChange contains the changed units for one specific machine.· | |
97 type machineUnitsChange struct { | |
98 machine *machineTracker | |
99 change *state.MachineUnitsChange | |
100 } | |
101 | |
102 // machineTracker keeps track of the unit changes of a machine. | |
103 type machineTracker struct { | |
104 firewaller *Firewaller | |
105 changes chan<- *machineUnitsChange | |
106 tomb tomb.Tomb | |
107 id int | |
108 watcher *state.MachineUnitsWatcher | |
109 ports map[state.Port]*unitTracker | |
110 } | |
111 | |
112 // newMachineTracker creates a new machine tracker keeping track of | |
113 // unit changes of the passed machine. | |
114 func newMachineTracker(mst *state.Machine, fw *Firewaller, changes chan<- *machi neUnitsChange) *machineTracker { | |
115 mt := &machineTracker{ | |
116 firewaller: fw, | |
117 changes: changes, | |
118 id: mst.Id(), | |
119 watcher: mst.WatchUnits(), | |
120 ports: make(map[state.Port]*unitTracker), | |
121 } | |
122 go mt.loop() | |
123 return mt | |
124 } | |
125 | |
126 // loop is the backend watching for machine units changes. | |
127 func (mt *machineTracker) loop() { | |
128 defer mt.tomb.Done() | |
129 defer mt.watcher.Stop() | |
130 for { | |
131 select { | |
132 case <-mt.firewaller.tomb.Dying(): | |
133 return | |
134 case <-mt.tomb.Dying(): | |
135 return | |
136 case change, ok := <-mt.watcher.Changes(): | |
137 // Send change or nil. | |
138 select { | |
139 case mt.changes <- &machineUnitsChange{mt, change}: | |
niemeyer
2012/07/18 13:56:01
What happens if mt.changes is closed?
We have to
rog
2012/07/18 14:10:01
AFAICS mt.changes is the same channel as machineUn
TheMue
2012/07/18 16:26:40
Done.
| |
140 case <-mt.firewaller.tomb.Dying(): | |
141 return | |
142 case <-mt.tomb.Dying(): | |
143 return | |
144 } | |
145 // Had been an error, so end the loop. | |
rog
2012/07/18 12:26:39
// The watcher terminated prematurely, so end the
TheMue
2012/07/18 16:26:40
Done.
| |
146 if !ok { | |
147 mt.firewaller.tomb.Kill(watcher.MustErr(mt.watch er)) | |
148 return | |
149 } | |
150 } | |
151 } | |
152 } | |
153 | |
154 // stop stops the machine tracker. | |
155 func (mt *machineTracker) stop() error { | |
156 mt.tomb.Kill(nil) | |
157 return mt.tomb.Wait() | |
158 } | |
159 | |
160 type serviceTracker struct { | |
161 exposed bool | |
162 } | |
163 | |
164 type unitTracker struct { | |
165 service *serviceTracker | |
166 id string | |
167 ports []state.Port | |
168 } | |
OLD | NEW |