OLD | NEW |
1 package state | 1 package state |
2 | 2 |
3 import ( | 3 import ( |
4 "fmt" | 4 "fmt" |
5 "labix.org/v2/mgo" | 5 "labix.org/v2/mgo" |
| 6 "launchpad.net/juju-core/log" |
6 "launchpad.net/juju-core/state/api/params" | 7 "launchpad.net/juju-core/state/api/params" |
7 "launchpad.net/juju-core/state/multiwatcher" | 8 "launchpad.net/juju-core/state/multiwatcher" |
8 "launchpad.net/juju-core/state/watcher" | 9 "launchpad.net/juju-core/state/watcher" |
9 "reflect" | 10 "reflect" |
10 ) | 11 ) |
11 | 12 |
12 // allWatcherStateBacking implements allWatcherBacking by | 13 // allWatcherStateBacking implements allWatcherBacking by |
13 // fetching entities from the State. | 14 // fetching entities from the State. |
14 type allWatcherStateBacking struct { | 15 type allWatcherStateBacking struct { |
15 st *State | 16 st *State |
16 // collections | 17 // collections |
17 collectionByName map[string]allWatcherStateCollection | 18 collectionByName map[string]allWatcherStateCollection |
18 collectionByType map[reflect.Type]allWatcherStateCollection | 19 collectionByType map[reflect.Type]allWatcherStateCollection |
19 } | 20 } |
20 | 21 |
21 type backingMachine machineDoc | 22 type backingMachine machineDoc |
22 | 23 |
23 func (m *backingMachine) updated(st *State, store *multiwatcher.Store) error { | 24 func (m *backingMachine) updated(st *State, store *multiwatcher.Store, id interf
ace{}) error { |
24 info := ¶ms.MachineInfo{ | 25 info := ¶ms.MachineInfo{ |
25 Id: m.Id, | 26 Id: m.Id, |
26 InstanceId: string(m.InstanceId), | 27 InstanceId: string(m.InstanceId), |
27 } | 28 } |
| 29 oldInfo := store.Get(info.EntityId()) |
| 30 if oldInfo == nil { |
| 31 // We're adding the entry for the first time, |
| 32 // so fetch the associated machine status. |
| 33 sdoc, err := getStatus(st, machineGlobalKey(m.Id)) |
| 34 if err != nil { |
| 35 return err |
| 36 } |
| 37 info.Status = params.MachineStatus(sdoc.Status) |
| 38 info.StatusInfo = sdoc.StatusInfo |
| 39 } else { |
| 40 // The entry already exists, so preserve the current status. |
| 41 oldInfo := oldInfo.(*params.MachineInfo) |
| 42 info.Status = oldInfo.Status |
| 43 info.StatusInfo = oldInfo.StatusInfo |
| 44 } |
28 store.Update(info) | 45 store.Update(info) |
29 return nil | 46 return nil |
30 } | 47 } |
31 | 48 |
32 func (svc *backingMachine) removed(st *State, store *multiwatcher.Store, id inte
rface{}) error { | 49 func (svc *backingMachine) removed(st *State, store *multiwatcher.Store, id inte
rface{}) error { |
33 store.Remove(params.EntityId{ | 50 store.Remove(params.EntityId{ |
34 Kind: "machine", | 51 Kind: "machine", |
35 Id: id, | 52 Id: id, |
36 }) | 53 }) |
37 return nil | 54 return nil |
38 } | 55 } |
39 | 56 |
40 func (m *backingMachine) mongoId() interface{} { | 57 func (m *backingMachine) mongoId() interface{} { |
41 return m.Id | 58 return m.Id |
42 } | 59 } |
43 | 60 |
44 type backingUnit unitDoc | 61 type backingUnit unitDoc |
45 | 62 |
46 func (u *backingUnit) updated(st *State, store *multiwatcher.Store) error { | 63 func (u *backingUnit) updated(st *State, store *multiwatcher.Store, id interface
{}) error { |
47 info := ¶ms.UnitInfo{ | 64 info := ¶ms.UnitInfo{ |
48 Name: u.Name, | 65 Name: u.Name, |
49 Service: u.Service, | 66 Service: u.Service, |
50 Series: u.Series, | 67 Series: u.Series, |
51 PublicAddress: u.PublicAddress, | 68 PublicAddress: u.PublicAddress, |
52 PrivateAddress: u.PrivateAddress, | 69 PrivateAddress: u.PrivateAddress, |
53 MachineId: u.MachineId, | 70 MachineId: u.MachineId, |
54 Resolved: u.Resolved, | 71 Resolved: u.Resolved, |
55 Ports: u.Ports, | 72 Ports: u.Ports, |
56 } | 73 } |
57 if u.CharmURL != nil { | 74 if u.CharmURL != nil { |
58 info.CharmURL = u.CharmURL.String() | 75 info.CharmURL = u.CharmURL.String() |
59 } | 76 } |
| 77 oldInfo := store.Get(info.EntityId()) |
| 78 if oldInfo == nil { |
| 79 // We're adding the entry for the first time, |
| 80 // so fetch the associated unit status. |
| 81 sdoc, err := getStatus(st, unitGlobalKey(u.Name)) |
| 82 if err != nil { |
| 83 return err |
| 84 } |
| 85 info.Status = params.UnitStatus(sdoc.Status) |
| 86 info.StatusInfo = sdoc.StatusInfo |
| 87 } else { |
| 88 // The entry already exists, so preserve the current status. |
| 89 oldInfo := oldInfo.(*params.UnitInfo) |
| 90 info.Status = oldInfo.Status |
| 91 info.StatusInfo = oldInfo.StatusInfo |
| 92 } |
60 store.Update(info) | 93 store.Update(info) |
61 return nil | 94 return nil |
62 } | 95 } |
63 | 96 |
64 func (svc *backingUnit) removed(st *State, store *multiwatcher.Store, id interfa
ce{}) error { | 97 func (svc *backingUnit) removed(st *State, store *multiwatcher.Store, id interfa
ce{}) error { |
65 store.Remove(params.EntityId{ | 98 store.Remove(params.EntityId{ |
66 Kind: "unit", | 99 Kind: "unit", |
67 Id: id, | 100 Id: id, |
68 }) | 101 }) |
69 return nil | 102 return nil |
70 } | 103 } |
71 | 104 |
72 func (m *backingUnit) mongoId() interface{} { | 105 func (m *backingUnit) mongoId() interface{} { |
73 return m.Name | 106 return m.Name |
74 } | 107 } |
75 | 108 |
76 type backingService serviceDoc | 109 type backingService serviceDoc |
77 | 110 |
78 func (svc *backingService) updated(st *State, store *multiwatcher.Store) error { | 111 func (svc *backingService) updated(st *State, store *multiwatcher.Store, id inte
rface{}) error { |
79 info := ¶ms.ServiceInfo{ | 112 info := ¶ms.ServiceInfo{ |
80 Name: svc.Name, | 113 Name: svc.Name, |
81 Exposed: svc.Exposed, | 114 Exposed: svc.Exposed, |
82 CharmURL: svc.CharmURL.String(), | 115 CharmURL: svc.CharmURL.String(), |
83 } | 116 } |
84 store.Update(info) | 117 store.Update(info) |
85 return nil | 118 return nil |
86 } | 119 } |
87 | 120 |
88 func (svc *backingService) removed(st *State, store *multiwatcher.Store, id inte
rface{}) error { | 121 func (svc *backingService) removed(st *State, store *multiwatcher.Store, id inte
rface{}) error { |
89 store.Remove(params.EntityId{ | 122 store.Remove(params.EntityId{ |
90 Kind: "service", | 123 Kind: "service", |
91 Id: id, | 124 Id: id, |
92 }) | 125 }) |
93 return nil | 126 return nil |
94 } | 127 } |
95 | 128 |
96 func (m *backingService) mongoId() interface{} { | 129 func (m *backingService) mongoId() interface{} { |
97 return m.Name | 130 return m.Name |
98 } | 131 } |
99 | 132 |
100 type backingRelation relationDoc | 133 type backingRelation relationDoc |
101 | 134 |
102 func (r *backingRelation) updated(st *State, store *multiwatcher.Store) error { | 135 func (r *backingRelation) updated(st *State, store *multiwatcher.Store, id inter
face{}) error { |
103 eps := make([]params.Endpoint, len(r.Endpoints)) | 136 eps := make([]params.Endpoint, len(r.Endpoints)) |
104 for i, ep := range r.Endpoints { | 137 for i, ep := range r.Endpoints { |
105 eps[i] = params.Endpoint{ | 138 eps[i] = params.Endpoint{ |
106 ServiceName: ep.ServiceName, | 139 ServiceName: ep.ServiceName, |
107 Relation: ep.Relation, | 140 Relation: ep.Relation, |
108 } | 141 } |
109 } | 142 } |
110 info := ¶ms.RelationInfo{ | 143 info := ¶ms.RelationInfo{ |
111 Key: r.Key, | 144 Key: r.Key, |
112 Endpoints: eps, | 145 Endpoints: eps, |
113 } | 146 } |
114 store.Update(info) | 147 store.Update(info) |
115 return nil | 148 return nil |
116 } | 149 } |
117 | 150 |
118 func (svc *backingRelation) removed(st *State, store *multiwatcher.Store, id int
erface{}) error { | 151 func (svc *backingRelation) removed(st *State, store *multiwatcher.Store, id int
erface{}) error { |
119 store.Remove(params.EntityId{ | 152 store.Remove(params.EntityId{ |
120 Kind: "relation", | 153 Kind: "relation", |
121 Id: id, | 154 Id: id, |
122 }) | 155 }) |
123 return nil | 156 return nil |
124 } | 157 } |
125 | 158 |
126 func (m *backingRelation) mongoId() interface{} { | 159 func (m *backingRelation) mongoId() interface{} { |
127 return m.Key | 160 return m.Key |
128 } | 161 } |
129 | 162 |
130 type backingAnnotation annotatorDoc | 163 type backingAnnotation annotatorDoc |
131 | 164 |
132 func (a *backingAnnotation) updated(st *State, store *multiwatcher.Store) error
{ | 165 func (a *backingAnnotation) updated(st *State, store *multiwatcher.Store, id int
erface{}) error { |
133 info := ¶ms.AnnotationInfo{ | 166 info := ¶ms.AnnotationInfo{ |
134 Tag: a.Tag, | 167 Tag: a.Tag, |
135 Annotations: a.Annotations, | 168 Annotations: a.Annotations, |
136 } | 169 } |
137 store.Update(info) | 170 store.Update(info) |
138 return nil | 171 return nil |
139 } | 172 } |
140 | 173 |
141 func (svc *backingAnnotation) removed(st *State, store *multiwatcher.Store, id i
nterface{}) error { | 174 func (svc *backingAnnotation) removed(st *State, store *multiwatcher.Store, id i
nterface{}) error { |
142 tag, ok := tagForGlobalKey(id.(string)) | 175 tag, ok := tagForGlobalKey(id.(string)) |
143 if !ok { | 176 if !ok { |
144 panic(fmt.Errorf("unknown global key %q in state", id)) | 177 panic(fmt.Errorf("unknown global key %q in state", id)) |
145 } | 178 } |
146 store.Remove(params.EntityId{ | 179 store.Remove(params.EntityId{ |
147 Kind: "annotation", | 180 Kind: "annotation", |
148 Id: tag, | 181 Id: tag, |
149 }) | 182 }) |
150 return nil | 183 return nil |
151 } | 184 } |
152 | 185 |
153 func (a *backingAnnotation) mongoId() interface{} { | 186 func (a *backingAnnotation) mongoId() interface{} { |
154 return a.GlobalKey | 187 return a.GlobalKey |
155 } | 188 } |
156 | 189 |
| 190 type backingStatus statusDoc |
| 191 |
| 192 func (s *backingStatus) updated(st *State, store *multiwatcher.Store, id interfa
ce{}) error { |
| 193 parentId, ok := backingEntityIdForGlobalKey(id.(string)) |
| 194 if !ok { |
| 195 log.Errorf("status for entity with unrecognised global key %q",
id) |
| 196 return nil |
| 197 } |
| 198 info0 := store.Get(parentId) |
| 199 switch info := info0.(type) { |
| 200 case nil: |
| 201 // The parent info doesn't exist. Ignore the status until it doe
s. |
| 202 return nil |
| 203 case *params.UnitInfo: |
| 204 newInfo := *info |
| 205 newInfo.Status = params.UnitStatus(s.Status) |
| 206 newInfo.StatusInfo = s.StatusInfo |
| 207 info0 = &newInfo |
| 208 case *params.MachineInfo: |
| 209 newInfo := *info |
| 210 newInfo.Status = params.MachineStatus(s.Status) |
| 211 newInfo.StatusInfo = s.StatusInfo |
| 212 info0 = &newInfo |
| 213 default: |
| 214 panic(fmt.Errorf("status for unexpected entity with id %q; type
%T", id, info)) |
| 215 } |
| 216 store.Update(info0) |
| 217 return nil |
| 218 } |
| 219 |
| 220 func (s *backingStatus) removed(st *State, store *multiwatcher.Store, id interfa
ce{}) error { |
| 221 // If the status is removed, the parent will follow not long after, |
| 222 // so do nothing. |
| 223 return nil |
| 224 } |
| 225 |
| 226 func (a *backingStatus) mongoId() interface{} { |
| 227 panic("cannot find mongo id from status document") |
| 228 } |
| 229 |
| 230 func backingEntityIdForGlobalKey(key string) (params.EntityId, bool) { |
| 231 if len(key) < 3 || key[1] != '#' { |
| 232 return params.EntityId{}, false |
| 233 } |
| 234 id := key[2:] |
| 235 switch key[0] { |
| 236 case 'm': |
| 237 return (¶ms.MachineInfo{Id: id}).EntityId(), true |
| 238 case 'u': |
| 239 return (¶ms.UnitInfo{Name: id}).EntityId(), true |
| 240 case 's': |
| 241 return (¶ms.ServiceInfo{Name: id}).EntityId(), true |
| 242 } |
| 243 return params.EntityId{}, false |
| 244 } |
| 245 |
157 // backingEntityDoc is implemented by the documents in | 246 // backingEntityDoc is implemented by the documents in |
158 // collections that the allWatcherStateBacking watches. | 247 // collections that the allWatcherStateBacking watches. |
159 type backingEntityDoc interface { | 248 type backingEntityDoc interface { |
160 // updated is called when the document has changed. | 249 // updated is called when the document has changed. |
161 » updated(st *State, store *multiwatcher.Store) error | 250 » // The mongo _id value of the document is provided in id. |
| 251 » updated(st *State, store *multiwatcher.Store, id interface{}) error |
162 | 252 |
163 // removed is called when the document has changed. | 253 // removed is called when the document has changed. |
164 // The receiving instance will not contain any data. | 254 // The receiving instance will not contain any data. |
| 255 // The mongo _id value of the document is provided in id. |
165 removed(st *State, store *multiwatcher.Store, id interface{}) error | 256 removed(st *State, store *multiwatcher.Store, id interface{}) error |
166 | 257 |
167 // mongoId returns the mongo _id field of the document. | 258 // mongoId returns the mongo _id field of the document. |
| 259 // It is currently never called for subsidiary documents. |
168 mongoId() interface{} | 260 mongoId() interface{} |
169 } | 261 } |
170 | 262 |
171 var ( | 263 var ( |
172 _ backingEntityDoc = (*backingMachine)(nil) | 264 _ backingEntityDoc = (*backingMachine)(nil) |
173 _ backingEntityDoc = (*backingUnit)(nil) | 265 _ backingEntityDoc = (*backingUnit)(nil) |
174 _ backingEntityDoc = (*backingService)(nil) | 266 _ backingEntityDoc = (*backingService)(nil) |
175 _ backingEntityDoc = (*backingRelation)(nil) | 267 _ backingEntityDoc = (*backingRelation)(nil) |
176 _ backingEntityDoc = (*backingAnnotation)(nil) | 268 _ backingEntityDoc = (*backingAnnotation)(nil) |
| 269 _ backingEntityDoc = (*backingStatus)(nil) |
177 ) | 270 ) |
178 | 271 |
179 // allWatcherStateCollection holds information about a | 272 // allWatcherStateCollection holds information about a |
180 // collection watched by an allWatcher and the | 273 // collection watched by an allWatcher and the |
181 // type of value we use to store entity information | 274 // type of value we use to store entity information |
182 // for that collection. | 275 // for that collection. |
183 type allWatcherStateCollection struct { | 276 type allWatcherStateCollection struct { |
184 *mgo.Collection | 277 *mgo.Collection |
185 | 278 |
186 // infoSliceType stores the type of a slice of the info type | 279 // infoSliceType stores the type of a slice of the info type |
187 // that we use for this collection. In Go 1.1 we can change | 280 // that we use for this collection. In Go 1.1 we can change |
188 // this to use the type itself, as we'll have reflect.SliceOf. | 281 // this to use the type itself, as we'll have reflect.SliceOf. |
189 infoSliceType reflect.Type | 282 infoSliceType reflect.Type |
| 283 // subsidiary is true if the collection is used only |
| 284 // modify a primary entity. |
| 285 subsidiary bool |
190 } | 286 } |
191 | 287 |
192 func newAllWatcherStateBacking(st *State) multiwatcher.Backing { | 288 func newAllWatcherStateBacking(st *State) multiwatcher.Backing { |
193 b := &allWatcherStateBacking{ | 289 b := &allWatcherStateBacking{ |
194 st: st, | 290 st: st, |
195 collectionByName: make(map[string]allWatcherStateCollection), | 291 collectionByName: make(map[string]allWatcherStateCollection), |
196 collectionByType: make(map[reflect.Type]allWatcherStateCollectio
n), | 292 collectionByType: make(map[reflect.Type]allWatcherStateCollectio
n), |
197 } | 293 } |
198 collections := []allWatcherStateCollection{{ | 294 collections := []allWatcherStateCollection{{ |
199 Collection: st.machines, | 295 Collection: st.machines, |
200 infoSliceType: reflect.TypeOf([]backingMachine(nil)), | 296 infoSliceType: reflect.TypeOf([]backingMachine(nil)), |
201 }, { | 297 }, { |
202 Collection: st.units, | 298 Collection: st.units, |
203 infoSliceType: reflect.TypeOf([]backingUnit(nil)), | 299 infoSliceType: reflect.TypeOf([]backingUnit(nil)), |
204 }, { | 300 }, { |
205 Collection: st.services, | 301 Collection: st.services, |
206 infoSliceType: reflect.TypeOf([]backingService(nil)), | 302 infoSliceType: reflect.TypeOf([]backingService(nil)), |
207 }, { | 303 }, { |
208 Collection: st.relations, | 304 Collection: st.relations, |
209 infoSliceType: reflect.TypeOf([]backingRelation(nil)), | 305 infoSliceType: reflect.TypeOf([]backingRelation(nil)), |
210 }, { | 306 }, { |
211 Collection: st.annotations, | 307 Collection: st.annotations, |
212 infoSliceType: reflect.TypeOf([]backingAnnotation(nil)), | 308 infoSliceType: reflect.TypeOf([]backingAnnotation(nil)), |
| 309 }, { |
| 310 Collection: st.statuses, |
| 311 infoSliceType: reflect.TypeOf([]backingStatus(nil)), |
| 312 subsidiary: true, |
213 }} | 313 }} |
214 // Populate the collection maps from the above set of collections. | 314 // Populate the collection maps from the above set of collections. |
215 for _, c := range collections { | 315 for _, c := range collections { |
216 docType := c.infoSliceType.Elem() | 316 docType := c.infoSliceType.Elem() |
217 if _, ok := b.collectionByType[docType]; ok { | 317 if _, ok := b.collectionByType[docType]; ok { |
218 panic(fmt.Errorf("duplicate collection type %s", docType
)) | 318 panic(fmt.Errorf("duplicate collection type %s", docType
)) |
219 } | 319 } |
220 b.collectionByType[docType] = c | 320 b.collectionByType[docType] = c |
221 if _, ok := b.collectionByName[c.Name]; ok { | 321 if _, ok := b.collectionByName[c.Name]; ok { |
222 panic(fmt.Errorf("duplicate collection name %q", c.Name)
) | 322 panic(fmt.Errorf("duplicate collection name %q", c.Name)
) |
(...skipping 14 matching lines...) Expand all Loading... |
237 func (b *allWatcherStateBacking) Unwatch(in chan<- watcher.Change) { | 337 func (b *allWatcherStateBacking) Unwatch(in chan<- watcher.Change) { |
238 for _, c := range b.collectionByName { | 338 for _, c := range b.collectionByName { |
239 b.st.watcher.UnwatchCollection(c.Name, in) | 339 b.st.watcher.UnwatchCollection(c.Name, in) |
240 } | 340 } |
241 } | 341 } |
242 | 342 |
243 // GetAll fetches all items that we want to watch from the state. | 343 // GetAll fetches all items that we want to watch from the state. |
244 func (b *allWatcherStateBacking) GetAll(all *multiwatcher.Store) error { | 344 func (b *allWatcherStateBacking) GetAll(all *multiwatcher.Store) error { |
245 // TODO(rog) fetch collections concurrently? | 345 // TODO(rog) fetch collections concurrently? |
246 for _, c := range b.collectionByName { | 346 for _, c := range b.collectionByName { |
| 347 if c.subsidiary { |
| 348 continue |
| 349 } |
247 infoSlicePtr := reflect.New(c.infoSliceType) | 350 infoSlicePtr := reflect.New(c.infoSliceType) |
248 if err := c.Find(nil).All(infoSlicePtr.Interface()); err != nil
{ | 351 if err := c.Find(nil).All(infoSlicePtr.Interface()); err != nil
{ |
249 return fmt.Errorf("cannot get all %s: %v", c.Name, err) | 352 return fmt.Errorf("cannot get all %s: %v", c.Name, err) |
250 } | 353 } |
251 infos := infoSlicePtr.Elem() | 354 infos := infoSlicePtr.Elem() |
252 for i := 0; i < infos.Len(); i++ { | 355 for i := 0; i < infos.Len(); i++ { |
253 info := infos.Index(i).Addr().Interface().(backingEntity
Doc) | 356 info := infos.Index(i).Addr().Interface().(backingEntity
Doc) |
254 » » » info.updated(b.st, all) | 357 » » » info.updated(b.st, all, info.mongoId()) |
255 } | 358 } |
256 } | 359 } |
257 return nil | 360 return nil |
258 } | 361 } |
259 | 362 |
260 // Changed updates the allWatcher's idea of the current state | 363 // Changed updates the allWatcher's idea of the current state |
261 // in response to the given change. | 364 // in response to the given change. |
262 func (b *allWatcherStateBacking) Changed(all *multiwatcher.Store, change watcher
.Change) error { | 365 func (b *allWatcherStateBacking) Changed(all *multiwatcher.Store, change watcher
.Change) error { |
263 c, ok := b.collectionByName[change.C] | 366 c, ok := b.collectionByName[change.C] |
264 if !ok { | 367 if !ok { |
265 panic(fmt.Errorf("unknown collection %q in fetch request", chang
e.C)) | 368 panic(fmt.Errorf("unknown collection %q in fetch request", chang
e.C)) |
266 } | 369 } |
267 doc := reflect.New(c.infoSliceType.Elem()).Interface().(backingEntityDoc
) | 370 doc := reflect.New(c.infoSliceType.Elem()).Interface().(backingEntityDoc
) |
268 // TODO(rog) investigate ways that this can be made more efficient | 371 // TODO(rog) investigate ways that this can be made more efficient |
269 // than simply fetching each entity in turn. | 372 // than simply fetching each entity in turn. |
270 err := c.FindId(change.Id).One(doc) | 373 err := c.FindId(change.Id).One(doc) |
271 if err == mgo.ErrNotFound { | 374 if err == mgo.ErrNotFound { |
272 return doc.removed(b.st, all, change.Id) | 375 return doc.removed(b.st, all, change.Id) |
273 } | 376 } |
274 if err != nil { | 377 if err != nil { |
275 return err | 378 return err |
276 } | 379 } |
277 » return doc.updated(b.st, all) | 380 » return doc.updated(b.st, all, change.Id) |
278 } | 381 } |
OLD | NEW |