OLD | NEW |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
3 """Tests for the log2timeline CLI tool.""" | 3 """Tests for the log2timeline CLI tool.""" |
4 | 4 |
5 import argparse | 5 import argparse |
6 import os | 6 import os |
7 import unittest | 7 import unittest |
8 | 8 |
9 from plaso.cli import log2timeline_tool | 9 from plaso.cli import log2timeline_tool |
10 from plaso.lib import errors | 10 from plaso.lib import errors |
(...skipping 23 matching lines...)
34 u' --disable_zeromq, --disable-zeromq', | 34 u' --disable_zeromq, --disable-zeromq', |
35 (u' Disable queueing using ZeroMQ. A ' | 35 (u' Disable queueing using ZeroMQ. A ' |
36 u'Multiprocessing queue'), | 36 u'Multiprocessing queue'), |
37 u' will be used instead.', | 37 u' will be used instead.', |
38 u' --single_process, --single-process', | 38 u' --single_process, --single-process', |
39 (u' Indicate that the tool should run in a ' | 39 (u' Indicate that the tool should run in a ' |
40 u'single process.'), | 40 u'single process.'), |
41 u' --temporary_directory DIRECTORY, --temporary-directory DIRECTORY', | 41 u' --temporary_directory DIRECTORY, --temporary-directory DIRECTORY', |
42 (u' Path to the directory that should be used to ' | 42 (u' Path to the directory that should be used to ' |
43 u'store'), | 43 u'store'), |
44 u' temporary files created during extraction.', | 44 u' temporary files created during processing.', |
45 u' --worker-memory-limit SIZE, --worker_memory_limit SIZE', | 45 u' --worker-memory-limit SIZE, --worker_memory_limit SIZE', |
46 (u' Maximum amount of memory a worker process is ' | 46 (u' Maximum amount of memory a worker process is ' |
47 u'allowed'), | 47 u'allowed'), |
48 u' to consume. [defaults to 2 GiB]', | 48 u' to consume. [defaults to 2 GiB]', |
49 (u' --workers WORKERS The number of worker processes [defaults to ' | 49 (u' --workers WORKERS The number of worker processes [defaults to ' |
50 u'available'), | 50 u'available'), |
51 u' system CPUs minus one].', | 51 u' system CPUs minus one].', |
52 u'']) | 52 u'']) |
53 | 53 |
54 # TODO: add tests for _CheckStorageFile | 54 # TODO: add tests for _CheckStorageFile |
(...skipping 99 matching lines...)
154 """Tests the ParseArguments function.""" | 154 """Tests the ParseArguments function.""" |
155 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 155 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
156 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) | 156 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
157 | 157 |
158 result = test_tool.ParseArguments() | 158 result = test_tool.ParseArguments() |
159 self.assertFalse(result) | 159 self.assertFalse(result) |
160 | 160 |
161 # TODO: check output. | 161 # TODO: check output. |
162 # TODO: improve test coverage. | 162 # TODO: improve test coverage. |
163 | 163 |
| 164 @shared_test_lib.skipUnlessHasTestFile([u'testdir']) |
164 def testParseOptions(self): | 165 def testParseOptions(self): |
165 """Tests the ParseOptions function.""" | 166 """Tests the ParseOptions function.""" |
166 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 167 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
167 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) | 168 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
168 | 169 |
169 options = test_lib.TestOptions() | 170 options = test_lib.TestOptions() |
170 options.source = self._GetTestFilePath([u'testdir']) | 171 options.source = self._GetTestFilePath([u'testdir']) |
171 options.output = u'storage.plaso' | 172 options.storage_file = u'storage.plaso' |
172 | 173 |
173 test_tool.ParseOptions(options) | 174 test_tool.ParseOptions(options) |
174 | 175 |
175 options = test_lib.TestOptions() | 176 options = test_lib.TestOptions() |
176 | 177 |
| 178 # ParseOptions will raise if source is not set. |
177 with self.assertRaises(errors.BadConfigOption): | 179 with self.assertRaises(errors.BadConfigOption): |
178 test_tool.ParseOptions(options) | 180 test_tool.ParseOptions(options) |
179 | 181 |
180 options = test_lib.TestOptions() | 182 options = test_lib.TestOptions() |
181 options.source = self._GetTestFilePath([u'testdir']) | 183 options.source = self._GetTestFilePath([u'testdir']) |
182 | 184 |
183 with self.assertRaises(errors.BadConfigOption): | 185 with self.assertRaises(errors.BadConfigOption): |
184 test_tool.ParseOptions(options) | 186 test_tool.ParseOptions(options) |
185 | 187 |
186 # TODO: improve test coverage. | 188 # TODO: improve test coverage. |
187 | 189 |
188 def testExtractEventsFromSourcesOnDirectory(self): | 190 def testExtractEventsFromSourcesOnDirectory(self): |
189 """Tests the ExtractEventsFromSources function on a directory.""" | 191 """Tests the ExtractEventsFromSources function on a directory.""" |
190 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 192 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
191 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) | 193 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
192 | 194 |
193 options = test_lib.TestOptions() | 195 options = test_lib.TestOptions() |
194 options.quiet = True | 196 options.quiet = True |
195 options.single_process = True | 197 options.single_process = True |
196 options.status_view_mode = u'none' | 198 options.status_view_mode = u'none' |
197 options.source = self._GetTestFilePath([u'testdir']) | 199 options.source = self._GetTestFilePath([u'testdir']) |
198 | 200 |
199 with shared_test_lib.TempDirectory() as temp_directory: | 201 with shared_test_lib.TempDirectory() as temp_directory: |
200 options.output = os.path.join(temp_directory, u'storage.plaso') | 202 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
201 | 203 |
202 test_tool.ParseOptions(options) | 204 test_tool.ParseOptions(options) |
203 | 205 |
204 test_tool.ExtractEventsFromSources() | 206 test_tool.ExtractEventsFromSources() |
205 | 207 |
206 expected_output = [ | 208 expected_output = [ |
207 b'', | 209 b'', |
208 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 210 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
209 b'Source type\t: directory', | 211 b'Source type\t: directory', |
210 b'', | 212 b'', |
(...skipping 11 matching lines...)
222 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) | 224 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
223 | 225 |
224 options = test_lib.TestOptions() | 226 options = test_lib.TestOptions() |
225 options.credentials = [u'password:{0:s}'.format(self._BDE_PASSWORD)] | 227 options.credentials = [u'password:{0:s}'.format(self._BDE_PASSWORD)] |
226 options.quiet = True | 228 options.quiet = True |
227 options.single_process = True | 229 options.single_process = True |
228 options.status_view_mode = u'none' | 230 options.status_view_mode = u'none' |
229 options.source = self._GetTestFilePath([u'bdetogo.raw']) | 231 options.source = self._GetTestFilePath([u'bdetogo.raw']) |
230 | 232 |
231 with shared_test_lib.TempDirectory() as temp_directory: | 233 with shared_test_lib.TempDirectory() as temp_directory: |
232 options.output = os.path.join(temp_directory, u'storage.plaso') | 234 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
233 | 235 |
234 test_tool.ParseOptions(options) | 236 test_tool.ParseOptions(options) |
235 | 237 |
236 test_tool.ExtractEventsFromSources() | 238 test_tool.ExtractEventsFromSources() |
237 | 239 |
238 expected_output = [ | 240 expected_output = [ |
239 b'', | 241 b'', |
240 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 242 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
241 b'Source type\t: storage media image', | 243 b'Source type\t: storage media image', |
242 b'', | 244 b'', |
(...skipping 10 matching lines...)
253 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 255 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
254 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) | 256 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
255 | 257 |
256 options = test_lib.TestOptions() | 258 options = test_lib.TestOptions() |
257 options.quiet = True | 259 options.quiet = True |
258 options.single_process = True | 260 options.single_process = True |
259 options.status_view_mode = u'none' | 261 options.status_view_mode = u'none' |
260 options.source = self._GetTestFilePath([u'ímynd.dd']) | 262 options.source = self._GetTestFilePath([u'ímynd.dd']) |
261 | 263 |
262 with shared_test_lib.TempDirectory() as temp_directory: | 264 with shared_test_lib.TempDirectory() as temp_directory: |
263 options.output = os.path.join(temp_directory, u'storage.plaso') | 265 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
264 | 266 |
265 test_tool.ParseOptions(options) | 267 test_tool.ParseOptions(options) |
266 | 268 |
267 test_tool.ExtractEventsFromSources() | 269 test_tool.ExtractEventsFromSources() |
268 | 270 |
269 expected_output = [ | 271 expected_output = [ |
270 b'', | 272 b'', |
271 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 273 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
272 b'Source type\t: storage media image', | 274 b'Source type\t: storage media image', |
273 b'', | 275 b'', |
(...skipping 12 matching lines...)
286 | 288 |
287 options = test_lib.TestOptions() | 289 options = test_lib.TestOptions() |
288 options.partitions = u'all' | 290 options.partitions = u'all' |
289 options.quiet = True | 291 options.quiet = True |
290 options.single_process = True | 292 options.single_process = True |
291 options.status_view_mode = u'none' | 293 options.status_view_mode = u'none' |
292 # Note that the source file is a RAW (VMDK flat) image. | 294 # Note that the source file is a RAW (VMDK flat) image. |
293 options.source = self._GetTestFilePath([u'multi_partition_image.vmdk']) | 295 options.source = self._GetTestFilePath([u'multi_partition_image.vmdk']) |
294 | 296 |
295 with shared_test_lib.TempDirectory() as temp_directory: | 297 with shared_test_lib.TempDirectory() as temp_directory: |
296 options.output = os.path.join(temp_directory, u'storage.plaso') | 298 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
297 | 299 |
298 test_tool.ParseOptions(options) | 300 test_tool.ParseOptions(options) |
299 | 301 |
300 test_tool.ExtractEventsFromSources() | 302 test_tool.ExtractEventsFromSources() |
301 | 303 |
302 expected_output = [ | 304 expected_output = [ |
303 b'', | 305 b'', |
304 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 306 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
305 b'Source type\t: storage media image', | 307 b'Source type\t: storage media image', |
306 b'', | 308 b'', |
(...skipping 11 matching lines...)
318 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) | 320 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
319 | 321 |
320 options = test_lib.TestOptions() | 322 options = test_lib.TestOptions() |
321 options.quiet = True | 323 options.quiet = True |
322 options.single_process = True | 324 options.single_process = True |
323 options.status_view_mode = u'none' | 325 options.status_view_mode = u'none' |
324 options.source = self._GetTestFilePath([u'vsstest.qcow2']) | 326 options.source = self._GetTestFilePath([u'vsstest.qcow2']) |
325 options.vss_stores = u'all' | 327 options.vss_stores = u'all' |
326 | 328 |
327 with shared_test_lib.TempDirectory() as temp_directory: | 329 with shared_test_lib.TempDirectory() as temp_directory: |
328 options.output = os.path.join(temp_directory, u'storage.plaso') | 330 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
329 | 331 |
330 test_tool.ParseOptions(options) | 332 test_tool.ParseOptions(options) |
331 | 333 |
332 test_tool.ExtractEventsFromSources() | 334 test_tool.ExtractEventsFromSources() |
333 | 335 |
334 expected_output = [ | 336 expected_output = [ |
335 b'', | 337 b'', |
336 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 338 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
337 b'Source type\t: storage media image', | 339 b'Source type\t: storage media image', |
338 b'', | 340 b'', |
(...skipping 14 matching lines...)
353 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 355 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
354 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) | 356 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
355 | 357 |
356 options = test_lib.TestOptions() | 358 options = test_lib.TestOptions() |
357 options.quiet = True | 359 options.quiet = True |
358 options.single_process = True | 360 options.single_process = True |
359 options.status_view_mode = u'none' | 361 options.status_view_mode = u'none' |
360 options.source = self._GetTestFilePath([u'System.evtx']) | 362 options.source = self._GetTestFilePath([u'System.evtx']) |
361 | 363 |
362 with shared_test_lib.TempDirectory() as temp_directory: | 364 with shared_test_lib.TempDirectory() as temp_directory: |
363 options.output = os.path.join(temp_directory, u'storage.plaso') | 365 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
364 | 366 |
365 test_tool.ParseOptions(options) | 367 test_tool.ParseOptions(options) |
366 | 368 |
367 test_tool.ExtractEventsFromSources() | 369 test_tool.ExtractEventsFromSources() |
368 | 370 |
369 expected_output = [ | 371 expected_output = [ |
370 b'', | 372 b'', |
371 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 373 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
372 b'Source type\t: single file', | 374 b'Source type\t: single file', |
373 b'', | 375 b'', |
(...skipping 11 matching lines...)
385 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) | 387 test_tool = log2timeline_tool.Log2TimelineTool(output_writer=output_writer) |
386 | 388 |
387 options = test_lib.TestOptions() | 389 options = test_lib.TestOptions() |
388 options.quiet = True | 390 options.quiet = True |
389 options.parsers = u'filestat,pe' | 391 options.parsers = u'filestat,pe' |
390 options.single_process = True | 392 options.single_process = True |
391 options.status_view_mode = u'none' | 393 options.status_view_mode = u'none' |
392 options.source = self._GetTestFilePath([u'test_pe.exe']) | 394 options.source = self._GetTestFilePath([u'test_pe.exe']) |
393 | 395 |
394 with shared_test_lib.TempDirectory() as temp_directory: | 396 with shared_test_lib.TempDirectory() as temp_directory: |
395 options.output = os.path.join(temp_directory, u'storage.plaso') | 397 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
396 | 398 |
397 test_tool.ParseOptions(options) | 399 test_tool.ParseOptions(options) |
398 | 400 |
399 test_tool.ExtractEventsFromSources() | 401 test_tool.ExtractEventsFromSources() |
400 | 402 |
401 storage_file = storage_zip_file.ZIPStorageFile() | 403 storage_file = storage_zip_file.ZIPStorageFile() |
402 try: | 404 try: |
403 storage_file.Open(path=options.output, read_only=True) | 405 storage_file.Open(path=options.storage_file, read_only=True) |
404 except IOError as exception: | 406 except IOError as exception: |
405 self.fail(( | 407 self.fail(( |
406 u'Unable to open storage file after processing with error: ' | 408 u'Unable to open storage file after processing with error: ' |
407 u'{0:s}.').format(exception)) | 409 u'{0:s}.').format(exception)) |
408 | 410 |
409 # There should be 3 filestat and 3 pe parser generated events. | 411 # There should be 3 filestat and 3 pe parser generated events. |
410 events = list(storage_file.GetSortedEvents()) | 412 events = list(storage_file.GetSortedEvents()) |
411 self.assertEqual(len(events), 6) | 413 self.assertEqual(len(events), 6) |
412 | 414 |
413 def testShowInfo(self): | 415 def testShowInfo(self): |
(...skipping 11 matching lines...)
425 u'Parser Presets', u'Hashers', u'Parser Plugins', u'Versions', | 427 u'Parser Presets', u'Hashers', u'Parser Plugins', u'Versions', |
426 u'Parsers', u'Output Modules'] | 428 u'Parsers', u'Output Modules'] |
427 for heading in section_headings: | 429 for heading in section_headings: |
428 self.assertIn(heading, output) | 430 self.assertIn(heading, output) |
429 | 431 |
430 self.assertNotIn(u'<class', output) | 432 self.assertNotIn(u'<class', output) |
431 | 433 |
432 | 434 |
433 if __name__ == '__main__': | 435 if __name__ == '__main__': |
434 unittest.main() | 436 unittest.main() |
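Review bot: The new `@shared_test_lib.skipUnlessHasTestFile([u'testdir'])` guard is applied only to testParseOptions, while testExtractEventsFromSourcesOnDirectory reads the same `testdir` fixture and its `def` line (new line 190) carries no decorator, so on a checkout without test data one test is skipped and the other would presumably fail; the same decorator belongs on that test. The `def` lines of the image- and file-based tests (`bdetogo.raw`, `ímynd.dd`, `multi_partition_image.vmdk`, `vsstest.qcow2`, `System.evtx`, `test_pe.exe`) fall in collapsed regions, so whether they are guarded cannot be checked from this view. In testParseOptions the third negative case (source set, no `storage_file`) still lacks the explanation the second case just received; presumably `# ParseOptions will raise if storage_file is not set.` fits above that `with`, to be confirmed against ParseOptions.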