LEFT | RIGHT |
1 package main | 1 package main |
2 | 2 |
3 import ( | 3 import ( |
4 "fmt" | |
5 | |
6 "launchpad.net/gnuflag" | 4 "launchpad.net/gnuflag" |
7 "launchpad.net/juju/go/cmd" | 5 "launchpad.net/juju/go/cmd" |
8 "launchpad.net/juju/go/environs" | 6 "launchpad.net/juju/go/environs" |
9 "launchpad.net/juju/go/log" | 7 "launchpad.net/juju/go/log" |
10 "launchpad.net/juju/go/state" | 8 "launchpad.net/juju/go/state" |
| 9 "launchpad.net/tomb" |
11 | 10 |
12 // register providers | 11 // register providers |
13 _ "launchpad.net/juju/go/environs/dummy" | 12 _ "launchpad.net/juju/go/environs/dummy" |
14 _ "launchpad.net/juju/go/environs/ec2" | 13 _ "launchpad.net/juju/go/environs/ec2" |
15 ) | 14 ) |
16 | 15 |
17 const PROVIDER_MACHINE_ID = "provider-machine-id" | |
18 | |
19 // ProvisioningAgent is a cmd.Command responsible for running a provisioning age
nt. | 16 // ProvisioningAgent is a cmd.Command responsible for running a provisioning age
nt. |
20 type ProvisioningAgent struct { | 17 type ProvisioningAgent struct { |
21 » Conf AgentConf | 18 » Conf AgentConf |
22 » environ environs.Environ // the provider this agent operates against. | |
23 » State *state.State | |
24 | |
25 » providerIdToInstance map[string]environs.Instance | |
26 » machineIdToProviderId map[int]string | |
27 } | |
28 | |
29 func NewProvisioningAgent() *ProvisioningAgent { | |
30 » return &ProvisioningAgent{ | |
31 » » providerIdToInstance: make(map[string]environs.Instance), | |
32 » » machineIdToProviderId: make(map[int]string), | |
33 » } | |
34 } | 19 } |
35 | 20 |
36 // Info returns usage information for the command. | 21 // Info returns usage information for the command. |
37 func (a *ProvisioningAgent) Info() *cmd.Info { | 22 func (a *ProvisioningAgent) Info() *cmd.Info { |
38 return &cmd.Info{"provisioning", "", "run a juju provisioning agent", ""
} | 23 return &cmd.Info{"provisioning", "", "run a juju provisioning agent", ""
} |
39 } | 24 } |
40 | 25 |
41 // Init initializes the command for running. | 26 // Init initializes the command for running. |
42 func (a *ProvisioningAgent) Init(f *gnuflag.FlagSet, args []string) error { | 27 func (a *ProvisioningAgent) Init(f *gnuflag.FlagSet, args []string) error { |
43 a.Conf.addFlags(f) | 28 a.Conf.addFlags(f) |
44 if err := f.Parse(true, args); err != nil { | 29 if err := f.Parse(true, args); err != nil { |
45 return err | 30 return err |
46 } | 31 } |
47 return a.Conf.checkArgs(f.Args()) | 32 return a.Conf.checkArgs(f.Args()) |
48 } | 33 } |
49 | 34 |
50 // Run runs a provisioning agent. | 35 // Run runs a provisioning agent. |
51 func (a *ProvisioningAgent) Run(_ *cmd.Context) error { | 36 func (a *ProvisioningAgent) Run(_ *cmd.Context) error { |
52 » var err error | 37 » // TODO(dfc) place the logic in a loop with a suitable delay |
53 » a.State, err = state.Open(&a.Conf.StateInfo) | 38 » st, err := state.Open(&a.Conf.StateInfo) |
54 if err != nil { | 39 if err != nil { |
55 return err | 40 return err |
56 } | 41 } |
| 42 p := NewProvisioner(st) |
| 43 return p.Wait() |
| 44 } |
57 | 45 |
58 » // step 1. wait for a valid environment | 46 type Provisioner struct { |
59 » configWatcher := a.State.WatchEnvironConfig() | 47 » st *state.State |
60 » for { | 48 » environ environs.Environ |
61 » » log.Printf("provisioning: waiting for valid environment") | 49 » tomb tomb.Tomb |
62 » » config, ok := <-configWatcher.Changes() | 50 |
63 » » if !ok { | 51 » environWatcher *state.ConfigWatcher |
64 » » » return fmt.Errorf("environment watcher has shutdown: %v"
, configWatcher.Stop()) | 52 » machinesWatcher *state.MachinesWatcher |
65 » » } | 53 } |
66 » » var err error | 54 |
67 » » a.environ, err = environs.NewEnviron(config.Map()) | 55 // NewProvisioner returns a Provisioner. |
68 » » if err == nil { | 56 func NewProvisioner(st *state.State) *Provisioner { |
69 » » » break | 57 » p := &Provisioner{ |
70 » » } | 58 » » st: st, |
71 » » log.Printf("provisioning: unable to create environment from supp
lied configuration: %v", err) | |
72 } | 59 } |
73 » log.Printf("provisioning: valid environment configured") | 60 » go p.loop() |
| 61 » return p |
| 62 } |
74 | 63 |
75 » // step 2. listen for changes to the environment or the machine topology
and action both. | 64 func (p *Provisioner) loop() { |
76 » machinesWatcher := a.State.WatchMachines() | 65 » defer p.tomb.Done() |
| 66 |
| 67 » p.environWatcher = p.st.WatchEnvironConfig() |
| 68 » // TODO(dfc) we need a method like state.IsConnected() here to exit clea
nly if |
| 69 » // there is a connection problem. |
77 for { | 70 for { |
78 select { | 71 select { |
79 » » case changes, ok := <-configWatcher.Changes(): | 72 » » case <-p.tomb.Dying(): |
| 73 » » » return |
| 74 » » case config, ok := <-p.environWatcher.Changes(): |
80 if !ok { | 75 if !ok { |
81 » » » » return fmt.Errorf("environment watcher has shutd
own: %v", configWatcher.Stop()) | 76 » » » » err := p.environWatcher.Stop() |
| 77 » » » » if err != nil { |
| 78 » » » » » p.tomb.Kill(err) |
| 79 » » » » } |
| 80 » » » » return |
82 } | 81 } |
83 » » » config, err := environs.NewConfig(changes.Map()) | 82 » » » var err error |
| 83 » » » p.environ, err = environs.NewEnviron(config.Map()) |
84 if err != nil { | 84 if err != nil { |
85 » » » » log.Printf("provisioning: new configuration rece
ived, but was not valid: %v", err) | 85 » » » » log.Printf("provisioner loaded invalid environme
nt configuration: %v", err) |
86 continue | 86 continue |
87 } | 87 } |
88 » » » a.environ.SetConfig(config) | 88 » » » log.Printf("provisioner loaded new environment configura
tion") |
89 » » » log.Printf("provisioning: new configuartion applied") | 89 » » » p.innerLoop() |
90 » » case changes, ok := <-machinesWatcher.Changes(): | |
91 » » » if !ok { | |
92 » » » » return fmt.Errorf("machines watcher has shutdown
: %v", configWatcher.Stop()) | |
93 » » » } | |
94 » » » for _, added := range changes.Added { | |
95 » » » » if err := a.addMachine(added); err != nil { | |
96 » » » » » // TODO(dfc) implement retry logic | |
97 » » » » » return err | |
98 » » » » } | |
99 » » » » log.Printf("provisioning: machine %d added", add
ed.Id()) | |
100 » » » } | |
101 » » » for _, deleted := range changes.Deleted { | |
102 » » » » if err := a.deleteMachine(deleted); err != nil { | |
103 » » » » » // TODO(dfc) implement retry logic | |
104 » » » » » return err | |
105 » » » » } | |
106 » » » » log.Printf("provisioning: machine %d deleted", d
eleted.Id()) | |
107 » » » } | |
108 } | 90 } |
109 } | 91 } |
110 if err = configWatcher.Stop(); err == nil { | |
111 err = machinesWatcher.Stop() | |
112 } | |
113 return err | |
114 } | 92 } |
115 | 93 |
116 func (a *ProvisioningAgent) addMachine(m *state.Machine) error { | 94 func (p *Provisioner) innerLoop() { |
117 » id, err := m.InstanceId() | 95 » p.machinesWatcher = p.st.WatchMachines() |
118 » if err != nil { | 96 » // TODO(dfc) we need a method like state.IsConnected() here to exit clea
nly if |
119 » » return err | 97 » // there is a connection problem. |
| 98 » for { |
| 99 » » select { |
| 100 » » case <-p.tomb.Dying(): |
| 101 » » » return |
| 102 » » case change, ok := <-p.environWatcher.Changes(): |
| 103 » » » if !ok { |
| 104 » » » » err := p.environWatcher.Stop() |
| 105 » » » » if err != nil { |
| 106 » » » » » p.tomb.Kill(err) |
| 107 » » » » } |
| 108 » » » » return |
| 109 » » » } |
| 110 » » » config, err := environs.NewConfig(change.Map()) |
| 111 » » » if err != nil { |
| 112 » » » » log.Printf("provisioner loaded invalid environme
nt configuration: %v", err) |
| 113 » » » » continue |
| 114 » » » } |
| 115 » » » p.environ.SetConfig(config) |
| 116 » » » log.Printf("provisioner loaded new environment configura
tion") |
| 117 » » case machines, ok := <-p.machinesWatcher.Changes(): |
| 118 » » » if !ok { |
| 119 » » » » err := p.machinesWatcher.Stop() |
| 120 » » » » if err != nil { |
| 121 » » » » » p.tomb.Kill(err) |
| 122 » » » » } |
| 123 » » » » return |
| 124 » » » } |
| 125 » » » p.processMachines(machines) |
| 126 » » } |
120 } | 127 } |
121 if id != "" { | |
122 return fmt.Errorf("machine-%010d already reports a provider id %
q, skipping", m.Id(), id) | |
123 } | |
124 | |
125 // TODO(dfc) the state.Info passed to environ.StartInstance remains cont
entious | |
126 // however as the PA only knows one state.Info, and that info is used by
MAs and· | |
127 // UAs to locate the ZK for this environment, it is logical to use the s
ame· | |
128 // state.Info as the PA.· | |
129 inst, err := a.environ.StartInstance(m.Id(), &a.Conf.StateInfo) | |
130 if err != nil { | |
131 return err | |
132 } | |
133 | |
134 // store the reference from the provider in ZK | |
135 if err := m.SetInstanceId(inst.Id()); err != nil { | |
136 return fmt.Errorf("unable to store provider id: %v", err) | |
137 } | |
138 | |
139 // data is stashed in ZK, populate the caches | |
140 a.machineIdToProviderId[m.Id()] = inst.Id() | |
141 a.providerIdToInstance[inst.Id()] = inst | |
142 return nil | |
143 } | 128 } |
144 | 129 |
145 func (a *ProvisioningAgent) deleteMachine(m *state.Machine) error { | 130 // Wait waits for the Provisioner to exit. |
146 » insts, err := a.InstancesForMachines(m) | 131 func (p *Provisioner) Wait() error { |
147 » if err != nil { | 132 » return p.tomb.Wait() |
148 » » return fmt.Errorf("machine-%010d has no refrence to a provider i
d, skipping", m.Id()) | |
149 » } | |
150 » return a.environ.StopInstances(insts) | |
151 } | 133 } |
152 | 134 |
153 // InstanceForMachine returns the environs.Instance that represents this machine
s' running | 135 // Stop stops the Provisioner and returns any error encountered while |
154 // instance. | 136 // provisioning. |
155 func (a *ProvisioningAgent) InstanceForMachine(m *state.Machine) (environs.Insta
nce, error) { | 137 func (p *Provisioner) Stop() error { |
156 » id, ok := a.machineIdToProviderId[m.Id()] | 138 » p.tomb.Kill(nil) |
157 » if !ok { | 139 » return p.tomb.Wait() |
158 » » // not cached locally, ask the provider. | |
159 » » var err error | |
160 » » id, err = m.InstanceId() | |
161 » » if err != nil { | |
162 » » » return nil, err | |
163 » » } | |
164 » » if id == "" { | |
165 » » » // nobody knows about this machine, give up. | |
166 » » » return nil, fmt.Errorf("instance not found") | |
167 » » } | |
168 » » a.machineIdToProviderId[m.Id()] = id | |
169 » } | |
170 » inst, ok := a.providerIdToInstance[id] | |
171 » if !ok { | |
172 » » // not cached locally, ask the provider | |
173 » » var err error | |
174 » » inst, err = a.findInstance(id) | |
175 » » if err != nil { | |
176 » » » // the provider doesn't know about this instance, give u
p. | |
177 » » » return nil, err | |
178 » » } | |
179 » » return nil, nil | |
180 » } | |
181 » return inst, nil | |
182 } | 140 } |
183 | 141 |
184 // InstancesForMachines returns a list of environs.Instance that represent the l
ist of machines running | 142 func (p *Provisioner) processMachines(changes *state.MachinesChange) {} |
185 // in the provider. | |
186 func (a *ProvisioningAgent) InstancesForMachines(machines ...*state.Machine) ([]
environs.Instance, error) { | |
187 » var insts []environs.Instance | |
188 » for _, m := range machines { | |
189 » » inst, err := a.InstanceForMachine(m) | |
190 » » if err != nil { | |
191 » » » return nil, err | |
192 » » } | |
193 » » insts = append(insts, inst) | |
194 » } | |
195 » return insts, nil | |
196 } | |
197 | |
198 func (a *ProvisioningAgent) findInstance(id string) (environs.Instance, error) { | |
199 » insts, err := a.environ.Instances([]string{id}) | |
200 » if err != nil { | |
201 » » return nil, err | |
202 » } | |
203 » if len(insts) < 1 { | |
204 » » return nil, fmt.Errorf("instance not found") | |
205 » } | |
206 » return insts[0], nil | |
207 } | |
LEFT | RIGHT |