Index: tests/multi_processing/psort.py |
diff --git a/tests/multi_processing/psort.py b/tests/multi_processing/psort.py |
index cdd25cc0191e7c91cf496edca7d994241456ab7a..187ac9940ece7ba8a8ba49f681f3e7c892ec46df 100644 |
--- a/tests/multi_processing/psort.py |
+++ b/tests/multi_processing/psort.py |
@@ -2,6 +2,8 @@ |
# -*- coding: utf-8 -*- |
"""Tests for the psort multi-processing engine.""" |
+from __future__ import unicode_literals |
+ |
import os |
import shutil |
import unittest |
@@ -14,6 +16,7 @@ from plaso.engine import knowledge_base |
from plaso.formatters import interface as formatters_interface |
from plaso.formatters import manager as formatters_manager |
from plaso.formatters import mediator as formatters_mediator |
+from plaso.lib import definitions |
from plaso.multi_processing import psort |
from plaso.output import dynamic |
from plaso.output import interface as output_interface |
@@ -27,7 +30,7 @@ from tests.filters import test_lib as filters_test_lib |
class TestAnalysisPlugin(analysis_interface.AnalysisPlugin): |
- """Class that defines an analysis plugin for testing.""" |
+ """Analysis plugin for testing.""" |
def CompileReport(self, mediator): |
"""Compiles a report of the analysis. |
@@ -57,43 +60,66 @@ class TestAnalysisPlugin(analysis_interface.AnalysisPlugin): |
class TestEvent(events.EventObject): |
- """Class that defines an event for testing.""" |
+ """Event for testing.""" |
+ |
+ DATA_TYPE = 'test:event:psort' |
- DATA_TYPE = u'test:event:psort' |
+ def __init__( |
+ self, timestamp, |
+ text='My text dude.', |
+ timestamp_description=definitions.TIME_DESCRIPTION_LAST_PRINTED): |
+ """Initializes an event. |
- def __init__(self, timestamp): |
- """Initializes an event.""" |
+ Args: |
+ timestamp (int): timestamp. |
+ text (Optional[str]): text. |
+ timestamp_description (Optional[str]): timestamp description. |
+ """ |
super(TestEvent, self).__init__() |
+ self.display_name = '/dev/none' |
+ self.filename = '/dev/none' |
+ self.parser = 'TestEvent' |
+ self.text = text |
+ self.timestamp_desc = timestamp_description |
self.timestamp = timestamp |
- self.timestamp_desc = u'Last Written' |
- |
- self.parser = u'TestEvent' |
- |
- self.display_name = u'/dev/none' |
- self.filename = u'/dev/none' |
- self.some = u'My text dude.' |
- self.var = {u'Issue': False, u'Closed': True} |
+ self.var = {'Issue': False, 'Closed': True} |
class TestEventFormatter(formatters_interface.EventFormatter): |
- """Class that defines an event formatter for testing.""" |
+ """Event formatter for testing.""" |
- DATA_TYPE = u'test:event:psort' |
+ DATA_TYPE = 'test:event:psort' |
- FORMAT_STRING = u'My text goes along: {some} lines' |
+ FORMAT_STRING = 'My text goes along: {text} lines' |
- SOURCE_SHORT = u'LOG' |
- SOURCE_LONG = u'None in Particular' |
+ SOURCE_SHORT = 'LOG' |
+ SOURCE_LONG = 'None in Particular' |
class TestOutputModule(output_interface.LinearOutputModule): |
- """Class that defines an output module for testing.""" |
+ """Output module for testing. |
+ |
+ Attributes: |
+ events (list[EventObject]): event written to the output. |
+ macb_groups (list[list[EventObject]]): MACB groups of events written to |
+ the output. |
+ """ |
- NAME = u'psort_test' |
+ NAME = 'psort_test' |
- _HEADER = ( |
- u'date,time,timezone,MACB,source,sourcetype,type,user,host,' |
- u'short,desc,version,filename,inode,notes,format,extra\n') |
+ def __init__(self, output_mediator_object): |
+ """Initializes an output module. |
+ |
+ Args: |
+ output_mediator_object (OutputMediator): mediates interactions between |
+ output modules and other components, such as storage and dfvfs. |
+ |
+ Raises: |
+ ValueError: when there are unused keyword arguments. |
+ """ |
+ super(TestOutputModule, self).__init__(output_mediator_object) |
+ self.events = [] |
+ self.macb_groups = [] |
def WriteEventBody(self, event): |
"""Writes the body of an event object to the output. |
@@ -101,29 +127,46 @@ class TestOutputModule(output_interface.LinearOutputModule): |
Args: |
event (EventObject): event. |
""" |
- message, _ = self._output_mediator.GetFormattedMessages(event) |
- source_short, source_long = self._output_mediator.GetFormattedSources(event) |
- self._WriteLine(u'{0:s}/{1:s} {2:s}\n'.format( |
- source_short, source_long, message)) |
+ self.events.append(event) |
def WriteHeader(self): |
"""Writes the header to the output.""" |
- self._WriteLine(self._HEADER) |
+ pass |
+ |
+ def WriteEventMACBGroup(self, event_macb_group): |
+ """Writes an event MACB group to the output. |
+ An event MACB group is a group of events that have the same timestamp and |
+ event data (attributes and values), where the timestamp description (or |
+ usage) is one or more of MACB (modification, access, change, birth). |
-class EventsHeapTest(shared_test_lib.BaseTestCase): |
+ This function is called if the psort engine detected an event MACB group |
+ so that the output module, if supported, can represent the group as |
+ such. If not overridden this function will output every event individually. |
+ |
+ Args: |
+ event_macb_group (list[EventObject]): group of events with identical |
+ timestamps, attributes and values. |
+ """ |
+ self.events.extend(event_macb_group) |
+ self.macb_groups.append(event_macb_group) |
+ |
+ |
+class PsortEvensHeapTest(shared_test_lib.BaseTestCase): |
"""Tests for the psort events heap.""" |
# pylint: disable=protected-access |
+ # TODO: add tests for _GetEventIdentifiers |
+ |
def testNumberOfEvents(self): |
"""Tests the number_of_events property.""" |
- event_heap = psort._EventsHeap() |
+ event_heap = psort.PsortEventHeap() |
self.assertEqual(event_heap.number_of_events, 0) |
def testPopEvent(self): |
"""Tests the PopEvent function.""" |
- event_heap = psort._EventsHeap() |
+ event_heap = psort.PsortEventHeap() |
test_event = event_heap.PopEvent() |
self.assertIsNone(test_event) |
@@ -136,7 +179,7 @@ class EventsHeapTest(shared_test_lib.BaseTestCase): |
def testPopEvents(self): |
"""Tests the PopEvents function.""" |
- event_heap = psort._EventsHeap() |
+ event_heap = psort.PsortEventHeap() |
test_events = list(event_heap.PopEvents()) |
self.assertEqual(len(test_events), 0) |
@@ -149,7 +192,7 @@ class EventsHeapTest(shared_test_lib.BaseTestCase): |
def testPushEvent(self): |
"""Tests the PushEvent function.""" |
- event_heap = psort._EventsHeap() |
+ event_heap = psort.PsortEventHeap() |
event = TestEvent(5134324321) |
event_heap.PushEvent(event) |
@@ -162,6 +205,40 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase): |
# pylint: disable=protected-access |
+ _TEST_EVENTS = [ |
+ (5134324321, { |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_WRITTEN}), |
+ (5134324321, { |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_LAST_ACCESS}), |
+ (2134324321, {}), |
+ (9134324321, {}), |
+ (5134324321, { |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_CHANGE}), |
+ (5134324321, { |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_CREATION}), |
+ (15134324321, {}), |
+ (5134324322, { |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_CREATION}), |
+ (5134324322, { |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_LAST_ACCESS}), |
+ (5134324322, { |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_CHANGE}), |
+ (5134324322, { |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_WRITTEN}), |
+ (5134324322, { |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_WRITTEN}), |
+ (5134324322, { |
+ 'text': 'Another text', |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_LAST_ACCESS}), |
+ (5134324322, { |
+ 'text': 'Another text', |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_CHANGE}), |
+ (5134324322, { |
+ 'text': 'Another text', |
+ 'timestamp_description': definitions.TIME_DESCRIPTION_WRITTEN}), |
+ (5134024321, {}), |
+ (5134024321, {})] |
+ |
def _CreateTestStorageFile(self, path): |
"""Creates a storage file for testing. |
@@ -173,12 +250,9 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase): |
# TODO: add preprocessing information. |
- storage_file.AddEvent(TestEvent(5134324321)) |
- storage_file.AddEvent(TestEvent(2134324321)) |
- storage_file.AddEvent(TestEvent(9134324321)) |
- storage_file.AddEvent(TestEvent(15134324321)) |
- storage_file.AddEvent(TestEvent(5134324322)) |
- storage_file.AddEvent(TestEvent(5134024321)) |
+ for timestamp, kwargs in self._TEST_EVENTS: |
+ event = TestEvent(timestamp, **kwargs) |
+ storage_file.AddEvent(event) |
storage_file.Close() |
@@ -192,7 +266,7 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase): |
test_plugin = TestAnalysisPlugin() |
with shared_test_lib.TempDirectory() as temp_directory: |
- temp_file = os.path.join(temp_directory, u'storage.plaso') |
+ temp_file = os.path.join(temp_directory, 'storage.plaso') |
self._CreateTestStorageFile(temp_file) |
storage_writer = storage_zip_file.ZIPStorageFileWriter( |
@@ -210,7 +284,7 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase): |
test_filter = filters_test_lib.TestEventFilter() |
with shared_test_lib.TempDirectory() as temp_directory: |
- temp_file = os.path.join(temp_directory, u'storage.plaso') |
+ temp_file = os.path.join(temp_directory, 'storage.plaso') |
self._CreateTestStorageFile(temp_file) |
storage_writer = storage_zip_file.ZIPStorageFileWriter( |
@@ -250,13 +324,14 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase): |
formatters_manager.FormattersManager.RegisterFormatter(TestEventFormatter) |
with shared_test_lib.TempDirectory() as temp_directory: |
- temp_file = os.path.join(temp_directory, u'storage.plaso') |
+ temp_file = os.path.join(temp_directory, 'storage.plaso') |
self._CreateTestStorageFile(temp_file) |
storage_reader = storage_zip_file.ZIPStorageFileReader(temp_file) |
storage_reader.ReadPreprocessingInformation(knowledge_base_object) |
- test_engine._ExportEvents(storage_reader, output_module) |
+ test_engine._ExportEvents( |
+ storage_reader, output_module, deduplicate_events=False) |
formatters_manager.FormattersManager.DeregisterFormatter(TestEventFormatter) |
@@ -265,46 +340,75 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase): |
for line in output.split(b'\n'): |
lines.append(line) |
- self.assertEqual(len(lines), 7) |
+ self.assertEqual(len(output_module.events), 17) |
+ self.assertEqual(len(output_module.macb_groups), 3) |
- self.assertTrue(b'My text goes along: My text dude. lines' in lines[1]) |
- self.assertTrue(b'LOG/' in lines[1]) |
- self.assertTrue(b'None in Particular' in lines[1]) |
+ def testInternalExportEventsDeduplicate(self): |
+ """Tests the _ExportEvents function with deduplication.""" |
+ knowledge_base_object = knowledge_base.KnowledgeBase() |
+ output_writer = cli_test_lib.TestOutputWriter() |
- # TODO: add test for _FlushExportBuffer. |
+ formatter_mediator = formatters_mediator.FormatterMediator() |
- # TODO: add test for _MergeEvents. |
- # Note that function will be removed in the future. |
+ output_mediator_object = output_mediator.OutputMediator( |
+ knowledge_base_object, formatter_mediator) |
+ |
+ output_module = TestOutputModule(output_mediator_object) |
+ output_module.SetOutputWriter(output_writer) |
+ test_engine = psort.PsortMultiProcessEngine() |
+ |
+ formatters_manager.FormattersManager.RegisterFormatter(TestEventFormatter) |
+ |
+ with shared_test_lib.TempDirectory() as temp_directory: |
+ temp_file = os.path.join(temp_directory, 'storage.plaso') |
+ self._CreateTestStorageFile(temp_file) |
+ |
+ storage_reader = storage_zip_file.ZIPStorageFileReader(temp_file) |
+ storage_reader.ReadPreprocessingInformation(knowledge_base_object) |
+ |
+ test_engine._ExportEvents(storage_reader, output_module) |
+ |
+ formatters_manager.FormattersManager.DeregisterFormatter(TestEventFormatter) |
+ |
+ lines = [] |
+ output = output_writer.ReadOutput() |
+ for line in output.split(b'\n'): |
+ lines.append(line) |
+ |
+ self.assertEqual(len(output_module.events), 15) |
+ self.assertEqual(len(output_module.macb_groups), 3) |
+ |
+ # TODO: add test for _FlushExportBuffer. |
# TODO: add test for _StartAnalysisProcesses. |
# TODO: add test for _StatusUpdateThreadMain. |
# TODO: add test for _StopAnalysisProcesses. |
# TODO: add test for _UpdateProcessingStatus. |
- @shared_test_lib.skipUnlessHasTestFile([u'psort_test.json.plaso']) |
+ @shared_test_lib.skipUnlessHasTestFile(['psort_test.json.plaso']) |
def testAnalyzeEvents(self): |
"""Tests the AnalyzeEvents function.""" |
- storage_file_path = self._GetTestFilePath([u'psort_test.json.plaso']) |
+ storage_file_path = self._GetTestFilePath(['psort_test.json.plaso']) |
session = sessions.Session() |
knowledge_base_object = knowledge_base.KnowledgeBase() |
formatter_mediator = formatters_mediator.FormatterMediator() |
- formatter_mediator.SetPreferredLanguageIdentifier(u'en-US') |
+ formatter_mediator.SetPreferredLanguageIdentifier('en-US') |
output_mediator_object = output_mediator.OutputMediator( |
knowledge_base_object, formatter_mediator) |
output_module = null.NullOutputModule(output_mediator_object) |
- data_location = u'' |
+ data_location = '' |
analysis_plugin = tagging.TaggingAnalysisPlugin() |
# TODO: set tag file. |
test_engine = psort.PsortMultiProcessEngine() |
with shared_test_lib.TempDirectory() as temp_directory: |
- temp_file = os.path.join(temp_directory, u'storage.plaso') |
+ temp_file = os.path.join(temp_directory, 'storage.plaso') |
shutil.copyfile(storage_file_path, temp_file) |
storage_writer = storage_zip_file.ZIPStorageFileWriter( |
@@ -320,7 +424,7 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase): |
test_filter = filters_test_lib.TestEventFilter() |
with shared_test_lib.TempDirectory() as temp_directory: |
- temp_file = os.path.join(temp_directory, u'storage.plaso') |
+ temp_file = os.path.join(temp_directory, 'storage.plaso') |
shutil.copyfile(storage_file_path, temp_file) |
storage_writer = storage_zip_file.ZIPStorageFileWriter( |
@@ -335,16 +439,16 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase): |
# TODO: add bogus data location test. |
- @shared_test_lib.skipUnlessHasTestFile([u'psort_test.json.plaso']) |
+ @shared_test_lib.skipUnlessHasTestFile(['psort_test.json.plaso']) |
def testExportEvents(self): |
"""Tests the ExportEvents function.""" |
- storage_file_path = self._GetTestFilePath([u'psort_test.json.plaso']) |
+ storage_file_path = self._GetTestFilePath(['psort_test.json.plaso']) |
knowledge_base_object = knowledge_base.KnowledgeBase() |
output_writer = cli_test_lib.TestOutputWriter() |
formatter_mediator = formatters_mediator.FormatterMediator() |
- formatter_mediator.SetPreferredLanguageIdentifier(u'en-US') |
+ formatter_mediator.SetPreferredLanguageIdentifier('en-US') |
output_mediator_object = output_mediator.OutputMediator( |
knowledge_base_object, formatter_mediator) |
@@ -358,7 +462,7 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase): |
counter = test_engine.ExportEvents( |
knowledge_base_object, storage_reader, output_module) |
- self.assertEqual(counter[u'Stored Events'], 0) |
+ self.assertEqual(counter['Stored Events'], 0) |
lines = [] |
output = output_writer.ReadOutput() |
@@ -368,14 +472,14 @@ class PsortMultiProcessEngineTest(shared_test_lib.BaseTestCase): |
self.assertEqual(len(lines), 22) |
expected_line = ( |
- u'2014-11-18T01:15:43+00:00,' |
- u'Content Modification Time,' |
- u'LOG,' |
- u'Log File,' |
- u'[---] last message repeated 5 times ---,' |
- u'syslog,' |
- u'OS:/tmp/test/test_data/syslog,' |
- u'repeated') |
+ '2014-11-18T01:15:43+00:00,' |
+ 'Content Modification Time,' |
+ 'LOG,' |
+ 'Log File,' |
+ '[---] last message repeated 5 times ---,' |
+ 'syslog,' |
+ 'OS:/tmp/test/test_data/syslog,' |
+ 'repeated') |
self.assertEqual(lines[14], expected_line) |