Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(12)

Delta Between Two Patch Sets: provider/ec2/storage.go

Issue 13632056: Reduce unnecessary s3 timeouts (Closed)
Left Patch Set: Created 11 years, 6 months ago
Right Patch Set: Reduce unnecessary s3 timeouts Created 11 years, 6 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
LEFTRIGHT
1 // Copyright 2012, 2013 Canonical Ltd. 1 // Copyright 2012, 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details. 2 // Licensed under the AGPLv3, see LICENCE file for details.
3 3
4 package ec2 4 package ec2
5 5
6 import ( 6 import (
7 "fmt" 7 "fmt"
8 "io" 8 "io"
9 "net"
9 "sync" 10 "sync"
10 "time" 11 "time"
11 12
12 "launchpad.net/goamz/s3" 13 "launchpad.net/goamz/s3"
13 14
14 » "launchpad.net/juju-core/environs" 15 » "launchpad.net/juju-core/environs/storage"
15 "launchpad.net/juju-core/errors" 16 "launchpad.net/juju-core/errors"
16 "launchpad.net/juju-core/utils" 17 "launchpad.net/juju-core/utils"
17 "net"
thumper 2013/09/18 02:50:55 this should go with the top block of standard libr
wallyworld 2013/09/18 07:29:16 Done.
18 ) 18 )
19 19
20 func init() { 20 func init() {
21 // We will decide when to retry and under what circumstances, not s3. 21 // We will decide when to retry and under what circumstances, not s3.
22 // Sometimes it is expected a file may not exist and we don't want s3 22 // Sometimes it is expected a file may not exist and we don't want s3
23 // to hold things up by unilaterally deciding to retry for no good reaso n. 23 // to hold things up by unilaterally deciding to retry for no good reaso n.
24 s3.RetryAttempts(false) 24 s3.RetryAttempts(false)
25 } 25 }
26 26
27 func NewStorage(bucket *s3.Bucket) environs.Storage { 27 func NewStorage(bucket *s3.Bucket) storage.Storage {
28 » return &storage{bucket: bucket} 28 » return &ec2storage{bucket: bucket}
29 } 29 }
30 30
31 // storage implements environs.Storage on 31 // ec2storage implements storage.Storage on
32 // an ec2.bucket. 32 // an ec2.bucket.
33 type storage struct { 33 type ec2storage struct {
34 sync.Mutex 34 sync.Mutex
35 madeBucket bool 35 madeBucket bool
36 bucket *s3.Bucket 36 bucket *s3.Bucket
37 } 37 }
38 38
39 // makeBucket makes the environent's control bucket, the 39 // makeBucket makes the environent's control bucket, the
40 // place where bootstrap information and deployed charms 40 // place where bootstrap information and deployed charms
41 // are stored. To avoid two round trips on every PUT operation, 41 // are stored. To avoid two round trips on every PUT operation,
42 // we do this only once for each environ. 42 // we do this only once for each environ.
43 func (s *storage) makeBucket() error { 43 func (s *ec2storage) makeBucket() error {
44 s.Lock() 44 s.Lock()
45 defer s.Unlock() 45 defer s.Unlock()
46 if s.madeBucket { 46 if s.madeBucket {
47 return nil 47 return nil
48 } 48 }
49 // PutBucket always return a 200 if we recreate an existing bucket for t he 49 // PutBucket always return a 200 if we recreate an existing bucket for t he
50 // original s3.amazonaws.com endpoint. For all other endpoints PutBucket 50 // original s3.amazonaws.com endpoint. For all other endpoints PutBucket
51 // returns 409 with a known subcode. 51 // returns 409 with a known subcode.
52 if err := s.bucket.PutBucket(s3.Private); err != nil && s3ErrCode(err) ! = "BucketAlreadyOwnedByYou" { 52 if err := s.bucket.PutBucket(s3.Private); err != nil && s3ErrCode(err) ! = "BucketAlreadyOwnedByYou" {
53 return err 53 return err
54 } 54 }
55 55
56 s.madeBucket = true 56 s.madeBucket = true
57 return nil 57 return nil
58 } 58 }
59 59
60 func (s *storage) Put(file string, r io.Reader, length int64) error { 60 func (s *ec2storage) Put(file string, r io.Reader, length int64) error {
61 if err := s.makeBucket(); err != nil { 61 if err := s.makeBucket(); err != nil {
62 return fmt.Errorf("cannot make S3 control bucket: %v", err) 62 return fmt.Errorf("cannot make S3 control bucket: %v", err)
63 } 63 }
64 err := s.bucket.PutReader(file, r, length, "binary/octet-stream", s3.Pri vate) 64 err := s.bucket.PutReader(file, r, length, "binary/octet-stream", s3.Pri vate)
65 if err != nil { 65 if err != nil {
66 return fmt.Errorf("cannot write file %q to control bucket: %v", file, err) 66 return fmt.Errorf("cannot write file %q to control bucket: %v", file, err)
67 } 67 }
68 return nil 68 return nil
69 } 69 }
70 70
71 func (s *storage) Get(file string) (r io.ReadCloser, err error) { 71 func (s *ec2storage) Get(file string) (r io.ReadCloser, err error) {
72 r, err = s.bucket.GetReader(file) 72 r, err = s.bucket.GetReader(file)
73 return r, maybeNotFound(err) 73 return r, maybeNotFound(err)
74 } 74 }
75 75
76 func (s *storage) URL(name string) (string, error) { 76 func (s *ec2storage) URL(name string) (string, error) {
77 // 10 years should be good enough. 77 // 10 years should be good enough.
78 return s.bucket.SignedURL(name, time.Now().AddDate(10, 0, 0)), nil 78 return s.bucket.SignedURL(name, time.Now().AddDate(10, 0, 0)), nil
79 } 79 }
80 80
81 var storageAttempt = utils.AttemptStrategy{ 81 var storageAttempt = utils.AttemptStrategy{
82 Total: 5 * time.Second, 82 Total: 5 * time.Second,
83 Delay: 200 * time.Millisecond, 83 Delay: 200 * time.Millisecond,
84 } 84 }
85 85
86 // ConsistencyStrategy is specified in the StorageReader interface. 86 // ConsistencyStrategy is specified in the StorageReader interface.
87 func (s *storage) DefaultConsistencyStrategy() utils.AttemptStrategy { 87 func (s *ec2storage) DefaultConsistencyStrategy() utils.AttemptStrategy {
88 return storageAttempt 88 return storageAttempt
89 } 89 }
90 90
91 // ShouldRetry is specified in the StorageReader interface. 91 // ShouldRetry is specified in the StorageReader interface.
92 func (s *storage) ShouldRetry(err error) bool { 92 func (s *ec2storage) ShouldRetry(err error) bool {
93 if err == nil { 93 if err == nil {
94 return false 94 return false
95 } 95 }
96 switch err { 96 switch err {
97 case io.ErrUnexpectedEOF, io.EOF: 97 case io.ErrUnexpectedEOF, io.EOF:
98 return true 98 return true
99 } 99 }
100 if s3ErrorStatusCode(err) == 404 { 100 if s3ErrorStatusCode(err) == 404 {
101 return true 101 return true
102 } 102 }
(...skipping 24 matching lines...) Expand all
127 } 127 }
128 128
129 // s3ErrCode returns the text status code of the S3 error code. 129 // s3ErrCode returns the text status code of the S3 error code.
130 func s3ErrCode(err error) string { 130 func s3ErrCode(err error) string {
131 if err, ok := err.(*s3.Error); ok { 131 if err, ok := err.(*s3.Error); ok {
132 return err.Code 132 return err.Code
133 } 133 }
134 return "" 134 return ""
135 } 135 }
136 136
137 func (s *storage) Remove(file string) error { 137 func (s *ec2storage) Remove(file string) error {
138 err := s.bucket.Del(file) 138 err := s.bucket.Del(file)
139 // If we can't delete the object because the bucket doesn't 139 // If we can't delete the object because the bucket doesn't
140 // exist, then we don't care. 140 // exist, then we don't care.
141 if s3ErrorStatusCode(err) == 404 { 141 if s3ErrorStatusCode(err) == 404 {
142 return nil 142 return nil
143 } 143 }
144 return err 144 return err
145 } 145 }
146 146
147 func (s *storage) List(prefix string) ([]string, error) { 147 func (s *ec2storage) List(prefix string) ([]string, error) {
148 // TODO cope with more than 1000 objects in the bucket. 148 // TODO cope with more than 1000 objects in the bucket.
149 resp, err := s.bucket.List(prefix, "", "", 0) 149 resp, err := s.bucket.List(prefix, "", "", 0)
150 if err != nil { 150 if err != nil {
151 // If the bucket is not found, it's not an error 151 // If the bucket is not found, it's not an error
152 // because it's only created when the first 152 // because it's only created when the first
153 // file is put. 153 // file is put.
154 if s3ErrorStatusCode(err) == 404 { 154 if s3ErrorStatusCode(err) == 404 {
155 return nil, nil 155 return nil, nil
156 } 156 }
157 return nil, err 157 return nil, err
158 } 158 }
159 var names []string 159 var names []string
160 for _, key := range resp.Contents { 160 for _, key := range resp.Contents {
161 names = append(names, key.Key) 161 names = append(names, key.Key)
162 } 162 }
163 return names, nil 163 return names, nil
164 } 164 }
165 165
166 func (s *storage) RemoveAll() error { 166 func (s *ec2storage) RemoveAll() error {
167 » names, err := environs.DefaultList(s, "") 167 » names, err := storage.ListWithDefaultRetry(s, "")
168 if err != nil { 168 if err != nil {
169 return err 169 return err
170 } 170 }
171 // Remove all the objects in parallel to minimize round-trips. 171 // Remove all the objects in parallel to minimize round-trips.
172 // If we're in danger of having hundreds of objects, 172 // If we're in danger of having hundreds of objects,
173 // we'll want to change this to limit the number 173 // we'll want to change this to limit the number
174 // of concurrent operations. 174 // of concurrent operations.
175 var wg sync.WaitGroup 175 var wg sync.WaitGroup
176 wg.Add(len(names)) 176 wg.Add(len(names))
177 errc := make(chan error, len(names)) 177 errc := make(chan error, len(names))
(...skipping 19 matching lines...) Expand all
197 // might have succeeded even if we get an error. 197 // might have succeeded even if we get an error.
198 s.madeBucket = false 198 s.madeBucket = false
199 err = deleteBucket(s) 199 err = deleteBucket(s)
200 err = s.bucket.DelBucket() 200 err = s.bucket.DelBucket()
201 if s3ErrorStatusCode(err) == 404 { 201 if s3ErrorStatusCode(err) == 404 {
202 return nil 202 return nil
203 } 203 }
204 return err 204 return err
205 } 205 }
206 206
207 func deleteBucket(s *storage) (err error) { 207 func deleteBucket(s *ec2storage) (err error) {
208 for a := s.DefaultConsistencyStrategy().Start(); a.Next(); { 208 for a := s.DefaultConsistencyStrategy().Start(); a.Next(); {
209 err = s.bucket.DelBucket() 209 err = s.bucket.DelBucket()
210 if err == nil || !s.ShouldRetry(err) { 210 if err == nil || !s.ShouldRetry(err) {
211 break 211 break
212 } 212 }
213 } 213 }
214 return err 214 return err
215 } 215 }
216 216
217 func maybeNotFound(err error) error { 217 func maybeNotFound(err error) error {
218 if err != nil && s3ErrorStatusCode(err) == 404 { 218 if err != nil && s3ErrorStatusCode(err) == 404 {
219 return errors.NewNotFoundError(err, "") 219 return errors.NewNotFoundError(err, "")
220 } 220 }
221 return err 221 return err
222 } 222 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b