OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The task multi-process processing engine.""" | 2 """The task multi-process processing engine.""" |
3 | 3 |
4 import heapq | 4 import heapq |
5 import logging | 5 import logging |
6 import multiprocessing | 6 import multiprocessing |
7 import os | 7 import os |
8 import time | 8 import time |
9 | 9 |
10 from dfvfs.lib import definitions as dfvfs_definitions | 10 from dfvfs.lib import definitions as dfvfs_definitions |
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
104 | 104 |
105 Args: | 105 Args: |
106 maximum_number_of_tasks (Optional[int]): maximum number of concurrent | 106 maximum_number_of_tasks (Optional[int]): maximum number of concurrent |
107 tasks, where 0 represents no limit. | 107 tasks, where 0 represents no limit. |
108 use_zeromq (Optional[bool]): True if ZeroMQ should be used for queuing | 108 use_zeromq (Optional[bool]): True if ZeroMQ should be used for queuing |
109 instead of Python's multiprocessing queue. | 109 instead of Python's multiprocessing queue. |
110 """ | 110 """ |
111 super(TaskMultiProcessEngine, self).__init__() | 111 super(TaskMultiProcessEngine, self).__init__() |
112 self._enable_sigsegv_handler = False | 112 self._enable_sigsegv_handler = False |
113 self._filter_find_specs = None | 113 self._filter_find_specs = None |
| 114 self._guppy_memory_profiler = None |
114 self._last_worker_number = 0 | 115 self._last_worker_number = 0 |
115 self._maximum_number_of_tasks = maximum_number_of_tasks | 116 self._maximum_number_of_tasks = maximum_number_of_tasks |
116 self._memory_profiler = None | |
117 self._merge_task = None | 117 self._merge_task = None |
118 self._merge_task_on_hold = None | 118 self._merge_task_on_hold = None |
119 self._number_of_consumed_errors = 0 | 119 self._number_of_consumed_errors = 0 |
120 self._number_of_consumed_event_tags = 0 | 120 self._number_of_consumed_event_tags = 0 |
121 self._number_of_consumed_events = 0 | 121 self._number_of_consumed_events = 0 |
122 self._number_of_consumed_reports = 0 | 122 self._number_of_consumed_reports = 0 |
123 self._number_of_consumed_sources = 0 | 123 self._number_of_consumed_sources = 0 |
124 self._number_of_produced_errors = 0 | 124 self._number_of_produced_errors = 0 |
125 self._number_of_produced_event_tags = 0 | 125 self._number_of_produced_event_tags = 0 |
126 self._number_of_produced_events = 0 | 126 self._number_of_produced_events = 0 |
(...skipping 148 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
275 | 275 |
276 self._number_of_produced_errors = storage_writer.number_of_errors | 276 self._number_of_produced_errors = storage_writer.number_of_errors |
277 self._number_of_produced_events = storage_writer.number_of_events | 277 self._number_of_produced_events = storage_writer.number_of_events |
278 self._number_of_produced_sources = storage_writer.number_of_event_sources | 278 self._number_of_produced_sources = storage_writer.number_of_event_sources |
279 | 279 |
280 if self._processing_profiler: | 280 if self._processing_profiler: |
281 self._processing_profiler.StopTiming(u'process_sources') | 281 self._processing_profiler.StopTiming(u'process_sources') |
282 | 282 |
283 def _ProfilingSampleMemory(self): | 283 def _ProfilingSampleMemory(self): |
284 """Creates a memory profiling sample.""" | 284 """Creates a memory profiling sample.""" |
285 if self._memory_profiler: | 285 if self._guppy_memory_profiler: |
286 self._memory_profiler.Sample() | 286 self._guppy_memory_profiler.Sample() |
287 | 287 |
288 def _ScheduleTask(self, task): | 288 def _ScheduleTask(self, task): |
289 """Schedules a task. | 289 """Schedules a task. |
290 | 290 |
291 Args: | 291 Args: |
292 task (Task): task. | 292 task (Task): task. |
293 | 293 |
294 Returns: | 294 Returns: |
295 bool: True if the task was scheduled. | 295 bool: True if the task was scheduled. |
296 """ | 296 """ |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
384 task = self._task_manager.CreateTask(self._session_identifier) | 384 task = self._task_manager.CreateTask(self._session_identifier) |
385 task.file_entry_type = event_source.file_entry_type | 385 task.file_entry_type = event_source.file_entry_type |
386 task.path_spec = event_source.path_spec | 386 task.path_spec = event_source.path_spec |
387 logging.debug( | 387 logging.debug( |
388 u'Scheduled task {0:s} for path specification {1:s}'.format( | 388 u'Scheduled task {0:s} for path specification {1:s}'.format( |
389 task.identifier, task.path_spec.comparable)) | 389 task.identifier, task.path_spec.comparable)) |
390 event_source = None | 390 event_source = None |
391 | 391 |
392 self._number_of_consumed_sources += 1 | 392 self._number_of_consumed_sources += 1 |
393 | 393 |
394 if self._memory_profiler: | 394 if self._guppy_memory_profiler: |
395 self._memory_profiler.Sample() | 395 self._guppy_memory_profiler.Sample() |
396 | 396 |
397 if task: | 397 if task: |
398 if self._ScheduleTask(task): | 398 if self._ScheduleTask(task): |
399 task = None | 399 task = None |
400 | 400 |
401 self._MergeTaskStorage(storage_writer) | 401 self._MergeTaskStorage(storage_writer) |
402 | 402 |
403 self._FillEventSourceHeap(storage_writer, event_source_heap) | 403 self._FillEventSourceHeap(storage_writer, event_source_heap) |
404 | 404 |
405 if not event_source and not task: | 405 if not event_source and not task: |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
453 | 453 |
454 self._RegisterProcess(process) | 454 self._RegisterProcess(process) |
455 | 455 |
456 return process | 456 return process |
457 | 457 |
458 def _StartProfiling(self): | 458 def _StartProfiling(self): |
459 """Starts profiling.""" | 459 """Starts profiling.""" |
460 if not self._processing_configuration: | 460 if not self._processing_configuration: |
461 return | 461 return |
462 | 462 |
463 if self._processing_configuration.profiling.HaveProfileMemory(): | 463 if self._processing_configuration.profiling.HaveProfileMemoryGuppy(): |
464 identifier = u'{0:s}-memory'.format(self._name) | 464 identifier = u'{0:s}-memory'.format(self._name) |
465 self._memory_profiler = profiler.GuppyMemoryProfiler( | 465 self._guppy_memory_profiler = profiler.GuppyMemoryProfiler( |
466 identifier, path=self._processing_configuration.profiling.directory, | 466 identifier, path=self._processing_configuration.profiling.directory, |
467 profiling_sample_rate=( | 467 profiling_sample_rate=( |
468 self._processing_configuration.profiling.sample_rate)) | 468 self._processing_configuration.profiling.sample_rate)) |
469 self._memory_profiler.Start() | 469 self._guppy_memory_profiler.Start() |
470 | 470 |
471 if self._processing_configuration.profiling.HaveProfileProcessing(): | 471 if self._processing_configuration.profiling.HaveProfileProcessing(): |
472 identifier = u'{0:s}-processing'.format(self._name) | 472 identifier = u'{0:s}-processing'.format(self._name) |
473 self._processing_profiler = profiler.ProcessingProfiler( | 473 self._processing_profiler = profiler.ProcessingProfiler( |
474 identifier, path=self._processing_configuration.profiling.directory) | 474 identifier, path=self._processing_configuration.profiling.directory) |
475 | 475 |
476 if self._processing_configuration.profiling.HaveProfileSerializers(): | 476 if self._processing_configuration.profiling.HaveProfileSerializers(): |
477 identifier = u'{0:s}-serializers'.format(self._name) | 477 identifier = u'{0:s}-serializers'.format(self._name) |
478 self._serializers_profiler = profiler.SerializersProfiler( | 478 self._serializers_profiler = profiler.SerializersProfiler( |
479 identifier, path=self._processing_configuration.profiling.directory) | 479 identifier, path=self._processing_configuration.profiling.directory) |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
541 self._AbortKill() | 541 self._AbortKill() |
542 else: | 542 else: |
543 # Check if the processes are still alive and terminate them if necessary. | 543 # Check if the processes are still alive and terminate them if necessary. |
544 self._AbortTerminate() | 544 self._AbortTerminate() |
545 self._AbortJoin(timeout=self._PROCESS_JOIN_TIMEOUT) | 545 self._AbortJoin(timeout=self._PROCESS_JOIN_TIMEOUT) |
546 | 546 |
547 self._task_queue.Close(abort=True) | 547 self._task_queue.Close(abort=True) |
548 | 548 |
549 def _StopProfiling(self): | 549 def _StopProfiling(self): |
550 """Stops profiling.""" | 550 """Stops profiling.""" |
551 if self._memory_profiler: | 551 if self._guppy_memory_profiler: |
552 self._memory_profiler.Sample() | 552 self._guppy_memory_profiler.Sample() |
553 self._memory_profiler = None | 553 self._guppy_memory_profiler = None |
554 | 554 |
555 if self._processing_profiler: | 555 if self._processing_profiler: |
556 self._processing_profiler.Write() | 556 self._processing_profiler.Write() |
557 self._processing_profiler = None | 557 self._processing_profiler = None |
558 | 558 |
559 if self._serializers_profiler: | 559 if self._serializers_profiler: |
560 self._serializers_profiler.Write() | 560 self._serializers_profiler.Write() |
561 self._serializers_profiler = None | 561 self._serializers_profiler = None |
562 | 562 |
563 def _UpdateProcessingStatus(self, pid, process_status, used_memory): | 563 def _UpdateProcessingStatus(self, pid, process_status, used_memory): |
(...skipping 254 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
818 self._worker_memory_limit = self._DEFAULT_WORKER_MEMORY_LIMIT | 818 self._worker_memory_limit = self._DEFAULT_WORKER_MEMORY_LIMIT |
819 | 819 |
820 self._processing_configuration = None | 820 self._processing_configuration = None |
821 | 821 |
822 self._filter_find_specs = None | 822 self._filter_find_specs = None |
823 self._session_identifier = None | 823 self._session_identifier = None |
824 self._status_update_callback = None | 824 self._status_update_callback = None |
825 self._storage_writer = None | 825 self._storage_writer = None |
826 | 826 |
827 return self._processing_status | 827 return self._processing_status |
OLD | NEW |