Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1562)

Side by Side Diff: environs/ec2/ec2.go

Issue 6256050: environ: add SetConfig (Closed)
Patch Set: environ: add SetConfig Created 12 years, 10 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « environs/ec2/config_test.go ('k') | environs/ec2/export_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package ec2 1 package ec2
2 2
3 import ( 3 import (
4 "fmt" 4 "fmt"
5 "launchpad.net/goamz/ec2" 5 "launchpad.net/goamz/ec2"
6 "launchpad.net/goamz/s3" 6 "launchpad.net/goamz/s3"
7 "launchpad.net/juju/go/environs" 7 "launchpad.net/juju/go/environs"
8 "launchpad.net/juju/go/log" 8 "launchpad.net/juju/go/log"
9 "launchpad.net/juju/go/state" 9 "launchpad.net/juju/go/state"
10 "launchpad.net/juju/go/version" 10 "launchpad.net/juju/go/version"
11 "sync"
11 "time" 12 "time"
12 ) 13 )
13 14
14 const zkPort = 2181 15 const zkPort = 2181
15 16
16 var zkPortSuffix = fmt.Sprintf(":%d", zkPort) 17 var zkPortSuffix = fmt.Sprintf(":%d", zkPort)
17 18
18 // A request may fail to due "eventual consistency" semantics, which 19 // A request may fail to due "eventual consistency" semantics, which
19 // should resolve fairly quickly. A request may also fail due to a slow 20 // should resolve fairly quickly. A request may also fail due to a slow
20 // state transition (for instance an instance taking a while to release 21 // state transition (for instance an instance taking a while to release
(...skipping 11 matching lines...) Expand all
32 33
33 func init() { 34 func init() {
34 environs.RegisterProvider("ec2", environProvider{}) 35 environs.RegisterProvider("ec2", environProvider{})
35 } 36 }
36 37
37 type environProvider struct{} 38 type environProvider struct{}
38 39
39 var _ environs.EnvironProvider = environProvider{} 40 var _ environs.EnvironProvider = environProvider{}
40 41
41 type environ struct { 42 type environ struct {
42 » name string 43 » name string
43 » config *providerConfig 44
44 » ec2 *ec2.EC2 45 » // configMutex protects the *Unlocked fields below.
45 » s3 *s3.S3 46 » configMutex sync.Mutex
46 » storage storage 47 » configUnlocked *providerConfig
47 » publicStorage *storage // optional. 48 » ec2Unlocked *ec2.EC2
49 » s3Unlocked *s3.S3
50 » storageUnlocked *storage
51 » publicStorageUnlocked *storage // optional.
48 } 52 }
49 53
50 var _ environs.Environ = (*environ)(nil) 54 var _ environs.Environ = (*environ)(nil)
51 55
52 type instance struct { 56 type instance struct {
53 e *environ 57 e *environ
54 *ec2.Instance 58 *ec2.Instance
55 } 59 }
56 60
57 func (inst *instance) String() string { 61 func (inst *instance) String() string {
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 } 94 }
91 } 95 }
92 return "", fmt.Errorf("timed out trying to get DNS address for %v", inst .Id()) 96 return "", fmt.Errorf("timed out trying to get DNS address for %v", inst .Id())
93 } 97 }
94 98
95 func (cfg *providerConfig) Open() (environs.Environ, error) { 99 func (cfg *providerConfig) Open() (environs.Environ, error) {
96 log.Printf("environs/ec2: opening environment %q", cfg.name) 100 log.Printf("environs/ec2: opening environment %q", cfg.name)
97 if Regions[cfg.region].EC2Endpoint == "" { 101 if Regions[cfg.region].EC2Endpoint == "" {
98 return nil, fmt.Errorf("no ec2 endpoint found for region %q, ope ning %q", cfg.region, cfg.name) 102 return nil, fmt.Errorf("no ec2 endpoint found for region %q, ope ning %q", cfg.region, cfg.name)
99 } 103 }
100 » e := &environ{ 104 » e := new(environ)
101 » » config: cfg, 105 » e.SetConfig(cfg)
102 » » ec2: ec2.New(cfg.auth, Regions[cfg.region]), 106 » return e, nil
103 » » s3: s3.New(cfg.auth, Regions[cfg.region]), 107 }
108
109 func (e *environ) SetConfig(cfg environs.EnvironConfig) {
110 » config := cfg.(*providerConfig)
111 » e.configMutex.Lock()
112 » defer e.configMutex.Unlock()
113 » e.configUnlocked = config
114 » e.ec2Unlocked = ec2.New(config.auth, Regions[config.region])
115 » e.s3Unlocked = s3.New(config.auth, Regions[config.region])
116
117 » // create new storage instances, existing instances continue
118 » // to reference their existing configuration.
119 » e.storageUnlocked = &storage{
120 » » bucket: e.s3Unlocked.Bucket(config.bucket),
104 } 121 }
105 » e.storage.bucket = e.s3.Bucket(cfg.bucket) 122 » if config.publicBucket != "" {
106 » if cfg.publicBucket != "" { 123 » » e.publicStorageUnlocked = &storage{
107 » » e.publicStorage = &storage{bucket: e.s3.Bucket(cfg.publicBucket) } 124 » » » bucket: e.s3Unlocked.Bucket(config.publicBucket),
125 » » }
126 » } else {
127 » » e.publicStorageUnlocked = nil
108 } 128 }
109 » return e, nil 129 }
130
131 func (e *environ) config() *providerConfig {
132 » e.configMutex.Lock()
133 » defer e.configMutex.Unlock()
134 » return e.configUnlocked
135 }
136
137 func (e *environ) ec2() *ec2.EC2 {
138 » e.configMutex.Lock()
139 » defer e.configMutex.Unlock()
140 » return e.ec2Unlocked
141 }
142
143 func (e *environ) s3() *s3.S3 {
144 » e.configMutex.Lock()
145 » defer e.configMutex.Unlock()
146 » return e.s3Unlocked
147 }
148
149 func (e *environ) Storage() environs.Storage {
150 » e.configMutex.Lock()
151 » defer e.configMutex.Unlock()
152 » return e.storageUnlocked
153 }
154
155 func (e *environ) PublicStorage() environs.StorageReader {
156 » e.configMutex.Lock()
157 » defer e.configMutex.Unlock()
158 » if e.publicStorageUnlocked == nil {
159 » » return environs.EmptyStorage
160 » }
161 » return e.publicStorageUnlocked
110 } 162 }
111 163
112 func (e *environ) Bootstrap(uploadTools bool) error { 164 func (e *environ) Bootstrap(uploadTools bool) error {
113 log.Printf("environs/ec2: bootstrapping environment %q", e.name) 165 log.Printf("environs/ec2: bootstrapping environment %q", e.name)
114 _, err := e.loadState() 166 _, err := e.loadState()
115 if err == nil { 167 if err == nil {
116 return fmt.Errorf("environment is already bootstrapped") 168 return fmt.Errorf("environment is already bootstrapped")
117 } 169 }
118 if _, notFound := err.(*environs.NotFoundError); !notFound { 170 if _, notFound := err.(*environs.NotFoundError); !notFound {
119 return fmt.Errorf("cannot query old bootstrap state: %v", err) 171 return fmt.Errorf("cannot query old bootstrap state: %v", err)
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
178 UseSSH: true, 230 UseSSH: true,
179 }, nil 231 }, nil
180 } 232 }
181 233
182 func (e *environ) StartInstance(machineId int, info *state.Info) (environs.Insta nce, error) { 234 func (e *environ) StartInstance(machineId int, info *state.Info) (environs.Insta nce, error) {
183 log.Printf("environs/ec2: starting machine %d in %q", machineId, e.name) 235 log.Printf("environs/ec2: starting machine %d in %q", machineId, e.name)
184 return e.startInstance(machineId, info, false) 236 return e.startInstance(machineId, info, false)
185 } 237 }
186 238
187 func (e *environ) userData(machineId int, info *state.Info, master bool, toolsUR L string) ([]byte, error) { 239 func (e *environ) userData(machineId int, info *state.Info, master bool, toolsUR L string) ([]byte, error) {
240 config := e.config()
188 cfg := &machineConfig{ 241 cfg := &machineConfig{
189 provisioner: master, 242 provisioner: master,
190 zookeeper: master, 243 zookeeper: master,
191 stateInfo: info, 244 stateInfo: info,
192 instanceIdAccessor: "$(curl http://169.254.169.254/1.0/meta-data /instance-id)", 245 instanceIdAccessor: "$(curl http://169.254.169.254/1.0/meta-data /instance-id)",
193 providerType: "ec2", 246 providerType: "ec2",
194 toolsURL: toolsURL, 247 toolsURL: toolsURL,
195 machineId: machineId, 248 machineId: machineId,
196 } 249 }
197 250
198 » if e.config.authorizedKeys == "" { 251 » if config.authorizedKeys == "" {
199 var err error 252 var err error
200 » » cfg.authorizedKeys, err = authorizedKeys(e.config.authorizedKeys Path) 253 » » cfg.authorizedKeys, err = authorizedKeys(config.authorizedKeysPa th)
201 if err != nil { 254 if err != nil {
202 return nil, fmt.Errorf("cannot get ssh authorized keys: %v", err) 255 return nil, fmt.Errorf("cannot get ssh authorized keys: %v", err)
203 } 256 }
204 } 257 }
205 cloudcfg, err := newCloudInit(cfg) 258 cloudcfg, err := newCloudInit(cfg)
206 if err != nil { 259 if err != nil {
207 return nil, err 260 return nil, err
208 } 261 }
209 return cloudcfg.Render() 262 return cloudcfg.Render()
210 } 263 }
(...skipping 15 matching lines...) Expand all
226 if err != nil { 279 if err != nil {
227 return nil, fmt.Errorf("cannot make user data: %v", err) 280 return nil, fmt.Errorf("cannot make user data: %v", err)
228 } 281 }
229 groups, err := e.setUpGroups(machineId) 282 groups, err := e.setUpGroups(machineId)
230 if err != nil { 283 if err != nil {
231 return nil, fmt.Errorf("cannot set up groups: %v", err) 284 return nil, fmt.Errorf("cannot set up groups: %v", err)
232 } 285 }
233 var instances *ec2.RunInstancesResp 286 var instances *ec2.RunInstancesResp
234 287
235 for a := shortAttempt.start(); a.next(); { 288 for a := shortAttempt.start(); a.next(); {
236 » » instances, err = e.ec2.RunInstances(&ec2.RunInstances{ 289 » » instances, err = e.ec2().RunInstances(&ec2.RunInstances{
237 ImageId: spec.imageId, 290 ImageId: spec.imageId,
238 MinCount: 1, 291 MinCount: 1,
239 MaxCount: 1, 292 MaxCount: 1,
240 UserData: userData, 293 UserData: userData,
241 InstanceType: "m1.small", 294 InstanceType: "m1.small",
242 SecurityGroups: groups, 295 SecurityGroups: groups,
243 }) 296 })
244 if err == nil || ec2ErrCode(err) != "InvalidGroup.NotFound" { 297 if err == nil || ec2ErrCode(err) != "InvalidGroup.NotFound" {
245 break 298 break
246 } 299 }
(...skipping 28 matching lines...) Expand all
275 need = append(need, ids[i]) 328 need = append(need, ids[i])
276 } 329 }
277 } 330 }
278 if len(need) == 0 { 331 if len(need) == 0 {
279 return nil 332 return nil
280 } 333 }
281 filter := ec2.NewFilter() 334 filter := ec2.NewFilter()
282 filter.Add("instance-state-name", "pending", "running") 335 filter.Add("instance-state-name", "pending", "running")
283 filter.Add("group-name", e.groupName()) 336 filter.Add("group-name", e.groupName())
284 filter.Add("instance-id", need...) 337 filter.Add("instance-id", need...)
285 » resp, err := e.ec2.Instances(nil, filter) 338 » resp, err := e.ec2().Instances(nil, filter)
286 if err != nil { 339 if err != nil {
287 return err 340 return err
288 } 341 }
289 n := 0 342 n := 0
290 // For each requested id, add it to the returned instances 343 // For each requested id, add it to the returned instances
291 // if we find it in the response. 344 // if we find it in the response.
292 for i, id := range ids { 345 for i, id := range ids {
293 if insts[i] != nil { 346 if insts[i] != nil {
294 continue 347 continue
295 } 348 }
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
338 } 391 }
339 return insts, nil 392 return insts, nil
340 } 393 }
341 394
342 func (e *environ) Destroy(insts []environs.Instance) error { 395 func (e *environ) Destroy(insts []environs.Instance) error {
343 log.Printf("environs/ec2: destroying environment %q", e.name) 396 log.Printf("environs/ec2: destroying environment %q", e.name)
344 // Try to find all the instances in the environ's group. 397 // Try to find all the instances in the environ's group.
345 filter := ec2.NewFilter() 398 filter := ec2.NewFilter()
346 filter.Add("instance-state-name", "pending", "running") 399 filter.Add("instance-state-name", "pending", "running")
347 filter.Add("group-name", e.groupName()) 400 filter.Add("group-name", e.groupName())
348 » resp, err := e.ec2.Instances(nil, filter) 401 » resp, err := e.ec2().Instances(nil, filter)
349 if err != nil { 402 if err != nil {
350 return fmt.Errorf("cannot get instances: %v", err) 403 return fmt.Errorf("cannot get instances: %v", err)
351 } 404 }
352 var ids []string 405 var ids []string
353 found := make(map[string]bool) 406 found := make(map[string]bool)
354 for _, r := range resp.Reservations { 407 for _, r := range resp.Reservations {
355 for _, inst := range r.Instances { 408 for _, inst := range r.Instances {
356 ids = append(ids, inst.InstanceId) 409 ids = append(ids, inst.InstanceId)
357 found[inst.InstanceId] = true 410 found[inst.InstanceId] = true
358 } 411 }
359 } 412 }
360 413
361 // Then add any instances we've been told about but haven't yet shown 414 // Then add any instances we've been told about but haven't yet shown
362 // up in the instance list. 415 // up in the instance list.
363 for _, inst := range insts { 416 for _, inst := range insts {
364 id := inst.(*instance).InstanceId 417 id := inst.(*instance).InstanceId
365 if !found[id] { 418 if !found[id] {
366 ids = append(ids, id) 419 ids = append(ids, id)
367 found[id] = true 420 found[id] = true
368 } 421 }
369 } 422 }
370 err = e.terminateInstances(ids) 423 err = e.terminateInstances(ids)
371 if err != nil { 424 if err != nil {
372 return err 425 return err
373 } 426 }
374 » err = e.storage.deleteAll() 427 » // to properly observe e.storageUnlocked we need to get it's value while
428 » // holding e.configMutex. e.Storage() does this for us, then we convert
429 » // back to the (*storage) to access the private deleteAll() method.
430 » st := e.Storage().(*storage)
431 » err = st.deleteAll()
375 if err != nil { 432 if err != nil {
376 return err 433 return err
377 } 434 }
378 return nil 435 return nil
379 } 436 }
380 437
381 func (e *environ) terminateInstances(ids []string) error { 438 func (e *environ) terminateInstances(ids []string) error {
382 if len(ids) == 0 { 439 if len(ids) == 0 {
383 return nil 440 return nil
384 } 441 }
385 var err error 442 var err error
443 ec2inst := e.ec2()
386 for a := shortAttempt.start(); a.next(); { 444 for a := shortAttempt.start(); a.next(); {
387 » » _, err = e.ec2.TerminateInstances(ids) 445 » » _, err = ec2inst.TerminateInstances(ids)
388 if err == nil || ec2ErrCode(err) != "InvalidInstanceID.NotFound" { 446 if err == nil || ec2ErrCode(err) != "InvalidInstanceID.NotFound" {
389 return err 447 return err
390 } 448 }
391 } 449 }
392 if len(ids) == 1 { 450 if len(ids) == 1 {
393 return err 451 return err
394 } 452 }
395 var firstErr error 453 var firstErr error
396 // If we get a NotFound error, it means that no instances have been 454 // If we get a NotFound error, it means that no instances have been
397 // terminated even if some exist, so try them one by one, ignoring 455 // terminated even if some exist, so try them one by one, ignoring
398 // NotFound errors. 456 // NotFound errors.
399 for _, id := range ids { 457 for _, id := range ids {
400 » » _, err = e.ec2.TerminateInstances([]string{id}) 458 » » _, err = ec2inst.TerminateInstances([]string{id})
401 if ec2ErrCode(err) == "InvalidInstanceID.NotFound" { 459 if ec2ErrCode(err) == "InvalidInstanceID.NotFound" {
402 err = nil 460 err = nil
403 } 461 }
404 if err != nil && firstErr == nil { 462 if err != nil && firstErr == nil {
405 firstErr = err 463 firstErr = err
406 } 464 }
407 } 465 }
408 return firstErr 466 return firstErr
409 } 467 }
410 468
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
444 return []ec2.SecurityGroup{jujuGroup, jujuMachineGroup}, nil 502 return []ec2.SecurityGroup{jujuGroup, jujuMachineGroup}, nil
445 } 503 }
446 504
447 // zeroGroup holds the zero security group. 505 // zeroGroup holds the zero security group.
448 var zeroGroup ec2.SecurityGroup 506 var zeroGroup ec2.SecurityGroup
449 507
450 // ensureGroup returns the security group with name and perms. 508 // ensureGroup returns the security group with name and perms.
451 // If a group with name does not exist, one will be created. 509 // If a group with name does not exist, one will be created.
452 // If it exists, its permissions are set to perms. 510 // If it exists, its permissions are set to perms.
453 func (e *environ) ensureGroup(name string, perms []ec2.IPPerm) (g ec2.SecurityGr oup, err error) { 511 func (e *environ) ensureGroup(name string, perms []ec2.IPPerm) (g ec2.SecurityGr oup, err error) {
454 » resp, err := e.ec2.CreateSecurityGroup(name, "juju group") 512 » ec2inst := e.ec2()
513 » resp, err := ec2inst.CreateSecurityGroup(name, "juju group")
455 if err != nil && ec2ErrCode(err) != "InvalidGroup.Duplicate" { 514 if err != nil && ec2ErrCode(err) != "InvalidGroup.Duplicate" {
456 return zeroGroup, err 515 return zeroGroup, err
457 } 516 }
458 517
459 want := newPermSet(perms) 518 want := newPermSet(perms)
460 var have permSet 519 var have permSet
461 if err == nil { 520 if err == nil {
462 g = resp.SecurityGroup 521 g = resp.SecurityGroup
463 } else { 522 } else {
464 » » resp, err := e.ec2.SecurityGroups(ec2.SecurityGroupNames(name), nil) 523 » » resp, err := ec2inst.SecurityGroups(ec2.SecurityGroupNames(name) , nil)
465 if err != nil { 524 if err != nil {
466 return zeroGroup, err 525 return zeroGroup, err
467 } 526 }
468 // It's possible that the old group has the wrong 527 // It's possible that the old group has the wrong
469 // description here, but if it does it's probably due 528 // description here, but if it does it's probably due
470 // to something deliberately playing games with juju, 529 // to something deliberately playing games with juju,
471 // so we ignore it. 530 // so we ignore it.
472 have = newPermSet(resp.Groups[0].IPPerms) 531 have = newPermSet(resp.Groups[0].IPPerms)
473 g = resp.Groups[0].SecurityGroup 532 g = resp.Groups[0].SecurityGroup
474 } 533 }
475 revoke := make(permSet) 534 revoke := make(permSet)
476 for p := range have { 535 for p := range have {
477 if !want[p] { 536 if !want[p] {
478 revoke[p] = true 537 revoke[p] = true
479 } 538 }
480 } 539 }
481 if len(revoke) > 0 { 540 if len(revoke) > 0 {
482 » » _, err := e.ec2.RevokeSecurityGroup(g, revoke.ipPerms()) 541 » » _, err := ec2inst.RevokeSecurityGroup(g, revoke.ipPerms())
483 if err != nil { 542 if err != nil {
484 return zeroGroup, fmt.Errorf("cannot revoke security gro up: %v", err) 543 return zeroGroup, fmt.Errorf("cannot revoke security gro up: %v", err)
485 } 544 }
486 } 545 }
487 546
488 add := make(permSet) 547 add := make(permSet)
489 for p := range want { 548 for p := range want {
490 if !have[p] { 549 if !have[p] {
491 add[p] = true 550 add[p] = true
492 } 551 }
493 } 552 }
494 if len(add) > 0 { 553 if len(add) > 0 {
495 » » _, err := e.ec2.AuthorizeSecurityGroup(g, add.ipPerms()) 554 » » _, err := ec2inst.AuthorizeSecurityGroup(g, add.ipPerms())
496 if err != nil { 555 if err != nil {
497 return zeroGroup, fmt.Errorf("cannot authorize securityG roup: %v", err) 556 return zeroGroup, fmt.Errorf("cannot authorize securityG roup: %v", err)
498 } 557 }
499 } 558 }
500 return g, nil 559 return g, nil
501 } 560 }
502 561
503 // permKey represents a permission for a group or an ip address range 562 // permKey represents a permission for a group or an ip address range
504 // to access the given range of ports. Only one of groupId or ipAddr 563 // to access the given range of ports. Only one of groupId or ipAddr
505 // should be non-empty. 564 // should be non-empty.
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 619
561 // If the err is of type *ec2.Error, ec2ErrCode returns 620 // If the err is of type *ec2.Error, ec2ErrCode returns
562 // its code, otherwise it returns the empty string. 621 // its code, otherwise it returns the empty string.
563 func ec2ErrCode(err error) string { 622 func ec2ErrCode(err error) string {
564 ec2err, _ := err.(*ec2.Error) 623 ec2err, _ := err.(*ec2.Error)
565 if ec2err == nil { 624 if ec2err == nil {
566 return "" 625 return ""
567 } 626 }
568 return ec2err.Code 627 return ec2err.Code
569 } 628 }
OLDNEW
« no previous file with comments | « environs/ec2/config_test.go ('k') | environs/ec2/export_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b