Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(114)

Delta Between Two Patch Sets: cmd/jujud/provisioning.go

Issue 6250068: cmd/jujud: strawman provisioning agent (Closed)
Left Patch Set: cmd/jujud: strawman provisioning agent Created 11 years, 10 months ago
Right Patch Set: cmd/jujud: strawman provisioning agent Created 11 years, 9 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « [revision details] ('k') | cmd/jujud/provisioning_test.go » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 package main 1 package main
2 2
3 import ( 3 import (
4 "fmt"
5
6 "launchpad.net/gnuflag" 4 "launchpad.net/gnuflag"
7 "launchpad.net/juju/go/cmd" 5 "launchpad.net/juju/go/cmd"
8 "launchpad.net/juju/go/environs" 6 "launchpad.net/juju/go/environs"
9 "launchpad.net/juju/go/log" 7 "launchpad.net/juju/go/log"
10 "launchpad.net/juju/go/state" 8 "launchpad.net/juju/go/state"
9 "launchpad.net/tomb"
11 10
12 // register providers 11 // register providers
13 _ "launchpad.net/juju/go/environs/dummy" 12 _ "launchpad.net/juju/go/environs/dummy"
14 _ "launchpad.net/juju/go/environs/ec2" 13 _ "launchpad.net/juju/go/environs/ec2"
15 ) 14 )
16 15
17 const PROVIDER_MACHINE_ID = "provider-machine-id"
18
19 // ProvisioningAgent is a cmd.Command responsible for running a provisioning agent. 16 // ProvisioningAgent is a cmd.Command responsible for running a provisioning agent.
20 type ProvisioningAgent struct { 17 type ProvisioningAgent struct {
21 » Conf AgentConf 18 » Conf AgentConf
22 » environ environs.Environ // the provider this agent operates against.
23 » State *state.State
24
25 » providerIdToInstance map[string]environs.Instance
26 » machineIdToProviderId map[int]string
27 }
28
29 func NewProvisioningAgent() *ProvisioningAgent {
30 » return &ProvisioningAgent{
31 » » providerIdToInstance: make(map[string]environs.Instance),
32 » » machineIdToProviderId: make(map[int]string),
33 » }
34 } 19 }
35 20
36 // Info returns usage information for the command. 21 // Info returns usage information for the command.
37 func (a *ProvisioningAgent) Info() *cmd.Info { 22 func (a *ProvisioningAgent) Info() *cmd.Info {
38 return &cmd.Info{"provisioning", "", "run a juju provisioning agent", "" } 23 return &cmd.Info{"provisioning", "", "run a juju provisioning agent", "" }
39 } 24 }
40 25
41 // Init initializes the command for running. 26 // Init initializes the command for running.
42 func (a *ProvisioningAgent) Init(f *gnuflag.FlagSet, args []string) error { 27 func (a *ProvisioningAgent) Init(f *gnuflag.FlagSet, args []string) error {
43 a.Conf.addFlags(f) 28 a.Conf.addFlags(f)
44 if err := f.Parse(true, args); err != nil { 29 if err := f.Parse(true, args); err != nil {
45 return err 30 return err
46 } 31 }
47 return a.Conf.checkArgs(f.Args()) 32 return a.Conf.checkArgs(f.Args())
48 } 33 }
49 34
50 // Run runs a provisioning agent. 35 // Run runs a provisioning agent.
51 func (a *ProvisioningAgent) Run(_ *cmd.Context) error { 36 func (a *ProvisioningAgent) Run(_ *cmd.Context) error {
52 » var err error 37 » // TODO(dfc) place the logic in a loop with a suitable delay
53 » a.State, err = state.Open(&a.Conf.StateInfo) 38 » st, err := state.Open(&a.Conf.StateInfo)
54 if err != nil { 39 if err != nil {
55 return err 40 return err
56 } 41 }
42 p := NewProvisioner(st)
43 return p.Wait()
44 }
57 45
58 » // step 1. wait for a valid environment 46 type Provisioner struct {
59 » configWatcher := a.State.WatchEnvironConfig() 47 » st *state.State
60 » for { 48 » environ environs.Environ
61 » » log.Printf("provisioning: waiting for valid environment") 49 » tomb tomb.Tomb
62 » » config, ok := <-configWatcher.Changes() 50
63 » » if !ok { 51 » environWatcher *state.ConfigWatcher
64 » » » return fmt.Errorf("environment watcher has shutdown: %v" , configWatcher.Stop()) 52 » machinesWatcher *state.MachinesWatcher
65 » » } 53 }
66 » » var err error 54
67 » » a.environ, err = environs.NewEnviron(config.Map()) 55 // NewProvisioner returns a Provisioner.
68 » » if err == nil { 56 func NewProvisioner(st *state.State) *Provisioner {
69 » » » break 57 » p := &Provisioner{
70 » » } 58 » » st: st,
71 » » log.Printf("provisioning: unable to create environment from supp lied configuration: %v", err)
72 } 59 }
73 » log.Printf("provisioning: valid environment configured") 60 » go p.loop()
61 » return p
62 }
74 63
75 » // step 2. listen for changes to the environment or the machine topology and action both. 64 func (p *Provisioner) loop() {
76 » machinesWatcher := a.State.WatchMachines() 65 » defer p.tomb.Done()
66
67 » p.environWatcher = p.st.WatchEnvironConfig()
68 » // TODO(dfc) we need a method like state.IsConnected() here to exit cleanly if
69 » // there is a connection problem.
77 for { 70 for {
78 select { 71 select {
79 » » case changes, ok := <-configWatcher.Changes(): 72 » » case <-p.tomb.Dying():
73 » » » return
74 » » case config, ok := <-p.environWatcher.Changes():
80 if !ok { 75 if !ok {
81 » » » » return fmt.Errorf("environment watcher has shutd own: %v", configWatcher.Stop()) 76 » » » » err := p.environWatcher.Stop()
77 » » » » if err != nil {
78 » » » » » p.tomb.Kill(err)
79 » » » » }
80 » » » » return
82 } 81 }
83 » » » config, err := environs.NewConfig(changes.Map()) 82 » » » var err error
83 » » » p.environ, err = environs.NewEnviron(config.Map())
84 if err != nil { 84 if err != nil {
85 » » » » log.Printf("provisioning: new configuration rece ived, but was not valid: %v", err) 85 » » » » log.Printf("provisioner loaded invalid environme nt configuration: %v", err)
86 continue 86 continue
87 } 87 }
88 » » » a.environ.SetConfig(config) 88 » » » log.Printf("provisioner loaded new environment configura tion")
89 » » » log.Printf("provisioning: new configuartion applied") 89 » » » p.innerLoop()
90 » » case changes, ok := <-machinesWatcher.Changes():
91 » » » if !ok {
92 » » » » return fmt.Errorf("machines watcher has shutdown : %v", configWatcher.Stop())
93 » » » }
94 » » » for _, added := range changes.Added {
95 » » » » if err := a.addMachine(added); err != nil {
96 » » » » » // TODO(dfc) implement retry logic
97 » » » » » return err
98 » » » » }
99 » » » » log.Printf("provisioning: machine %d added", add ed.Id())
100 » » » }
101 » » » for _, deleted := range changes.Deleted {
102 » » » » if err := a.deleteMachine(deleted); err != nil {
103 » » » » » // TODO(dfc) implement retry logic
104 » » » » » return err
105 » » » » }
106 » » » » log.Printf("provisioning: machine %d deleted", d eleted.Id())
107 » » » }
108 } 90 }
109 } 91 }
110 if err = configWatcher.Stop(); err == nil {
111 err = machinesWatcher.Stop()
112 }
113 return err
114 } 92 }
115 93
116 func (a *ProvisioningAgent) addMachine(m *state.Machine) error { 94 func (p *Provisioner) innerLoop() {
117 » id, err := m.InstanceId() 95 » p.machinesWatcher = p.st.WatchMachines()
118 » if err != nil { 96 » // TODO(dfc) we need a method like state.IsConnected() here to exit clea nly if
119 » » return err 97 » // there is a connection problem.
98 » for {
99 » » select {
100 » » case <-p.tomb.Dying():
101 » » » return
102 » » case change, ok := <-p.environWatcher.Changes():
103 » » » if !ok {
104 » » » » err := p.environWatcher.Stop()
105 » » » » if err != nil {
106 » » » » » p.tomb.Kill(err)
107 » » » » }
108 » » » » return
109 » » » }
110 » » » config, err := environs.NewConfig(change.Map())
111 » » » if err != nil {
112 » » » » log.Printf("provisioner loaded invalid environme nt configuration: %v", err)
113 » » » » continue
114 » » » }
115 » » » p.environ.SetConfig(config)
116 » » » log.Printf("provisioner loaded new environment configura tion")
117 » » case machines, ok := <-p.machinesWatcher.Changes():
118 » » » if !ok {
119 » » » » err := p.machinesWatcher.Stop()
120 » » » » if err != nil {
121 » » » » » p.tomb.Kill(err)
122 » » » » }
123 » » » » return
124 » » » }
125 » » » p.processMachines(machines)
126 » » }
120 } 127 }
121 if id != "" {
122 return fmt.Errorf("machine-%010d already reports a provider id % q, skipping", m.Id(), id)
123 }
124
125 // TODO(dfc) the state.Info passed to environ.StartInstance remains contentious
126 // however as the PA only knows one state.Info, and that info is used by MAs and·
127 // UAs to locate the ZK for this environment, it is logical to use the same·
128 // state.Info as the PA.·
129 inst, err := a.environ.StartInstance(m.Id(), &a.Conf.StateInfo)
130 if err != nil {
131 return err
132 }
133
134 // store the reference from the provider in ZK
135 if err := m.SetInstanceId(inst.Id()); err != nil {
136 return fmt.Errorf("unable to store provider id: %v", err)
137 }
138
139 // data is stashed in ZK, populate the caches
140 a.machineIdToProviderId[m.Id()] = inst.Id()
141 a.providerIdToInstance[inst.Id()] = inst
142 return nil
143 } 128 }
144 129
145 func (a *ProvisioningAgent) deleteMachine(m *state.Machine) error { 130 // Wait waits for the Provisioner to exit.
146 » insts, err := a.InstancesForMachines(m) 131 func (p *Provisioner) Wait() error {
147 » if err != nil { 132 » return p.tomb.Wait()
148 » » return fmt.Errorf("machine-%010d has no refrence to a provider i d, skipping", m.Id())
149 » }
150 » return a.environ.StopInstances(insts)
151 } 133 }
152 134
153 // InstanceForMachine returns the environs.Instance that represents this machine's running 135 // Stop stops the Provisioner and returns any error encountered while
154 // instance. 136 // provisioning.
155 func (a *ProvisioningAgent) InstanceForMachine(m *state.Machine) (environs.Insta nce, error) { 137 func (p *Provisioner) Stop() error {
156 » id, ok := a.machineIdToProviderId[m.Id()] 138 » p.tomb.Kill(nil)
157 » if !ok { 139 » return p.tomb.Wait()
158 » » // not cached locally, ask the provider.
159 » » var err error
160 » » id, err = m.InstanceId()
161 » » if err != nil {
162 » » » return nil, err
163 » » }
164 » » if id == "" {
165 » » » // nobody knows about this machine, give up.
166 » » » return nil, fmt.Errorf("instance not found")
167 » » }
168 » » a.machineIdToProviderId[m.Id()] = id
169 » }
170 » inst, ok := a.providerIdToInstance[id]
171 » if !ok {
172 » » // not cached locally, ask the provider
173 » » var err error
174 » » inst, err = a.findInstance(id)
175 » » if err != nil {
176 » » » // the provider doesn't know about this instance, give up.
177 » » » return nil, err
178 » » }
179 » » return nil, nil
180 » }
181 » return inst, nil
182 } 140 }
183 141
184 // InstancesForMachines returns a list of environs.Instance that represent the l ist of machines running 142 func (p *Provisioner) processMachines(changes *state.MachinesChange) {}
185 // in the provider.
186 func (a *ProvisioningAgent) InstancesForMachines(machines ...*state.Machine) ([] environs.Instance, error) {
187 » var insts []environs.Instance
188 » for _, m := range machines {
189 » » inst, err := a.InstanceForMachine(m)
190 » » if err != nil {
191 » » » return nil, err
192 » » }
193 » » insts = append(insts, inst)
194 » }
195 » return insts, nil
196 }
197
198 func (a *ProvisioningAgent) findInstance(id string) (environs.Instance, error) {
199 » insts, err := a.environ.Instances([]string{id})
200 » if err != nil {
201 » » return nil, err
202 » }
203 » if len(insts) < 1 {
204 » » return nil, fmt.Errorf("instance not found")
205 » }
206 » return insts[0], nil
207 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b