Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(380)

Side by Side Diff: Lib/multiprocessing/synchronize.py

Issue 2061: Review PEP 371 patch 1 Base URL: http://svn.python.org/view/*checkout*/python/trunk/
Patch Set: Created 15 years, 9 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #·
2 # Module implementing synchronization primitives·
3 #·
4 # multiprocessing/synchronize.py·
5 #·
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt·
7 #·
8 ·
9 __all__ = [·
10 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'·
11 ]·
12 ·
13 import threading·
14 import os·
15 import sys·
16 ·
17 from time import time as _time, sleep as _sleep·
18 ·
19 import _multiprocessing·
20 from multiprocessing.process import current_process·
21 from multiprocessing.util import Finalize, register_after_fork, debug·
22 from multiprocessing.forking import assert_spawning, Popen·
23 ·
24 #·
25 # Constants·
26 #·
27 ·
28 RECURSIVE_MUTEX, SEMAPHORE = range(2)·
29 SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX·
30 ·
31 #·
32 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`·
33 #·
34 ·
35 class SemLock(object):·
36 ·
37 def __init__(self, kind, value, maxvalue):·
38 sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)·
39 debug('created semlock with handle %s' % sl.handle)·
40 self._make_methods()·
41 ·········
42 if sys.platform != 'win32':·
43 def _after_fork(obj):·
44 obj._semlock._after_fork()·
45 register_after_fork(self, _after_fork)·
46 ·
47 def _make_methods(self):·
48 self.acquire = self._semlock.acquire·
49 self.release = self._semlock.release·
50 self.__enter__ = self._semlock.__enter__·
51 self.__exit__ = self._semlock.__exit__·
52 ·
53 def __getstate__(self):·
54 assert_spawning(self)·
55 sl = self._semlock·
56 return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)·
57 ·
58 def __setstate__(self, state):·
59 self._semlock = _multiprocessing.SemLock._rebuild(*state)·
60 debug('recreated blocker with handle %r' % state[0])·
61 self._make_methods()·
62 ·
63 #·
64 # Semaphore·
65 #·
66 ·
67 class Semaphore(SemLock):·
68 ·
69 def __init__(self, value=1):·
70 SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)·
71 ·
72 def get_value(self):·
73 return self._semlock._get_value()·
74 ·
75 def __repr__(self):·
76 try:·
77 value = self._semlock._get_value()·
78 except Exception:·
79 value = 'unknown'·
80 return '<Semaphore(value=%s)>' % value·
81 ·
82 #·
83 # Bounded semaphore·
84 #·
85 ·
86 class BoundedSemaphore(Semaphore):·
87 ·
88 def __init__(self, value=1):·
89 SemLock.__init__(self, SEMAPHORE, value, value)·
90 ·
91 def __repr__(self):·
92 try:·
93 value = self._semlock._get_value()·
94 except Exception:·
95 value = 'unknown'·
96 return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \·
97 (value, self._semlock.maxvalue)·
98 ·
99 #·
100 # Non-recursive lock·
101 #·
102 ·
103 class Lock(SemLock):·
104 ·
105 def __init__(self):·
106 SemLock.__init__(self, SEMAPHORE, 1, 1)·
107 ·
108 def __repr__(self):·
109 try:·
110 if self._semlock._is_mine():·
111 name = current_process().get_name()·
112 if threading.currentThread().getName() != 'MainThread':·
Benjamin 2008/06/10 20:50:52 Once I get the new threading API checked in, it ca
113 name += '|' + threading.currentThread().getName()·
114 elif self._semlock._get_value() == 1:·
115 name = 'None'·
116 elif self._semlock._count() > 0:·
117 name = 'SomeOtherThread'·
118 else:·
119 name = 'SomeOtherProcess'·
120 except Exception:·
121 name = 'unknown'·
122 return '<Lock(owner=%s)>' % name·
123 ·
124 #·
125 # Recursive lock·
126 #·
127 ·
128 class RLock(SemLock):·
129 ·
130 def __init__(self):·
131 SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)·
132 ·········
133 def __repr__(self):·
134 try:·
135 if self._semlock._is_mine():·
136 name = current_process().get_name()·
137 if threading.currentThread().getName() != 'MainThread':·
138 name += '|' + threading.currentThread().getName()·
139 count = self._semlock._count()·
140 elif self._semlock._get_value() == 1:·
141 name, count = 'None', 0·
142 elif self._semlock._count() > 0:·
143 name, count = 'SomeOtherThread', 'nonzero'·
144 else:·
145 name, count = 'SomeOtherProcess', 'nonzero'·
146 except Exception:·
147 name, count = 'unknown', 'unknown'·
148 return '<RLock(%s, %s)>' % (name, count)·
149 ·
150 #·
151 # Condition variable·
152 #·
153 ·
154 class Condition(object):·
155 ·
156 def __init__(self, lock=None):·
157 self._lock = lock or RLock()·
158 self._sleeping_count = Semaphore(0)·
159 self._woken_count = Semaphore(0)·
160 self._wait_semaphore = Semaphore(0)·
161 self._make_methods()·
162 ·
163 def __getstate__(self):·
164 assert_spawning(self)·
165 return (self._lock, self._sleeping_count,·
166 self._woken_count, self._wait_semaphore)·
167 ·
168 def __setstate__(self, state):·
169 (self._lock, self._sleeping_count,·
170 self._woken_count, self._wait_semaphore) = state·
171 self._make_methods()·
172 ·
173 def _make_methods(self):·
174 self.acquire = self._lock.acquire·
175 self.release = self._lock.release·
176 self.__enter__ = self._lock.__enter__·
177 self.__exit__ = self._lock.__exit__·
178 ·
179 def __repr__(self):·
180 try:·
181 num_waiters = (self._sleeping_count._semlock._get_value() -·
182 self._woken_count._semlock._get_value())·
183 except Exception:·
184 num_waiters = 'unkown'·
185 return '<Condition(%s, %s)>' % (self._lock, num_waiters)·
186 ·
187 def wait(self, timeout=None):·
188 assert self._lock._semlock._is_mine(), \·
189 'must acquire() condition before using wait()'·
190 ·
191 # indicate that this thread is going to sleep·
192 self._sleeping_count.release()·
193 ·
194 # release lock·
195 count = self._lock._semlock._count()·
196 for i in xrange(count):·
197 self._lock.release()·
198 ·
199 try:·
200 # wait for notification or timeout·
201 self._wait_semaphore.acquire(True, timeout)·
202 finally:·
203 # indicate that this thread has woken·
204 self._woken_count.release()·
205 ·
206 # reacquire lock·
207 for i in xrange(count):·
208 self._lock.acquire()·
209 ·
210 def notify(self):·
211 assert self._lock._semlock._is_mine(), 'lock is not owned'·
212 assert not self._wait_semaphore.acquire(False)·
213 ·········
214 # to take account of timeouts since last notify() we subtract·
215 # woken_count from sleeping_count and rezero woken_count·
216 while self._woken_count.acquire(False):·
217 res = self._sleeping_count.acquire(False)·
218 assert res·
219 ·············
220 if self._sleeping_count.acquire(False): # try grabbing a sleeper·
221 self._wait_semaphore.release() # wake up one sleeper·
222 self._woken_count.acquire() # wait for the sleeper to wake·
223 ·············
224 # rezero _wait_semaphore in case a timeout just happened·
225 self._wait_semaphore.acquire(False)·
226 ·
227 def notify_all(self):·
228 assert self._lock._semlock._is_mine(), 'lock is not owned'·
229 assert not self._wait_semaphore.acquire(False)·
230 ·
231 # to take account of timeouts since last notify*() we subtract·
232 # woken_count from sleeping_count and rezero woken_count·
233 while self._woken_count.acquire(False):·
234 res = self._sleeping_count.acquire(False)·
235 assert res·
236 ·············
237 sleepers = 0·
238 while self._sleeping_count.acquire(False):·
239 self._wait_semaphore.release() # wake up one sleeper·
240 sleepers += 1·
241 ·
242 if sleepers:·
243 for i in xrange(sleepers):·
244 self._woken_count.acquire() # wait for a sleeper to wake·
245 ·
246 # rezero wait_semaphore in case some timeouts just happened·
247 while self._wait_semaphore.acquire(False):·
248 pass·
249 ·
250 #·
251 # Event·
252 #·
253 ·
254 class Event(object):·
255 ·
256 def __init__(self):·
257 self._cond = Condition(Lock())·
258 self._flag = Semaphore(0)·
259 ·
260 def is_set(self):·
261 self._cond.acquire()·
262 try:·
263 if self._flag.acquire(False):·
264 self._flag.release()·
265 return True·
266 return False·
267 finally:·
268 self._cond.release()·
269 ·····
270 def set(self):·
271 self._cond.acquire()·
272 try:·
273 self._flag.acquire(False)·
274 self._flag.release()·
275 self._cond.notify_all()·
276 finally:·
277 self._cond.release()·
278 ·
279 def clear(self):·
280 self._cond.acquire()·
281 try:·
282 self._flag.acquire(False)·
283 finally:·
284 self._cond.release()·
285 ·
286 def wait(self, timeout=None):·
287 self._cond.acquire()·
288 try:·
289 if self._flag.acquire(False):·
290 self._flag.release()·
291 else:·
292 self._cond.wait(timeout)·
293 finally:·
294 self._cond.release()·
OLDNEW

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b