OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The multi-process processing engine.""" | 2 """The multi-process processing engine.""" |
3 | 3 |
4 import abc | 4 import abc |
5 import ctypes | 5 import ctypes |
6 import logging | 6 import logging |
7 import os | 7 import os |
8 import signal | 8 import signal |
9 import sys | 9 import sys |
10 import threading | 10 import threading |
11 import time | 11 import time |
12 | 12 |
13 from plaso.engine import engine | 13 from plaso.engine import engine |
14 from plaso.engine import process_info | 14 from plaso.engine import process_info |
15 from plaso.lib import definitions | 15 from plaso.lib import definitions |
16 from plaso.multi_processing import plaso_xmlrpc | 16 from plaso.multi_processing import plaso_xmlrpc |
17 | 17 |
18 | 18 |
19 class MultiProcessEngine(engine.BaseEngine): | 19 class MultiProcessEngine(engine.BaseEngine): |
20 """Multi-process engine base. | 20 """Multi-process engine base. |
21 | 21 |
22 This class contains functionality to: | 22 This class contains functionality to: |
23 * monitor and manage worker processes; | 23 * monitor and manage worker processes; |
24 * retrieve a process status information via RPC; | 24 * retrieve a process status information via RPC; |
25 * manage the status update thread. | 25 * manage the status update thread. |
26 """ | 26 """ |
27 | 27 |
28 _DEFAULT_WORKER_MEMORY_LIMIT = 2048 * 1024 * 1024 | |
29 | |
30 # Note that on average Windows seems to require a longer wait. | 28 # Note that on average Windows seems to require a longer wait. |
31 _RPC_SERVER_TIMEOUT = 8.0 | 29 _RPC_SERVER_TIMEOUT = 8.0 |
32 _MAXIMUM_RPC_ERRORS = 10 | 30 _MAXIMUM_RPC_ERRORS = 10 |
33 # Maximum number of attempts to try to start a replacement worker process. | 31 # Maximum number of attempts to try to start a replacement worker process. |
34 _MAXIMUM_REPLACEMENT_RETRIES = 3 | 32 _MAXIMUM_REPLACEMENT_RETRIES = 3 |
35 # Number of seconds to wait between attempts to start a replacement worker | 33 # Number of seconds to wait between attempts to start a replacement worker |
36 # process | 34 # process |
37 _REPLACEMENT_WORKER_RETRY_DELAY = 1 | 35 _REPLACEMENT_WORKER_RETRY_DELAY = 1 |
38 | 36 |
39 _ZEROMQ_NO_WORKER_REQUEST_TIME_SECONDS = 300 | 37 _ZEROMQ_NO_WORKER_REQUEST_TIME_SECONDS = 300 |
40 | 38 |
41 def __init__(self): | 39 def __init__(self): |
42 """Initializes a multi-process engine.""" | 40 """Initializes a multi-process engine.""" |
43 super(MultiProcessEngine, self).__init__() | 41 super(MultiProcessEngine, self).__init__() |
44 self._name = u'Main' | 42 self._name = u'Main' |
45 self._pid = os.getpid() | 43 self._pid = os.getpid() |
46 self._process_information = process_info.ProcessInfo(self._pid) | 44 self._process_information = process_info.ProcessInfo(self._pid) |
47 self._process_information_per_pid = {} | 45 self._process_information_per_pid = {} |
48 self._processes_per_pid = {} | 46 self._processes_per_pid = {} |
49 self._rpc_clients_per_pid = {} | 47 self._rpc_clients_per_pid = {} |
50 self._rpc_errors_per_pid = {} | 48 self._rpc_errors_per_pid = {} |
51 self._status_update_active = False | 49 self._status_update_active = False |
52 self._status_update_callback = None | 50 self._status_update_callback = None |
53 self._status_update_thread = None | 51 self._status_update_thread = None |
54 self._storage_writer = None | 52 self._storage_writer = None |
55 self._worker_memory_limit = self._DEFAULT_WORKER_MEMORY_LIMIT | 53 self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT |
56 | 54 |
57 def _AbortJoin(self, timeout=None): | 55 def _AbortJoin(self, timeout=None): |
58 """Aborts all registered processes by joining with the parent process. | 56 """Aborts all registered processes by joining with the parent process. |
59 | 57 |
60 Args: | 58 Args: |
61 timeout (int): number of seconds to wait for processes to join, where | 59 timeout (int): number of seconds to wait for processes to join, where |
62 None represents no timeout. | 60 None represents no timeout. |
63 """ | 61 """ |
64 for pid, process in iter(self._processes_per_pid.items()): | 62 for pid, process in iter(self._processes_per_pid.items()): |
65 logging.debug(u'Waiting for process: {0:s} (PID: {1:d}).'.format( | 63 logging.debug(u'Waiting for process: {0:s} (PID: {1:d}).'.format( |
(...skipping 347 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
413 | 411 |
414 Args: | 412 Args: |
415 pid (int): process identifier (PID) of the worker process. | 413 pid (int): process identifier (PID) of the worker process. |
416 process_status (dict[str, object]): status values received from | 414 process_status (dict[str, object]): status values received from |
417 the worker process. | 415 the worker process. |
418 used_memory (int): size of used memory in bytes. | 416 used_memory (int): size of used memory in bytes. |
419 | 417 |
420 Raises: | 418 Raises: |
421 KeyError: if the process is not registered with the engine. | 419 KeyError: if the process is not registered with the engine. |
422 """ | 420 """ |
OLD | NEW |