
Unified Diff: plaso/multi_processing/psort.py

Issue 328040043: [plaso] Changed l2t_csv output module to join MACB timestamps #306 (Closed)
Patch Set: Changes after review (created 6 years, 8 months ago)
Index: plaso/multi_processing/psort.py
diff --git a/plaso/multi_processing/psort.py b/plaso/multi_processing/psort.py
index 93ab90709cc4908a2bfb24f16a4769b04d2910d0..77ac66cc1d78ccb209155da367e39125cbb4bb08 100644
--- a/plaso/multi_processing/psort.py
+++ b/plaso/multi_processing/psort.py
@@ -12,35 +12,119 @@ from plaso.engine import zeromq_queue
from plaso.containers import tasks
from plaso.lib import bufferlib
from plaso.lib import definitions
-from plaso.lib import py2to3
from plaso.multi_processing import analysis_process
from plaso.multi_processing import engine as multi_process_engine
from plaso.multi_processing import multi_process_queue
from plaso.storage import time_range as storage_time_range
-class _EventsHeap(object):
- """Class that defines the events heap."""
+class PsortEventHeap(object):
+ """Psort event heap."""
+
+ _IDENTIFIER_EXCLUDED_ATTRIBUTES = frozenset([
+ u'data_type',
+ u'display_name',
+ u'filename',
+ u'inode',
+ u'parser',
+ u'tag',
+ u'timestamp',
+ u'timestamp_desc'])
def __init__(self):
- """Initializes an events heap."""
- super(_EventsHeap, self).__init__()
+ """Initializes a psort events heap."""
+ super(PsortEventHeap, self).__init__()
self._heap = []
@property
def number_of_events(self):
- """int: number of serialized events on the heap."""
+ """int: number of events on the heap."""
return len(self._heap)
+ def _GetEventIdentifiers(self, event):
+ """Retrieves different identifiers of the event.
+
+ Every event contains event data, which consists of attributes and values.
+ These attributes and values can be represented as a string and used for
+ sorting and uniquely identifying events. This function determines multiple
+ identifiers:
+ * an identifier of the attributes and values without the timestamp
+ description (or usage). This is referred to as the MACB group
+ identifier.
+ * an identifier of the attributes and values including the timestamp
+ description (or usage). This is referred to as the event content
+ identifier.
+
+ The identifier without the timestamp description can be used to group
+ events that have the same MACB (modification, access, change, birth)
+ timestamps. The PsortEventHeap will store these events individually and
+ relies on PsortMultiProcessEngine to do the actual grouping of events.
+
+ Args:
+ event (EventObject): event.
+
+ Returns:
+ tuple: contains:
+
+ str: identifier of the event MACB group or None if the event cannot
+ be grouped.
+ str: identifier of the event content.
+ """
+ attributes = []
+
+ attribute_string = u'data_type: {0:s}'.format(event.data_type)
+ attributes.append(attribute_string)
+
+ for attribute_name, attribute_value in sorted(event.GetAttributes()):
+ if attribute_name in self._IDENTIFIER_EXCLUDED_ATTRIBUTES:
+ continue
+
+ if not attribute_value:
+ continue
+
+ if attribute_name == u'pathspec':
+ attribute_value = attribute_value.comparable
+
+ elif isinstance(attribute_value, dict):
+ attribute_value = sorted(attribute_value.items())
+
+ elif isinstance(attribute_value, set):
+ attribute_value = sorted(list(attribute_value))
+
+ attribute_string = u'{0:s}: {1!s}'.format(
+ attribute_name, attribute_value)
+ attributes.append(attribute_string)
+
+    # The u'atime', u'ctime', u'crtime' and u'mtime' timestamp descriptions
+    # are included for backwards compatibility with the filestat parser.
+ if event.timestamp_desc in (
+ u'atime', u'ctime', u'crtime', u'mtime',
+ definitions.TIME_DESCRIPTION_LAST_ACCESS,
+ definitions.TIME_DESCRIPTION_CHANGE,
+ definitions.TIME_DESCRIPTION_CREATION,
+ definitions.TIME_DESCRIPTION_MODIFICATION):
+ macb_group_identifier = u', '.join(attributes)
+ else:
+ macb_group_identifier = None
+
+ attributes.insert(0, event.timestamp_desc)
+ content_identifier = u', '.join(attributes)
+
+ return macb_group_identifier, content_identifier
+
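
For illustration, a hedged sketch of what the two identifiers produced by _GetEventIdentifiers() look like for a hypothetical event; the attribute names and values below are invented and not taken from the change itself.

    # Hypothetical sketch: the real method prefixes the u'data_type' attribute,
    # appends the remaining attributes sorted by name (skipping the excluded
    # set and empty values) and joins them with u', '.
    attributes = [
        u'data_type: fs:stat',
        u'file_size: 1024',
        u'pathspec: type: OS, location: /tmp/example.txt',
    ]

    # u'mtime' is one of the MACB timestamp descriptions, so a MACB group
    # identifier is produced; for a non-MACB description it would be None.
    macb_group_identifier = u', '.join(attributes)
    content_identifier = u', '.join([u'mtime'] + attributes)

    print(macb_group_identifier)
    # data_type: fs:stat, file_size: 1024, pathspec: type: OS, location: /tmp/example.txt
    print(content_identifier)
    # mtime, data_type: fs:stat, file_size: 1024, pathspec: type: OS, location: /tmp/example.txt
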
def PopEvent(self):
"""Pops an event from the heap.
Returns:
- EventObject: event.
+ tuple: contains:
+
+ str: identifier of the event MACB group or None if the event cannot
+ be grouped.
+ str: identifier of the event content.
+ EventObject: event.
"""
try:
- _, _, _, event = heapq.heappop(self._heap)
- return event
+ return heapq.heappop(self._heap)
except IndexError:
return None
@@ -62,15 +146,16 @@ class _EventsHeap(object):
Args:
event (EventObject): event.
"""
- event_identifier = event.GetIdentifier()
- event_identifier_string = event_identifier.CopyToString()
- heap_values = (
- event.timestamp, event.timestamp_desc, event_identifier_string, event)
+ macb_group_identifier, content_identifier = self._GetEventIdentifiers(event)
+
+ # We can ignore the timestamp here because the psort engine only stores
+ # events with the same timestamp in the event heap.
+ heap_values = (macb_group_identifier, content_identifier, event)
heapq.heappush(self._heap, heap_values)
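
A minimal standalone sketch (the identifiers and events are stand-ins) of why pushing (macb_group_identifier, content_identifier, event) tuples is enough: heapq orders the tuples lexically, so events sharing a MACB group identifier, and exact content duplicates, come off the heap consecutively.

    import heapq

    # Stand-in heap values; the timestamp is omitted because all events on the
    # heap share a single timestamp (see the comment in PushEvent above).
    heap = []
    heapq.heappush(heap, (u'group-b', u'content-3', u'event-3'))
    heapq.heappush(heap, (u'group-a', u'content-1', u'event-1'))
    heapq.heappush(heap, (u'group-a', u'content-2', u'event-2'))

    while heap:
      print(heapq.heappop(heap))
    # Pops the group-a entries first, then group-b, so same-group events are
    # adjacent when the export buffer is flushed.
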
class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine):
- """Class that defines the psort multi-processing engine."""
+ """Psort multi-processing engine."""
_DEFAULT_WORKER_MEMORY_LIMIT = 2048 * 1024 * 1024
@@ -79,20 +164,6 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine):
_QUEUE_TIMEOUT = 10 * 60
- # Event attributes that should not be used in calculating the event export
- # buffer identifier.
- _EXCLUDED_EVENT_ATTRIBUTES = frozenset([
- u'data_type',
- u'display_name',
- u'filename',
- u'inode',
- u'parser',
- u'pathspec',
- u'tag',
- u'timestamp'])
-
- _JOIN_ATTRIBUTES = frozenset([u'display_name', u'filename', u'inode'])
-
def __init__(self, use_zeromq=True):
"""Initializes an engine object.
@@ -108,8 +179,7 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine):
self._event_queues = {}
# The event heap is used to make sure the events are sorted in
# a deterministic way.
- self._export_event_heap = _EventsHeap()
- self._export_event_lookup_table = {}
+ self._export_event_heap = PsortEventHeap()
self._export_event_timestamp = 0
self._knowledge_base = None
self._merge_task = None
@@ -119,6 +189,7 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine):
self._number_of_consumed_reports = 0
self._number_of_consumed_sources = 0
self._number_of_duplicate_events = 0
+ self._number_of_macb_grouped_events = 0
self._number_of_produced_errors = 0
self._number_of_produced_event_tags = 0
self._number_of_produced_events = 0
@@ -322,20 +393,10 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine):
deduplicated.
"""
if event.timestamp != self._export_event_timestamp:
- self._FlushExportBuffer(output_module)
+ self._FlushExportBuffer(
+ output_module, deduplicate_events=deduplicate_events)
self._export_event_timestamp = event.timestamp
- if deduplicate_events:
- lookup_key = self._GetEventExportBufferIdentifier(event)
- previous_event = self._export_event_lookup_table.get(lookup_key, None)
- if previous_event:
- self._number_of_duplicate_events += 1
-
- self._MergeEvents(previous_event, event)
- return
-
- self._export_event_lookup_table[lookup_key] = event
-
self._export_event_heap.PushEvent(event)
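
A simplified, hypothetical rendering of the buffering pattern above (the function and parameter names are invented): events are processed in timestamp order and the buffer is flushed whenever the timestamp changes, so the heap only ever holds events sharing one timestamp.

    # Hypothetical sketch only; the real _ExportEvent also forwards
    # deduplicate_events to _FlushExportBuffer.
    def export_sorted_events(events, push_event, flush_buffer):
      """Buffers events per timestamp, flushing on every timestamp change."""
      current_timestamp = 0
      for event in events:  # assumed to be sorted by timestamp
        if event.timestamp != current_timestamp:
          flush_buffer()
          current_timestamp = event.timestamp
        push_event(event)
      flush_buffer()  # flush the events buffered for the final timestamp
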
def _ExportEvents(
@@ -438,143 +499,57 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine):
events_counter[u'Duplicate events removed'] = (
self._number_of_duplicate_events)
+ if self._number_of_macb_grouped_events:
+ events_counter[u'Events MACB grouped'] = (
+ self._number_of_macb_grouped_events)
+
if filter_limit:
events_counter[u'Limited By'] = filter_limit
return events_counter
- def _FlushExportBuffer(self, output_module):
+ def _FlushExportBuffer(self, output_module, deduplicate_events=True):
"""Flushes buffered events and writes them to the output module.
Args:
output_module (OutputModule): output module.
+ deduplicate_events (Optional[bool]): True if events should be
+ deduplicated.
"""
- for event in self._export_event_heap.PopEvents():
- output_module.WriteEvent(event)
+ last_macb_group_identifier = None
+ last_content_identifier = None
+ macb_group = []
- self._export_event_lookup_table = {}
-
- def _GetEventExportBufferIdentifier(self, event):
- """Retrieves an identifier of the event for the export buffer.
-
- The identifier is determined based on the event attributes.
-
- Args:
- event (EventObject): event.
+ generator = self._export_event_heap.PopEvents()
- Returns:
- str: unique identifier representation of the event that can be used for
- equality comparison.
- """
- attributes = {}
- for attribute_name, attribute_value in event.GetAttributes():
- if attribute_name in self._EXCLUDED_EVENT_ATTRIBUTES:
+ for macb_group_identifier, content_identifier, event in generator:
+ if deduplicate_events and last_content_identifier == content_identifier:
+ self._number_of_duplicate_events += 1
continue
- if isinstance(attribute_value, dict):
- attribute_value = sorted(attribute_value.items())
-
- elif isinstance(attribute_value, set):
- attribute_value = sorted(list(attribute_value))
-
- attributes[attribute_name] = attribute_value
-
- if event.pathspec:
- attributes[u'pathspec'] = event.pathspec.comparable
-
- try:
- event_identifier_string = u'|'.join([
- u'{0:s}={1!s}'.format(attribute_name, attribute_value)
- for attribute_name, attribute_value in sorted(attributes.items())])
-
- except UnicodeDecodeError:
- event_identifier = event.GetIdentifier()
- event_identifier_string = u'identifier={0:s}'.format(
- event_identifier.CopyToString())
-
- event_identifier_string = u'{0:d}|{1:s}|{2:s}'.format(
- event.timestamp, event.data_type, event_identifier_string)
- return event_identifier_string
+ if macb_group_identifier is None:
onager 2017/07/25 08:22:41: What happens if there's a non-MACB-groupable event ...
onager 2017/07/25 08:28:19: Also, given how tricky this method has been, please ...
Joachim Metz 2017/07/25 08:41:46: the events are grouped (sorted) by macb group identifier ...
+ if macb_group:
+ output_module.WriteEventMACBGroup(macb_group)
+ macb_group = []
- def _MergeEvents(self, first_event, second_event):
- """Merges the attributes of the second event into the first.
-
- Args:
- first_event (EventObject): first event.
- second_event (EventObject): second event.
- """
- # TODO: Currently we are using the first event pathspec, perhaps that
- # is not the best approach. There is no need to have all the pathspecs
- # inside the combined event, however which one should be chosen is
- # perhaps something that can be evaluated here (regular TSK in favor of
- # an event stored deep inside a VSS for instance).
-
- for attribute_name in self._JOIN_ATTRIBUTES:
- first_value = getattr(first_event, attribute_name, None)
- if first_value is None:
- first_value_set = set()
+ output_module.WriteEvent(event)
else:
- if isinstance(first_value, py2to3.STRING_TYPES):
- first_value = first_value.split(u';')
- else:
- first_value = [first_value]
-
- first_value_set = set(first_value)
+ if (last_macb_group_identifier == macb_group_identifier or
+ not macb_group):
+ macb_group.append(event)
- second_value = getattr(second_event, attribute_name, None)
- if second_value is None:
- second_value_set = set()
-
- else:
- if isinstance(second_value, py2to3.STRING_TYPES):
- second_value = second_value.split(u';')
else:
- second_value = [second_value]
-
- second_value_set = set(second_value)
-
- values_list = list(first_value_set.union(second_value_set))
- values_list.sort()
-
- if not values_list:
- join_value = None
- elif len(values_list) == 1:
- join_value = values_list[0]
- else:
- join_value = u';'.join(values_list)
-
- setattr(first_event, attribute_name, join_value)
-
- # If two events are merged then we'll just pick the first inode value.
- inode = first_event.inode
- if isinstance(inode, py2to3.STRING_TYPES):
- inode_list = inode.split(u';')
- try:
- new_inode = int(inode_list[0], 10)
- except (IndexError, ValueError):
- new_inode = 0
-
- first_event.inode = new_inode
-
- # Special instance if this is a filestat entry we need to combine the
- # timestamp description field.
- parser_name = getattr(first_event, u'parser', None)
-
- if parser_name == u'filestat':
- first_description = getattr(first_event, u'timestamp_desc', u'')
- first_description_set = set(first_description.split(u';'))
+ output_module.WriteEventMACBGroup(macb_group)
+ macb_group = [event]
- second_description = getattr(second_event, u'timestamp_desc', u'')
- second_description_set = set(second_description.split(u';'))
+ self._number_of_macb_grouped_events += 1
- if first_description_set.difference(second_description_set):
- descriptions_list = list(first_description_set.union(
- second_description_set))
- descriptions_list.sort()
- description_value = u';'.join(descriptions_list)
+ last_macb_group_identifier = macb_group_identifier
+ last_content_identifier = content_identifier
- setattr(first_event, u'timestamp_desc', description_value)
+ if macb_group:
+ output_module.WriteEventMACBGroup(macb_group)
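
In response to the review question above about non-MACB-groupable events, a hedged simulation of the flush loop with invented identifiers; WriteEvent and WriteEventMACBGroup are replaced by prints.

    # Hypothetical values popped from the heap as
    # (macb_group_identifier, content_identifier, event).
    popped = [
        (u'group-a', u'content-1', u'event1'),
        (u'group-a', u'content-1', u'event2'),  # duplicate content identifier
        (u'group-a', u'content-2', u'event3'),  # same MACB group as event1
        (None, u'content-3', u'event4'),        # cannot be MACB grouped
        (u'group-b', u'content-4', u'event5'),  # starts a new MACB group
    ]

    last_macb_group_identifier = None
    last_content_identifier = None
    macb_group = []

    for macb_group_identifier, content_identifier, event in popped:
      if last_content_identifier == content_identifier:
        continue  # deduplicated

      if macb_group_identifier is None:
        if macb_group:
          print(u'MACB group: {0!s}'.format(macb_group))
          macb_group = []
        print(u'single event: {0:s}'.format(event))
      else:
        if last_macb_group_identifier == macb_group_identifier or not macb_group:
          macb_group.append(event)
        else:
          print(u'MACB group: {0!s}'.format(macb_group))
          macb_group = [event]

      last_macb_group_identifier = macb_group_identifier
      last_content_identifier = content_identifier

    if macb_group:
      print(u'MACB group: {0!s}'.format(macb_group))

    # event2 is skipped as a duplicate, [event1, event3] and [event5] are
    # written as MACB groups, and event4 is written as a single event.
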
def _StartAnalysisProcesses(self, storage_writer, analysis_plugins):
"""Starts the analysis processes.