Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(218)

Side by Side Diff: worker/provisioner/container_initialisation.go

Issue 28890043: Use single watcher for containr setup (Closed)
Patch Set: Created 10 years, 4 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2012, 2013 Canonical Ltd. 1 // Copyright 2012, 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details. 2 // Licensed under the AGPLv3, see LICENCE file for details.
3 3
4 package provisioner 4 package provisioner
5 5
6 import ( 6 import (
7 "fmt" 7 "fmt"
8 "sync/atomic" 8 "sync/atomic"
9 9
10 "launchpad.net/juju-core/agent" 10 "launchpad.net/juju-core/agent"
11 "launchpad.net/juju-core/container" 11 "launchpad.net/juju-core/container"
12 "launchpad.net/juju-core/container/kvm" 12 "launchpad.net/juju-core/container/kvm"
13 "launchpad.net/juju-core/container/lxc" 13 "launchpad.net/juju-core/container/lxc"
14 "launchpad.net/juju-core/instance" 14 "launchpad.net/juju-core/instance"
15 "launchpad.net/juju-core/state"
15 apiprovisioner "launchpad.net/juju-core/state/api/provisioner" 16 apiprovisioner "launchpad.net/juju-core/state/api/provisioner"
16 "launchpad.net/juju-core/state/api/watcher" 17 "launchpad.net/juju-core/state/api/watcher"
17 "launchpad.net/juju-core/worker" 18 "launchpad.net/juju-core/worker"
18 ) 19 )
19 20
20 var ProvisonerTypes = map[instance.ContainerType]ProvisionerType{ 21 var ProvisonerTypes = map[instance.ContainerType]ProvisionerType{
21 instance.LXC: LXC, 22 instance.LXC: LXC,
22 instance.KVM: KVM, 23 instance.KVM: KVM,
23 } 24 }
24 25
25 // ContainerSetup is a StringsWatchHandler that is notified when containers of 26 // ContainerSetup is a StringsWatchHandler that is notified when containers
26 // the specified type are created on the given machine. It will set up the 27 // are created on the given machine. It will set up the machine to be able
27 // machine to be able to create containers and start a provisioner. 28 // to create containers and start a suitable provisioner.
28 type ContainerSetup struct { 29 type ContainerSetup struct {
29 » runner worker.Runner 30 » runner worker.Runner
30 » containerType instance.ContainerType 31 » supportedContainers []instance.ContainerType
31 » provisioner *apiprovisioner.State 32 » provisioner *apiprovisioner.State
32 » machine *apiprovisioner.Machine 33 » machine *apiprovisioner.Machine
33 » config agent.Config 34 » config agent.Config
34 35
35 // Save the workerName so the worker thread can be stopped. 36 // Save the workerName so the worker thread can be stopped.
36 workerName string 37 workerName string
37 » // setupDone is non zero if the container setup has been invoked. 38 » // setupDone[containerType] is non zero if the container setup has been invoked
38 » setupDone int32 39 » // for that container type.
40 » setupDone map[instance.ContainerType]*int32
41 » // The number of provisioners started. Once all necessary provisioners h ave
42 » // been started, the container watcher can be stopped.
43 » numberProvisioners int32
39 } 44 }
40 45
41 // NewContainerSetupHandler returns a StringsWatchHandler which is notified when 46 // NewContainerSetupHandler returns a StringsWatchHandler which is notified when
42 // containers are created on the given machine. 47 // containers are created on the given machine.
43 func NewContainerSetupHandler(runner worker.Runner, workerName string, container instance.ContainerType, 48 func NewContainerSetupHandler(runner worker.Runner, workerName string, supported Containers []instance.ContainerType,
44 machine *apiprovisioner.Machine, provisioner *apiprovisioner.State, 49 machine *apiprovisioner.Machine, provisioner *apiprovisioner.State,
45 config agent.Config) worker.StringsWatchHandler { 50 config agent.Config) worker.StringsWatchHandler {
46 51
47 return &ContainerSetup{ 52 return &ContainerSetup{
48 » » runner: runner, 53 » » runner: runner,
49 » » containerType: container, 54 » » machine: machine,
50 » » machine: machine, 55 » » supportedContainers: supportedContainers,
51 » » provisioner: provisioner, 56 » » provisioner: provisioner,
52 » » config: config, 57 » » config: config,
53 » » workerName: workerName, 58 » » workerName: workerName,
54 } 59 }
55 } 60 }
56 61
57 // SetUp is defined on the StringsWatchHandler interface. 62 // SetUp is defined on the StringsWatchHandler interface.
58 func (cs *ContainerSetup) SetUp() (watcher watcher.StringsWatcher, err error) { 63 func (cs *ContainerSetup) SetUp() (watcher watcher.StringsWatcher, err error) {
59 » if watcher, err = cs.machine.WatchContainers(cs.containerType); err != n il { 64 » // Set up the semaphores for each container type.
65 » cs.setupDone = make(map[instance.ContainerType]*int32, len(instance.Cont ainerTypes))
66 » for _, containerType := range instance.ContainerTypes {
67 » » zero := int32(0)
68 » » cs.setupDone[containerType] = &zero
69 » }
70 » // Listen to all container lifecycle events on our machine.
71 » if watcher, err = cs.machine.WatchAllContainers(); err != nil {
60 return nil, err 72 return nil, err
61 } 73 }
62 return watcher, nil 74 return watcher, nil
63 } 75 }
64 76
65 // Handle is called whenever containers change on the machine being watched. 77 // Handle is called whenever containers change on the machine being watched.
66 // All machines start out with so containers so the first time Handle is called, 78 // All machines start out with so containers so the first time Handle is called,
67 // it will be because a container has been added. 79 // it will be because a container has been added.
68 func (cs *ContainerSetup) Handle(containerIds []string) error { 80 func (cs *ContainerSetup) Handle(containerIds []string) (resultError error) {
69 // Consume the initial watcher event. 81 // Consume the initial watcher event.
70 if len(containerIds) == 0 { 82 if len(containerIds) == 0 {
71 return nil 83 return nil
72 } 84 }
73 85
74 » // This callback must only be invoked once. Stopping the watcher 86 » logger.Tracef("initial container setup with ids: %v", containerIds)
75 » // below should be sufficient but I'm paranoid. 87 » for _, id := range containerIds {
76 » if atomic.LoadInt32(&cs.setupDone) != 0 { 88 » » containerType := state.ContainerTypeFromId(id)
77 » » return nil 89 » » // If this container type has been dealt with, do nothing.
90 » » if atomic.LoadInt32(cs.setupDone[containerType]) != 0 {
91 » » » continue
92 » » }
93 » » if err := cs.initialiseAndStartProvisioner(containerType); err ! = nil {
94 » » » logger.Errorf("starting container provisioner for %v: %v ", containerType, err)
95 » » » // Just because dealing with one type of container fails , we won't exit the entire
96 » » » // function because we still want to try and start other container types. So we
97 » » » // take note of and return the first such error.
98 » » » if resultError == nil {
99 » » » » resultError = err
100 » » » }
101 » » }
78 } 102 }
79 » atomic.StoreInt32(&cs.setupDone, 1) 103 » return resultError
104 }
80 105
81 » logger.Tracef("initial container setup with ids: %v", containerIds) 106 func (cs *ContainerSetup) initialiseAndStartProvisioner(containerType instance.C ontainerType) error {
82 » // We only care about the initial container creation. 107 » // Flag that this container type has been handled.
83 » // This worker has done its job so stop it. 108 » atomic.StoreInt32(cs.setupDone[containerType], 1)
84 » // We do not expect there will be an error, and there's not much we can do anyway. 109
85 » if err := cs.runner.StopWorker(cs.workerName); err != nil { 110 » if atomic.AddInt32(&cs.numberProvisioners, 1) == int32(len(cs.supportedC ontainers)) {
86 » » logger.Warningf("stopping machine agent container watcher: %v", err) 111 » » // We only care about the initial container creation.
112 » » // This worker has done its job so stop it.
113 » » // We do not expect there will be an error, and there's not much we can do anyway.
114 » » if err := cs.runner.StopWorker(cs.workerName); err != nil {
115 » » » logger.Warningf("stopping machine agent container watche r: %v", err)
116 » » }
87 } 117 }
88 » if err := cs.initaliseContainer(); err != nil { 118
89 » » return fmt.Errorf("setting up container dependnecies on host mac hine: %v", err) 119 » if err := cs.initaliseContainer(containerType); err != nil {
120 » » return fmt.Errorf("setting up container dependnecies for %v on h ost machine: %v", containerType, err)
90 } 121 }
91 » if provisionerType, ok := ProvisonerTypes[cs.containerType]; ok { 122 » if provisionerType, ok := ProvisonerTypes[containerType]; ok {
92 return StartProvisioner(cs.runner, provisionerType, cs.provision er, cs.config) 123 return StartProvisioner(cs.runner, provisionerType, cs.provision er, cs.config)
93 } 124 }
94 » return fmt.Errorf("invalid container type %q", cs.containerType) 125 » return fmt.Errorf("invalid container type %q", containerType)
95 } 126 }
96 127
97 // TearDown is defined on the StringsWatchHandler interface. 128 // TearDown is defined on the StringsWatchHandler interface.
98 func (cs *ContainerSetup) TearDown() error { 129 func (cs *ContainerSetup) TearDown() error {
99 // Nothing to do here. 130 // Nothing to do here.
100 return nil 131 return nil
101 } 132 }
102 133
103 func (cs *ContainerSetup) initaliseContainer() error { 134 func (cs *ContainerSetup) initaliseContainer(containerType instance.ContainerTyp e) error {
104 var initialiser container.Initialiser 135 var initialiser container.Initialiser
105 » switch cs.containerType { 136 » switch containerType {
106 case instance.LXC: 137 case instance.LXC:
107 initialiser = lxc.NewContainerInitialiser() 138 initialiser = lxc.NewContainerInitialiser()
108 case instance.KVM: 139 case instance.KVM:
109 initialiser = kvm.NewContainerInitialiser() 140 initialiser = kvm.NewContainerInitialiser()
110 default: 141 default:
111 » » return fmt.Errorf("unknown container type: %v", cs.containerType ) 142 » » return fmt.Errorf("unknown container type: %v", containerType)
112 } 143 }
113 return initialiser.Initialise() 144 return initialiser.Initialise()
114 } 145 }
115 146
116 // Override for testing. 147 // Override for testing.
117 var StartProvisioner = startProvisionerWorker 148 var StartProvisioner = startProvisionerWorker
118 149
119 // startProvisionerWorker kicks off a provisioner task responsible for creating containers 150 // startProvisionerWorker kicks off a provisioner task responsible for creating containers
120 // of the specified type on the machine. 151 // of the specified type on the machine.
121 func startProvisionerWorker(runner worker.Runner, provisionerType ProvisionerTyp e, 152 func startProvisionerWorker(runner worker.Runner, provisionerType ProvisionerTyp e,
122 provisioner *apiprovisioner.State, config agent.Config) error { 153 provisioner *apiprovisioner.State, config agent.Config) error {
123 154
124 workerName := fmt.Sprintf("%s-provisioner", provisionerType) 155 workerName := fmt.Sprintf("%s-provisioner", provisionerType)
125 // The provisioner task is created after a container record has already been added to the machine. 156 // The provisioner task is created after a container record has already been added to the machine.
126 // It will see that the container does not have an instance yet and crea te one. 157 // It will see that the container does not have an instance yet and crea te one.
127 return runner.StartWorker(workerName, func() (worker.Worker, error) { 158 return runner.StartWorker(workerName, func() (worker.Worker, error) {
128 return NewProvisioner(provisionerType, provisioner, config), nil 159 return NewProvisioner(provisionerType, provisioner, config), nil
129 }) 160 })
130 } 161 }
OLDNEW

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b