Left: | ||
Right: |
LEFT | RIGHT |
---|---|
1 // Copyright 2012, 2013 Canonical Ltd. | 1 // Copyright 2012, 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package ec2 | 4 package ec2 |
5 | 5 |
6 import ( | 6 import ( |
7 "fmt" | 7 "fmt" |
8 "io" | 8 "io" |
9 "net" | |
9 "sync" | 10 "sync" |
10 "time" | 11 "time" |
11 | 12 |
12 "launchpad.net/goamz/s3" | 13 "launchpad.net/goamz/s3" |
13 | 14 |
14 » "launchpad.net/juju-core/environs" | 15 » "launchpad.net/juju-core/environs/storage" |
15 "launchpad.net/juju-core/errors" | 16 "launchpad.net/juju-core/errors" |
16 "launchpad.net/juju-core/utils" | 17 "launchpad.net/juju-core/utils" |
17 "net" | |
thumper
2013/09/18 02:50:55
this should go with the top block of standard libr
wallyworld
2013/09/18 07:29:16
Done.
| |
18 ) | 18 ) |
19 | 19 |
20 func init() { | 20 func init() { |
21 // We will decide when to retry and under what circumstances, not s3. | 21 // We will decide when to retry and under what circumstances, not s3. |
22 // Sometimes it is expected a file may not exist and we don't want s3 | 22 // Sometimes it is expected a file may not exist and we don't want s3 |
23 // to hold things up by unilaterally deciding to retry for no good reaso n. | 23 // to hold things up by unilaterally deciding to retry for no good reaso n. |
24 s3.RetryAttempts(false) | 24 s3.RetryAttempts(false) |
25 } | 25 } |
26 | 26 |
27 func NewStorage(bucket *s3.Bucket) environs.Storage { | 27 func NewStorage(bucket *s3.Bucket) storage.Storage { |
28 » return &storage{bucket: bucket} | 28 » return &ec2storage{bucket: bucket} |
29 } | 29 } |
30 | 30 |
31 // storage implements environs.Storage on | 31 // ec2storage implements storage.Storage on |
32 // an ec2.bucket. | 32 // an ec2.bucket. |
33 type storage struct { | 33 type ec2storage struct { |
34 sync.Mutex | 34 sync.Mutex |
35 madeBucket bool | 35 madeBucket bool |
36 bucket *s3.Bucket | 36 bucket *s3.Bucket |
37 } | 37 } |
38 | 38 |
39 // makeBucket makes the environent's control bucket, the | 39 // makeBucket makes the environent's control bucket, the |
40 // place where bootstrap information and deployed charms | 40 // place where bootstrap information and deployed charms |
41 // are stored. To avoid two round trips on every PUT operation, | 41 // are stored. To avoid two round trips on every PUT operation, |
42 // we do this only once for each environ. | 42 // we do this only once for each environ. |
43 func (s *storage) makeBucket() error { | 43 func (s *ec2storage) makeBucket() error { |
44 s.Lock() | 44 s.Lock() |
45 defer s.Unlock() | 45 defer s.Unlock() |
46 if s.madeBucket { | 46 if s.madeBucket { |
47 return nil | 47 return nil |
48 } | 48 } |
49 // PutBucket always return a 200 if we recreate an existing bucket for t he | 49 // PutBucket always return a 200 if we recreate an existing bucket for t he |
50 // original s3.amazonaws.com endpoint. For all other endpoints PutBucket | 50 // original s3.amazonaws.com endpoint. For all other endpoints PutBucket |
51 // returns 409 with a known subcode. | 51 // returns 409 with a known subcode. |
52 if err := s.bucket.PutBucket(s3.Private); err != nil && s3ErrCode(err) ! = "BucketAlreadyOwnedByYou" { | 52 if err := s.bucket.PutBucket(s3.Private); err != nil && s3ErrCode(err) ! = "BucketAlreadyOwnedByYou" { |
53 return err | 53 return err |
54 } | 54 } |
55 | 55 |
56 s.madeBucket = true | 56 s.madeBucket = true |
57 return nil | 57 return nil |
58 } | 58 } |
59 | 59 |
60 func (s *storage) Put(file string, r io.Reader, length int64) error { | 60 func (s *ec2storage) Put(file string, r io.Reader, length int64) error { |
61 if err := s.makeBucket(); err != nil { | 61 if err := s.makeBucket(); err != nil { |
62 return fmt.Errorf("cannot make S3 control bucket: %v", err) | 62 return fmt.Errorf("cannot make S3 control bucket: %v", err) |
63 } | 63 } |
64 err := s.bucket.PutReader(file, r, length, "binary/octet-stream", s3.Pri vate) | 64 err := s.bucket.PutReader(file, r, length, "binary/octet-stream", s3.Pri vate) |
65 if err != nil { | 65 if err != nil { |
66 return fmt.Errorf("cannot write file %q to control bucket: %v", file, err) | 66 return fmt.Errorf("cannot write file %q to control bucket: %v", file, err) |
67 } | 67 } |
68 return nil | 68 return nil |
69 } | 69 } |
70 | 70 |
71 func (s *storage) Get(file string) (r io.ReadCloser, err error) { | 71 func (s *ec2storage) Get(file string) (r io.ReadCloser, err error) { |
72 r, err = s.bucket.GetReader(file) | 72 r, err = s.bucket.GetReader(file) |
73 return r, maybeNotFound(err) | 73 return r, maybeNotFound(err) |
74 } | 74 } |
75 | 75 |
76 func (s *storage) URL(name string) (string, error) { | 76 func (s *ec2storage) URL(name string) (string, error) { |
77 // 10 years should be good enough. | 77 // 10 years should be good enough. |
78 return s.bucket.SignedURL(name, time.Now().AddDate(10, 0, 0)), nil | 78 return s.bucket.SignedURL(name, time.Now().AddDate(10, 0, 0)), nil |
79 } | 79 } |
80 | 80 |
81 var storageAttempt = utils.AttemptStrategy{ | 81 var storageAttempt = utils.AttemptStrategy{ |
82 Total: 5 * time.Second, | 82 Total: 5 * time.Second, |
83 Delay: 200 * time.Millisecond, | 83 Delay: 200 * time.Millisecond, |
84 } | 84 } |
85 | 85 |
86 // ConsistencyStrategy is specified in the StorageReader interface. | 86 // ConsistencyStrategy is specified in the StorageReader interface. |
87 func (s *storage) DefaultConsistencyStrategy() utils.AttemptStrategy { | 87 func (s *ec2storage) DefaultConsistencyStrategy() utils.AttemptStrategy { |
88 return storageAttempt | 88 return storageAttempt |
89 } | 89 } |
90 | 90 |
91 // ShouldRetry is specified in the StorageReader interface. | 91 // ShouldRetry is specified in the StorageReader interface. |
92 func (s *storage) ShouldRetry(err error) bool { | 92 func (s *ec2storage) ShouldRetry(err error) bool { |
93 if err == nil { | 93 if err == nil { |
94 return false | 94 return false |
95 } | 95 } |
96 switch err { | 96 switch err { |
97 case io.ErrUnexpectedEOF, io.EOF: | 97 case io.ErrUnexpectedEOF, io.EOF: |
98 return true | 98 return true |
99 } | 99 } |
100 if s3ErrorStatusCode(err) == 404 { | 100 if s3ErrorStatusCode(err) == 404 { |
101 return true | 101 return true |
102 } | 102 } |
(...skipping 24 matching lines...) Expand all Loading... | |
127 } | 127 } |
128 | 128 |
129 // s3ErrCode returns the text status code of the S3 error code. | 129 // s3ErrCode returns the text status code of the S3 error code. |
130 func s3ErrCode(err error) string { | 130 func s3ErrCode(err error) string { |
131 if err, ok := err.(*s3.Error); ok { | 131 if err, ok := err.(*s3.Error); ok { |
132 return err.Code | 132 return err.Code |
133 } | 133 } |
134 return "" | 134 return "" |
135 } | 135 } |
136 | 136 |
137 func (s *storage) Remove(file string) error { | 137 func (s *ec2storage) Remove(file string) error { |
138 err := s.bucket.Del(file) | 138 err := s.bucket.Del(file) |
139 // If we can't delete the object because the bucket doesn't | 139 // If we can't delete the object because the bucket doesn't |
140 // exist, then we don't care. | 140 // exist, then we don't care. |
141 if s3ErrorStatusCode(err) == 404 { | 141 if s3ErrorStatusCode(err) == 404 { |
142 return nil | 142 return nil |
143 } | 143 } |
144 return err | 144 return err |
145 } | 145 } |
146 | 146 |
147 func (s *storage) List(prefix string) ([]string, error) { | 147 func (s *ec2storage) List(prefix string) ([]string, error) { |
148 // TODO cope with more than 1000 objects in the bucket. | 148 // TODO cope with more than 1000 objects in the bucket. |
149 resp, err := s.bucket.List(prefix, "", "", 0) | 149 resp, err := s.bucket.List(prefix, "", "", 0) |
150 if err != nil { | 150 if err != nil { |
151 // If the bucket is not found, it's not an error | 151 // If the bucket is not found, it's not an error |
152 // because it's only created when the first | 152 // because it's only created when the first |
153 // file is put. | 153 // file is put. |
154 if s3ErrorStatusCode(err) == 404 { | 154 if s3ErrorStatusCode(err) == 404 { |
155 return nil, nil | 155 return nil, nil |
156 } | 156 } |
157 return nil, err | 157 return nil, err |
158 } | 158 } |
159 var names []string | 159 var names []string |
160 for _, key := range resp.Contents { | 160 for _, key := range resp.Contents { |
161 names = append(names, key.Key) | 161 names = append(names, key.Key) |
162 } | 162 } |
163 return names, nil | 163 return names, nil |
164 } | 164 } |
165 | 165 |
166 func (s *storage) RemoveAll() error { | 166 func (s *ec2storage) RemoveAll() error { |
167 » names, err := environs.DefaultList(s, "") | 167 » names, err := storage.ListWithDefaultRetry(s, "") |
168 if err != nil { | 168 if err != nil { |
169 return err | 169 return err |
170 } | 170 } |
171 // Remove all the objects in parallel to minimize round-trips. | 171 // Remove all the objects in parallel to minimize round-trips. |
172 // If we're in danger of having hundreds of objects, | 172 // If we're in danger of having hundreds of objects, |
173 // we'll want to change this to limit the number | 173 // we'll want to change this to limit the number |
174 // of concurrent operations. | 174 // of concurrent operations. |
175 var wg sync.WaitGroup | 175 var wg sync.WaitGroup |
176 wg.Add(len(names)) | 176 wg.Add(len(names)) |
177 errc := make(chan error, len(names)) | 177 errc := make(chan error, len(names)) |
(...skipping 19 matching lines...) Expand all Loading... | |
197 // might have succeeded even if we get an error. | 197 // might have succeeded even if we get an error. |
198 s.madeBucket = false | 198 s.madeBucket = false |
199 err = deleteBucket(s) | 199 err = deleteBucket(s) |
200 err = s.bucket.DelBucket() | 200 err = s.bucket.DelBucket() |
201 if s3ErrorStatusCode(err) == 404 { | 201 if s3ErrorStatusCode(err) == 404 { |
202 return nil | 202 return nil |
203 } | 203 } |
204 return err | 204 return err |
205 } | 205 } |
206 | 206 |
207 func deleteBucket(s *storage) (err error) { | 207 func deleteBucket(s *ec2storage) (err error) { |
208 for a := s.DefaultConsistencyStrategy().Start(); a.Next(); { | 208 for a := s.DefaultConsistencyStrategy().Start(); a.Next(); { |
209 err = s.bucket.DelBucket() | 209 err = s.bucket.DelBucket() |
210 if err == nil || !s.ShouldRetry(err) { | 210 if err == nil || !s.ShouldRetry(err) { |
211 break | 211 break |
212 } | 212 } |
213 } | 213 } |
214 return err | 214 return err |
215 } | 215 } |
216 | 216 |
217 func maybeNotFound(err error) error { | 217 func maybeNotFound(err error) error { |
218 if err != nil && s3ErrorStatusCode(err) == 404 { | 218 if err != nil && s3ErrorStatusCode(err) == 404 { |
219 return errors.NewNotFoundError(err, "") | 219 return errors.NewNotFoundError(err, "") |
220 } | 220 } |
221 return err | 221 return err |
222 } | 222 } |
LEFT | RIGHT |