Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(372)

Side by Side Diff: worker/firewaller/firewaller.go

Issue 6374069: worker: started implementation of the firewaller (Closed)
Patch Set: worker: started implementation of the firewaller Created 11 years, 8 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package firewaller
2
3 import (
4 "launchpad.net/juju-core/environs"
5 "launchpad.net/juju-core/log"
6 "launchpad.net/juju-core/state"
7 "launchpad.net/juju-core/state/watcher"
8 "launchpad.net/tomb"
9 )
10
11 // Firewaller watches the state for ports open or closed
niemeyer 2012/07/18 13:56:01 Sorry, my typo: s/open/opened/
TheMue 2012/07/18 16:26:40 Done.
12 // and reflects those changes onto the backing environment.
13 type Firewaller struct {
14 st *state.State
15 environ environs.Environ
16 tomb tomb.Tomb
17 machines map[int]*machineTracker
18 }
19
20 // NewFirewaller returns a new Firewaller.
21 func NewFirewaller(environ environs.Environ) (*Firewaller, error) {
22 info, err := environ.StateInfo()
23 if err != nil {
24 return nil, err
25 }
26 st, err := state.Open(info)
27 if err != nil {
28 return nil, err
29 }
30 fw := &Firewaller{
31 st: st,
32 environ: environ,
33 machines: make(map[int]*machineTracker),
34 }
35 go fw.loop()
36 return fw, nil
37 }
38
39 func (fw *Firewaller) loop() {
40 defer fw.finish()
41 // Set up channels and watchers.
42 machineUnitsChanges := make(chan *machineUnitsChange)
43 defer close(machineUnitsChanges)
44 machinesWatcher := fw.st.WatchMachines()
45 defer watcher.Stop(machinesWatcher, &fw.tomb)
46 for {
47 select {
48 case <-fw.tomb.Dying():
49 return
50 case change, ok := <-machinesWatcher.Changes():
51 if !ok {
52 return
53 }
54 for _, removedMachine := range change.Removed {
55 m, ok := fw.machines[removedMachine.Id()]
56 if !ok {
57 panic("trying to remove machine that was n't added")
58 }
59 if err := m.stop(); err != nil {
60 log.Printf("can't stop machine tracker: %v", err)
61 continue
62 }
63 delete(fw.machines, removedMachine.Id())
niemeyer 2012/07/18 14:11:25 This should also be moved above the if block.
TheMue 2012/07/18 16:26:40 Done.
64 }
65 for _, addedMachine := range change.Added {
66 m := newMachineTracker(addedMachine, fw, machine UnitsChanges)
67 fw.machines[addedMachine.Id()] = m
68 log.Debugf("Added machine %v", m.id)
69 }
70 case <-machineUnitsChanges:
71 // TODO(mue) fill with life.
72 }
73 }
74 }
75
76 // finishes cleans up when the firewaller is stopping.
77 func (fw *Firewaller) finish() {
78 for _, m := range fw.machines {
79 fw.tomb.Kill(m.stop())
80 }
81 fw.st.Close()
82 fw.tomb.Done()
83 }
84
85 // Wait waits for the Firewaller to exit.
86 func (fw *Firewaller) Wait() error {
87 return fw.tomb.Wait()
88 }
89
90 // Stop stops the Firewaller and returns any error encountered while stopping.
91 func (fw *Firewaller) Stop() error {
92 fw.tomb.Kill(nil)
93 return fw.tomb.Wait()
94 }
95
96 // machineUnitsChange contains the changed units for one specific machine.·
97 type machineUnitsChange struct {
98 machine *machineTracker
99 change *state.MachineUnitsChange
100 }
101
102 // machineTracker keeps track of the unit changes of a machine.
103 type machineTracker struct {
104 firewaller *Firewaller
105 changes chan<- *machineUnitsChange
106 tomb tomb.Tomb
107 id int
108 watcher *state.MachineUnitsWatcher
109 ports map[state.Port]*unitTracker
110 }
111
112 // newMachineTracker creates a new machine tracker keeping track of
113 // unit changes of the passed machine.
114 func newMachineTracker(mst *state.Machine, fw *Firewaller, changes chan<- *machi neUnitsChange) *machineTracker {
115 mt := &machineTracker{
116 firewaller: fw,
117 changes: changes,
118 id: mst.Id(),
119 watcher: mst.WatchUnits(),
120 ports: make(map[state.Port]*unitTracker),
121 }
122 go mt.loop()
123 return mt
124 }
125
126 // loop is the backend watching for machine units changes.
127 func (mt *machineTracker) loop() {
128 defer mt.tomb.Done()
129 defer mt.watcher.Stop()
130 for {
131 select {
132 case <-mt.firewaller.tomb.Dying():
133 return
134 case <-mt.tomb.Dying():
135 return
136 case change, ok := <-mt.watcher.Changes():
137 // Send change or nil.
138 select {
139 case mt.changes <- &machineUnitsChange{mt, change}:
niemeyer 2012/07/18 13:56:01 What happens if mt.changes is closed? We have to
rog 2012/07/18 14:10:01 AFAICS mt.changes is the same channel as machineUn
TheMue 2012/07/18 16:26:40 Done.
140 case <-mt.firewaller.tomb.Dying():
141 return
142 case <-mt.tomb.Dying():
143 return
144 }
145 // Had been an error, so end the loop.
rog 2012/07/18 12:26:39 // The watcher terminated prematurely, so end the
TheMue 2012/07/18 16:26:40 Done.
146 if !ok {
147 mt.firewaller.tomb.Kill(watcher.MustErr(mt.watch er))
148 return
149 }
150 }
151 }
152 }
153
154 // stop stops the machine tracker.
155 func (mt *machineTracker) stop() error {
156 mt.tomb.Kill(nil)
157 return mt.tomb.Wait()
158 }
159
160 type serviceTracker struct {
161 exposed bool
162 }
163
164 type unitTracker struct {
165 service *serviceTracker
166 id string
167 ports []state.Port
168 }
OLDNEW

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b