OLD | NEW |
---|---|
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The psort multi-processing engine.""" | 2 """The psort multi-processing engine.""" |
3 | 3 |
4 import collections | 4 import collections |
5 import heapq | 5 import heapq |
6 import logging | 6 import logging |
7 import os | 7 import os |
8 import time | 8 import time |
9 | 9 |
10 from plaso.engine import plaso_queue | 10 from plaso.engine import plaso_queue |
11 from plaso.engine import zeromq_queue | 11 from plaso.engine import zeromq_queue |
12 from plaso.containers import tasks | 12 from plaso.containers import tasks |
13 from plaso.lib import bufferlib | 13 from plaso.lib import bufferlib |
14 from plaso.lib import definitions | 14 from plaso.lib import definitions |
15 from plaso.lib import py2to3 | |
16 from plaso.multi_processing import analysis_process | 15 from plaso.multi_processing import analysis_process |
17 from plaso.multi_processing import engine as multi_process_engine | 16 from plaso.multi_processing import engine as multi_process_engine |
18 from plaso.multi_processing import multi_process_queue | 17 from plaso.multi_processing import multi_process_queue |
19 from plaso.storage import time_range as storage_time_range | 18 from plaso.storage import time_range as storage_time_range |
20 | 19 |
21 | 20 |
22 class _EventsHeap(object): | 21 class PsortEventHeap(object): |
23 """Class that defines the events heap.""" | 22 """Psort event heap.""" |
23 | |
24 _IDENTIFIER_EXCLUDED_ATTRIBUTES = frozenset([ | |
25 u'data_type', | |
26 u'display_name', | |
27 u'filename', | |
28 u'inode', | |
29 u'parser', | |
30 u'tag', | |
31 u'timestamp', | |
32 u'timestamp_desc']) | |
24 | 33 |
25 def __init__(self): | 34 def __init__(self): |
26 """Initializes an events heap.""" | 35 """Initializes a psort events heap.""" |
27 super(_EventsHeap, self).__init__() | 36 super(PsortEventHeap, self).__init__() |
28 self._heap = [] | 37 self._heap = [] |
29 | 38 |
30 @property | 39 @property |
31 def number_of_events(self): | 40 def number_of_events(self): |
32 """int: number of serialized events on the heap.""" | 41 """int: number of events on the heap.""" |
33 return len(self._heap) | 42 return len(self._heap) |
34 | 43 |
44 def _GetEventIdentifiers(self, event): | |
45 """Retrieves different identifiers of the event. | |
46 | |
47 Every event contains event data, which consists of attributes and values. | |
48 These attributes and values can be represented as a string and used for | |
49 sorting and uniquely identifying events. This function determines multiple | |
50 identifiers: | |
51 * an identifier of the attributes and values without the timestamp | |
52 description (or usage). This is referred to as the MACB group | |
53 identifier. | |
54 * an identifier of the attributes and values including the timestamp | |
55 description (or usage). This is referred to as the event content | |
56 identifier. | |
57 | |
58 The identifier without the timestamp description can be used to group | |
59 events that have the same MACB (modification, access, change, birth) | |
60 timestamps. The PsortEventHeap will store these events individually and | |
61 relies on PsortMultiProcessEngine to do the actual grouping of events. | |
62 | |
63 Args: | |
64 event (EventObject): event. | |
65 | |
66 Returns: | |
67 tuple: contains: | |
68 | |
69 str: identifier of the event MACB group or None if the event cannot | |
70 be grouped. | |
71 str: identifier of the event content. | |
72 """ | |
73 attributes = [] | |
74 | |
75 attribute_string = u'data_type: {0:s}'.format(event.data_type) | |
76 attributes.append(attribute_string) | |
77 | |
78 for attribute_name, attribute_value in sorted(event.GetAttributes()): | |
79 if attribute_name in self._IDENTIFIER_EXCLUDED_ATTRIBUTES: | |
80 continue | |
81 | |
82 if not attribute_value: | |
83 continue | |
84 | |
85 if attribute_name == u'pathspec': | |
86 attribute_value = attribute_value.comparable | |
87 | |
88 elif isinstance(attribute_value, dict): | |
89 attribute_value = sorted(attribute_value.items()) | |
90 | |
91 elif isinstance(attribute_value, set): | |
92 attribute_value = sorted(list(attribute_value)) | |
93 | |
94 attribute_string = u'{0:s}: {1!s}'.format( | |
95 attribute_name, attribute_value) | |
96 attributes.append(attribute_string) | |
97 | |
98 # The u'atime', u'ctime', u'crtime', u'mtime' are included for backwards | |
99 # compatibility with the filestat parser. | |
100 if event.timestamp_desc in ( | |
101 u'atime', u'ctime', u'crtime', u'mtime', | |
102 definitions.TIME_DESCRIPTION_LAST_ACCESS, | |
103 definitions.TIME_DESCRIPTION_CHANGE, | |
104 definitions.TIME_DESCRIPTION_CREATION, | |
105 definitions.TIME_DESCRIPTION_MODIFICATION): | |
106 macb_group_identifier = u', '.join(attributes) | |
107 else: | |
108 macb_group_identifier = None | |
109 | |
110 attributes.insert(0, event.timestamp_desc) | |
111 content_identifier = u', '.join(attributes) | |
112 | |
113 return macb_group_identifier, content_identifier | |
114 | |
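A minimal standalone sketch of the identifier logic introduced in _GetEventIdentifiers, assuming a plain dict of attributes instead of an EventObject (the attribute names and values below are illustrative, not the plaso API): the MACB group identifier omits the timestamp description, so two events that differ only in a MACB timestamp description share a group identifier while keeping distinct content identifiers.

```python
# Sketch only: mirrors the idea of _GetEventIdentifiers with a plain dict of
# attributes instead of an EventObject. Names and values are illustrative.
MACB_TIMESTAMP_DESCRIPTIONS = frozenset([u'atime', u'ctime', u'crtime', u'mtime'])


def get_identifiers(data_type, timestamp_desc, attributes):
  """Returns (macb_group_identifier, content_identifier) for an event."""
  parts = [u'data_type: {0:s}'.format(data_type)]
  for name, value in sorted(attributes.items()):
    if value:
      parts.append(u'{0:s}: {1!s}'.format(name, value))

  # Only events with a MACB-style timestamp description can be grouped;
  # the group identifier deliberately omits the timestamp description.
  if timestamp_desc in MACB_TIMESTAMP_DESCRIPTIONS:
    macb_group_identifier = u', '.join(parts)
  else:
    macb_group_identifier = None

  content_identifier = u', '.join([timestamp_desc] + parts)
  return macb_group_identifier, content_identifier


# Two events that differ only in a MACB timestamp description share a group
# identifier but keep distinct content identifiers.
attributes = {u'offset': 16}  # hypothetical attribute
print(get_identifiers(u'fs:stat', u'atime', attributes))
print(get_identifiers(u'fs:stat', u'mtime', attributes))
```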
35 def PopEvent(self): | 115 def PopEvent(self): |
36 """Pops an event from the heap. | 116 """Pops an event from the heap. |
37 | 117 |
38 Returns: | 118 Returns: |
39 EventObject: event. | 119 tuple: contains: |
120 | |
121 str: identifier of the event MACB group or None if the event cannot | |
122 be grouped. | |
123 str: identifier of the event content. | |
124 EventObject: event. | |
40 """ | 125 """ |
41 try: | 126 try: |
42 _, _, _, event = heapq.heappop(self._heap) | 127 return heapq.heappop(self._heap) |
43 return event | |
44 | 128 |
45 except IndexError: | 129 except IndexError: |
46 return None | 130 return None |
47 | 131 |
48 def PopEvents(self): | 132 def PopEvents(self): |
49 """Pops events from the heap. | 133 """Pops events from the heap. |
50 | 134 |
51 Yields: | 135 Yields: |
52 EventObject: event. | 136 EventObject: event. |
53 """ | 137 """ |
54 event = self.PopEvent() | 138 event = self.PopEvent() |
55 while event: | 139 while event: |
56 yield event | 140 yield event |
57 event = self.PopEvent() | 141 event = self.PopEvent() |
58 | 142 |
59 def PushEvent(self, event): | 143 def PushEvent(self, event): |
60 """Pushes an event onto the heap. | 144 """Pushes an event onto the heap. |
61 | 145 |
62 Args: | 146 Args: |
63 event (EventObject): event. | 147 event (EventObject): event. |
64 """ | 148 """ |
65 event_identifier = event.GetIdentifier() | 149 macb_group_identifier, content_identifier = self._GetEventIdentifiers(event) |
66 event_identifier_string = event_identifier.CopyToString() | 150 |
67 heap_values = ( | 151 # We can ignore the timestamp here because the psort engine only stores |
68 event.timestamp, event.timestamp_desc, event_identifier_string, event) | 152 # events with the same timestamp in the event heap. |
153 heap_values = (macb_group_identifier, content_identifier, event) | |
69 heapq.heappush(self._heap, heap_values) | 154 heapq.heappush(self._heap, heap_values) |
70 | 155 |
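For context on why pushing the (macb_group_identifier, content_identifier, event) tuple suffices for ordering: heapq compares tuples lexicographically, so entries sharing a MACB group identifier pop out adjacent to one another, sorted by content identifier. A small illustration with plain strings standing in for events:

```python
# Illustration only: heapq orders tuples lexicographically, so entries that
# share a MACB group identifier pop out adjacent to each other, sorted by
# content identifier. Plain strings stand in for EventObject instances.
import heapq

heap = []
heapq.heappush(heap, (u'group-b', u'mtime, data_type: fs:stat', u'event 3'))
heapq.heappush(heap, (u'group-a', u'mtime, data_type: fs:stat', u'event 2'))
heapq.heappush(heap, (u'group-a', u'atime, data_type: fs:stat', u'event 1'))

while heap:
  print(heapq.heappop(heap))
# Pops event 1 and event 2 (both group-a) before event 3 (group-b).
```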
71 | 156 |
72 class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine): | 157 class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine): |
73 """Class that defines the psort multi-processing engine.""" | 158 """Psort multi-processing engine.""" |
74 | 159 |
75 _DEFAULT_WORKER_MEMORY_LIMIT = 2048 * 1024 * 1024 | 160 _DEFAULT_WORKER_MEMORY_LIMIT = 2048 * 1024 * 1024 |
76 | 161 |
77 _PROCESS_JOIN_TIMEOUT = 5.0 | 162 _PROCESS_JOIN_TIMEOUT = 5.0 |
78 _PROCESS_WORKER_TIMEOUT = 15.0 * 60.0 | 163 _PROCESS_WORKER_TIMEOUT = 15.0 * 60.0 |
79 | 164 |
80 _QUEUE_TIMEOUT = 10 * 60 | 165 _QUEUE_TIMEOUT = 10 * 60 |
81 | 166 |
82 # Event attributes that should not be used in calculating the event export | |
83 # buffer identifier. | |
84 _EXCLUDED_EVENT_ATTRIBUTES = frozenset([ | |
85 u'data_type', | |
86 u'display_name', | |
87 u'filename', | |
88 u'inode', | |
89 u'parser', | |
90 u'pathspec', | |
91 u'tag', | |
92 u'timestamp']) | |
93 | |
94 _JOIN_ATTRIBUTES = frozenset([u'display_name', u'filename', u'inode']) | |
95 | |
96 def __init__(self, use_zeromq=True): | 167 def __init__(self, use_zeromq=True): |
97 """Initializes an engine object. | 168 """Initializes an engine object. |
98 | 169 |
99 Args: | 170 Args: |
100 use_zeromq (Optional[bool]): True if ZeroMQ should be used for queuing | 171 use_zeromq (Optional[bool]): True if ZeroMQ should be used for queuing |
101 instead of Python's multiprocessing queue. | 172 instead of Python's multiprocessing queue. |
102 """ | 173 """ |
103 super(PsortMultiProcessEngine, self).__init__() | 174 super(PsortMultiProcessEngine, self).__init__() |
104 self._analysis_plugins = {} | 175 self._analysis_plugins = {} |
105 self._completed_analysis_processes = set() | 176 self._completed_analysis_processes = set() |
106 self._data_location = None | 177 self._data_location = None |
107 self._event_filter_expression = None | 178 self._event_filter_expression = None |
108 self._event_queues = {} | 179 self._event_queues = {} |
109 # The event heap is used to make sure the events are sorted in | 180 # The event heap is used to make sure the events are sorted in |
110 # a deterministic way. | 181 # a deterministic way. |
111 self._export_event_heap = _EventsHeap() | 182 self._export_event_heap = PsortEventHeap() |
112 self._export_event_lookup_table = {} | |
113 self._export_event_timestamp = 0 | 183 self._export_event_timestamp = 0 |
114 self._knowledge_base = None | 184 self._knowledge_base = None |
115 self._merge_task = None | 185 self._merge_task = None |
116 self._number_of_consumed_errors = 0 | 186 self._number_of_consumed_errors = 0 |
117 self._number_of_consumed_event_tags = 0 | 187 self._number_of_consumed_event_tags = 0 |
118 self._number_of_consumed_events = 0 | 188 self._number_of_consumed_events = 0 |
119 self._number_of_consumed_reports = 0 | 189 self._number_of_consumed_reports = 0 |
120 self._number_of_consumed_sources = 0 | 190 self._number_of_consumed_sources = 0 |
121 self._number_of_duplicate_events = 0 | 191 self._number_of_duplicate_events = 0 |
192 self._number_of_macb_grouped_events = 0 | |
122 self._number_of_produced_errors = 0 | 193 self._number_of_produced_errors = 0 |
123 self._number_of_produced_event_tags = 0 | 194 self._number_of_produced_event_tags = 0 |
124 self._number_of_produced_events = 0 | 195 self._number_of_produced_events = 0 |
125 self._number_of_produced_reports = 0 | 196 self._number_of_produced_reports = 0 |
126 self._number_of_produced_sources = 0 | 197 self._number_of_produced_sources = 0 |
127 self._status = definitions.PROCESSING_STATUS_IDLE | 198 self._status = definitions.PROCESSING_STATUS_IDLE |
128 self._status_update_callback = None | 199 self._status_update_callback = None |
129 self._use_zeromq = use_zeromq | 200 self._use_zeromq = use_zeromq |
130 self._worker_memory_limit = self._DEFAULT_WORKER_MEMORY_LIMIT | 201 self._worker_memory_limit = self._DEFAULT_WORKER_MEMORY_LIMIT |
131 | 202 |
(...skipping 183 matching lines...)
315 def _ExportEvent(self, output_module, event, deduplicate_events=True): | 386 def _ExportEvent(self, output_module, event, deduplicate_events=True): |
316 """Exports an event using an output module. | 387 """Exports an event using an output module. |
317 | 388 |
318 Args: | 389 Args: |
319 output_module (OutputModule): output module. | 390 output_module (OutputModule): output module. |
320 event (EventObject): event. | 391 event (EventObject): event. |
321 deduplicate_events (Optional[bool]): True if events should be | 392 deduplicate_events (Optional[bool]): True if events should be |
322 deduplicated. | 393 deduplicated. |
323 """ | 394 """ |
324 if event.timestamp != self._export_event_timestamp: | 395 if event.timestamp != self._export_event_timestamp: |
325 self._FlushExportBuffer(output_module) | 396 self._FlushExportBuffer( |
397 output_module, deduplicate_events=deduplicate_events) | |
326 self._export_event_timestamp = event.timestamp | 398 self._export_event_timestamp = event.timestamp |
327 | 399 |
328 if deduplicate_events: | |
329 lookup_key = self._GetEventExportBufferIdentifier(event) | |
330 previous_event = self._export_event_lookup_table.get(lookup_key, None) | |
331 if previous_event: | |
332 self._number_of_duplicate_events += 1 | |
333 | |
334 self._MergeEvents(previous_event, event) | |
335 return | |
336 | |
337 self._export_event_lookup_table[lookup_key] = event | |
338 | |
339 self._export_event_heap.PushEvent(event) | 400 self._export_event_heap.PushEvent(event) |
340 | 401 |
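The flush-on-timestamp-change pattern in _ExportEvent is what backs the comment above ("the psort engine only stores events with the same timestamp in the event heap"). A rough sketch of that pattern under simplified assumptions; the class and names are illustrative, not the engine's API, and the buffered list stands in for PsortEventHeap:

```python
# Sketch of the flush-on-timestamp-change pattern, assuming events arrive
# sorted by timestamp. Names are illustrative and not part of the engine.
class ExportBuffer(object):

  def __init__(self, flush_callback):
    self._buffered = []
    self._current_timestamp = 0
    self._flush_callback = flush_callback

  def Add(self, timestamp, event):
    if timestamp != self._current_timestamp:
      if self._buffered:
        # Write out the previous timestamp group before starting a new one.
        self._flush_callback(self._buffered)
      self._buffered = []
      self._current_timestamp = timestamp
    self._buffered.append(event)


def Flush(events):
  print(events)


buffer_object = ExportBuffer(Flush)
buffer_object.Add(1000, u'event A')
buffer_object.Add(1000, u'event B')
buffer_object.Add(2000, u'event C')  # flushes [u'event A', u'event B']
```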
341 def _ExportEvents( | 402 def _ExportEvents( |
342 self, storage_reader, output_module, deduplicate_events=True, | 403 self, storage_reader, output_module, deduplicate_events=True, |
343 event_filter=None, time_slice=None, use_time_slicer=False): | 404 event_filter=None, time_slice=None, use_time_slicer=False): |
344 """Exports events using an output module. | 405 """Exports events using an output module. |
345 | 406 |
346 Args: | 407 Args: |
347 storage_reader (StorageReader): storage reader. | 408 storage_reader (StorageReader): storage reader. |
348 output_module (OutputModule): output module. | 409 output_module (OutputModule): output module. |
(...skipping 82 matching lines...)
431 | 492 |
432 events_counter = collections.Counter() | 493 events_counter = collections.Counter() |
433 events_counter[u'Events filtered'] = number_of_filtered_events | 494 events_counter[u'Events filtered'] = number_of_filtered_events |
434 events_counter[u'Events from time slice'] = number_of_events_from_time_slice | 495 events_counter[u'Events from time slice'] = number_of_events_from_time_slice |
435 events_counter[u'Events processed'] = self._number_of_consumed_events | 496 events_counter[u'Events processed'] = self._number_of_consumed_events |
436 | 497 |
437 if self._number_of_duplicate_events: | 498 if self._number_of_duplicate_events: |
438 events_counter[u'Duplicate events removed'] = ( | 499 events_counter[u'Duplicate events removed'] = ( |
439 self._number_of_duplicate_events) | 500 self._number_of_duplicate_events) |
440 | 501 |
502 if self._number_of_macb_grouped_events: | |
503 events_counter[u'Events MACB grouped'] = ( | |
504 self._number_of_macb_grouped_events) | |
505 | |
441 if filter_limit: | 506 if filter_limit: |
442 events_counter[u'Limited By'] = filter_limit | 507 events_counter[u'Limited By'] = filter_limit |
443 | 508 |
444 return events_counter | 509 return events_counter |
445 | 510 |
446 def _FlushExportBuffer(self, output_module): | 511 def _FlushExportBuffer(self, output_module, deduplicate_events=True): |
447 """Flushes buffered events and writes them to the output module. | 512 """Flushes buffered events and writes them to the output module. |
448 | 513 |
449 Args: | 514 Args: |
450 output_module (OutputModule): output module. | 515 output_module (OutputModule): output module. |
516 deduplicate_events (Optional[bool]): True if events should be | |
517 deduplicated. | |
451 """ | 518 """ |
452 for event in self._export_event_heap.PopEvents(): | 519 last_macb_group_identifier = None |
453 output_module.WriteEvent(event) | 520 last_content_identifier = None |
521 macb_group = [] | |
454 | 522 |
455 self._export_event_lookup_table = {} | 523 generator = self._export_event_heap.PopEvents() |
456 | 524 |
457 def _GetEventExportBufferIdentifier(self, event): | 525 for macb_group_identifier, content_identifier, event in generator: |
458 """Retrieves an identifier of the event for the export buffer. | 526 if deduplicate_events and last_content_identifier == content_identifier: |
459 | 527 self._number_of_duplicate_events += 1 |
460 The identifier is determined based on the event attributes. | |
461 | |
462 Args: | |
463 event (EventObject): event. | |
464 | |
465 Returns: | |
466 str: unique identifier representation of the event that can be used for | |
467 equality comparison. | |
468 """ | |
469 attributes = {} | |
470 for attribute_name, attribute_value in event.GetAttributes(): | |
471 if attribute_name in self._EXCLUDED_EVENT_ATTRIBUTES: | |
472 continue | 528 continue |
473 | 529 |
474 if isinstance(attribute_value, dict): | 530 if macb_group_identifier is None: |
[Inline review comments on new line 530]
onager 2017/07/25 08:22:41: What happens if there's a non-MACB-groupable event
onager 2017/07/25 08:28:19: Also, given how tricky this method has been, pleas
Joachim Metz 2017/07/25 08:41:46: the events are grouped (sorted) by macb group iden
475 attribute_value = sorted(attribute_value.items()) | 531 if macb_group: |
532 output_module.WriteEventMACBGroup(macb_group) | |
533 macb_group = [] | |
476 | 534 |
477 elif isinstance(attribute_value, set): | 535 output_module.WriteEvent(event) |
478 attribute_value = sorted(list(attribute_value)) | |
479 | |
480 attributes[attribute_name] = attribute_value | |
481 | |
482 if event.pathspec: | |
483 attributes[u'pathspec'] = event.pathspec.comparable | |
484 | |
485 try: | |
486 event_identifier_string = u'|'.join([ | |
487 u'{0:s}={1!s}'.format(attribute_name, attribute_value) | |
488 for attribute_name, attribute_value in sorted(attributes.items())]) | |
489 | |
490 except UnicodeDecodeError: | |
491 event_identifier = event.GetIdentifier() | |
492 event_identifier_string = u'identifier={0:s}'.format( | |
493 event_identifier.CopyToString()) | |
494 | |
495 event_identifier_string = u'{0:d}|{1:s}|{2:s}'.format( | |
496 event.timestamp, event.data_type, event_identifier_string) | |
497 return event_identifier_string | |
498 | |
499 def _MergeEvents(self, first_event, second_event): | |
500 """Merges the attributes of the second event into the first. | |
501 | |
502 Args: | |
503 first_event (EventObject): first event. | |
504 second_event (EventObject): second event. | |
505 """ | |
506 # TODO: Currently we are using the first event pathspec, perhaps that | |
507 # is not the best approach. There is no need to have all the pathspecs | |
508 # inside the combined event, however which one should be chosen is | |
509 # perhaps something that can be evaluated here (regular TSK in favor of | |
510 # an event stored deep inside a VSS for instance). | |
511 | |
512 for attribute_name in self._JOIN_ATTRIBUTES: | |
513 first_value = getattr(first_event, attribute_name, None) | |
514 if first_value is None: | |
515 first_value_set = set() | |
516 | 536 |
517 else: | 537 else: |
518 if isinstance(first_value, py2to3.STRING_TYPES): | 538 if (last_macb_group_identifier == macb_group_identifier or |
519 first_value = first_value.split(u';') | 539 not macb_group): |
540 macb_group.append(event) | |
541 | |
520 else: | 542 else: |
521 first_value = [first_value] | 543 output_module.WriteEventMACBGroup(macb_group) |
544 macb_group = [event] | |
522 | 545 |
523 first_value_set = set(first_value) | 546 self._number_of_macb_grouped_events += 1 |
524 | 547 |
525 second_value = getattr(second_event, attribute_name, None) | 548 last_macb_group_identifier = macb_group_identifier |
526 if second_value is None: | 549 last_content_identifier = content_identifier |
527 second_value_set = set() | |
528 | 550 |
529 else: | 551 if macb_group: |
530 if isinstance(second_value, py2to3.STRING_TYPES): | 552 output_module.WriteEventMACBGroup(macb_group) |
531 second_value = second_value.split(u';') | |
532 else: | |
533 second_value = [second_value] | |
534 | |
535 second_value_set = set(second_value) | |
536 | |
537 values_list = list(first_value_set.union(second_value_set)) | |
538 values_list.sort() | |
539 | |
540 if not values_list: | |
541 join_value = None | |
542 elif len(values_list) == 1: | |
543 join_value = values_list[0] | |
544 else: | |
545 join_value = u';'.join(values_list) | |
546 | |
547 setattr(first_event, attribute_name, join_value) | |
548 | |
549 # If two events are merged then we'll just pick the first inode value. | |
550 inode = first_event.inode | |
551 if isinstance(inode, py2to3.STRING_TYPES): | |
552 inode_list = inode.split(u';') | |
553 try: | |
554 new_inode = int(inode_list[0], 10) | |
555 except (IndexError, ValueError): | |
556 new_inode = 0 | |
557 | |
558 first_event.inode = new_inode | |
559 | |
560 # Special instance if this is a filestat entry we need to combine the | |
561 # timestamp description field. | |
562 parser_name = getattr(first_event, u'parser', None) | |
563 | |
564 if parser_name == u'filestat': | |
565 first_description = getattr(first_event, u'timestamp_desc', u'') | |
566 first_description_set = set(first_description.split(u';')) | |
567 | |
568 second_description = getattr(second_event, u'timestamp_desc', u'') | |
569 second_description_set = set(second_description.split(u';')) | |
570 | |
571 if first_description_set.difference(second_description_set): | |
572 descriptions_list = list(first_description_set.union( | |
573 second_description_set)) | |
574 descriptions_list.sort() | |
575 description_value = u';'.join(descriptions_list) | |
576 | |
577 setattr(first_event, u'timestamp_desc', description_value) | |
578 | 553 |
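The new _FlushExportBuffer relies on the heap popping entries ordered by (macb_group_identifier, content_identifier), so duplicates are adjacent and each MACB group is a contiguous run; entries with a None group identifier are written individually, which also addresses the inline question above about non-MACB-groupable events. The same adjacency-based grouping can be expressed with itertools.groupby. This is only an illustrative sketch with hard-coded tuples, not the module's code:

```python
# Illustrative sketch, not the module's code: the popped sequence is already
# ordered by (macb_group_identifier, content_identifier), so deduplication and
# MACB grouping both reduce to scanning adjacent entries.
import itertools

popped = [
    (u'group-a', u'content-1', u'event 1'),
    (u'group-a', u'content-1', u'event 1 duplicate'),
    (u'group-a', u'content-2', u'event 2'),
    (None, u'content-3', u'event 3'),  # not MACB-groupable, written on its own
]

for group_identifier, entries in itertools.groupby(
    popped, key=lambda entry: entry[0]):
  events = []
  last_content_identifier = None
  for _, content_identifier, event in entries:
    if content_identifier == last_content_identifier:
      continue  # adjacent entry with identical content: deduplicated
    last_content_identifier = content_identifier
    events.append(event)

  if group_identifier is None:
    for event in events:
      print(u'WriteEvent: {0:s}'.format(event))
  else:
    print(u'WriteEventMACBGroup: {0!s}'.format(events))
```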
579 def _StartAnalysisProcesses(self, storage_writer, analysis_plugins): | 554 def _StartAnalysisProcesses(self, storage_writer, analysis_plugins): |
580 """Starts the analysis processes. | 555 """Starts the analysis processes. |
581 | 556 |
582 Args: | 557 Args: |
583 storage_writer (StorageWriter): storage writer. | 558 storage_writer (StorageWriter): storage writer. |
584 analysis_plugins (list[AnalysisPlugin]): analysis plugins that should | 559 analysis_plugins (list[AnalysisPlugin]): analysis plugins that should |
585 be run. | 560 be run. |
586 """ | 561 """ |
587 logging.info(u'Starting analysis plugins.') | 562 logging.info(u'Starting analysis plugins.') |
(...skipping 357 matching lines...)
945 # so we include the storage sync to disk in the status updates. | 920 # so we include the storage sync to disk in the status updates. |
946 self._StopStatusUpdateThread() | 921 self._StopStatusUpdateThread() |
947 | 922 |
948 output_module.WriteFooter() | 923 output_module.WriteFooter() |
949 output_module.Close() | 924 output_module.Close() |
950 | 925 |
951 # Reset values. | 926 # Reset values. |
952 self._status_update_callback = None | 927 self._status_update_callback = None |
953 | 928 |
954 return events_counter | 929 return events_counter |