Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1663)

Unified Diff: tests/multi_processing/psort.py

Issue 328040043: [plaso] Changed l2t_csv output module to join MACB timestamps #306 (Closed)
Patch Set: Changes after review Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tests/multi_processing/psort.py
diff --git a/tests/multi_processing/psort.py b/tests/multi_processing/psort.py
index cdd25cc0191e7c91cf496edca7d994241456ab7a..187ac9940ece7ba8a8ba49f681f3e7c892ec46df 100644
--- a/tests/multi_processing/psort.py
+++ b/tests/multi_processing/psort.py
@@ -2,6 +2,8 @@
# -*- coding: utf-8 -*-
"""Tests for the psort multi-processing engine."""
+from __future__ import unicode_literals
+
import os
import shutil
import unittest
@@ -14,6 +16,7 @@ from plaso.engine import knowledge_base
from plaso.formatters import interface as formatters_interface
from plaso.formatters import manager as formatters_manager
from plaso.formatters import mediator as formatters_mediator
+from plaso.lib import definitions
from plaso.multi_processing import psort
from plaso.output import dynamic
from plaso.output import interface as output_interface
@@ -27,7 +30,7 @@ from tests.filters import test_lib as filters_test_lib
class TestAnalysisPlugin(analysis_interface.AnalysisPlugin):
- """Class that defines an analysis plugin for testing."""
+ """Analysis plugin for testing."""
def CompileReport(self, mediator):
"""Compiles a report of the analysis.
@@ -57,43 +60,66 @@ class TestAnalysisPlugin(analysis_interface.AnalysisPlugin):
class TestEvent(events.EventObject):
- """Class that defines an event for testing."""
+ """Event for testing."""
+
+ DATA_TYPE = 'test:event:psort'
- DATA_TYPE = u'test:event:psort'
+ def __init__(
+ self, timestamp,
+ text='My text dude.',
+ timestamp_description=definitions.TIME_DESCRIPTION_LAST_PRINTED):
+ """Initializes an event.
- def __init__(self, timestamp):
- """Initializes an event."""
+ Args:
+ timestamp (int): timestamp.
+ text (Optional[str]): text.
+ timestamp_description (Optional[str]): timestamp description.
+ """
super(TestEvent, self).__init__()
+ self.display_name = '/dev/none'
+ self.filename = '/dev/none'
+ self.parser = 'TestEvent'
+ self.text = text
+ self.timestamp_desc = timestamp_description
self.timestamp = timestamp
- self.timestamp_desc = u'Last Written'
-
- self.parser = u'TestEvent'
-
- self.display_name = u'/dev/none'
- self.filename = u'/dev/none'
- self.some = u'My text dude.'
- self.var = {u'Issue': False, u'Closed': True}
+ self.var = {'Issue': False, 'Closed': True}
class TestEventFormatter(formatters_interface.EventFormatter):
- """Class that defines an event formatter for testing."""
+ """Event formatter for testing."""
- DATA_TYPE = u'test:event:psort'
+ DATA_TYPE = 'test:event:psort'
- FORMAT_STRING = u'My text goes along: {some} lines'
+ FORMAT_STRING = 'My text goes along: {text} lines'
- SOURCE_SHORT = u'LOG'
- SOURCE_LONG = u'None in Particular'
+ SOURCE_SHORT = 'LOG'
+ SOURCE_LONG = 'None in Particular'
class TestOutputModule(output_interface.LinearOutputModule):
- """Class that defines an output module for testing."""
+ """Output module for testing.
+
+ Attributes:
+ events (list[EventObject]): event written to the output.
+ macb_groups (list[list[EventObject]]): MACB groups of events written to
+ the output.
+ """
- NAME = u'psort_test'
+ NAME = 'psort_test'
- _HEADER = (
- u'date,time,timezone,MACB,source,sourcetype,type,user,host,'
- u'short,desc,version,filename,inode,notes,format,extra\n')
+ def __init__(self, output_mediator_object):
+ """Initializes an output module.
+
+ Args:
+ output_mediator_object (OutputMediator): mediates interactions between
+ output modules and other components, such as storage and dfvfs.
+
+ Raises:
+ ValueError: when there are unused keyword arguments.
+ """
+ super(TestOutputModule, self).__init__(output_mediator_object)
+ self.events = []
+ self.macb_groups = []
def WriteEventBody(self, event):
"""Writes the body of an event object to the output.
@@ -101,29 +127,46 @@ class TestOutputModule(output_interface.LinearOutputModule):
Args:
event (EventObject): event.
"""
- message, _ = self._output_mediator.GetFormattedMessages(event)
- source_short, source_long = self._output_mediator.GetFormattedSources(event)
- self._WriteLine(u'{0:s}/{1:s} {2:s}\n'.format(
- source_short, source_long, message))
+ self.events.append(event)
def WriteHeader(self):
"""Writes the header to the output."""
- self._WriteLine(self._HEADER)
+ pass
+
+ def WriteEventMACBGroup(self, event_macb_group):
+ """Writes an event MACB group to the output.
+ An event MACB group is a group of events that have the same timestamp and
+ event data (attributes and values), where the timestamp description (or
+ usage) is one or more of MACB (modification, access, change, birth).
-class EventsHeapTest(shared_test_lib.BaseTestCase):
+ This function is called if the psort engine detected an event MACB group
+ so that the output module, if supported, can represent the group as
+ such. If not overridden this function will output every event individually.
+
+ Args:
+ event_macb_group (list[EventObject]): group of events with identical
+ timestamps, attributes and values.
+ """
+ self.events.extend(event_macb_group)
+ self.macb_groups.append(event_macb_group)
+
+
+class PsortEvensHeapTest(shared_test_lib.BaseTestCase):
"""Tests for the psort events heap."""
# pylint: disable=protected-access
+ # TODO: add tests for _GetEventIdentifiers
+
def testNumberOfEvents(self):
"""Tests the number_of_events property."""
- event_heap = psort._EventsHeap()
+ event_heap = psort.PsortEventHeap()
self.assertEqual(event_heap.number_of_events, 0)
def testPopEvent(self):
"""Tests the PopEvent function."""
- event_heap = psort._EventsHeap()
+ event_heap = psort.PsortEventHeap()
test_event = event_heap.PopEvent()
self.assertIsNone(test_event)
@@ -136,7 +179,7 @@ class EventsHeapTest(shared_test_lib.BaseTestCase):
def testPopEvents(self):
"""Tests the PopEvents function."""
- event_heap = psort._EventsHeap()
+ event_heap = psort.PsortEventHeap()
test_events = list(event_heap.PopEvents())
self.assertEqual(len(test_events), 0)
@@ -149,7 +192,7 @@ class EventsHeapTest(shared_test_lib.BaseTestCase):
def testPushEvent(self):
"""Tests the PushEvent function."""
- event_heap = psort._EventsHeap()
+ event_heap = psort.PsortEventHeap()
event = TestEvent(5134324321)
event_heap.PushEvent(event)
@@ -162,6 +205,40 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase):
# pylint: disable=protected-access
+ _TEST_EVENTS = [
+ (5134324321, {
+ 'timestamp_description': definitions.TIME_DESCRIPTION_WRITTEN}),
+ (5134324321, {
+ 'timestamp_description': definitions.TIME_DESCRIPTION_LAST_ACCESS}),
+ (2134324321, {}),
+ (9134324321, {}),
+ (5134324321, {
+ 'timestamp_description': definitions.TIME_DESCRIPTION_CHANGE}),
+ (5134324321, {
+ 'timestamp_description': definitions.TIME_DESCRIPTION_CREATION}),
+ (15134324321, {}),
+ (5134324322, {
+ 'timestamp_description': definitions.TIME_DESCRIPTION_CREATION}),
+ (5134324322, {
+ 'timestamp_description': definitions.TIME_DESCRIPTION_LAST_ACCESS}),
+ (5134324322, {
+ 'timestamp_description': definitions.TIME_DESCRIPTION_CHANGE}),
+ (5134324322, {
+ 'timestamp_description': definitions.TIME_DESCRIPTION_WRITTEN}),
+ (5134324322, {
+ 'timestamp_description': definitions.TIME_DESCRIPTION_WRITTEN}),
+ (5134324322, {
+ 'text': 'Another text',
+ 'timestamp_description': definitions.TIME_DESCRIPTION_LAST_ACCESS}),
+ (5134324322, {
+ 'text': 'Another text',
+ 'timestamp_description': definitions.TIME_DESCRIPTION_CHANGE}),
+ (5134324322, {
+ 'text': 'Another text',
+ 'timestamp_description': definitions.TIME_DESCRIPTION_WRITTEN}),
+ (5134024321, {}),
+ (5134024321, {})]
+
def _CreateTestStorageFile(self, path):
"""Creates a storage file for testing.
@@ -173,12 +250,9 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase):
# TODO: add preprocessing information.
- storage_file.AddEvent(TestEvent(5134324321))
- storage_file.AddEvent(TestEvent(2134324321))
- storage_file.AddEvent(TestEvent(9134324321))
- storage_file.AddEvent(TestEvent(15134324321))
- storage_file.AddEvent(TestEvent(5134324322))
- storage_file.AddEvent(TestEvent(5134024321))
+ for timestamp, kwargs in self._TEST_EVENTS:
+ event = TestEvent(timestamp, **kwargs)
+ storage_file.AddEvent(event)
storage_file.Close()
@@ -192,7 +266,7 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase):
test_plugin = TestAnalysisPlugin()
with shared_test_lib.TempDirectory() as temp_directory:
- temp_file = os.path.join(temp_directory, u'storage.plaso')
+ temp_file = os.path.join(temp_directory, 'storage.plaso')
self._CreateTestStorageFile(temp_file)
storage_writer = storage_zip_file.ZIPStorageFileWriter(
@@ -210,7 +284,7 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase):
test_filter = filters_test_lib.TestEventFilter()
with shared_test_lib.TempDirectory() as temp_directory:
- temp_file = os.path.join(temp_directory, u'storage.plaso')
+ temp_file = os.path.join(temp_directory, 'storage.plaso')
self._CreateTestStorageFile(temp_file)
storage_writer = storage_zip_file.ZIPStorageFileWriter(
@@ -250,13 +324,14 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase):
formatters_manager.FormattersManager.RegisterFormatter(TestEventFormatter)
with shared_test_lib.TempDirectory() as temp_directory:
- temp_file = os.path.join(temp_directory, u'storage.plaso')
+ temp_file = os.path.join(temp_directory, 'storage.plaso')
self._CreateTestStorageFile(temp_file)
storage_reader = storage_zip_file.ZIPStorageFileReader(temp_file)
storage_reader.ReadPreprocessingInformation(knowledge_base_object)
- test_engine._ExportEvents(storage_reader, output_module)
+ test_engine._ExportEvents(
+ storage_reader, output_module, deduplicate_events=False)
formatters_manager.FormattersManager.DeregisterFormatter(TestEventFormatter)
@@ -265,46 +340,75 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase):
for line in output.split(b'\n'):
lines.append(line)
- self.assertEqual(len(lines), 7)
+ self.assertEqual(len(output_module.events), 17)
+ self.assertEqual(len(output_module.macb_groups), 3)
- self.assertTrue(b'My text goes along: My text dude. lines' in lines[1])
- self.assertTrue(b'LOG/' in lines[1])
- self.assertTrue(b'None in Particular' in lines[1])
+ def testInternalExportEventsDeduplicate(self):
+ """Tests the _ExportEvents function with deduplication."""
+ knowledge_base_object = knowledge_base.KnowledgeBase()
+ output_writer = cli_test_lib.TestOutputWriter()
- # TODO: add test for _FlushExportBuffer.
+ formatter_mediator = formatters_mediator.FormatterMediator()
- # TODO: add test for _MergeEvents.
- # Note that function will be removed in the future.
+ output_mediator_object = output_mediator.OutputMediator(
+ knowledge_base_object, formatter_mediator)
+
+ output_module = TestOutputModule(output_mediator_object)
+ output_module.SetOutputWriter(output_writer)
+ test_engine = psort.PsortMultiProcessEngine()
+
+ formatters_manager.FormattersManager.RegisterFormatter(TestEventFormatter)
+
+ with shared_test_lib.TempDirectory() as temp_directory:
+ temp_file = os.path.join(temp_directory, 'storage.plaso')
+ self._CreateTestStorageFile(temp_file)
+
+ storage_reader = storage_zip_file.ZIPStorageFileReader(temp_file)
+ storage_reader.ReadPreprocessingInformation(knowledge_base_object)
+
+ test_engine._ExportEvents(storage_reader, output_module)
+
+ formatters_manager.FormattersManager.DeregisterFormatter(TestEventFormatter)
+
+ lines = []
+ output = output_writer.ReadOutput()
+ for line in output.split(b'\n'):
+ lines.append(line)
+
+ self.assertEqual(len(output_module.events), 15)
+ self.assertEqual(len(output_module.macb_groups), 3)
+
+ # TODO: add test for _FlushExportBuffer.
# TODO: add test for _StartAnalysisProcesses.
# TODO: add test for _StatusUpdateThreadMain.
# TODO: add test for _StopAnalysisProcesses.
# TODO: add test for _UpdateProcessingStatus.
- @shared_test_lib.skipUnlessHasTestFile([u'psort_test.json.plaso'])
+ @shared_test_lib.skipUnlessHasTestFile(['psort_test.json.plaso'])
def testAnalyzeEvents(self):
"""Tests the AnalyzeEvents function."""
- storage_file_path = self._GetTestFilePath([u'psort_test.json.plaso'])
+ storage_file_path = self._GetTestFilePath(['psort_test.json.plaso'])
session = sessions.Session()
knowledge_base_object = knowledge_base.KnowledgeBase()
formatter_mediator = formatters_mediator.FormatterMediator()
- formatter_mediator.SetPreferredLanguageIdentifier(u'en-US')
+ formatter_mediator.SetPreferredLanguageIdentifier('en-US')
output_mediator_object = output_mediator.OutputMediator(
knowledge_base_object, formatter_mediator)
output_module = null.NullOutputModule(output_mediator_object)
- data_location = u''
+ data_location = ''
analysis_plugin = tagging.TaggingAnalysisPlugin()
# TODO: set tag file.
test_engine = psort.PsortMultiProcessEngine()
with shared_test_lib.TempDirectory() as temp_directory:
- temp_file = os.path.join(temp_directory, u'storage.plaso')
+ temp_file = os.path.join(temp_directory, 'storage.plaso')
shutil.copyfile(storage_file_path, temp_file)
storage_writer = storage_zip_file.ZIPStorageFileWriter(
@@ -320,7 +424,7 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase):
test_filter = filters_test_lib.TestEventFilter()
with shared_test_lib.TempDirectory() as temp_directory:
- temp_file = os.path.join(temp_directory, u'storage.plaso')
+ temp_file = os.path.join(temp_directory, 'storage.plaso')
shutil.copyfile(storage_file_path, temp_file)
storage_writer = storage_zip_file.ZIPStorageFileWriter(
@@ -335,16 +439,16 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase):
# TODO: add bogus data location test.
- @shared_test_lib.skipUnlessHasTestFile([u'psort_test.json.plaso'])
+ @shared_test_lib.skipUnlessHasTestFile(['psort_test.json.plaso'])
def testExportEvents(self):
"""Tests the ExportEvents function."""
- storage_file_path = self._GetTestFilePath([u'psort_test.json.plaso'])
+ storage_file_path = self._GetTestFilePath(['psort_test.json.plaso'])
knowledge_base_object = knowledge_base.KnowledgeBase()
output_writer = cli_test_lib.TestOutputWriter()
formatter_mediator = formatters_mediator.FormatterMediator()
- formatter_mediator.SetPreferredLanguageIdentifier(u'en-US')
+ formatter_mediator.SetPreferredLanguageIdentifier('en-US')
output_mediator_object = output_mediator.OutputMediator(
knowledge_base_object, formatter_mediator)
@@ -358,7 +462,7 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase):
counter = test_engine.ExportEvents(
knowledge_base_object, storage_reader, output_module)
- self.assertEqual(counter[u'Stored Events'], 0)
+ self.assertEqual(counter['Stored Events'], 0)
lines = []
output = output_writer.ReadOutput()
@@ -368,14 +472,14 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase):
self.assertEqual(len(lines), 22)
expected_line = (
- u'2014-11-18T01:15:43+00:00,'
- u'Content Modification Time,'
- u'LOG,'
- u'Log File,'
- u'[---] last message repeated 5 times ---,'
- u'syslog,'
- u'OS:/tmp/test/test_data/syslog,'
- u'repeated')
+ '2014-11-18T01:15:43+00:00,'
+ 'Content Modification Time,'
+ 'LOG,'
+ 'Log File,'
+ '[---] last message repeated 5 times ---,'
+ 'syslog,'
+ 'OS:/tmp/test/test_data/syslog,'
+ 'repeated')
self.assertEqual(lines[14], expected_line)
« plaso/output/dynamic.py ('K') | « tests/cli/psort_tool.py ('k') | tests/output/l2t_csv.py » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b