Index: Lib/multiprocessing/util.py |
=================================================================== |
--- Lib/multiprocessing/util.py (revision 0) |
+++ Lib/multiprocessing/util.py (revision 0) |
@@ -0,0 +1,336 @@ |
+# |
+# Module providing various facilities to other parts of the package |
+# |
+# multiprocessing/util.py |
+# |
+# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt |
+# |
+ |
+import itertools |
+import weakref |
+import copy_reg |
+import atexit |
+import threading # we want threading to install it's |
+ # cleanup function before multiprocessing does |
+ |
+from multiprocessing.process import current_process, active_children |
+ |
+__all__ = [ |
+ 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger', |
+ 'log_to_stderr', 'get_temp_dir', 'register_after_fork', |
+ 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal' |
+ ] |
+ |
+# |
+# Logging |
+# |
+ |
+NOTSET = 0 |
+SUBDEBUG = 5 |
+DEBUG = 10 |
+INFO = 20 |
+SUBWARNING = 25 |
+ |
+LOGGER_NAME = 'multiprocessing' |
+DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s' |
+ |
+_logger = None |
+_log_to_stderr = False |
+ |
+def sub_debug(msg, *args): |
+ if _logger: |
+ _logger.log(SUBDEBUG, msg, *args) |
+ |
+def debug(msg, *args): |
+ if _logger: |
+ _logger.log(DEBUG, msg, *args) |
+ |
+def info(msg, *args): |
+ if _logger: |
+ _logger.log(INFO, msg, *args) |
+ |
+def sub_warning(msg, *args): |
+ if _logger: |
+ _logger.log(SUBWARNING, msg, *args) |
+ |
+def get_logger(): |
+ ''' |
+ Returns logger used by multiprocessing |
+ ''' |
+ global _logger |
+ |
+ if not _logger: |
+ import logging, atexit |
+ |
+ # XXX multiprocessing should cleanup before logging |
+ if hasattr(atexit, 'unregister'): |
+ atexit.unregister(_exit_function) |
+ atexit.register(_exit_function) |
+ else: |
+ atexit._exithandlers.remove((_exit_function, (), {})) |
+ atexit._exithandlers.append((_exit_function, (), {})) |
+ |
+ _check_logger_class() |
+ _logger = logging.getLogger(LOGGER_NAME) |
+ |
+ return _logger |
+ |
+def _check_logger_class(): |
+ ''' |
+ Make sure process name is recorded when loggers are used |
+ ''' |
+ # XXX This function is unnecessary once logging is patched |
+ import logging |
+ if hasattr(logging, 'multiprocessing'): |
+ return |
+ |
+ logging._acquireLock() |
+ try: |
+ OldLoggerClass = logging.getLoggerClass() |
+ if not getattr(OldLoggerClass, '_process_aware', False): |
+ class ProcessAwareLogger(OldLoggerClass): |
+ _process_aware = True |
+ def makeRecord(self, *args, **kwds): |
+ record = OldLoggerClass.makeRecord(self, *args, **kwds) |
+ record.processName = current_process()._name |
+ return record |
+ logging.setLoggerClass(ProcessAwareLogger) |
+ finally: |
+ logging._releaseLock() |
+ |
+def log_to_stderr(level=None): |
+ ''' |
+ Turn on logging and add a handler which prints to stderr |
+ ''' |
+ global _log_to_stderr |
+ import logging |
+ logger = get_logger() |
+ formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT) |
+ handler = logging.StreamHandler() |
+ handler.setFormatter(formatter) |
+ logger.addHandler(handler) |
+ if level is not None: |
+ logger.setLevel(level) |
+ _log_to_stderr = True |
+ |
+# |
+# Function returning a temp directory which will be removed on exit |
+# |
+ |
+def get_temp_dir(): |
+ # get name of a temp directory which will be automatically cleaned up |
+ if current_process()._tempdir is None: |
+ import shutil, tempfile |
+ tempdir = tempfile.mkdtemp(prefix='pymp-') |
+ info('created temp directory %s', tempdir) |
+ Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100) |
+ current_process()._tempdir = tempdir |
+ return current_process()._tempdir |
+ |
+# |
+# Support for reinitialization of objects when bootstrapping a child process |
+# |
+ |
+_afterfork_registry = weakref.WeakValueDictionary() |
+_afterfork_counter = itertools.count() |
+ |
+def _run_after_forkers(): |
+ items = list(_afterfork_registry.items()) |
+ items.sort() |
+ for (index, ident, func), obj in items: |
+ try: |
+ func(obj) |
+ except Exception, e: |
+ info('after forker raised exception %s', e) |
+ |
+def register_after_fork(obj, func): |
+ _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj |
+ |
+# |
+# Finalization using weakrefs |
+# |
+ |
+_finalizer_registry = {} |
+_finalizer_counter = itertools.count() |
+ |
+ |
+class Finalize(object): |
+ ''' |
+ Class which supports object finalization using weakrefs |
+ ''' |
+ def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): |
+ assert exitpriority is None or type(exitpriority) is int |
+ |
+ if obj is not None: |
+ self._weakref = weakref.ref(obj, self) |
+ else: |
+ assert exitpriority is not None |
+ |
+ self._callback = callback |
+ self._args = args |
+ self._kwargs = kwargs or {} |
+ self._key = (exitpriority, _finalizer_counter.next()) |
+ |
+ _finalizer_registry[self._key] = self |
+ |
+ def __call__(self, wr=None): |
+ ''' |
+ Run the callback unless it has already been called or cancelled |
+ ''' |
+ try: |
+ del _finalizer_registry[self._key] |
+ except KeyError: |
+ sub_debug('finalizer no longer registered') |
+ else: |
+ sub_debug('finalizer calling %s with args %s and kwargs %s', |
+ self._callback, self._args, self._kwargs) |
+ res = self._callback(*self._args, **self._kwargs) |
+ self._weakref = self._callback = self._args = \ |
+ self._kwargs = self._key = None |
+ return res |
+ |
+ def cancel(self): |
+ ''' |
+ Cancel finalization of the object |
+ ''' |
+ try: |
+ del _finalizer_registry[self._key] |
+ except KeyError: |
+ pass |
+ else: |
+ self._weakref = self._callback = self._args = \ |
+ self._kwargs = self._key = None |
+ |
+ def still_active(self): |
+ ''' |
+ Return whether this finalizer is still waiting to invoke callback |
+ ''' |
+ return self._key in _finalizer_registry |
+ |
+ def __repr__(self): |
+ try: |
+ obj = self._weakref() |
+ except (AttributeError, TypeError): |
+ obj = None |
+ |
+ if obj is None: |
+ return '<Finalize object, dead>' |
+ |
+ x = '<Finalize object, callback=%s' % \ |
+ getattr(self._callback, '__name__', self._callback) |
+ if self._args: |
+ x += ', args=' + str(self._args) |
+ if self._kwargs: |
+ x += ', kwargs=' + str(self._kwargs) |
+ if self._key[0] is not None: |
+ x += ', exitprority=' + str(self._key[0]) |
+ return x + '>' |
+ |
+ |
+def _run_finalizers(minpriority=None): |
+ ''' |
+ Run all finalizers whose exit priority is not None and at least minpriority |
+ |
+ Finalizers with highest priority are called first; finalizers with |
+ the same priority will be called in reverse order of creation. |
+ ''' |
+ if minpriority is None: |
+ f = lambda p : p[0][0] is not None |
+ else: |
+ f = lambda p : p[0][0] is not None and p[0][0] >= minpriority |
+ |
+ items = [x for x in _finalizer_registry.items() if f(x)] |
+ items.sort(reverse=True) |
+ |
+ for key, finalizer in items: |
+ sub_debug('calling %s', finalizer) |
+ try: |
+ finalizer() |
+ except Exception: |
+ import traceback |
+ traceback.print_exc() |
+ |
+ if minpriority is None: |
+ _finalizer_registry.clear() |
+ |
+# |
+# Clean up on exit |
+# |
+ |
+def is_exiting(): |
+ ''' |
+ Returns true if the process is shutting down |
+ ''' |
+ return _exiting or _exiting is None |
+ |
+_exiting = False |
+ |
+def _exit_function(): |
+ global _exiting |
+ |
+ info('process shutting down') |
+ debug('running all "atexit" finalizers with priority >= 0') |
+ _run_finalizers(0) |
+ |
+ for p in active_children(): |
+ if p._daemonic: |
+ info('calling terminate() for daemon %s', p.get_name()) |
+ p._popen.terminate() |
+ |
+ for p in active_children(): |
+ info('calling join() for process %s', p.get_name()) |
+ p.join() |
+ |
+ debug('running the remaining "atexit" finalizers') |
+ _run_finalizers() |
+ |
+atexit.register(_exit_function) |
+ |
+# |
+# Some fork aware types |
+# |
+ |
+class ForkAwareThreadLock(object): |
+ def __init__(self): |
+ self._lock = threading.Lock() |
+ self.acquire = self._lock.acquire |
+ self.release = self._lock.release |
+ register_after_fork(self, ForkAwareThreadLock.__init__) |
+ |
+class ForkAwareLocal(threading.local): |
+ def __init__(self): |
+ register_after_fork(self, lambda obj : obj.__dict__.clear()) |
+ def __reduce__(self): |
+ return type(self), () |
+ |
+# |
+# Try making some callable types picklable |
+# |
+ |
+def _reduce_method(m): |
+ if m.im_self is None: |
+ return getattr, (m.im_class, m.im_func.func_name) |
+ else: |
+ return getattr, (m.im_self, m.im_func.func_name) |
+copy_reg.pickle(type(Finalize.__init__), _reduce_method) |
+ |
+def _reduce_method_descriptor(m): |
+ return getattr, (m.__objclass__, m.__name__) |
+copy_reg.pickle(type(list.append), _reduce_method_descriptor) |
+copy_reg.pickle(type(int.__add__), _reduce_method_descriptor) |
+ |
+def _reduce_builtin_function_or_method(m): |
+ return getattr, (m.__self__, m.__name__) |
+copy_reg.pickle(type(list().append), _reduce_builtin_function_or_method) |
+copy_reg.pickle(type(int().__add__), _reduce_builtin_function_or_method) |
+ |
+try: |
+ from functools import partial |
+except ImportError: |
+ pass |
+else: |
+ def _reduce_partial(p): |
+ return _rebuild_partial, (p.func, p.args, p.keywords or {}) |
+ def _rebuild_partial(func, args, keywords): |
+ return partial(func, *args, **keywords) |
+ copy_reg.pickle(partial, _reduce_partial) |