Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(374)

Unified Diff: tools/psort_test.py

Issue 321000043: [plaso] Refactored log2timeline front-end to tool #160 (Closed)
Patch Set: Updated review script Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/psort_test.py
diff --git a/tools/psort_test.py b/tools/psort_test.py
deleted file mode 100644
index ff32ca46dc9351a684281f3be344de65b5f39eed..0000000000000000000000000000000000000000
--- a/tools/psort_test.py
+++ /dev/null
@@ -1,508 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-"""Tests for the psort CLI tool."""
-
-import argparse
-import os
-import unittest
-
-from plaso.cli.helpers import interface as helpers_interface
-from plaso.cli.helpers import manager as helpers_manager
-from plaso.lib import errors
-from plaso.output import manager as output_manager
-
-from tests import test_lib as shared_test_lib
-from tests.cli import test_lib as cli_test_lib
-from tests.multi_processing import psort as psort_test
-
-from tools import psort
-
-
-class TestInputReader(object):
- """Test input reader."""
-
- def __init__(self):
- """Initialize the reader."""
- super(TestInputReader, self).__init__()
- self.read_called = False
-
- def Read(self):
- """Mock a read operation by user."""
- self.read_called = True
- return u'foobar'
-
-
-class TestOutputModuleArgumentHelper(helpers_interface.ArgumentsHelper):
- """Test argument helper for the test output module."""
-
- NAME = u'test_missing'
-
- @classmethod
- def AddArguments(cls, argument_group):
- """Mock the add argument section."""
- pass
-
- @classmethod
- def ParseOptions(cls, options, output_module):
- """Provide a test parse options section."""
- if not isinstance(output_module, TestOutputModuleMissingParameters):
- raise errors.BadConfigObject((
- u'Output module is not an instance of '
- u'TestOutputModuleMissingParameters'))
-
- missing = getattr(options, u'missing', None)
- if missing:
- output_module.SetMissingValue(u'missing', missing)
-
- parameters = getattr(options, u'parameters', None)
- if parameters:
- output_module.SetMissingValue(u'parameters', parameters)
-
-
-class TestOutputModuleMissingParameters(psort_test.TestOutputModule):
- """Test output module that is missing some parameters."""
-
- NAME = u'test_missing'
-
- # For test purpose assign these as class attributes.
- missing = None
- parameters = None
-
- def GetMissingArguments(self):
- """Return a list of missing parameters."""
- missing_parameters = []
- if self.missing is None:
- missing_parameters.append(u'missing')
-
- if self.parameters is None:
- missing_parameters.append(u'parameters')
-
- return missing_parameters
-
- @classmethod
- def SetMissingValue(cls, attribute, value):
- """Set missing value."""
- setattr(cls, attribute, value)
-
-
-class PsortToolTest(cli_test_lib.CLIToolTestCase):
- """Tests for the psort tool."""
-
- _EXPECTED_ANALYSIS_PLUGIN_OPTIONS = u'\n'.join([
- u'usage: psort_test.py [--nsrlsvr-hash HASH] [--nsrlsvr-host HOST]',
- (u' [--nsrlsvr-port PORT] '
- u'[--tagging-file TAGGING_FILE]'),
- u' [--viper-hash HASH] [--viper-host HOST]',
- u' [--viper-port PORT] [--viper-protocol PROTOCOL]',
- u' [--virustotal-api-key API_KEY]',
- (u' [--virustotal-free-rate-limit] '
- u'[--virustotal-hash HASH]'),
- u' [--windows-services-output {text,yaml}]',
- u'',
- u'Test argument parser.',
- u'',
- u'optional arguments:',
- u' --nsrlsvr-hash HASH, --nsrlsvr_hash HASH',
- (u' Type of hash to use to query nsrlsvr '
- u'instance, the'),
- (u' default is: md5. Supported options: md5, '
- u'sha1'),
- u' --nsrlsvr-host HOST, --nsrlsvr_host HOST',
- (u' Hostname or IP address of the nsrlsvr '
- u'instance to'),
- u' query, the default is: localhost',
- u' --nsrlsvr-port PORT, --nsrlsvr_port PORT',
- (u' Port number of the nsrlsvr instance to '
- u'query, the'),
- u' default is: 9120.',
- u' --tagging-file TAGGING_FILE, --tagging_file TAGGING_FILE',
- u' Specify a file to read tagging criteria from.',
- u' --viper-hash HASH, --viper_hash HASH',
- (u' Type of hash to use to query the Viper '
- u'server, the'),
- (u' default is: sha256. Supported options: md5, '
- u'sha256'),
- u' --viper-host HOST, --viper_host HOST',
- (u' Hostname of the Viper server to query, the '
- u'default is:'),
- u' localhost',
- u' --viper-port PORT, --viper_port PORT',
- (u' Port of the Viper server to query, the '
- u'default is:'),
- u' 8080.',
- u' --viper-protocol PROTOCOL, --viper_protocol PROTOCOL',
- (u' Protocol to use to query Viper, the '
- u'default is: http.'),
- u' Supported options: http, https',
- u' --virustotal-api-key API_KEY, --virustotal_api_key API_KEY',
- u' Specify the API key for use with VirusTotal.',
- u' --virustotal-free-rate-limit, --virustotal_free_rate_limit',
- (u' Limit Virustotal requests to the default '
- u'free API key'),
- (u' rate of 4 requests per minute. Set this to '
- u'false if'),
- u' you have an key for the private API.',
- u' --virustotal-hash HASH, --virustotal_hash HASH',
- (u' Type of hash to query VirusTotal, the '
- u'default is:'),
- u' sha256',
- u' --windows-services-output {text,yaml}',
- (u' Specify how the results should be displayed. '
- u'Options'),
- u' are text and yaml.',
- u''])
-
- _EXPECTED_FILTER_OPTIONS = u'\n'.join([
- (u'usage: psort_test.py [--slice DATE] [--slice_size SLICE_SIZE] '
- u'[--slicer]'),
- u' [FILTER]',
- u'',
- u'Test argument parser.',
- u'',
- u'positional arguments:',
- (u' FILTER A filter that can be used to filter the '
- u'dataset before'),
- (u' it is written into storage. More information '
- u'about the'),
- (u' filters and how to use them can be found '
- u'here:'),
- (u' '
- u'https://github.com/log2timeline/plaso/wiki/Filters'),
- u'',
- u'optional arguments:',
- (u' --slice DATE Create a time slice around a certain date. '
- u'This'),
- (u' parameter, if defined will display all '
- u'events that'),
- (u' happened X minutes before and after the '
- u'defined date.'),
- (u' X is controlled by the parameter '
- u'--slice_size but'),
- u' defaults to 5 minutes.',
- u' --slice_size SLICE_SIZE, --slice-size SLICE_SIZE',
- (u' Defines the slice size. In the case of a '
- u'regular time'),
- (u' slice it defines the number of minutes the '
- u'slice size'),
- (u' should be. In the case of the --slicer it '
- u'determines'),
- (u' the number of events before and after a '
- u'filter match'),
- (u' has been made that will be included in the '
- u'result set.'),
- (u' The default value is 5]. See --slice or '
- u'--slicer for'),
- u' more details about this option.',
- (u' --slicer Create a time slice around every filter '
- u'match. This'),
- (u' parameter, if defined will save all X '
- u'events before'),
- (u' and after a filter match has been made. X '
- u'is defined'),
- u' by the --slice_size parameter.',
- u''])
-
- _EXPECTED_LANGUAGE_OPTIONS = u'\n'.join([
- u'usage: psort_test.py [--language LANGUAGE]',
- u'',
- u'Test argument parser.',
- u'',
- u'optional arguments:',
- (u' --language LANGUAGE The preferred language identifier for Windows '
- u'Event Log'),
- (u' message strings. Use "--language list" to see '
- u'a list of'),
- (u' available language identifiers. Note that '
- u'formatting'),
- (u' will fall back on en-US (LCID 0x0409) if the '
- u'preferred'),
- (u' language is not available in the database of '
- u'message'),
- u' string templates.',
- u''])
-
- _EXPECTED_OUTPUT_MODULE_OPTIONS = u'\n'.join([
- (u'usage: psort_test.py [--fields FIELDS] '
- u'[--additional_fields ADDITIONAL_FIELDS]'),
- u'',
- u'Test argument parser.',
- u'',
- u'optional arguments:',
- u' --additional_fields ADDITIONAL_FIELDS',
- (u' Defines extra fields to be included in the '
- u'output, in'),
- (u' addition to the default fields, which are '
- u'datetime,tim'),
- (u' estamp_desc,source,source_long,message,parser,'
- u'display_'),
- u' name,tag.',
- (u' --fields FIELDS Defines which fields should be included in '
- u'the output.'),
- u''])
-
- _EXPECTED_PROCESSING_OPTIONS = u'\n'.join([
- (u'usage: psort_test.py [--disable_zeromq] '
- u'[--temporary_directory DIRECTORY]'),
- u' [--worker-memory-limit SIZE]',
- u'',
- u'Test argument parser.',
- u'',
- u'optional arguments:',
- u' --disable_zeromq, --disable-zeromq',
- (u' Disable queueing using ZeroMQ. A '
- u'Multiprocessing queue'),
- u' will be used instead.',
- u' --temporary_directory DIRECTORY, --temporary-directory DIRECTORY',
- (u' Path to the directory that should be used to '
- u'store'),
- u' temporary files created during analysis.',
- u' --worker-memory-limit SIZE, --worker_memory_limit SIZE',
- (u' Maximum amount of memory a worker process is '
- u'allowed'),
- u' to consume. [defaults to 2 GiB]',
- u''])
-
- # TODO: add test for _FormatStatusTableRow.
- # TODO: add test for _ParseAnalysisPluginOptions.
- # TODO: add test for _ParseProcessingOptions.
- # TODO: add test for _ParseFilterOptions.
- # TODO: add test for _ParseInformationalOptions.
- # TODO: add test for _ParseLanguageOptions.
- # TODO: add test for _PrintStatusHeader.
- # TODO: add test for _PrintStatusUpdate.
- # TODO: add test for _PrintStatusUpdateStream.
- # TODO: add test for _PromptUserForInput.
-
- def testAddAnalysisPluginOptions(self):
- """Tests the AddAnalysisPluginOptions function."""
- argument_parser = argparse.ArgumentParser(
- prog=u'psort_test.py',
- description=u'Test argument parser.', add_help=False,
- formatter_class=cli_test_lib.SortedArgumentsHelpFormatter)
-
- test_tool = psort.PsortTool()
- test_tool.AddAnalysisPluginOptions(argument_parser, [])
-
- output = self._RunArgparseFormatHelp(argument_parser)
- self.assertEqual(output, self._EXPECTED_ANALYSIS_PLUGIN_OPTIONS)
-
- def testAddFilterOptions(self):
- """Tests the AddFilterOptions function."""
- argument_parser = argparse.ArgumentParser(
- prog=u'psort_test.py',
- description=u'Test argument parser.', add_help=False,
- formatter_class=cli_test_lib.SortedArgumentsHelpFormatter)
-
- test_tool = psort.PsortTool()
- test_tool.AddFilterOptions(argument_parser)
-
- output = self._RunArgparseFormatHelp(argument_parser)
- self.assertEqual(output, self._EXPECTED_FILTER_OPTIONS)
-
- def testAddLanguageOptions(self):
- """Tests the AddLanguageOptions function."""
- argument_parser = argparse.ArgumentParser(
- prog=u'psort_test.py',
- description=u'Test argument parser.', add_help=False,
- formatter_class=cli_test_lib.SortedArgumentsHelpFormatter)
-
- test_tool = psort.PsortTool()
- test_tool.AddLanguageOptions(argument_parser)
-
- output = self._RunArgparseFormatHelp(argument_parser)
- self.assertEqual(output, self._EXPECTED_LANGUAGE_OPTIONS)
-
- def testAddOutputModuleOptions(self):
- """Tests the AddOutputModuleOptions function."""
- argument_parser = argparse.ArgumentParser(
- prog=u'psort_test.py',
- description=u'Test argument parser.', add_help=False,
- formatter_class=cli_test_lib.SortedArgumentsHelpFormatter)
-
- test_tool = psort.PsortTool()
- test_tool.AddOutputModuleOptions(argument_parser, [u'dynamic'])
-
- output = self._RunArgparseFormatHelp(argument_parser)
- self.assertEqual(output, self._EXPECTED_OUTPUT_MODULE_OPTIONS)
-
- def testAddProcessingOptions(self):
- """Tests the AddProcessingOptions function."""
- argument_parser = argparse.ArgumentParser(
- prog=u'psort_test.py',
- description=u'Test argument parser.', add_help=False,
- formatter_class=cli_test_lib.SortedArgumentsHelpFormatter)
-
- test_tool = psort.PsortTool()
- test_tool.AddProcessingOptions(argument_parser)
-
- output = self._RunArgparseFormatHelp(argument_parser)
- self.assertEqual(output, self._EXPECTED_PROCESSING_OPTIONS)
-
- def testListAnalysisPlugins(self):
- """Tests the ListAnalysisPlugins function."""
- output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8')
- test_tool = psort.PsortTool(output_writer=output_writer)
-
- test_tool.ListAnalysisPlugins()
-
- output = output_writer.ReadOutput()
-
- number_of_tables = 0
- lines = []
- for line in output.split(b'\n'):
- line = line.strip()
- lines.append(line)
-
- if line.startswith(b'*****') and line.endswith(b'*****'):
- number_of_tables += 1
-
- self.assertIn(u'Analysis Plugins', lines[1])
-
- lines = frozenset(lines)
-
- self.assertEqual(number_of_tables, 1)
-
- expected_line = (
- b'browser_search : Analyze browser search entries from events.')
- self.assertIn(expected_line, lines)
-
- def testListLanguageIdentifiers(self):
- """Tests the ListLanguageIdentifiers function."""
- output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8')
- test_tool = psort.PsortTool(output_writer=output_writer)
-
- test_tool.ListLanguageIdentifiers()
-
- output = output_writer.ReadOutput()
-
- number_of_tables = 0
- lines = []
- for line in output.split(b'\n'):
- line = line.strip()
- lines.append(line)
-
- if line.startswith(b'*****') and line.endswith(b'*****'):
- number_of_tables += 1
-
- self.assertIn(u'Language identifiers', lines[1])
-
- lines = frozenset(lines)
-
- self.assertEqual(number_of_tables, 1)
-
- expected_line = b'en : English'
- self.assertIn(expected_line, lines)
-
- def testListOutputModules(self):
- """Tests the ListOutputModules function."""
- output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8')
- test_tool = psort.PsortTool(output_writer=output_writer)
-
- test_tool.ListOutputModules()
-
- output = output_writer.ReadOutput()
-
- number_of_tables = 0
- lines = []
- for line in output.split(b'\n'):
- line = line.strip()
- lines.append(line)
-
- if line.startswith(b'*****') and line.endswith(b'*****'):
- number_of_tables += 1
-
- self.assertIn(u'Output Modules', lines[1])
-
- # pylint: disable=protected-access
- lines = frozenset(lines)
- disabled_outputs = list(test_tool._front_end.GetDisabledOutputClasses())
- enabled_outputs = list(test_tool._front_end.GetOutputClasses())
-
- expected_number_of_tables = 0
- if disabled_outputs:
- expected_number_of_tables += 1
- if enabled_outputs:
- expected_number_of_tables += 1
-
- self.assertEqual(number_of_tables, expected_number_of_tables)
- expected_line = b'rawpy : "raw" (or native) Python output.'
- self.assertIn(expected_line, lines)
-
- def testParseArguments(self):
- """Tests the ParseArguments function."""
- output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8')
- test_tool = psort.PsortTool(output_writer=output_writer)
-
- result = test_tool.ParseArguments()
- self.assertFalse(result)
-
- # TODO: check output.
- # TODO: improve test coverage.
-
- def testParseOptions(self):
- """Tests the ParseOptions function."""
- output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8')
- test_tool = psort.PsortTool(output_writer=output_writer)
-
- options = cli_test_lib.TestOptions()
- options.output_format = u'null'
- options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso'])
-
- test_tool.ParseOptions(options)
-
- options = cli_test_lib.TestOptions()
-
- with self.assertRaises(errors.BadConfigOption):
- test_tool.ParseOptions(options)
-
- options = cli_test_lib.TestOptions()
- options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso'])
-
- with self.assertRaises(errors.BadConfigOption):
- test_tool.ParseOptions(options)
-
- # TODO: improve test coverage.
-
- def testProcessStorageWithMissingParameters(self):
- """Test the ProcessStorage function with half-configured output module."""
- input_reader = TestInputReader()
- output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8')
- test_tool = psort.PsortTool(
- input_reader=input_reader, output_writer=output_writer)
-
- options = cli_test_lib.TestOptions()
- options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso'])
- options.output_format = u'test_missing'
-
- output_manager.OutputManager.RegisterOutput(
- TestOutputModuleMissingParameters)
- helpers_manager.ArgumentHelperManager.RegisterHelper(
- TestOutputModuleArgumentHelper)
-
- lines = []
- with shared_test_lib.TempDirectory() as temp_directory:
- temp_file_name = os.path.join(temp_directory, u'output.txt')
- options.write = temp_file_name
-
- test_tool.ParseOptions(options)
- test_tool.ProcessStorage()
-
- with open(temp_file_name, 'rb') as file_object:
- for line in file_object.readlines():
- lines.append(line.strip())
-
- self.assertTrue(input_reader.read_called)
- self.assertEqual(TestOutputModuleMissingParameters.missing, u'foobar')
- self.assertEqual(TestOutputModuleMissingParameters.parameters, u'foobar')
-
- expected_line = u'FILE/OS ctime OS:/tmp/test/test_data/syslog Type: file'
- self.assertIn(expected_line, lines)
-
- output_manager.OutputManager.DeregisterOutput(
- TestOutputModuleMissingParameters)
- helpers_manager.ArgumentHelperManager.DeregisterHelper(
- TestOutputModuleArgumentHelper)
-
-
-if __name__ == '__main__':
- unittest.main()
« plaso/frontend/extraction_frontend.py ('K') | « tools/psort.py ('k') | tools/psteal.py » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b