Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(62)

Side by Side Diff: state/megawatcher.go

Issue 8534043: state: add status to allwatcher unit and machine
Patch Set: state: add status to allwatcher unit and machine Created 11 years, 12 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « state/apiserver/api_test.go ('k') | state/megawatcher_internal_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package state 1 package state
2 2
3 import ( 3 import (
4 "fmt" 4 "fmt"
5 "labix.org/v2/mgo" 5 "labix.org/v2/mgo"
6 "launchpad.net/juju-core/log"
6 "launchpad.net/juju-core/state/api/params" 7 "launchpad.net/juju-core/state/api/params"
7 "launchpad.net/juju-core/state/multiwatcher" 8 "launchpad.net/juju-core/state/multiwatcher"
8 "launchpad.net/juju-core/state/watcher" 9 "launchpad.net/juju-core/state/watcher"
9 "reflect" 10 "reflect"
10 ) 11 )
11 12
12 // allWatcherStateBacking implements allWatcherBacking by 13 // allWatcherStateBacking implements allWatcherBacking by
13 // fetching entities from the State. 14 // fetching entities from the State.
14 type allWatcherStateBacking struct { 15 type allWatcherStateBacking struct {
15 st *State 16 st *State
16 // collections 17 // collections
17 collectionByName map[string]allWatcherStateCollection 18 collectionByName map[string]allWatcherStateCollection
18 collectionByType map[reflect.Type]allWatcherStateCollection 19 collectionByType map[reflect.Type]allWatcherStateCollection
19 } 20 }
20 21
21 type backingMachine machineDoc 22 type backingMachine machineDoc
22 23
23 func (m *backingMachine) updated(st *State, store *multiwatcher.Store) error { 24 func (m *backingMachine) updated(st *State, store *multiwatcher.Store, id interf ace{}) error {
24 info := &params.MachineInfo{ 25 info := &params.MachineInfo{
25 Id: m.Id, 26 Id: m.Id,
26 InstanceId: string(m.InstanceId), 27 InstanceId: string(m.InstanceId),
27 } 28 }
29 oldInfo := store.Get(info.EntityId())
30 if oldInfo == nil {
31 // We're adding the entry for the first time,
32 // so fetch the associated machine status.
33 sdoc, err := getStatus(st, machineGlobalKey(m.Id))
34 if err != nil {
35 return err
36 }
37 info.Status = params.MachineStatus(sdoc.Status)
38 info.StatusInfo = sdoc.StatusInfo
39 } else {
40 // The entry already exists, so preserve the current status.
41 oldInfo := oldInfo.(*params.MachineInfo)
42 info.Status = oldInfo.Status
43 info.StatusInfo = oldInfo.StatusInfo
44 }
28 store.Update(info) 45 store.Update(info)
29 return nil 46 return nil
30 } 47 }
31 48
32 func (svc *backingMachine) removed(st *State, store *multiwatcher.Store, id inte rface{}) error { 49 func (svc *backingMachine) removed(st *State, store *multiwatcher.Store, id inte rface{}) error {
33 store.Remove(params.EntityId{ 50 store.Remove(params.EntityId{
34 Kind: "machine", 51 Kind: "machine",
35 Id: id, 52 Id: id,
36 }) 53 })
37 return nil 54 return nil
38 } 55 }
39 56
40 func (m *backingMachine) mongoId() interface{} { 57 func (m *backingMachine) mongoId() interface{} {
41 return m.Id 58 return m.Id
42 } 59 }
43 60
44 type backingUnit unitDoc 61 type backingUnit unitDoc
45 62
46 func (u *backingUnit) updated(st *State, store *multiwatcher.Store) error { 63 func (u *backingUnit) updated(st *State, store *multiwatcher.Store, id interface {}) error {
47 info := &params.UnitInfo{ 64 info := &params.UnitInfo{
48 Name: u.Name, 65 Name: u.Name,
49 Service: u.Service, 66 Service: u.Service,
50 Series: u.Series, 67 Series: u.Series,
51 PublicAddress: u.PublicAddress, 68 PublicAddress: u.PublicAddress,
52 PrivateAddress: u.PrivateAddress, 69 PrivateAddress: u.PrivateAddress,
53 MachineId: u.MachineId, 70 MachineId: u.MachineId,
54 Resolved: u.Resolved, 71 Resolved: u.Resolved,
55 Ports: u.Ports, 72 Ports: u.Ports,
56 } 73 }
57 if u.CharmURL != nil { 74 if u.CharmURL != nil {
58 info.CharmURL = u.CharmURL.String() 75 info.CharmURL = u.CharmURL.String()
59 } 76 }
77 oldInfo := store.Get(info.EntityId())
78 if oldInfo == nil {
79 // We're adding the entry for the first time,
80 // so fetch the associated unit status.
81 sdoc, err := getStatus(st, unitGlobalKey(u.Name))
82 if err != nil {
83 return err
84 }
85 info.Status = params.UnitStatus(sdoc.Status)
86 info.StatusInfo = sdoc.StatusInfo
87 } else {
88 // The entry already exists, so preserve the current status.
89 oldInfo := oldInfo.(*params.UnitInfo)
90 info.Status = oldInfo.Status
91 info.StatusInfo = oldInfo.StatusInfo
92 }
60 store.Update(info) 93 store.Update(info)
61 return nil 94 return nil
62 } 95 }
63 96
64 func (svc *backingUnit) removed(st *State, store *multiwatcher.Store, id interfa ce{}) error { 97 func (svc *backingUnit) removed(st *State, store *multiwatcher.Store, id interfa ce{}) error {
65 store.Remove(params.EntityId{ 98 store.Remove(params.EntityId{
66 Kind: "unit", 99 Kind: "unit",
67 Id: id, 100 Id: id,
68 }) 101 })
69 return nil 102 return nil
70 } 103 }
71 104
72 func (m *backingUnit) mongoId() interface{} { 105 func (m *backingUnit) mongoId() interface{} {
73 return m.Name 106 return m.Name
74 } 107 }
75 108
76 type backingService serviceDoc 109 type backingService serviceDoc
77 110
78 func (svc *backingService) updated(st *State, store *multiwatcher.Store) error { 111 func (svc *backingService) updated(st *State, store *multiwatcher.Store, id inte rface{}) error {
79 info := &params.ServiceInfo{ 112 info := &params.ServiceInfo{
80 Name: svc.Name, 113 Name: svc.Name,
81 Exposed: svc.Exposed, 114 Exposed: svc.Exposed,
82 CharmURL: svc.CharmURL.String(), 115 CharmURL: svc.CharmURL.String(),
83 } 116 }
84 store.Update(info) 117 store.Update(info)
85 return nil 118 return nil
86 } 119 }
87 120
88 func (svc *backingService) removed(st *State, store *multiwatcher.Store, id inte rface{}) error { 121 func (svc *backingService) removed(st *State, store *multiwatcher.Store, id inte rface{}) error {
89 store.Remove(params.EntityId{ 122 store.Remove(params.EntityId{
90 Kind: "service", 123 Kind: "service",
91 Id: id, 124 Id: id,
92 }) 125 })
93 return nil 126 return nil
94 } 127 }
95 128
96 func (m *backingService) mongoId() interface{} { 129 func (m *backingService) mongoId() interface{} {
97 return m.Name 130 return m.Name
98 } 131 }
99 132
100 type backingRelation relationDoc 133 type backingRelation relationDoc
101 134
102 func (r *backingRelation) updated(st *State, store *multiwatcher.Store) error { 135 func (r *backingRelation) updated(st *State, store *multiwatcher.Store, id inter face{}) error {
103 eps := make([]params.Endpoint, len(r.Endpoints)) 136 eps := make([]params.Endpoint, len(r.Endpoints))
104 for i, ep := range r.Endpoints { 137 for i, ep := range r.Endpoints {
105 eps[i] = params.Endpoint{ 138 eps[i] = params.Endpoint{
106 ServiceName: ep.ServiceName, 139 ServiceName: ep.ServiceName,
107 Relation: ep.Relation, 140 Relation: ep.Relation,
108 } 141 }
109 } 142 }
110 info := &params.RelationInfo{ 143 info := &params.RelationInfo{
111 Key: r.Key, 144 Key: r.Key,
112 Endpoints: eps, 145 Endpoints: eps,
113 } 146 }
114 store.Update(info) 147 store.Update(info)
115 return nil 148 return nil
116 } 149 }
117 150
118 func (svc *backingRelation) removed(st *State, store *multiwatcher.Store, id int erface{}) error { 151 func (svc *backingRelation) removed(st *State, store *multiwatcher.Store, id int erface{}) error {
119 store.Remove(params.EntityId{ 152 store.Remove(params.EntityId{
120 Kind: "relation", 153 Kind: "relation",
121 Id: id, 154 Id: id,
122 }) 155 })
123 return nil 156 return nil
124 } 157 }
125 158
126 func (m *backingRelation) mongoId() interface{} { 159 func (m *backingRelation) mongoId() interface{} {
127 return m.Key 160 return m.Key
128 } 161 }
129 162
130 type backingAnnotation annotatorDoc 163 type backingAnnotation annotatorDoc
131 164
132 func (a *backingAnnotation) updated(st *State, store *multiwatcher.Store) error { 165 func (a *backingAnnotation) updated(st *State, store *multiwatcher.Store, id int erface{}) error {
133 info := &params.AnnotationInfo{ 166 info := &params.AnnotationInfo{
134 Tag: a.Tag, 167 Tag: a.Tag,
135 Annotations: a.Annotations, 168 Annotations: a.Annotations,
136 } 169 }
137 store.Update(info) 170 store.Update(info)
138 return nil 171 return nil
139 } 172 }
140 173
141 func (svc *backingAnnotation) removed(st *State, store *multiwatcher.Store, id i nterface{}) error { 174 func (svc *backingAnnotation) removed(st *State, store *multiwatcher.Store, id i nterface{}) error {
142 tag, ok := tagForGlobalKey(id.(string)) 175 tag, ok := tagForGlobalKey(id.(string))
143 if !ok { 176 if !ok {
144 panic(fmt.Errorf("unknown global key %q in state", id)) 177 panic(fmt.Errorf("unknown global key %q in state", id))
145 } 178 }
146 store.Remove(params.EntityId{ 179 store.Remove(params.EntityId{
147 Kind: "annotation", 180 Kind: "annotation",
148 Id: tag, 181 Id: tag,
149 }) 182 })
150 return nil 183 return nil
151 } 184 }
152 185
153 func (a *backingAnnotation) mongoId() interface{} { 186 func (a *backingAnnotation) mongoId() interface{} {
154 return a.GlobalKey 187 return a.GlobalKey
155 } 188 }
156 189
190 type backingStatus statusDoc
191
192 func (s *backingStatus) updated(st *State, store *multiwatcher.Store, id interfa ce{}) error {
193 parentId, ok := backingEntityIdForGlobalKey(id.(string))
194 if !ok {
195 log.Errorf("status for entity with unrecognised global key %q", id)
196 return nil
197 }
198 info0 := store.Get(parentId)
199 switch info := info0.(type) {
200 case nil:
201 // The parent info doesn't exist. Ignore the status until it doe s.
202 return nil
203 case *params.UnitInfo:
204 newInfo := *info
205 newInfo.Status = params.UnitStatus(s.Status)
206 newInfo.StatusInfo = s.StatusInfo
207 info0 = &newInfo
208 case *params.MachineInfo:
209 newInfo := *info
210 newInfo.Status = params.MachineStatus(s.Status)
211 newInfo.StatusInfo = s.StatusInfo
212 info0 = &newInfo
213 default:
214 panic(fmt.Errorf("status for unexpected entity with id %q; type %T", id, info))
215 }
216 store.Update(info0)
217 return nil
218 }
219
220 func (s *backingStatus) removed(st *State, store *multiwatcher.Store, id interfa ce{}) error {
221 // If the status is removed, the parent will follow not long after,
222 // so do nothing.
223 return nil
224 }
225
226 func (a *backingStatus) mongoId() interface{} {
227 panic("cannot find mongo id from status document")
228 }
229
230 func backingEntityIdForGlobalKey(key string) (params.EntityId, bool) {
231 if len(key) < 3 || key[1] != '#' {
232 return params.EntityId{}, false
233 }
234 id := key[2:]
235 switch key[0] {
236 case 'm':
237 return (&params.MachineInfo{Id: id}).EntityId(), true
238 case 'u':
239 return (&params.UnitInfo{Name: id}).EntityId(), true
240 case 's':
241 return (&params.ServiceInfo{Name: id}).EntityId(), true
242 }
243 return params.EntityId{}, false
244 }
245
157 // backingEntityDoc is implemented by the documents in 246 // backingEntityDoc is implemented by the documents in
158 // collections that the allWatcherStateBacking watches. 247 // collections that the allWatcherStateBacking watches.
159 type backingEntityDoc interface { 248 type backingEntityDoc interface {
160 // updated is called when the document has changed. 249 // updated is called when the document has changed.
161 » updated(st *State, store *multiwatcher.Store) error 250 » // The mongo _id value of the document is provided in id.
251 » updated(st *State, store *multiwatcher.Store, id interface{}) error
162 252
163 // removed is called when the document has changed. 253 // removed is called when the document has changed.
164 // The receiving instance will not contain any data. 254 // The receiving instance will not contain any data.
255 // The mongo _id value of the document is provided in id.
165 removed(st *State, store *multiwatcher.Store, id interface{}) error 256 removed(st *State, store *multiwatcher.Store, id interface{}) error
166 257
167 // mongoId returns the mongo _id field of the document. 258 // mongoId returns the mongo _id field of the document.
259 // It is currently never called for subsidiary documents.
168 mongoId() interface{} 260 mongoId() interface{}
169 } 261 }
170 262
171 var ( 263 var (
172 _ backingEntityDoc = (*backingMachine)(nil) 264 _ backingEntityDoc = (*backingMachine)(nil)
173 _ backingEntityDoc = (*backingUnit)(nil) 265 _ backingEntityDoc = (*backingUnit)(nil)
174 _ backingEntityDoc = (*backingService)(nil) 266 _ backingEntityDoc = (*backingService)(nil)
175 _ backingEntityDoc = (*backingRelation)(nil) 267 _ backingEntityDoc = (*backingRelation)(nil)
176 _ backingEntityDoc = (*backingAnnotation)(nil) 268 _ backingEntityDoc = (*backingAnnotation)(nil)
269 _ backingEntityDoc = (*backingStatus)(nil)
177 ) 270 )
178 271
179 // allWatcherStateCollection holds information about a 272 // allWatcherStateCollection holds information about a
180 // collection watched by an allWatcher and the 273 // collection watched by an allWatcher and the
181 // type of value we use to store entity information 274 // type of value we use to store entity information
182 // for that collection. 275 // for that collection.
183 type allWatcherStateCollection struct { 276 type allWatcherStateCollection struct {
184 *mgo.Collection 277 *mgo.Collection
185 278
186 // infoSliceType stores the type of a slice of the info type 279 // infoSliceType stores the type of a slice of the info type
187 // that we use for this collection. In Go 1.1 we can change 280 // that we use for this collection. In Go 1.1 we can change
188 // this to use the type itself, as we'll have reflect.SliceOf. 281 // this to use the type itself, as we'll have reflect.SliceOf.
189 infoSliceType reflect.Type 282 infoSliceType reflect.Type
283 // subsidiary is true if the collection is used only
284 // modify a primary entity.
285 subsidiary bool
190 } 286 }
191 287
192 func newAllWatcherStateBacking(st *State) multiwatcher.Backing { 288 func newAllWatcherStateBacking(st *State) multiwatcher.Backing {
193 b := &allWatcherStateBacking{ 289 b := &allWatcherStateBacking{
194 st: st, 290 st: st,
195 collectionByName: make(map[string]allWatcherStateCollection), 291 collectionByName: make(map[string]allWatcherStateCollection),
196 collectionByType: make(map[reflect.Type]allWatcherStateCollectio n), 292 collectionByType: make(map[reflect.Type]allWatcherStateCollectio n),
197 } 293 }
198 collections := []allWatcherStateCollection{{ 294 collections := []allWatcherStateCollection{{
199 Collection: st.machines, 295 Collection: st.machines,
200 infoSliceType: reflect.TypeOf([]backingMachine(nil)), 296 infoSliceType: reflect.TypeOf([]backingMachine(nil)),
201 }, { 297 }, {
202 Collection: st.units, 298 Collection: st.units,
203 infoSliceType: reflect.TypeOf([]backingUnit(nil)), 299 infoSliceType: reflect.TypeOf([]backingUnit(nil)),
204 }, { 300 }, {
205 Collection: st.services, 301 Collection: st.services,
206 infoSliceType: reflect.TypeOf([]backingService(nil)), 302 infoSliceType: reflect.TypeOf([]backingService(nil)),
207 }, { 303 }, {
208 Collection: st.relations, 304 Collection: st.relations,
209 infoSliceType: reflect.TypeOf([]backingRelation(nil)), 305 infoSliceType: reflect.TypeOf([]backingRelation(nil)),
210 }, { 306 }, {
211 Collection: st.annotations, 307 Collection: st.annotations,
212 infoSliceType: reflect.TypeOf([]backingAnnotation(nil)), 308 infoSliceType: reflect.TypeOf([]backingAnnotation(nil)),
309 }, {
310 Collection: st.statuses,
311 infoSliceType: reflect.TypeOf([]backingStatus(nil)),
312 subsidiary: true,
213 }} 313 }}
214 // Populate the collection maps from the above set of collections. 314 // Populate the collection maps from the above set of collections.
215 for _, c := range collections { 315 for _, c := range collections {
216 docType := c.infoSliceType.Elem() 316 docType := c.infoSliceType.Elem()
217 if _, ok := b.collectionByType[docType]; ok { 317 if _, ok := b.collectionByType[docType]; ok {
218 panic(fmt.Errorf("duplicate collection type %s", docType )) 318 panic(fmt.Errorf("duplicate collection type %s", docType ))
219 } 319 }
220 b.collectionByType[docType] = c 320 b.collectionByType[docType] = c
221 if _, ok := b.collectionByName[c.Name]; ok { 321 if _, ok := b.collectionByName[c.Name]; ok {
222 panic(fmt.Errorf("duplicate collection name %q", c.Name) ) 322 panic(fmt.Errorf("duplicate collection name %q", c.Name) )
(...skipping 14 matching lines...) Expand all
237 func (b *allWatcherStateBacking) Unwatch(in chan<- watcher.Change) { 337 func (b *allWatcherStateBacking) Unwatch(in chan<- watcher.Change) {
238 for _, c := range b.collectionByName { 338 for _, c := range b.collectionByName {
239 b.st.watcher.UnwatchCollection(c.Name, in) 339 b.st.watcher.UnwatchCollection(c.Name, in)
240 } 340 }
241 } 341 }
242 342
243 // GetAll fetches all items that we want to watch from the state. 343 // GetAll fetches all items that we want to watch from the state.
244 func (b *allWatcherStateBacking) GetAll(all *multiwatcher.Store) error { 344 func (b *allWatcherStateBacking) GetAll(all *multiwatcher.Store) error {
245 // TODO(rog) fetch collections concurrently? 345 // TODO(rog) fetch collections concurrently?
246 for _, c := range b.collectionByName { 346 for _, c := range b.collectionByName {
347 if c.subsidiary {
348 continue
349 }
247 infoSlicePtr := reflect.New(c.infoSliceType) 350 infoSlicePtr := reflect.New(c.infoSliceType)
248 if err := c.Find(nil).All(infoSlicePtr.Interface()); err != nil { 351 if err := c.Find(nil).All(infoSlicePtr.Interface()); err != nil {
249 return fmt.Errorf("cannot get all %s: %v", c.Name, err) 352 return fmt.Errorf("cannot get all %s: %v", c.Name, err)
250 } 353 }
251 infos := infoSlicePtr.Elem() 354 infos := infoSlicePtr.Elem()
252 for i := 0; i < infos.Len(); i++ { 355 for i := 0; i < infos.Len(); i++ {
253 info := infos.Index(i).Addr().Interface().(backingEntity Doc) 356 info := infos.Index(i).Addr().Interface().(backingEntity Doc)
254 » » » info.updated(b.st, all) 357 » » » info.updated(b.st, all, info.mongoId())
255 } 358 }
256 } 359 }
257 return nil 360 return nil
258 } 361 }
259 362
260 // Changed updates the allWatcher's idea of the current state 363 // Changed updates the allWatcher's idea of the current state
261 // in response to the given change. 364 // in response to the given change.
262 func (b *allWatcherStateBacking) Changed(all *multiwatcher.Store, change watcher .Change) error { 365 func (b *allWatcherStateBacking) Changed(all *multiwatcher.Store, change watcher .Change) error {
263 c, ok := b.collectionByName[change.C] 366 c, ok := b.collectionByName[change.C]
264 if !ok { 367 if !ok {
265 panic(fmt.Errorf("unknown collection %q in fetch request", chang e.C)) 368 panic(fmt.Errorf("unknown collection %q in fetch request", chang e.C))
266 } 369 }
267 doc := reflect.New(c.infoSliceType.Elem()).Interface().(backingEntityDoc ) 370 doc := reflect.New(c.infoSliceType.Elem()).Interface().(backingEntityDoc )
268 // TODO(rog) investigate ways that this can be made more efficient 371 // TODO(rog) investigate ways that this can be made more efficient
269 // than simply fetching each entity in turn. 372 // than simply fetching each entity in turn.
270 err := c.FindId(change.Id).One(doc) 373 err := c.FindId(change.Id).One(doc)
271 if err == mgo.ErrNotFound { 374 if err == mgo.ErrNotFound {
272 return doc.removed(b.st, all, change.Id) 375 return doc.removed(b.st, all, change.Id)
273 } 376 }
274 if err != nil { 377 if err != nil {
275 return err 378 return err
276 } 379 }
277 » return doc.updated(b.st, all) 380 » return doc.updated(b.st, all, change.Id)
278 } 381 }
OLDNEW
« no previous file with comments | « state/apiserver/api_test.go ('k') | state/megawatcher_internal_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b