Left: | ||
Right: |
OLD | NEW |
---|---|
1 // The state package enables reading, observing, and changing | 1 // The state package enables reading, observing, and changing |
2 // the state stored in MongoDB of a whole environment | 2 // the state stored in MongoDB of a whole environment |
3 // managed by juju. | 3 // managed by juju. |
4 package state | 4 package state |
5 | 5 |
6 import ( | 6 import ( |
7 "fmt" | 7 "fmt" |
8 "labix.org/v2/mgo" | 8 "labix.org/v2/mgo" |
9 "labix.org/v2/mgo/bson" | 9 "labix.org/v2/mgo/bson" |
10 "labix.org/v2/mgo/txn" | 10 "labix.org/v2/mgo/txn" |
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
123 settings *mgo.Collection | 123 settings *mgo.Collection |
124 settingsrefs *mgo.Collection | 124 settingsrefs *mgo.Collection |
125 constraints *mgo.Collection | 125 constraints *mgo.Collection |
126 units *mgo.Collection | 126 units *mgo.Collection |
127 users *mgo.Collection | 127 users *mgo.Collection |
128 presence *mgo.Collection | 128 presence *mgo.Collection |
129 cleanups *mgo.Collection | 129 cleanups *mgo.Collection |
130 annotations *mgo.Collection | 130 annotations *mgo.Collection |
131 statuses *mgo.Collection | 131 statuses *mgo.Collection |
132 runner *txn.Runner | 132 runner *txn.Runner |
133 txnHooks chan ([]func()) | |
133 watcher *watcher.Watcher | 134 watcher *watcher.Watcher |
134 pwatcher *presence.Watcher | 135 pwatcher *presence.Watcher |
135 // mu guards allManager. | 136 // mu guards allManager. |
136 mu sync.Mutex | 137 mu sync.Mutex |
137 allManager *multiwatcher.StoreManager | 138 allManager *multiwatcher.StoreManager |
138 } | 139 } |
139 | 140 |
141 // runTxn runs the supplied operations as a single mgo/txn transaction, and | |
142 // includes a mechanism whereby tests can use SetTxnHooks to induce arbitrary | |
143 // state mutations before and after particular transactions are attempted. | |
144 func (st *State) runTxn(ops []txn.Op) error { | |
thumper
2013/05/09 16:13:58
I have to say that this function name has one of m
fwereade
2013/05/15 09:29:22
I'm fine with full names in general. Keep hassling
| |
145 txnHooks := <-st.txnHooks | |
146 st.txnHooks <- nil | |
147 switch len(txnHooks) { | |
148 default: | |
149 defer func() { | |
150 if txnHooks[1] != nil { | |
151 txnHooks[1]() | |
152 } | |
153 if <-st.txnHooks != nil { | |
154 panic("concurrent use of transaction hooks") | |
dimitern
2013/05/03 15:06:29
this is to safeguard against txns running in gorou
fwereade
2013/05/15 09:29:22
Yeah -- concurrent use of this mechanism is not sa
| |
155 } | |
156 st.txnHooks <- txnHooks[2:] | |
157 }() | |
158 fallthrough | |
159 case 1: | |
160 if txnHooks[0] != nil { | |
161 txnHooks[0]() | |
162 } | |
163 case 0: | |
164 } | |
165 return st.runner.Run(ops, "", nil) | |
166 } | |
167 | |
140 func (st *State) Watch() *multiwatcher.Watcher { | 168 func (st *State) Watch() *multiwatcher.Watcher { |
141 st.mu.Lock() | 169 st.mu.Lock() |
142 if st.allManager == nil { | 170 if st.allManager == nil { |
143 st.allManager = multiwatcher.NewStoreManager(newAllWatcherStateB acking(st)) | 171 st.allManager = multiwatcher.NewStoreManager(newAllWatcherStateB acking(st)) |
144 } | 172 } |
145 st.mu.Unlock() | 173 st.mu.Unlock() |
146 return multiwatcher.NewWatcher(st.allManager) | 174 return multiwatcher.NewWatcher(st.allManager) |
147 } | 175 } |
148 | 176 |
149 func (st *State) EnvironConfig() (*config.Config, error) { | 177 func (st *State) EnvironConfig() (*config.Config, error) { |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
259 Series: series, | 287 Series: series, |
260 InstanceId: instanceId, | 288 InstanceId: instanceId, |
261 Nonce: nonce, | 289 Nonce: nonce, |
262 Jobs: jobs, | 290 Jobs: jobs, |
263 } | 291 } |
264 mdoc, ops, err := st.addMachineOps(mdoc, cons) | 292 mdoc, ops, err := st.addMachineOps(mdoc, cons) |
265 if err != nil { | 293 if err != nil { |
266 return nil, err | 294 return nil, err |
267 } | 295 } |
268 | 296 |
269 » err = st.runner.Run(ops, "", nil) | 297 » err = st.runTxn(ops) |
270 if err != nil { | 298 if err != nil { |
271 return nil, err | 299 return nil, err |
272 } | 300 } |
273 // Refresh to pick the txn-revno. | 301 // Refresh to pick the txn-revno. |
274 m = newMachine(st, mdoc) | 302 m = newMachine(st, mdoc) |
275 if err = m.Refresh(); err != nil { | 303 if err = m.Refresh(); err != nil { |
276 return nil, err | 304 return nil, err |
277 } | 305 } |
278 return m, nil | 306 return m, nil |
279 } | 307 } |
(...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
569 // Collect peer relation addition operations. | 597 // Collect peer relation addition operations. |
570 peerOps, err := st.addPeerRelationsOps(name, peers) | 598 peerOps, err := st.addPeerRelationsOps(name, peers) |
571 if err != nil { | 599 if err != nil { |
572 return nil, err | 600 return nil, err |
573 } | 601 } |
574 ops = append(ops, peerOps...) | 602 ops = append(ops, peerOps...) |
575 | 603 |
576 // Run the transaction; happily, there's never any reason to retry, | 604 // Run the transaction; happily, there's never any reason to retry, |
577 // because all the possible failed assertions imply that the service | 605 // because all the possible failed assertions imply that the service |
578 // already exists. | 606 // already exists. |
579 » if err := st.runner.Run(ops, "", nil); err == txn.ErrAborted { | 607 » if err := st.runTxn(ops); err == txn.ErrAborted { |
580 return nil, fmt.Errorf("service already exists") | 608 return nil, fmt.Errorf("service already exists") |
581 } else if err != nil { | 609 } else if err != nil { |
582 return nil, err | 610 return nil, err |
583 } | 611 } |
584 // Refresh to pick the txn-revno. | 612 // Refresh to pick the txn-revno. |
585 if err = svc.Refresh(); err != nil { | 613 if err = svc.Refresh(); err != nil { |
586 return nil, err | 614 return nil, err |
587 } | 615 } |
588 return svc, nil | 616 return svc, nil |
589 } | 617 } |
(...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
811 Endpoints: eps, | 839 Endpoints: eps, |
812 Life: Alive, | 840 Life: Alive, |
813 } | 841 } |
814 ops = append(ops, txn.Op{ | 842 ops = append(ops, txn.Op{ |
815 C: st.relations.Name, | 843 C: st.relations.Name, |
816 Id: doc.Key, | 844 Id: doc.Key, |
817 Assert: txn.DocMissing, | 845 Assert: txn.DocMissing, |
818 Insert: doc, | 846 Insert: doc, |
819 }) | 847 }) |
820 // Run the transaction, and retry on abort. | 848 // Run the transaction, and retry on abort. |
821 » » if err = st.runner.Run(ops, "", nil); err == txn.ErrAborted { | 849 » » if err = st.runTxn(ops); err == txn.ErrAborted { |
822 continue | 850 continue |
823 } else if err != nil { | 851 } else if err != nil { |
824 return nil, err | 852 return nil, err |
825 } | 853 } |
826 return &Relation{st, *doc}, nil | 854 return &Relation{st, *doc}, nil |
827 } | 855 } |
828 return nil, ErrExcessiveContention | 856 return nil, ErrExcessiveContention |
829 } | 857 } |
830 | 858 |
831 // EndpointsRelation returns the existing relation with the given endpoints. | 859 // EndpointsRelation returns the existing relation with the given endpoints. |
(...skipping 204 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1036 // delete directly. | 1064 // delete directly. |
1037 if _, err := c.RemoveAll(sel); err != nil { | 1065 if _, err := c.RemoveAll(sel); err != nil { |
1038 return fmt.Errorf("cannot remove documents marke d for cleanup: %v", err) | 1066 return fmt.Errorf("cannot remove documents marke d for cleanup: %v", err) |
1039 } | 1067 } |
1040 } | 1068 } |
1041 ops := []txn.Op{{ | 1069 ops := []txn.Op{{ |
1042 C: st.cleanups.Name, | 1070 C: st.cleanups.Name, |
1043 Id: doc.Id, | 1071 Id: doc.Id, |
1044 Remove: true, | 1072 Remove: true, |
1045 }} | 1073 }} |
1046 » » if err := st.runner.Run(ops, "", nil); err != nil { | 1074 » » if err := st.runTxn(ops); err != nil { |
1047 return fmt.Errorf("cannot remove empty cleanup document: %v", err) | 1075 return fmt.Errorf("cannot remove empty cleanup document: %v", err) |
1048 } | 1076 } |
1049 } | 1077 } |
1050 if err := iter.Err(); err != nil { | 1078 if err := iter.Err(); err != nil { |
1051 return fmt.Errorf("cannot read cleanup document: %v", err) | 1079 return fmt.Errorf("cannot read cleanup document: %v", err) |
1052 } | 1080 } |
1053 return nil | 1081 return nil |
1054 } | 1082 } |
1055 | 1083 |
1056 var tagPrefix = map[byte]string{ | 1084 var tagPrefix = map[byte]string{ |
1057 'm': "machine-", | 1085 'm': "machine-", |
1058 's': "service-", | 1086 's': "service-", |
1059 'u': "unit-", | 1087 'u': "unit-", |
1060 'e': "environment-", | 1088 'e': "environment-", |
1061 } | 1089 } |
1062 | 1090 |
1063 func tagForGlobalKey(key string) (string, bool) { | 1091 func tagForGlobalKey(key string) (string, bool) { |
1064 if len(key) < 3 || key[1] != '#' { | 1092 if len(key) < 3 || key[1] != '#' { |
1065 return "", false | 1093 return "", false |
1066 } | 1094 } |
1067 p, ok := tagPrefix[key[0]] | 1095 p, ok := tagPrefix[key[0]] |
1068 if !ok { | 1096 if !ok { |
1069 return "", false | 1097 return "", false |
1070 } | 1098 } |
1071 return p + key[2:], true | 1099 return p + key[2:], true |
1072 } | 1100 } |
OLD | NEW |