LEFT | RIGHT |
(no file at all) | |
| 1 // Copyright 2014 Canonical Ltd. |
| 2 // Licensed under the AGPLv3, see LICENCE file for details. |
| 3 |
| 4 package peergrouper |
| 5 |
| 6 import ( |
| 7 "errors" |
| 8 "fmt" |
| 9 "time" |
| 10 |
| 11 gc "launchpad.net/gocheck" |
| 12 |
| 13 "launchpad.net/juju-core/juju/testing" |
| 14 coretesting "launchpad.net/juju-core/testing" |
| 15 jc "launchpad.net/juju-core/testing/checkers" |
| 16 "launchpad.net/juju-core/testing/testbase" |
| 17 "launchpad.net/juju-core/utils/voyeur" |
| 18 "launchpad.net/juju-core/worker" |
| 19 ) |
| 20 |
| 21 type workerJujuConnSuite struct { |
| 22 testing.JujuConnSuite |
| 23 } |
| 24 |
| 25 var _ = gc.Suite(&workerJujuConnSuite{}) |
| 26 |
| 27 func (s *workerJujuConnSuite) TestStartStop(c *gc.C) { |
| 28 w, err := New(s.State) |
| 29 c.Assert(err, gc.IsNil) |
| 30 err = worker.Stop(w) |
| 31 c.Assert(err, gc.IsNil) |
| 32 } |
| 33 |
| 34 type workerSuite struct { |
| 35 testbase.LoggingSuite |
| 36 } |
| 37 |
| 38 var _ = gc.Suite(&workerSuite{}) |
| 39 |
| 40 func (s *workerSuite) SetUpTest(c *gc.C) { |
| 41 s.LoggingSuite.SetUpTest(c) |
| 42 resetErrors() |
| 43 } |
| 44 |
| 45 // initState initializes the fake state with a single |
| 46 // replicaset member and numMachines machines |
| 47 // primed to vote. |
| 48 func initState(c *gc.C, st *fakeState, numMachines int) { |
| 49 var ids []string |
| 50 for i := 10; i < 10+numMachines; i++ { |
| 51 id := fmt.Sprint(i) |
| 52 m := st.addMachine(id, true) |
| 53 m.setStateHostPort(fmt.Sprintf("0.1.2.%d:%d", i, mongoPort)) |
| 54 ids = append(ids, id) |
| 55 } |
| 56 st.machine("10").SetHasVote(true) |
| 57 st.setStateServers(ids...) |
| 58 st.session.Set(mkMembers("0v")) |
| 59 st.session.setStatus(mkStatuses("0p")) |
| 60 st.check = checkInvariants |
| 61 } |
| 62 |
| 63 func (s *workerSuite) TestSetsAndUpdatesMembers(c *gc.C) { |
| 64 testbase.PatchValue(&pollInterval, 5*time.Millisecond) |
| 65 |
| 66 st := newFakeState() |
| 67 initState(c, st, 3) |
| 68 |
| 69 memberWatcher := st.session.members.Watch() |
| 70 mustNext(c, memberWatcher) |
| 71 c.Assert(memberWatcher.Value(), jc.DeepEquals, mkMembers("0v")) |
| 72 |
| 73 logger.Infof("starting worker") |
| 74 w := newWorker(st) |
| 75 defer func() { |
| 76 c.Check(worker.Stop(w), gc.IsNil) |
| 77 }() |
| 78 |
| 79 // Wait for the worker to set the initial members. |
| 80 mustNext(c, memberWatcher) |
| 81 c.Assert(memberWatcher.Value(), jc.DeepEquals, mkMembers("0v 1 2")) |
| 82 |
| 83 // Update the status of the new members |
| 84 // and check that they become voting. |
| 85 c.Logf("updating new member status") |
| 86 st.session.setStatus(mkStatuses("0p 1s 2s")) |
| 87 mustNext(c, memberWatcher) |
| 88 c.Assert(memberWatcher.Value(), jc.DeepEquals, mkMembers("0v 1v 2v")) |
| 89 |
| 90 c.Logf("adding another machine") |
| 91 // Add another machine. |
| 92 m13 := st.addMachine("13", false) |
| 93 m13.setStateHostPort(fmt.Sprintf("0.1.2.%d:%d", 13, mongoPort)) |
| 94 st.setStateServers("10", "11", "12", "13") |
| 95 |
| 96 c.Logf("waiting for new member to be added") |
| 97 mustNext(c, memberWatcher) |
| 98 c.Assert(memberWatcher.Value(), jc.DeepEquals, mkMembers("0v 1v 2v 3")) |
| 99 |
| 100 // Remove vote from an existing member; |
| 101 // and give it to the new machine. |
| 102 // Also set the status of the new machine to |
| 103 // healthy. |
| 104 c.Logf("removing vote from machine 10 and adding it to machine 13") |
| 105 st.machine("10").setWantsVote(false) |
| 106 st.machine("13").setWantsVote(true) |
| 107 |
| 108 st.session.setStatus(mkStatuses("0p 1s 2s 3s")) |
| 109 |
| 110 // Check that the new machine gets the vote and the |
| 111 // old machine loses it. |
| 112 c.Logf("waiting for vote switch") |
| 113 mustNext(c, memberWatcher) |
| 114 c.Assert(memberWatcher.Value(), jc.DeepEquals, mkMembers("0 1v 2v 3v")) |
| 115 |
| 116 c.Logf("removing old machine") |
| 117 // Remove the old machine. |
| 118 st.removeMachine("10") |
| 119 st.setStateServers("11", "12", "13") |
| 120 |
| 121 // Check that it's removed from the members. |
| 122 c.Logf("waiting for removal") |
| 123 mustNext(c, memberWatcher) |
| 124 c.Assert(memberWatcher.Value(), jc.DeepEquals, mkMembers("1v 2v 3v")) |
| 125 } |
| 126 |
| 127 func (s *workerSuite) TestAddressChange(c *gc.C) { |
| 128 st := newFakeState() |
| 129 initState(c, st, 3) |
| 130 |
| 131 memberWatcher := st.session.members.Watch() |
| 132 mustNext(c, memberWatcher) |
| 133 c.Assert(memberWatcher.Value(), jc.DeepEquals, mkMembers("0v")) |
| 134 |
| 135 logger.Infof("starting worker") |
| 136 w := newWorker(st) |
| 137 defer func() { |
| 138 c.Check(worker.Stop(w), gc.IsNil) |
| 139 }() |
| 140 |
| 141 // Wait for the worker to set the initial members. |
| 142 mustNext(c, memberWatcher) |
| 143 c.Assert(memberWatcher.Value(), jc.DeepEquals, mkMembers("0v 1 2")) |
| 144 |
| 145 // Change an address and wait for it to be changed in the |
| 146 // members. |
| 147 st.machine("11").setStateHostPort("0.1.99.99:9876") |
| 148 |
| 149 mustNext(c, memberWatcher) |
| 150 expectMembers := mkMembers("0v 1 2") |
| 151 expectMembers[1].Address = "0.1.99.99:9876" |
| 152 c.Assert(memberWatcher.Value(), jc.DeepEquals, expectMembers) |
| 153 } |
| 154 |
| 155 var fatalErrorsTests = []struct { |
| 156 errPattern string |
| 157 err error |
| 158 expectErr string |
| 159 }{{ |
| 160 errPattern: "State.StateServerInfo", |
| 161 expectErr: "cannot get state server info: sample", |
| 162 }, { |
| 163 errPattern: "Machine.SetHasVote 11 true", |
| 164 expectErr: `cannot set voting status of "11" to true: sample`, |
| 165 }, { |
| 166 errPattern: "Session.CurrentStatus", |
| 167 expectErr: "cannot get replica set status: sample", |
| 168 }, { |
| 169 errPattern: "Session.CurrentMembers", |
| 170 expectErr: "cannot get replica set members: sample", |
| 171 }, { |
| 172 errPattern: "State.Machine *", |
| 173 expectErr: `cannot get machine "10": sample`, |
| 174 }} |
| 175 |
| 176 func (s *workerSuite) TestFatalErrors(c *gc.C) { |
| 177 testbase.PatchValue(&pollInterval, 5*time.Millisecond) |
| 178 for i, test := range fatalErrorsTests { |
| 179 c.Logf("test %d: %s -> %s", i, test.errPattern, test.expectErr) |
| 180 resetErrors() |
| 181 st := newFakeState() |
| 182 st.session.InstantlyReady = true |
| 183 initState(c, st, 3) |
| 184 setErrorFor(test.errPattern, errors.New("sample")) |
| 185 w := newWorker(st) |
| 186 done := make(chan error) |
| 187 go func() { |
| 188 done <- w.Wait() |
| 189 }() |
| 190 select { |
| 191 case err := <-done: |
| 192 c.Assert(err, gc.ErrorMatches, test.expectErr) |
| 193 case <-time.After(coretesting.LongWait): |
| 194 c.Fatalf("timed out waiting for error") |
| 195 } |
| 196 } |
| 197 } |
| 198 |
| 199 func (s *workerSuite) TestSetMembersErrorIsNotFatal(c *gc.C) { |
| 200 st := newFakeState() |
| 201 initState(c, st, 3) |
| 202 st.session.setStatus(mkStatuses("0p 1s 2s")) |
| 203 var isSet voyeur.Value |
| 204 count := 0 |
| 205 setErrorFuncFor("Session.Set", func() error { |
| 206 isSet.Set(count) |
| 207 count++ |
| 208 return errors.New("sample") |
| 209 }) |
| 210 testbase.PatchValue(&retryInterval, 5*time.Millisecond) |
| 211 w := newWorker(st) |
| 212 defer func() { |
| 213 c.Check(worker.Stop(w), gc.IsNil) |
| 214 }() |
| 215 isSetWatcher := isSet.Watch() |
| 216 n0, _ := mustNext(c, isSetWatcher) |
| 217 // The worker should not retry more than every |
| 218 // retryInterval. |
| 219 time.Sleep(retryInterval * 10) |
| 220 n1, _ := mustNext(c, isSetWatcher) |
| 221 c.Assert(n0.(int)-n0.(int), jc.LessThan, 11) |
| 222 c.Assert(n1, jc.GreaterThan, n0) |
| 223 } |
| 224 |
| 225 func mustNext(c *gc.C, w *voyeur.Watcher) (val interface{}, ok bool) { |
| 226 done := make(chan struct{}) |
| 227 go func() { |
| 228 c.Logf("mustNext %p", w) |
| 229 ok = w.Next() |
| 230 val = w.Value() |
| 231 c.Logf("mustNext done %p, ok %v", w, ok) |
| 232 done <- struct{}{} |
| 233 }() |
| 234 select { |
| 235 case <-done: |
| 236 return |
| 237 case <-time.After(coretesting.LongWait): |
| 238 c.Fatalf("timed out waiting for value to be set") |
| 239 } |
| 240 panic("unreachable") |
| 241 } |
LEFT | RIGHT |