OLD | NEW |
1 // Copyright 2012, 2013 Canonical Ltd. | 1 // Copyright 2012, 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package provisioner | 4 package provisioner |
5 | 5 |
6 import ( | 6 import ( |
7 "fmt" | 7 "fmt" |
8 "sync/atomic" | 8 "sync/atomic" |
9 | 9 |
10 "launchpad.net/juju-core/agent" | 10 "launchpad.net/juju-core/agent" |
11 "launchpad.net/juju-core/container" | 11 "launchpad.net/juju-core/container" |
12 "launchpad.net/juju-core/container/kvm" | 12 "launchpad.net/juju-core/container/kvm" |
13 "launchpad.net/juju-core/container/lxc" | 13 "launchpad.net/juju-core/container/lxc" |
14 "launchpad.net/juju-core/instance" | 14 "launchpad.net/juju-core/instance" |
| 15 "launchpad.net/juju-core/state" |
15 apiprovisioner "launchpad.net/juju-core/state/api/provisioner" | 16 apiprovisioner "launchpad.net/juju-core/state/api/provisioner" |
16 "launchpad.net/juju-core/state/api/watcher" | 17 "launchpad.net/juju-core/state/api/watcher" |
17 "launchpad.net/juju-core/worker" | 18 "launchpad.net/juju-core/worker" |
18 ) | 19 ) |
19 | 20 |
20 var ProvisonerTypes = map[instance.ContainerType]ProvisionerType{ | 21 var ProvisonerTypes = map[instance.ContainerType]ProvisionerType{ |
21 instance.LXC: LXC, | 22 instance.LXC: LXC, |
22 instance.KVM: KVM, | 23 instance.KVM: KVM, |
23 } | 24 } |
24 | 25 |
25 // ContainerSetup is a StringsWatchHandler that is notified when containers of | 26 // ContainerSetup is a StringsWatchHandler that is notified when containers |
26 // the specified type are created on the given machine. It will set up the | 27 // are created on the given machine. It will set up the machine to be able |
27 // machine to be able to create containers and start a provisioner. | 28 // to create containers and start a suitable provisioner. |
28 type ContainerSetup struct { | 29 type ContainerSetup struct { |
29 » runner worker.Runner | 30 » runner worker.Runner |
30 » containerType instance.ContainerType | 31 » supportedContainers []instance.ContainerType |
31 » provisioner *apiprovisioner.State | 32 » provisioner *apiprovisioner.State |
32 » machine *apiprovisioner.Machine | 33 » machine *apiprovisioner.Machine |
33 » config agent.Config | 34 » config agent.Config |
34 | 35 |
35 // Save the workerName so the worker thread can be stopped. | 36 // Save the workerName so the worker thread can be stopped. |
36 workerName string | 37 workerName string |
37 » // setupDone is non zero if the container setup has been invoked. | 38 » // setupDone[containerType] is non zero if the container setup has been
invoked |
38 » setupDone int32 | 39 » // for that container type. |
| 40 » setupDone map[instance.ContainerType]*int32 |
| 41 » // The number of provisioners started. Once all necessary provisioners h
ave |
| 42 » // been started, the container watcher can be stopped. |
| 43 » numberProvisioners int32 |
39 } | 44 } |
40 | 45 |
41 // NewContainerSetupHandler returns a StringsWatchHandler which is notified when | 46 // NewContainerSetupHandler returns a StringsWatchHandler which is notified when |
42 // containers are created on the given machine. | 47 // containers are created on the given machine. |
43 func NewContainerSetupHandler(runner worker.Runner, workerName string, container
instance.ContainerType, | 48 func NewContainerSetupHandler(runner worker.Runner, workerName string, supported
Containers []instance.ContainerType, |
44 machine *apiprovisioner.Machine, provisioner *apiprovisioner.State, | 49 machine *apiprovisioner.Machine, provisioner *apiprovisioner.State, |
45 config agent.Config) worker.StringsWatchHandler { | 50 config agent.Config) worker.StringsWatchHandler { |
46 | 51 |
47 return &ContainerSetup{ | 52 return &ContainerSetup{ |
48 » » runner: runner, | 53 » » runner: runner, |
49 » » containerType: container, | 54 » » machine: machine, |
50 » » machine: machine, | 55 » » supportedContainers: supportedContainers, |
51 » » provisioner: provisioner, | 56 » » provisioner: provisioner, |
52 » » config: config, | 57 » » config: config, |
53 » » workerName: workerName, | 58 » » workerName: workerName, |
54 } | 59 } |
55 } | 60 } |
56 | 61 |
57 // SetUp is defined on the StringsWatchHandler interface. | 62 // SetUp is defined on the StringsWatchHandler interface. |
58 func (cs *ContainerSetup) SetUp() (watcher watcher.StringsWatcher, err error) { | 63 func (cs *ContainerSetup) SetUp() (watcher watcher.StringsWatcher, err error) { |
59 » if watcher, err = cs.machine.WatchContainers(cs.containerType); err != n
il { | 64 » // Set up the semaphores for each container type. |
| 65 » cs.setupDone = make(map[instance.ContainerType]*int32, len(instance.Cont
ainerTypes)) |
| 66 » for _, containerType := range instance.ContainerTypes { |
| 67 » » zero := int32(0) |
| 68 » » cs.setupDone[containerType] = &zero |
| 69 » } |
| 70 » // Listen to all container lifecycle events on our machine. |
| 71 » if watcher, err = cs.machine.WatchAllContainers(); err != nil { |
60 return nil, err | 72 return nil, err |
61 } | 73 } |
62 return watcher, nil | 74 return watcher, nil |
63 } | 75 } |
64 | 76 |
65 // Handle is called whenever containers change on the machine being watched. | 77 // Handle is called whenever containers change on the machine being watched. |
66 // All machines start out with so containers so the first time Handle is called, | 78 // All machines start out with so containers so the first time Handle is called, |
67 // it will be because a container has been added. | 79 // it will be because a container has been added. |
68 func (cs *ContainerSetup) Handle(containerIds []string) error { | 80 func (cs *ContainerSetup) Handle(containerIds []string) (resultError error) { |
69 // Consume the initial watcher event. | 81 // Consume the initial watcher event. |
70 if len(containerIds) == 0 { | 82 if len(containerIds) == 0 { |
71 return nil | 83 return nil |
72 } | 84 } |
73 | 85 |
74 » // This callback must only be invoked once. Stopping the watcher | 86 » logger.Tracef("initial container setup with ids: %v", containerIds) |
75 » // below should be sufficient but I'm paranoid. | 87 » for _, id := range containerIds { |
76 » if atomic.LoadInt32(&cs.setupDone) != 0 { | 88 » » containerType := state.ContainerTypeFromId(id) |
77 » » return nil | 89 » » // If this container type has been dealt with, do nothing. |
| 90 » » if atomic.LoadInt32(cs.setupDone[containerType]) != 0 { |
| 91 » » » continue |
| 92 » » } |
| 93 » » if err := cs.initialiseAndStartProvisioner(containerType); err !
= nil { |
| 94 » » » logger.Errorf("starting container provisioner for %v: %v
", containerType, err) |
| 95 » » » // Just because dealing with one type of container fails
, we won't exit the entire |
| 96 » » » // function because we still want to try and start other
container types. So we |
| 97 » » » // take note of and return the first such error. |
| 98 » » » if resultError == nil { |
| 99 » » » » resultError = err |
| 100 » » » } |
| 101 » » } |
78 } | 102 } |
79 » atomic.StoreInt32(&cs.setupDone, 1) | 103 » return resultError |
| 104 } |
80 | 105 |
81 » logger.Tracef("initial container setup with ids: %v", containerIds) | 106 func (cs *ContainerSetup) initialiseAndStartProvisioner(containerType instance.C
ontainerType) error { |
82 » // We only care about the initial container creation. | 107 » // Flag that this container type has been handled. |
83 » // This worker has done its job so stop it. | 108 » atomic.StoreInt32(cs.setupDone[containerType], 1) |
84 » // We do not expect there will be an error, and there's not much we can
do anyway. | 109 |
85 » if err := cs.runner.StopWorker(cs.workerName); err != nil { | 110 » if atomic.AddInt32(&cs.numberProvisioners, 1) == int32(len(cs.supportedC
ontainers)) { |
86 » » logger.Warningf("stopping machine agent container watcher: %v",
err) | 111 » » // We only care about the initial container creation. |
| 112 » » // This worker has done its job so stop it. |
| 113 » » // We do not expect there will be an error, and there's not much
we can do anyway. |
| 114 » » if err := cs.runner.StopWorker(cs.workerName); err != nil { |
| 115 » » » logger.Warningf("stopping machine agent container watche
r: %v", err) |
| 116 » » } |
87 } | 117 } |
88 » if err := cs.initaliseContainer(); err != nil { | 118 |
89 » » return fmt.Errorf("setting up container dependnecies on host mac
hine: %v", err) | 119 » if err := cs.initaliseContainer(containerType); err != nil { |
| 120 » » return fmt.Errorf("setting up container dependnecies for %v on h
ost machine: %v", containerType, err) |
90 } | 121 } |
91 » if provisionerType, ok := ProvisonerTypes[cs.containerType]; ok { | 122 » if provisionerType, ok := ProvisonerTypes[containerType]; ok { |
92 return StartProvisioner(cs.runner, provisionerType, cs.provision
er, cs.config) | 123 return StartProvisioner(cs.runner, provisionerType, cs.provision
er, cs.config) |
93 } | 124 } |
94 » return fmt.Errorf("invalid container type %q", cs.containerType) | 125 » return fmt.Errorf("invalid container type %q", containerType) |
95 } | 126 } |
96 | 127 |
97 // TearDown is defined on the StringsWatchHandler interface. | 128 // TearDown is defined on the StringsWatchHandler interface. |
98 func (cs *ContainerSetup) TearDown() error { | 129 func (cs *ContainerSetup) TearDown() error { |
99 // Nothing to do here. | 130 // Nothing to do here. |
100 return nil | 131 return nil |
101 } | 132 } |
102 | 133 |
103 func (cs *ContainerSetup) initaliseContainer() error { | 134 func (cs *ContainerSetup) initaliseContainer(containerType instance.ContainerTyp
e) error { |
104 var initialiser container.Initialiser | 135 var initialiser container.Initialiser |
105 » switch cs.containerType { | 136 » switch containerType { |
106 case instance.LXC: | 137 case instance.LXC: |
107 initialiser = lxc.NewContainerInitialiser() | 138 initialiser = lxc.NewContainerInitialiser() |
108 case instance.KVM: | 139 case instance.KVM: |
109 initialiser = kvm.NewContainerInitialiser() | 140 initialiser = kvm.NewContainerInitialiser() |
110 default: | 141 default: |
111 » » return fmt.Errorf("unknown container type: %v", cs.containerType
) | 142 » » return fmt.Errorf("unknown container type: %v", containerType) |
112 } | 143 } |
113 return initialiser.Initialise() | 144 return initialiser.Initialise() |
114 } | 145 } |
115 | 146 |
116 // Override for testing. | 147 // Override for testing. |
117 var StartProvisioner = startProvisionerWorker | 148 var StartProvisioner = startProvisionerWorker |
118 | 149 |
119 // startProvisionerWorker kicks off a provisioner task responsible for creating
containers | 150 // startProvisionerWorker kicks off a provisioner task responsible for creating
containers |
120 // of the specified type on the machine. | 151 // of the specified type on the machine. |
121 func startProvisionerWorker(runner worker.Runner, provisionerType ProvisionerTyp
e, | 152 func startProvisionerWorker(runner worker.Runner, provisionerType ProvisionerTyp
e, |
122 provisioner *apiprovisioner.State, config agent.Config) error { | 153 provisioner *apiprovisioner.State, config agent.Config) error { |
123 | 154 |
124 workerName := fmt.Sprintf("%s-provisioner", provisionerType) | 155 workerName := fmt.Sprintf("%s-provisioner", provisionerType) |
125 // The provisioner task is created after a container record has already
been added to the machine. | 156 // The provisioner task is created after a container record has already
been added to the machine. |
126 // It will see that the container does not have an instance yet and crea
te one. | 157 // It will see that the container does not have an instance yet and crea
te one. |
127 return runner.StartWorker(workerName, func() (worker.Worker, error) { | 158 return runner.StartWorker(workerName, func() (worker.Worker, error) { |
128 return NewProvisioner(provisionerType, provisioner, config), nil | 159 return NewProvisioner(provisionerType, provisioner, config), nil |
129 }) | 160 }) |
130 } | 161 } |
OLD | NEW |