OLD | NEW |
1 // Copyright 2012, 2013 Canonical Ltd. | 1 // Copyright 2012, 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package main | 4 package main |
5 | 5 |
6 import ( | 6 import ( |
7 "fmt" | 7 "fmt" |
8 "os" | 8 "os" |
9 "path/filepath" | 9 "path/filepath" |
10 "time" | 10 "time" |
11 | 11 |
12 "launchpad.net/gnuflag" | 12 "launchpad.net/gnuflag" |
13 "launchpad.net/loggo" | 13 "launchpad.net/loggo" |
14 "launchpad.net/tomb" | 14 "launchpad.net/tomb" |
15 | 15 |
16 "launchpad.net/juju-core/agent" | 16 "launchpad.net/juju-core/agent" |
17 "launchpad.net/juju-core/charm" | 17 "launchpad.net/juju-core/charm" |
18 "launchpad.net/juju-core/cmd" | 18 "launchpad.net/juju-core/cmd" |
| 19 "launchpad.net/juju-core/container/kvm" |
19 "launchpad.net/juju-core/instance" | 20 "launchpad.net/juju-core/instance" |
20 "launchpad.net/juju-core/log" | |
21 "launchpad.net/juju-core/names" | 21 "launchpad.net/juju-core/names" |
22 "launchpad.net/juju-core/provider" | 22 "launchpad.net/juju-core/provider" |
23 "launchpad.net/juju-core/state" | 23 "launchpad.net/juju-core/state" |
24 "launchpad.net/juju-core/state/api" | 24 "launchpad.net/juju-core/state/api" |
| 25 apiagent "launchpad.net/juju-core/state/api/agent" |
25 "launchpad.net/juju-core/state/api/params" | 26 "launchpad.net/juju-core/state/api/params" |
| 27 apiprovisioner "launchpad.net/juju-core/state/api/provisioner" |
26 "launchpad.net/juju-core/state/apiserver" | 28 "launchpad.net/juju-core/state/apiserver" |
27 "launchpad.net/juju-core/upstart" | 29 "launchpad.net/juju-core/upstart" |
28 "launchpad.net/juju-core/worker" | 30 "launchpad.net/juju-core/worker" |
29 "launchpad.net/juju-core/worker/addressupdater" | 31 "launchpad.net/juju-core/worker/addressupdater" |
30 "launchpad.net/juju-core/worker/cleaner" | 32 "launchpad.net/juju-core/worker/cleaner" |
31 "launchpad.net/juju-core/worker/deployer" | 33 "launchpad.net/juju-core/worker/deployer" |
32 "launchpad.net/juju-core/worker/firewaller" | 34 "launchpad.net/juju-core/worker/firewaller" |
33 "launchpad.net/juju-core/worker/localstorage" | 35 "launchpad.net/juju-core/worker/localstorage" |
34 » "launchpad.net/juju-core/worker/logger" | 36 » workerlogger "launchpad.net/juju-core/worker/logger" |
35 "launchpad.net/juju-core/worker/machiner" | 37 "launchpad.net/juju-core/worker/machiner" |
36 "launchpad.net/juju-core/worker/minunitsworker" | 38 "launchpad.net/juju-core/worker/minunitsworker" |
37 "launchpad.net/juju-core/worker/provisioner" | 39 "launchpad.net/juju-core/worker/provisioner" |
38 "launchpad.net/juju-core/worker/resumer" | 40 "launchpad.net/juju-core/worker/resumer" |
39 "launchpad.net/juju-core/worker/upgrader" | 41 "launchpad.net/juju-core/worker/upgrader" |
40 ) | 42 ) |
41 | 43 |
42 type workerRunner interface { | 44 var logger = loggo.GetLogger("juju.cmd.jujud") |
43 » worker.Worker | |
44 » StartWorker(id string, startFunc func() (worker.Worker, error)) error | |
45 » StopWorker(id string) error | |
46 } | |
47 | 45 |
48 var newRunner = func(isFatal func(error) bool, moreImportant func(e0, e1 error)
bool) workerRunner { | 46 var newRunner = func(isFatal func(error) bool, moreImportant func(e0, e1 error)
bool) worker.Runner { |
49 return worker.NewRunner(isFatal, moreImportant) | 47 return worker.NewRunner(isFatal, moreImportant) |
50 } | 48 } |
51 | 49 |
// bootstrapMachineId is the id of the bootstrap machine. Run compares the
// agent's MachineId against it to decide whether to start the state worker
// before the API worker.
const bootstrapMachineId = "0"

