LEFT | RIGHT |
(no file at all) | |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
3 """Tests for the psort CLI tool.""" | 3 """Tests for the psort CLI tool.""" |
4 | 4 |
5 import argparse | 5 import argparse |
6 import os | 6 import os |
7 import unittest | 7 import unittest |
8 | 8 |
9 from plaso.cli import psort_tool | 9 from plaso.cli import psort_tool |
10 from plaso.cli.helpers import interface as helpers_interface | 10 from plaso.cli.helpers import interface as helpers_interface |
11 from plaso.cli.helpers import manager as helpers_manager | 11 from plaso.cli.helpers import manager as helpers_manager |
12 from plaso.lib import errors | 12 from plaso.lib import errors |
| 13 from plaso.output import interface as output_interface |
13 from plaso.output import manager as output_manager | 14 from plaso.output import manager as output_manager |
14 | 15 |
15 from tests import test_lib as shared_test_lib | 16 from tests import test_lib as shared_test_lib |
16 from tests.cli import test_lib | 17 from tests.cli import test_lib |
17 from tests.multi_processing import psort as psort_test | |
18 | 18 |
19 | 19 |
20 class TestInputReader(object): | 20 class TestInputReader(object): |
21 """Test input reader.""" | 21 """Test input reader.""" |
22 | 22 |
23 def __init__(self): | 23 def __init__(self): |
24 """Initialize the reader.""" | 24 """Initialize the reader.""" |
25 super(TestInputReader, self).__init__() | 25 super(TestInputReader, self).__init__() |
26 self.read_called = False | 26 self.read_called = False |
27 | 27 |
(...skipping 23 matching lines...) Expand all Loading... |
51 | 51 |
52 missing = getattr(options, u'missing', None) | 52 missing = getattr(options, u'missing', None) |
53 if missing: | 53 if missing: |
54 output_module.SetMissingValue(u'missing', missing) | 54 output_module.SetMissingValue(u'missing', missing) |
55 | 55 |
56 parameters = getattr(options, u'parameters', None) | 56 parameters = getattr(options, u'parameters', None) |
57 if parameters: | 57 if parameters: |
58 output_module.SetMissingValue(u'parameters', parameters) | 58 output_module.SetMissingValue(u'parameters', parameters) |
59 | 59 |
60 | 60 |
61 class TestOutputModuleMissingParameters(psort_test.TestOutputModule): | 61 class TestOutputModuleMissingParameters(output_interface.LinearOutputModule): |
62 """Test output module that is missing some parameters.""" | 62 """Test output module that is missing some parameters.""" |
63 | 63 |
64 NAME = u'test_missing' | 64 NAME = u'test_missing' |
| 65 |
| 66 _HEADER = ( |
| 67 u'date,time,timezone,MACB,source,sourcetype,type,user,host,' |
| 68 u'short,desc,version,filename,inode,notes,format,extra\n') |
65 | 69 |
66 # For test purpose assign these as class attributes. | 70 # For test purpose assign these as class attributes. |
67 missing = None | 71 missing = None |
68 parameters = None | 72 parameters = None |
69 | 73 |
70 def GetMissingArguments(self): | 74 def GetMissingArguments(self): |
71 """Return a list of missing parameters.""" | 75 """Return a list of missing parameters.""" |
72 missing_parameters = [] | 76 missing_parameters = [] |
73 if self.missing is None: | 77 if self.missing is None: |
74 missing_parameters.append(u'missing') | 78 missing_parameters.append(u'missing') |
75 | 79 |
76 if self.parameters is None: | 80 if self.parameters is None: |
77 missing_parameters.append(u'parameters') | 81 missing_parameters.append(u'parameters') |
78 | 82 |
79 return missing_parameters | 83 return missing_parameters |
80 | 84 |
81 @classmethod | 85 @classmethod |
82 def SetMissingValue(cls, attribute, value): | 86 def SetMissingValue(cls, attribute, value): |
83 """Set missing value.""" | 87 """Set missing value.""" |
84 setattr(cls, attribute, value) | 88 setattr(cls, attribute, value) |
| 89 |
| 90 def WriteEventBody(self, event): |
| 91 """Writes the body of an event object to the output. |
| 92 |
| 93 Args: |
| 94 event (EventObject): event. |
| 95 """ |
| 96 message, _ = self._output_mediator.GetFormattedMessages(event) |
| 97 source_short, source_long = self._output_mediator.GetFormattedSources(event) |
| 98 self._WriteLine(u'{0:s}/{1:s} {2:s}\n'.format( |
| 99 source_short, source_long, message)) |
| 100 |
| 101 def WriteHeader(self): |
| 102 """Writes the header to the output.""" |
| 103 self._WriteLine(self._HEADER) |
85 | 104 |
86 | 105 |
87 class PsortToolTest(test_lib.CLIToolTestCase): | 106 class PsortToolTest(test_lib.CLIToolTestCase): |
88 """Tests for the psort tool.""" | 107 """Tests for the psort tool.""" |
89 | 108 |
90 _EXPECTED_ANALYSIS_PLUGIN_OPTIONS = u'\n'.join([ | 109 _EXPECTED_ANALYSIS_PLUGIN_OPTIONS = u'\n'.join([ |
91 u'usage: psort_test.py [--nsrlsvr-hash HASH] [--nsrlsvr-host HOST]', | 110 u'usage: psort_test.py [--nsrlsvr-hash HASH] [--nsrlsvr-host HOST]', |
92 u' [--nsrlsvr-label LABEL] [--nsrlsvr-port PORT]', | 111 u' [--nsrlsvr-label LABEL] [--nsrlsvr-port PORT]', |
93 u' [--tagging-file TAGGING_FILE] [--viper-hash HASH]', | 112 u' [--tagging-file TAGGING_FILE] [--viper-hash HASH]', |
94 u' [--viper-host HOST] [--viper-port PORT]', | 113 u' [--viper-host HOST] [--viper-port PORT]', |
(...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
463 | 482 |
464 options = test_lib.TestOptions() | 483 options = test_lib.TestOptions() |
465 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) | 484 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) |
466 | 485 |
467 with self.assertRaises(errors.BadConfigOption): | 486 with self.assertRaises(errors.BadConfigOption): |
468 test_tool.ParseOptions(options) | 487 test_tool.ParseOptions(options) |
469 | 488 |
470 # TODO: improve test coverage. | 489 # TODO: improve test coverage. |
471 | 490 |
472 def testProcessStorageWithMissingParameters(self): | 491 def testProcessStorageWithMissingParameters(self): |
473 """Test the ProcessStorage function with half-configured output module.""" | 492 """Tests the ProcessStorage function with parameters missing.""" |
474 input_reader = TestInputReader() | 493 input_reader = TestInputReader() |
475 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 494 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
476 test_tool = psort_tool.PsortTool( | 495 test_tool = psort_tool.PsortTool( |
477 input_reader=input_reader, output_writer=output_writer) | 496 input_reader=input_reader, output_writer=output_writer) |
478 | 497 |
479 options = test_lib.TestOptions() | 498 options = test_lib.TestOptions() |
480 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) | 499 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) |
481 options.output_format = u'test_missing' | 500 options.output_format = u'test_missing' |
482 | 501 |
483 output_manager.OutputManager.RegisterOutput( | 502 output_manager.OutputManager.RegisterOutput( |
(...skipping 10 matching lines...) Expand all Loading... |
494 test_tool.ProcessStorage() | 513 test_tool.ProcessStorage() |
495 | 514 |
496 with open(temp_file_name, 'rb') as file_object: | 515 with open(temp_file_name, 'rb') as file_object: |
497 for line in file_object.readlines(): | 516 for line in file_object.readlines(): |
498 lines.append(line.strip()) | 517 lines.append(line.strip()) |
499 | 518 |
500 self.assertTrue(input_reader.read_called) | 519 self.assertTrue(input_reader.read_called) |
501 self.assertEqual(TestOutputModuleMissingParameters.missing, u'foobar') | 520 self.assertEqual(TestOutputModuleMissingParameters.missing, u'foobar') |
502 self.assertEqual(TestOutputModuleMissingParameters.parameters, u'foobar') | 521 self.assertEqual(TestOutputModuleMissingParameters.parameters, u'foobar') |
503 | 522 |
504 expected_line = u'FILE/OS ctime OS:/tmp/test/test_data/syslog Type: file' | 523 expected_line = ( |
| 524 u'FILE/OS Metadata Modification Time OS:/tmp/test/test_data/syslog ' |
| 525 u'Type: file') |
505 self.assertIn(expected_line, lines) | 526 self.assertIn(expected_line, lines) |
506 | 527 |
507 output_manager.OutputManager.DeregisterOutput( | 528 output_manager.OutputManager.DeregisterOutput( |
508 TestOutputModuleMissingParameters) | 529 TestOutputModuleMissingParameters) |
509 helpers_manager.ArgumentHelperManager.DeregisterHelper( | 530 helpers_manager.ArgumentHelperManager.DeregisterHelper( |
510 TestOutputModuleArgumentHelper) | 531 TestOutputModuleArgumentHelper) |
511 | 532 |
512 | 533 |
513 if __name__ == '__main__': | 534 if __name__ == '__main__': |
514 unittest.main() | 535 unittest.main() |
LEFT | RIGHT |