Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(2)

Unified Diff: txzookeeper/managed.py

Issue 5976074: Managed Connections for txzk
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: txzookeeper/managed.py
=== added file 'txzookeeper/managed.py'
--- txzookeeper/managed.py 1970-01-01 00:00:00 +0000
+++ txzookeeper/managed.py 2012-04-04 00:43:36 +0000
@@ -0,0 +1,324 @@
+from functools import partial
+
+import contextlib
+import logging
+import zookeeper
+
+from twisted.internet.defer import (
+ inlineCallbacks, DeferredLock, fail, returnValue)
+
+from client import ZookeeperClient, ClientEvent, NotConnectedException
+from retry import RetryClient
+
+
+class StopWatcher(Exception):
+ pass
+
+
+WATCH_KIND_MAP = {
+ "child": "get_children_and_watch",
+ "exists": "exists_and_watch",
+ "get": "get_and_watch"
+ }
+
+
+log = logging.getLogger("txzk.managed")
+
+
+class Watch(object):
+ """
+ For application driven persistent watches, where the application
+ is manually resetting the watch.
+ """
+
+ __slots__ = ("_mgr", "_client", "_path", "_kind", "_callback")
+
+ def __init__(self, mgr, path, kind, callback):
+ self._mgr = mgr
+ self._path = path
+ self._kind = kind
+ self._callback = callback
+
+ @property
+ def path(self):
+ return self._path
+
+ @property
+ def kind(self):
+ return self._kind
+
+ @contextlib.contextmanager
+ def _ctx(self):
+ mgr = self._mgr
+ del self._mgr
+ try:
+ yield mgr
+ finally:
+ mgr.remove(self)
+
+ @inlineCallbacks
+ def reset(self):
+ with self._ctx() as mgr:
+ yield self._callback(
+ zookeeper.SESSION_EVENT,
+ zookeeper.CONNECTED_STATE,
+ self._path)
+ mgr # keep the flakes happy
fwereade 2012/04/04 13:23:29 Can't you just do: with self._ctx(): ?
hazmat 2012/04/04 18:09:46 thanks, much nicer.
+
+ def __call__(self, *args, **kw):
+ with self._ctx() as mgr:
+ mgr # keep the flakes happy
fwereade 2012/04/04 13:23:29 Ditto
+ return self._callback(*args, **kw)
+
+ def __str__(self):
+ return "<Watcher %s %s %r>" % (self.kind, self.path, self._callback)
+
+
+class WatchManager(object):
+
+ watch_class = Watch
+
+ def __init__(self):
+ self._watches = []
+
+ def add(self, path, watch_type, watcher):
+ w = self.watch_class(self, path, watch_type, watcher)
+ self._watches.append(w)
+ return w
+
+ def remove(self, w):
+ try:
+ self._watches.remove(w)
+ except ValueError:
+ pass
+
+ def iterkeys(self):
+ for w in self._watches:
+ yield (w.path, w.kind)
+
+ def clear(self):
+ del self._watches
+ self._watches = []
+
+ @inlineCallbacks
+ def reset(self, *ignored):
+ watches = self._watches
+ self._watches = []
+
+ for w in watches:
+ try:
+ yield w.reset()
+ except Exception, e:
+ log.error("Error reseting watch %s with session event. %s %r",
+ w, e, e)
+ continue
+
+
+class SessionClient(ZookeeperClient):
+ """A managed client that automatically restablishes ephemerals and
+ triggers watches after reconnecting post session expiration.
+
+ This abstracts the client from session expiration handling. It does
+ come at a cost though.
fwereade 2012/04/04 13:23:29 Please expand ;).
+ """
+
+ def __init__(
+ self, servers=None, session_timeout=None, connect_timeout=4000):
+ """
+ """
+ super(SessionClient, self).__init__(servers, session_timeout)
+ self._connect_timeout = connect_timeout
+ self._watches = WatchManager()
+ self._ephemerals = {}
+ self._reconnect_lock = DeferredLock()
+ self.set_connection_error_callback(self._cb_connection_error)
+
+ @inlineCallbacks
+ def cb_restablish_session(self, e=None):
+ """Called on intercept of session expiration to restablish the session.
fwereade 2012/04/04 13:23:29 I'm pretty sure there are 2 es in re-establish, an
+
+ This will reconnect to zk, restabslish ephemerals, and trigger watches.
fwereade 2012/04/04 13:23:29 I like the word "restabslish", but ditto.
+ """
+ yield self._reconnect_lock.acquire()
+
+ try:
+ # If its been explicitly closed, don't restablish.
+ if self.handle is None:
+ return
+
+ # If its a stale handle, don't restablish
+ try:
+ zookeeper.is_unrecoverable(self.handle)
+ except zookeeper.ZooKeeperException:
+ if e:
+ raise e
+ return
+
+ # Its already been restablished, don't restablish.
+ if not self.unrecoverable:
+ return
+ elif self.connected:
+ self.close()
+ self.handle = 0
+ elif isinstance(self.handle, int):
+ self.handle = 0
+
+ # Restablish
+ assert self.handle == 0
+ yield self._cb_restablish_session().addErrback(
+ self._cb_restablish_errback, e)
+
+ except Exception, e:
+ log.error("error while restablish %r %s" % (e, e))
+ finally:
+ yield self._reconnect_lock.release()
+
+ @inlineCallbacks
+ def _cb_restablish_session(self):
+ """Restablish a new session, and recreate ephemerals and watches.
+ """
+ while 1:
+ # Reconnect
+ if self.handle is None:
+ returnValue(self.handle)
+ try:
+ yield self.connect(timeout=self._connect_timeout)
+ except zookeeper.ZooKeeperException, e:
+ log.exception("Error while connecting %r %s" % (e, e))
+ continue
+ else:
+ break
+
+ items = self._ephemerals.items()
+ self._ephemerals = {}
+
+ for path, e in items:
+ try:
+ yield self.create(
+ path, e['data'], acls=e['acls'], flags=e['flags'])
+ except zookeeper.NodeExistsException:
+ log.error("Attempt to create ephemeral node failed %r", path)
+ yield self._watches.reset()
+
+ def _cb_restablish_errback(self, err, failure):
+ """If there's an error restablishing the session log it.
+ """
+ log.error("Error while trying to restablish connection %s\n%s" % (
+ failure.value, failure.getTraceback()))
+ return failure
+
+ @inlineCallbacks
+ def _cb_connection_error(self, client, error):
+ """Convert session expiration to a transient connection error.
+
+ Dispatches from api usage error.
+ """
+ if not isinstance(error, (
+ zookeeper.SessionExpiredException,
+ NotConnectedException,
+ zookeeper.ClosingException)):
+ raise error
+ yield self._cb_restablish_session()
+ raise zookeeper.ConnectionLossException
+
+ # client connected tracker
+ def _check_connected(self, d):
+ """Clients are automatically reconnected."""
+ if self.connected:
+ return
+
+ if self.handle is None:
+ d.errback(NotConnectedException("not connected"))
+ return d
+
+ c_d = self.cb_restablish_session()
+
+ def after_connected(client):
+ return fail(zookeeper.ConnectionLossException("Retry"))
+
+ c_d.addCallback(after_connected)
+ c_d.chainDeferred(d)
+ return d
+
+ # Dispatch from node watches on session expiration
+ def _watch_session_wrapper(self, watcher, event_type, conn_state, path):
+ """Watch wrapper that diverts session events to a connection callback.
+ """
+ if (event_type == zookeeper.SESSION_EVENT and
+ conn_state == zookeeper.EXPIRED_SESSION_STATE):
+ d = self.cb_restablish_session()
+ d.addErrback(self._cb_restablish_errback)
+ return d
+ if event_type == zookeeper.SESSION_EVENT:
+ if self._session_event_callback:
+ self._session_event_callback(
+ self, ClientEvent(event_type, conn_state, path))
+ else:
+ return watcher(event_type, conn_state, path)
+
+ # Track all watches
+ def _wrap_watcher(self, watcher, watch_type, path):
+ if watcher is None:
+ return watcher
+ if not callable(watcher):
+ raise SyntaxError("invalid watcher")
+
+ # handle conn watcher, separately.
+ if watch_type is None and path is None:
+ return self._zk_thread_callback(
+ self._watch_session_wrapper, watcher)
+
+ return self._zk_thread_callback(
+ partial(
+ self._watch_session_wrapper,
+ self._watches.add(path, watch_type, watcher)))
+
+ # Track ephemerals
+ def _cb_created(self, d, data, acls, flags, result_code, path):
+ if self._check_result(result_code, d):
+ return
+
+ if flags & zookeeper.EPHEMERAL:
+ self._ephemerals[path] = dict(
+ data=data, acls=acls, flags=flags)
+
+ d.callback(path)
+
+ def _cb_deleted(self, d, path, result_code):
+ if self._check_result(result_code, d):
+ return
+
+ self._ephemerals.pop(path, None)
+ d.callback(result_code)
+
+ def _cb_set_acl(self, d, path, acls, result_code):
+ if self._check_result(result_code, d):
+ return
+
+ if path in self._ephemerals:
+ self._ephemerals[path]['acls'] = acls
+
+ d.callback(result_code)
+
+ def _cb_set(self, d, path, data, result_code, node_stat):
+ if self._check_result(result_code, d):
+ return
+
+ if path in self._ephemerals:
+ self._ephemerals[path]['data'] = data
+
+ d.callback(node_stat)
+
+
+class ManagedClient(RetryClient):
+
+ def __init__(self, *args, **kw):
+ client = SessionClient(*args, **kw)
+ super(ManagedClient, self).__init__(
+ client, client.cb_restablish_session)
+
+
+def ManagedClient(servers=None, session_timeout=None, connect_timeout=10000):
+ from retry import RetryClient
+ client = SessionClient(servers, session_timeout, connect_timeout)
+ return RetryClient(client)
« no previous file with comments | « txzookeeper/client.py ('k') | txzookeeper/retry.py » ('j') | txzookeeper/tests/test_managed.py » ('J')

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld 204d58d