Left: | ||
Right: |
OLD | NEW |
---|---|
1 // Copyright 2012, 2013 Canonical Ltd. | 1 // Copyright 2012, 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package main | 4 package main |
5 | 5 |
6 import ( | 6 import ( |
7 "fmt" | 7 "fmt" |
8 "os" | 8 "os" |
9 "path/filepath" | 9 "path/filepath" |
10 "time" | 10 "time" |
(...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
160 } | 160 } |
161 m := entity.(*machineagent.Machine) | 161 m := entity.(*machineagent.Machine) |
162 needsStateWorker := false | 162 needsStateWorker := false |
163 for _, job := range m.Jobs() { | 163 for _, job := range m.Jobs() { |
164 needsStateWorker = needsStateWorker || stateJobs[job] | 164 needsStateWorker = needsStateWorker || stateJobs[job] |
165 } | 165 } |
166 if needsStateWorker { | 166 if needsStateWorker { |
167 ensureStateWorker() | 167 ensureStateWorker() |
168 } | 168 } |
169 runner := worker.NewRunner(allFatal, moreImportant) | 169 runner := worker.NewRunner(allFatal, moreImportant) |
170 » // No agents currently connect to the API, so just | 170 » // Only the machiner currently connects to the API. |
rog
2013/07/22 15:19:18
s/connect/connects/
dimitern
2013/07/22 18:03:02
Done.
| |
171 » // return the runner running nothing. | 171 » // Add other workers here as they are ready. |
172 » runner.StartWorker("machiner", func() (worker.Worker, error) { | |
173 » » return machiner.NewMachiner(st.Machiner(), a.Tag()), nil | |
174 » }) | |
172 return newCloseWorker(runner, st), nil // Note: a worker.Runner is itself a worker.Worker. | 175 return newCloseWorker(runner, st), nil // Note: a worker.Runner is itself a worker.Worker. |
173 } | 176 } |
174 | 177 |
175 // StateJobs returns a worker running all the workers that require | 178 // StateJobs returns a worker running all the workers that require |
176 // a *state.State connection. | 179 // a *state.State connection. |
177 func (a *MachineAgent) StateWorker() (worker.Worker, error) { | 180 func (a *MachineAgent) StateWorker() (worker.Worker, error) { |
178 st, entity, err := openState(a.Conf.Conf, a) | 181 st, entity, err := openState(a.Conf.Conf, a) |
179 if err != nil { | 182 if err != nil { |
180 return nil, err | 183 return nil, err |
181 } | 184 } |
182 // If this fails, other bits will fail, so we just log the error, and | 185 // If this fails, other bits will fail, so we just log the error, and |
183 // let the other failures actually restart runners | 186 // let the other failures actually restart runners |
184 if err := EnsureAPIInfo(a.Conf.Conf, st, entity); err != nil { | 187 if err := EnsureAPIInfo(a.Conf.Conf, st, entity); err != nil { |
185 log.Warningf("failed to EnsureAPIInfo: %v", err) | 188 log.Warningf("failed to EnsureAPIInfo: %v", err) |
186 } | 189 } |
187 reportOpenedState(st) | 190 reportOpenedState(st) |
188 m := entity.(*state.Machine) | 191 m := entity.(*state.Machine) |
189 // TODO(rog) use more discriminating test for errors | 192 // TODO(rog) use more discriminating test for errors |
190 // rather than taking everything down indiscriminately. | 193 // rather than taking everything down indiscriminately. |
191 dataDir := a.Conf.DataDir | 194 dataDir := a.Conf.DataDir |
192 runner := worker.NewRunner(allFatal, moreImportant) | 195 runner := worker.NewRunner(allFatal, moreImportant) |
193 runner.StartWorker("upgrader", func() (worker.Worker, error) { | 196 runner.StartWorker("upgrader", func() (worker.Worker, error) { |
194 // TODO(rog) use id instead of *Machine (or introduce Clone method) | 197 // TODO(rog) use id instead of *Machine (or introduce Clone method) |
195 return NewUpgrader(st, m, dataDir), nil | 198 return NewUpgrader(st, m, dataDir), nil |
196 }) | 199 }) |
197 runner.StartWorker("machiner", func() (worker.Worker, error) { | |
198 return machiner.NewMachiner(st, m.Id()), nil | |
199 }) | |
200 // At this stage, since we don't embed lxc containers, just start an lxc | 200 // At this stage, since we don't embed lxc containers, just start an lxc |
201 // provisioner task for non-lxc containers. Since we have only LXC | 201 // provisioner task for non-lxc containers. Since we have only LXC |
202 // containers and normal machines, this effectively means that we only | 202 // containers and normal machines, this effectively means that we only |
203 // have an LXC provisioner when we have a normally provisioned machine | 203 // have an LXC provisioner when we have a normally provisioned machine |
204 // (through the environ-provisioner). With the upcoming advent of KVM | 204 // (through the environ-provisioner). With the upcoming advent of KVM |
205 // containers, it is likely that we will want an LXC provisioner on a KVM | 205 // containers, it is likely that we will want an LXC provisioner on a KVM |
206 // machine, and once we get nested LXC containers, we can remove this | 206 // machine, and once we get nested LXC containers, we can remove this |
207 // check. | 207 // check. |
208 providerType := os.Getenv("JUJU_PROVIDER_TYPE") | 208 providerType := os.Getenv("JUJU_PROVIDER_TYPE") |
209 if providerType != provider.Local && m.ContainerType() != instance.LXC { | 209 if providerType != provider.Local && m.ContainerType() != instance.LXC { |
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
301 case stateReporter <- st: | 301 case stateReporter <- st: |
302 default: | 302 default: |
303 } | 303 } |
304 } | 304 } |
305 | 305 |
306 func sendOpenedStates(dst chan<- *state.State) (undo func()) { | 306 func sendOpenedStates(dst chan<- *state.State) (undo func()) { |
307 var original chan<- *state.State | 307 var original chan<- *state.State |
308 original, stateReporter = stateReporter, dst | 308 original, stateReporter = stateReporter, dst |
309 return func() { stateReporter = original } | 309 return func() { stateReporter = original } |
310 } | 310 } |
OLD | NEW |