Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(257)

Side by Side Diff: state/state.go

Issue 9175043: state: testable transactions
Patch Set: Created 11 years, 11 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
OLDNEW
1 // The state package enables reading, observing, and changing 1 // The state package enables reading, observing, and changing
2 // the state stored in MongoDB of a whole environment 2 // the state stored in MongoDB of a whole environment
3 // managed by juju. 3 // managed by juju.
4 package state 4 package state
5 5
6 import ( 6 import (
7 "fmt" 7 "fmt"
8 "labix.org/v2/mgo" 8 "labix.org/v2/mgo"
9 "labix.org/v2/mgo/bson" 9 "labix.org/v2/mgo/bson"
10 "labix.org/v2/mgo/txn" 10 "labix.org/v2/mgo/txn"
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
123 settings *mgo.Collection 123 settings *mgo.Collection
124 settingsrefs *mgo.Collection 124 settingsrefs *mgo.Collection
125 constraints *mgo.Collection 125 constraints *mgo.Collection
126 units *mgo.Collection 126 units *mgo.Collection
127 users *mgo.Collection 127 users *mgo.Collection
128 presence *mgo.Collection 128 presence *mgo.Collection
129 cleanups *mgo.Collection 129 cleanups *mgo.Collection
130 annotations *mgo.Collection 130 annotations *mgo.Collection
131 statuses *mgo.Collection 131 statuses *mgo.Collection
132 runner *txn.Runner 132 runner *txn.Runner
133 txnHooks chan ([]func())
133 watcher *watcher.Watcher 134 watcher *watcher.Watcher
134 pwatcher *presence.Watcher 135 pwatcher *presence.Watcher
135 // mu guards allManager. 136 // mu guards allManager.
136 mu sync.Mutex 137 mu sync.Mutex
137 allManager *multiwatcher.StoreManager 138 allManager *multiwatcher.StoreManager
138 } 139 }
139 140
141 // runTxn runs the supplied operations as a single mgo/txn transaction, and
142 // includes a mechanism whereby tests can use SetTxnHooks to induce arbitrary
143 // state mutations before and after particular transactions are attempted.
144 func (st *State) runTxn(ops []txn.Op) error {
thumper 2013/05/09 16:13:58 I have to say that this function name has one of m
fwereade 2013/05/15 09:29:22 I'm fine with full names in general. Keep hassling
145 txnHooks := <-st.txnHooks
146 st.txnHooks <- nil
147 switch len(txnHooks) {
148 default:
149 defer func() {
150 if txnHooks[1] != nil {
151 txnHooks[1]()
152 }
153 if <-st.txnHooks != nil {
154 panic("concurrent use of transaction hooks")
dimitern 2013/05/03 15:06:29 this is to safeguard against txns running in gorou
fwereade 2013/05/15 09:29:22 Yeah -- concurrent use of this mechanism is not sa
155 }
156 st.txnHooks <- txnHooks[2:]
157 }()
158 fallthrough
159 case 1:
160 if txnHooks[0] != nil {
161 txnHooks[0]()
162 }
163 case 0:
164 }
165 return st.runner.Run(ops, "", nil)
166 }
167
140 func (st *State) Watch() *multiwatcher.Watcher { 168 func (st *State) Watch() *multiwatcher.Watcher {
141 st.mu.Lock() 169 st.mu.Lock()
142 if st.allManager == nil { 170 if st.allManager == nil {
143 st.allManager = multiwatcher.NewStoreManager(newAllWatcherStateB acking(st)) 171 st.allManager = multiwatcher.NewStoreManager(newAllWatcherStateB acking(st))
144 } 172 }
145 st.mu.Unlock() 173 st.mu.Unlock()
146 return multiwatcher.NewWatcher(st.allManager) 174 return multiwatcher.NewWatcher(st.allManager)
147 } 175 }
148 176
149 func (st *State) EnvironConfig() (*config.Config, error) { 177 func (st *State) EnvironConfig() (*config.Config, error) {
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after
259 Series: series, 287 Series: series,
260 InstanceId: instanceId, 288 InstanceId: instanceId,
261 Nonce: nonce, 289 Nonce: nonce,
262 Jobs: jobs, 290 Jobs: jobs,
263 } 291 }
264 mdoc, ops, err := st.addMachineOps(mdoc, cons) 292 mdoc, ops, err := st.addMachineOps(mdoc, cons)
265 if err != nil { 293 if err != nil {
266 return nil, err 294 return nil, err
267 } 295 }
268 296
269 » err = st.runner.Run(ops, "", nil) 297 » err = st.runTxn(ops)
270 if err != nil { 298 if err != nil {
271 return nil, err 299 return nil, err
272 } 300 }
273 // Refresh to pick the txn-revno. 301 // Refresh to pick the txn-revno.
274 m = newMachine(st, mdoc) 302 m = newMachine(st, mdoc)
275 if err = m.Refresh(); err != nil { 303 if err = m.Refresh(); err != nil {
276 return nil, err 304 return nil, err
277 } 305 }
278 return m, nil 306 return m, nil
279 } 307 }
(...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after
569 // Collect peer relation addition operations. 597 // Collect peer relation addition operations.
570 peerOps, err := st.addPeerRelationsOps(name, peers) 598 peerOps, err := st.addPeerRelationsOps(name, peers)
571 if err != nil { 599 if err != nil {
572 return nil, err 600 return nil, err
573 } 601 }
574 ops = append(ops, peerOps...) 602 ops = append(ops, peerOps...)
575 603
576 // Run the transaction; happily, there's never any reason to retry, 604 // Run the transaction; happily, there's never any reason to retry,
577 // because all the possible failed assertions imply that the service 605 // because all the possible failed assertions imply that the service
578 // already exists. 606 // already exists.
579 » if err := st.runner.Run(ops, "", nil); err == txn.ErrAborted { 607 » if err := st.runTxn(ops); err == txn.ErrAborted {
580 return nil, fmt.Errorf("service already exists") 608 return nil, fmt.Errorf("service already exists")
581 } else if err != nil { 609 } else if err != nil {
582 return nil, err 610 return nil, err
583 } 611 }
584 // Refresh to pick the txn-revno. 612 // Refresh to pick the txn-revno.
585 if err = svc.Refresh(); err != nil { 613 if err = svc.Refresh(); err != nil {
586 return nil, err 614 return nil, err
587 } 615 }
588 return svc, nil 616 return svc, nil
589 } 617 }
(...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after
811 Endpoints: eps, 839 Endpoints: eps,
812 Life: Alive, 840 Life: Alive,
813 } 841 }
814 ops = append(ops, txn.Op{ 842 ops = append(ops, txn.Op{
815 C: st.relations.Name, 843 C: st.relations.Name,
816 Id: doc.Key, 844 Id: doc.Key,
817 Assert: txn.DocMissing, 845 Assert: txn.DocMissing,
818 Insert: doc, 846 Insert: doc,
819 }) 847 })
820 // Run the transaction, and retry on abort. 848 // Run the transaction, and retry on abort.
821 » » if err = st.runner.Run(ops, "", nil); err == txn.ErrAborted { 849 » » if err = st.runTxn(ops); err == txn.ErrAborted {
822 continue 850 continue
823 } else if err != nil { 851 } else if err != nil {
824 return nil, err 852 return nil, err
825 } 853 }
826 return &Relation{st, *doc}, nil 854 return &Relation{st, *doc}, nil
827 } 855 }
828 return nil, ErrExcessiveContention 856 return nil, ErrExcessiveContention
829 } 857 }
830 858
831 // EndpointsRelation returns the existing relation with the given endpoints. 859 // EndpointsRelation returns the existing relation with the given endpoints.
(...skipping 204 matching lines...) Expand 10 before | Expand all | Expand 10 after
1036 // delete directly. 1064 // delete directly.
1037 if _, err := c.RemoveAll(sel); err != nil { 1065 if _, err := c.RemoveAll(sel); err != nil {
1038 return fmt.Errorf("cannot remove documents marke d for cleanup: %v", err) 1066 return fmt.Errorf("cannot remove documents marke d for cleanup: %v", err)
1039 } 1067 }
1040 } 1068 }
1041 ops := []txn.Op{{ 1069 ops := []txn.Op{{
1042 C: st.cleanups.Name, 1070 C: st.cleanups.Name,
1043 Id: doc.Id, 1071 Id: doc.Id,
1044 Remove: true, 1072 Remove: true,
1045 }} 1073 }}
1046 » » if err := st.runner.Run(ops, "", nil); err != nil { 1074 » » if err := st.runTxn(ops); err != nil {
1047 return fmt.Errorf("cannot remove empty cleanup document: %v", err) 1075 return fmt.Errorf("cannot remove empty cleanup document: %v", err)
1048 } 1076 }
1049 } 1077 }
1050 if err := iter.Err(); err != nil { 1078 if err := iter.Err(); err != nil {
1051 return fmt.Errorf("cannot read cleanup document: %v", err) 1079 return fmt.Errorf("cannot read cleanup document: %v", err)
1052 } 1080 }
1053 return nil 1081 return nil
1054 } 1082 }
1055 1083
1056 var tagPrefix = map[byte]string{ 1084 var tagPrefix = map[byte]string{
1057 'm': "machine-", 1085 'm': "machine-",
1058 's': "service-", 1086 's': "service-",
1059 'u': "unit-", 1087 'u': "unit-",
1060 'e': "environment-", 1088 'e': "environment-",
1061 } 1089 }
1062 1090
1063 func tagForGlobalKey(key string) (string, bool) { 1091 func tagForGlobalKey(key string) (string, bool) {
1064 if len(key) < 3 || key[1] != '#' { 1092 if len(key) < 3 || key[1] != '#' {
1065 return "", false 1093 return "", false
1066 } 1094 }
1067 p, ok := tagPrefix[key[0]] 1095 p, ok := tagPrefix[key[0]]
1068 if !ok { 1096 if !ok {
1069 return "", false 1097 return "", false
1070 } 1098 }
1071 return p + key[2:], true 1099 return p + key[2:], true
1072 } 1100 }
OLDNEW
« state/machine_test.go ('K') | « state/settings.go ('k') | state/unit.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b