OLD | NEW |
(Empty) | |
| 1 # |
| 2 # Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved |
| 3 # |
| 4 # This file is part of txzookeeper. |
| 5 # |
| 6 # Authors: |
| 7 # Kapil Thangavelu |
| 8 # |
| 9 # txzookeeper is free software: you can redistribute it and/or modify |
| 10 # it under the terms of the GNU Lesser General Public License as published by |
| 11 # the Free Software Foundation, either version 3 of the License, or |
| 12 # (at your option) any later version. |
| 13 # |
| 14 # txzookeeper is distributed in the hope that it will be useful, |
| 15 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 17 # GNU Lesser General Public License for more details. |
| 18 # |
| 19 # You should have received a copy of the GNU Lesser General Public License |
| 20 # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
| 21 # |
| 22 |
| 23 import zookeeper |
| 24 |
| 25 from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList |
| 26 |
| 27 from txzookeeper.client import ZookeeperClient, ClientEvent |
| 28 from txzookeeper import managed |
| 29 from txzookeeper.tests import ZookeeperTestCase, utils |
| 30 from txzookeeper.tests import test_client |
| 31 |
| 32 |
| 33 class WatchTest(ZookeeperTestCase): |
| 34 """Watch manager and watch tests""" |
| 35 |
| 36 def setUp(self): |
| 37 self.watches = managed.WatchManager() |
| 38 |
| 39 def tearDown(self): |
| 40 self.watches.clear() |
| 41 del self.watches |
| 42 |
| 43 def test_add_remove(self): |
| 44 |
| 45 w = self.watches.add("/foobar", "child", lambda x: 1) |
| 46 self.assertIn( |
| 47 "<Watcher child /foobar", |
| 48 str(w)) |
| 49 self.assertIn(w, self.watches._watches) |
| 50 self.watches.remove(w) |
| 51 self.assertNotIn(w, self.watches._watches) |
| 52 self.watches.remove(w) |
| 53 |
| 54 @inlineCallbacks |
| 55 def test_watch_fire_removes(self): |
| 56 """Firing the watch removes it from the manager. |
| 57 """ |
| 58 w = self.watches.add("/foobar", "child", lambda x: 1) |
| 59 yield w("a") |
| 60 self.assertNotIn(w, self.watches._watches) |
| 61 |
| 62 @inlineCallbacks |
| 63 def test_watch_fire_with_error_removes(self): |
| 64 """Firing the watch removes it from the manager. |
| 65 """ |
| 66 d = Deferred() |
| 67 |
| 68 @inlineCallbacks |
| 69 def cb_error(e): |
| 70 yield d |
| 71 raise ValueError("a") |
| 72 |
| 73 w = self.watches.add("/foobar", "child", lambda x: 1) |
| 74 try: |
| 75 wd = w("a") |
| 76 d.callback(True) |
| 77 yield wd |
| 78 except ValueError: |
| 79 pass |
| 80 self.assertNotIn(w, self.watches._watches) |
| 81 |
| 82 @inlineCallbacks |
| 83 def test_reset_with_error(self): |
| 84 """A callback firing an error on reset is ignored. |
| 85 """ |
| 86 output = self.capture_log("txzk.managed") |
| 87 d = Deferred() |
| 88 results = [] |
| 89 |
| 90 @inlineCallbacks |
| 91 def callback(*args, **kw): |
| 92 results.append((args, kw)) |
| 93 yield d |
| 94 raise ValueError("a") |
| 95 |
| 96 w = self.watches.add("/foobar", "child", callback) |
| 97 reset_done = self.watches.reset() |
| 98 |
| 99 e, _ = results.pop() |
| 100 self.assertEqual( |
| 101 str(ClientEvent(*e)), |
| 102 "<ClientEvent session at '/foobar' state: connected>") |
| 103 d.callback(True) |
| 104 yield reset_done |
| 105 self.assertNotIn(w, self.watches._watches) |
| 106 self.assertIn("Error reseting watch", output.getvalue()) |
| 107 |
| 108 @inlineCallbacks |
| 109 def test_reset(self): |
| 110 """Reset fires a synthentic client event, and clears watches. |
| 111 """ |
| 112 d = Deferred() |
| 113 results = [] |
| 114 |
| 115 def callback(*args, **kw): |
| 116 results.append((args, kw)) |
| 117 return d |
| 118 |
| 119 w = self.watches.add("/foobar", "child", callback) |
| 120 reset_done = self.watches.reset() |
| 121 |
| 122 e, _ = results.pop() |
| 123 self.assertEqual( |
| 124 str(ClientEvent(*e)), |
| 125 "<ClientEvent session at '/foobar' state: connected>") |
| 126 d.callback(True) |
| 127 yield reset_done |
| 128 self.assertNotIn(w, self.watches._watches) |
| 129 |
| 130 |
| 131 class SessionClientTests(test_client.ClientTests): |
| 132 """Run through basic operations with SessionClient.""" |
| 133 timeout = 5 |
| 134 |
| 135 def setUp(self): |
| 136 super(SessionClientTests, self).setUp() |
| 137 self.client = managed.SessionClient("127.0.0.1:2181") |
| 138 |
| 139 def test_client_use_while_disconnected_returns_failure(self): |
| 140 # managed session client reconnects here. |
| 141 return True |
| 142 |
| 143 |
| 144 class SessionClientExpireTests(ZookeeperTestCase): |
| 145 """Verify expiration behavior.""" |
| 146 |
| 147 def setUp(self): |
| 148 super(SessionClientExpireTests, self).setUp() |
| 149 self.client = managed.ManagedClient("127.0.0.1:2181", 3000) |
| 150 self.client2 = None |
| 151 return self.client.connect() |
| 152 |
| 153 @inlineCallbacks |
| 154 def tearDown(self): |
| 155 self.client.close() |
| 156 |
| 157 self.client2 = ZookeeperClient("127.0.0.1:2181") |
| 158 yield self.client2.connect() |
| 159 utils.deleteTree(handle=self.client2.handle) |
| 160 yield self.client2.close() |
| 161 super(SessionClientExpireTests, self).tearDown() |
| 162 |
| 163 @inlineCallbacks |
| 164 def expire_session(self): |
| 165 assert self.client.connected |
| 166 self.client2 = ZookeeperClient(self.client.servers) |
| 167 yield self.client2.connect(client_id=self.client.client_id) |
| 168 yield self.client2.close() |
| 169 # It takes some time to propagate (1/3 session time as ping) |
| 170 yield self.sleep(2) |
| 171 |
| 172 @inlineCallbacks |
| 173 def test_session_expiration_conn(self): |
| 174 session_id = self.client.client_id[0] |
| 175 yield self.client.create("/fo-1", "abc") |
| 176 yield self.expire_session() |
| 177 yield self.client.exists("/") |
| 178 self.assertNotEqual(session_id, self.client.client_id[0]) |
| 179 |
| 180 @inlineCallbacks |
| 181 def test_session_expiration_conn_watch(self): |
| 182 session_id = self.client.client_id[0] |
| 183 yield self.client.create("/fo-1", "abc") |
| 184 yield self.expire_session() |
| 185 yield self.client.exists("/") |
| 186 self.assertNotEqual(session_id, self.client.client_id[0]) |
| 187 |
| 188 @inlineCallbacks |
| 189 def test_invoked_watch_gc(self): |
| 190 c_d, w_d = yield self.client.get_children_and_watch("/") |
| 191 yield c_d |
| 192 yield self.client.create("/foo") |
| 193 yield w_d |
| 194 yield self.expire_session() |
| 195 yield self.client.create("/foo2") |
| 196 # Nothing should blow up |
| 197 yield self.sleep(0.2) |
| 198 |
| 199 @inlineCallbacks |
| 200 def test_ephemeral_and_watch_recreate(self): |
| 201 # Create some ephemeral nodes |
| 202 yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
| 203 yield self.client.create("/fo-2", "def", flags=zookeeper.EPHEMERAL) |
| 204 |
| 205 # Create some watches |
| 206 g_d, g_w_d = self.client.get_and_watch("/fo-1") |
| 207 yield g_d |
| 208 |
| 209 c_d, c_w_d = self.client.get_children_and_watch("/") |
| 210 yield g_d |
| 211 |
| 212 e_d, e_w_d = self.client.get_children_and_watch("/fo-2") |
| 213 yield e_d |
| 214 |
| 215 # Expire the session |
| 216 yield self.expire_session() |
| 217 |
| 218 # Poof |
| 219 |
| 220 # Ephemerals back |
| 221 c, s = yield self.client.get("/fo-1") |
| 222 self.assertEqual(c, "abc") |
| 223 |
| 224 c, s = yield self.client.get("/fo-2") |
| 225 self.assertEqual(c, "def") |
| 226 |
| 227 # Watches triggered |
| 228 yield DeferredList( |
| 229 [g_w_d, c_w_d, e_w_d], |
| 230 fireOnOneErrback=True, consumeErrors=True) |
| 231 |
| 232 self.assertEqual( |
| 233 [str(d.result) for d in (g_w_d, c_w_d, e_w_d)], |
| 234 ["<ClientEvent session at '/fo-1' state: connected>", |
| 235 "<ClientEvent session at '/' state: connected>", |
| 236 "<ClientEvent session at '/fo-2' state: connected>"]) |
| 237 |
| 238 @inlineCallbacks |
| 239 def test_ephemeral_content_modification(self): |
| 240 yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
| 241 yield self.client.set("/fo-1", "def") |
| 242 yield self.expire_session() |
| 243 c, s = yield self.client.get("/fo-1") |
| 244 self.assertEqual(c, "def") |
| 245 |
| 246 @inlineCallbacks |
| 247 def test_ephemeral_acl_modification(self): |
| 248 yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
| 249 acl = [test_client.PUBLIC_ACL, |
| 250 dict(scheme="digest", |
| 251 id="zebra:moon", |
| 252 perms=zookeeper.PERM_ALL)] |
| 253 yield self.client.set_acl("/fo-1", acl) |
| 254 yield self.expire_session() |
| 255 n_acl, stat = yield self.client.get_acl("/fo-1") |
| 256 self.assertEqual(acl, n_acl) |
| 257 |
| 258 @inlineCallbacks |
| 259 def test_ephemeral_deletion(self): |
| 260 yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
| 261 yield self.client.delete("/fo-1") |
| 262 yield self.expire_session() |
| 263 self.assertFalse((yield self.client.exists("/fo-1"))) |
OLD | NEW |