Left: | ||
Right: |
OLD | NEW |
---|---|
1 import glob | 1 import glob |
2 import grp | 2 import grp |
3 import errno | 3 import errno |
4 import os | 4 import os |
5 import pwd | 5 import pwd |
6 import tempfile | 6 import tempfile |
7 import time | 7 import time |
8 import shutil | 8 import shutil |
9 import signal | 9 import signal |
10 import subprocess | 10 import subprocess |
11 | 11 |
12 from twisted.internet.threads import deferToThread | 12 from twisted.internet.threads import deferToThread |
13 | 13 |
14 CLIENT_SESSION_TIMEOUT = 30000 | |
14 | 15 |
15 zookeeper_script_template = """\ | 16 zookeeper_script_template = """\ |
16 #!/bin/bash | 17 #!/bin/bash |
17 java \ | 18 java \ |
18 -cp "%(class_path)s" \ | 19 -cp "%(class_path)s" \ |
19 -Dzookeeper.log.dir=%(log_dir)s \ | 20 -Dzookeeper.log.dir=%(log_dir)s \ |
20 -Dzookeeper.root.logger=INFO,CONSOLE \ | 21 -Dzookeeper.root.logger=INFO,CONSOLE \ |
21 -Dlog4j.configuration=file:%(log_config_path)s \ | 22 -Dlog4j.configuration=file:%(log_config_path)s \ |
22 "org.apache.zookeeper.server.quorum.QuorumPeerMain" \ | 23 "org.apache.zookeeper.server.quorum.QuorumPeerMain" \ |
23 %(config_path)s & | 24 %(config_path)s & |
24 /bin/echo -n $! > "%(pid_path)s" | 25 /bin/echo -n $! > "%(pid_path)s" |
25 """ | 26 """ |
26 | 27 |
27 log4j_properties = """ | 28 log4j_properties = """ |
28 # DEFAULT: console appender only | 29 # DEFAULT: console appender only |
29 log4j.rootLogger=INFO, ROLLINGFILE | 30 log4j.rootLogger=INFO, ROLLINGFILE |
30 log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout | 31 log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout |
31 log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1} @%L] - %m%n | 32 log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1} @%L] - %m%n |
32 log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender | 33 log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender |
33 log4j.appender.ROLLINGFILE.Threshold=DEBUG | 34 log4j.appender.ROLLINGFILE.Threshold=DEBUG |
34 log4j.appender.ROLLINGFILE.File=/dev/null | 35 log4j.appender.ROLLINGFILE.File=/dev/null |
35 """ | 36 """ |
36 | 37 |
37 zookeeper_conf_template = """ | 38 zookeeper_conf_template = """ |
38 tickTime=2000 | 39 tickTime=2000 |
39 dataDir=%s | 40 dataDir=%s |
40 clientPort=%s | 41 clientPort=%s |
41 maxClientCnxns=500 | 42 maxClientCnxns=500 |
43 minSessionTimeout=%d | |
44 maxSessionTimeout=%d | |
42 """ | 45 """ |
43 | 46 |
44 | 47 |
45 def check_zookeeper(): | 48 def check_zookeeper(): |
46 """Check for package installation of zookeeper.""" | 49 """Check for package installation of zookeeper.""" |
47 return os.path.exists("/usr/share/java/zookeeper.jar") | 50 return os.path.exists("/usr/share/java/zookeeper.jar") |
48 | 51 |
49 | 52 |
50 class Zookeeper(object): | 53 class Zookeeper(object): |
51 | 54 |
52 def __init__(self, run_dir, port=None, host=None, | 55 def __init__(self, run_dir, port=None, host=None, |
53 zk_location="system", user=None, group=None, | 56 zk_location="system", user=None, group=None, |
54 use_deferred=True, fsync=True): | 57 use_deferred=True, min_session_timeout=30000, |
bcsaller
2013/02/01 17:12:20
Should you use the constant above for min and defi
hazmat
2013/02/01 17:50:14
makes sense, done.
| |
58 max_session_timeout=60000, fsync=True): | |
55 """ | 59 """ |
56 :param run_dir: Directory to store all zookeeper instance related data. | 60 :param run_dir: Directory to store all zookeeper instance related data. |
57 :param port: The port zookeeper should run on. | 61 :param port: The port zookeeper should run on. |
58 :param zk_location: Directory to find zk jars or dev checkout, | 62 :param zk_location: Directory to find zk jars or dev checkout, |
59 defaults to using 'system' indicating a package installation. | 63 defaults to using 'system' indicating a package installation. |
60 :param use_deferred: For usage in a twisted application, this will | 64 :param use_deferred: For usage in a twisted application, this will |
61 cause subprocess calls to be executed in a separate thread. | 65 cause subprocess calls to be executed in a separate thread. |
62 | 66 |
63 Specifying either of the following parameters, requires the | 67 Specifying either of the following parameters, requires the |
64 process using the library to be running as root. | 68 process using the library to be running as root. |
65 | 69 |
66 :param user: The user name under which to run zookeeper as. | 70 :param user: The user name under which to run zookeeper as. |
67 :param group: The group under which to run zookeeper under | 71 :param group: The group under which to run zookeeper under |
68 """ | 72 """ |
69 self._run_dir = run_dir | 73 self._run_dir = run_dir |
70 self._host = host | 74 self._host = host |
71 self._port = port | 75 self._port = port |
72 self._user = user | 76 self._user = user |
73 self._group = group | 77 self._group = group |
74 self._zk_location = zk_location | 78 self._zk_location = zk_location |
79 self._min_session_time = min_session_timeout | |
80 self._max_session_time = max_session_timeout | |
75 self._use_deferred = use_deferred | 81 self._use_deferred = use_deferred |
76 self.fsync = fsync | 82 self.fsync = fsync |
77 | 83 |
78 def start(self): | 84 def start(self): |
79 assert self._port is not None | 85 assert self._port is not None |
80 | 86 |
81 if self._use_deferred: | 87 if self._use_deferred: |
82 return deferToThread(self._start) | 88 return deferToThread(self._start) |
83 return self._start() | 89 return self._start() |
84 | 90 |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
191 | 197 |
192 if not os.path.exists(zk_vars["data_dir"]): | 198 if not os.path.exists(zk_vars["data_dir"]): |
193 os.makedirs(zk_vars["data_dir"]) | 199 os.makedirs(zk_vars["data_dir"]) |
194 if change_daemon_user: | 200 if change_daemon_user: |
195 os.chown(zk_vars["data_dir"], uid, gid) | 201 os.chown(zk_vars["data_dir"], uid, gid) |
196 | 202 |
197 with open(zk_vars["log_config_path"], "w") as config: | 203 with open(zk_vars["log_config_path"], "w") as config: |
198 config.write(log4j_properties) | 204 config.write(log4j_properties) |
199 | 205 |
200 with open(zk_vars["config_path"], "w") as config: | 206 with open(zk_vars["config_path"], "w") as config: |
201 conf = zookeeper_conf_template % (zk_vars["data_dir"], self._port) | 207 conf = zookeeper_conf_template % ( |
202 if self.fsync: | 208 zk_vars["data_dir"], self._port, |
203 conf += "\nfsync=no\n" | 209 self._min_session_time, |
210 self._max_session_time) | |
211 if not self.fsync: | |
212 conf += "\nforceSync=no\n" | |
204 config.write(conf) | 213 config.write(conf) |
205 if self._host: | 214 if self._host: |
206 config.write("\nclientPortAddress=%s" % self._host) | 215 config.write("\nclientPortAddress=%s" % self._host) |
207 | 216 |
208 def _start(self): | 217 def _start(self): |
209 zk_vars = self.get_zookeeper_variables() | 218 zk_vars = self.get_zookeeper_variables() |
210 self._setup_data_dir(zk_vars) | 219 self._setup_data_dir(zk_vars) |
211 zookeeper_script = zookeeper_script_template % zk_vars | 220 zookeeper_script = zookeeper_script_template % zk_vars |
212 | 221 |
213 fh = tempfile.NamedTemporaryFile(delete=False) | 222 fh = tempfile.NamedTemporaryFile(delete=False) |
(...skipping 30 matching lines...) Expand all Loading... | |
244 # Hard kill if we've been trying this for a few seconds | 253 # Hard kill if we've been trying this for a few seconds |
245 os.kill(zookeeper_pid, signal.SIGKILL) | 254 os.kill(zookeeper_pid, signal.SIGKILL) |
246 break | 255 break |
247 else: | 256 else: |
248 # Graceful kill up to 5s | 257 # Graceful kill up to 5s |
249 os.kill(zookeeper_pid, signal.SIGTERM) | 258 os.kill(zookeeper_pid, signal.SIGTERM) |
250 # Give a moment for shutdown | 259 # Give a moment for shutdown |
251 time.sleep(0.5) | 260 time.sleep(0.5) |
252 | 261 |
253 shutil.rmtree(self._run_dir) | 262 shutil.rmtree(self._run_dir) |
OLD | NEW |