Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(910)

Side by Side Diff: cmd/jujud/machine.go

Issue 25040043: Refactor container provisioner (Closed)
Patch Set: - Created 10 years, 4 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2012, 2013 Canonical Ltd. 1 // Copyright 2012, 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details. 2 // Licensed under the AGPLv3, see LICENCE file for details.
3 3
4 package main 4 package main
5 5
6 import ( 6 import (
7 "fmt" 7 "fmt"
8 "os" 8 "os"
9 "path/filepath" 9 "path/filepath"
10 "time" 10 "time"
11 11
12 "launchpad.net/gnuflag" 12 "launchpad.net/gnuflag"
13 "launchpad.net/loggo" 13 "launchpad.net/loggo"
14 "launchpad.net/tomb" 14 "launchpad.net/tomb"
15 15
16 "launchpad.net/juju-core/agent" 16 "launchpad.net/juju-core/agent"
17 "launchpad.net/juju-core/charm" 17 "launchpad.net/juju-core/charm"
18 "launchpad.net/juju-core/cmd" 18 "launchpad.net/juju-core/cmd"
19 "launchpad.net/juju-core/container/kvm"
19 "launchpad.net/juju-core/instance" 20 "launchpad.net/juju-core/instance"
20 "launchpad.net/juju-core/log"
21 "launchpad.net/juju-core/names" 21 "launchpad.net/juju-core/names"
22 "launchpad.net/juju-core/provider" 22 "launchpad.net/juju-core/provider"
23 "launchpad.net/juju-core/state" 23 "launchpad.net/juju-core/state"
24 "launchpad.net/juju-core/state/api" 24 "launchpad.net/juju-core/state/api"
25 apiagent "launchpad.net/juju-core/state/api/agent"
25 "launchpad.net/juju-core/state/api/params" 26 "launchpad.net/juju-core/state/api/params"
27 apiprovisioner "launchpad.net/juju-core/state/api/provisioner"
26 "launchpad.net/juju-core/state/apiserver" 28 "launchpad.net/juju-core/state/apiserver"
27 "launchpad.net/juju-core/upstart" 29 "launchpad.net/juju-core/upstart"
28 "launchpad.net/juju-core/worker" 30 "launchpad.net/juju-core/worker"
29 "launchpad.net/juju-core/worker/addressupdater" 31 "launchpad.net/juju-core/worker/addressupdater"
30 "launchpad.net/juju-core/worker/cleaner" 32 "launchpad.net/juju-core/worker/cleaner"
31 "launchpad.net/juju-core/worker/deployer" 33 "launchpad.net/juju-core/worker/deployer"
32 "launchpad.net/juju-core/worker/firewaller" 34 "launchpad.net/juju-core/worker/firewaller"
33 "launchpad.net/juju-core/worker/localstorage" 35 "launchpad.net/juju-core/worker/localstorage"
34 » "launchpad.net/juju-core/worker/logger" 36 » workerlogger "launchpad.net/juju-core/worker/logger"
35 "launchpad.net/juju-core/worker/machiner" 37 "launchpad.net/juju-core/worker/machiner"
36 "launchpad.net/juju-core/worker/minunitsworker" 38 "launchpad.net/juju-core/worker/minunitsworker"
37 "launchpad.net/juju-core/worker/provisioner" 39 "launchpad.net/juju-core/worker/provisioner"
38 "launchpad.net/juju-core/worker/resumer" 40 "launchpad.net/juju-core/worker/resumer"
39 "launchpad.net/juju-core/worker/upgrader" 41 "launchpad.net/juju-core/worker/upgrader"
40 ) 42 )
41 43
42 type workerRunner interface { 44 var logger = loggo.GetLogger("juju.cmd.jujud")
43 » worker.Worker
44 » StartWorker(id string, startFunc func() (worker.Worker, error)) error
45 » StopWorker(id string) error
46 }
47 45
48 var newRunner = func(isFatal func(error) bool, moreImportant func(e0, e1 error) bool) workerRunner { 46 var newRunner = func(isFatal func(error) bool, moreImportant func(e0, e1 error) bool) worker.Runner {
49 return worker.NewRunner(isFatal, moreImportant) 47 return worker.NewRunner(isFatal, moreImportant)
50 } 48 }
51 49
52 const bootstrapMachineId = "0" 50 const bootstrapMachineId = "0"
53 51
54 var retryDelay = 3 * time.Second 52 var retryDelay = 3 * time.Second
55 53
56 // MachineAgent is a cmd.Command responsible for running a machine agent. 54 // MachineAgent is a cmd.Command responsible for running a machine agent.
57 type MachineAgent struct { 55 type MachineAgent struct {
58 cmd.CommandBase 56 cmd.CommandBase
59 tomb tomb.Tomb 57 tomb tomb.Tomb
60 Conf AgentConf 58 Conf AgentConf
61 MachineId string 59 MachineId string
62 » runner workerRunner 60 » runner worker.Runner
63 } 61 }
64 62
65 // Info returns usage information for the command. 63 // Info returns usage information for the command.
66 func (a *MachineAgent) Info() *cmd.Info { 64 func (a *MachineAgent) Info() *cmd.Info {
67 return &cmd.Info{ 65 return &cmd.Info{
68 Name: "machine", 66 Name: "machine",
69 Purpose: "run a juju machine agent", 67 Purpose: "run a juju machine agent",
70 } 68 }
71 } 69 }
72 70
(...skipping 26 matching lines...) Expand all
99 } 97 }
100 98
101 // Run runs a machine agent. 99 // Run runs a machine agent.
102 func (a *MachineAgent) Run(_ *cmd.Context) error { 100 func (a *MachineAgent) Run(_ *cmd.Context) error {
103 // Due to changes in the logging, and needing to care about old 101 // Due to changes in the logging, and needing to care about old
104 // environments that have been upgraded, we need to explicitly remove the 102 // environments that have been upgraded, we need to explicitly remove the
105 // file writer if one has been added, otherwise we will get duplicate 103 // file writer if one has been added, otherwise we will get duplicate
106 // lines of all logging in the log file. 104 // lines of all logging in the log file.
107 loggo.RemoveWriter("logfile") 105 loggo.RemoveWriter("logfile")
108 defer a.tomb.Done() 106 defer a.tomb.Done()
109 » log.Infof("machine agent %v start", a.Tag()) 107 » logger.Infof("machine agent %v start", a.Tag())
110 if err := a.Conf.read(a.Tag()); err != nil { 108 if err := a.Conf.read(a.Tag()); err != nil {
111 return err 109 return err
112 } 110 }
113 charm.CacheDir = filepath.Join(a.Conf.dataDir, "charmcache") 111 charm.CacheDir = filepath.Join(a.Conf.dataDir, "charmcache")
114 112
115 // ensureStateWorker ensures that there is a worker that 113 // ensureStateWorker ensures that there is a worker that
116 // connects to the state that runs within itself all the workers 114 // connects to the state that runs within itself all the workers
117 // that need a state connection. Unless we're bootstrapping, we 115 // that need a state connection. Unless we're bootstrapping, we
118 // need to connect to the API server to find out if we need to 116 // need to connect to the API server to find out if we need to
119 // call this, so we make the APIWorker call it when necessary if 117 // call this, so we make the APIWorker call it when necessary if
120 // the machine requires it. Note that ensureStateWorker can be 118 // the machine requires it. Note that ensureStateWorker can be
121 // called many times - StartWorker does nothing if there is 119 // called many times - StartWorker does nothing if there is
122 // already a worker started with the given name. 120 // already a worker started with the given name.
123 ensureStateWorker := func() { 121 ensureStateWorker := func() {
124 a.runner.StartWorker("state", a.StateWorker) 122 a.runner.StartWorker("state", a.StateWorker)
125 } 123 }
126 // We might be bootstrapping, and the API server is not 124 // We might be bootstrapping, and the API server is not
127 // running yet. If so, make sure we run a state worker instead. 125 // running yet. If so, make sure we run a state worker instead.
128 if a.MachineId == bootstrapMachineId { 126 if a.MachineId == bootstrapMachineId {
129 // TODO(rog) When we have HA, we only want to do this 127 // TODO(rog) When we have HA, we only want to do this
130 // when we really are bootstrapping - once other 128 // when we really are bootstrapping - once other
131 // instances of the API server have been started, we 129 // instances of the API server have been started, we
132 // should follow the normal course of things and ignore 130 // should follow the normal course of things and ignore
133 // the fact that this was once the bootstrap machine. 131 // the fact that this was once the bootstrap machine.
134 » » log.Infof("Starting StateWorker for machine-0") 132 » » logger.Infof("Starting StateWorker for machine-0")
135 ensureStateWorker() 133 ensureStateWorker()
136 } 134 }
137 a.runner.StartWorker("api", func() (worker.Worker, error) { 135 a.runner.StartWorker("api", func() (worker.Worker, error) {
138 return a.APIWorker(ensureStateWorker) 136 return a.APIWorker(ensureStateWorker)
139 }) 137 })
140 err := a.runner.Wait() 138 err := a.runner.Wait()
141 if err == worker.ErrTerminateAgent { 139 if err == worker.ErrTerminateAgent {
142 err = a.uninstallAgent() 140 err = a.uninstallAgent()
143 } 141 }
144 err = agentDone(err) 142 err = agentDone(err)
(...skipping 19 matching lines...) Expand all
164 } 162 }
165 } 163 }
166 runner := newRunner(connectionIsFatal(st), moreImportant) 164 runner := newRunner(connectionIsFatal(st), moreImportant)
167 runner.StartWorker("machiner", func() (worker.Worker, error) { 165 runner.StartWorker("machiner", func() (worker.Worker, error) {
168 return machiner.NewMachiner(st.Machiner(), agentConfig), nil 166 return machiner.NewMachiner(st.Machiner(), agentConfig), nil
169 }) 167 })
170 runner.StartWorker("upgrader", func() (worker.Worker, error) { 168 runner.StartWorker("upgrader", func() (worker.Worker, error) {
171 return upgrader.NewUpgrader(st.Upgrader(), agentConfig), nil 169 return upgrader.NewUpgrader(st.Upgrader(), agentConfig), nil
172 }) 170 })
173 runner.StartWorker("logger", func() (worker.Worker, error) { 171 runner.StartWorker("logger", func() (worker.Worker, error) {
174 » » return logger.NewLogger(st.Logger(), agentConfig), nil 172 » » return workerlogger.NewLogger(st.Logger(), agentConfig), nil
175 }) 173 })
176 » // At this stage, since we don't embed LXC containers, just start an lxc 174
177 » // provisioner task for non-lxc containers. Since we have only LXC 175 » // Perform the operations needed to set up hosting for containers.
178 » // containers and normal machines, this effectively means that we only 176 » if err := a.setupContainerSupport(runner, st, entity); err != nil {
179 » // have an LXC provisioner when we have a normally provisioned machine 177 » » return nil, fmt.Errorf("setting up container support: %v", err)
180 » // (through the environ-provisioner). With the upcoming advent of KVM
181 » // containers, it is likely that we will want an LXC provisioner on a KVM
182 » // machine, and once we get nested LXC containers, we can remove this
183 » // check.
184 » //
185 » // TODO(dimitern) 2013-09-25 bug #1230289
186 » // Create jobs for container providers, rather than
187 » // using the provider and container type like this.
188 » providerType := agentConfig.Value(agent.ProviderType)
189 » if providerType != provider.Local && entity.ContainerType() != instance.LXC {
190 » » workerName := fmt.Sprintf("%s-provisioner", provisioner.LXC)
191 » » runner.StartWorker(workerName, func() (worker.Worker, error) {
192 » » » return provisioner.NewProvisioner(provisioner.LXC, st.Provisioner(), agentConfig), nil
193 » » })
194 } 178 }
195 for _, job := range entity.Jobs() { 179 for _, job := range entity.Jobs() {
196 switch job { 180 switch job {
197 case params.JobHostUnits: 181 case params.JobHostUnits:
198 runner.StartWorker("deployer", func() (worker.Worker, error) { 182 runner.StartWorker("deployer", func() (worker.Worker, error) {
199 apiDeployer := st.Deployer() 183 apiDeployer := st.Deployer()
200 context := newDeployContext(apiDeployer, agentConfig) 184 context := newDeployContext(apiDeployer, agentConfig)
201 return deployer.NewDeployer(apiDeployer, context), nil 185 return deployer.NewDeployer(apiDeployer, context), nil
202 }) 186 })
203 case params.JobManageEnviron: 187 case params.JobManageEnviron:
204 runner.StartWorker("environ-provisioner", func() (worker.Worker, error) { 188 runner.StartWorker("environ-provisioner", func() (worker.Worker, error) {
205 return provisioner.NewProvisioner(provisioner.ENVIRON, st.Provisioner(), agentConfig), nil 189 return provisioner.NewProvisioner(provisioner.ENVIRON, st.Provisioner(), agentConfig), nil
206 }) 190 })
207 // TODO(dimitern): Add firewaller here, when using the API. 191 // TODO(dimitern): Add firewaller here, when using the API.
208 case params.JobManageState: 192 case params.JobManageState:
209 // Not yet implemented with the API. 193 // Not yet implemented with the API.
210 default: 194 default:
211 // TODO(dimitern): Once all workers moved over to using 195 // TODO(dimitern): Once all workers moved over to using
212 // the API, report "unknown job type" here. 196 // the API, report "unknown job type" here.
213 } 197 }
214 } 198 }
215 return newCloseWorker(runner, st), nil // Note: a worker.Runner is itself a worker.Worker. 199 return newCloseWorker(runner, st), nil // Note: a worker.Runner is itself a worker.Worker.
216 } 200 }
217 201
202 // setupContainerSupport determines what containers can be run on this machine and
203 // initialises suitable infrastructure to support such containers.
204 func (a *MachineAgent) setupContainerSupport(runner worker.Runner, st *api.State, entity *apiagent.Entity) error {
205 var supportedContainers []instance.ContainerType
206 // We don't yet support nested lxc containers but anything else can run an LXC container.
207 if entity.ContainerType() != instance.LXC {
208 supportedContainers = append(supportedContainers, instance.LXC)
209 }
210 supportsKvm, err := kvm.IsKVMSupported()
211 if err != nil {
212 logger.Warningf("determining kvm support: %v\nno kvm containers possible", err)
213 }
214 if err == nil && supportsKvm {
215 supportedContainers = append(supportedContainers, instance.KVM)
216 }
217 // If no containers are supported, there's no need to go further.
218 if len(supportedContainers) == 0 {
219 return nil
220 }
221 return a.updateSupportedContainers(runner, st, entity.Tag(), supportedContainers)
222 }
223
224 // updateSupportedContainers records in state that a machine can run the specified containers.
225 // It starts a watcher and when a container of a given type is first added to the machine,
226 // the watcher is killed, the machine is set up to be able to start containers of the given type,
227 // and a suitable provisioner is started.
228 func (a *MachineAgent) updateSupportedContainers(runner worker.Runner, st *api.State,
229 tag string, containers []instance.ContainerType) error {
230
231 var machine *apiprovisioner.Machine
232 var err error
233 pr := st.Provisioner()
234 if machine, err = pr.Machine(tag); err != nil {
235 return fmt.Errorf("%s is not in state: %v", tag, err)
236 }
237 if err := machine.AddSupportedContainers(containers...); err != nil {
william.reade 2013/11/18 15:30:58 This does look to me more like we're setting the f
238 return fmt.Errorf("adding supported containers to %s: %v", tag, err)
239 }
240 // Start the watcher to fire when a container is first requested on the machine.
241 for _, ctype := range containers {
242 watcherName := fmt.Sprintf("%s-watcher", ctype)
243 handler := provisioner.NewContainerSetupHandler(runner, watcherName, ctype, machine, pr, a.Conf.config)
244 runner.StartWorker(watcherName, func() (worker.Worker, error) {
245 return worker.NewStringsWorker(handler), nil
246 })
247 }
248 return nil
249 }
250
218 // StateWorker returns a worker running all the workers that require 251 // StateWorker returns a worker running all the workers that require
219 // a *state.State connection. 252 // a *state.State connection.
220 func (a *MachineAgent) StateWorker() (worker.Worker, error) { 253 func (a *MachineAgent) StateWorker() (worker.Worker, error) {
221 agentConfig := a.Conf.config 254 agentConfig := a.Conf.config
222 st, entity, err := openState(agentConfig, a) 255 st, entity, err := openState(agentConfig, a)
223 if err != nil { 256 if err != nil {
224 return nil, err 257 return nil, err
225 } 258 }
226 reportOpenedState(st) 259 reportOpenedState(st)
227 m := entity.(*state.Machine) 260 m := entity.(*state.Machine)
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
272 runner.StartWorker("resumer", func() (worker.Worker, error) { 305 runner.StartWorker("resumer", func() (worker.Worker, error) {
273 // The action of resumer is so subtle that it is not tested, 306 // The action of resumer is so subtle that it is not tested,
274 // because we can't figure out how to do so without brutalising 307 // because we can't figure out how to do so without brutalising
275 // the transaction log. 308 // the transaction log.
276 return resumer.NewResumer(st), nil 309 return resumer.NewResumer(st), nil
277 }) 310 })
278 runner.StartWorker("minunitsworker", func() (worker.Worker, error) { 311 runner.StartWorker("minunitsworker", func() (worker.Worker, error) {
279 return minunitsworker.NewMinUnitsWorker(st), nil 312 return minunitsworker.NewMinUnitsWorker(st), nil
280 }) 313 })
281 default: 314 default:
282 » » » log.Warningf("ignoring unknown job %q", job) 315 » » » logger.Warningf("ignoring unknown job %q", job)
283 } 316 }
284 } 317 }
285 return newCloseWorker(runner, st), nil 318 return newCloseWorker(runner, st), nil
286 } 319 }
287 320
288 func (a *MachineAgent) Entity(st *state.State) (AgentState, error) { 321 func (a *MachineAgent) Entity(st *state.State) (AgentState, error) {
289 m, err := st.Machine(a.MachineId) 322 m, err := st.Machine(a.MachineId)
290 if err != nil { 323 if err != nil {
291 return nil, err 324 return nil, err
292 } 325 }
293 // Check the machine nonce as provisioned matches the agent.Conf value. 326 // Check the machine nonce as provisioned matches the agent.Conf value.
294 if !m.CheckProvisioned(a.Conf.config.Nonce()) { 327 if !m.CheckProvisioned(a.Conf.config.Nonce()) {
295 // The agent is running on a different machine to the one it 328 // The agent is running on a different machine to the one it
296 // should be according to state. It must stop immediately. 329 // should be according to state. It must stop immediately.
297 » » log.Errorf("running machine %v agent on inappropriate instance", m) 330 » » logger.Errorf("running machine %v agent on inappropriate instance", m)
298 return nil, worker.ErrTerminateAgent 331 return nil, worker.ErrTerminateAgent
299 } 332 }
300 return m, nil 333 return m, nil
301 } 334 }
302 335
303 func (a *MachineAgent) Tag() string { 336 func (a *MachineAgent) Tag() string {
304 return names.MachineTag(a.MachineId) 337 return names.MachineTag(a.MachineId)
305 } 338 }
306 339
307 func (m *MachineAgent) uninstallAgent() error { 340 func (m *MachineAgent) uninstallAgent() error {
(...skipping 30 matching lines...) Expand all
338 select { 371 select {
339 case apiReporter <- st: 372 case apiReporter <- st:
340 default: 373 default:
341 } 374 }
342 } 375 }
343 func sendOpenedAPIs(dst chan<- *api.State) (undo func()) { 376 func sendOpenedAPIs(dst chan<- *api.State) (undo func()) {
344 var original chan<- *api.State 377 var original chan<- *api.State
345 original, apiReporter = apiReporter, dst 378 original, apiReporter = apiReporter, dst
346 return func() { apiReporter = original } 379 return func() { apiReporter = original }
347 } 380 }
OLDNEW
« no previous file with comments | « cmd/jujud/agent.go ('k') | cmd/jujud/unit.go » ('j') | state/api/provisioner/machine.go » ('J')

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b