LEFT | RIGHT |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The psort multi-processing engine.""" | 2 """The psort multi-processing engine.""" |
3 | 3 |
4 from __future__ import unicode_literals | 4 from __future__ import unicode_literals |
5 | 5 |
6 import collections | 6 import collections |
7 import heapq | 7 import heapq |
8 import os | 8 import os |
9 import time | 9 import time |
10 | 10 |
(...skipping 173 matching lines...)
184 self._analysis_plugins = {} | 184 self._analysis_plugins = {} |
185 self._completed_analysis_processes = set() | 185 self._completed_analysis_processes = set() |
186 self._data_location = None | 186 self._data_location = None |
187 self._event_filter_expression = None | 187 self._event_filter_expression = None |
188 self._event_queues = {} | 188 self._event_queues = {} |
189 self._event_tag_index = event_tag_index.EventTagIndex() | 189 self._event_tag_index = event_tag_index.EventTagIndex() |
190 # The export event heap is used to make sure the events are sorted in | 190 # The export event heap is used to make sure the events are sorted in |
191 # a deterministic way. | 191 # a deterministic way. |
192 self._export_event_heap = PsortEventHeap() | 192 self._export_event_heap = PsortEventHeap() |
193 self._export_event_timestamp = 0 | 193 self._export_event_timestamp = 0 |
| 194 self._guppy_memory_profiler = None |
194 self._knowledge_base = None | 195 self._knowledge_base = None |
| 196 self._memory_profiler = None |
195 self._merge_task = None | 197 self._merge_task = None |
196 self._guppy_memory_profiler = None | |
197 self._memory_profiler = None | |
198 self._processing_profiler = None | |
199 self._serializers_profiler = None | |
200 self._number_of_consumed_errors = 0 | 198 self._number_of_consumed_errors = 0 |
| 199 self._number_of_consumed_events = 0 |
201 self._number_of_consumed_event_tags = 0 | 200 self._number_of_consumed_event_tags = 0 |
202 self._number_of_consumed_events = 0 | |
203 self._number_of_consumed_reports = 0 | 201 self._number_of_consumed_reports = 0 |
204 self._number_of_consumed_sources = 0 | 202 self._number_of_consumed_sources = 0 |
205 self._number_of_duplicate_events = 0 | 203 self._number_of_duplicate_events = 0 |
206 self._number_of_macb_grouped_events = 0 | 204 self._number_of_macb_grouped_events = 0 |
207 self._profiling_configuration = None | |
208 self._number_of_produced_errors = 0 | 205 self._number_of_produced_errors = 0 |
| 206 self._number_of_produced_events = 0 |
209 self._number_of_produced_event_tags = 0 | 207 self._number_of_produced_event_tags = 0 |
210 self._number_of_produced_events = 0 | |
211 self._number_of_produced_reports = 0 | 208 self._number_of_produced_reports = 0 |
212 self._number_of_produced_sources = 0 | 209 self._number_of_produced_sources = 0 |
| 210 self._processing_configuration = None |
| 211 self._processing_profiler = None |
| 212 self._serializers_profiler = None |
213 self._status = definitions.PROCESSING_STATUS_IDLE | 213 self._status = definitions.PROCESSING_STATUS_IDLE |
214 self._status_update_callback = None | 214 self._status_update_callback = None |
215 self._use_zeromq = use_zeromq | 215 self._use_zeromq = use_zeromq |
216 self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT | 216 self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT |
217 | 217 |
218 def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None): | 218 def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None): |
219 """Analyzes events in a plaso storage. | 219 """Analyzes events in a plaso storage. |
220 | 220 |
221 Args: | 221 Args: |
222 storage_writer (StorageWriter): storage writer. | 222 storage_writer (StorageWriter): storage writer. |
(...skipping 596 matching lines...)
819 queue_name = '{0:s} input event queue'.format(process_name) | 819 queue_name = '{0:s} input event queue'.format(process_name) |
820 input_event_queue = zeromq_queue.ZeroMQPullConnectQueue( | 820 input_event_queue = zeromq_queue.ZeroMQPullConnectQueue( |
821 name=queue_name, delay_open=True, port=output_event_queue.port, | 821 name=queue_name, delay_open=True, port=output_event_queue.port, |
822 timeout_seconds=self._QUEUE_TIMEOUT) | 822 timeout_seconds=self._QUEUE_TIMEOUT) |
823 | 823 |
824 else: | 824 else: |
825 input_event_queue = output_event_queue | 825 input_event_queue = output_event_queue |
826 | 826 |
827 process = analysis_process.AnalysisProcess( | 827 process = analysis_process.AnalysisProcess( |
828 input_event_queue, storage_writer, self._knowledge_base, | 828 input_event_queue, storage_writer, self._knowledge_base, |
829 analysis_plugin, data_location=self._data_location, | 829 analysis_plugin, self._processing_configuration, |
| 830 data_location=self._data_location, |
830 event_filter_expression=self._event_filter_expression, | 831 event_filter_expression=self._event_filter_expression, |
831 name=process_name) | 832 name=process_name) |
832 | 833 |
833 process.start() | 834 process.start() |
834 | 835 |
835 logger.info('Started analysis plugin: {0:s} (PID: {1:d}).'.format( | 836 logger.info('Started analysis plugin: {0:s} (PID: {1:d}).'.format( |
836 process_name, process.pid)) | 837 process_name, process.pid)) |
837 | 838 |
838 try: | 839 try: |
839 self._StartMonitoringProcess(process) | 840 self._StartMonitoringProcess(process) |
840 except (IOError, KeyError) as exception: | 841 except (IOError, KeyError) as exception: |
841 logger.error(( | 842 logger.error(( |
842 'Unable to monitor analysis plugin: {0:s} (PID: {1:d}) ' | 843 'Unable to monitor analysis plugin: {0:s} (PID: {1:d}) ' |
843 'with error: {2!s}').format(process_name, process.pid, exception)) | 844 'with error: {2!s}').format(process_name, process.pid, exception)) |
844 | 845 |
845 process.terminate() | 846 process.terminate() |
846 return None | 847 return None |
847 | 848 |
848 self._RegisterProcess(process) | 849 self._RegisterProcess(process) |
849 return process | 850 return process |
850 | 851 |
851 def AnalyzeEvents( | 852 def AnalyzeEvents( |
852 self, knowledge_base_object, storage_writer, data_location, | 853 self, knowledge_base_object, storage_writer, data_location, |
853 analysis_plugins, event_filter=None, event_filter_expression=None, | 854 analysis_plugins, processing_configuration, event_filter=None, |
854 status_update_callback=None, worker_memory_limit=None, | 855 event_filter_expression=None, status_update_callback=None, |
855 profiling_configuration=None): | 856 worker_memory_limit=None): |
856 """Analyzes events in a plaso storage. | 857 """Analyzes events in a plaso storage. |
857 | 858 |
858 Args: | 859 Args: |
859 knowledge_base_object (KnowledgeBase): contains information from | 860 knowledge_base_object (KnowledgeBase): contains information from |
860 the source data needed for processing. | 861 the source data needed for processing. |
861 storage_writer (StorageWriter): storage writer. | 862 storage_writer (StorageWriter): storage writer. |
862 data_location (str): path to the location that data files should | 863 data_location (str): path to the location that data files should |
863 be loaded from. | 864 be loaded from. |
864 analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that | 865 analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that |
865 should be run and their names. | 866 should be run and their names. |
| 867 processing_configuration (ProcessingConfiguration): processing |
| 868 configuration. |
866 event_filter (Optional[FilterObject]): event filter. | 869 event_filter (Optional[FilterObject]): event filter. |
867 event_filter_expression (Optional[str]): event filter expression. | 870 event_filter_expression (Optional[str]): event filter expression. |
868 status_update_callback (Optional[function]): callback function for status | 871 status_update_callback (Optional[function]): callback function for status |
869 updates. | 872 updates. |
870 worker_memory_limit (Optional[int]): maximum amount of memory a worker is | 873 worker_memory_limit (Optional[int]): maximum amount of memory a worker is |
871 allowed to consume, where None represents the default memory limit | 874 allowed to consume, where None represents the default memory limit |
872 and 0 represents no limit. | 875 and 0 represents no limit. |
873 profiling_configuration (Optional[ProfilingConfiguration]): profiling | |
874 configuration. | |
875 | 876 |
876 Raises: | 877 Raises: |
877 KeyboardInterrupt: if a keyboard interrupt was raised. | 878 KeyboardInterrupt: if a keyboard interrupt was raised. |
878 """ | 879 """ |
879 if not analysis_plugins: | 880 if not analysis_plugins: |
880 return | 881 return |
881 | 882 |
882 keyboard_interrupt = False | 883 keyboard_interrupt = False |
883 | 884 |
884 self._analysis_plugins = {} | 885 self._analysis_plugins = {} |
885 self._data_location = data_location | 886 self._data_location = data_location |
886 self._event_filter_expression = event_filter_expression | 887 self._event_filter_expression = event_filter_expression |
887 self._knowledge_base = knowledge_base_object | 888 self._knowledge_base = knowledge_base_object |
888 self._status_update_callback = status_update_callback | 889 self._status_update_callback = status_update_callback |
889 self._profiling_configuration = profiling_configuration | 890 self._processing_configuration = processing_configuration |
890 | 891 |
891 if worker_memory_limit is None: | 892 if worker_memory_limit is None: |
892 self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT | 893 self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT |
893 else: | 894 else: |
894 self._worker_memory_limit = worker_memory_limit | 895 self._worker_memory_limit = worker_memory_limit |
895 | 896 |
896 self._StartProfiling(self._profiling_configuration) | 897 self._StartProfiling(self._processing_configuration.profiling) |
897 | 898 |
898 # Set up the storage writer before the analysis processes. | 899 # Set up the storage writer before the analysis processes. |
899 storage_writer.StartTaskStorage() | 900 storage_writer.StartTaskStorage() |
900 | 901 |
901 self._StartAnalysisProcesses(storage_writer, analysis_plugins) | 902 self._StartAnalysisProcesses(storage_writer, analysis_plugins) |
902 | 903 |
903 # Start the status update thread after open of the storage writer | 904 # Start the status update thread after open of the storage writer |
904 # so we don't have to clean up the thread if the open fails. | 905 # so we don't have to clean up the thread if the open fails. |
905 self._StartStatusUpdateThread() | 906 self._StartStatusUpdateThread() |
906 | 907 |
(...skipping 41 matching lines...)
948 # due to incorrectly finalized IPC. | 949 # due to incorrectly finalized IPC. |
949 self._KillProcess(os.getpid()) | 950 self._KillProcess(os.getpid()) |
950 | 951 |
951 self._StopProfiling() | 952 self._StopProfiling() |
952 | 953 |
953 # Reset values. | 954 # Reset values. |
954 self._analysis_plugins = {} | 955 self._analysis_plugins = {} |
955 self._data_location = None | 956 self._data_location = None |
956 self._event_filter_expression = None | 957 self._event_filter_expression = None |
957 self._knowledge_base = None | 958 self._knowledge_base = None |
958 self._profiling_configuration = None | 959 self._processing_configuration = None |
959 self._status_update_callback = None | 960 self._status_update_callback = None |
960 self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT | 961 self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT |
961 | 962 |
962 if keyboard_interrupt: | 963 if keyboard_interrupt: |
963 raise KeyboardInterrupt | 964 raise KeyboardInterrupt |
964 | 965 |
965 if keyboard_interrupt: | 966 if keyboard_interrupt: |
966 raise KeyboardInterrupt | 967 raise KeyboardInterrupt |
967 | 968 |
968 def ExportEvents( | 969 def ExportEvents( |
969 self, knowledge_base_object, storage_reader, output_module, | 970 self, knowledge_base_object, storage_reader, output_module, |
970 deduplicate_events=True, event_filter=None, status_update_callback=None, | 971 processing_configuration, deduplicate_events=True, event_filter=None, |
971 time_slice=None, use_time_slicer=False, profiling_configuration=None): | 972 status_update_callback=None, time_slice=None, use_time_slicer=False): |
972 """Exports events using an output module. | 973 """Exports events using an output module. |
973 | 974 |
974 Args: | 975 Args: |
975 knowledge_base_object (KnowledgeBase): contains information from | 976 knowledge_base_object (KnowledgeBase): contains information from |
976 the source data needed for processing. | 977 the source data needed for processing. |
977 storage_reader (StorageReader): storage reader. | 978 storage_reader (StorageReader): storage reader. |
978 output_module (OutputModule): output module. | 979 output_module (OutputModule): output module. |
| 980 processing_configuration (ProcessingConfiguration): processing |
| 981 configuration. |
979 deduplicate_events (Optional[bool]): True if events should be | 982 deduplicate_events (Optional[bool]): True if events should be |
980 deduplicated. | 983 deduplicated. |
981 event_filter (Optional[FilterObject]): event filter. | 984 event_filter (Optional[FilterObject]): event filter. |
982 status_update_callback (Optional[function]): callback function for status | 985 status_update_callback (Optional[function]): callback function for status |
983 updates. | 986 updates. |
984 time_slice (Optional[TimeSlice]): slice of time to output. | 987 time_slice (Optional[TimeSlice]): slice of time to output. |
985 use_time_slicer (Optional[bool]): True if the 'time slicer' should be | 988 use_time_slicer (Optional[bool]): True if the 'time slicer' should be |
986 used. The 'time slicer' will provide a context of events around | 989 used. The 'time slicer' will provide a context of events around |
987 an event of interest. | 990 an event of interest. |
988 profiling_configuration (Optional[ProfilingConfiguration]): profiling | |
989 configuration. | |
990 | 991 |
991 Returns: | 992 Returns: |
992 collections.Counter: counter that tracks the number of events extracted | 993 collections.Counter: counter that tracks the number of events extracted |
993 from storage. | 994 from storage. |
994 """ | 995 """ |
995 self._profiling_configuration = profiling_configuration | 996 self._processing_configuration = processing_configuration |
996 self._status_update_callback = status_update_callback | 997 self._status_update_callback = status_update_callback |
997 | 998 |
998 storage_reader.ReadPreprocessingInformation(knowledge_base_object) | 999 storage_reader.ReadPreprocessingInformation(knowledge_base_object) |
999 | 1000 |
1000 output_module.Open() | 1001 output_module.Open() |
1001 output_module.WriteHeader() | 1002 output_module.WriteHeader() |
1002 | 1003 |
1003 self._StartStatusUpdateThread() | 1004 self._StartStatusUpdateThread() |
1004 | 1005 |
1005 self._StartProfiling(self._profiling_configuration) | 1006 self._StartProfiling(self._processing_configuration.profiling) |
1006 | 1007 |
1007 try: | 1008 try: |
1008 events_counter = self._ExportEvents( | 1009 events_counter = self._ExportEvents( |
1009 storage_reader, output_module, deduplicate_events=deduplicate_events, | 1010 storage_reader, output_module, deduplicate_events=deduplicate_events, |
1010 event_filter=event_filter, time_slice=time_slice, | 1011 event_filter=event_filter, time_slice=time_slice, |
1011 use_time_slicer=use_time_slicer) | 1012 use_time_slicer=use_time_slicer) |
1012 | 1013 |
1013 finally: | 1014 finally: |
1014 # Stop the status update thread before the footer is written and | 1015 # Stop the status update thread before the footer is written and |
1015 # the output module is closed. | 1016 # the output module is closed. |
1016 self._StopStatusUpdateThread() | 1017 self._StopStatusUpdateThread() |
1017 | 1018 |
1018 output_module.WriteFooter() | 1019 output_module.WriteFooter() |
1019 output_module.Close() | 1020 output_module.Close() |
1020 | 1021 |
1021 self._StopProfiling() | 1022 self._StopProfiling() |
1022 | 1023 |
1023 # Reset values. | 1024 # Reset values. |
1024 self._status_update_callback = None | 1025 self._status_update_callback = None |
1025 self._profiling_configuration = None | 1026 self._processing_configuration = None |
1026 | 1027 |
1027 return events_counter | 1028 return events_counter |
LEFT | RIGHT |