OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The multi-process worker process.""" | 2 """The multi-process worker process.""" |
3 | 3 |
4 import logging | 4 import logging |
5 | 5 |
6 from dfvfs.lib import errors as dfvfs_errors | 6 from dfvfs.lib import errors as dfvfs_errors |
7 from dfvfs.resolver import context | 7 from dfvfs.resolver import context |
8 from dfvfs.resolver import resolver | 8 from dfvfs.resolver import resolver |
9 | 9 |
10 from plaso.engine import plaso_queue | 10 from plaso.engine import plaso_queue |
(...skipping 24 matching lines...) Expand all Loading... |
35 session_identifier (str): identifier of the session. | 35 session_identifier (str): identifier of the session. |
36 processing_configuration (ProcessingConfiguration): processing | 36 processing_configuration (ProcessingConfiguration): processing |
37 configuration. | 37 configuration. |
38 kwargs: keyword arguments to pass to multiprocessing.Process. | 38 kwargs: keyword arguments to pass to multiprocessing.Process. |
39 """ | 39 """ |
40 super(WorkerProcess, self).__init__(**kwargs) | 40 super(WorkerProcess, self).__init__(**kwargs) |
41 self._abort = False | 41 self._abort = False |
42 self._buffer_size = 0 | 42 self._buffer_size = 0 |
43 self._current_display_name = u'' | 43 self._current_display_name = u'' |
44 self._extraction_worker = None | 44 self._extraction_worker = None |
| 45 self._guppy_memory_profiler = None |
45 self._knowledge_base = knowledge_base | 46 self._knowledge_base = knowledge_base |
46 self._memory_profiler = None | |
47 self._number_of_consumed_events = 0 | 47 self._number_of_consumed_events = 0 |
48 self._number_of_consumed_sources = 0 | 48 self._number_of_consumed_sources = 0 |
49 self._parser_mediator = None | 49 self._parser_mediator = None |
50 self._parsers_profiler = None | 50 self._parsers_profiler = None |
51 self._processing_configuration = processing_configuration | 51 self._processing_configuration = processing_configuration |
52 self._processing_profiler = None | 52 self._processing_profiler = None |
53 self._serializers_profiler = None | 53 self._serializers_profiler = None |
54 self._session_identifier = session_identifier | 54 self._session_identifier = session_identifier |
55 self._status = definitions.PROCESSING_STATUS_INITIALIZED | 55 self._status = definitions.PROCESSING_STATUS_INITIALIZED |
56 self._storage_writer = storage_writer | 56 self._storage_writer = storage_writer |
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
243 self._parser_mediator.SetStorageWriter(storage_writer) | 243 self._parser_mediator.SetStorageWriter(storage_writer) |
244 | 244 |
245 storage_writer.WriteTaskStart() | 245 storage_writer.WriteTaskStart() |
246 | 246 |
247 try: | 247 try: |
248 # TODO: add support for more task types. | 248 # TODO: add support for more task types. |
249 self._ProcessPathSpec( | 249 self._ProcessPathSpec( |
250 self._extraction_worker, self._parser_mediator, task.path_spec) | 250 self._extraction_worker, self._parser_mediator, task.path_spec) |
251 self._number_of_consumed_sources += 1 | 251 self._number_of_consumed_sources += 1 |
252 | 252 |
253 if self._memory_profiler: | 253 if self._guppy_memory_profiler: |
254 self._memory_profiler.Sample() | 254 self._guppy_memory_profiler.Sample() |
255 | 255 |
256 finally: | 256 finally: |
257 storage_writer.WriteTaskCompletion(aborted=self._abort) | 257 storage_writer.WriteTaskCompletion(aborted=self._abort) |
258 | 258 |
259 self._parser_mediator.SetStorageWriter(None) | 259 self._parser_mediator.SetStorageWriter(None) |
260 | 260 |
261 storage_writer.Close() | 261 storage_writer.Close() |
262 | 262 |
263 try: | 263 try: |
264 self._storage_writer.PrepareMergeTaskStorage(task) | 264 self._storage_writer.PrepareMergeTaskStorage(task) |
265 except IOError: | 265 except IOError: |
266 pass | 266 pass |
267 | 267 |
268 self._task = None | 268 self._task = None |
269 | 269 |
270 def _StartProfiling(self): | 270 def _StartProfiling(self): |
271 """Starts profiling.""" | 271 """Starts profiling.""" |
272 if not self._processing_configuration: | 272 if not self._processing_configuration: |
273 return | 273 return |
274 | 274 |
275 if self._processing_configuration.profiling.HaveProfileMemory(): | 275 if self._processing_configuration.profiling.HaveProfileMemoryGuppy(): |
276 identifier = u'{0:s}-memory'.format(self._name) | 276 identifier = u'{0:s}-memory'.format(self._name) |
277 self._memory_profiler = profiler.GuppyMemoryProfiler( | 277 self._guppy_memory_profiler = profiler.GuppyMemoryProfiler( |
278 identifier, path=self._processing_configuration.profiling.directory, | 278 identifier, path=self._processing_configuration.profiling.directory, |
279 profiling_sample_rate=( | 279 profiling_sample_rate=( |
280 self._processing_configuration.profiling.sample_rate)) | 280 self._processing_configuration.profiling.sample_rate)) |
281 self._memory_profiler.Start() | 281 self._guppy_memory_profiler.Start() |
282 | 282 |
283 if self._processing_configuration.profiling.HaveProfileParsers(): | 283 if self._processing_configuration.profiling.HaveProfileParsers(): |
284 identifier = u'{0:s}-parsers'.format(self._name) | 284 identifier = u'{0:s}-parsers'.format(self._name) |
285 self._parsers_profiler = profiler.ParsersProfiler( | 285 self._parsers_profiler = profiler.ParsersProfiler( |
286 identifier, path=self._processing_configuration.profiling.directory) | 286 identifier, path=self._processing_configuration.profiling.directory) |
287 self._extraction_worker.SetParsersProfiler(self._parsers_profiler) | 287 self._extraction_worker.SetParsersProfiler(self._parsers_profiler) |
288 | 288 |
289 if self._processing_configuration.profiling.HaveProfileProcessing(): | 289 if self._processing_configuration.profiling.HaveProfileProcessing(): |
290 identifier = u'{0:s}-processing'.format(self._name) | 290 identifier = u'{0:s}-processing'.format(self._name) |
291 self._processing_profiler = profiler.ProcessingProfiler( | 291 self._processing_profiler = profiler.ProcessingProfiler( |
292 identifier, path=self._processing_configuration.profiling.directory) | 292 identifier, path=self._processing_configuration.profiling.directory) |
293 self._extraction_worker.SetProcessingProfiler(self._processing_profiler) | 293 self._extraction_worker.SetProcessingProfiler(self._processing_profiler) |
294 | 294 |
295 if self._processing_configuration.profiling.HaveProfileSerializers(): | 295 if self._processing_configuration.profiling.HaveProfileSerializers(): |
296 identifier = u'{0:s}-serializers'.format(self._name) | 296 identifier = u'{0:s}-serializers'.format(self._name) |
297 self._serializers_profiler = profiler.SerializersProfiler( | 297 self._serializers_profiler = profiler.SerializersProfiler( |
298 identifier, path=self._processing_configuration.profiling.directory) | 298 identifier, path=self._processing_configuration.profiling.directory) |
299 | 299 |
300 def _StopProfiling(self): | 300 def _StopProfiling(self): |
301 """Stops profiling.""" | 301 """Stops profiling.""" |
302 if self._memory_profiler: | 302 if self._guppy_memory_profiler: |
303 self._memory_profiler.Sample() | 303 self._guppy_memory_profiler.Sample() |
304 self._memory_profiler = None | 304 self._guppy_memory_profiler = None |
305 | 305 |
306 if self._parsers_profiler: | 306 if self._parsers_profiler: |
307 self._extraction_worker.SetParsersProfiler(None) | 307 self._extraction_worker.SetParsersProfiler(None) |
308 self._parsers_profiler.Write() | 308 self._parsers_profiler.Write() |
309 self._parsers_profiler = None | 309 self._parsers_profiler = None |
310 | 310 |
311 if self._processing_profiler: | 311 if self._processing_profiler: |
312 self._extraction_worker.SetProcessingProfiler(None) | 312 self._extraction_worker.SetProcessingProfiler(None) |
313 self._processing_profiler.Write() | 313 self._processing_profiler.Write() |
314 self._processing_profiler = None | 314 self._processing_profiler = None |
315 | 315 |
316 if self._serializers_profiler: | 316 if self._serializers_profiler: |
317 self._serializers_profiler.Write() | 317 self._serializers_profiler.Write() |
318 self._serializers_profiler = None | 318 self._serializers_profiler = None |
319 | 319 |
320 def SignalAbort(self): | 320 def SignalAbort(self): |
321 """Signals the process to abort.""" | 321 """Signals the process to abort.""" |
322 self._abort = True | 322 self._abort = True |
323 if self._extraction_worker: | 323 if self._extraction_worker: |
324 self._extraction_worker.SignalAbort() | 324 self._extraction_worker.SignalAbort() |
325 if self._parser_mediator: | 325 if self._parser_mediator: |
326 self._parser_mediator.SignalAbort() | 326 self._parser_mediator.SignalAbort() |
OLD | NEW |