OLD | NEW |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
3 """Tests for the log2timeline CLI tool.""" | 3 """Tests for the log2timeline CLI tool.""" |
4 | 4 |
5 import argparse | 5 import argparse |
6 import os | 6 import os |
7 import unittest | 7 import unittest |
8 | 8 |
| 9 from plaso.cli import log2timeline_tool |
9 from plaso.lib import errors | 10 from plaso.lib import errors |
| 11 from plaso.output import manager as output_manager |
10 from plaso.storage import zip_file as storage_zip_file | 12 from plaso.storage import zip_file as storage_zip_file |
11 | 13 |
12 from tests import test_lib as shared_test_lib | 14 from tests import test_lib as shared_test_lib |
13 from tests.cli import test_lib as cli_test_lib | 15 from tests.cli import test_lib |
14 | |
15 from tools import log2timeline | |
16 | 16 |
17 | 17 |
18 class Log2TimelineToolTest(cli_test_lib.CLIToolTestCase): | 18 class Log2TimelineToolTest(test_lib.CLIToolTestCase): |
19 """Tests for the log2timeline CLI tool.""" | 19 """Tests for the log2timeline CLI tool.""" |
20 | 20 |
| 21 # pylint: disable=protected-access |
| 22 |
21 _BDE_PASSWORD = u'bde-TEST' | 23 _BDE_PASSWORD = u'bde-TEST' |
22 | 24 |
23 _EXPECTED_PROCESSING_OPTIONS = u'\n'.join([ | 25 _EXPECTED_PROCESSING_OPTIONS = u'\n'.join([ |
24 u'usage: log2timeline_test.py [--disable_zeromq] [--single_process]', | 26 u'usage: log2timeline_test.py [--disable_zeromq] [--single_process]', |
25 u' [--temporary_directory DIRECTORY]', | 27 u' [--temporary_directory DIRECTORY]', |
26 (u' [--worker-memory-limit SIZE] ' | 28 (u' [--worker-memory-limit SIZE] ' |
27 u'[--workers WORKERS]'), | 29 u'[--workers WORKERS]'), |
28 u'', | 30 u'', |
29 u'Test argument parser.', | 31 u'Test argument parser.', |
30 u'', | 32 u'', |
(...skipping 11 matching lines...) Expand all Loading... |
42 u' temporary files created during extraction.', | 44 u' temporary files created during extraction.', |
43 u' --worker-memory-limit SIZE, --worker_memory_limit SIZE', | 45 u' --worker-memory-limit SIZE, --worker_memory_limit SIZE', |
44 (u' Maximum amount of memory a worker process is ' | 46 (u' Maximum amount of memory a worker process is ' |
45 u'allowed'), | 47 u'allowed'), |
46 u' to consume. [defaults to 2 GiB]', | 48 u' to consume. [defaults to 2 GiB]', |
47 (u' --workers WORKERS The number of worker processes [defaults to ' | 49 (u' --workers WORKERS The number of worker processes [defaults to ' |
48 u'available'), | 50 u'available'), |
49 u' system CPUs minus one].', | 51 u' system CPUs minus one].', |
50 u'']) | 52 u'']) |
51 | 53 |
52 # TODO: add test for _GetMatcher. | 54 # TODO: add tests for _CheckStorageFile |
53 # TODO: add test for _ParseOutputOptions. | 55 # TODO: add tests for _CreateProcessingConfiguration |
54 # TODO: add test for _ParseProcessingOptions. | 56 # TODO: add tests for _DetermineSourceType |
| 57 |
| 58 def testGetFiltersInformation(self): |
| 59 """Tests the _GetFiltersInformation function.""" |
| 60 test_tool = log2timeline_tool.Log2TimelineTool() |
| 61 filters_info = test_tool._GetFiltersInformation() |
| 62 |
| 63 self.assertIsNotNone(filters_info) |
| 64 |
| 65 available_filter_names = [name for name, _ in filters_info] |
| 66 self.assertIn(u'DynamicFilter', available_filter_names) |
| 67 self.assertIn(u'EventObjectFilter', available_filter_names) |
| 68 |
| 69 def testGetOutputModulesInformation(self): |
| 70 """Tests the _GetOutputModulesInformation function.""" |
| 71 test_tool = log2timeline_tool.Log2TimelineTool() |
| 72 modules_info = test_tool._GetOutputModulesInformation() |
| 73 |
| 74 self.assertIsNotNone(modules_info) |
| 75 |
| 76 available_module_names = [name for name, _ in modules_info] |
| 77 self.assertIn(u'dynamic', available_module_names) |
| 78 self.assertIn(u'json', available_module_names) |
| 79 |
| 80 def testGetPluginData(self): |
| 81 """Tests the _GetPluginData function.""" |
| 82 test_tool = log2timeline_tool.Log2TimelineTool() |
| 83 plugin_info = test_tool._GetPluginData() |
| 84 |
| 85 self.assertIn(u'Hashers', plugin_info) |
| 86 |
| 87 available_hasher_names = [name for name, _ in plugin_info[u'Hashers']] |
| 88 self.assertIn(u'sha256', available_hasher_names) |
| 89 self.assertIn(u'sha1', available_hasher_names) |
| 90 |
| 91 self.assertIn(u'Parsers', plugin_info) |
| 92 self.assertIsNotNone(plugin_info[u'Parsers']) |
| 93 |
| 94 self.assertIn(u'Parser Plugins', plugin_info) |
| 95 self.assertIsNotNone(plugin_info[u'Parser Plugins']) |
| 96 |
| 97 # TODO: add tests for _GetStatusUpdateCallback |
| 98 |
| 99 def testParseFilterOption(self): |
| 100 """Tests the _ParseFilterOption function.""" |
| 101 test_tool = log2timeline_tool.Log2TimelineTool() |
| 102 |
| 103 options = test_lib.TestOptions() |
| 104 |
| 105 options.filter = u'parser is "syslog" and message contains "root"' |
| 106 |
| 107 test_tool._ParseFilterOption(options) |
| 108 self.assertEqual(test_tool._filter_expression, options.filter) |
| 109 self.assertIsNotNone(test_tool._filter_object) |
| 110 |
| 111 def testParseOutputOptions(self): |
| 112 """Tests the _ParseOutputOptions function.""" |
| 113 test_tool = log2timeline_tool.Log2TimelineTool() |
| 114 |
| 115 options = test_lib.TestOptions() |
| 116 |
| 117 test_tool._ParseOutputOptions(options) |
| 118 |
| 119 def testParseProcessingOptions(self): |
| 120 """Tests the _ParseProcessingOptions function.""" |
| 121 test_tool = log2timeline_tool.Log2TimelineTool() |
| 122 |
| 123 options = test_lib.TestOptions() |
| 124 |
| 125 test_tool._ParseProcessingOptions(options) |
| 126 |
| 127 # TODO: add tests for _PrintProcessingSummary |
| 128 |
| 129 # TODO: add tests for AddOutputOptions |
55 | 130 |
56 def testAddProcessingOptions(self): | 131 def testAddProcessingOptions(self): |
57 """Tests the AddProcessingOptions function.""" | 132 """Tests the AddProcessingOptions function.""" |
58 argument_parser = argparse.ArgumentParser( | 133 argument_parser = argparse.ArgumentParser( |
59 prog=u'log2timeline_test.py', | 134 prog=u'log2timeline_test.py', |
60 description=u'Test argument parser.', add_help=False, | 135 description=u'Test argument parser.', add_help=False, |
61 formatter_class=cli_test_lib.SortedArgumentsHelpFormatter) | 136 formatter_class=test_lib.SortedArgumentsHelpFormatter) |
62 | 137 |
63 test_tool = log2timeline.Log2TimelineTool() | 138 test_tool = log2timeline_tool.Log2TimelineTool() |
64 test_tool.AddProcessingOptions(argument_parser) | 139 test_tool.AddProcessingOptions(argument_parser) |
65 | 140 |
66 output = self._RunArgparseFormatHelp(argument_parser) | 141 output = self._RunArgparseFormatHelp(argument_parser) |
67 self.assertEqual(output, self._EXPECTED_PROCESSING_OPTIONS) | 142 self.assertEqual(output, self._EXPECTED_PROCESSING_OPTIONS) |
68 | 143 |
69 def testListHashers(self): | |
70 """Tests the ListHashers function.""" | |
71 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
72 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
73 | |
74 test_tool.ListHashers() | |
75 | |
76 output = output_writer.ReadOutput() | |
77 | |
78 number_of_tables = 0 | |
79 lines = [] | |
80 for line in output.split(b'\n'): | |
81 line = line.strip() | |
82 lines.append(line) | |
83 | |
84 if line.startswith(b'*****') and line.endswith(b'*****'): | |
85 number_of_tables += 1 | |
86 | |
87 self.assertIn(u'Hashers', lines[1]) | |
88 | |
89 lines = frozenset(lines) | |
90 | |
91 self.assertEqual(number_of_tables, 1) | |
92 | |
93 expected_line = b'md5 : Calculates an MD5 digest hash over input data.' | |
94 self.assertIn(expected_line, lines) | |
95 | |
96 def testListOutputModules(self): | 144 def testListOutputModules(self): |
97 """Tests the ListOutputModules function.""" | 145 """Tests the ListOutputModules function.""" |
98 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 146 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
99 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 147 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
100 | 148 |
101 test_tool.ListOutputModules() | 149 test_tool.ListOutputModules() |
102 | 150 |
103 output = output_writer.ReadOutput() | 151 output = output_writer.ReadOutput() |
104 number_of_tables = 0 | 152 number_of_tables = 0 |
105 lines = [] | 153 lines = [] |
106 for line in output.split(b'\n'): | 154 for line in output.split(b'\n'): |
107 line = line.strip() | 155 line = line.strip() |
108 lines.append(line) | 156 lines.append(line) |
109 | 157 |
110 if line.startswith(b'*****') and line.endswith(b'*****'): | 158 if line.startswith(b'*****') and line.endswith(b'*****'): |
111 number_of_tables += 1 | 159 number_of_tables += 1 |
112 | 160 |
113 self.assertIn(u'Output Modules', lines[1]) | 161 self.assertIn(u'Output Modules', lines[1]) |
114 | 162 |
115 # pylint: disable=protected-access | |
116 lines = frozenset(lines) | 163 lines = frozenset(lines) |
117 disabled_outputs = list(test_tool._front_end.GetDisabledOutputClasses()) | 164 disabled_outputs = list( |
118 enabled_outputs = list(test_tool._front_end.GetOutputClasses()) | 165 output_manager.OutputManager.GetDisabledOutputClasses()) |
| 166 enabled_outputs = list(output_manager.OutputManager.GetOutputClasses()) |
119 | 167 |
120 expected_number_of_tables = 0 | 168 expected_number_of_tables = 0 |
121 if disabled_outputs: | 169 if disabled_outputs: |
122 expected_number_of_tables += 1 | 170 expected_number_of_tables += 1 |
123 if enabled_outputs: | 171 if enabled_outputs: |
124 expected_number_of_tables += 1 | 172 expected_number_of_tables += 1 |
125 | 173 |
126 self.assertEqual(number_of_tables, expected_number_of_tables) | 174 self.assertEqual(number_of_tables, expected_number_of_tables) |
127 | 175 |
128 expected_line = b'rawpy : "raw" (or native) Python output.' | 176 expected_line = b'rawpy : "raw" (or native) Python output.' |
129 self.assertIn(expected_line, lines) | 177 self.assertIn(expected_line, lines) |
130 | 178 |
131 def testListParsersAndPlugins(self): | |
132 """Tests the ListParsersAndPlugins function.""" | |
133 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
134 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
135 | |
136 test_tool.ListParsersAndPlugins() | |
137 | |
138 output = output_writer.ReadOutput() | |
139 | |
140 number_of_tables = 0 | |
141 lines = [] | |
142 for line in output.split(b'\n'): | |
143 line = line.strip() | |
144 lines.append(line) | |
145 | |
146 if line.startswith(b'*****') and line.endswith(b'*****'): | |
147 number_of_tables += 1 | |
148 | |
149 self.assertIn(u'Parsers', lines[1]) | |
150 | |
151 lines = frozenset(lines) | |
152 | |
153 self.assertEqual(number_of_tables, 9) | |
154 | |
155 expected_line = b'filestat : Parser for file system stat information.' | |
156 self.assertIn(expected_line, lines) | |
157 | |
158 expected_line = b'bencode_utorrent : Parser for uTorrent bencoded files.' | |
159 self.assertIn(expected_line, lines) | |
160 | |
161 expected_line = ( | |
162 b'msie_webcache : Parser for MSIE WebCache ESE database files.') | |
163 self.assertIn(expected_line, lines) | |
164 | |
165 expected_line = b'olecf_default : Parser for a generic OLECF item.' | |
166 self.assertIn(expected_line, lines) | |
167 | |
168 expected_line = b'plist_default : Parser for plist files.' | |
169 self.assertIn(expected_line, lines) | |
170 | |
171 expected_line = ( | |
172 b'chrome_history : Parser for Chrome history SQLite database files.') | |
173 self.assertIn(expected_line, lines) | |
174 | |
175 expected_line = b'ssh : Parser for SSH syslog entries.' | |
176 self.assertIn(expected_line, lines) | |
177 | |
178 expected_line = b'winreg_default : Parser for Registry data.' | |
179 self.assertIn(expected_line, lines) | |
180 | |
181 def testParseArguments(self): | 179 def testParseArguments(self): |
182 """Tests the ParseArguments function.""" | 180 """Tests the ParseArguments function.""" |
183 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 181 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
184 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 182 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
185 | 183 |
186 result = test_tool.ParseArguments() | 184 result = test_tool.ParseArguments() |
187 self.assertFalse(result) | 185 self.assertFalse(result) |
188 | 186 |
189 # TODO: check output. | 187 # TODO: check output. |
190 # TODO: improve test coverage. | 188 # TODO: improve test coverage. |
191 | 189 |
192 def testParseOptions(self): | 190 def testParseOptions(self): |
193 """Tests the ParseOptions function.""" | 191 """Tests the ParseOptions function.""" |
194 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 192 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
195 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 193 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
196 | 194 |
197 options = cli_test_lib.TestOptions() | 195 options = test_lib.TestOptions() |
198 options.source = self._GetTestFilePath([u'testdir']) | 196 options.source = self._GetTestFilePath([u'testdir']) |
199 options.output = u'storage.plaso' | 197 options.output = u'storage.plaso' |
200 | 198 |
201 test_tool.ParseOptions(options) | 199 test_tool.ParseOptions(options) |
202 | 200 |
203 options = cli_test_lib.TestOptions() | 201 options = test_lib.TestOptions() |
204 | 202 |
205 with self.assertRaises(errors.BadConfigOption): | 203 with self.assertRaises(errors.BadConfigOption): |
206 test_tool.ParseOptions(options) | 204 test_tool.ParseOptions(options) |
207 | 205 |
208 options = cli_test_lib.TestOptions() | 206 options = test_lib.TestOptions() |
209 options.source = self._GetTestFilePath([u'testdir']) | 207 options.source = self._GetTestFilePath([u'testdir']) |
210 | 208 |
211 with self.assertRaises(errors.BadConfigOption): | 209 with self.assertRaises(errors.BadConfigOption): |
212 test_tool.ParseOptions(options) | 210 test_tool.ParseOptions(options) |
213 | 211 |
214 # TODO: improve test coverage. | 212 # TODO: improve test coverage. |
215 | 213 |
216 def testProcessSourcesDirectory(self): | 214 def testExtractEventsFromSourcesOnDirectory(self): |
217 """Tests the ProcessSources function on a directory.""" | 215 """Tests the ExtractEventsFromSources function on a directory.""" |
218 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 216 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
219 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 217 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
220 | 218 |
221 options = cli_test_lib.TestOptions() | 219 options = test_lib.TestOptions() |
222 options.quiet = True | 220 options.quiet = True |
223 options.single_process = True | 221 options.single_process = True |
224 options.status_view_mode = u'none' | 222 options.status_view_mode = u'none' |
225 options.source = self._GetTestFilePath([u'testdir']) | 223 options.source = self._GetTestFilePath([u'testdir']) |
226 | 224 |
227 with shared_test_lib.TempDirectory() as temp_directory: | 225 with shared_test_lib.TempDirectory() as temp_directory: |
228 options.output = os.path.join(temp_directory, u'storage.plaso') | 226 options.output = os.path.join(temp_directory, u'storage.plaso') |
229 | 227 |
230 test_tool.ParseOptions(options) | 228 test_tool.ParseOptions(options) |
231 | 229 |
232 test_tool.ProcessSources() | 230 test_tool.ExtractEventsFromSources() |
233 | 231 |
234 expected_output = [ | 232 expected_output = [ |
235 b'', | 233 b'', |
236 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 234 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
237 b'Source type\t: directory', | 235 b'Source type\t: directory', |
238 b'', | 236 b'', |
239 b'Processing started.', | 237 b'Processing started.', |
240 b'Processing completed.', | 238 b'Processing completed.', |
241 b'', | 239 b'', |
242 b''] | 240 b''] |
243 | 241 |
244 output = output_writer.ReadOutput() | 242 output = output_writer.ReadOutput() |
245 self.assertEqual(output.split(b'\n'), expected_output) | 243 self.assertEqual(output.split(b'\n'), expected_output) |
246 | 244 |
247 def testProcessSourcesBDEImage(self): | 245 def testExtractEventsFromSourcesOnBDEImage(self): |
248 """Tests the ProcessSources function on an image containing BDE.""" | 246 """Tests the ExtractEventsFromSources function on BDE image.""" |
249 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 247 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
250 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 248 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
251 | 249 |
252 options = cli_test_lib.TestOptions() | 250 options = test_lib.TestOptions() |
253 options.credentials = [u'password:{0:s}'.format(self._BDE_PASSWORD)] | 251 options.credentials = [u'password:{0:s}'.format(self._BDE_PASSWORD)] |
254 options.quiet = True | 252 options.quiet = True |
255 options.single_process = True | 253 options.single_process = True |
256 options.status_view_mode = u'none' | 254 options.status_view_mode = u'none' |
257 options.source = self._GetTestFilePath([u'bdetogo.raw']) | 255 options.source = self._GetTestFilePath([u'bdetogo.raw']) |
258 | 256 |
259 with shared_test_lib.TempDirectory() as temp_directory: | 257 with shared_test_lib.TempDirectory() as temp_directory: |
260 options.output = os.path.join(temp_directory, u'storage.plaso') | 258 options.output = os.path.join(temp_directory, u'storage.plaso') |
261 | 259 |
262 test_tool.ParseOptions(options) | 260 test_tool.ParseOptions(options) |
263 | 261 |
264 test_tool.ProcessSources() | 262 test_tool.ExtractEventsFromSources() |
265 | 263 |
266 expected_output = [ | 264 expected_output = [ |
267 b'', | 265 b'', |
268 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 266 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
269 b'Source type\t: storage media image', | 267 b'Source type\t: storage media image', |
270 b'', | 268 b'', |
271 b'Processing started.', | 269 b'Processing started.', |
272 b'Processing completed.', | 270 b'Processing completed.', |
273 b'', | 271 b'', |
274 b''] | 272 b''] |
275 | 273 |
276 output = output_writer.ReadOutput() | 274 output = output_writer.ReadOutput() |
277 self.assertEqual(output.split(b'\n'), expected_output) | 275 self.assertEqual(output.split(b'\n'), expected_output) |
278 | 276 |
279 def testProcessSourcesImage(self): | 277 def testExtractEventsFromSourcesImage(self): |
280 """Tests the ProcessSources function on a single partition image.""" | 278 """Tests the ExtractEventsFromSources function on single partition image.""" |
281 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 279 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
282 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 280 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
283 | 281 |
284 options = cli_test_lib.TestOptions() | 282 options = test_lib.TestOptions() |
285 options.quiet = True | 283 options.quiet = True |
286 options.single_process = True | 284 options.single_process = True |
287 options.status_view_mode = u'none' | 285 options.status_view_mode = u'none' |
288 options.source = self._GetTestFilePath([u'ímynd.dd']) | 286 options.source = self._GetTestFilePath([u'ímynd.dd']) |
289 | 287 |
290 with shared_test_lib.TempDirectory() as temp_directory: | 288 with shared_test_lib.TempDirectory() as temp_directory: |
291 options.output = os.path.join(temp_directory, u'storage.plaso') | 289 options.output = os.path.join(temp_directory, u'storage.plaso') |
292 | 290 |
293 test_tool.ParseOptions(options) | 291 test_tool.ParseOptions(options) |
294 | 292 |
295 test_tool.ProcessSources() | 293 test_tool.ExtractEventsFromSources() |
296 | 294 |
297 expected_output = [ | 295 expected_output = [ |
298 b'', | 296 b'', |
299 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 297 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
300 b'Source type\t: storage media image', | 298 b'Source type\t: storage media image', |
301 b'', | 299 b'', |
302 b'Processing started.', | 300 b'Processing started.', |
303 b'Processing completed.', | 301 b'Processing completed.', |
304 b'', | 302 b'', |
305 b''] | 303 b''] |
306 | 304 |
307 output = output_writer.ReadOutput() | 305 output = output_writer.ReadOutput() |
308 self.assertEqual(output.split(b'\n'), expected_output) | 306 self.assertEqual(output.split(b'\n'), expected_output) |
309 | 307 |
310 def testProcessSourcesPartitionedImage(self): | 308 def testExtractEventsFromSourcesPartitionedImage(self): |
311 """Tests the ProcessSources function on a multi partition image.""" | 309 """Tests the ExtractEventsFromSources function on multi partition image.""" |
312 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 310 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
313 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 311 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
314 | 312 |
315 options = cli_test_lib.TestOptions() | 313 options = test_lib.TestOptions() |
316 options.partitions = u'all' | 314 options.partitions = u'all' |
317 options.quiet = True | 315 options.quiet = True |
318 options.single_process = True | 316 options.single_process = True |
319 options.status_view_mode = u'none' | 317 options.status_view_mode = u'none' |
320 # Note that the source file is a RAW (VMDK flat) image. | 318 # Note that the source file is a RAW (VMDK flat) image. |
321 options.source = self._GetTestFilePath([u'multi_partition_image.vmdk']) | 319 options.source = self._GetTestFilePath([u'multi_partition_image.vmdk']) |
322 | 320 |
323 with shared_test_lib.TempDirectory() as temp_directory: | 321 with shared_test_lib.TempDirectory() as temp_directory: |
324 options.output = os.path.join(temp_directory, u'storage.plaso') | 322 options.output = os.path.join(temp_directory, u'storage.plaso') |
325 | 323 |
326 test_tool.ParseOptions(options) | 324 test_tool.ParseOptions(options) |
327 | 325 |
328 test_tool.ProcessSources() | 326 test_tool.ExtractEventsFromSources() |
329 | 327 |
330 expected_output = [ | 328 expected_output = [ |
331 b'', | 329 b'', |
332 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 330 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
333 b'Source type\t: storage media image', | 331 b'Source type\t: storage media image', |
334 b'', | 332 b'', |
335 b'Processing started.', | 333 b'Processing started.', |
336 b'Processing completed.', | 334 b'Processing completed.', |
337 b'', | 335 b'', |
338 b''] | 336 b''] |
339 | 337 |
340 output = output_writer.ReadOutput() | 338 output = output_writer.ReadOutput() |
341 self.assertEqual(output.split(b'\n'), expected_output) | 339 self.assertEqual(output.split(b'\n'), expected_output) |
342 | 340 |
343 def testProcessSourcesVSSImage(self): | 341 def testExtractEventsFromSourcesOnVSSImage(self): |
344 """Tests the ProcessSources function on an image containing VSS.""" | 342 """Tests the ExtractEventsFromSources function on VSS image.""" |
345 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 343 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
346 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 344 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
347 | 345 |
348 options = cli_test_lib.TestOptions() | 346 options = test_lib.TestOptions() |
349 options.quiet = True | 347 options.quiet = True |
350 options.single_process = True | 348 options.single_process = True |
351 options.status_view_mode = u'none' | 349 options.status_view_mode = u'none' |
352 options.source = self._GetTestFilePath([u'vsstest.qcow2']) | 350 options.source = self._GetTestFilePath([u'vsstest.qcow2']) |
353 options.vss_stores = u'all' | 351 options.vss_stores = u'all' |
354 | 352 |
355 with shared_test_lib.TempDirectory() as temp_directory: | 353 with shared_test_lib.TempDirectory() as temp_directory: |
356 options.output = os.path.join(temp_directory, u'storage.plaso') | 354 options.output = os.path.join(temp_directory, u'storage.plaso') |
357 | 355 |
358 test_tool.ParseOptions(options) | 356 test_tool.ParseOptions(options) |
359 | 357 |
360 test_tool.ProcessSources() | 358 test_tool.ExtractEventsFromSources() |
361 | 359 |
362 expected_output = [ | 360 expected_output = [ |
363 b'', | 361 b'', |
364 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 362 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
365 b'Source type\t: storage media image', | 363 b'Source type\t: storage media image', |
366 b'', | 364 b'', |
367 b'Processing started.', | 365 b'Processing started.', |
368 b'Processing completed.', | 366 b'Processing completed.', |
369 b'', | 367 b'', |
370 b'Number of errors encountered while extracting events: 1.', | 368 b'Number of errors encountered while extracting events: 1.', |
371 b'', | 369 b'', |
372 b'Use pinfo to inspect errors in more detail.', | 370 b'Use pinfo to inspect errors in more detail.', |
373 b'', | 371 b'', |
374 b''] | 372 b''] |
375 | 373 |
376 output = output_writer.ReadOutput() | 374 output = output_writer.ReadOutput() |
377 self.assertEqual(output.split(b'\n'), expected_output) | 375 self.assertEqual(output.split(b'\n'), expected_output) |
378 | 376 |
379 def testProcessSourcesSingleFile(self): | 377 def testExtractEventsFromSourcesOnFile(self): |
380 """Tests the ProcessSources function on a single file.""" | 378 """Tests the ExtractEventsFromSources function on a file.""" |
381 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 379 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
382 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 380 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
383 | 381 |
384 options = cli_test_lib.TestOptions() | 382 options = test_lib.TestOptions() |
385 options.quiet = True | 383 options.quiet = True |
386 options.single_process = True | 384 options.single_process = True |
387 options.status_view_mode = u'none' | 385 options.status_view_mode = u'none' |
388 options.source = self._GetTestFilePath([u'System.evtx']) | 386 options.source = self._GetTestFilePath([u'System.evtx']) |
389 | 387 |
390 with shared_test_lib.TempDirectory() as temp_directory: | 388 with shared_test_lib.TempDirectory() as temp_directory: |
391 options.output = os.path.join(temp_directory, u'storage.plaso') | 389 options.output = os.path.join(temp_directory, u'storage.plaso') |
392 | 390 |
393 test_tool.ParseOptions(options) | 391 test_tool.ParseOptions(options) |
394 | 392 |
395 test_tool.ProcessSources() | 393 test_tool.ExtractEventsFromSources() |
396 | 394 |
397 expected_output = [ | 395 expected_output = [ |
398 b'', | 396 b'', |
399 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 397 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
400 b'Source type\t: single file', | 398 b'Source type\t: single file', |
401 b'', | 399 b'', |
402 b'Processing started.', | 400 b'Processing started.', |
403 b'Processing completed.', | 401 b'Processing completed.', |
404 b'', | 402 b'', |
405 b''] | 403 b''] |
406 | 404 |
407 output = output_writer.ReadOutput() | 405 output = output_writer.ReadOutput() |
408 self.assertEqual(output.split(b'\n'), expected_output) | 406 self.assertEqual(output.split(b'\n'), expected_output) |
409 | 407 |
410 def testProcessSourcesFilestat(self): | 408 def testExtractEventsFromSourcesWithFilestat(self): |
411 """Test if the filestat and other parsers ran.""" | 409 """Tests the ExtractEventsFromSources function with filestat parser.""" |
412 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 410 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
413 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 411 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
414 | 412 |
415 options = cli_test_lib.TestOptions() | 413 options = test_lib.TestOptions() |
416 options.quiet = True | 414 options.quiet = True |
417 options.parsers = u'filestat,pe' | 415 options.parsers = u'filestat,pe' |
418 options.single_process = True | 416 options.single_process = True |
419 options.status_view_mode = u'none' | 417 options.status_view_mode = u'none' |
420 options.source = self._GetTestFilePath([u'test_pe.exe']) | 418 options.source = self._GetTestFilePath([u'test_pe.exe']) |
421 | 419 |
422 with shared_test_lib.TempDirectory() as temp_directory: | 420 with shared_test_lib.TempDirectory() as temp_directory: |
423 options.output = os.path.join(temp_directory, u'storage.plaso') | 421 options.output = os.path.join(temp_directory, u'storage.plaso') |
424 | 422 |
425 test_tool.ParseOptions(options) | 423 test_tool.ParseOptions(options) |
426 | 424 |
427 test_tool.ProcessSources() | 425 test_tool.ExtractEventsFromSources() |
428 | 426 |
429 storage_file = storage_zip_file.ZIPStorageFile() | 427 storage_file = storage_zip_file.ZIPStorageFile() |
430 try: | 428 try: |
431 storage_file.Open(path=options.output, read_only=True) | 429 storage_file.Open(path=options.output, read_only=True) |
432 except IOError as exception: | 430 except IOError as exception: |
433 self.fail(( | 431 self.fail(( |
434 u'Unable to open storage file after processing with error: ' | 432 u'Unable to open storage file after processing with error: ' |
435 u'{0:s}.').format(exception)) | 433 u'{0:s}.').format(exception)) |
436 | 434 |
437 # There should be 3 filestat and 3 pe parser generated events. | 435 # There should be 3 filestat and 3 pe parser generated events. |
438 event_objects = list(storage_file.GetEvents()) | 436 event_objects = list(storage_file.GetEvents()) |
439 self.assertEqual(len(event_objects), 6) | 437 self.assertEqual(len(event_objects), 6) |
440 | 438 |
441 def testShowInfo(self): | 439 def testShowInfo(self): |
442 """Tests the output of the tool in info mode.""" | 440 """Tests the output of the tool in info mode.""" |
443 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | 441 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
444 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | 442 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
445 | 443 |
446 options = cli_test_lib.TestOptions() | 444 options = test_lib.TestOptions() |
447 options.show_info = True | 445 options.show_info = True |
448 test_tool.ParseOptions(options) | 446 test_tool.ParseOptions(options) |
449 test_tool.ShowInfo() | 447 test_tool.ShowInfo() |
450 output = output_writer.ReadOutput() | 448 output = output_writer.ReadOutput() |
451 | 449 |
452 section_headings = [ | 450 section_headings = [ |
453 u'Parser Presets', u'Hashers', u'Parser Plugins', u'Versions', | 451 u'Parser Presets', u'Hashers', u'Parser Plugins', u'Versions', |
454 u'Filters', u'Parsers', u'Output Modules'] | 452 u'Filters', u'Parsers', u'Output Modules'] |
455 for heading in section_headings: | 453 for heading in section_headings: |
456 self.assertIn(heading, output) | 454 self.assertIn(heading, output) |
457 | 455 |
458 self.assertNotIn(u'<class', output) | 456 self.assertNotIn(u'<class', output) |
459 | 457 |
460 | 458 |
461 if __name__ == '__main__': | 459 if __name__ == '__main__': |
462 unittest.main() | 460 unittest.main() |
OLD | NEW |