OLD | NEW |
1 package ec2 | 1 package ec2 |
2 | 2 |
3 import ( | 3 import ( |
4 "fmt" | 4 "fmt" |
5 "launchpad.net/goamz/ec2" | 5 "launchpad.net/goamz/ec2" |
6 "launchpad.net/goamz/s3" | 6 "launchpad.net/goamz/s3" |
7 "launchpad.net/juju/go/environs" | 7 "launchpad.net/juju/go/environs" |
8 "launchpad.net/juju/go/log" | 8 "launchpad.net/juju/go/log" |
9 "launchpad.net/juju/go/state" | 9 "launchpad.net/juju/go/state" |
10 "launchpad.net/juju/go/version" | 10 "launchpad.net/juju/go/version" |
| 11 "sync" |
11 "time" | 12 "time" |
12 ) | 13 ) |
13 | 14 |
14 const zkPort = 2181 | 15 const zkPort = 2181 |
15 | 16 |
16 var zkPortSuffix = fmt.Sprintf(":%d", zkPort) | 17 var zkPortSuffix = fmt.Sprintf(":%d", zkPort) |
17 | 18 |
18 // A request may fail to due "eventual consistency" semantics, which | 19 // A request may fail to due "eventual consistency" semantics, which |
19 // should resolve fairly quickly. A request may also fail due to a slow | 20 // should resolve fairly quickly. A request may also fail due to a slow |
20 // state transition (for instance an instance taking a while to release | 21 // state transition (for instance an instance taking a while to release |
(...skipping 11 matching lines...) Expand all Loading... |
32 | 33 |
33 func init() { | 34 func init() { |
34 environs.RegisterProvider("ec2", environProvider{}) | 35 environs.RegisterProvider("ec2", environProvider{}) |
35 } | 36 } |
36 | 37 |
37 type environProvider struct{} | 38 type environProvider struct{} |
38 | 39 |
39 var _ environs.EnvironProvider = environProvider{} | 40 var _ environs.EnvironProvider = environProvider{} |
40 | 41 |
41 type environ struct { | 42 type environ struct { |
42 » name string | 43 » name string |
43 » config *providerConfig | 44 |
44 » ec2 *ec2.EC2 | 45 » // configMutex protects the *Unlocked fields below. |
45 » s3 *s3.S3 | 46 » configMutex sync.Mutex |
46 » storage storage | 47 » configUnlocked *providerConfig |
47 » publicStorage *storage // optional. | 48 » ec2Unlocked *ec2.EC2 |
| 49 » s3Unlocked *s3.S3 |
| 50 » storageUnlocked *storage |
| 51 » publicStorageUnlocked *storage // optional. |
48 } | 52 } |
49 | 53 |
50 var _ environs.Environ = (*environ)(nil) | 54 var _ environs.Environ = (*environ)(nil) |
51 | 55 |
52 type instance struct { | 56 type instance struct { |
53 e *environ | 57 e *environ |
54 *ec2.Instance | 58 *ec2.Instance |
55 } | 59 } |
56 | 60 |
57 func (inst *instance) String() string { | 61 func (inst *instance) String() string { |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
90 } | 94 } |
91 } | 95 } |
92 return "", fmt.Errorf("timed out trying to get DNS address for %v", inst
.Id()) | 96 return "", fmt.Errorf("timed out trying to get DNS address for %v", inst
.Id()) |
93 } | 97 } |
94 | 98 |
95 func (cfg *providerConfig) Open() (environs.Environ, error) { | 99 func (cfg *providerConfig) Open() (environs.Environ, error) { |
96 log.Printf("environs/ec2: opening environment %q", cfg.name) | 100 log.Printf("environs/ec2: opening environment %q", cfg.name) |
97 if Regions[cfg.region].EC2Endpoint == "" { | 101 if Regions[cfg.region].EC2Endpoint == "" { |
98 return nil, fmt.Errorf("no ec2 endpoint found for region %q, ope
ning %q", cfg.region, cfg.name) | 102 return nil, fmt.Errorf("no ec2 endpoint found for region %q, ope
ning %q", cfg.region, cfg.name) |
99 } | 103 } |
100 » e := &environ{ | 104 » e := new(environ) |
101 » » config: cfg, | 105 » e.SetConfig(cfg) |
102 » » ec2: ec2.New(cfg.auth, Regions[cfg.region]), | 106 » return e, nil |
103 » » s3: s3.New(cfg.auth, Regions[cfg.region]), | 107 } |
| 108 |
| 109 func (e *environ) SetConfig(cfg environs.EnvironConfig) { |
| 110 » config := cfg.(*providerConfig) |
| 111 » e.configMutex.Lock() |
| 112 » defer e.configMutex.Unlock() |
| 113 » e.configUnlocked = config |
| 114 » e.ec2Unlocked = ec2.New(config.auth, Regions[config.region]) |
| 115 » e.s3Unlocked = s3.New(config.auth, Regions[config.region]) |
| 116 |
| 117 » // create new storage instances, existing instances continue |
| 118 » // to reference their existing configuration. |
| 119 » e.storageUnlocked = &storage{ |
| 120 » » bucket: e.s3Unlocked.Bucket(config.bucket), |
104 } | 121 } |
105 » e.storage.bucket = e.s3.Bucket(cfg.bucket) | 122 » if config.publicBucket != "" { |
106 » if cfg.publicBucket != "" { | 123 » » e.publicStorageUnlocked = &storage{ |
107 » » e.publicStorage = &storage{bucket: e.s3.Bucket(cfg.publicBucket)
} | 124 » » » bucket: e.s3Unlocked.Bucket(config.publicBucket), |
| 125 » » } |
| 126 » } else { |
| 127 » » e.publicStorageUnlocked = nil |
108 } | 128 } |
109 » return e, nil | 129 } |
| 130 |
| 131 func (e *environ) config() *providerConfig { |
| 132 » e.configMutex.Lock() |
| 133 » defer e.configMutex.Unlock() |
| 134 » return e.configUnlocked |
| 135 } |
| 136 |
| 137 func (e *environ) ec2() *ec2.EC2 { |
| 138 » e.configMutex.Lock() |
| 139 » defer e.configMutex.Unlock() |
| 140 » return e.ec2Unlocked |
| 141 } |
| 142 |
| 143 func (e *environ) s3() *s3.S3 { |
| 144 » e.configMutex.Lock() |
| 145 » defer e.configMutex.Unlock() |
| 146 » return e.s3Unlocked |
| 147 } |
| 148 |
| 149 func (e *environ) Storage() environs.Storage { |
| 150 » e.configMutex.Lock() |
| 151 » defer e.configMutex.Unlock() |
| 152 » return e.storageUnlocked |
| 153 } |
| 154 |
| 155 func (e *environ) PublicStorage() environs.StorageReader { |
| 156 » e.configMutex.Lock() |
| 157 » defer e.configMutex.Unlock() |
| 158 » if e.publicStorageUnlocked == nil { |
| 159 » » return environs.EmptyStorage |
| 160 » } |
| 161 » return e.publicStorageUnlocked |
110 } | 162 } |
111 | 163 |
112 func (e *environ) Bootstrap(uploadTools bool) error { | 164 func (e *environ) Bootstrap(uploadTools bool) error { |
113 log.Printf("environs/ec2: bootstrapping environment %q", e.name) | 165 log.Printf("environs/ec2: bootstrapping environment %q", e.name) |
114 _, err := e.loadState() | 166 _, err := e.loadState() |
115 if err == nil { | 167 if err == nil { |
116 return fmt.Errorf("environment is already bootstrapped") | 168 return fmt.Errorf("environment is already bootstrapped") |
117 } | 169 } |
118 if _, notFound := err.(*environs.NotFoundError); !notFound { | 170 if _, notFound := err.(*environs.NotFoundError); !notFound { |
119 return fmt.Errorf("cannot query old bootstrap state: %v", err) | 171 return fmt.Errorf("cannot query old bootstrap state: %v", err) |
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
178 UseSSH: true, | 230 UseSSH: true, |
179 }, nil | 231 }, nil |
180 } | 232 } |
181 | 233 |
182 func (e *environ) StartInstance(machineId int, info *state.Info) (environs.Insta
nce, error) { | 234 func (e *environ) StartInstance(machineId int, info *state.Info) (environs.Insta
nce, error) { |
183 log.Printf("environs/ec2: starting machine %d in %q", machineId, e.name) | 235 log.Printf("environs/ec2: starting machine %d in %q", machineId, e.name) |
184 return e.startInstance(machineId, info, false) | 236 return e.startInstance(machineId, info, false) |
185 } | 237 } |
186 | 238 |
187 func (e *environ) userData(machineId int, info *state.Info, master bool, toolsUR
L string) ([]byte, error) { | 239 func (e *environ) userData(machineId int, info *state.Info, master bool, toolsUR
L string) ([]byte, error) { |
| 240 config := e.config() |
188 cfg := &machineConfig{ | 241 cfg := &machineConfig{ |
189 provisioner: master, | 242 provisioner: master, |
190 zookeeper: master, | 243 zookeeper: master, |
191 stateInfo: info, | 244 stateInfo: info, |
192 instanceIdAccessor: "$(curl http://169.254.169.254/1.0/meta-data
/instance-id)", | 245 instanceIdAccessor: "$(curl http://169.254.169.254/1.0/meta-data
/instance-id)", |
193 providerType: "ec2", | 246 providerType: "ec2", |
194 toolsURL: toolsURL, | 247 toolsURL: toolsURL, |
195 machineId: machineId, | 248 machineId: machineId, |
196 } | 249 } |
197 | 250 |
198 » if e.config.authorizedKeys == "" { | 251 » if config.authorizedKeys == "" { |
199 var err error | 252 var err error |
200 » » cfg.authorizedKeys, err = authorizedKeys(e.config.authorizedKeys
Path) | 253 » » cfg.authorizedKeys, err = authorizedKeys(config.authorizedKeysPa
th) |
201 if err != nil { | 254 if err != nil { |
202 return nil, fmt.Errorf("cannot get ssh authorized keys:
%v", err) | 255 return nil, fmt.Errorf("cannot get ssh authorized keys:
%v", err) |
203 } | 256 } |
204 } | 257 } |
205 cloudcfg, err := newCloudInit(cfg) | 258 cloudcfg, err := newCloudInit(cfg) |
206 if err != nil { | 259 if err != nil { |
207 return nil, err | 260 return nil, err |
208 } | 261 } |
209 return cloudcfg.Render() | 262 return cloudcfg.Render() |
210 } | 263 } |
(...skipping 15 matching lines...) Expand all Loading... |
226 if err != nil { | 279 if err != nil { |
227 return nil, fmt.Errorf("cannot make user data: %v", err) | 280 return nil, fmt.Errorf("cannot make user data: %v", err) |
228 } | 281 } |
229 groups, err := e.setUpGroups(machineId) | 282 groups, err := e.setUpGroups(machineId) |
230 if err != nil { | 283 if err != nil { |
231 return nil, fmt.Errorf("cannot set up groups: %v", err) | 284 return nil, fmt.Errorf("cannot set up groups: %v", err) |
232 } | 285 } |
233 var instances *ec2.RunInstancesResp | 286 var instances *ec2.RunInstancesResp |
234 | 287 |
235 for a := shortAttempt.start(); a.next(); { | 288 for a := shortAttempt.start(); a.next(); { |
236 » » instances, err = e.ec2.RunInstances(&ec2.RunInstances{ | 289 » » instances, err = e.ec2().RunInstances(&ec2.RunInstances{ |
237 ImageId: spec.imageId, | 290 ImageId: spec.imageId, |
238 MinCount: 1, | 291 MinCount: 1, |
239 MaxCount: 1, | 292 MaxCount: 1, |
240 UserData: userData, | 293 UserData: userData, |
241 InstanceType: "m1.small", | 294 InstanceType: "m1.small", |
242 SecurityGroups: groups, | 295 SecurityGroups: groups, |
243 }) | 296 }) |
244 if err == nil || ec2ErrCode(err) != "InvalidGroup.NotFound" { | 297 if err == nil || ec2ErrCode(err) != "InvalidGroup.NotFound" { |
245 break | 298 break |
246 } | 299 } |
(...skipping 28 matching lines...) Expand all Loading... |
275 need = append(need, ids[i]) | 328 need = append(need, ids[i]) |
276 } | 329 } |
277 } | 330 } |
278 if len(need) == 0 { | 331 if len(need) == 0 { |
279 return nil | 332 return nil |
280 } | 333 } |
281 filter := ec2.NewFilter() | 334 filter := ec2.NewFilter() |
282 filter.Add("instance-state-name", "pending", "running") | 335 filter.Add("instance-state-name", "pending", "running") |
283 filter.Add("group-name", e.groupName()) | 336 filter.Add("group-name", e.groupName()) |
284 filter.Add("instance-id", need...) | 337 filter.Add("instance-id", need...) |
285 » resp, err := e.ec2.Instances(nil, filter) | 338 » resp, err := e.ec2().Instances(nil, filter) |
286 if err != nil { | 339 if err != nil { |
287 return err | 340 return err |
288 } | 341 } |
289 n := 0 | 342 n := 0 |
290 // For each requested id, add it to the returned instances | 343 // For each requested id, add it to the returned instances |
291 // if we find it in the response. | 344 // if we find it in the response. |
292 for i, id := range ids { | 345 for i, id := range ids { |
293 if insts[i] != nil { | 346 if insts[i] != nil { |
294 continue | 347 continue |
295 } | 348 } |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
338 } | 391 } |
339 return insts, nil | 392 return insts, nil |
340 } | 393 } |
341 | 394 |
342 func (e *environ) Destroy(insts []environs.Instance) error { | 395 func (e *environ) Destroy(insts []environs.Instance) error { |
343 log.Printf("environs/ec2: destroying environment %q", e.name) | 396 log.Printf("environs/ec2: destroying environment %q", e.name) |
344 // Try to find all the instances in the environ's group. | 397 // Try to find all the instances in the environ's group. |
345 filter := ec2.NewFilter() | 398 filter := ec2.NewFilter() |
346 filter.Add("instance-state-name", "pending", "running") | 399 filter.Add("instance-state-name", "pending", "running") |
347 filter.Add("group-name", e.groupName()) | 400 filter.Add("group-name", e.groupName()) |
348 » resp, err := e.ec2.Instances(nil, filter) | 401 » resp, err := e.ec2().Instances(nil, filter) |
349 if err != nil { | 402 if err != nil { |
350 return fmt.Errorf("cannot get instances: %v", err) | 403 return fmt.Errorf("cannot get instances: %v", err) |
351 } | 404 } |
352 var ids []string | 405 var ids []string |
353 found := make(map[string]bool) | 406 found := make(map[string]bool) |
354 for _, r := range resp.Reservations { | 407 for _, r := range resp.Reservations { |
355 for _, inst := range r.Instances { | 408 for _, inst := range r.Instances { |
356 ids = append(ids, inst.InstanceId) | 409 ids = append(ids, inst.InstanceId) |
357 found[inst.InstanceId] = true | 410 found[inst.InstanceId] = true |
358 } | 411 } |
359 } | 412 } |
360 | 413 |
361 // Then add any instances we've been told about but haven't yet shown | 414 // Then add any instances we've been told about but haven't yet shown |
362 // up in the instance list. | 415 // up in the instance list. |
363 for _, inst := range insts { | 416 for _, inst := range insts { |
364 id := inst.(*instance).InstanceId | 417 id := inst.(*instance).InstanceId |
365 if !found[id] { | 418 if !found[id] { |
366 ids = append(ids, id) | 419 ids = append(ids, id) |
367 found[id] = true | 420 found[id] = true |
368 } | 421 } |
369 } | 422 } |
370 err = e.terminateInstances(ids) | 423 err = e.terminateInstances(ids) |
371 if err != nil { | 424 if err != nil { |
372 return err | 425 return err |
373 } | 426 } |
374 » err = e.storage.deleteAll() | 427 » // to properly observe e.storageUnlocked we need to get it's value while |
| 428 » // holding e.configMutex. e.Storage() does this for us, then we convert |
| 429 » // back to the (*storage) to access the private deleteAll() method. |
| 430 » st := e.Storage().(*storage) |
| 431 » err = st.deleteAll() |
375 if err != nil { | 432 if err != nil { |
376 return err | 433 return err |
377 } | 434 } |
378 return nil | 435 return nil |
379 } | 436 } |
380 | 437 |
381 func (e *environ) terminateInstances(ids []string) error { | 438 func (e *environ) terminateInstances(ids []string) error { |
382 if len(ids) == 0 { | 439 if len(ids) == 0 { |
383 return nil | 440 return nil |
384 } | 441 } |
385 var err error | 442 var err error |
| 443 ec2inst := e.ec2() |
386 for a := shortAttempt.start(); a.next(); { | 444 for a := shortAttempt.start(); a.next(); { |
387 » » _, err = e.ec2.TerminateInstances(ids) | 445 » » _, err = ec2inst.TerminateInstances(ids) |
388 if err == nil || ec2ErrCode(err) != "InvalidInstanceID.NotFound"
{ | 446 if err == nil || ec2ErrCode(err) != "InvalidInstanceID.NotFound"
{ |
389 return err | 447 return err |
390 } | 448 } |
391 } | 449 } |
392 if len(ids) == 1 { | 450 if len(ids) == 1 { |
393 return err | 451 return err |
394 } | 452 } |
395 var firstErr error | 453 var firstErr error |
396 // If we get a NotFound error, it means that no instances have been | 454 // If we get a NotFound error, it means that no instances have been |
397 // terminated even if some exist, so try them one by one, ignoring | 455 // terminated even if some exist, so try them one by one, ignoring |
398 // NotFound errors. | 456 // NotFound errors. |
399 for _, id := range ids { | 457 for _, id := range ids { |
400 » » _, err = e.ec2.TerminateInstances([]string{id}) | 458 » » _, err = ec2inst.TerminateInstances([]string{id}) |
401 if ec2ErrCode(err) == "InvalidInstanceID.NotFound" { | 459 if ec2ErrCode(err) == "InvalidInstanceID.NotFound" { |
402 err = nil | 460 err = nil |
403 } | 461 } |
404 if err != nil && firstErr == nil { | 462 if err != nil && firstErr == nil { |
405 firstErr = err | 463 firstErr = err |
406 } | 464 } |
407 } | 465 } |
408 return firstErr | 466 return firstErr |
409 } | 467 } |
410 | 468 |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
444 return []ec2.SecurityGroup{jujuGroup, jujuMachineGroup}, nil | 502 return []ec2.SecurityGroup{jujuGroup, jujuMachineGroup}, nil |
445 } | 503 } |
446 | 504 |
447 // zeroGroup holds the zero security group. | 505 // zeroGroup holds the zero security group. |
448 var zeroGroup ec2.SecurityGroup | 506 var zeroGroup ec2.SecurityGroup |
449 | 507 |
450 // ensureGroup returns the security group with name and perms. | 508 // ensureGroup returns the security group with name and perms. |
451 // If a group with name does not exist, one will be created. | 509 // If a group with name does not exist, one will be created. |
452 // If it exists, its permissions are set to perms. | 510 // If it exists, its permissions are set to perms. |
453 func (e *environ) ensureGroup(name string, perms []ec2.IPPerm) (g ec2.SecurityGr
oup, err error) { | 511 func (e *environ) ensureGroup(name string, perms []ec2.IPPerm) (g ec2.SecurityGr
oup, err error) { |
454 » resp, err := e.ec2.CreateSecurityGroup(name, "juju group") | 512 » ec2inst := e.ec2() |
| 513 » resp, err := ec2inst.CreateSecurityGroup(name, "juju group") |
455 if err != nil && ec2ErrCode(err) != "InvalidGroup.Duplicate" { | 514 if err != nil && ec2ErrCode(err) != "InvalidGroup.Duplicate" { |
456 return zeroGroup, err | 515 return zeroGroup, err |
457 } | 516 } |
458 | 517 |
459 want := newPermSet(perms) | 518 want := newPermSet(perms) |
460 var have permSet | 519 var have permSet |
461 if err == nil { | 520 if err == nil { |
462 g = resp.SecurityGroup | 521 g = resp.SecurityGroup |
463 } else { | 522 } else { |
464 » » resp, err := e.ec2.SecurityGroups(ec2.SecurityGroupNames(name),
nil) | 523 » » resp, err := ec2inst.SecurityGroups(ec2.SecurityGroupNames(name)
, nil) |
465 if err != nil { | 524 if err != nil { |
466 return zeroGroup, err | 525 return zeroGroup, err |
467 } | 526 } |
468 // It's possible that the old group has the wrong | 527 // It's possible that the old group has the wrong |
469 // description here, but if it does it's probably due | 528 // description here, but if it does it's probably due |
470 // to something deliberately playing games with juju, | 529 // to something deliberately playing games with juju, |
471 // so we ignore it. | 530 // so we ignore it. |
472 have = newPermSet(resp.Groups[0].IPPerms) | 531 have = newPermSet(resp.Groups[0].IPPerms) |
473 g = resp.Groups[0].SecurityGroup | 532 g = resp.Groups[0].SecurityGroup |
474 } | 533 } |
475 revoke := make(permSet) | 534 revoke := make(permSet) |
476 for p := range have { | 535 for p := range have { |
477 if !want[p] { | 536 if !want[p] { |
478 revoke[p] = true | 537 revoke[p] = true |
479 } | 538 } |
480 } | 539 } |
481 if len(revoke) > 0 { | 540 if len(revoke) > 0 { |
482 » » _, err := e.ec2.RevokeSecurityGroup(g, revoke.ipPerms()) | 541 » » _, err := ec2inst.RevokeSecurityGroup(g, revoke.ipPerms()) |
483 if err != nil { | 542 if err != nil { |
484 return zeroGroup, fmt.Errorf("cannot revoke security gro
up: %v", err) | 543 return zeroGroup, fmt.Errorf("cannot revoke security gro
up: %v", err) |
485 } | 544 } |
486 } | 545 } |
487 | 546 |
488 add := make(permSet) | 547 add := make(permSet) |
489 for p := range want { | 548 for p := range want { |
490 if !have[p] { | 549 if !have[p] { |
491 add[p] = true | 550 add[p] = true |
492 } | 551 } |
493 } | 552 } |
494 if len(add) > 0 { | 553 if len(add) > 0 { |
495 » » _, err := e.ec2.AuthorizeSecurityGroup(g, add.ipPerms()) | 554 » » _, err := ec2inst.AuthorizeSecurityGroup(g, add.ipPerms()) |
496 if err != nil { | 555 if err != nil { |
497 return zeroGroup, fmt.Errorf("cannot authorize securityG
roup: %v", err) | 556 return zeroGroup, fmt.Errorf("cannot authorize securityG
roup: %v", err) |
498 } | 557 } |
499 } | 558 } |
500 return g, nil | 559 return g, nil |
501 } | 560 } |
502 | 561 |
503 // permKey represents a permission for a group or an ip address range | 562 // permKey represents a permission for a group or an ip address range |
504 // to access the given range of ports. Only one of groupId or ipAddr | 563 // to access the given range of ports. Only one of groupId or ipAddr |
505 // should be non-empty. | 564 // should be non-empty. |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
560 | 619 |
561 // If the err is of type *ec2.Error, ec2ErrCode returns | 620 // If the err is of type *ec2.Error, ec2ErrCode returns |
562 // its code, otherwise it returns the empty string. | 621 // its code, otherwise it returns the empty string. |
563 func ec2ErrCode(err error) string { | 622 func ec2ErrCode(err error) string { |
564 ec2err, _ := err.(*ec2.Error) | 623 ec2err, _ := err.(*ec2.Error) |
565 if ec2err == nil { | 624 if ec2err == nil { |
566 return "" | 625 return "" |
567 } | 626 } |
568 return ec2err.Code | 627 return ec2err.Code |
569 } | 628 } |
OLD | NEW |