OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The multi-process worker process.""" | 2 """The multi-process worker process.""" |
3 | 3 |
| 4 from __future__ import unicode_literals |
| 5 |
4 import logging | 6 import logging |
5 | 7 |
6 from dfvfs.lib import errors as dfvfs_errors | 8 from dfvfs.lib import errors as dfvfs_errors |
7 from dfvfs.resolver import context | 9 from dfvfs.resolver import context |
8 from dfvfs.resolver import resolver | 10 from dfvfs.resolver import resolver |
9 | 11 |
10 from plaso.engine import plaso_queue | 12 from plaso.engine import plaso_queue |
11 from plaso.engine import profiler | 13 from plaso.engine import profiler |
12 from plaso.engine import worker | 14 from plaso.engine import worker |
13 from plaso.lib import definitions | 15 from plaso.lib import definitions |
(...skipping 19 matching lines...) Expand all Loading... |
33 knowledge_base (KnowledgeBase): knowledge base which contains | 35 knowledge_base (KnowledgeBase): knowledge base which contains |
34 information from the source data needed for parsing. | 36 information from the source data needed for parsing. |
35 session_identifier (str): identifier of the session. | 37 session_identifier (str): identifier of the session. |
36 processing_configuration (ProcessingConfiguration): processing | 38 processing_configuration (ProcessingConfiguration): processing |
37 configuration. | 39 configuration. |
38 kwargs: keyword arguments to pass to multiprocessing.Process. | 40 kwargs: keyword arguments to pass to multiprocessing.Process. |
39 """ | 41 """ |
40 super(WorkerProcess, self).__init__(**kwargs) | 42 super(WorkerProcess, self).__init__(**kwargs) |
41 self._abort = False | 43 self._abort = False |
42 self._buffer_size = 0 | 44 self._buffer_size = 0 |
43 self._current_display_name = u'' | 45 self._current_display_name = '' |
44 self._extraction_worker = None | 46 self._extraction_worker = None |
45 self._guppy_memory_profiler = None | 47 self._guppy_memory_profiler = None |
46 self._knowledge_base = knowledge_base | 48 self._knowledge_base = knowledge_base |
47 self._number_of_consumed_events = 0 | 49 self._number_of_consumed_events = 0 |
48 self._number_of_consumed_sources = 0 | 50 self._number_of_consumed_sources = 0 |
49 self._parser_mediator = None | 51 self._parser_mediator = None |
50 self._parsers_profiler = None | 52 self._parsers_profiler = None |
51 self._processing_configuration = processing_configuration | 53 self._processing_configuration = processing_configuration |
52 self._processing_profiler = None | 54 self._processing_profiler = None |
53 self._serializers_profiler = None | 55 self._serializers_profiler = None |
54 self._session_identifier = session_identifier | 56 self._session_identifier = session_identifier |
55 self._status = definitions.PROCESSING_STATUS_INITIALIZED | 57 self._status = definitions.PROCESSING_STATUS_INITIALIZED |
56 self._storage_writer = storage_writer | 58 self._storage_writer = storage_writer |
57 self._task = None | 59 self._task = None |
58 self._task_queue = task_queue | 60 self._task_queue = task_queue |
59 | 61 |
def _GetStatus(self):
  """Retrieves status information.

  Returns:
    dict[str, object]: status attributes, indexed by name.
  """
  # Production counters are only available once a parser mediator exists;
  # report None (unknown) before that point rather than 0.
  mediator = self._parser_mediator
  if mediator:
    produced_errors = mediator.number_of_produced_errors
    produced_events = mediator.number_of_produced_events
    produced_sources = mediator.number_of_produced_event_sources
  else:
    produced_errors = None
    produced_events = None
    produced_sources = None

  # Likewise, activity information comes from the extraction worker when
  # one is running, otherwise fall back to the process-level status.
  extraction_worker = self._extraction_worker
  if extraction_worker:
    last_activity_timestamp = extraction_worker.last_activity_timestamp
    processing_status = extraction_worker.processing_status
  else:
    last_activity_timestamp = 0.0
    processing_status = self._status

  # self._task is None between tasks; report an empty identifier then.
  task_identifier = getattr(self._task, 'identifier', '')

  return {
      'display_name': self._current_display_name,
      'identifier': self._name,
      'last_activity_timestamp': last_activity_timestamp,
      'number_of_consumed_errors': None,
      'number_of_consumed_event_tags': None,
      'number_of_consumed_events': self._number_of_consumed_events,
      'number_of_consumed_sources': self._number_of_consumed_sources,
      'number_of_produced_errors': produced_errors,
      'number_of_produced_event_tags': None,
      'number_of_produced_events': produced_events,
      'number_of_produced_sources': produced_sources,
      'processing_status': processing_status,
      'task_identifier': task_identifier}
103 | 105 |
104 def _Main(self): | 106 def _Main(self): |
105 """The main loop.""" | 107 """The main loop.""" |
106 # We need a resolver context per process to prevent multi processing | 108 # We need a resolver context per process to prevent multi processing |
107 # issues with file objects stored in images. | 109 # issues with file objects stored in images. |
108 resolver_context = context.Context() | 110 resolver_context = context.Context() |
109 | 111 |
110 for credential_configuration in self._processing_configuration.credentials: | 112 for credential_configuration in self._processing_configuration.credentials: |
(...skipping 19 matching lines...) Expand all Loading... |
130 # a PickleError for Python modules that cannot be pickled. | 132 # a PickleError for Python modules that cannot be pickled. |
131 self._extraction_worker = worker.EventExtractionWorker( | 133 self._extraction_worker = worker.EventExtractionWorker( |
132 parser_filter_expression=( | 134 parser_filter_expression=( |
133 self._processing_configuration.parser_filter_expression)) | 135 self._processing_configuration.parser_filter_expression)) |
134 | 136 |
135 self._extraction_worker.SetExtractionConfiguration( | 137 self._extraction_worker.SetExtractionConfiguration( |
136 self._processing_configuration.extraction) | 138 self._processing_configuration.extraction) |
137 | 139 |
138 self._StartProfiling() | 140 self._StartProfiling() |
139 | 141 |
140 logging.debug(u'Worker: {0!s} (PID: {1:d}) started'.format( | 142 logging.debug('Worker: {0!s} (PID: {1:d}) started'.format( |
141 self._name, self._pid)) | 143 self._name, self._pid)) |
142 | 144 |
143 self._status = definitions.PROCESSING_STATUS_RUNNING | 145 self._status = definitions.PROCESSING_STATUS_RUNNING |
144 | 146 |
145 try: | 147 try: |
146 logging.debug( | 148 logging.debug('{0!s} (PID: {1:d}) started monitoring task queue.'.format( |
147 u'{0!s} (PID: {1:d}) started monitoring task queue.'.format( | 149 self._name, self._pid)) |
148 self._name, self._pid)) | |
149 | 150 |
150 while not self._abort: | 151 while not self._abort: |
151 try: | 152 try: |
152 task = self._task_queue.PopItem() | 153 task = self._task_queue.PopItem() |
153 except (errors.QueueClose, errors.QueueEmpty) as exception: | 154 except (errors.QueueClose, errors.QueueEmpty) as exception: |
154 logging.debug(u'ConsumeItems exiting with exception {0:s}.'.format( | 155 logging.debug('ConsumeItems exiting with exception {0:s}.'.format( |
155 type(exception))) | 156 type(exception))) |
156 break | 157 break |
157 | 158 |
158 if isinstance(task, plaso_queue.QueueAbort): | 159 if isinstance(task, plaso_queue.QueueAbort): |
159 logging.debug(u'ConsumeItems exiting, dequeued QueueAbort object.') | 160 logging.debug('ConsumeItems exiting, dequeued QueueAbort object.') |
160 break | 161 break |
161 | 162 |
162 self._ProcessTask(task) | 163 self._ProcessTask(task) |
163 | 164 |
164 logging.debug( | 165 logging.debug('{0!s} (PID: {1:d}) stopped monitoring task queue.'.format( |
165 u'{0!s} (PID: {1:d}) stopped monitoring task queue.'.format( | 166 self._name, self._pid)) |
166 self._name, self._pid)) | |
167 | 167 |
168 # All exceptions need to be caught here to prevent the process | 168 # All exceptions need to be caught here to prevent the process |
169 # from being killed by an uncaught exception. | 169 # from being killed by an uncaught exception. |
170 except Exception as exception: # pylint: disable=broad-except | 170 except Exception as exception: # pylint: disable=broad-except |
171 logging.warning( | 171 logging.warning( |
172 u'Unhandled exception in process: {0!s} (PID: {1:d}).'.format( | 172 'Unhandled exception in process: {0!s} (PID: {1:d}).'.format( |
173 self._name, self._pid)) | 173 self._name, self._pid)) |
174 logging.exception(exception) | 174 logging.exception(exception) |
175 | 175 |
176 self._abort = True | 176 self._abort = True |
177 | 177 |
178 self._StopProfiling() | 178 self._StopProfiling() |
179 self._extraction_worker = None | 179 self._extraction_worker = None |
180 self._parser_mediator = None | 180 self._parser_mediator = None |
181 self._storage_writer = None | 181 self._storage_writer = None |
182 | 182 |
183 if self._abort: | 183 if self._abort: |
184 self._status = definitions.PROCESSING_STATUS_ABORTED | 184 self._status = definitions.PROCESSING_STATUS_ABORTED |
185 else: | 185 else: |
186 self._status = definitions.PROCESSING_STATUS_COMPLETED | 186 self._status = definitions.PROCESSING_STATUS_COMPLETED |
187 | 187 |
188 logging.debug(u'Worker: {0!s} (PID: {1:d}) stopped'.format( | 188 logging.debug('Worker: {0!s} (PID: {1:d}) stopped'.format( |
189 self._name, self._pid)) | 189 self._name, self._pid)) |
190 | 190 |
191 try: | 191 try: |
192 self._task_queue.Close(abort=self._abort) | 192 self._task_queue.Close(abort=self._abort) |
193 except errors.QueueAlreadyClosed: | 193 except errors.QueueAlreadyClosed: |
194 logging.error(u'Queue for {0:s} was already closed.'.format(self.name)) | 194 logging.error('Queue for {0:s} was already closed.'.format(self.name)) |
195 | 195 |
def _ProcessPathSpec(self, extraction_worker, parser_mediator, path_spec):
  """Processes a path specification.

  Args:
    extraction_worker (worker.ExtractionWorker): extraction worker.
    parser_mediator (ParserMediator): parser mediator.
    path_spec (dfvfs.PathSpec): path specification.
  """
  display_name = parser_mediator.GetDisplayNameForPathSpec(path_spec)
  self._current_display_name = display_name

  try:
    extraction_worker.ProcessPathSpec(parser_mediator, path_spec)

  except dfvfs_errors.CacheFullError:
    # The dfVFS cache filling up is unrecoverable for this worker.
    # TODO: signal engine of failure.
    self._abort = True
    logging.error(
        'ABORT: detected cache full error while processing path spec: '
        '{0:s}'.format(display_name))

  # Catch-all so a single misbehaving path specification is reported as an
  # extraction error instead of killing the worker process.
  except Exception as exception:  # pylint: disable=broad-except
    parser_mediator.ProduceExtractionError(
        'unable to process path specification with error: {0!s}'.format(
            exception), path_spec=path_spec)

    if self._processing_configuration.debug_output:
      logging.warning(
          'Unhandled exception while processing path specification: '
          '{0:s}.'.format(display_name))
      logging.exception(exception)
227 | 227 |
228 def _ProcessTask(self, task): | 228 def _ProcessTask(self, task): |
229 """Processes a task. | 229 """Processes a task. |
230 | 230 |
231 Args: | 231 Args: |
232 task (Task): task. | 232 task (Task): task. |
233 """ | 233 """ |
234 self._task = task | 234 self._task = task |
235 | 235 |
(...skipping 30 matching lines...) Expand all Loading... |
266 pass | 266 pass |
267 | 267 |
268 self._task = None | 268 self._task = None |
269 | 269 |
def _StartProfiling(self):
  """Starts profiling."""
  if not self._processing_configuration:
    return

  # Hoist the repeated configuration lookups; each profiler type is
  # enabled independently by the profiling configuration.
  profiling_configuration = self._processing_configuration.profiling
  profiling_directory = profiling_configuration.directory

  if profiling_configuration.HaveProfileMemoryGuppy():
    self._guppy_memory_profiler = profiler.GuppyMemoryProfiler(
        '{0:s}-memory'.format(self._name), path=profiling_directory,
        profiling_sample_rate=profiling_configuration.sample_rate)
    self._guppy_memory_profiler.Start()

  if profiling_configuration.HaveProfileParsers():
    self._parsers_profiler = profiler.ParsersProfiler(
        '{0:s}-parsers'.format(self._name), path=profiling_directory)
    self._extraction_worker.SetParsersProfiler(self._parsers_profiler)

  if profiling_configuration.HaveProfileProcessing():
    self._processing_profiler = profiler.ProcessingProfiler(
        '{0:s}-processing'.format(self._name), path=profiling_directory)
    self._extraction_worker.SetProcessingProfiler(self._processing_profiler)

  if profiling_configuration.HaveProfileSerializers():
    self._serializers_profiler = profiler.SerializersProfiler(
        '{0:s}-serializers'.format(self._name), path=profiling_directory)
300 def _StopProfiling(self): | 300 def _StopProfiling(self): |
301 """Stops profiling.""" | 301 """Stops profiling.""" |
302 if self._guppy_memory_profiler: | 302 if self._guppy_memory_profiler: |
303 self._guppy_memory_profiler.Sample() | 303 self._guppy_memory_profiler.Sample() |
304 self._guppy_memory_profiler = None | 304 self._guppy_memory_profiler = None |
305 | 305 |
306 if self._parsers_profiler: | 306 if self._parsers_profiler: |
(...skipping 10 matching lines...) Expand all Loading... |
317 self._serializers_profiler.Write() | 317 self._serializers_profiler.Write() |
318 self._serializers_profiler = None | 318 self._serializers_profiler = None |
319 | 319 |
def SignalAbort(self):
  """Signals the process to abort."""
  self._abort = True

  # Propagate the abort to the worker and mediator when they exist, so
  # in-flight extraction stops as soon as possible.
  extraction_worker = self._extraction_worker
  if extraction_worker:
    extraction_worker.SignalAbort()

  parser_mediator = self._parser_mediator
  if parser_mediator:
    parser_mediator.SignalAbort()
OLD | NEW |