OLD | NEW |
(Empty) | |
| 1 #· |
| 2 # Module implementing synchronization primitives· |
| 3 #· |
| 4 # multiprocessing/synchronize.py· |
| 5 #· |
| 6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt· |
| 7 #· |
| 8 · |
| 9 __all__ = [· |
| 10 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'· |
| 11 ]· |
| 12 · |
| 13 import threading· |
| 14 import os· |
| 15 import sys· |
| 16 · |
| 17 from time import time as _time, sleep as _sleep· |
| 18 · |
| 19 import _multiprocessing· |
| 20 from multiprocessing.process import current_process· |
| 21 from multiprocessing.util import Finalize, register_after_fork, debug· |
| 22 from multiprocessing.forking import assert_spawning, Popen· |
| 23 · |
| 24 #· |
| 25 # Constants· |
| 26 #· |
| 27 · |
| 28 RECURSIVE_MUTEX, SEMAPHORE = range(2)· |
| 29 SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX· |
| 30 · |
| 31 #· |
| 32 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`· |
| 33 #· |
| 34 · |
| 35 class SemLock(object):· |
| 36 · |
| 37 def __init__(self, kind, value, maxvalue):· |
| 38 sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)· |
| 39 debug('created semlock with handle %s' % sl.handle)· |
| 40 self._make_methods()· |
| 41 ········· |
| 42 if sys.platform != 'win32':· |
| 43 def _after_fork(obj):· |
| 44 obj._semlock._after_fork()· |
| 45 register_after_fork(self, _after_fork)· |
| 46 · |
| 47 def _make_methods(self):· |
| 48 self.acquire = self._semlock.acquire· |
| 49 self.release = self._semlock.release· |
| 50 self.__enter__ = self._semlock.__enter__· |
| 51 self.__exit__ = self._semlock.__exit__· |
| 52 · |
| 53 def __getstate__(self):· |
| 54 assert_spawning(self)· |
| 55 sl = self._semlock· |
| 56 return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)· |
| 57 · |
| 58 def __setstate__(self, state):· |
| 59 self._semlock = _multiprocessing.SemLock._rebuild(*state)· |
| 60 debug('recreated blocker with handle %r' % state[0])· |
| 61 self._make_methods()· |
| 62 · |
| 63 #· |
| 64 # Semaphore· |
| 65 #· |
| 66 · |
| 67 class Semaphore(SemLock):· |
| 68 · |
| 69 def __init__(self, value=1):· |
| 70 SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)· |
| 71 · |
| 72 def get_value(self):· |
| 73 return self._semlock._get_value()· |
| 74 · |
| 75 def __repr__(self):· |
| 76 try:· |
| 77 value = self._semlock._get_value()· |
| 78 except Exception:· |
| 79 value = 'unknown'· |
| 80 return '<Semaphore(value=%s)>' % value· |
| 81 · |
| 82 #· |
| 83 # Bounded semaphore· |
| 84 #· |
| 85 · |
| 86 class BoundedSemaphore(Semaphore):· |
| 87 · |
| 88 def __init__(self, value=1):· |
| 89 SemLock.__init__(self, SEMAPHORE, value, value)· |
| 90 · |
| 91 def __repr__(self):· |
| 92 try:· |
| 93 value = self._semlock._get_value()· |
| 94 except Exception:· |
| 95 value = 'unknown'· |
| 96 return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \· |
| 97 (value, self._semlock.maxvalue)· |
| 98 · |
| 99 #· |
| 100 # Non-recursive lock· |
| 101 #· |
| 102 · |
| 103 class Lock(SemLock):· |
| 104 · |
| 105 def __init__(self):· |
| 106 SemLock.__init__(self, SEMAPHORE, 1, 1)· |
| 107 · |
| 108 def __repr__(self):· |
| 109 try:· |
| 110 if self._semlock._is_mine():· |
| 111 name = current_process().get_name()· |
| 112 if threading.currentThread().getName() != 'MainThread':· |
| 113 name += '|' + threading.currentThread().getName()· |
| 114 elif self._semlock._get_value() == 1:· |
| 115 name = 'None'· |
| 116 elif self._semlock._count() > 0:· |
| 117 name = 'SomeOtherThread'· |
| 118 else:· |
| 119 name = 'SomeOtherProcess'· |
| 120 except Exception:· |
| 121 name = 'unknown'· |
| 122 return '<Lock(owner=%s)>' % name· |
| 123 · |
| 124 #· |
| 125 # Recursive lock· |
| 126 #· |
| 127 · |
| 128 class RLock(SemLock):· |
| 129 · |
| 130 def __init__(self):· |
| 131 SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)· |
| 132 ········· |
| 133 def __repr__(self):· |
| 134 try:· |
| 135 if self._semlock._is_mine():· |
| 136 name = current_process().get_name()· |
| 137 if threading.currentThread().getName() != 'MainThread':· |
| 138 name += '|' + threading.currentThread().getName()· |
| 139 count = self._semlock._count()· |
| 140 elif self._semlock._get_value() == 1:· |
| 141 name, count = 'None', 0· |
| 142 elif self._semlock._count() > 0:· |
| 143 name, count = 'SomeOtherThread', 'nonzero'· |
| 144 else:· |
| 145 name, count = 'SomeOtherProcess', 'nonzero'· |
| 146 except Exception:· |
| 147 name, count = 'unknown', 'unknown'· |
| 148 return '<RLock(%s, %s)>' % (name, count)· |
| 149 · |
| 150 #· |
| 151 # Condition variable· |
| 152 #· |
| 153 · |
| 154 class Condition(object):· |
| 155 · |
| 156 def __init__(self, lock=None):· |
| 157 self._lock = lock or RLock()· |
| 158 self._sleeping_count = Semaphore(0)· |
| 159 self._woken_count = Semaphore(0)· |
| 160 self._wait_semaphore = Semaphore(0)· |
| 161 self._make_methods()· |
| 162 · |
| 163 def __getstate__(self):· |
| 164 assert_spawning(self)· |
| 165 return (self._lock, self._sleeping_count,· |
| 166 self._woken_count, self._wait_semaphore)· |
| 167 · |
| 168 def __setstate__(self, state):· |
| 169 (self._lock, self._sleeping_count,· |
| 170 self._woken_count, self._wait_semaphore) = state· |
| 171 self._make_methods()· |
| 172 · |
| 173 def _make_methods(self):· |
| 174 self.acquire = self._lock.acquire· |
| 175 self.release = self._lock.release· |
| 176 self.__enter__ = self._lock.__enter__· |
| 177 self.__exit__ = self._lock.__exit__· |
| 178 · |
| 179 def __repr__(self):· |
| 180 try:· |
| 181 num_waiters = (self._sleeping_count._semlock._get_value() -· |
| 182 self._woken_count._semlock._get_value())· |
| 183 except Exception:· |
| 184 num_waiters = 'unkown'· |
| 185 return '<Condition(%s, %s)>' % (self._lock, num_waiters)· |
| 186 · |
| 187 def wait(self, timeout=None):· |
| 188 assert self._lock._semlock._is_mine(), \· |
| 189 'must acquire() condition before using wait()'· |
| 190 · |
| 191 # indicate that this thread is going to sleep· |
| 192 self._sleeping_count.release()· |
| 193 · |
| 194 # release lock· |
| 195 count = self._lock._semlock._count()· |
| 196 for i in xrange(count):· |
| 197 self._lock.release()· |
| 198 · |
| 199 try:· |
| 200 # wait for notification or timeout· |
| 201 self._wait_semaphore.acquire(True, timeout)· |
| 202 finally:· |
| 203 # indicate that this thread has woken· |
| 204 self._woken_count.release()· |
| 205 · |
| 206 # reacquire lock· |
| 207 for i in xrange(count):· |
| 208 self._lock.acquire()· |
| 209 · |
| 210 def notify(self):· |
| 211 assert self._lock._semlock._is_mine(), 'lock is not owned'· |
| 212 assert not self._wait_semaphore.acquire(False)· |
| 213 ········· |
| 214 # to take account of timeouts since last notify() we subtract· |
| 215 # woken_count from sleeping_count and rezero woken_count· |
| 216 while self._woken_count.acquire(False):· |
| 217 res = self._sleeping_count.acquire(False)· |
| 218 assert res· |
| 219 ············· |
| 220 if self._sleeping_count.acquire(False): # try grabbing a sleeper· |
| 221 self._wait_semaphore.release() # wake up one sleeper· |
| 222 self._woken_count.acquire() # wait for the sleeper to wake· |
| 223 ············· |
| 224 # rezero _wait_semaphore in case a timeout just happened· |
| 225 self._wait_semaphore.acquire(False)· |
| 226 · |
| 227 def notify_all(self):· |
| 228 assert self._lock._semlock._is_mine(), 'lock is not owned'· |
| 229 assert not self._wait_semaphore.acquire(False)· |
| 230 · |
| 231 # to take account of timeouts since last notify*() we subtract· |
| 232 # woken_count from sleeping_count and rezero woken_count· |
| 233 while self._woken_count.acquire(False):· |
| 234 res = self._sleeping_count.acquire(False)· |
| 235 assert res· |
| 236 ············· |
| 237 sleepers = 0· |
| 238 while self._sleeping_count.acquire(False):· |
| 239 self._wait_semaphore.release() # wake up one sleeper· |
| 240 sleepers += 1· |
| 241 · |
| 242 if sleepers:· |
| 243 for i in xrange(sleepers):· |
| 244 self._woken_count.acquire() # wait for a sleeper to wake· |
| 245 · |
| 246 # rezero wait_semaphore in case some timeouts just happened· |
| 247 while self._wait_semaphore.acquire(False):· |
| 248 pass· |
| 249 · |
| 250 #· |
| 251 # Event· |
| 252 #· |
| 253 · |
| 254 class Event(object):· |
| 255 · |
| 256 def __init__(self):· |
| 257 self._cond = Condition(Lock())· |
| 258 self._flag = Semaphore(0)· |
| 259 · |
| 260 def is_set(self):· |
| 261 self._cond.acquire()· |
| 262 try:· |
| 263 if self._flag.acquire(False):· |
| 264 self._flag.release()· |
| 265 return True· |
| 266 return False· |
| 267 finally:· |
| 268 self._cond.release()· |
| 269 ····· |
| 270 def set(self):· |
| 271 self._cond.acquire()· |
| 272 try:· |
| 273 self._flag.acquire(False)· |
| 274 self._flag.release()· |
| 275 self._cond.notify_all()· |
| 276 finally:· |
| 277 self._cond.release()· |
| 278 · |
| 279 def clear(self):· |
| 280 self._cond.acquire()· |
| 281 try:· |
| 282 self._flag.acquire(False)· |
| 283 finally:· |
| 284 self._cond.release()· |
| 285 · |
| 286 def wait(self, timeout=None):· |
| 287 self._cond.acquire()· |
| 288 try:· |
| 289 if self._flag.acquire(False):· |
| 290 self._flag.release()· |
| 291 else:· |
| 292 self._cond.wait(timeout)· |
| 293 finally:· |
| 294 self._cond.release()· |
OLD | NEW |