OLD | NEW |
| (Empty) |
1 #!/usr/bin/python | |
2 # -*- coding: utf-8 -*- | |
3 """Tests for the log2timeline CLI tool.""" | |
4 | |
5 import argparse | |
6 import os | |
7 import unittest | |
8 | |
9 from plaso.lib import errors | |
10 from plaso.storage import zip_file as storage_zip_file | |
11 | |
12 from tests import test_lib as shared_test_lib | |
13 from tests.cli import test_lib as cli_test_lib | |
14 | |
15 from tools import log2timeline | |
16 | |
17 | |
18 class Log2TimelineToolTest(cli_test_lib.CLIToolTestCase): | |
19 """Tests for the log2timeline CLI tool.""" | |
20 | |
21 _BDE_PASSWORD = u'bde-TEST' | |
22 | |
23 _EXPECTED_PROCESSING_OPTIONS = u'\n'.join([ | |
24 u'usage: log2timeline_test.py [--disable_zeromq] [--single_process]', | |
25 u' [--temporary_directory DIRECTORY]', | |
26 (u' [--worker-memory-limit SIZE] ' | |
27 u'[--workers WORKERS]'), | |
28 u'', | |
29 u'Test argument parser.', | |
30 u'', | |
31 u'optional arguments:', | |
32 u' --disable_zeromq, --disable-zeromq', | |
33 (u' Disable queueing using ZeroMQ. A ' | |
34 u'Multiprocessing queue'), | |
35 u' will be used instead.', | |
36 u' --single_process, --single-process', | |
37 (u' Indicate that the tool should run in a ' | |
38 u'single process.'), | |
39 u' --temporary_directory DIRECTORY, --temporary-directory DIRECTORY', | |
40 (u' Path to the directory that should be used to ' | |
41 u'store'), | |
42 u' temporary files created during extraction.', | |
43 u' --worker-memory-limit SIZE, --worker_memory_limit SIZE', | |
44 (u' Maximum amount of memory a worker process is ' | |
45 u'allowed'), | |
46 u' to consume. [defaults to 2 GiB]', | |
47 (u' --workers WORKERS The number of worker processes [defaults to ' | |
48 u'available'), | |
49 u' system CPUs minus one].', | |
50 u'']) | |
51 | |
52 # TODO: add test for _GetMatcher. | |
53 # TODO: add test for _ParseOutputOptions. | |
54 # TODO: add test for _ParseProcessingOptions. | |
55 | |
56 def testAddProcessingOptions(self): | |
57 """Tests the AddProcessingOptions function.""" | |
58 argument_parser = argparse.ArgumentParser( | |
59 prog=u'log2timeline_test.py', | |
60 description=u'Test argument parser.', add_help=False, | |
61 formatter_class=cli_test_lib.SortedArgumentsHelpFormatter) | |
62 | |
63 test_tool = log2timeline.Log2TimelineTool() | |
64 test_tool.AddProcessingOptions(argument_parser) | |
65 | |
66 output = self._RunArgparseFormatHelp(argument_parser) | |
67 self.assertEqual(output, self._EXPECTED_PROCESSING_OPTIONS) | |
68 | |
69 def testListHashers(self): | |
70 """Tests the ListHashers function.""" | |
71 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
72 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
73 | |
74 test_tool.ListHashers() | |
75 | |
76 output = output_writer.ReadOutput() | |
77 | |
78 number_of_tables = 0 | |
79 lines = [] | |
80 for line in output.split(b'\n'): | |
81 line = line.strip() | |
82 lines.append(line) | |
83 | |
84 if line.startswith(b'*****') and line.endswith(b'*****'): | |
85 number_of_tables += 1 | |
86 | |
87 self.assertIn(u'Hashers', lines[1]) | |
88 | |
89 lines = frozenset(lines) | |
90 | |
91 self.assertEqual(number_of_tables, 1) | |
92 | |
93 expected_line = b'md5 : Calculates an MD5 digest hash over input data.' | |
94 self.assertIn(expected_line, lines) | |
95 | |
96 def testListOutputModules(self): | |
97 """Tests the ListOutputModules function.""" | |
98 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
99 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
100 | |
101 test_tool.ListOutputModules() | |
102 | |
103 output = output_writer.ReadOutput() | |
104 number_of_tables = 0 | |
105 lines = [] | |
106 for line in output.split(b'\n'): | |
107 line = line.strip() | |
108 lines.append(line) | |
109 | |
110 if line.startswith(b'*****') and line.endswith(b'*****'): | |
111 number_of_tables += 1 | |
112 | |
113 self.assertIn(u'Output Modules', lines[1]) | |
114 | |
115 # pylint: disable=protected-access | |
116 lines = frozenset(lines) | |
117 disabled_outputs = list(test_tool._front_end.GetDisabledOutputClasses()) | |
118 enabled_outputs = list(test_tool._front_end.GetOutputClasses()) | |
119 | |
120 expected_number_of_tables = 0 | |
121 if disabled_outputs: | |
122 expected_number_of_tables += 1 | |
123 if enabled_outputs: | |
124 expected_number_of_tables += 1 | |
125 | |
126 self.assertEqual(number_of_tables, expected_number_of_tables) | |
127 | |
128 expected_line = b'rawpy : "raw" (or native) Python output.' | |
129 self.assertIn(expected_line, lines) | |
130 | |
131 def testListParsersAndPlugins(self): | |
132 """Tests the ListParsersAndPlugins function.""" | |
133 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
134 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
135 | |
136 test_tool.ListParsersAndPlugins() | |
137 | |
138 output = output_writer.ReadOutput() | |
139 | |
140 number_of_tables = 0 | |
141 lines = [] | |
142 for line in output.split(b'\n'): | |
143 line = line.strip() | |
144 lines.append(line) | |
145 | |
146 if line.startswith(b'*****') and line.endswith(b'*****'): | |
147 number_of_tables += 1 | |
148 | |
149 self.assertIn(u'Parsers', lines[1]) | |
150 | |
151 lines = frozenset(lines) | |
152 | |
153 self.assertEqual(number_of_tables, 9) | |
154 | |
155 expected_line = b'filestat : Parser for file system stat information.' | |
156 self.assertIn(expected_line, lines) | |
157 | |
158 expected_line = b'bencode_utorrent : Parser for uTorrent bencoded files.' | |
159 self.assertIn(expected_line, lines) | |
160 | |
161 expected_line = ( | |
162 b'msie_webcache : Parser for MSIE WebCache ESE database files.') | |
163 self.assertIn(expected_line, lines) | |
164 | |
165 expected_line = b'olecf_default : Parser for a generic OLECF item.' | |
166 self.assertIn(expected_line, lines) | |
167 | |
168 expected_line = b'plist_default : Parser for plist files.' | |
169 self.assertIn(expected_line, lines) | |
170 | |
171 expected_line = ( | |
172 b'chrome_history : Parser for Chrome history SQLite database files.') | |
173 self.assertIn(expected_line, lines) | |
174 | |
175 expected_line = b'ssh : Parser for SSH syslog entries.' | |
176 self.assertIn(expected_line, lines) | |
177 | |
178 expected_line = b'winreg_default : Parser for Registry data.' | |
179 self.assertIn(expected_line, lines) | |
180 | |
181 def testParseArguments(self): | |
182 """Tests the ParseArguments function.""" | |
183 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
184 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
185 | |
186 result = test_tool.ParseArguments() | |
187 self.assertFalse(result) | |
188 | |
189 # TODO: check output. | |
190 # TODO: improve test coverage. | |
191 | |
192 def testParseOptions(self): | |
193 """Tests the ParseOptions function.""" | |
194 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
195 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
196 | |
197 options = cli_test_lib.TestOptions() | |
198 options.source = self._GetTestFilePath([u'testdir']) | |
199 options.output = u'storage.plaso' | |
200 | |
201 test_tool.ParseOptions(options) | |
202 | |
203 options = cli_test_lib.TestOptions() | |
204 | |
205 with self.assertRaises(errors.BadConfigOption): | |
206 test_tool.ParseOptions(options) | |
207 | |
208 options = cli_test_lib.TestOptions() | |
209 options.source = self._GetTestFilePath([u'testdir']) | |
210 | |
211 with self.assertRaises(errors.BadConfigOption): | |
212 test_tool.ParseOptions(options) | |
213 | |
214 # TODO: improve test coverage. | |
215 | |
216 def testProcessSourcesDirectory(self): | |
217 """Tests the ProcessSources function on a directory.""" | |
218 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
219 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
220 | |
221 options = cli_test_lib.TestOptions() | |
222 options.quiet = True | |
223 options.single_process = True | |
224 options.status_view_mode = u'none' | |
225 options.source = self._GetTestFilePath([u'testdir']) | |
226 | |
227 with shared_test_lib.TempDirectory() as temp_directory: | |
228 options.output = os.path.join(temp_directory, u'storage.plaso') | |
229 | |
230 test_tool.ParseOptions(options) | |
231 | |
232 test_tool.ProcessSources() | |
233 | |
234 expected_output = [ | |
235 b'', | |
236 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | |
237 b'Source type\t: directory', | |
238 b'', | |
239 b'Processing started.', | |
240 b'Processing completed.', | |
241 b'', | |
242 b''] | |
243 | |
244 output = output_writer.ReadOutput() | |
245 self.assertEqual(output.split(b'\n'), expected_output) | |
246 | |
247 def testProcessSourcesBDEImage(self): | |
248 """Tests the ProcessSources function on an image containing BDE.""" | |
249 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
250 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
251 | |
252 options = cli_test_lib.TestOptions() | |
253 options.credentials = [u'password:{0:s}'.format(self._BDE_PASSWORD)] | |
254 options.quiet = True | |
255 options.single_process = True | |
256 options.status_view_mode = u'none' | |
257 options.source = self._GetTestFilePath([u'bdetogo.raw']) | |
258 | |
259 with shared_test_lib.TempDirectory() as temp_directory: | |
260 options.output = os.path.join(temp_directory, u'storage.plaso') | |
261 | |
262 test_tool.ParseOptions(options) | |
263 | |
264 test_tool.ProcessSources() | |
265 | |
266 expected_output = [ | |
267 b'', | |
268 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | |
269 b'Source type\t: storage media image', | |
270 b'', | |
271 b'Processing started.', | |
272 b'Processing completed.', | |
273 b'', | |
274 b''] | |
275 | |
276 output = output_writer.ReadOutput() | |
277 self.assertEqual(output.split(b'\n'), expected_output) | |
278 | |
279 def testProcessSourcesImage(self): | |
280 """Tests the ProcessSources function on a single partition image.""" | |
281 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
282 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
283 | |
284 options = cli_test_lib.TestOptions() | |
285 options.quiet = True | |
286 options.single_process = True | |
287 options.status_view_mode = u'none' | |
288 options.source = self._GetTestFilePath([u'ímynd.dd']) | |
289 | |
290 with shared_test_lib.TempDirectory() as temp_directory: | |
291 options.output = os.path.join(temp_directory, u'storage.plaso') | |
292 | |
293 test_tool.ParseOptions(options) | |
294 | |
295 test_tool.ProcessSources() | |
296 | |
297 expected_output = [ | |
298 b'', | |
299 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | |
300 b'Source type\t: storage media image', | |
301 b'', | |
302 b'Processing started.', | |
303 b'Processing completed.', | |
304 b'', | |
305 b''] | |
306 | |
307 output = output_writer.ReadOutput() | |
308 self.assertEqual(output.split(b'\n'), expected_output) | |
309 | |
310 def testProcessSourcesPartitionedImage(self): | |
311 """Tests the ProcessSources function on a multi partition image.""" | |
312 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
313 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
314 | |
315 options = cli_test_lib.TestOptions() | |
316 options.partitions = u'all' | |
317 options.quiet = True | |
318 options.single_process = True | |
319 options.status_view_mode = u'none' | |
320 # Note that the source file is a RAW (VMDK flat) image. | |
321 options.source = self._GetTestFilePath([u'multi_partition_image.vmdk']) | |
322 | |
323 with shared_test_lib.TempDirectory() as temp_directory: | |
324 options.output = os.path.join(temp_directory, u'storage.plaso') | |
325 | |
326 test_tool.ParseOptions(options) | |
327 | |
328 test_tool.ProcessSources() | |
329 | |
330 expected_output = [ | |
331 b'', | |
332 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | |
333 b'Source type\t: storage media image', | |
334 b'', | |
335 b'Processing started.', | |
336 b'Processing completed.', | |
337 b'', | |
338 b''] | |
339 | |
340 output = output_writer.ReadOutput() | |
341 self.assertEqual(output.split(b'\n'), expected_output) | |
342 | |
343 def testProcessSourcesVSSImage(self): | |
344 """Tests the ProcessSources function on an image containing VSS.""" | |
345 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
346 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
347 | |
348 options = cli_test_lib.TestOptions() | |
349 options.quiet = True | |
350 options.single_process = True | |
351 options.status_view_mode = u'none' | |
352 options.source = self._GetTestFilePath([u'vsstest.qcow2']) | |
353 options.vss_stores = u'all' | |
354 | |
355 with shared_test_lib.TempDirectory() as temp_directory: | |
356 options.output = os.path.join(temp_directory, u'storage.plaso') | |
357 | |
358 test_tool.ParseOptions(options) | |
359 | |
360 test_tool.ProcessSources() | |
361 | |
362 expected_output = [ | |
363 b'', | |
364 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | |
365 b'Source type\t: storage media image', | |
366 b'', | |
367 b'Processing started.', | |
368 b'Processing completed.', | |
369 b'', | |
370 b'Number of errors encountered while extracting events: 1.', | |
371 b'', | |
372 b'Use pinfo to inspect errors in more detail.', | |
373 b'', | |
374 b''] | |
375 | |
376 output = output_writer.ReadOutput() | |
377 self.assertEqual(output.split(b'\n'), expected_output) | |
378 | |
379 def testProcessSourcesSingleFile(self): | |
380 """Tests the ProcessSources function on a single file.""" | |
381 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
382 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
383 | |
384 options = cli_test_lib.TestOptions() | |
385 options.quiet = True | |
386 options.single_process = True | |
387 options.status_view_mode = u'none' | |
388 options.source = self._GetTestFilePath([u'System.evtx']) | |
389 | |
390 with shared_test_lib.TempDirectory() as temp_directory: | |
391 options.output = os.path.join(temp_directory, u'storage.plaso') | |
392 | |
393 test_tool.ParseOptions(options) | |
394 | |
395 test_tool.ProcessSources() | |
396 | |
397 expected_output = [ | |
398 b'', | |
399 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | |
400 b'Source type\t: single file', | |
401 b'', | |
402 b'Processing started.', | |
403 b'Processing completed.', | |
404 b'', | |
405 b''] | |
406 | |
407 output = output_writer.ReadOutput() | |
408 self.assertEqual(output.split(b'\n'), expected_output) | |
409 | |
410 def testProcessSourcesFilestat(self): | |
411 """Test if the filestat and other parsers ran.""" | |
412 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
413 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
414 | |
415 options = cli_test_lib.TestOptions() | |
416 options.quiet = True | |
417 options.parsers = u'filestat,pe' | |
418 options.single_process = True | |
419 options.status_view_mode = u'none' | |
420 options.source = self._GetTestFilePath([u'test_pe.exe']) | |
421 | |
422 with shared_test_lib.TempDirectory() as temp_directory: | |
423 options.output = os.path.join(temp_directory, u'storage.plaso') | |
424 | |
425 test_tool.ParseOptions(options) | |
426 | |
427 test_tool.ProcessSources() | |
428 | |
429 storage_file = storage_zip_file.ZIPStorageFile() | |
430 try: | |
431 storage_file.Open(path=options.output, read_only=True) | |
432 except IOError as exception: | |
433 self.fail(( | |
434 u'Unable to open storage file after processing with error: ' | |
435 u'{0:s}.').format(exception)) | |
436 | |
437 # There should be 3 filestat and 3 pe parser generated events. | |
438 event_objects = list(storage_file.GetEvents()) | |
439 self.assertEqual(len(event_objects), 6) | |
440 | |
441 def testShowInfo(self): | |
442 """Tests the output of the tool in info mode.""" | |
443 output_writer = cli_test_lib.TestOutputWriter(encoding=u'utf-8') | |
444 test_tool = log2timeline.Log2TimelineTool(output_writer=output_writer) | |
445 | |
446 options = cli_test_lib.TestOptions() | |
447 options.show_info = True | |
448 test_tool.ParseOptions(options) | |
449 test_tool.ShowInfo() | |
450 output = output_writer.ReadOutput() | |
451 | |
452 section_headings = [ | |
453 u'Parser Presets', u'Hashers', u'Parser Plugins', u'Versions', | |
454 u'Filters', u'Parsers', u'Output Modules'] | |
455 for heading in section_headings: | |
456 self.assertIn(heading, output) | |
457 | |
458 self.assertNotIn(u'<class', output) | |
459 | |
460 | |
461 if __name__ == '__main__': | |
462 unittest.main() | |
OLD | NEW |