Left: | ||
Right: |
LEFT | RIGHT |
---|---|
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The psort multi-processing engine.""" | 2 """The psort multi-processing engine.""" |
3 | 3 |
4 import collections | 4 import collections |
5 import heapq | 5 import heapq |
6 import logging | 6 import logging |
7 import os | 7 import os |
8 import time | 8 import time |
9 | 9 |
10 from plaso.engine import plaso_queue | 10 from plaso.engine import plaso_queue |
(...skipping 26 matching lines...) Expand all Loading... | |
37 self._heap = [] | 37 self._heap = [] |
38 | 38 |
@property
def number_of_events(self):
  """int: number of events on the heap."""
  return len(self._heap)
43 | 43 |
def _GetEventIdentifiers(self, event):
  """Retrieves different identifiers of the event.

  Every event contains event data, which consists of attributes and values.
  These attributes and values can be represented as a string and used for
  sorting and uniquely identifying events. This function determines multiple
  identifiers:
  * an identifier of the attributes and values without the timestamp
    description (or usage). This is referred to as the MACB group
    identifier.
  * an identifier of the attributes and values including the timestamp
    description (or usage). This is referred to as the event content
    identifier.

  The identifier without the timestamp description can be used to group
  events that have the same MACB (modification, access, change, birth)
  timestamps. The PsortEventHeap will store these events individually and
  relies on PsortMultiProcessEngine to do the actual grouping of events.

  Args:
    event (EventObject): event.

  Returns:
    tuple: contains:

      str: identifier of the event MACB group or None if the event cannot
          be grouped.
      str: identifier of the event content.
  """
  attributes = []

  attribute_string = u'data_type: {0:s}'.format(event.data_type)
  attributes.append(attribute_string)

  for attribute_name, attribute_value in sorted(event.GetAttributes()):
    # Attributes such as identifiers and timestamps are excluded so that
    # otherwise-identical events compare equal.
    if attribute_name in self._IDENTIFIER_EXCLUDED_ATTRIBUTES:
      continue

    if not attribute_value:
      continue

    if attribute_name == u'pathspec':
      # Use the comparable string form of the path specification.
      attribute_value = attribute_value.comparable

    elif isinstance(attribute_value, dict):
      attribute_value = sorted(attribute_value.items())

    elif isinstance(attribute_value, set):
      attribute_value = sorted(list(attribute_value))

    attribute_string = u'{0:s}: {1!s}'.format(
        attribute_name, attribute_value)
    attributes.append(attribute_string)

  # The u'atime', u'ctime', u'crtime', u'mtime' are included for backwards
  # compatibility with the filestat parser.
  if event.timestamp_desc in (
      u'atime', u'ctime', u'crtime', u'mtime',
      definitions.TIME_DESCRIPTION_LAST_ACCESS,
      definitions.TIME_DESCRIPTION_CHANGE,
      definitions.TIME_DESCRIPTION_CREATION,
      definitions.TIME_DESCRIPTION_MODIFICATION):
    macb_group_identifier = u', '.join(attributes)
  else:
    macb_group_identifier = None

  attributes.insert(0, event.timestamp_desc)
  content_identifier = u', '.join(attributes)

  return macb_group_identifier, content_identifier
98 | 114 |
def PopEvent(self):
  """Pops an event from the heap.

  Returns:
    tuple: contains:

      str: identifier of the event MACB group or None if the event cannot
          be grouped.
      str: identifier of the event content.
      EventObject: event.
  """
  # EAFP: an empty heap raises IndexError, which signals "no more events".
  try:
    return heapq.heappop(self._heap)

  except IndexError:
    return None
115 | 131 |
def PopEvents(self):
  """Pops events from the heap.

  Yields:
    tuple: contains:

      str: identifier of the event MACB group or None if the event cannot
          be grouped.
      str: identifier of the event content.
      EventObject: event.
  """
  event = self.PopEvent()
  while event:
    yield event
    event = self.PopEvent()
126 | 142 |
def PushEvent(self, event):
  """Pushes an event onto the heap.

  Args:
    event (EventObject): event.
  """
  macb_group_identifier, content_identifier = self._GetEventIdentifiers(event)

  # We can ignore the timestamp here because the psort engine only stores
  # events with the same timestamp in the event heap.
  heap_values = (macb_group_identifier, content_identifier, event)
  heapq.heappush(self._heap, heap_values)
139 | 155 |
140 | 156 |
141 class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine): | 157 class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine): |
142 """Psort multi-processing engine.""" | 158 """Psort multi-processing engine.""" |
143 | 159 |
144 _DEFAULT_WORKER_MEMORY_LIMIT = 2048 * 1024 * 1024 | 160 _DEFAULT_WORKER_MEMORY_LIMIT = 2048 * 1024 * 1024 |
145 | 161 |
146 _PROCESS_JOIN_TIMEOUT = 5.0 | 162 _PROCESS_JOIN_TIMEOUT = 5.0 |
(...skipping 19 matching lines...) Expand all Loading... | |
166 self._export_event_heap = PsortEventHeap() | 182 self._export_event_heap = PsortEventHeap() |
167 self._export_event_timestamp = 0 | 183 self._export_event_timestamp = 0 |
168 self._knowledge_base = None | 184 self._knowledge_base = None |
169 self._merge_task = None | 185 self._merge_task = None |
170 self._number_of_consumed_errors = 0 | 186 self._number_of_consumed_errors = 0 |
171 self._number_of_consumed_event_tags = 0 | 187 self._number_of_consumed_event_tags = 0 |
172 self._number_of_consumed_events = 0 | 188 self._number_of_consumed_events = 0 |
173 self._number_of_consumed_reports = 0 | 189 self._number_of_consumed_reports = 0 |
174 self._number_of_consumed_sources = 0 | 190 self._number_of_consumed_sources = 0 |
175 self._number_of_duplicate_events = 0 | 191 self._number_of_duplicate_events = 0 |
192 self._number_of_macb_grouped_events = 0 | |
176 self._number_of_produced_errors = 0 | 193 self._number_of_produced_errors = 0 |
177 self._number_of_produced_event_tags = 0 | 194 self._number_of_produced_event_tags = 0 |
178 self._number_of_produced_events = 0 | 195 self._number_of_produced_events = 0 |
179 self._number_of_produced_reports = 0 | 196 self._number_of_produced_reports = 0 |
180 self._number_of_produced_sources = 0 | 197 self._number_of_produced_sources = 0 |
181 self._status = definitions.PROCESSING_STATUS_IDLE | 198 self._status = definitions.PROCESSING_STATUS_IDLE |
182 self._status_update_callback = None | 199 self._status_update_callback = None |
183 self._use_zeromq = use_zeromq | 200 self._use_zeromq = use_zeromq |
184 self._worker_memory_limit = self._DEFAULT_WORKER_MEMORY_LIMIT | 201 self._worker_memory_limit = self._DEFAULT_WORKER_MEMORY_LIMIT |
185 | 202 |
(...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
475 | 492 |
476 events_counter = collections.Counter() | 493 events_counter = collections.Counter() |
477 events_counter[u'Events filtered'] = number_of_filtered_events | 494 events_counter[u'Events filtered'] = number_of_filtered_events |
478 events_counter[u'Events from time slice'] = number_of_events_from_time_slice | 495 events_counter[u'Events from time slice'] = number_of_events_from_time_slice |
479 events_counter[u'Events processed'] = self._number_of_consumed_events | 496 events_counter[u'Events processed'] = self._number_of_consumed_events |
480 | 497 |
481 if self._number_of_duplicate_events: | 498 if self._number_of_duplicate_events: |
482 events_counter[u'Duplicate events removed'] = ( | 499 events_counter[u'Duplicate events removed'] = ( |
483 self._number_of_duplicate_events) | 500 self._number_of_duplicate_events) |
484 | 501 |
502 if self._number_of_macb_grouped_events: | |
503 events_counter[u'Events MACB grouped'] = ( | |
504 self._number_of_macb_grouped_events) | |
505 | |
485 if filter_limit: | 506 if filter_limit: |
486 events_counter[u'Limited By'] = filter_limit | 507 events_counter[u'Limited By'] = filter_limit |
487 | 508 |
488 return events_counter | 509 return events_counter |
489 | 510 |
490 def _FlushExportBuffer(self, output_module, deduplicate_events=True): | 511 def _FlushExportBuffer(self, output_module, deduplicate_events=True): |
491 """Flushes buffered events and writes them to the output module. | 512 """Flushes buffered events and writes them to the output module. |
492 | 513 |
493 Args: | 514 Args: |
494 output_module (OutputModule): output module. | 515 output_module (OutputModule): output module. |
495 deduplicate_events (Optional[bool]): True if events should be | 516 deduplicate_events (Optional[bool]): True if events should be |
496 deduplicated. | 517 deduplicated. |
497 """ | 518 """ |
498 last_macb_group_identifier = None | 519 last_macb_group_identifier = None |
499 last_content_identifier = None | 520 last_content_identifier = None |
500 macb_group = [] | 521 macb_group = [] |
501 | 522 |
502 generator = self._export_event_heap.PopEvents() | 523 generator = self._export_event_heap.PopEvents() |
503 | 524 |
504 for macb_group_identifier, content_identifier, event in generator: | 525 for macb_group_identifier, content_identifier, event in generator: |
505 if deduplicate_events and last_content_identifier == content_identifier: | 526 if deduplicate_events and last_content_identifier == content_identifier: |
506 self._number_of_duplicate_events += 1 | 527 self._number_of_duplicate_events += 1 |
507 continue | 528 continue |
508 | 529 |
509 if (macb_group_identifier is not None and | 530 if macb_group_identifier is None: |
onager
2017/07/25 08:22:41
What happens if there's a non-MACB-groupable event
onager
2017/07/25 08:28:19
Also, given how tricky this method has been, pleas
Joachim Metz
2017/07/25 08:41:46
the events are grouped (sorted) by macb group iden
| |
510 last_macb_group_identifier == macb_group_identifier): | 531 if macb_group: |
511 macb_group.append(event) | 532 output_module.WriteEventMACBGroup(macb_group) |
512 | 533 macb_group = [] |
513 elif macb_group: | 534 |
514 output_module.WriteEventMACBGroup(macb_group) | 535 output_module.WriteEvent(event) |
515 macb_group = [] | 536 |
516 | 537 else: |
517 output_module.WriteEvent(event) | 538 if (last_macb_group_identifier == macb_group_identifier or |
539 not macb_group): | |
540 macb_group.append(event) | |
541 | |
542 else: | |
543 output_module.WriteEventMACBGroup(macb_group) | |
544 macb_group = [event] | |
545 | |
546 self._number_of_macb_grouped_events += 1 | |
518 | 547 |
519 last_macb_group_identifier = macb_group_identifier | 548 last_macb_group_identifier = macb_group_identifier |
520 last_content_identifier = content_identifier | 549 last_content_identifier = content_identifier |
550 | |
551 if macb_group: | |
552 output_module.WriteEventMACBGroup(macb_group) | |
521 | 553 |
522 def _StartAnalysisProcesses(self, storage_writer, analysis_plugins): | 554 def _StartAnalysisProcesses(self, storage_writer, analysis_plugins): |
523 """Starts the analysis processes. | 555 """Starts the analysis processes. |
524 | 556 |
525 Args: | 557 Args: |
526 storage_writer (StorageWriter): storage writer. | 558 storage_writer (StorageWriter): storage writer. |
527 analysis_plugins (list[AnalysisPlugin]): analysis plugins that should | 559 analysis_plugins (list[AnalysisPlugin]): analysis plugins that should |
528 be run. | 560 be run. |
529 """ | 561 """ |
530 logging.info(u'Starting analysis plugins.') | 562 logging.info(u'Starting analysis plugins.') |
(...skipping 357 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
888 # so we include the storage sync to disk in the status updates. | 920 # so we include the storage sync to disk in the status updates. |
889 self._StopStatusUpdateThread() | 921 self._StopStatusUpdateThread() |
890 | 922 |
891 output_module.WriteFooter() | 923 output_module.WriteFooter() |
892 output_module.Close() | 924 output_module.Close() |
893 | 925 |
894 # Reset values. | 926 # Reset values. |
895 self._status_update_callback = None | 927 self._status_update_callback = None |
896 | 928 |
897 return events_counter | 929 return events_counter |
LEFT | RIGHT |