OLD | NEW |
1 // Copyright 2013 Canonical Ltd. | 1 // Copyright 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package maas | 4 package maas |
5 | 5 |
6 import ( | 6 import ( |
7 "bytes" | 7 "bytes" |
8 "encoding/base64" | 8 "encoding/base64" |
9 "fmt" | 9 "fmt" |
10 "io" | 10 "io" |
11 "io/ioutil" | 11 "io/ioutil" |
12 "net/url" | 12 "net/url" |
13 "sort" | 13 "sort" |
14 "sync" | 14 "sync" |
15 | 15 |
16 "launchpad.net/gomaasapi" | 16 "launchpad.net/gomaasapi" |
17 | 17 |
18 » "launchpad.net/juju-core/environs" | 18 » "launchpad.net/juju-core/environs/storage" |
19 "launchpad.net/juju-core/errors" | 19 "launchpad.net/juju-core/errors" |
20 "launchpad.net/juju-core/utils" | 20 "launchpad.net/juju-core/utils" |
21 ) | 21 ) |
22 | 22 |
23 type maasStorage struct { | 23 type maasStorage struct { |
24 // Mutex protects the "*Unlocked" fields. | 24 // Mutex protects the "*Unlocked" fields. |
25 sync.Mutex | 25 sync.Mutex |
26 | 26 |
27 // The Environ that this Storage is for. | 27 // The Environ that this Storage is for. |
28 environUnlocked *maasEnviron | 28 environUnlocked *maasEnviron |
29 | 29 |
30 // Reference to the URL on the API where files are stored. | 30 // Reference to the URL on the API where files are stored. |
31 maasClientUnlocked gomaasapi.MAASObject | 31 maasClientUnlocked gomaasapi.MAASObject |
32 } | 32 } |
33 | 33 |
34 var _ environs.Storage = (*maasStorage)(nil) | 34 var _ storage.Storage = (*maasStorage)(nil) |
35 | 35 |
36 func NewStorage(env *maasEnviron) environs.Storage { | 36 func NewStorage(env *maasEnviron) storage.Storage { |
37 storage := new(maasStorage) | 37 storage := new(maasStorage) |
38 storage.environUnlocked = env | 38 storage.environUnlocked = env |
39 storage.maasClientUnlocked = env.getMAASClient().GetSubObject("files") | 39 storage.maasClientUnlocked = env.getMAASClient().GetSubObject("files") |
40 return storage | 40 return storage |
41 } | 41 } |
42 | 42 |
43 // getSnapshot returns a consistent copy of a maasStorage. Use this if you | 43 // getSnapshot returns a consistent copy of a maasStorage. Use this if you |
44 // need a consistent view of the object's entire state, without having to | 44 // need a consistent view of the object's entire state, without having to |
45 // lock the object the whole time. | 45 // lock the object the whole time. |
46 // | 46 // |
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
154 | 154 |
155 partialURL, err := url.Parse(uri) | 155 partialURL, err := url.Parse(uri) |
156 if err != nil { | 156 if err != nil { |
157 return "", err | 157 return "", err |
158 } | 158 } |
159 fullURL := fileObj.URL().ResolveReference(partialURL) | 159 fullURL := fileObj.URL().ResolveReference(partialURL) |
160 return fullURL.String(), nil | 160 return fullURL.String(), nil |
161 } | 161 } |
162 | 162 |
163 // ConsistencyStrategy is specified in the StorageReader interface. | 163 // ConsistencyStrategy is specified in the StorageReader interface. |
164 func (stor *maasStorage) ConsistencyStrategy() utils.AttemptStrategy { | 164 func (stor *maasStorage) DefaultConsistencyStrategy() utils.AttemptStrategy { |
165 // This storage backend has immediate consistency, so there's no | 165 // This storage backend has immediate consistency, so there's no |
166 // need to wait. One attempt should do. | 166 // need to wait. One attempt should do. |
167 return utils.AttemptStrategy{} | 167 return utils.AttemptStrategy{} |
168 } | 168 } |
169 | 169 |
| 170 // ShouldRetry is specified in the StorageReader interface. |
| 171 func (stor *maasStorage) ShouldRetry(err error) bool { |
| 172 return false |
| 173 } |
| 174 |
170 // Put is specified in the StorageWriter interface. | 175 // Put is specified in the StorageWriter interface. |
171 func (stor *maasStorage) Put(name string, r io.Reader, length int64) error { | 176 func (stor *maasStorage) Put(name string, r io.Reader, length int64) error { |
172 data, err := ioutil.ReadAll(io.LimitReader(r, length)) | 177 data, err := ioutil.ReadAll(io.LimitReader(r, length)) |
173 if err != nil { | 178 if err != nil { |
174 return err | 179 return err |
175 } | 180 } |
176 params := url.Values{"filename": {name}} | 181 params := url.Values{"filename": {name}} |
177 files := map[string][]byte{"file": data} | 182 files := map[string][]byte{"file": data} |
178 snapshot := stor.getSnapshot() | 183 snapshot := stor.getSnapshot() |
179 _, err = snapshot.maasClientUnlocked.CallPostFiles("add", params, files) | 184 _, err = snapshot.maasClientUnlocked.CallPostFiles("add", params, files) |
180 return err | 185 return err |
181 } | 186 } |
182 | 187 |
183 // Remove is specified in the StorageWriter interface. | 188 // Remove is specified in the StorageWriter interface. |
184 func (stor *maasStorage) Remove(name string) error { | 189 func (stor *maasStorage) Remove(name string) error { |
185 // The only thing that can go wrong here, really, is that the file | 190 // The only thing that can go wrong here, really, is that the file |
186 // does not exist. But deletion is idempotent: deleting a file that | 191 // does not exist. But deletion is idempotent: deleting a file that |
187 // is no longer there anyway is success, not failure. | 192 // is no longer there anyway is success, not failure. |
188 stor.getSnapshot().maasClientUnlocked.GetSubObject(name).Delete() | 193 stor.getSnapshot().maasClientUnlocked.GetSubObject(name).Delete() |
189 return nil | 194 return nil |
190 } | 195 } |
191 | 196 |
192 // RemoveAll is specified in the StorageWriter interface. | 197 // RemoveAll is specified in the StorageWriter interface. |
193 func (stor *maasStorage) RemoveAll() error { | 198 func (stor *maasStorage) RemoveAll() error { |
194 » names, err := stor.List("") | 199 » names, err := storage.ListWithDefaultRetry(stor, "") |
195 if err != nil { | 200 if err != nil { |
196 return err | 201 return err |
197 } | 202 } |
198 // Remove all the objects in parallel so that we incur less round-trips. | 203 // Remove all the objects in parallel so that we incur less round-trips. |
199 // If we're in danger of having hundreds of objects, | 204 // If we're in danger of having hundreds of objects, |
200 // we'll want to change this to limit the number | 205 // we'll want to change this to limit the number |
201 // of concurrent operations. | 206 // of concurrent operations. |
202 var wg sync.WaitGroup | 207 var wg sync.WaitGroup |
203 wg.Add(len(names)) | 208 wg.Add(len(names)) |
204 errc := make(chan error, len(names)) | 209 errc := make(chan error, len(names)) |
205 for _, name := range names { | 210 for _, name := range names { |
206 name := name | 211 name := name |
207 go func() { | 212 go func() { |
208 defer wg.Done() | 213 defer wg.Done() |
209 if err := stor.Remove(name); err != nil { | 214 if err := stor.Remove(name); err != nil { |
210 errc <- err | 215 errc <- err |
211 } | 216 } |
212 }() | 217 }() |
213 } | 218 } |
214 wg.Wait() | 219 wg.Wait() |
215 select { | 220 select { |
216 case err := <-errc: | 221 case err := <-errc: |
217 return fmt.Errorf("cannot delete all provider state: %v", err) | 222 return fmt.Errorf("cannot delete all provider state: %v", err) |
218 default: | 223 default: |
219 } | 224 } |
220 return nil | 225 return nil |
221 } | 226 } |
OLD | NEW |