Index: plaso/multi_processing/psort.py |
diff --git a/plaso/multi_processing/psort.py b/plaso/multi_processing/psort.py |
index 93ab90709cc4908a2bfb24f16a4769b04d2910d0..77ac66cc1d78ccb209155da367e39125cbb4bb08 100644 |
--- a/plaso/multi_processing/psort.py |
+++ b/plaso/multi_processing/psort.py |
@@ -12,35 +12,119 @@ from plaso.engine import zeromq_queue |
from plaso.containers import tasks |
from plaso.lib import bufferlib |
from plaso.lib import definitions |
-from plaso.lib import py2to3 |
from plaso.multi_processing import analysis_process |
from plaso.multi_processing import engine as multi_process_engine |
from plaso.multi_processing import multi_process_queue |
from plaso.storage import time_range as storage_time_range |
-class _EventsHeap(object): |
- """Class that defines the events heap.""" |
+class PsortEventHeap(object): |
+ """Psort event heap.""" |
+ |
+ _IDENTIFIER_EXCLUDED_ATTRIBUTES = frozenset([ |
+ u'data_type', |
+ u'display_name', |
+ u'filename', |
+ u'inode', |
+ u'parser', |
+ u'tag', |
+ u'timestamp', |
+ u'timestamp_desc']) |
def __init__(self): |
- """Initializes an events heap.""" |
- super(_EventsHeap, self).__init__() |
+ """Initializes a psort events heap.""" |
+ super(PsortEventHeap, self).__init__() |
self._heap = [] |
@property |
def number_of_events(self): |
- """int: number of serialized events on the heap.""" |
+ """int: number of events on the heap.""" |
return len(self._heap) |
+ def _GetEventIdentifiers(self, event): |
+ """Retrieves different identifiers of the event. |
+ |
+ Every event contains event data, which consists of attributes and values. |
+ These attributes and values can be represented as a string and used for |
+ sorting and uniquely identifying events. This function determines multiple |
+ identifiers: |
+ * an identifier of the attributes and values without the timestamp |
+ description (or usage). This is referred to as the MACB group |
+ identifier. |
+ * an identifier of the attributes and values including the timestamp |
+ description (or usage). This is referred to as the event content |
+ identifier. |
+ |
+ The identifier without the timestamp description can be used to group |
+ events that have the same MACB (modification, access, change, birth) |
+ timestamps. The PsortEventHeap will store these events individually and |
+ relies on PsortMultiProcessEngine to do the actual grouping of events. |
+ |
+ Args: |
+ event (EventObject): event. |
+ |
+ Returns: |
+ tuple: contains: |
+ |
+ str: identifier of the event MACB group or None if the event cannot |
+ be grouped. |
+ str: identifier of the event content. |
+ """ |
+ attributes = [] |
+ |
+ attribute_string = u'data_type: {0:s}'.format(event.data_type) |
+ attributes.append(attribute_string) |
+ |
+ for attribute_name, attribute_value in sorted(event.GetAttributes()): |
+ if attribute_name in self._IDENTIFIER_EXCLUDED_ATTRIBUTES: |
+ continue |
+ |
+ if not attribute_value: |
+ continue |
+ |
+ if attribute_name == u'pathspec': |
+ attribute_value = attribute_value.comparable |
+ |
+ elif isinstance(attribute_value, dict): |
+ attribute_value = sorted(attribute_value.items()) |
+ |
+ elif isinstance(attribute_value, set): |
+ attribute_value = sorted(list(attribute_value)) |
+ |
+ attribute_string = u'{0:s}: {1!s}'.format( |
+ attribute_name, attribute_value) |
+ attributes.append(attribute_string) |
+ |
+ # The u'atime', u'ctime', u'crtime', u'mtime' are included for backwards |
+ # compatibility with the filestat parser. |
+ if event.timestamp_desc in ( |
+ u'atime', u'ctime', u'crtime', u'mtime', |
+ definitions.TIME_DESCRIPTION_LAST_ACCESS, |
+ definitions.TIME_DESCRIPTION_CHANGE, |
+ definitions.TIME_DESCRIPTION_CREATION, |
+ definitions.TIME_DESCRIPTION_MODIFICATION): |
+ macb_group_identifier = u', '.join(attributes) |
+ else: |
+ macb_group_identifier = None |
+ |
+ attributes.insert(0, event.timestamp_desc) |
+ content_identifier = u', '.join(attributes) |
+ |
+ return macb_group_identifier, content_identifier |
+ |
def PopEvent(self): |
"""Pops an event from the heap. |
Returns: |
- EventObject: event. |
+ tuple: contains: |
+ |
+ str: identifier of the event MACB group or None if the event cannot |
+ be grouped. |
+ str: identifier of the event content. |
+ EventObject: event. |
""" |
try: |
- _, _, _, event = heapq.heappop(self._heap) |
- return event |
+ return heapq.heappop(self._heap) |
except IndexError: |
return None |
@@ -62,15 +146,16 @@ class _EventsHeap(object): |
Args: |
event (EventObject): event. |
""" |
- event_identifier = event.GetIdentifier() |
- event_identifier_string = event_identifier.CopyToString() |
- heap_values = ( |
- event.timestamp, event.timestamp_desc, event_identifier_string, event) |
+ macb_group_identifier, content_identifier = self._GetEventIdentifiers(event) |
+ |
+ # We can ignore the timestamp here because the psort engine only stores |
+ # events with the same timestamp in the event heap. |
+ heap_values = (macb_group_identifier, content_identifier, event) |
heapq.heappush(self._heap, heap_values) |
class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine): |
- """Class that defines the psort multi-processing engine.""" |
+ """Psort multi-processing engine.""" |
_DEFAULT_WORKER_MEMORY_LIMIT = 2048 * 1024 * 1024 |
@@ -79,20 +164,6 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine): |
_QUEUE_TIMEOUT = 10 * 60 |
- # Event attributes that should not be used in calculating the event export |
- # buffer identifier. |
- _EXCLUDED_EVENT_ATTRIBUTES = frozenset([ |
- u'data_type', |
- u'display_name', |
- u'filename', |
- u'inode', |
- u'parser', |
- u'pathspec', |
- u'tag', |
- u'timestamp']) |
- |
- _JOIN_ATTRIBUTES = frozenset([u'display_name', u'filename', u'inode']) |
- |
def __init__(self, use_zeromq=True): |
"""Initializes an engine object. |
@@ -108,8 +179,7 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine): |
self._event_queues = {} |
# The event heap is used to make sure the events are sorted in |
# a deterministic way. |
- self._export_event_heap = _EventsHeap() |
- self._export_event_lookup_table = {} |
+ self._export_event_heap = PsortEventHeap() |
self._export_event_timestamp = 0 |
self._knowledge_base = None |
self._merge_task = None |
@@ -119,6 +189,7 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine): |
self._number_of_consumed_reports = 0 |
self._number_of_consumed_sources = 0 |
self._number_of_duplicate_events = 0 |
+ self._number_of_macb_grouped_events = 0 |
self._number_of_produced_errors = 0 |
self._number_of_produced_event_tags = 0 |
self._number_of_produced_events = 0 |
@@ -322,20 +393,10 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine): |
deduplicated. |
""" |
if event.timestamp != self._export_event_timestamp: |
- self._FlushExportBuffer(output_module) |
+ self._FlushExportBuffer( |
+ output_module, deduplicate_events=deduplicate_events) |
self._export_event_timestamp = event.timestamp |
- if deduplicate_events: |
- lookup_key = self._GetEventExportBufferIdentifier(event) |
- previous_event = self._export_event_lookup_table.get(lookup_key, None) |
- if previous_event: |
- self._number_of_duplicate_events += 1 |
- |
- self._MergeEvents(previous_event, event) |
- return |
- |
- self._export_event_lookup_table[lookup_key] = event |
- |
self._export_event_heap.PushEvent(event) |
def _ExportEvents( |
@@ -438,143 +499,57 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine): |
events_counter[u'Duplicate events removed'] = ( |
self._number_of_duplicate_events) |
+ if self._number_of_macb_grouped_events: |
+ events_counter[u'Events MACB grouped'] = ( |
+ self._number_of_macb_grouped_events) |
+ |
if filter_limit: |
events_counter[u'Limited By'] = filter_limit |
return events_counter |
- def _FlushExportBuffer(self, output_module): |
+ def _FlushExportBuffer(self, output_module, deduplicate_events=True): |
"""Flushes buffered events and writes them to the output module. |
Args: |
output_module (OutputModule): output module. |
+ deduplicate_events (Optional[bool]): True if events should be |
+ deduplicated. |
""" |
- for event in self._export_event_heap.PopEvents(): |
- output_module.WriteEvent(event) |
+ last_macb_group_identifier = None |
+ last_content_identifier = None |
+ macb_group = [] |
- self._export_event_lookup_table = {} |
- |
- def _GetEventExportBufferIdentifier(self, event): |
- """Retrieves an identifier of the event for the export buffer. |
- |
- The identifier is determined based on the event attributes. |
- |
- Args: |
- event (EventObject): event. |
+ generator = self._export_event_heap.PopEvents() |
- Returns: |
- str: unique identifier representation of the event that can be used for |
- equality comparison. |
- """ |
- attributes = {} |
- for attribute_name, attribute_value in event.GetAttributes(): |
- if attribute_name in self._EXCLUDED_EVENT_ATTRIBUTES: |
+ for macb_group_identifier, content_identifier, event in generator: |
+ if deduplicate_events and last_content_identifier == content_identifier: |
+ self._number_of_duplicate_events += 1 |
continue |
- if isinstance(attribute_value, dict): |
- attribute_value = sorted(attribute_value.items()) |
- |
- elif isinstance(attribute_value, set): |
- attribute_value = sorted(list(attribute_value)) |
- |
- attributes[attribute_name] = attribute_value |
- |
- if event.pathspec: |
- attributes[u'pathspec'] = event.pathspec.comparable |
- |
- try: |
- event_identifier_string = u'|'.join([ |
- u'{0:s}={1!s}'.format(attribute_name, attribute_value) |
- for attribute_name, attribute_value in sorted(attributes.items())]) |
- |
- except UnicodeDecodeError: |
- event_identifier = event.GetIdentifier() |
- event_identifier_string = u'identifier={0:s}'.format( |
- event_identifier.CopyToString()) |
- |
- event_identifier_string = u'{0:d}|{1:s}|{2:s}'.format( |
- event.timestamp, event.data_type, event_identifier_string) |
- return event_identifier_string |
+ if macb_group_identifier is None: |
onager
2017/07/25 08:22:41
What happens if there's a non-MACB-groupable event
onager
2017/07/25 08:28:19
Also, given how tricky this method has been, pleas
Joachim Metz
2017/07/25 08:41:46
the events are grouped (sorted) by macb group iden
|
+ if macb_group: |
+ output_module.WriteEventMACBGroup(macb_group) |
+ macb_group = [] |
- def _MergeEvents(self, first_event, second_event): |
- """Merges the attributes of the second event into the first. |
- |
- Args: |
- first_event (EventObject): first event. |
- second_event (EventObject): second event. |
- """ |
- # TODO: Currently we are using the first event pathspec, perhaps that |
- # is not the best approach. There is no need to have all the pathspecs |
- # inside the combined event, however which one should be chosen is |
- # perhaps something that can be evaluated here (regular TSK in favor of |
- # an event stored deep inside a VSS for instance). |
- |
- for attribute_name in self._JOIN_ATTRIBUTES: |
- first_value = getattr(first_event, attribute_name, None) |
- if first_value is None: |
- first_value_set = set() |
+ output_module.WriteEvent(event) |
else: |
- if isinstance(first_value, py2to3.STRING_TYPES): |
- first_value = first_value.split(u';') |
- else: |
- first_value = [first_value] |
- |
- first_value_set = set(first_value) |
+ if (last_macb_group_identifier == macb_group_identifier or |
+ not macb_group): |
+ macb_group.append(event) |
- second_value = getattr(second_event, attribute_name, None) |
- if second_value is None: |
- second_value_set = set() |
- |
- else: |
- if isinstance(second_value, py2to3.STRING_TYPES): |
- second_value = second_value.split(u';') |
else: |
- second_value = [second_value] |
- |
- second_value_set = set(second_value) |
- |
- values_list = list(first_value_set.union(second_value_set)) |
- values_list.sort() |
- |
- if not values_list: |
- join_value = None |
- elif len(values_list) == 1: |
- join_value = values_list[0] |
- else: |
- join_value = u';'.join(values_list) |
- |
- setattr(first_event, attribute_name, join_value) |
- |
- # If two events are merged then we'll just pick the first inode value. |
- inode = first_event.inode |
- if isinstance(inode, py2to3.STRING_TYPES): |
- inode_list = inode.split(u';') |
- try: |
- new_inode = int(inode_list[0], 10) |
- except (IndexError, ValueError): |
- new_inode = 0 |
- |
- first_event.inode = new_inode |
- |
- # Special instance if this is a filestat entry we need to combine the |
- # timestamp description field. |
- parser_name = getattr(first_event, u'parser', None) |
- |
- if parser_name == u'filestat': |
- first_description = getattr(first_event, u'timestamp_desc', u'') |
- first_description_set = set(first_description.split(u';')) |
+ output_module.WriteEventMACBGroup(macb_group) |
+ macb_group = [event] |
- second_description = getattr(second_event, u'timestamp_desc', u'') |
- second_description_set = set(second_description.split(u';')) |
+ self._number_of_macb_grouped_events += 1 |
- if first_description_set.difference(second_description_set): |
- descriptions_list = list(first_description_set.union( |
- second_description_set)) |
- descriptions_list.sort() |
- description_value = u';'.join(descriptions_list) |
+ last_macb_group_identifier = macb_group_identifier |
+ last_content_identifier = content_identifier |
- setattr(first_event, u'timestamp_desc', description_value) |
+ if macb_group: |
+ output_module.WriteEventMACBGroup(macb_group) |
def _StartAnalysisProcesses(self, storage_writer, analysis_plugins): |
"""Starts the analysis processes. |