Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(890)

Unified Diff: tools/psteal.py

Issue 321000043: [plaso] Refactored log2timeline front-end to tool #160 (Closed)
Patch Set: Changes after review Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tools/psort_test.py ('k') | tools/psteal_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tools/psteal.py
diff --git a/tools/psteal.py b/tools/psteal.py
old mode 100644
new mode 100755
index 6172bdaccd2226f1b32d8bc7fe29027f14562a7b..35777a09a4ce3cef99f6063209a06c40722792a5
--- a/tools/psteal.py
+++ b/tools/psteal.py
@@ -12,381 +12,19 @@ See additional details here:
https://github.com/log2timeline/plaso/wiki/Using-psteal
"""
-import argparse
-import collections
import multiprocessing
import logging
-import os
import sys
-import textwrap
-from dfvfs.lib import definitions as dfvfs_definitions
-
-# The following import makes sure the filters are registered.
-from plaso.cli import extract_analyze_tool
-from plaso.cli import tools as cli_tools
-from plaso.cli import views as cli_views
-from plaso.cli.helpers import manager as helpers_manager
-from plaso.frontend import log2timeline
-from plaso.frontend import psort
-from plaso.engine import configurations
-from plaso.output import interface as output_interface
+from plaso.cli import psteal_tool
from plaso.lib import errors
-class PstealTool(extract_analyze_tool.ExtractionAndAnalysisTool):
- """Implements the psteal CLI tool.
-
- Psteal extract events from the provided source and stores them in an
- intermediate storage file. After extraction an output log file is created.
- This mimics the behaviour of the log2timeline.pl.
- The tool currently doesn't support any of the log2timeline or psort tools'
- flags.
-
- Attributes:
- dependencies_check (bool): True if the availability and versions of
- dependencies should be checked.
- list_output_modules (bool): True if information about the output modules
- should be shown.
- """
-
- NAME = u'psteal'
-
- DESCRIPTION = textwrap.dedent(u'\n'.join([
- u'',
- (u'psteal is a command line tool to extract events from individual '),
- u'files, recursing a directory (e.g. mount point) or storage media ',
- u'image or device. The output events will be stored in a storage file.',
- u'This tool will then read the output and process the events into a CSV ',
- u'file.',
- u'',
- u'More information can be gathered from here:',
- u' https://github.com/log2timeline/plaso/wiki/Using-log2timeline',
- u'']))
-
- EPILOG = textwrap.dedent(u'\n'.join([
- u'',
- u'Example usage:',
- u'',
- u'Run the tool against a storage media image (full kitchen sink)',
- u' psteal.py --source ímynd.dd -w imynd.timeline.txt',
- u'',
- u'And that is how you build a timeline using psteal...',
- u'']))
-
- def __init__(self, input_reader=None, output_writer=None):
- """Initializes the CLI tool object.
-
- Args:
- input_reader (Optional[InputReader]): input reader, where None indicates
- that the stdin input reader should be used.
- output_writer (Optional[OutputWriter]): output writer, where None
- indicates that the stdout output writer should be used.
- """
- super(PstealTool, self).__init__(
- input_reader=input_reader, output_writer=output_writer)
- self._analysis_front_end = psort.PsortFrontend()
- self._command_line_arguments = None
- self._deduplicate_events = True
- self._enable_sigsegv_handler = False
- self._extraction_front_end = log2timeline.Log2TimelineFrontend()
- self._force_preprocessing = False
- self._hasher_names_string = None
- self._number_of_extraction_workers = 0
- self._options = None
- self._output_format = u'dynamic'
- self._output_filename = None
- self._output_module = None
- self._parser_filter_expression = None
- self._preferred_year = None
- self._single_process_mode = False
- self._source_type = None
- self._status_view_mode = u'window'
- self._time_slice = None
- self._use_time_slicer = False
- self._yara_rules_string = None
-
- def _DetermineSourceType(self):
- """Determines the source type."""
- scan_context = self.ScanSource()
- self._source_type = scan_context.source_type
-
- if self._source_type == dfvfs_definitions.SOURCE_TYPE_DIRECTORY:
- self._source_type_string = u'directory'
-
- elif self._source_type == dfvfs_definitions.SOURCE_TYPE_FILE:
- self._source_type_string = u'single file'
-
- elif self._source_type == (
- dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE):
- self._source_type_string = u'storage media device'
-
- elif self._source_type == (
- dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE):
- self._source_type_string = u'storage media image'
-
- else:
- self._source_type_string = u'UNKNOWN'
-
- def _GetStatusUpdateCallback(self):
- """Retrieves the status update callback function.
-
- Returns:
- function: status update callback function or None.
- """
- if self._status_view_mode == u'linear':
- return self._PrintStatusUpdateStream
- elif self._status_view_mode == u'window':
- return self._PrintStatusUpdate
-
- def ExtractEventsFromSources(self):
- """Processes the sources and extract events.
-
- This is a stripped down copy of tools/log2timeline.py that doesn't support
- the full set of flags. The defaults for these are hard coded in the
- constructor of this class.
-
- Raises:
- SourceScannerError: if the source scanner could not find a supported
- file system.
- UserAbort: if the user initiated an abort.
- """
- self._DetermineSourceType()
-
- self._output_writer.Write(u'\n')
- self._PrintStatusHeader()
-
- self._output_writer.Write(u'Processing started.\n')
-
- status_update_callback = self._GetStatusUpdateCallback()
-
- session = self._extraction_front_end.CreateSession(
- command_line_arguments=self._command_line_arguments,
- filter_file=self._filter_file,
- preferred_encoding=self.preferred_encoding,
- preferred_time_zone=self._preferred_time_zone,
- preferred_year=self._preferred_year)
-
- storage_writer = self._extraction_front_end.CreateStorageWriter(
- session, self._storage_file_path)
- # TODO: handle errors.BadConfigOption
-
- # TODO: pass preferred_encoding.
- configuration = configurations.ProcessingConfiguration()
- configuration.credentials = self._credential_configurations
- configuration.debug_output = self._debug_mode
- configuration.extraction.hasher_names_string = self._hasher_names_string
- configuration.extraction.yara_rules_string = self._yara_rules_string
- configuration.filter_file = self._filter_file
- configuration.parser_filter_expression = self._parser_filter_expression
- configuration.preferred_year = self._preferred_year
-
- processing_status = self._extraction_front_end.ProcessSources(
- session, storage_writer, self._source_path_specs, self._source_type,
- configuration, enable_sigsegv_handler=self._enable_sigsegv_handler,
- force_preprocessing=self._force_preprocessing,
- number_of_extraction_workers=self._number_of_extraction_workers,
- single_process_mode=self._single_process_mode,
- status_update_callback=status_update_callback)
-
- if not processing_status:
- self._output_writer.Write(
- u'WARNING: missing processing status information.\n')
-
- elif not processing_status.aborted:
- if processing_status.error_path_specs:
- self._output_writer.Write(u'Processing completed with errors.\n')
- else:
- self._output_writer.Write(u'Processing completed.\n')
-
- number_of_errors = (
- processing_status.foreman_status.number_of_produced_errors)
- if number_of_errors:
- output_text = u'\n'.join([
- u'',
- (u'Number of errors encountered while extracting events: '
- u'{0:d}.').format(number_of_errors),
- u'',
- u'Use pinfo to inspect errors in more detail.',
- u''])
- self._output_writer.Write(output_text)
-
- if processing_status.error_path_specs:
- output_text = u'\n'.join([
- u'',
- u'Path specifications that could not be processed:',
- u''])
- self._output_writer.Write(output_text)
- for path_spec in processing_status.error_path_specs:
- self._output_writer.Write(path_spec.comparable)
- self._output_writer.Write(u'\n')
-
- self._output_writer.Write(u'\n')
-
- def _CreateOutputModule(self):
- """Creates a default output module
-
- Raises:
- BadConfigOption: when the output_filename already exists or hasn't been
- set.
- """
- self._output_module = self._analysis_front_end.CreateOutputModule(
- self._output_format, preferred_encoding=self.preferred_encoding,
- timezone=self._preferred_time_zone)
-
- if isinstance(self._output_module, output_interface.LinearOutputModule):
- if not self._output_filename:
- raise errors.BadConfigOption(
- u'Output format: {0:s} requires an output file.'.format(
- self._output_format))
-
- if self._output_filename and os.path.exists(self._output_filename):
- raise errors.BadConfigOption(
- u'Output file already exists: {0:s}. Aborting.'.format(
- self._output_filename))
-
- output_file_object = open(self._output_filename, u'wb')
- output_writer = cli_tools.FileObjectOutputWriter(output_file_object)
-
- self._output_module.SetOutputWriter(output_writer)
-
- def AnalyzeEvents(self):
- """Analyzes events from a plaso storage file and generate a report.
-
- Raises:
- BadConfigOption: when a configuration parameter fails validation.
- RuntimeError: if a non-recoverable situation is encountered.
- """
- helpers_manager.ArgumentHelperManager.ParseOptions(
- self._options, self._output_module)
-
- # No analysis plugin
-
- session = self._analysis_front_end.CreateSession(
- command_line_arguments=self._command_line_arguments,
- preferred_encoding=self.preferred_encoding)
-
- storage_reader = self._analysis_front_end.CreateStorageReader(
- self._storage_file_path)
- self._number_of_analysis_reports = (
- storage_reader.GetNumberOfAnalysisReports())
- storage_reader.Close()
-
- configuration = configurations.ProcessingConfiguration()
-
- counter = collections.Counter()
- if self._output_format != u'null':
- storage_reader = self._analysis_front_end.CreateStorageReader(
- self._storage_file_path)
-
- events_counter = self._analysis_front_end.ExportEvents(
- storage_reader, self._output_module, configuration,
- deduplicate_events=self._deduplicate_events,
- time_slice=self._time_slice, use_time_slicer=self._use_time_slicer)
-
- counter += events_counter
-
- for item, value in iter(session.analysis_reports_counter.items()):
- counter[item] = value
-
- if self._quiet_mode:
- return
-
- self._output_writer.Write(u'Processing completed.\n')
-
- table_view = cli_views.ViewsFactory.GetTableView(
- self._views_format_type, title=u'Counter')
- for element, count in counter.most_common():
- if not element:
- element = u'N/A'
- table_view.AddRow([element, count])
- table_view.Write(self._output_writer)
-
- storage_reader = self._analysis_front_end.CreateStorageReader(
- self._storage_file_path)
- self._PrintAnalysisReportsDetails(storage_reader)
-
- self._output_writer.Write(u'Storage file is {0:s}\n'.format(
- self._storage_file_path))
-
- def ParseArguments(self):
- """Parses the command line arguments.
-
- Returns:
- bool: True if the arguments were successfully parsed.
- """
- argument_parser = argparse.ArgumentParser(
- description=self.DESCRIPTION, epilog=self.EPILOG, add_help=False,
- formatter_class=argparse.RawDescriptionHelpFormatter)
-
- self.AddBasicOptions(argument_parser)
- self.AddStorageFileOptions(argument_parser)
-
- extraction_group = argument_parser.add_argument_group(
- u'Extraction Arguments')
-
- self.AddCredentialOptions(extraction_group)
-
- input_group = argument_parser.add_argument_group(u'Input Arguments')
- input_group.add_argument(
- u'--source', dest=u'source', action=u'store',
- type=str, help=u'The source to process')
-
- output_group = argument_parser.add_argument_group(u'Output Arguments')
- output_group.add_argument(
- u'-w', u'--write', dest=u'analysis_output_file', action=u'store',
- type=str, default=None, help=(
- u'The destination file, storing the output of analysis'))
-
- try:
- options = argument_parser.parse_args()
- except UnicodeEncodeError:
- # If we get here we are attempting to print help in a non-Unicode
- # terminal.
- self._output_writer.Write(u'\n')
- self._output_writer.Write(argument_parser.format_help())
- return False
-
- try:
- self.ParseOptions(options)
- except errors.BadConfigOption as exception:
- self._output_writer.Write(u'ERROR: {0:s}'.format(exception))
- self._output_writer.Write(u'\n')
- self._output_writer.Write(argument_parser.format_usage())
- return False
-
- return True
-
- def ParseOptions(self, options):
- """Parses tool specific options.
-
- Args:
- options (argparse.Namespace): command line arguments.
-
- Raises:
- BadConfigOption: if the options are invalid.
- """
- super(PstealTool, self).ParseOptions(options)
-
- # These arguments are parsed from argparse.Namespace, so we can make
- # tests consistents with the log2timeline/psort ones.
- self._single_process_mode = getattr(options, u'single_process', False)
- self._status_view_mode = getattr(options, u'status_view_mode', u'window')
-
- self.SetSourcePath(getattr(options, u'source', None))
- self._output_filename = getattr(options, u'analysis_output_file', None)
- self._ParseStorageFileOptions(options)
-
- self._options = options
-
- self._CreateOutputModule()
-
-
def Main():
"""The main function."""
multiprocessing.freeze_support()
- tool = PstealTool()
+ tool = psteal_tool.PstealTool()
if not tool.ParseArguments():
return False
« no previous file with comments | « tools/psort_test.py ('k') | tools/psteal_test.py » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b