LEFT | RIGHT |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The multi-process worker process.""" | 2 """The multi-process worker process.""" |
3 | 3 |
4 from __future__ import unicode_literals | 4 from __future__ import unicode_literals |
5 | |
6 import os | |
7 | 5 |
8 from dfvfs.lib import errors as dfvfs_errors | 6 from dfvfs.lib import errors as dfvfs_errors |
9 from dfvfs.resolver import context | 7 from dfvfs.resolver import context |
10 from dfvfs.resolver import resolver | 8 from dfvfs.resolver import resolver |
11 | 9 |
12 from plaso.engine import plaso_queue | 10 from plaso.engine import plaso_queue |
13 from plaso.engine import profilers | |
14 from plaso.engine import worker | 11 from plaso.engine import worker |
15 from plaso.lib import definitions | 12 from plaso.lib import definitions |
16 from plaso.lib import errors | 13 from plaso.lib import errors |
17 from plaso.multi_processing import base_process | 14 from plaso.multi_processing import base_process |
18 from plaso.multi_processing import logger | 15 from plaso.multi_processing import logger |
19 from plaso.parsers import mediator as parsers_mediator | 16 from plaso.parsers import mediator as parsers_mediator |
20 | 17 |
21 | 18 |
22 class WorkerProcess(base_process.MultiProcessBaseProcess): | 19 class WorkerProcess(base_process.MultiProcessBaseProcess): |
23 """Class that defines a multi-processing worker process.""" | 20 """Class that defines a multi-processing worker process.""" |
24 | 21 |
def __init__(
    self, task_queue, storage_writer, knowledge_base, session_identifier,
    processing_configuration, **kwargs):
  """Initializes a worker process.

  Non-specified keyword arguments (kwargs) are directly passed to
  multiprocessing.Process.

  Args:
    task_queue (PlasoQueue): task queue.
    storage_writer (StorageWriter): storage writer for a session storage.
    knowledge_base (KnowledgeBase): knowledge base which contains
        information from the source data needed for parsing.
    session_identifier (str): identifier of the session.
    processing_configuration (ProcessingConfiguration): processing
        configuration.
    kwargs: keyword arguments to pass to multiprocessing.Process.
  """
  # The base process takes ownership of the processing configuration
  # (including profiler and log-file setup previously done here).
  super(WorkerProcess, self).__init__(processing_configuration, **kwargs)
  self._abort = False
  self._buffer_size = 0
  self._current_display_name = ''
  self._extraction_worker = None
  self._knowledge_base = knowledge_base
  self._number_of_consumed_events = 0
  self._number_of_consumed_sources = 0
  self._parser_mediator = None
  self._session_identifier = session_identifier
  self._status = definitions.PROCESSING_STATUS_INITIALIZED
  self._storage_writer = storage_writer
  self._task = None
  self._task_queue = task_queue
74 | 54 |
75 def _GetStatus(self): | 55 def _GetStatus(self): |
76 """Retrieves status information. | 56 """Retrieves status information. |
77 | 57 |
78 Returns: | 58 Returns: |
79 dict[str, object]: status attributes, indexed by name. | 59 dict[str, object]: status attributes, indexed by name. |
80 """ | 60 """ |
81 if self._parser_mediator: | 61 if self._parser_mediator: |
82 number_of_produced_errors = ( | 62 number_of_produced_errors = ( |
83 self._parser_mediator.number_of_produced_errors) | 63 self._parser_mediator.number_of_produced_errors) |
(...skipping 16 matching lines...) Expand all Loading... |
100 processing_status = self._status | 80 processing_status = self._status |
101 | 81 |
102 task_identifier = getattr(self._task, 'identifier', '') | 82 task_identifier = getattr(self._task, 'identifier', '') |
103 | 83 |
104 if self._process_information: | 84 if self._process_information: |
105 used_memory = self._process_information.GetUsedMemory() or 0 | 85 used_memory = self._process_information.GetUsedMemory() or 0 |
106 else: | 86 else: |
107 used_memory = 0 | 87 used_memory = 0 |
108 | 88 |
109 if self._memory_profiler: | 89 if self._memory_profiler: |
110 self._memory_profiler.Sample(used_memory) | 90 self._memory_profiler.Sample('main', used_memory) |
| 91 |
| 92 # XML RPC does not support integer values > 2 GiB so we format them |
| 93 # as a string. |
| 94 used_memory = '{0:d}'.format(used_memory) |
111 | 95 |
112 status = { | 96 status = { |
113 'display_name': self._current_display_name, | 97 'display_name': self._current_display_name, |
114 'identifier': self._name, | 98 'identifier': self._name, |
115 'number_of_consumed_errors': None, | 99 'number_of_consumed_errors': None, |
116 'number_of_consumed_event_tags': None, | 100 'number_of_consumed_event_tags': None, |
117 'number_of_consumed_events': self._number_of_consumed_events, | 101 'number_of_consumed_events': self._number_of_consumed_events, |
118 'number_of_consumed_sources': self._number_of_consumed_sources, | 102 'number_of_consumed_sources': self._number_of_consumed_sources, |
119 'number_of_produced_errors': number_of_produced_errors, | 103 'number_of_produced_errors': number_of_produced_errors, |
120 'number_of_produced_event_tags': None, | 104 'number_of_produced_event_tags': None, |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
154 # We need to initialize the parser and hasher objects after the process | 138 # We need to initialize the parser and hasher objects after the process |
155 # has forked otherwise on Windows the "fork" will fail with | 139 # has forked otherwise on Windows the "fork" will fail with |
156 # a PickleError for Python modules that cannot be pickled. | 140 # a PickleError for Python modules that cannot be pickled. |
157 self._extraction_worker = worker.EventExtractionWorker( | 141 self._extraction_worker = worker.EventExtractionWorker( |
158 parser_filter_expression=( | 142 parser_filter_expression=( |
159 self._processing_configuration.parser_filter_expression)) | 143 self._processing_configuration.parser_filter_expression)) |
160 | 144 |
161 self._extraction_worker.SetExtractionConfiguration( | 145 self._extraction_worker.SetExtractionConfiguration( |
162 self._processing_configuration.extraction) | 146 self._processing_configuration.extraction) |
163 | 147 |
| 148 self._parser_mediator.StartProfiling( |
| 149 self._processing_configuration.profiling, self._name, |
| 150 self._process_information) |
164 self._StartProfiling(self._processing_configuration.profiling) | 151 self._StartProfiling(self._processing_configuration.profiling) |
165 | |
166 if self._parsers_profiler: | |
167 self._extraction_worker.SetParsersProfiler(self._parsers_profiler) | |
168 | 152 |
169 if self._processing_profiler: | 153 if self._processing_profiler: |
170 self._extraction_worker.SetProcessingProfiler(self._processing_profiler) | 154 self._extraction_worker.SetProcessingProfiler(self._processing_profiler) |
171 | 155 |
172 if self._serializers_profiler: | 156 if self._serializers_profiler: |
173 self._storage_writer.SetSerializersProfiler(self._serializers_profiler) | 157 self._storage_writer.SetSerializersProfiler(self._serializers_profiler) |
174 | 158 |
175 if self._storage_profiler: | 159 if self._storage_profiler: |
176 self._storage_writer.SetStorageProfiler(self._storage_profiler) | 160 self._storage_writer.SetStorageProfiler(self._storage_profiler) |
177 | 161 |
178 logger.debug('Worker: {0!s} (PID: {1:d}) started'.format( | 162 logger.debug('Worker: {0!s} (PID: {1:d}) started.'.format( |
179 self._name, self._pid)) | 163 self._name, self._pid)) |
180 | 164 |
181 self._status = definitions.PROCESSING_STATUS_RUNNING | 165 self._status = definitions.PROCESSING_STATUS_RUNNING |
182 | 166 |
183 try: | 167 try: |
184 logger.debug('{0!s} (PID: {1:d}) started monitoring task queue.'.format( | 168 logger.debug('{0!s} (PID: {1:d}) started monitoring task queue.'.format( |
185 self._name, self._pid)) | 169 self._name, self._pid)) |
186 | 170 |
187 while not self._abort: | 171 while not self._abort: |
188 try: | 172 try: |
(...skipping 15 matching lines...) Expand all Loading... |
204 # All exceptions need to be caught here to prevent the process | 188 # All exceptions need to be caught here to prevent the process |
205 # from being killed by an uncaught exception. | 189 # from being killed by an uncaught exception. |
206 except Exception as exception: # pylint: disable=broad-except | 190 except Exception as exception: # pylint: disable=broad-except |
207 logger.warning( | 191 logger.warning( |
208 'Unhandled exception in process: {0!s} (PID: {1:d}).'.format( | 192 'Unhandled exception in process: {0!s} (PID: {1:d}).'.format( |
209 self._name, self._pid)) | 193 self._name, self._pid)) |
210 logger.exception(exception) | 194 logger.exception(exception) |
211 | 195 |
212 self._abort = True | 196 self._abort = True |
213 | 197 |
214 if self._parsers_profiler: | |
215 self._extraction_worker.SetParsersProfiler(None) | |
216 | |
217 if self._processing_profiler: | 198 if self._processing_profiler: |
218 self._extraction_worker.SetProcessingProfiler(None) | 199 self._extraction_worker.SetProcessingProfiler(None) |
219 | 200 |
220 if self._serializers_profiler: | 201 if self._serializers_profiler: |
221 self._storage_writer.SetSerializersProfiler(None) | 202 self._storage_writer.SetSerializersProfiler(None) |
222 | 203 |
223 if self._storage_profiler: | 204 if self._storage_profiler: |
224 self._storage_writer.SetStorageProfiler(None) | 205 self._storage_writer.SetStorageProfiler(None) |
225 | 206 |
226 self._StopProfiling() | 207 self._StopProfiling() |
| 208 self._parser_mediator.StopProfiling() |
227 | 209 |
228 self._extraction_worker = None | 210 self._extraction_worker = None |
229 self._parser_mediator = None | 211 self._parser_mediator = None |
230 self._storage_writer = None | 212 self._storage_writer = None |
231 | 213 |
232 if self._abort: | 214 if self._abort: |
233 self._status = definitions.PROCESSING_STATUS_ABORTED | 215 self._status = definitions.PROCESSING_STATUS_ABORTED |
234 else: | 216 else: |
235 self._status = definitions.PROCESSING_STATUS_COMPLETED | 217 self._status = definitions.PROCESSING_STATUS_COMPLETED |
236 | 218 |
237 logger.debug('Worker: {0!s} (PID: {1:d}) stopped'.format( | 219 logger.debug('Worker: {0!s} (PID: {1:d}) stopped.'.format( |
238 self._name, self._pid)) | 220 self._name, self._pid)) |
239 | 221 |
240 try: | 222 try: |
241 self._task_queue.Close(abort=self._abort) | 223 self._task_queue.Close(abort=self._abort) |
242 except errors.QueueAlreadyClosed: | 224 except errors.QueueAlreadyClosed: |
243 logger.error('Queue for {0:s} was already closed.'.format(self.name)) | 225 logger.error('Queue for {0:s} was already closed.'.format(self.name)) |
244 | 226 |
245 def _ProcessPathSpec(self, extraction_worker, parser_mediator, path_spec): | 227 def _ProcessPathSpec(self, extraction_worker, parser_mediator, path_spec): |
246 """Processes a path specification. | 228 """Processes a path specification. |
247 | 229 |
(...skipping 25 matching lines...) Expand all Loading... |
273 'Unhandled exception while processing path specification: ' | 255 'Unhandled exception while processing path specification: ' |
274 '{0:s}.').format(self._current_display_name)) | 256 '{0:s}.').format(self._current_display_name)) |
275 logger.exception(exception) | 257 logger.exception(exception) |
276 | 258 |
277 def _ProcessTask(self, task): | 259 def _ProcessTask(self, task): |
278 """Processes a task. | 260 """Processes a task. |
279 | 261 |
280 Args: | 262 Args: |
281 task (Task): task. | 263 task (Task): task. |
282 """ | 264 """ |
| 265 logger.debug('Started processing task: {0:s}.'.format(task.identifier)) |
| 266 |
283 self._task = task | 267 self._task = task |
284 | 268 |
285 storage_writer = self._storage_writer.CreateTaskStorage(task) | 269 storage_writer = self._storage_writer.CreateTaskStorage(task) |
286 | 270 |
287 if self._serializers_profiler: | 271 if self._serializers_profiler: |
288 storage_writer.SetSerializersProfiler(self._serializers_profiler) | 272 storage_writer.SetSerializersProfiler(self._serializers_profiler) |
289 | 273 |
290 storage_writer.Open() | 274 storage_writer.Open() |
291 | 275 |
292 self._parser_mediator.SetStorageWriter(storage_writer) | 276 self._parser_mediator.SetStorageWriter(storage_writer) |
(...skipping 10 matching lines...) Expand all Loading... |
303 self._guppy_memory_profiler.Sample() | 287 self._guppy_memory_profiler.Sample() |
304 | 288 |
305 finally: | 289 finally: |
306 storage_writer.WriteTaskCompletion(aborted=self._abort) | 290 storage_writer.WriteTaskCompletion(aborted=self._abort) |
307 | 291 |
308 self._parser_mediator.SetStorageWriter(None) | 292 self._parser_mediator.SetStorageWriter(None) |
309 | 293 |
310 storage_writer.Close() | 294 storage_writer.Close() |
311 | 295 |
312 try: | 296 try: |
313 self._storage_writer.FinalizeProcessedTaskStorage(task) | 297 self._storage_writer.FinalizeTaskStorage(task) |
314 except IOError: | 298 except IOError: |
315 pass | 299 pass |
316 | 300 |
317 self._task = None | 301 self._task = None |
318 | 302 |
319 def _StartProfiling(self, configuration): | 303 logger.debug('Completed processing task: {0:s}.'.format(task.identifier)) |
320 """Starts profiling. | |
321 | |
322 Args: | |
323 configuration (ProfilingConfiguration): profiling configuration. | |
324 """ | |
325 if not configuration: | |
326 return | |
327 | |
328 if configuration.HaveProfileMemoryGuppy(): | |
329 self._guppy_memory_profiler = profilers.GuppyMemoryProfiler( | |
330 self._name, configuration) | |
331 self._guppy_memory_profiler.Start() | |
332 | |
333 if configuration.HaveProfileMemory(): | |
334 self._memory_profiler = profilers.MemoryProfiler( | |
335 self._name, configuration) | |
336 self._memory_profiler.Start() | |
337 | |
338 if configuration.HaveProfileParsers(): | |
339 identifier = '{0:s}-parsers'.format(self._name) | |
340 self._parsers_profiler = profilers.ParsersProfiler( | |
341 identifier, configuration) | |
342 self._parsers_profiler.Start() | |
343 | |
344 if configuration.HaveProfileProcessing(): | |
345 identifier = '{0:s}-processing'.format(self._name) | |
346 self._processing_profiler = profilers.ProcessingProfiler( | |
347 identifier, configuration) | |
348 self._processing_profiler.Start() | |
349 | |
350 if configuration.HaveProfileSerializers(): | |
351 identifier = '{0:s}-serializers'.format(self._name) | |
352 self._serializers_profiler = profilers.SerializersProfiler( | |
353 identifier, configuration) | |
354 self._serializers_profiler.Start() | |
355 | |
356 if configuration.HaveProfileStorage(): | |
357 self._storage_profiler = profilers.StorageProfiler( | |
358 self._name, configuration) | |
359 self._storage_profiler.Start() | |
360 | |
361 def _StopProfiling(self): | |
362 """Stops profiling.""" | |
363 if self._guppy_memory_profiler: | |
364 self._guppy_memory_profiler.Sample() | |
365 self._guppy_memory_profiler.Stop() | |
366 self._guppy_memory_profiler = None | |
367 | |
368 if self._memory_profiler: | |
369 self._memory_profiler.Stop() | |
370 self._memory_profiler = None | |
371 | |
372 if self._parsers_profiler: | |
373 self._parsers_profiler.Stop() | |
374 self._parsers_profiler = None | |
375 | |
376 if self._processing_profiler: | |
377 self._processing_profiler.Stop() | |
378 self._processing_profiler = None | |
379 | |
380 if self._serializers_profiler: | |
381 self._serializers_profiler.Stop() | |
382 self._serializers_profiler = None | |
383 | |
384 if self._storage_profiler: | |
385 self._storage_profiler.Stop() | |
386 self._storage_profiler = None | |
387 | 304 |
def SignalAbort(self):
  """Signals the process to abort.

  Sets the abort flag and forwards the abort signal to the extraction
  worker and parser mediator when they have been created; either may be
  None before _Main has run or after it has finished.
  """
  self._abort = True
  if self._extraction_worker:
    self._extraction_worker.SignalAbort()
  if self._parser_mediator:
    self._parser_mediator.SignalAbort()
LEFT | RIGHT |