| OLD | NEW |
| 1 package ec2 | 1 package ec2 |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "fmt" | 4 "fmt" |
| 5 "launchpad.net/goamz/ec2" | 5 "launchpad.net/goamz/ec2" |
| 6 "launchpad.net/goamz/s3" | 6 "launchpad.net/goamz/s3" |
| 7 "launchpad.net/juju/go/environs" | 7 "launchpad.net/juju/go/environs" |
| 8 "launchpad.net/juju/go/log" | 8 "launchpad.net/juju/go/log" |
| 9 "launchpad.net/juju/go/state" | 9 "launchpad.net/juju/go/state" |
| 10 "sync" |
| 10 "time" | 11 "time" |
| 11 ) | 12 ) |
| 12 | 13 |
| 13 const zkPort = 2181 | 14 const zkPort = 2181 |
| 14 | 15 |
| 15 var zkPortSuffix = fmt.Sprintf(":%d", zkPort) | 16 var zkPortSuffix = fmt.Sprintf(":%d", zkPort) |
| 16 | 17 |
| 17 // A request may fail to due "eventual consistency" semantics, which | 18 // A request may fail to due "eventual consistency" semantics, which |
| 18 // should resolve fairly quickly. A request may also fail due to a slow | 19 // should resolve fairly quickly. A request may also fail due to a slow |
| 19 // state transition (for instance an instance taking a while to release | 20 // state transition (for instance an instance taking a while to release |
| (...skipping 11 matching lines...) Expand all Loading... |
| 31 | 32 |
| 32 func init() { | 33 func init() { |
| 33 environs.RegisterProvider("ec2", environProvider{}) | 34 environs.RegisterProvider("ec2", environProvider{}) |
| 34 } | 35 } |
| 35 | 36 |
| 36 type environProvider struct{} | 37 type environProvider struct{} |
| 37 | 38 |
| 38 var _ environs.EnvironProvider = environProvider{} | 39 var _ environs.EnvironProvider = environProvider{} |
| 39 | 40 |
| 40 type environ struct { | 41 type environ struct { |
| 41 » name string | 42 » name string |
| 42 » config *providerConfig | 43 |
| 43 » ec2 *ec2.EC2 | 44 » // configMutex protects visibility of configUnlocked |
| 44 » s3 *s3.S3 | 45 » configMutex sync.Mutex |
| 46 » configUnlocked *providerConfig |
| 47 |
| 48 » // storage has it's own mutex |
| 45 storage storage | 49 storage storage |
| 46 } | 50 } |
| 47 | 51 |
| 48 var _ environs.Environ = (*environ)(nil) | 52 var _ environs.Environ = (*environ)(nil) |
| 49 | 53 |
| 50 type instance struct { | 54 type instance struct { |
| 51 e *environ | 55 e *environ |
| 52 *ec2.Instance | 56 *ec2.Instance |
| 53 } | 57 } |
| 54 | 58 |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 88 } | 92 } |
| 89 } | 93 } |
| 90 return "", fmt.Errorf("timed out trying to get DNS address for %v", inst
.Id()) | 94 return "", fmt.Errorf("timed out trying to get DNS address for %v", inst
.Id()) |
| 91 } | 95 } |
| 92 | 96 |
| 93 func (cfg *providerConfig) Open() (environs.Environ, error) { | 97 func (cfg *providerConfig) Open() (environs.Environ, error) { |
| 94 log.Printf("environs/ec2: opening environment %q", cfg.name) | 98 log.Printf("environs/ec2: opening environment %q", cfg.name) |
| 95 if Regions[cfg.region].EC2Endpoint == "" { | 99 if Regions[cfg.region].EC2Endpoint == "" { |
| 96 return nil, fmt.Errorf("no ec2 endpoint found for region %q, ope
ning %q", cfg.region, cfg.name) | 100 return nil, fmt.Errorf("no ec2 endpoint found for region %q, ope
ning %q", cfg.region, cfg.name) |
| 97 } | 101 } |
| 98 » e := &environ{ | 102 » e := new(environ) |
| 99 » » config: cfg, | 103 » e.setConfig(cfg) |
| 100 » » ec2: ec2.New(cfg.auth, Regions[cfg.region]), | |
| 101 » » s3: s3.New(cfg.auth, Regions[cfg.region]), | |
| 102 » } | |
| 103 » e.storage.bucket = e.s3.Bucket(cfg.bucket) | |
| 104 return e, nil | 104 return e, nil |
| 105 } | 105 } |
| 106 | 106 |
| 107 func (e *environ) Bootstrap(uploadTools bool) error { | 107 func (e *environ) Bootstrap(uploadTools bool) error { |
| 108 log.Printf("environs/ec2: bootstrapping environment %q", e.name) | 108 log.Printf("environs/ec2: bootstrapping environment %q", e.name) |
| 109 _, err := e.loadState() | 109 _, err := e.loadState() |
| 110 if err == nil { | 110 if err == nil { |
| 111 return fmt.Errorf("environment is already bootstrapped") | 111 return fmt.Errorf("environment is already bootstrapped") |
| 112 } | 112 } |
| 113 if _, notFound := err.(*environs.NotFoundError); !notFound { | 113 if _, notFound := err.(*environs.NotFoundError); !notFound { |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 168 } | 168 } |
| 169 if len(addrs) == 0 { | 169 if len(addrs) == 0 { |
| 170 return nil, fmt.Errorf("timed out waiting for zk address from %v
", st.ZookeeperInstances) | 170 return nil, fmt.Errorf("timed out waiting for zk address from %v
", st.ZookeeperInstances) |
| 171 } | 171 } |
| 172 return &state.Info{ | 172 return &state.Info{ |
| 173 Addrs: addrs, | 173 Addrs: addrs, |
| 174 UseSSH: true, | 174 UseSSH: true, |
| 175 }, nil | 175 }, nil |
| 176 } | 176 } |
| 177 | 177 |
| 178 func (e *environ) SetConfig(cfg environs.EnvironConfig) error { |
| 179 c, ok := cfg.(*providerConfig) |
| 180 if !ok { |
| 181 panic(fmt.Sprintf("configuration was %T, expected %T", cfg, new(
providerConfig))) |
| 182 } |
| 183 e.setConfig(c) |
| 184 return nil |
| 185 } |
| 186 |
| 187 func (e *environ) setConfig(cfg *providerConfig) { |
| 188 e.configMutex.Lock() |
| 189 e.configUnlocked = cfg |
| 190 e.configMutex.Unlock() |
| 191 e.storage.setBucket(e.s3().Bucket(cfg.bucket)) |
| 192 } |
| 193 |
| 194 // config returns the current *providerConfig. |
| 195 func (e *environ) config() *providerConfig { |
| 196 e.configMutex.Lock() |
| 197 defer e.configMutex.Unlock() |
| 198 return e.configUnlocked |
| 199 } |
| 200 |
| 201 // ec2 returns the current *ec2.EC2. |
| 202 func (e *environ) ec2() *ec2.EC2 { |
| 203 cfg := e.config() |
| 204 return ec2.New(cfg.auth, Regions[cfg.region]) |
| 205 } |
| 206 |
| 207 // s3 returns the current *s3.S3. |
| 208 func (e *environ) s3() *s3.S3 { |
| 209 cfg := e.config() |
| 210 return s3.New(cfg.auth, Regions[cfg.region]) |
| 211 } |
| 212 |
| 178 func (e *environ) StartInstance(machineId int, info *state.Info) (environs.Insta
nce, error) { | 213 func (e *environ) StartInstance(machineId int, info *state.Info) (environs.Insta
nce, error) { |
| 179 log.Printf("environs/ec2: starting machine %d in %q", machineId, e.name) | 214 log.Printf("environs/ec2: starting machine %d in %q", machineId, e.name) |
| 180 return e.startInstance(machineId, info, false) | 215 return e.startInstance(machineId, info, false) |
| 181 } | 216 } |
| 182 | 217 |
| 183 func (e *environ) userData(machineId int, info *state.Info, master bool) ([]byte
, error) { | 218 func (e *environ) userData(machineId int, info *state.Info, master bool) ([]byte
, error) { |
| 219 config := e.config() |
| 184 cfg := &machineConfig{ | 220 cfg := &machineConfig{ |
| 185 provisioner: master, | 221 provisioner: master, |
| 186 zookeeper: master, | 222 zookeeper: master, |
| 187 stateInfo: info, | 223 stateInfo: info, |
| 188 instanceIdAccessor: "$(curl http://169.254.169.254/1.0/meta-data
/instance-id)", | 224 instanceIdAccessor: "$(curl http://169.254.169.254/1.0/meta-data
/instance-id)", |
| 189 providerType: "ec2", | 225 providerType: "ec2", |
| 190 » » origin: e.config.origin, | 226 » » origin: config.origin, |
| 191 machineId: fmt.Sprint(machineId), | 227 machineId: fmt.Sprint(machineId), |
| 192 } | 228 } |
| 193 | 229 |
| 194 » if e.config.authorizedKeys == "" { | 230 » if config.authorizedKeys == "" { |
| 195 var err error | 231 var err error |
| 196 » » cfg.authorizedKeys, err = authorizedKeys(e.config.authorizedKeys
Path) | 232 » » cfg.authorizedKeys, err = authorizedKeys(config.authorizedKeysPa
th) |
| 197 if err != nil { | 233 if err != nil { |
| 198 return nil, fmt.Errorf("cannot get ssh authorized keys:
%v", err) | 234 return nil, fmt.Errorf("cannot get ssh authorized keys:
%v", err) |
| 199 } | 235 } |
| 200 } | 236 } |
| 201 cloudcfg, err := newCloudInit(cfg) | 237 cloudcfg, err := newCloudInit(cfg) |
| 202 if err != nil { | 238 if err != nil { |
| 203 return nil, err | 239 return nil, err |
| 204 } | 240 } |
| 205 return cloudcfg.Render() | 241 return cloudcfg.Render() |
| 206 } | 242 } |
| (...skipping 10 matching lines...) Expand all Loading... |
| 217 if err != nil { | 253 if err != nil { |
| 218 return nil, err | 254 return nil, err |
| 219 } | 255 } |
| 220 groups, err := e.setUpGroups(machineId) | 256 groups, err := e.setUpGroups(machineId) |
| 221 if err != nil { | 257 if err != nil { |
| 222 return nil, fmt.Errorf("cannot set up groups: %v", err) | 258 return nil, fmt.Errorf("cannot set up groups: %v", err) |
| 223 } | 259 } |
| 224 var instances *ec2.RunInstancesResp | 260 var instances *ec2.RunInstancesResp |
| 225 | 261 |
| 226 for a := shortAttempt.start(); a.next(); { | 262 for a := shortAttempt.start(); a.next(); { |
| 227 » » instances, err = e.ec2.RunInstances(&ec2.RunInstances{ | 263 » » instances, err = e.ec2().RunInstances(&ec2.RunInstances{ |
| 228 ImageId: image.ImageId, | 264 ImageId: image.ImageId, |
| 229 MinCount: 1, | 265 MinCount: 1, |
| 230 MaxCount: 1, | 266 MaxCount: 1, |
| 231 UserData: userData, | 267 UserData: userData, |
| 232 InstanceType: "m1.small", | 268 InstanceType: "m1.small", |
| 233 SecurityGroups: groups, | 269 SecurityGroups: groups, |
| 234 }) | 270 }) |
| 235 if err == nil || ec2ErrCode(err) != "InvalidGroup.NotFound" { | 271 if err == nil || ec2ErrCode(err) != "InvalidGroup.NotFound" { |
| 236 break | 272 break |
| 237 } | 273 } |
| (...skipping 28 matching lines...) Expand all Loading... |
| 266 need = append(need, ids[i]) | 302 need = append(need, ids[i]) |
| 267 } | 303 } |
| 268 } | 304 } |
| 269 if len(need) == 0 { | 305 if len(need) == 0 { |
| 270 return nil | 306 return nil |
| 271 } | 307 } |
| 272 filter := ec2.NewFilter() | 308 filter := ec2.NewFilter() |
| 273 filter.Add("instance-state-name", "pending", "running") | 309 filter.Add("instance-state-name", "pending", "running") |
| 274 filter.Add("group-name", e.groupName()) | 310 filter.Add("group-name", e.groupName()) |
| 275 filter.Add("instance-id", need...) | 311 filter.Add("instance-id", need...) |
| 276 » resp, err := e.ec2.Instances(nil, filter) | 312 » resp, err := e.ec2().Instances(nil, filter) |
| 277 if err != nil { | 313 if err != nil { |
| 278 return err | 314 return err |
| 279 } | 315 } |
| 280 n := 0 | 316 n := 0 |
| 281 // For each requested id, add it to the returned instances | 317 // For each requested id, add it to the returned instances |
| 282 // if we find it in the response. | 318 // if we find it in the response. |
| 283 for i, id := range ids { | 319 for i, id := range ids { |
| 284 if insts[i] != nil { | 320 if insts[i] != nil { |
| 285 continue | 321 continue |
| 286 } | 322 } |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 329 } | 365 } |
| 330 return insts, nil | 366 return insts, nil |
| 331 } | 367 } |
| 332 | 368 |
| 333 func (e *environ) Destroy(insts []environs.Instance) error { | 369 func (e *environ) Destroy(insts []environs.Instance) error { |
| 334 log.Printf("environs/ec2: destroying environment %q", e.name) | 370 log.Printf("environs/ec2: destroying environment %q", e.name) |
| 335 // Try to find all the instances in the environ's group. | 371 // Try to find all the instances in the environ's group. |
| 336 filter := ec2.NewFilter() | 372 filter := ec2.NewFilter() |
| 337 filter.Add("instance-state-name", "pending", "running") | 373 filter.Add("instance-state-name", "pending", "running") |
| 338 filter.Add("group-name", e.groupName()) | 374 filter.Add("group-name", e.groupName()) |
| 339 » resp, err := e.ec2.Instances(nil, filter) | 375 » resp, err := e.ec2().Instances(nil, filter) |
| 340 if err != nil { | 376 if err != nil { |
| 341 return fmt.Errorf("cannot get instances: %v", err) | 377 return fmt.Errorf("cannot get instances: %v", err) |
| 342 } | 378 } |
| 343 var ids []string | 379 var ids []string |
| 344 found := make(map[string]bool) | 380 found := make(map[string]bool) |
| 345 for _, r := range resp.Reservations { | 381 for _, r := range resp.Reservations { |
| 346 for _, inst := range r.Instances { | 382 for _, inst := range r.Instances { |
| 347 ids = append(ids, inst.InstanceId) | 383 ids = append(ids, inst.InstanceId) |
| 348 found[inst.InstanceId] = true | 384 found[inst.InstanceId] = true |
| 349 } | 385 } |
| (...skipping 17 matching lines...) Expand all Loading... |
| 367 return err | 403 return err |
| 368 } | 404 } |
| 369 return nil | 405 return nil |
| 370 } | 406 } |
| 371 | 407 |
| 372 func (e *environ) terminateInstances(ids []string) error { | 408 func (e *environ) terminateInstances(ids []string) error { |
| 373 if len(ids) == 0 { | 409 if len(ids) == 0 { |
| 374 return nil | 410 return nil |
| 375 } | 411 } |
| 376 var err error | 412 var err error |
| 413 ec2 := e.ec2() |
| 377 for a := shortAttempt.start(); a.next(); { | 414 for a := shortAttempt.start(); a.next(); { |
| 378 » » _, err = e.ec2.TerminateInstances(ids) | 415 » » _, err = ec2.TerminateInstances(ids) |
| 379 if err == nil || ec2ErrCode(err) != "InvalidInstanceID.NotFound"
{ | 416 if err == nil || ec2ErrCode(err) != "InvalidInstanceID.NotFound"
{ |
| 380 return err | 417 return err |
| 381 } | 418 } |
| 382 } | 419 } |
| 383 if len(ids) == 1 { | 420 if len(ids) == 1 { |
| 384 return err | 421 return err |
| 385 } | 422 } |
| 386 var firstErr error | 423 var firstErr error |
| 387 // If we get a NotFound error, it means that no instances have been | 424 // If we get a NotFound error, it means that no instances have been |
| 388 // terminated even if some exist, so try them one by one, ignoring | 425 // terminated even if some exist, so try them one by one, ignoring |
| 389 // NotFound errors. | 426 // NotFound errors. |
| 390 for _, id := range ids { | 427 for _, id := range ids { |
| 391 » » _, err = e.ec2.TerminateInstances([]string{id}) | 428 » » _, err = ec2.TerminateInstances([]string{id}) |
| 392 if ec2ErrCode(err) == "InvalidInstanceID.NotFound" { | 429 if ec2ErrCode(err) == "InvalidInstanceID.NotFound" { |
| 393 err = nil | 430 err = nil |
| 394 } | 431 } |
| 395 if err != nil && firstErr == nil { | 432 if err != nil && firstErr == nil { |
| 396 firstErr = err | 433 firstErr = err |
| 397 } | 434 } |
| 398 } | 435 } |
| 399 return firstErr | 436 return firstErr |
| 400 } | 437 } |
| 401 | 438 |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 435 return []ec2.SecurityGroup{jujuGroup, jujuMachineGroup}, nil | 472 return []ec2.SecurityGroup{jujuGroup, jujuMachineGroup}, nil |
| 436 } | 473 } |
| 437 | 474 |
| 438 // zeroGroup holds the zero security group. | 475 // zeroGroup holds the zero security group. |
| 439 var zeroGroup ec2.SecurityGroup | 476 var zeroGroup ec2.SecurityGroup |
| 440 | 477 |
| 441 // ensureGroup returns the security group with name and perms. | 478 // ensureGroup returns the security group with name and perms. |
| 442 // If a group with name does not exist, one will be created. | 479 // If a group with name does not exist, one will be created. |
| 443 // If it exists, its permissions are set to perms. | 480 // If it exists, its permissions are set to perms. |
| 444 func (e *environ) ensureGroup(name string, perms []ec2.IPPerm) (g ec2.SecurityGr
oup, err error) { | 481 func (e *environ) ensureGroup(name string, perms []ec2.IPPerm) (g ec2.SecurityGr
oup, err error) { |
| 445 » resp, err := e.ec2.CreateSecurityGroup(name, "juju group") | 482 » ectwo := e.ec2() |
| 483 » resp, err := ectwo.CreateSecurityGroup(name, "juju group") |
| 446 if err != nil && ec2ErrCode(err) != "InvalidGroup.Duplicate" { | 484 if err != nil && ec2ErrCode(err) != "InvalidGroup.Duplicate" { |
| 447 return zeroGroup, err | 485 return zeroGroup, err |
| 448 } | 486 } |
| 449 | 487 |
| 450 want := newPermSet(perms) | 488 want := newPermSet(perms) |
| 451 var have permSet | 489 var have permSet |
| 452 if err == nil { | 490 if err == nil { |
| 453 g = resp.SecurityGroup | 491 g = resp.SecurityGroup |
| 454 } else { | 492 } else { |
| 455 » » resp, err := e.ec2.SecurityGroups(ec2.SecurityGroupNames(name),
nil) | 493 » » resp, err := ectwo.SecurityGroups(ec2.SecurityGroupNames(name),
nil) |
| 456 if err != nil { | 494 if err != nil { |
| 457 return zeroGroup, err | 495 return zeroGroup, err |
| 458 } | 496 } |
| 459 // It's possible that the old group has the wrong | 497 // It's possible that the old group has the wrong |
| 460 // description here, but if it does it's probably due | 498 // description here, but if it does it's probably due |
| 461 // to something deliberately playing games with juju, | 499 // to something deliberately playing games with juju, |
| 462 // so we ignore it. | 500 // so we ignore it. |
| 463 have = newPermSet(resp.Groups[0].IPPerms) | 501 have = newPermSet(resp.Groups[0].IPPerms) |
| 464 g = resp.Groups[0].SecurityGroup | 502 g = resp.Groups[0].SecurityGroup |
| 465 } | 503 } |
| 466 revoke := make(permSet) | 504 revoke := make(permSet) |
| 467 for p := range have { | 505 for p := range have { |
| 468 if !want[p] { | 506 if !want[p] { |
| 469 revoke[p] = true | 507 revoke[p] = true |
| 470 } | 508 } |
| 471 } | 509 } |
| 472 if len(revoke) > 0 { | 510 if len(revoke) > 0 { |
| 473 » » _, err := e.ec2.RevokeSecurityGroup(g, revoke.ipPerms()) | 511 » » _, err := ectwo.RevokeSecurityGroup(g, revoke.ipPerms()) |
| 474 if err != nil { | 512 if err != nil { |
| 475 return zeroGroup, fmt.Errorf("cannot revoke security gro
up: %v", err) | 513 return zeroGroup, fmt.Errorf("cannot revoke security gro
up: %v", err) |
| 476 } | 514 } |
| 477 } | 515 } |
| 478 | 516 |
| 479 add := make(permSet) | 517 add := make(permSet) |
| 480 for p := range want { | 518 for p := range want { |
| 481 if !have[p] { | 519 if !have[p] { |
| 482 add[p] = true | 520 add[p] = true |
| 483 } | 521 } |
| 484 } | 522 } |
| 485 if len(add) > 0 { | 523 if len(add) > 0 { |
| 486 » » _, err := e.ec2.AuthorizeSecurityGroup(g, add.ipPerms()) | 524 » » _, err := ectwo.AuthorizeSecurityGroup(g, add.ipPerms()) |
| 487 if err != nil { | 525 if err != nil { |
| 488 return zeroGroup, fmt.Errorf("cannot authorize securityG
roup: %v", err) | 526 return zeroGroup, fmt.Errorf("cannot authorize securityG
roup: %v", err) |
| 489 } | 527 } |
| 490 } | 528 } |
| 491 return g, nil | 529 return g, nil |
| 492 } | 530 } |
| 493 | 531 |
| 494 // permKey represents a permission for a group or an ip address range | 532 // permKey represents a permission for a group or an ip address range |
| 495 // to access the given range of ports. Only one of groupId or ipAddr | 533 // to access the given range of ports. Only one of groupId or ipAddr |
| 496 // should be non-empty. | 534 // should be non-empty. |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 551 | 589 |
| 552 // If the err is of type *ec2.Error, ec2ErrCode returns | 590 // If the err is of type *ec2.Error, ec2ErrCode returns |
| 553 // its code, otherwise it returns the empty string. | 591 // its code, otherwise it returns the empty string. |
| 554 func ec2ErrCode(err error) string { | 592 func ec2ErrCode(err error) string { |
| 555 ec2err, _ := err.(*ec2.Error) | 593 ec2err, _ := err.(*ec2.Error) |
| 556 if ec2err == nil { | 594 if ec2err == nil { |
| 557 return "" | 595 return "" |
| 558 } | 596 } |
| 559 return ec2err.Code | 597 return ec2err.Code |
| 560 } | 598 } |
| OLD | NEW |