LEFT | RIGHT |
(no file at all) | |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
3 """Tests for the psort multi-processing engine.""" | 3 """Tests for the psort multi-processing engine.""" |
4 | 4 |
5 import os | 5 import os |
6 import shutil | 6 import shutil |
7 import unittest | 7 import unittest |
8 | 8 |
9 from plaso.analysis import interface as analysis_interface | 9 from plaso.analysis import interface as analysis_interface |
10 from plaso.analysis import tagging | 10 from plaso.analysis import tagging |
(...skipping 281 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
292 formatter_mediator = formatters_mediator.FormatterMediator() | 292 formatter_mediator = formatters_mediator.FormatterMediator() |
293 formatter_mediator.SetPreferredLanguageIdentifier(u'en-US') | 293 formatter_mediator.SetPreferredLanguageIdentifier(u'en-US') |
294 | 294 |
295 output_mediator_object = output_mediator.OutputMediator( | 295 output_mediator_object = output_mediator.OutputMediator( |
296 knowledge_base_object, formatter_mediator) | 296 knowledge_base_object, formatter_mediator) |
297 | 297 |
298 output_module = null.NullOutputModule(output_mediator_object) | 298 output_module = null.NullOutputModule(output_mediator_object) |
299 | 299 |
300 data_location = u'' | 300 data_location = u'' |
301 analysis_plugin = tagging.TaggingAnalysisPlugin() | 301 analysis_plugin = tagging.TaggingAnalysisPlugin() |
| 302 analysis_plugins = {u'tagging': analysis_plugin} |
302 # TODO: set tag file. | 303 # TODO: set tag file. |
303 | 304 |
304 test_engine = psort.PsortMultiProcessEngine() | 305 test_engine = psort.PsortMultiProcessEngine() |
305 | 306 |
306 with shared_test_lib.TempDirectory() as temp_directory: | 307 with shared_test_lib.TempDirectory() as temp_directory: |
307 temp_file = os.path.join(temp_directory, u'storage.plaso') | 308 temp_file = os.path.join(temp_directory, u'storage.plaso') |
308 shutil.copyfile(storage_file_path, temp_file) | 309 shutil.copyfile(storage_file_path, temp_file) |
309 | 310 |
310 storage_writer = storage_zip_file.ZIPStorageFileWriter( | 311 storage_writer = storage_zip_file.ZIPStorageFileWriter( |
311 session, temp_file) | 312 session, temp_file) |
312 | 313 |
313 counter = test_engine.AnalyzeEvents( | 314 counter = test_engine.AnalyzeEvents( |
314 knowledge_base_object, storage_writer, output_module, data_location, | 315 knowledge_base_object, storage_writer, output_module, data_location, |
315 [analysis_plugin]) | 316 analysis_plugins) |
316 | 317 |
317 # TODO: assert if tests were successful. | 318 # TODO: assert if tests were successful. |
318 _ = counter | 319 _ = counter |
319 | 320 |
320 test_filter = filters_test_lib.TestEventFilter() | 321 test_filter = filters_test_lib.TestEventFilter() |
321 | 322 |
322 with shared_test_lib.TempDirectory() as temp_directory: | 323 with shared_test_lib.TempDirectory() as temp_directory: |
323 temp_file = os.path.join(temp_directory, u'storage.plaso') | 324 temp_file = os.path.join(temp_directory, u'storage.plaso') |
324 shutil.copyfile(storage_file_path, temp_file) | 325 shutil.copyfile(storage_file_path, temp_file) |
325 | 326 |
326 storage_writer = storage_zip_file.ZIPStorageFileWriter( | 327 storage_writer = storage_zip_file.ZIPStorageFileWriter( |
327 session, temp_file) | 328 session, temp_file) |
328 | 329 |
329 counter = test_engine.AnalyzeEvents( | 330 counter = test_engine.AnalyzeEvents( |
330 knowledge_base_object, storage_writer, data_location, | 331 knowledge_base_object, storage_writer, data_location, |
331 [analysis_plugin], event_filter=test_filter) | 332 analysis_plugins, event_filter=test_filter) |
332 | 333 |
333 # TODO: assert if tests were successful. | 334 # TODO: assert if tests were successful. |
334 _ = counter | 335 _ = counter |
335 | 336 |
336 # TODO: add bogus data location test. | 337 # TODO: add bogus data location test. |
337 | 338 |
338 @shared_test_lib.skipUnlessHasTestFile([u'psort_test.json.plaso']) | 339 @shared_test_lib.skipUnlessHasTestFile([u'psort_test.json.plaso']) |
339 def testExportEvents(self): | 340 def testExportEvents(self): |
340 """Tests the ExportEvents function.""" | 341 """Tests the ExportEvents function.""" |
341 storage_file_path = self._GetTestFilePath([u'psort_test.json.plaso']) | 342 storage_file_path = self._GetTestFilePath([u'psort_test.json.plaso']) |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
374 u'Log File,' | 375 u'Log File,' |
375 u'[---] last message repeated 5 times ---,' | 376 u'[---] last message repeated 5 times ---,' |
376 u'syslog,' | 377 u'syslog,' |
377 u'OS:/tmp/test/test_data/syslog,' | 378 u'OS:/tmp/test/test_data/syslog,' |
378 u'repeated') | 379 u'repeated') |
379 self.assertEqual(lines[14], expected_line) | 380 self.assertEqual(lines[14], expected_line) |
380 | 381 |
381 | 382 |
382 if __name__ == '__main__': | 383 if __name__ == '__main__': |
383 unittest.main() | 384 unittest.main() |
LEFT | RIGHT |