Left: | ||
Right: |
LEFT | RIGHT |
---|---|
1 package ec2 | 1 package ec2 |
2 | 2 |
3 import ( | 3 import ( |
4 "fmt" | 4 "fmt" |
5 "io" | 5 "io" |
6 "launchpad.net/goamz/s3" | 6 "launchpad.net/goamz/s3" |
7 "launchpad.net/juju/go/environs" | 7 "launchpad.net/juju/go/environs" |
8 "sync" | 8 "sync" |
9 "time" | |
9 ) | 10 ) |
10 | 11 |
11 // storage implements environs.Storage on | 12 // storage implements environs.Storage on |
12 // an ec2.bucket. | 13 // an ec2.bucket. |
13 type storage struct { | 14 type storage struct { |
14 » // bucketMutex protects the visibility of bucketUnlocked | 15 » // bucketMutex serialises calls to· |
15 » // and madeBucket. | 16 » // bucket.PutBucket. |
16 » bucketMutex sync.Mutex | 17 » bucketMutex sync.Mutex |
17 » bucketUnlocked *s3.Bucket | 18 » bucket *s3.Bucket |
18 » madeBucket bool | |
19 } | |
20 | |
21 func (s *storage) bucket() *s3.Bucket { | |
22 » s.bucketMutex.Lock() | |
23 » defer s.bucketMutex.Unlock() | |
24 » return s.bucketUnlocked | |
25 } | |
26 | |
27 func (s *storage) setBucket(bucket *s3.Bucket) { | |
28 » s.bucketMutex.Lock() | |
29 » defer s.bucketMutex.Unlock() | |
30 » s.bucketUnlocked = bucket | |
31 » s.madeBucket = false | |
32 } | 19 } |
33 | 20 |
34 func (s *storage) Put(file string, r io.Reader, length int64) error { | 21 func (s *storage) Put(file string, r io.Reader, length int64) error { |
35 if err := s.makeBucket(); err != nil { | 22 if err := s.makeBucket(); err != nil { |
36 return fmt.Errorf("ec2: cannot make S3 control bucket: %v", err) | 23 return fmt.Errorf("ec2: cannot make S3 control bucket: %v", err) |
37 } | 24 } |
38 » err := s.bucket().PutReader(file, r, length, "binary/octet-stream", s3.P rivate) | 25 » err := s.bucket.PutReader(file, r, length, "binary/octet-stream", s3.Pri vate) |
39 if err != nil { | 26 if err != nil { |
40 return fmt.Errorf("ec2: cannot write file %q to control bucket: %v", file, err) | 27 return fmt.Errorf("ec2: cannot write file %q to control bucket: %v", file, err) |
41 } | 28 } |
42 return nil | 29 return nil |
43 } | 30 } |
44 | 31 |
45 // makeBucket makes the environment's control bucket, the | 32 // makeBucket makes the environment's control bucket, the |
46 // place where bootstrap information and deployed charms | 33 // place where bootstrap information and deployed charms |
47 // are stored. To avoid two round trips on every PUT operation, | 34 // are stored. To avoid two round trips on every PUT operation, |
48 // we do this only once for each environ. | 35 // we do this only once for each environ. |
49 func (s *storage) makeBucket() error { | 36 func (s *storage) makeBucket() error { |
50 // try to make the bucket - PutBucket will succeed if the | 37 // try to make the bucket - PutBucket will succeed if the |
51 // bucket already exists. | 38 // bucket already exists. |
52 s.bucketMutex.Lock() | 39 s.bucketMutex.Lock() |
53 defer s.bucketMutex.Unlock() | 40 defer s.bucketMutex.Unlock() |
rog
2012/05/25 08:07:22
if there's no madeBucket, then there's no need for
niemeyer
2012/05/25 19:08:02
Indeed, no need for the method either.
| |
54 | 41 » return s.bucket.PutBucket(s3.Private) |
55 » // we already hold bucketMutex so calling through s.bucket()· | |
56 » // will deadlock.· | |
57 » err := s.bucketUnlocked.PutBucket(s3.Private) | |
58 » if err == nil { | |
59 » » s.madeBucket = true | |
60 » } | |
61 » return err | |
62 } | 42 } |
63 | 43 |
64 func (s *storage) Get(file string) (r io.ReadCloser, err error) { | 44 func (s *storage) Get(file string) (r io.ReadCloser, err error) { |
65 for a := shortAttempt.start(); a.next(); { | 45 for a := shortAttempt.start(); a.next(); { |
66 » » r, err = s.bucket().GetReader(file) | 46 » » r, err = s.bucket.GetReader(file) |
67 if s3ErrorStatusCode(err) == 404 { | 47 if s3ErrorStatusCode(err) == 404 { |
68 continue | 48 continue |
69 } | 49 } |
70 return | 50 return |
71 } | 51 } |
72 return r, maybeNotFound(err) | 52 return r, maybeNotFound(err) |
73 } | 53 } |
74 | 54 |
75 func (s *storage) URL(name string) (string, error) { | 55 func (s *storage) URL(name string) (string, error) { |
76 » return s.bucket().URL(name), nil | 56 » // NOTE(review): this comment said 10 years, but AddDate(1, 0, 0) below expires the URL after 1 year; confirm the intended lifetime. |
57 » return s.bucket.SignedURL(name, time.Now().AddDate(1, 0, 0)), nil | |
77 } | 58 } |
78 | 59 |
79 // s3ErrorStatusCode returns the HTTP status of the S3 request error, | 60 // s3ErrorStatusCode returns the HTTP status of the S3 request error, |
80 // if it is an error from an S3 operation, or 0 if it was not. | 61 // if it is an error from an S3 operation, or 0 if it was not. |
81 func s3ErrorStatusCode(err error) int { | 62 func s3ErrorStatusCode(err error) int { |
82 if err, _ := err.(*s3.Error); err != nil { | 63 if err, _ := err.(*s3.Error); err != nil { |
83 return err.StatusCode | 64 return err.StatusCode |
84 } | 65 } |
85 return 0 | 66 return 0 |
86 } | 67 } |
87 | 68 |
88 func (s *storage) Remove(file string) error { | 69 func (s *storage) Remove(file string) error { |
89 » err := s.bucket().Del(file) | 70 » err := s.bucket.Del(file) |
90 // If we can't delete the object because the bucket doesn't | 71 // If we can't delete the object because the bucket doesn't |
91 // exist, then we don't care. | 72 // exist, then we don't care. |
92 if s3ErrorStatusCode(err) == 404 { | 73 if s3ErrorStatusCode(err) == 404 { |
93 return nil | 74 return nil |
94 } | 75 } |
95 return err | 76 return err |
96 } | 77 } |
97 | 78 |
98 func (s *storage) List(prefix string) ([]string, error) { | 79 func (s *storage) List(prefix string) ([]string, error) { |
99 // TODO cope with more than 1000 objects in the bucket. | 80 // TODO cope with more than 1000 objects in the bucket. |
100 » resp, err := s.bucket().List(prefix, "", "", 0) | 81 » resp, err := s.bucket.List(prefix, "", "", 0) |
101 if err != nil { | 82 if err != nil { |
102 return nil, maybeNotFound(err) | 83 return nil, maybeNotFound(err) |
103 } | 84 } |
104 var names []string | 85 var names []string |
105 for _, key := range resp.Contents { | 86 for _, key := range resp.Contents { |
106 names = append(names, key.Key) | 87 names = append(names, key.Key) |
107 } | 88 } |
108 return names, nil | 89 return names, nil |
109 } | 90 } |
110 | 91 |
111 func (e *environ) Storage() environs.Storage { | 92 func (s *storage) deleteAll() error { |
112 » return &e.storage | 93 » names, err := s.List("") |
94 » if err != nil { | |
95 » » if _, ok := err.(*environs.NotFoundError); ok { | |
96 » » » return nil | |
97 » » } | |
98 » » return err | |
99 » } | |
100 » // Remove all the objects in parallel so that we incur fewer round-trips. | |
101 » // If we're in danger of having hundreds of objects, | |
102 » // we'll want to change this to limit the number | |
103 » // of concurrent operations. | |
104 » var wg sync.WaitGroup | |
105 » wg.Add(len(names)) | |
106 » errc := make(chan error, len(names)) | |
107 » for _, name := range names { | |
108 » » name := name | |
109 » » go func() { | |
110 » » » if err := s.Remove(name); err != nil { | |
111 » » » » errc <- err | |
112 » » » } | |
113 » » » wg.Done() | |
114 » » }() | |
115 » } | |
116 » wg.Wait() | |
117 » select { | |
118 » case err := <-errc: | |
119 » » return fmt.Errorf("cannot delete all provider state: %v", err) | |
120 » default: | |
121 » } | |
122 | |
123 » return s.bucket.DelBucket() | |
113 } | 124 } |
114 | |
115 func (e *environ) PublicStorage() environs.StorageReader { | |
116 // TODO use public storage bucket | |
117 return environs.EmptyStorage | |
118 } | |
LEFT | RIGHT |