Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(104)

Side by Side Diff: environs/ec2/ec2.go

Issue 6209092: environs: add NewConfig (Closed)
Patch Set: environs: add NewConfig Created 5 years, 7 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « environs/ec2/config_test.go ('k') | environs/ec2/export_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package ec2 1 package ec2
2 2
3 import ( 3 import (
4 "fmt" 4 "fmt"
5 "launchpad.net/goamz/ec2" 5 "launchpad.net/goamz/ec2"
6 "launchpad.net/goamz/s3" 6 "launchpad.net/goamz/s3"
7 "launchpad.net/juju/go/environs" 7 "launchpad.net/juju/go/environs"
8 "launchpad.net/juju/go/log" 8 "launchpad.net/juju/go/log"
9 "launchpad.net/juju/go/state" 9 "launchpad.net/juju/go/state"
10 "sync"
10 "time" 11 "time"
11 ) 12 )
12 13
13 const zkPort = 2181 14 const zkPort = 2181
14 15
15 var zkPortSuffix = fmt.Sprintf(":%d", zkPort) 16 var zkPortSuffix = fmt.Sprintf(":%d", zkPort)
16 17
17 // A request may fail to due "eventual consistency" semantics, which 18 // A request may fail to due "eventual consistency" semantics, which
18 // should resolve fairly quickly. A request may also fail due to a slow 19 // should resolve fairly quickly. A request may also fail due to a slow
19 // state transition (for instance an instance taking a while to release 20 // state transition (for instance an instance taking a while to release
(...skipping 11 matching lines...) Expand all
31 32
32 func init() { 33 func init() {
33 environs.RegisterProvider("ec2", environProvider{}) 34 environs.RegisterProvider("ec2", environProvider{})
34 } 35 }
35 36
36 type environProvider struct{} 37 type environProvider struct{}
37 38
38 var _ environs.EnvironProvider = environProvider{} 39 var _ environs.EnvironProvider = environProvider{}
39 40
40 type environ struct { 41 type environ struct {
41 » name string 42 » name string
42 » config *providerConfig 43
43 » ec2 *ec2.EC2 44 » // configMutex protects visibility of configUnlocked
44 » s3 *s3.S3 45 » configMutex sync.Mutex
46 » configUnlocked *providerConfig
47
48 » // storage has it's own mutex
45 storage storage 49 storage storage
46 } 50 }
47 51
48 var _ environs.Environ = (*environ)(nil) 52 var _ environs.Environ = (*environ)(nil)
49 53
50 type instance struct { 54 type instance struct {
51 e *environ 55 e *environ
52 *ec2.Instance 56 *ec2.Instance
53 } 57 }
54 58
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
88 } 92 }
89 } 93 }
90 return "", fmt.Errorf("timed out trying to get DNS address for %v", inst .Id()) 94 return "", fmt.Errorf("timed out trying to get DNS address for %v", inst .Id())
91 } 95 }
92 96
93 func (cfg *providerConfig) Open() (environs.Environ, error) { 97 func (cfg *providerConfig) Open() (environs.Environ, error) {
94 log.Printf("environs/ec2: opening environment %q", cfg.name) 98 log.Printf("environs/ec2: opening environment %q", cfg.name)
95 if Regions[cfg.region].EC2Endpoint == "" { 99 if Regions[cfg.region].EC2Endpoint == "" {
96 return nil, fmt.Errorf("no ec2 endpoint found for region %q, ope ning %q", cfg.region, cfg.name) 100 return nil, fmt.Errorf("no ec2 endpoint found for region %q, ope ning %q", cfg.region, cfg.name)
97 } 101 }
98 » e := &environ{ 102 » e := new(environ)
99 » » config: cfg, 103 » e.setConfig(cfg)
100 » » ec2: ec2.New(cfg.auth, Regions[cfg.region]),
101 » » s3: s3.New(cfg.auth, Regions[cfg.region]),
102 » }
103 » e.storage.bucket = e.s3.Bucket(cfg.bucket)
104 return e, nil 104 return e, nil
105 } 105 }
106 106
107 func (e *environ) Bootstrap(uploadTools bool) error { 107 func (e *environ) Bootstrap(uploadTools bool) error {
108 log.Printf("environs/ec2: bootstrapping environment %q", e.name) 108 log.Printf("environs/ec2: bootstrapping environment %q", e.name)
109 _, err := e.loadState() 109 _, err := e.loadState()
110 if err == nil { 110 if err == nil {
111 return fmt.Errorf("environment is already bootstrapped") 111 return fmt.Errorf("environment is already bootstrapped")
112 } 112 }
113 if _, notFound := err.(*environs.NotFoundError); !notFound { 113 if _, notFound := err.(*environs.NotFoundError); !notFound {
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
168 } 168 }
169 if len(addrs) == 0 { 169 if len(addrs) == 0 {
170 return nil, fmt.Errorf("timed out waiting for zk address from %v ", st.ZookeeperInstances) 170 return nil, fmt.Errorf("timed out waiting for zk address from %v ", st.ZookeeperInstances)
171 } 171 }
172 return &state.Info{ 172 return &state.Info{
173 Addrs: addrs, 173 Addrs: addrs,
174 UseSSH: true, 174 UseSSH: true,
175 }, nil 175 }, nil
176 } 176 }
177 177
178 func (e *environ) SetConfig(cfg environs.EnvironConfig) error {
179 c, ok := cfg.(*providerConfig)
180 if !ok {
181 panic(fmt.Sprintf("configuration was %T, expected %T", cfg, new( providerConfig)))
182 }
183 e.setConfig(c)
184 return nil
185 }
186
187 func (e *environ) setConfig(cfg *providerConfig) {
188 e.configMutex.Lock()
189 e.configUnlocked = cfg
190 e.configMutex.Unlock()
191 e.storage.setBucket(e.s3().Bucket(cfg.bucket))
192 }
193
194 // config returns the current *providerConfig.
195 func (e *environ) config() *providerConfig {
196 e.configMutex.Lock()
197 defer e.configMutex.Unlock()
198 return e.configUnlocked
199 }
200
201 // ec2 returns the current *ec2.EC2.
202 func (e *environ) ec2() *ec2.EC2 {
203 cfg := e.config()
204 return ec2.New(cfg.auth, Regions[cfg.region])
205 }
206
207 // s3 returns the current *s3.S3.
208 func (e *environ) s3() *s3.S3 {
209 cfg := e.config()
210 return s3.New(cfg.auth, Regions[cfg.region])
211 }
212
178 func (e *environ) StartInstance(machineId int, info *state.Info) (environs.Insta nce, error) { 213 func (e *environ) StartInstance(machineId int, info *state.Info) (environs.Insta nce, error) {
179 log.Printf("environs/ec2: starting machine %d in %q", machineId, e.name) 214 log.Printf("environs/ec2: starting machine %d in %q", machineId, e.name)
180 return e.startInstance(machineId, info, false) 215 return e.startInstance(machineId, info, false)
181 } 216 }
182 217
183 func (e *environ) userData(machineId int, info *state.Info, master bool) ([]byte , error) { 218 func (e *environ) userData(machineId int, info *state.Info, master bool) ([]byte , error) {
219 config := e.config()
184 cfg := &machineConfig{ 220 cfg := &machineConfig{
185 provisioner: master, 221 provisioner: master,
186 zookeeper: master, 222 zookeeper: master,
187 stateInfo: info, 223 stateInfo: info,
188 instanceIdAccessor: "$(curl http://169.254.169.254/1.0/meta-data /instance-id)", 224 instanceIdAccessor: "$(curl http://169.254.169.254/1.0/meta-data /instance-id)",
189 providerType: "ec2", 225 providerType: "ec2",
190 » » origin: e.config.origin, 226 » » origin: config.origin,
191 machineId: fmt.Sprint(machineId), 227 machineId: fmt.Sprint(machineId),
192 } 228 }
193 229
194 » if e.config.authorizedKeys == "" { 230 » if config.authorizedKeys == "" {
195 var err error 231 var err error
196 » » cfg.authorizedKeys, err = authorizedKeys(e.config.authorizedKeys Path) 232 » » cfg.authorizedKeys, err = authorizedKeys(config.authorizedKeysPa th)
197 if err != nil { 233 if err != nil {
198 return nil, fmt.Errorf("cannot get ssh authorized keys: %v", err) 234 return nil, fmt.Errorf("cannot get ssh authorized keys: %v", err)
199 } 235 }
200 } 236 }
201 cloudcfg, err := newCloudInit(cfg) 237 cloudcfg, err := newCloudInit(cfg)
202 if err != nil { 238 if err != nil {
203 return nil, err 239 return nil, err
204 } 240 }
205 return cloudcfg.Render() 241 return cloudcfg.Render()
206 } 242 }
(...skipping 10 matching lines...) Expand all
217 if err != nil { 253 if err != nil {
218 return nil, err 254 return nil, err
219 } 255 }
220 groups, err := e.setUpGroups(machineId) 256 groups, err := e.setUpGroups(machineId)
221 if err != nil { 257 if err != nil {
222 return nil, fmt.Errorf("cannot set up groups: %v", err) 258 return nil, fmt.Errorf("cannot set up groups: %v", err)
223 } 259 }
224 var instances *ec2.RunInstancesResp 260 var instances *ec2.RunInstancesResp
225 261
226 for a := shortAttempt.start(); a.next(); { 262 for a := shortAttempt.start(); a.next(); {
227 » » instances, err = e.ec2.RunInstances(&ec2.RunInstances{ 263 » » instances, err = e.ec2().RunInstances(&ec2.RunInstances{
228 ImageId: image.ImageId, 264 ImageId: image.ImageId,
229 MinCount: 1, 265 MinCount: 1,
230 MaxCount: 1, 266 MaxCount: 1,
231 UserData: userData, 267 UserData: userData,
232 InstanceType: "m1.small", 268 InstanceType: "m1.small",
233 SecurityGroups: groups, 269 SecurityGroups: groups,
234 }) 270 })
235 if err == nil || ec2ErrCode(err) != "InvalidGroup.NotFound" { 271 if err == nil || ec2ErrCode(err) != "InvalidGroup.NotFound" {
236 break 272 break
237 } 273 }
(...skipping 28 matching lines...) Expand all
266 need = append(need, ids[i]) 302 need = append(need, ids[i])
267 } 303 }
268 } 304 }
269 if len(need) == 0 { 305 if len(need) == 0 {
270 return nil 306 return nil
271 } 307 }
272 filter := ec2.NewFilter() 308 filter := ec2.NewFilter()
273 filter.Add("instance-state-name", "pending", "running") 309 filter.Add("instance-state-name", "pending", "running")
274 filter.Add("group-name", e.groupName()) 310 filter.Add("group-name", e.groupName())
275 filter.Add("instance-id", need...) 311 filter.Add("instance-id", need...)
276 » resp, err := e.ec2.Instances(nil, filter) 312 » resp, err := e.ec2().Instances(nil, filter)
277 if err != nil { 313 if err != nil {
278 return err 314 return err
279 } 315 }
280 n := 0 316 n := 0
281 // For each requested id, add it to the returned instances 317 // For each requested id, add it to the returned instances
282 // if we find it in the response. 318 // if we find it in the response.
283 for i, id := range ids { 319 for i, id := range ids {
284 if insts[i] != nil { 320 if insts[i] != nil {
285 continue 321 continue
286 } 322 }
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
329 } 365 }
330 return insts, nil 366 return insts, nil
331 } 367 }
332 368
333 func (e *environ) Destroy(insts []environs.Instance) error { 369 func (e *environ) Destroy(insts []environs.Instance) error {
334 log.Printf("environs/ec2: destroying environment %q", e.name) 370 log.Printf("environs/ec2: destroying environment %q", e.name)
335 // Try to find all the instances in the environ's group. 371 // Try to find all the instances in the environ's group.
336 filter := ec2.NewFilter() 372 filter := ec2.NewFilter()
337 filter.Add("instance-state-name", "pending", "running") 373 filter.Add("instance-state-name", "pending", "running")
338 filter.Add("group-name", e.groupName()) 374 filter.Add("group-name", e.groupName())
339 » resp, err := e.ec2.Instances(nil, filter) 375 » resp, err := e.ec2().Instances(nil, filter)
340 if err != nil { 376 if err != nil {
341 return fmt.Errorf("cannot get instances: %v", err) 377 return fmt.Errorf("cannot get instances: %v", err)
342 } 378 }
343 var ids []string 379 var ids []string
344 found := make(map[string]bool) 380 found := make(map[string]bool)
345 for _, r := range resp.Reservations { 381 for _, r := range resp.Reservations {
346 for _, inst := range r.Instances { 382 for _, inst := range r.Instances {
347 ids = append(ids, inst.InstanceId) 383 ids = append(ids, inst.InstanceId)
348 found[inst.InstanceId] = true 384 found[inst.InstanceId] = true
349 } 385 }
(...skipping 17 matching lines...) Expand all
367 return err 403 return err
368 } 404 }
369 return nil 405 return nil
370 } 406 }
371 407
372 func (e *environ) terminateInstances(ids []string) error { 408 func (e *environ) terminateInstances(ids []string) error {
373 if len(ids) == 0 { 409 if len(ids) == 0 {
374 return nil 410 return nil
375 } 411 }
376 var err error 412 var err error
413 ec2 := e.ec2()
377 for a := shortAttempt.start(); a.next(); { 414 for a := shortAttempt.start(); a.next(); {
378 » » _, err = e.ec2.TerminateInstances(ids) 415 » » _, err = ec2.TerminateInstances(ids)
379 if err == nil || ec2ErrCode(err) != "InvalidInstanceID.NotFound" { 416 if err == nil || ec2ErrCode(err) != "InvalidInstanceID.NotFound" {
380 return err 417 return err
381 } 418 }
382 } 419 }
383 if len(ids) == 1 { 420 if len(ids) == 1 {
384 return err 421 return err
385 } 422 }
386 var firstErr error 423 var firstErr error
387 // If we get a NotFound error, it means that no instances have been 424 // If we get a NotFound error, it means that no instances have been
388 // terminated even if some exist, so try them one by one, ignoring 425 // terminated even if some exist, so try them one by one, ignoring
389 // NotFound errors. 426 // NotFound errors.
390 for _, id := range ids { 427 for _, id := range ids {
391 » » _, err = e.ec2.TerminateInstances([]string{id}) 428 » » _, err = ec2.TerminateInstances([]string{id})
392 if ec2ErrCode(err) == "InvalidInstanceID.NotFound" { 429 if ec2ErrCode(err) == "InvalidInstanceID.NotFound" {
393 err = nil 430 err = nil
394 } 431 }
395 if err != nil && firstErr == nil { 432 if err != nil && firstErr == nil {
396 firstErr = err 433 firstErr = err
397 } 434 }
398 } 435 }
399 return firstErr 436 return firstErr
400 } 437 }
401 438
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
435 return []ec2.SecurityGroup{jujuGroup, jujuMachineGroup}, nil 472 return []ec2.SecurityGroup{jujuGroup, jujuMachineGroup}, nil
436 } 473 }
437 474
438 // zeroGroup holds the zero security group. 475 // zeroGroup holds the zero security group.
439 var zeroGroup ec2.SecurityGroup 476 var zeroGroup ec2.SecurityGroup
440 477
441 // ensureGroup returns the security group with name and perms. 478 // ensureGroup returns the security group with name and perms.
442 // If a group with name does not exist, one will be created. 479 // If a group with name does not exist, one will be created.
443 // If it exists, its permissions are set to perms. 480 // If it exists, its permissions are set to perms.
444 func (e *environ) ensureGroup(name string, perms []ec2.IPPerm) (g ec2.SecurityGr oup, err error) { 481 func (e *environ) ensureGroup(name string, perms []ec2.IPPerm) (g ec2.SecurityGr oup, err error) {
445 » resp, err := e.ec2.CreateSecurityGroup(name, "juju group") 482 » ectwo := e.ec2()
483 » resp, err := ectwo.CreateSecurityGroup(name, "juju group")
446 if err != nil && ec2ErrCode(err) != "InvalidGroup.Duplicate" { 484 if err != nil && ec2ErrCode(err) != "InvalidGroup.Duplicate" {
447 return zeroGroup, err 485 return zeroGroup, err
448 } 486 }
449 487
450 want := newPermSet(perms) 488 want := newPermSet(perms)
451 var have permSet 489 var have permSet
452 if err == nil { 490 if err == nil {
453 g = resp.SecurityGroup 491 g = resp.SecurityGroup
454 } else { 492 } else {
455 » » resp, err := e.ec2.SecurityGroups(ec2.SecurityGroupNames(name), nil) 493 » » resp, err := ectwo.SecurityGroups(ec2.SecurityGroupNames(name), nil)
456 if err != nil { 494 if err != nil {
457 return zeroGroup, err 495 return zeroGroup, err
458 } 496 }
459 // It's possible that the old group has the wrong 497 // It's possible that the old group has the wrong
460 // description here, but if it does it's probably due 498 // description here, but if it does it's probably due
461 // to something deliberately playing games with juju, 499 // to something deliberately playing games with juju,
462 // so we ignore it. 500 // so we ignore it.
463 have = newPermSet(resp.Groups[0].IPPerms) 501 have = newPermSet(resp.Groups[0].IPPerms)
464 g = resp.Groups[0].SecurityGroup 502 g = resp.Groups[0].SecurityGroup
465 } 503 }
466 revoke := make(permSet) 504 revoke := make(permSet)
467 for p := range have { 505 for p := range have {
468 if !want[p] { 506 if !want[p] {
469 revoke[p] = true 507 revoke[p] = true
470 } 508 }
471 } 509 }
472 if len(revoke) > 0 { 510 if len(revoke) > 0 {
473 » » _, err := e.ec2.RevokeSecurityGroup(g, revoke.ipPerms()) 511 » » _, err := ectwo.RevokeSecurityGroup(g, revoke.ipPerms())
474 if err != nil { 512 if err != nil {
475 return zeroGroup, fmt.Errorf("cannot revoke security gro up: %v", err) 513 return zeroGroup, fmt.Errorf("cannot revoke security gro up: %v", err)
476 } 514 }
477 } 515 }
478 516
479 add := make(permSet) 517 add := make(permSet)
480 for p := range want { 518 for p := range want {
481 if !have[p] { 519 if !have[p] {
482 add[p] = true 520 add[p] = true
483 } 521 }
484 } 522 }
485 if len(add) > 0 { 523 if len(add) > 0 {
486 » » _, err := e.ec2.AuthorizeSecurityGroup(g, add.ipPerms()) 524 » » _, err := ectwo.AuthorizeSecurityGroup(g, add.ipPerms())
487 if err != nil { 525 if err != nil {
488 return zeroGroup, fmt.Errorf("cannot authorize securityG roup: %v", err) 526 return zeroGroup, fmt.Errorf("cannot authorize securityG roup: %v", err)
489 } 527 }
490 } 528 }
491 return g, nil 529 return g, nil
492 } 530 }
493 531
494 // permKey represents a permission for a group or an ip address range 532 // permKey represents a permission for a group or an ip address range
495 // to access the given range of ports. Only one of groupId or ipAddr 533 // to access the given range of ports. Only one of groupId or ipAddr
496 // should be non-empty. 534 // should be non-empty.
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
551 589
552 // If the err is of type *ec2.Error, ec2ErrCode returns 590 // If the err is of type *ec2.Error, ec2ErrCode returns
553 // its code, otherwise it returns the empty string. 591 // its code, otherwise it returns the empty string.
554 func ec2ErrCode(err error) string { 592 func ec2ErrCode(err error) string {
555 ec2err, _ := err.(*ec2.Error) 593 ec2err, _ := err.(*ec2.Error)
556 if ec2err == nil { 594 if ec2err == nil {
557 return "" 595 return ""
558 } 596 }
559 return ec2err.Code 597 return ec2err.Code
560 } 598 }
OLDNEW
« no previous file with comments | « environs/ec2/config_test.go ('k') | environs/ec2/export_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld 204d58d