LEFT | RIGHT |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The multi-process analysis process.""" | 2 """The multi-process analysis process.""" |
3 | 3 |
4 from __future__ import unicode_literals | 4 from __future__ import unicode_literals |
5 | 5 |
6 import threading | 6 import threading |
7 | 7 |
8 from plaso.analysis import mediator as analysis_mediator | 8 from plaso.analysis import mediator as analysis_mediator |
9 from plaso.containers import tasks | 9 from plaso.containers import tasks |
10 from plaso.engine import plaso_queue | 10 from plaso.engine import plaso_queue |
11 from plaso.lib import definitions | 11 from plaso.lib import definitions |
12 from plaso.lib import errors | 12 from plaso.lib import errors |
13 from plaso.multi_processing import base_process | 13 from plaso.multi_processing import base_process |
14 from plaso.multi_processing import logger | 14 from plaso.multi_processing import logger |
15 | 15 |
16 | 16 |
17 class AnalysisProcess(base_process.MultiProcessBaseProcess): | 17 class AnalysisProcess(base_process.MultiProcessBaseProcess): |
18 """Multi-processing analysis process.""" | 18 """Multi-processing analysis process.""" |
19 | 19 |
20 # Number of seconds to wait for the completion status to be queried | 20 # Number of seconds to wait for the completion status to be queried |
21 # by the foreman process. | 21 # by the foreman process. |
22 _FOREMAN_STATUS_WAIT = 5 * 60 | 22 _FOREMAN_STATUS_WAIT = 5 * 60 |
23 | 23 |
24 def __init__( | 24 def __init__( |
25 self, event_queue, storage_writer, knowledge_base, analysis_plugin, | 25 self, event_queue, storage_writer, knowledge_base, analysis_plugin, |
26 data_location=None, event_filter_expression=None, **kwargs): | 26 processing_configuration, data_location=None, |
| 27 event_filter_expression=None, **kwargs): |
27 """Initializes an analysis process. | 28 """Initializes an analysis process. |
28 | 29 |
29 Non-specified keyword arguments (kwargs) are directly passed to | 30 Non-specified keyword arguments (kwargs) are directly passed to |
30 multiprocessing.Process. | 31 multiprocessing.Process. |
31 | 32 |
32 Args: | 33 Args: |
33 event_queue (plaso_queue.Queue): event queue. | 34 event_queue (plaso_queue.Queue): event queue. |
34 storage_writer (StorageWriter): storage writer for a session storage. | 35 storage_writer (StorageWriter): storage writer for a session storage. |
35 knowledge_base (KnowledgeBase): contains information from the source | 36 knowledge_base (KnowledgeBase): contains information from the source |
36 data needed for analysis. | 37 data needed for analysis. |
37 plugin (AnalysisProcess): plugin running in the process. | 38 analysis_plugin (AnalysisPlugin): plugin running in the process. |
| 39 processing_configuration (ProcessingConfiguration): processing |
| 40 configuration. |
38 data_location (Optional[str]): path to the location that data files | 41 data_location (Optional[str]): path to the location that data files |
39 should be loaded from. | 42 should be loaded from. |
40 event_filter_expression (Optional[str]): event filter expression. | 43 event_filter_expression (Optional[str]): event filter expression. |
41 """ | 44 """ |
42 super(AnalysisProcess, self).__init__(**kwargs) | 45 super(AnalysisProcess, self).__init__(processing_configuration, **kwargs) |
43 self._abort = False | 46 self._abort = False |
44 self._analysis_mediator = None | 47 self._analysis_mediator = None |
45 self._analysis_plugin = analysis_plugin | 48 self._analysis_plugin = analysis_plugin |
46 self._data_location = data_location | 49 self._data_location = data_location |
47 self._debug_output = False | 50 self._debug_output = False |
48 self._event_filter_expression = event_filter_expression | 51 self._event_filter_expression = event_filter_expression |
49 self._event_queue = event_queue | 52 self._event_queue = event_queue |
50 self._foreman_status_wait_event = None | 53 self._foreman_status_wait_event = None |
51 self._knowledge_base = knowledge_base | 54 self._knowledge_base = knowledge_base |
52 self._memory_profiler = None | |
53 self._number_of_consumed_events = 0 | 55 self._number_of_consumed_events = 0 |
54 self._serializers_profiler = None | |
55 self._status = definitions.PROCESSING_STATUS_INITIALIZED | 56 self._status = definitions.PROCESSING_STATUS_INITIALIZED |
56 self._storage_writer = storage_writer | 57 self._storage_writer = storage_writer |
57 self._task = None | 58 self._task = None |
58 | 59 |
59 def _GetStatus(self): | 60 def _GetStatus(self): |
60 """Retrieves status information. | 61 """Retrieves status information. |
61 | 62 |
62 Returns: | 63 Returns: |
63 dict[str, object]: status attributes, indexed by name. | 64 dict[str, object]: status attributes, indexed by name. |
64 """ | 65 """ |
65 if self._analysis_mediator: | 66 if self._analysis_mediator: |
66 number_of_produced_event_tags = ( | 67 number_of_produced_event_tags = ( |
67 self._analysis_mediator.number_of_produced_event_tags) | 68 self._analysis_mediator.number_of_produced_event_tags) |
68 number_of_produced_reports = ( | 69 number_of_produced_reports = ( |
69 self._analysis_mediator.number_of_produced_analysis_reports) | 70 self._analysis_mediator.number_of_produced_analysis_reports) |
70 else: | 71 else: |
71 number_of_produced_event_tags = None | 72 number_of_produced_event_tags = None |
72 number_of_produced_reports = None | 73 number_of_produced_reports = None |
| 74 |
| 75 if self._process_information: |
| 76 used_memory = self._process_information.GetUsedMemory() or 0 |
| 77 else: |
| 78 used_memory = 0 |
| 79 |
| 80 if self._memory_profiler: |
| 81 self._memory_profiler.Sample('main', used_memory) |
73 | 82 |
74 status = { | 83 status = { |
75 'display_name': '', | 84 'display_name': '', |
76 'identifier': self._name, | 85 'identifier': self._name, |
77 'number_of_consumed_errors': None, | 86 'number_of_consumed_errors': None, |
78 'number_of_consumed_event_tags': None, | 87 'number_of_consumed_event_tags': None, |
79 'number_of_consumed_events': self._number_of_consumed_events, | 88 'number_of_consumed_events': self._number_of_consumed_events, |
80 'number_of_consumed_reports': None, | 89 'number_of_consumed_reports': None, |
81 'number_of_consumed_sources': None, | 90 'number_of_consumed_sources': None, |
82 'number_of_produced_errors': None, | 91 'number_of_produced_errors': None, |
83 'number_of_produced_event_tags': number_of_produced_event_tags, | 92 'number_of_produced_event_tags': number_of_produced_event_tags, |
84 'number_of_produced_events': None, | 93 'number_of_produced_events': None, |
85 'number_of_produced_reports': number_of_produced_reports, | 94 'number_of_produced_reports': number_of_produced_reports, |
86 'number_of_produced_sources': None, | 95 'number_of_produced_sources': None, |
87 'processing_status': self._status, | 96 'processing_status': self._status, |
88 'task_identifier': None} | 97 'task_identifier': None, |
| 98 'used_memory': used_memory} |
89 | 99 |
90 if self._status in ( | 100 if self._status in ( |
91 definitions.PROCESSING_STATUS_ABORTED, | 101 definitions.PROCESSING_STATUS_ABORTED, |
92 definitions.PROCESSING_STATUS_COMPLETED): | 102 definitions.PROCESSING_STATUS_COMPLETED): |
93 self._foreman_status_wait_event.set() | 103 self._foreman_status_wait_event.set() |
94 | 104 |
95 return status | 105 return status |
96 | 106 |
97 def _Main(self): | 107 def _Main(self): |
98 """The main loop.""" | 108 """The main loop.""" |
| 109 self._StartProfiling(self._processing_configuration.profiling) |
| 110 |
| 111 if self._serializers_profiler: |
| 112 self._storage_writer.SetSerializersProfiler(self._serializers_profiler) |
| 113 |
| 114 if self._storage_profiler: |
| 115 self._storage_writer.SetStorageProfiler(self._storage_profiler) |
| 116 |
99 logger.debug('Analysis plugin: {0!s} (PID: {1:d}) started'.format( | 117 logger.debug('Analysis plugin: {0!s} (PID: {1:d}) started'.format( |
100 self._name, self._pid)) | 118 self._name, self._pid)) |
101 | 119 |
102 # Creating the threading event in the constructor will cause a pickle | 120 # Creating the threading event in the constructor will cause a pickle |
103 # error on Windows when an analysis process is created. | 121 # error on Windows when an analysis process is created. |
104 self._foreman_status_wait_event = threading.Event() | 122 self._foreman_status_wait_event = threading.Event() |
105 self._status = definitions.PROCESSING_STATUS_ANALYZING | 123 self._status = definitions.PROCESSING_STATUS_ANALYZING |
106 | 124 |
107 task = tasks.Task() | 125 task = tasks.Task() |
108 # TODO: temporary solution. | 126 # TODO: temporary solution. |
109 task.identifier = self._analysis_plugin.plugin_name | 127 task.identifier = self._analysis_plugin.plugin_name |
110 | 128 |
111 self._task = task | 129 self._task = task |
112 | 130 |
113 storage_writer = self._storage_writer.CreateTaskStorage(task) | 131 storage_writer = self._storage_writer.CreateTaskStorage(task) |
114 | 132 |
115 if self._serializers_profiler: | 133 if self._serializers_profiler: |
116 storage_writer.SetSerializersProfiler(self._serializers_profiler) | 134 storage_writer.SetSerializersProfiler(self._serializers_profiler) |
117 | 135 |
| 136 if self._storage_profiler: |
| 137 storage_writer.SetStorageProfiler(self._storage_profiler) |
| 138 |
118 storage_writer.Open() | 139 storage_writer.Open() |
119 | 140 |
120 self._analysis_mediator = analysis_mediator.AnalysisMediator( | 141 self._analysis_mediator = analysis_mediator.AnalysisMediator( |
121 storage_writer, self._knowledge_base, data_location=self._data_location) | 142 storage_writer, self._knowledge_base, data_location=self._data_location) |
122 | 143 |
123 # TODO: set event_filter_expression in mediator. | 144 # TODO: set event_filter_expression in mediator. |
124 | 145 |
125 storage_writer.WriteTaskStart() | 146 storage_writer.WriteTaskStart() |
126 | 147 |
127 try: | 148 try: |
(...skipping 11 matching lines...) | (...skipping 11 matching lines...) |
139 break | 160 break |
140 | 161 |
141 if isinstance(event, plaso_queue.QueueAbort): | 162 if isinstance(event, plaso_queue.QueueAbort): |
142 logger.debug('ConsumeItems exiting, dequeued QueueAbort object.') | 163 logger.debug('ConsumeItems exiting, dequeued QueueAbort object.') |
143 break | 164 break |
144 | 165 |
145 self._ProcessEvent(self._analysis_mediator, event) | 166 self._ProcessEvent(self._analysis_mediator, event) |
146 | 167 |
147 self._number_of_consumed_events += 1 | 168 self._number_of_consumed_events += 1 |
148 | 169 |
149 if self._memory_profiler: | 170 if self._guppy_memory_profiler: |
150 self._memory_profiler.Sample() | 171 self._guppy_memory_profiler.Sample() |
151 | 172 |
152 logger.debug( | 173 logger.debug( |
153 '{0!s} (PID: {1:d}) stopped monitoring event queue.'.format( | 174 '{0!s} (PID: {1:d}) stopped monitoring event queue.'.format( |
154 self._name, self._pid)) | 175 self._name, self._pid)) |
155 | 176 |
156 if not self._abort: | 177 if not self._abort: |
157 self._status = definitions.PROCESSING_STATUS_REPORTING | 178 self._status = definitions.PROCESSING_STATUS_REPORTING |
158 | 179 |
159 self._analysis_mediator.ProduceAnalysisReport(self._analysis_plugin) | 180 self._analysis_mediator.ProduceAnalysisReport(self._analysis_plugin) |
160 | 181 |
161 # All exceptions need to be caught here to prevent the process | 182 # All exceptions need to be caught here to prevent the process |
162 # from being killed by an uncaught exception. | 183 # from being killed by an uncaught exception. |
163 except Exception as exception: # pylint: disable=broad-except | 184 except Exception as exception: # pylint: disable=broad-except |
164 logger.warning( | 185 logger.warning( |
165 'Unhandled exception in process: {0!s} (PID: {1:d}).'.format( | 186 'Unhandled exception in process: {0!s} (PID: {1:d}).'.format( |
166 self._name, self._pid)) | 187 self._name, self._pid)) |
167 logger.exception(exception) | 188 logger.exception(exception) |
168 | 189 |
169 self._abort = True | 190 self._abort = True |
170 | 191 |
171 finally: | 192 finally: |
172 storage_writer.WriteTaskCompletion(aborted=self._abort) | 193 storage_writer.WriteTaskCompletion(aborted=self._abort) |
173 | 194 |
174 storage_writer.Close() | 195 storage_writer.Close() |
175 | 196 |
176 try: | 197 if self._serializers_profiler: |
177 self._storage_writer.FinalizeProcessedTaskStorage(task) | 198 storage_writer.SetSerializersProfiler(None) |
| 199 |
| 200 if self._storage_profiler: |
| 201 storage_writer.SetStorageProfiler(None) |
| 202 |
| 203 try: |
| 204 self._storage_writer.FinalizeTaskStorage(task) |
178 except IOError: | 205 except IOError: |
179 pass | 206 pass |
180 | 207 |
181 if self._abort: | 208 if self._abort: |
182 self._status = definitions.PROCESSING_STATUS_ABORTED | 209 self._status = definitions.PROCESSING_STATUS_ABORTED |
183 else: | 210 else: |
184 self._status = definitions.PROCESSING_STATUS_COMPLETED | 211 self._status = definitions.PROCESSING_STATUS_COMPLETED |
185 | 212 |
186 self._foreman_status_wait_event.wait(self._FOREMAN_STATUS_WAIT) | 213 self._foreman_status_wait_event.wait(self._FOREMAN_STATUS_WAIT) |
187 | 214 |
188 logger.debug('Analysis plugin: {0!s} (PID: {1:d}) stopped'.format( | 215 logger.debug('Analysis plugin: {0!s} (PID: {1:d}) stopped'.format( |
189 self._name, self._pid)) | 216 self._name, self._pid)) |
| 217 |
| 218 if self._serializers_profiler: |
| 219 self._storage_writer.SetSerializersProfiler(None) |
| 220 |
| 221 if self._storage_profiler: |
| 222 self._storage_writer.SetStorageProfiler(None) |
| 223 |
| 224 self._StopProfiling() |
190 | 225 |
191 self._analysis_mediator = None | 226 self._analysis_mediator = None |
192 self._foreman_status_wait_event = None | 227 self._foreman_status_wait_event = None |
193 self._storage_writer = None | 228 self._storage_writer = None |
194 self._task = None | 229 self._task = None |
195 | 230 |
196 try: | 231 try: |
197 self._event_queue.Close(abort=self._abort) | 232 self._event_queue.Close(abort=self._abort) |
198 except errors.QueueAlreadyClosed: | 233 except errors.QueueAlreadyClosed: |
199 logger.error('Queue for {0:s} was already closed.'.format(self.name)) | 234 logger.error('Queue for {0:s} was already closed.'.format(self.name)) |
(...skipping 18 matching lines...) | (...skipping 18 matching lines...) |
218 logger.warning('Unhandled exception while processing event object.') | 253 logger.warning('Unhandled exception while processing event object.') |
219 logger.exception(exception) | 254 logger.exception(exception) |
220 | 255 |
221 def SignalAbort(self): | 256 def SignalAbort(self): |
222 """Signals the process to abort.""" | 257 """Signals the process to abort.""" |
223 self._abort = True | 258 self._abort = True |
224 if self._foreman_status_wait_event: | 259 if self._foreman_status_wait_event: |
225 self._foreman_status_wait_event.set() | 260 self._foreman_status_wait_event.set() |
226 if self._analysis_mediator: | 261 if self._analysis_mediator: |
227 self._analysis_mediator.SignalAbort() | 262 self._analysis_mediator.SignalAbort() |
LEFT | RIGHT |