// retryDelay is a three second delay. Its consumers are not visible in this
// part of the file; presumably it spaces out connection retries — confirm.
var retryDelay = 3 * time.Second
55 | 53 |
56 // MachineAgent is a cmd.Command responsible for running a machine agent. | 54 // MachineAgent is a cmd.Command responsible for running a machine agent. |
57 type MachineAgent struct { | 55 type MachineAgent struct { |
58 cmd.CommandBase | 56 cmd.CommandBase |
59 tomb tomb.Tomb | 57 tomb tomb.Tomb |
60 Conf AgentConf | 58 Conf AgentConf |
61 MachineId string | 59 MachineId string |
62 » runner workerRunner | 60 » runner worker.Runner |
63 } | 61 } |
64 | 62 |
65 // Info returns usage information for the command. | 63 // Info returns usage information for the command. |
66 func (a *MachineAgent) Info() *cmd.Info { | 64 func (a *MachineAgent) Info() *cmd.Info { |
67 return &cmd.Info{ | 65 return &cmd.Info{ |
68 Name: "machine", | 66 Name: "machine", |
69 Purpose: "run a juju machine agent", | 67 Purpose: "run a juju machine agent", |
70 } | 68 } |
71 } | 69 } |
72 | 70 |
(...skipping 26 matching lines...) Expand all Loading... |
99 } | 97 } |
100 | 98 |
101 // Run runs a machine agent. | 99 // Run runs a machine agent. |
102 func (a *MachineAgent) Run(_ *cmd.Context) error { | 100 func (a *MachineAgent) Run(_ *cmd.Context) error { |
103 // Due to changes in the logging, and needing to care about old | 101 // Due to changes in the logging, and needing to care about old |
104 // environments that have been upgraded, we need to explicitly remove th
e | 102 // environments that have been upgraded, we need to explicitly remove th
e |
105 // file writer if one has been added, otherwise we will get duplicate | 103 // file writer if one has been added, otherwise we will get duplicate |
106 // lines of all logging in the log file. | 104 // lines of all logging in the log file. |
107 loggo.RemoveWriter("logfile") | 105 loggo.RemoveWriter("logfile") |
108 defer a.tomb.Done() | 106 defer a.tomb.Done() |
109 » log.Infof("machine agent %v start", a.Tag()) | 107 » logger.Infof("machine agent %v start", a.Tag()) |
110 if err := a.Conf.read(a.Tag()); err != nil { | 108 if err := a.Conf.read(a.Tag()); err != nil { |
111 return err | 109 return err |
112 } | 110 } |
113 charm.CacheDir = filepath.Join(a.Conf.dataDir, "charmcache") | 111 charm.CacheDir = filepath.Join(a.Conf.dataDir, "charmcache") |
114 | 112 |
115 // ensureStateWorker ensures that there is a worker that | 113 // ensureStateWorker ensures that there is a worker that |
116 // connects to the state that runs within itself all the workers | 114 // connects to the state that runs within itself all the workers |
117 // that need a state connection. Unless we're bootstrapping, we | 115 // that need a state connection. Unless we're bootstrapping, we |
118 // need to connect to the API server to find out if we need to | 116 // need to connect to the API server to find out if we need to |
119 // call this, so we make the APIWorker call it when necessary if | 117 // call this, so we make the APIWorker call it when necessary if |
120 // the machine requires it. Note that ensureStateWorker can be | 118 // the machine requires it. Note that ensureStateWorker can be |
121 // called many times - StartWorker does nothing if there is | 119 // called many times - StartWorker does nothing if there is |
122 // already a worker started with the given name. | 120 // already a worker started with the given name. |
123 ensureStateWorker := func() { | 121 ensureStateWorker := func() { |
124 a.runner.StartWorker("state", a.StateWorker) | 122 a.runner.StartWorker("state", a.StateWorker) |
125 } | 123 } |
126 // We might be bootstrapping, and the API server is not | 124 // We might be bootstrapping, and the API server is not |
127 // running yet. If so, make sure we run a state worker instead. | 125 // running yet. If so, make sure we run a state worker instead. |
128 if a.MachineId == bootstrapMachineId { | 126 if a.MachineId == bootstrapMachineId { |
129 // TODO(rog) When we have HA, we only want to do this | 127 // TODO(rog) When we have HA, we only want to do this |
130 // when we really are bootstrapping - once other | 128 // when we really are bootstrapping - once other |
131 // instances of the API server have been started, we | 129 // instances of the API server have been started, we |
132 // should follow the normal course of things and ignore | 130 // should follow the normal course of things and ignore |
133 // the fact that this was once the bootstrap machine. | 131 // the fact that this was once the bootstrap machine. |
134 » » log.Infof("Starting StateWorker for machine-0") | 132 » » logger.Infof("Starting StateWorker for machine-0") |
135 ensureStateWorker() | 133 ensureStateWorker() |
136 } | 134 } |
137 a.runner.StartWorker("api", func() (worker.Worker, error) { | 135 a.runner.StartWorker("api", func() (worker.Worker, error) { |
138 return a.APIWorker(ensureStateWorker) | 136 return a.APIWorker(ensureStateWorker) |
139 }) | 137 }) |
140 err := a.runner.Wait() | 138 err := a.runner.Wait() |
141 if err == worker.ErrTerminateAgent { | 139 if err == worker.ErrTerminateAgent { |
142 err = a.uninstallAgent() | 140 err = a.uninstallAgent() |
143 } | 141 } |
144 err = agentDone(err) | 142 err = agentDone(err) |
(...skipping 19 matching lines...) Expand all Loading... |
164 } | 162 } |
165 } | 163 } |
166 runner := newRunner(connectionIsFatal(st), moreImportant) | 164 runner := newRunner(connectionIsFatal(st), moreImportant) |
167 runner.StartWorker("machiner", func() (worker.Worker, error) { | 165 runner.StartWorker("machiner", func() (worker.Worker, error) { |
168 return machiner.NewMachiner(st.Machiner(), agentConfig), nil | 166 return machiner.NewMachiner(st.Machiner(), agentConfig), nil |
169 }) | 167 }) |
170 runner.StartWorker("upgrader", func() (worker.Worker, error) { | 168 runner.StartWorker("upgrader", func() (worker.Worker, error) { |
171 return upgrader.NewUpgrader(st.Upgrader(), agentConfig), nil | 169 return upgrader.NewUpgrader(st.Upgrader(), agentConfig), nil |
172 }) | 170 }) |
173 runner.StartWorker("logger", func() (worker.Worker, error) { | 171 runner.StartWorker("logger", func() (worker.Worker, error) { |
174 » » return logger.NewLogger(st.Logger(), agentConfig), nil | 172 » » return workerlogger.NewLogger(st.Logger(), agentConfig), nil |
175 }) | 173 }) |
176 » // At this stage, since we don't embed LXC containers, just start an lxc | 174 |
177 » // provisioner task for non-lxc containers. Since we have only LXC | 175 » // Perform the operations needed to set up hosting for containers. |
178 » // containers and normal machines, this effectively means that we only | 176 » if err := a.setupContainerSupport(runner, st, entity); err != nil { |
179 » // have an LXC provisioner when we have a normally provisioned machine | 177 » » return nil, fmt.Errorf("setting up container support: %v", err) |
180 » // (through the environ-provisioner). With the upcoming advent of KVM | |
181 » // containers, it is likely that we will want an LXC provisioner on a KV
M | |
182 » // machine, and once we get nested LXC containers, we can remove this | |
183 » // check. | |
184 » // | |
185 » // TODO(dimitern) 2013-09-25 bug #1230289 | |
186 » // Create jobs for container providers, rather than | |
187 » // using the provider and container type like this. | |
188 » providerType := agentConfig.Value(agent.ProviderType) | |
189 » if providerType != provider.Local && entity.ContainerType() != instance.
LXC { | |
190 » » workerName := fmt.Sprintf("%s-provisioner", provisioner.LXC) | |
191 » » runner.StartWorker(workerName, func() (worker.Worker, error) { | |
192 » » » return provisioner.NewProvisioner(provisioner.LXC, st.Pr
ovisioner(), agentConfig), nil | |
193 » » }) | |
194 } | 178 } |
195 for _, job := range entity.Jobs() { | 179 for _, job := range entity.Jobs() { |
196 switch job { | 180 switch job { |
197 case params.JobHostUnits: | 181 case params.JobHostUnits: |
198 runner.StartWorker("deployer", func() (worker.Worker, er
ror) { | 182 runner.StartWorker("deployer", func() (worker.Worker, er
ror) { |
199 apiDeployer := st.Deployer() | 183 apiDeployer := st.Deployer() |
200 context := newDeployContext(apiDeployer, agentCo
nfig) | 184 context := newDeployContext(apiDeployer, agentCo
nfig) |
201 return deployer.NewDeployer(apiDeployer, context
), nil | 185 return deployer.NewDeployer(apiDeployer, context
), nil |
202 }) | 186 }) |
203 case params.JobManageEnviron: | 187 case params.JobManageEnviron: |
204 runner.StartWorker("environ-provisioner", func() (worker
.Worker, error) { | 188 runner.StartWorker("environ-provisioner", func() (worker
.Worker, error) { |
205 return provisioner.NewProvisioner(provisioner.EN
VIRON, st.Provisioner(), agentConfig), nil | 189 return provisioner.NewProvisioner(provisioner.EN
VIRON, st.Provisioner(), agentConfig), nil |
206 }) | 190 }) |
207 // TODO(dimitern): Add firewaller here, when using the A
PI. | 191 // TODO(dimitern): Add firewaller here, when using the A
PI. |
208 case params.JobManageState: | 192 case params.JobManageState: |
209 // Not yet implemented with the API. | 193 // Not yet implemented with the API. |
210 default: | 194 default: |
211 // TODO(dimitern): Once all workers moved over to using | 195 // TODO(dimitern): Once all workers moved over to using |
212 // the API, report "unknown job type" here. | 196 // the API, report "unknown job type" here. |
213 } | 197 } |
214 } | 198 } |
215 return newCloseWorker(runner, st), nil // Note: a worker.Runner is itsel
f a worker.Worker. | 199 return newCloseWorker(runner, st), nil // Note: a worker.Runner is itsel
f a worker.Worker. |
216 } | 200 } |
217 | 201 |
| 202 // setupContainerSupport determines what containers can be run on this machine a
nd |
| 203 // initialises suitable infrastructure to support such containers. |
| 204 func (a *MachineAgent) setupContainerSupport(runner worker.Runner, st *api.State
, entity *apiagent.Entity) error { |
| 205 var supportedContainers []instance.ContainerType |
| 206 // We don't yet support nested lxc containers but anything else can run
an LXC container. |
| 207 if entity.ContainerType() != instance.LXC { |
| 208 supportedContainers = append(supportedContainers, instance.LXC) |
| 209 } |
| 210 supportsKvm, err := kvm.IsKVMSupported() |
| 211 if err != nil { |
| 212 logger.Warningf("determining kvm support: %v\nno kvm containers
possible", err) |
| 213 } |
| 214 if err == nil && supportsKvm { |
| 215 supportedContainers = append(supportedContainers, instance.KVM) |
| 216 } |
| 217 // If no containers are supported, there's no need to go further. |
| 218 if len(supportedContainers) == 0 { |
| 219 return nil |
| 220 } |
| 221 return a.updateSupportedContainers(runner, st, entity.Tag(), supportedCo
ntainers) |
| 222 } |
| 223 |
| 224 // updateSupportedContainers records in state that a machine can run the specifi
ed containers. |
| 225 // It starts a watcher and when a container of a given type is first added to th
e machine, |
| 226 // the watcher is killed, the machine is set up to be able to start containers o
f the given type, |
| 227 // and a suitable provisioner is started. |
| 228 func (a *MachineAgent) updateSupportedContainers(runner worker.Runner, st *api.S
tate, |
| 229 tag string, containers []instance.ContainerType) error { |
| 230 |
| 231 var machine *apiprovisioner.Machine |
| 232 var err error |
| 233 pr := st.Provisioner() |
| 234 if machine, err = pr.Machine(tag); err != nil { |
| 235 return fmt.Errorf("%s is not in state: %v", tag, err) |
| 236 } |
| 237 if err := machine.AddSupportedContainers(containers...); err != nil { |
| 238 return fmt.Errorf("adding supported containers to %s: %v", tag,
err) |
| 239 } |
| 240 // Start the watcher to fire when a container is first requested on the
machine. |
| 241 for _, ctype := range containers { |
| 242 watcherName := fmt.Sprintf("%s-watcher", ctype) |
| 243 handler := provisioner.NewContainerSetupHandler(runner, watcherN
ame, ctype, machine, pr, a.Conf.config) |
| 244 runner.StartWorker(watcherName, func() (worker.Worker, error) { |
| 245 return worker.NewStringsWorker(handler), nil |
| 246 }) |
| 247 } |
| 248 return nil |
| 249 } |
| 250 |
218 // StateWorker returns a worker running all the workers that require | 251 // StateWorker returns a worker running all the workers that require |
219 // a *state.State connection. | 252 // a *state.State connection. |
220 func (a *MachineAgent) StateWorker() (worker.Worker, error) { | 253 func (a *MachineAgent) StateWorker() (worker.Worker, error) { |
221 agentConfig := a.Conf.config | 254 agentConfig := a.Conf.config |
222 st, entity, err := openState(agentConfig, a) | 255 st, entity, err := openState(agentConfig, a) |
223 if err != nil { | 256 if err != nil { |
224 return nil, err | 257 return nil, err |
225 } | 258 } |
226 reportOpenedState(st) | 259 reportOpenedState(st) |
227 m := entity.(*state.Machine) | 260 m := entity.(*state.Machine) |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
272 runner.StartWorker("resumer", func() (worker.Worker, err
or) { | 305 runner.StartWorker("resumer", func() (worker.Worker, err
or) { |
273 // The action of resumer is so subtle that it is
not tested, | 306 // The action of resumer is so subtle that it is
not tested, |
274 // because we can't figure out how to do so with
out brutalising | 307 // because we can't figure out how to do so with
out brutalising |
275 // the transaction log. | 308 // the transaction log. |
276 return resumer.NewResumer(st), nil | 309 return resumer.NewResumer(st), nil |
277 }) | 310 }) |
278 runner.StartWorker("minunitsworker", func() (worker.Work
er, error) { | 311 runner.StartWorker("minunitsworker", func() (worker.Work
er, error) { |
279 return minunitsworker.NewMinUnitsWorker(st), nil | 312 return minunitsworker.NewMinUnitsWorker(st), nil |
280 }) | 313 }) |
281 default: | 314 default: |
282 » » » log.Warningf("ignoring unknown job %q", job) | 315 » » » logger.Warningf("ignoring unknown job %q", job) |
283 } | 316 } |
284 } | 317 } |
285 return newCloseWorker(runner, st), nil | 318 return newCloseWorker(runner, st), nil |
286 } | 319 } |
287 | 320 |
288 func (a *MachineAgent) Entity(st *state.State) (AgentState, error) { | 321 func (a *MachineAgent) Entity(st *state.State) (AgentState, error) { |
289 m, err := st.Machine(a.MachineId) | 322 m, err := st.Machine(a.MachineId) |
290 if err != nil { | 323 if err != nil { |
291 return nil, err | 324 return nil, err |
292 } | 325 } |
293 // Check the machine nonce as provisioned matches the agent.Conf value. | 326 // Check the machine nonce as provisioned matches the agent.Conf value. |
294 if !m.CheckProvisioned(a.Conf.config.Nonce()) { | 327 if !m.CheckProvisioned(a.Conf.config.Nonce()) { |
295 // The agent is running on a different machine to the one it | 328 // The agent is running on a different machine to the one it |
296 // should be according to state. It must stop immediately. | 329 // should be according to state. It must stop immediately. |
297 » » log.Errorf("running machine %v agent on inappropriate instance",
m) | 330 » » logger.Errorf("running machine %v agent on inappropriate instanc
e", m) |
298 return nil, worker.ErrTerminateAgent | 331 return nil, worker.ErrTerminateAgent |
299 } | 332 } |
300 return m, nil | 333 return m, nil |
301 } | 334 } |
302 | 335 |
303 func (a *MachineAgent) Tag() string { | 336 func (a *MachineAgent) Tag() string { |
304 return names.MachineTag(a.MachineId) | 337 return names.MachineTag(a.MachineId) |
305 } | 338 } |
306 | 339 |
307 func (m *MachineAgent) uninstallAgent() error { | 340 func (m *MachineAgent) uninstallAgent() error { |
(...skipping 30 matching lines...) Expand all Loading... |
338 select { | 371 select { |
339 case apiReporter <- st: | 372 case apiReporter <- st: |
340 default: | 373 default: |
341 } | 374 } |
342 } | 375 } |
343 func sendOpenedAPIs(dst chan<- *api.State) (undo func()) { | 376 func sendOpenedAPIs(dst chan<- *api.State) (undo func()) { |
344 var original chan<- *api.State | 377 var original chan<- *api.State |
345 original, apiReporter = apiReporter, dst | 378 original, apiReporter = apiReporter, dst |
346 return func() { apiReporter = original } | 379 return func() { apiReporter = original } |
347 } | 380 } |
OLD | NEW |