OLD | NEW |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
3 """Tests for the psort CLI tool.""" | 3 """Tests for the psort CLI tool.""" |
4 | 4 |
5 import argparse | 5 import argparse |
6 import os | 6 import os |
7 import unittest | 7 import unittest |
8 | 8 |
| 9 from plaso.cli import psort_tool |
9 from plaso.cli.helpers import interface as helpers_interface | 10 from plaso.cli.helpers import interface as helpers_interface |
10 from plaso.cli.helpers import manager as helpers_manager | 11 from plaso.cli.helpers import manager as helpers_manager |
11 from plaso.lib import errors | 12 from plaso.lib import errors |
12 from plaso.output import manager as output_manager | 13 from plaso.output import manager as output_manager |
13 | 14 |
14 from tests import test_lib as shared_test_lib | 15 from tests import test_lib as shared_test_lib |
15 from tests.cli import test_lib as cli_test_lib | 16 from tests.cli import test_lib |
16 from tests.multi_processing import psort as psort_test | 17 from tests.multi_processing import psort as psort_test |
17 | 18 |
18 from tools import psort | |
19 | |
20 | 19 |
21 class TestInputReader(object): | 20 class TestInputReader(object): |
22 """Test input reader.""" | 21 """Test input reader.""" |
23 | 22 |
24 def __init__(self): | 23 def __init__(self): |
25 """Initialize the reader.""" | 24 """Initialize the reader.""" |
26 super(TestInputReader, self).__init__() | 25 super(TestInputReader, self).__init__() |
27 self.read_called = False | 26 self.read_called = False |
28 | 27 |
29 def Read(self): | 28 def Read(self): |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
78 missing_parameters.append(u'parameters') | 77 missing_parameters.append(u'parameters') |
79 | 78 |
80 return missing_parameters | 79 return missing_parameters |
81 | 80 |
82 @classmethod | 81 @classmethod |
83 def SetMissingValue(cls, attribute, value): | 82 def SetMissingValue(cls, attribute, value): |
84 """Set missing value.""" | 83 """Set missing value.""" |
85 setattr(cls, attribute, value) | 84 setattr(cls, attribute, value) |
86 | 85 |
87 | 86 |
88 class PsortToolTest(cli_test_lib.CLIToolTestCase): | 87 class PsortToolTest(test_lib.CLIToolTestCase): |
89 """Tests for the psort tool.""" | 88 """Tests for the psort tool.""" |
90 | 89 |
91 _EXPECTED_ANALYSIS_PLUGIN_OPTIONS = u'\n'.join([ | 90 _EXPECTED_ANALYSIS_PLUGIN_OPTIONS = u'\n'.join([ |
92 u'usage: psort_test.py [--nsrlsvr-hash HASH] [--nsrlsvr-host HOST]', | 91 u'usage: psort_test.py [--nsrlsvr-hash HASH] [--nsrlsvr-host HOST]', |
93 (u' [--nsrlsvr-port PORT] ' | 92 (u' [--nsrlsvr-port PORT] ' |
94 u'[--tagging-file TAGGING_FILE]'), | 93 u'[--tagging-file TAGGING_FILE]'), |
95 u' [--viper-hash HASH] [--viper-host HOST]', | 94 u' [--viper-hash HASH] [--viper-host HOST]', |
96 u' [--viper-port PORT] [--viper-protocol PROTOCOL]', | 95 u' [--viper-port PORT] [--viper-protocol PROTOCOL]', |
97 u' [--virustotal-api-key API_KEY]', | 96 u' [--virustotal-api-key API_KEY]', |
98 (u' [--virustotal-free-rate-limit] ' | 97 (u' [--virustotal-free-rate-limit] ' |
(...skipping 172 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
271 # TODO: add test for _PrintStatusHeader. | 270 # TODO: add test for _PrintStatusHeader. |
272 # TODO: add test for _PrintStatusUpdate. | 271 # TODO: add test for _PrintStatusUpdate. |
273 # TODO: add test for _PrintStatusUpdateStream. | 272 # TODO: add test for _PrintStatusUpdateStream. |
274 # TODO: add test for _PromptUserForInput. | 273 # TODO: add test for _PromptUserForInput. |
275 | 274 |
276 def testAddAnalysisPluginOptions(self): | 275 def testAddAnalysisPluginOptions(self): |
277 """Tests the AddAnalysisPluginOptions function.""" | 276 """Tests the AddAnalysisPluginOptions function.""" |
278 argument_parser = argparse.ArgumentParser( | 277 argument_parser = argparse.ArgumentParser( |
279 prog=u'psort_test.py', | 278 prog=u'psort_test.py', |
280 description=u'Test argument parser.', add_help=False, | 279 description=u'Test argument parser.', add_help=False, |
281 formatter_class=cli_test_lib.SortedArgumentsHelpFormatter) | 280 formatter_class=test_lib.SortedArgumentsHelpFormatter) |
282 | 281 |
283 test_tool = psort.PsortTool() | 282 test_tool = psort_tool.PsortTool() |
284 test_tool.AddAnalysisPluginOptions(argument_parser, []) | 283 test_tool.AddAnalysisPluginOptions(argument_parser, []) |
285 | 284 |
286 output = self._RunArgparseFormatHelp(argument_parser) | 285 output = self._RunArgparseFormatHelp(argument_parser) |
287 self.assertEqual(output, self._EXPECTED_ANALYSIS_PLUGIN_OPTIONS) | 286 self.assertEqual(output, self._EXPECTED_ANALYSIS_PLUGIN_OPTIONS) |
288 | 287 |
289 def testAddFilterOptions(self): | 288 def testAddFilterOptions(self): |
290 """Tests the AddFilterOptions function.""" | 289 """Tests the AddFilterOptions function.""" |
291 argument_parser = argparse.ArgumentParser( | 290 argument_parser = argparse.ArgumentParser( |
292 prog=u'psort_test.py', | 291 prog=u'psort_test.py', |
293 description=u'Test argument parser.', add_help=False, | 292 description=u'Test argument parser.', add_help=False, |
294 formatter_class=cli_test_lib.SortedArgumentsHelpFormatter) | 293 formatter_class=test_lib.SortedArgumentsHelpFormatter) |
295 | 294 |
296 test_tool = psort.PsortTool() | 295 test_tool = psort_tool.PsortTool() |
297 test_tool.AddFilterOptions(argument_parser) | 296 test_tool.AddFilterOptions(argument_parser) |
298 | 297 |
299 output = self._RunArgparseFormatHelp(argument_parser) | 298 output = self._RunArgparseFormatHelp(argument_parser) |
300 self.assertEqual(output, self._EXPECTED_FILTER_OPTIONS) | 299 self.assertEqual(output, self._EXPECTED_FILTER_OPTIONS) |
301 | 300 |
302 def testAddLanguageOptions(self): | 301 def testAddLanguageOptions(self): |
303 """Tests the AddLanguageOptions function.""" | 302 """Tests the AddLanguageOptions function.""" |
304 argument_parser = argparse.ArgumentParser( | 303 argument_parser = argparse.ArgumentParser( |
305 prog=u'psort_test.py', | 304 prog=u'psort_test.py', |
306 description=u'Test argument parser.', add_help=False, | 305 description=u'Test argument parser.', add_help=False, |
307 formatter_class=cli_test_lib.SortedArgumentsHelpFormatter) | 306 formatter_class=test_lib.SortedArgumentsHelpFormatter) |
308 | 307 |
309 test_tool = psort.PsortTool() | 308 test_tool = psort_tool.PsortTool() |
310 test_tool.AddLanguageOptions(argument_parser) | 309 test_tool.AddLanguageOptions(argument_parser) |
311 | 310 |
312 output = self._RunArgparseFormatHelp(argument_parser) | 311 output = self._RunArgparseFormatHelp(argument_parser) |
313 self.assertEqual(output, self._EXPECTED_LANGUAGE_OPTIONS) | 312 self.assertEqual(output, self._EXPECTED_LANGUAGE_OPTIONS) |
314 | 313 |
315 def testAddOutputModuleOptions(self): | 314 def testAddOutputModuleOptions(self): |
316 """Tests the AddOutputModuleOptions function.""" | 315 """Tests the AddOutputModuleOptions function.""" |
317 argument_parser = argparse.ArgumentParser( | 316 argument_parser = argparse.ArgumentParser( |
318 prog=u'psort_test.py', | 317 prog=u'psort_test.py', |
319 description=u'Test argument parser.', add_help=False, | 318 description=u'Test argument parser.', add_help=False, |
320 formatter_class=cli_test_lib.SortedArgumentsHelpFormatter) | 319 formatter_class=test_lib.SortedArgumentsHelpFormatter) |
321 | 320 |
322 test_tool = psort.PsortTool() | 321 test_tool = psort_tool.PsortTool() |
323 test_tool.AddOutputModuleOptions(argument_parser, [u'dynamic']) | 322 test_tool.AddOutputModuleOptions(argument_parser, [u'dynamic']) |
324 | 323 |
325 output = self._RunArgparseFormatHelp(argument_parser) | 324 output = self._RunArgparseFormatHelp(argument_parser) |
326 self.assertEqual(output, self._EXPECTED_OUTPUT_MODULE_OPTIONS) | 325 self.assertEqual(output, self._EXPECTED_OUTPUT_MODULE_OPTIONS) |
327 | 326 |
328 def testAddProcessingOptions(self): | 327 def testAddProcessingOptions(self): |
329 """Tests the AddProcessingOptions function.""" | 328 """Tests the AddProcessingOptions function.""" |
330 argument_parser = argparse.ArgumentParser( | 329 argument_parser = argparse.ArgumentParser( |
331 prog=u'psort_test.py', | 330 prog=u'psort_test.py', |
332 description=u'Test argument parser.', add_help=False, | 331 description=u'Test argument parser.', add_help=False, |
333 formatter_class=cli_test_lib.SortedArgumentsHelpFormatter) | 332 formatter_class=test_lib.SortedArgumentsHelpFormatter) |
334 | 333 |
335 test_tool = psort.PsortTool() | 334 test_tool = psort_tool.PsortTool() |
336 test_tool.AddProcessingOptions(argument_parser) | 335 test_tool.AddProcessingOptions(argument_parser) |
337 | 336 |
338 output = self._RunArgparseFormatHelp(argument_parser) | 337 output = self._RunArgparseFormatHelp(argument_parser) |
339 self.assertEqual(output, self._EXPECTED_PROCESSING_OPTIONS) | 338 self.assertEqual(output, self._EXPECTED_PROCESSING_OPTIONS) |
340 | 339 |
341 def testListAnalysisPlugins(self): | 340 def testListAnalysisPlugins(self): |
342 """Tests the ListAnalysisPlugins function.""" | 341 """Tests the ListAnalysisPlugins function.""" |
343 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 342 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
344 test_tool = psort.PsortTool(output_writer=output_writer) | 343 test_tool = psort_tool.PsortTool(output_writer=output_writer) |
345 | 344 |
346 test_tool.ListAnalysisPlugins() | 345 test_tool.ListAnalysisPlugins() |
347 | 346 |
348 output = output_writer.ReadOutput() | 347 output = output_writer.ReadOutput() |
349 | 348 |
350 number_of_tables = 0 | 349 number_of_tables = 0 |
351 lines = [] | 350 lines = [] |
352 for line in output.split(b'\n'): | 351 for line in output.split(b'\n'): |
353 line = line.strip() | 352 line = line.strip() |
354 lines.append(line) | 353 lines.append(line) |
355 | 354 |
356 if line.startswith(b'*****') and line.endswith(b'*****'): | 355 if line.startswith(b'*****') and line.endswith(b'*****'): |
357 number_of_tables += 1 | 356 number_of_tables += 1 |
358 | 357 |
359 self.assertIn(u'Analysis Plugins', lines[1]) | 358 self.assertIn(u'Analysis Plugins', lines[1]) |
360 | 359 |
361 lines = frozenset(lines) | 360 lines = frozenset(lines) |
362 | 361 |
363 self.assertEqual(number_of_tables, 1) | 362 self.assertEqual(number_of_tables, 1) |
364 | 363 |
365 expected_line = ( | 364 expected_line = ( |
366 b'browser_search : Analyze browser search entries from events.') | 365 b'browser_search : Analyze browser search entries from events.') |
367 self.assertIn(expected_line, lines) | 366 self.assertIn(expected_line, lines) |
368 | 367 |
369 def testListLanguageIdentifiers(self): | 368 def testListLanguageIdentifiers(self): |
370 """Tests the ListLanguageIdentifiers function.""" | 369 """Tests the ListLanguageIdentifiers function.""" |
371 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 370 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
372 test_tool = psort.PsortTool(output_writer=output_writer) | 371 test_tool = psort_tool.PsortTool(output_writer=output_writer) |
373 | 372 |
374 test_tool.ListLanguageIdentifiers() | 373 test_tool.ListLanguageIdentifiers() |
375 | 374 |
376 output = output_writer.ReadOutput() | 375 output = output_writer.ReadOutput() |
377 | 376 |
378 number_of_tables = 0 | 377 number_of_tables = 0 |
379 lines = [] | 378 lines = [] |
380 for line in output.split(b'\n'): | 379 for line in output.split(b'\n'): |
381 line = line.strip() | 380 line = line.strip() |
382 lines.append(line) | 381 lines.append(line) |
383 | 382 |
384 if line.startswith(b'*****') and line.endswith(b'*****'): | 383 if line.startswith(b'*****') and line.endswith(b'*****'): |
385 number_of_tables += 1 | 384 number_of_tables += 1 |
386 | 385 |
387 self.assertIn(u'Language identifiers', lines[1]) | 386 self.assertIn(u'Language identifiers', lines[1]) |
388 | 387 |
389 lines = frozenset(lines) | 388 lines = frozenset(lines) |
390 | 389 |
391 self.assertEqual(number_of_tables, 1) | 390 self.assertEqual(number_of_tables, 1) |
392 | 391 |
393 expected_line = b'en : English' | 392 expected_line = b'en : English' |
394 self.assertIn(expected_line, lines) | 393 self.assertIn(expected_line, lines) |
395 | 394 |
396 def testListOutputModules(self): | 395 def testListOutputModules(self): |
397 """Tests the ListOutputModules function.""" | 396 """Tests the ListOutputModules function.""" |
398 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 397 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
399 test_tool = psort.PsortTool(output_writer=output_writer) | 398 test_tool = psort_tool.PsortTool(output_writer=output_writer) |
400 | 399 |
401 test_tool.ListOutputModules() | 400 test_tool.ListOutputModules() |
402 | 401 |
403 output = output_writer.ReadOutput() | 402 output = output_writer.ReadOutput() |
404 | 403 |
405 number_of_tables = 0 | 404 number_of_tables = 0 |
406 lines = [] | 405 lines = [] |
407 for line in output.split(b'\n'): | 406 for line in output.split(b'\n'): |
408 line = line.strip() | 407 line = line.strip() |
409 lines.append(line) | 408 lines.append(line) |
(...skipping 13 matching lines...) Expand all Loading... |
423 expected_number_of_tables += 1 | 422 expected_number_of_tables += 1 |
424 if enabled_outputs: | 423 if enabled_outputs: |
425 expected_number_of_tables += 1 | 424 expected_number_of_tables += 1 |
426 | 425 |
427 self.assertEqual(number_of_tables, expected_number_of_tables) | 426 self.assertEqual(number_of_tables, expected_number_of_tables) |
428 expected_line = b'rawpy : "raw" (or native) Python output.' | 427 expected_line = b'rawpy : "raw" (or native) Python output.' |
429 self.assertIn(expected_line, lines) | 428 self.assertIn(expected_line, lines) |
430 | 429 |
431 def testParseArguments(self): | 430 def testParseArguments(self): |
432 """Tests the ParseArguments function.""" | 431 """Tests the ParseArguments function.""" |
433 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 432 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
434 test_tool = psort.PsortTool(output_writer=output_writer) | 433 test_tool = psort_tool.PsortTool(output_writer=output_writer) |
435 | 434 |
436 result = test_tool.ParseArguments() | 435 result = test_tool.ParseArguments() |
437 self.assertFalse(result) | 436 self.assertFalse(result) |
438 | 437 |
439 # TODO: check output. | 438 # TODO: check output. |
440 # TODO: improve test coverage. | 439 # TODO: improve test coverage. |
441 | 440 |
442 def testParseOptions(self): | 441 def testParseOptions(self): |
443 """Tests the ParseOptions function.""" | 442 """Tests the ParseOptions function.""" |
444 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 443 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
445 test_tool = psort.PsortTool(output_writer=output_writer) | 444 test_tool = psort_tool.PsortTool(output_writer=output_writer) |
446 | 445 |
447 options = cli_test_lib.TestOptions() | 446 options = test_lib.TestOptions() |
448 options.output_format = u'null' | 447 options.output_format = u'null' |
449 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) | 448 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) |
450 | 449 |
451 test_tool.ParseOptions(options) | 450 test_tool.ParseOptions(options) |
452 | 451 |
453 options = cli_test_lib.TestOptions() | 452 options = test_lib.TestOptions() |
454 | 453 |
455 with self.assertRaises(errors.BadConfigOption): | 454 with self.assertRaises(errors.BadConfigOption): |
456 test_tool.ParseOptions(options) | 455 test_tool.ParseOptions(options) |
457 | 456 |
458 options = cli_test_lib.TestOptions() | 457 options = test_lib.TestOptions() |
459 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) | 458 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) |
460 | 459 |
461 with self.assertRaises(errors.BadConfigOption): | 460 with self.assertRaises(errors.BadConfigOption): |
462 test_tool.ParseOptions(options) | 461 test_tool.ParseOptions(options) |
463 | 462 |
464 # TODO: improve test coverage. | 463 # TODO: improve test coverage. |
465 | 464 |
466 def testProcessStorageWithMissingParameters(self): | 465 def testProcessStorageWithMissingParameters(self): |
467 """Test the ProcessStorage function with half-configured output module.""" | 466 """Test the ProcessStorage function with half-configured output module.""" |
468 input_reader = TestInputReader() | 467 input_reader = TestInputReader() |
469 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 468 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
470 test_tool = psort.PsortTool( | 469 test_tool = psort_tool.PsortTool( |
471 input_reader=input_reader, output_writer=output_writer) | 470 input_reader=input_reader, output_writer=output_writer) |
472 | 471 |
473 options = cli_test_lib.TestOptions() | 472 options = test_lib.TestOptions() |
474 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) | 473 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) |
475 options.output_format = u'test_missing' | 474 options.output_format = u'test_missing' |
476 | 475 |
477 output_manager.OutputManager.RegisterOutput( | 476 output_manager.OutputManager.RegisterOutput( |
478 TestOutputModuleMissingParameters) | 477 TestOutputModuleMissingParameters) |
479 helpers_manager.ArgumentHelperManager.RegisterHelper( | 478 helpers_manager.ArgumentHelperManager.RegisterHelper( |
480 TestOutputModuleArgumentHelper) | 479 TestOutputModuleArgumentHelper) |
481 | 480 |
482 lines = [] | 481 lines = [] |
483 with shared_test_lib.TempDirectory() as temp_directory: | 482 with shared_test_lib.TempDirectory() as temp_directory: |
(...skipping 15 matching lines...) Expand all Loading... |
499 self.assertIn(expected_line, lines) | 498 self.assertIn(expected_line, lines) |
500 | 499 |
501 output_manager.OutputManager.DeregisterOutput( | 500 output_manager.OutputManager.DeregisterOutput( |
502 TestOutputModuleMissingParameters) | 501 TestOutputModuleMissingParameters) |
503 helpers_manager.ArgumentHelperManager.DeregisterHelper( | 502 helpers_manager.ArgumentHelperManager.DeregisterHelper( |
504 TestOutputModuleArgumentHelper) | 503 TestOutputModuleArgumentHelper) |
505 | 504 |
506 | 505 |
507 if __name__ == '__main__': | 506 if __name__ == '__main__': |
508 unittest.main() | 507 unittest.main() |
OLD | NEW |