Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1161)

Unified Diff: tests/multi_processing/test_lib.py

Issue 328870043: [plaso] Worked on worker process test coverage (Closed)
Patch Set: Worked on analysis process test coverage Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tests/multi_processing/test_lib.py
diff --git a/tests/multi_processing/test_lib.py b/tests/multi_processing/test_lib.py
new file mode 100644
index 0000000000000000000000000000000000000000..1b2aac73fb847f10cc74880f7f78463c11df7234
--- /dev/null
+++ b/tests/multi_processing/test_lib.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+"""Multi-processing related functions and classes for testing."""
+
+from plaso.engine import knowledge_base
+from plaso.parsers import mediator as parsers_mediator
+from plaso.storage import fake_storage
+
+from tests import test_lib as shared_test_lib
+
+
+class MultiProcessingTestCase(shared_test_lib.BaseTestCase):
+ """Multi-processing test case."""
+
+ def _CreateKnowledgeBase(self, knowledge_base_values=None, timezone=u'UTC'):
+ """Creates a knowledge base.
+
+ Args:
+ knowledge_base_values (Optional[dict]): knowledge base values.
+ timezone (str): timezone.
+
+ Returns:
+ KnowledgeBase: knowledge base.
+ """
+ knowledge_base_object = knowledge_base.KnowledgeBase()
+ if knowledge_base_values:
+ for identifier, value in iter(knowledge_base_values.items()):
+ knowledge_base_object.SetValue(identifier, value)
+
+ knowledge_base_object.SetTimeZone(timezone)
+
+ return knowledge_base_object
+
+ def _CreateParserMediator(
+ self, storage_writer, knowledge_base_object, file_entry=None,
+ parser_chain=None):
+ """Creates a parser mediator.
+
+ Args:
+ storage_writer (StorageWriter): storage writer.
+ knowledge_base_object (KnowledgeBase): knowledge base.
+ file_entry (Optional[dfvfs.FileEntry]): file entry object being parsed.
+ parser_chain (Optional[str]): parsing chain up to this point.
+
+ Returns:
+ ParserMediator: parser mediator.
+ """
+ parser_mediator = parsers_mediator.ParserMediator(
+ storage_writer, knowledge_base_object)
+
+ if file_entry:
+ parser_mediator.SetFileEntry(file_entry)
+
+ if parser_chain:
+ parser_mediator.parser_chain = parser_chain
+
+ return parser_mediator
+
+ def _CreateStorageWriter(self, session):
+ """Creates a storage writer object.
+
+ Args:
+ session (Session): session.
+
+ Returns:
+ FakeStorageWriter: storage writer.
+ """
+ storage_writer = fake_storage.FakeStorageWriter(session)
+ storage_writer.Open()
+ return storage_writer

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b