Left: | ||
Right: |
OLD | NEW |
---|---|
1 #!/usr/bin/python | 1 #!/usr/bin/python |
aaronp
2017/07/26 22:06:10
Note to self: reviewed.
Joachim Metz
2017/07/27 05:43:55
Done.
| |
2 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
3 """Tests for the psteal CLI tool.""" | 3 """Tests for the psteal CLI tool.""" |
4 | 4 |
5 import os | 5 import os |
6 import unittest | 6 import unittest |
7 | 7 |
8 from dfvfs.resolver import resolver as dfvfs_resolver | 8 from dfvfs.resolver import resolver as dfvfs_resolver |
9 | 9 |
10 from plaso.cli import psteal_tool | 10 from plaso.cli import psteal_tool |
11 from plaso.lib import errors | 11 from plaso.lib import errors |
12 | 12 |
13 from tests import test_lib as shared_test_lib | 13 from tests import test_lib as shared_test_lib |
14 from tests.cli import test_lib | 14 from tests.cli import test_lib |
15 | 15 |
16 | 16 |
17 class PstealToolTest(test_lib.CLIToolTestCase): | 17 class PstealToolTest(test_lib.CLIToolTestCase): |
18 """Tests for the psteal CLI tool.""" | 18 """Tests for the psteal CLI tool.""" |
19 | 19 |
20 # pylint: disable=protected-access | |
21 | |
20 _BDE_PASSWORD = u'bde-TEST' | 22 _BDE_PASSWORD = u'bde-TEST' |
21 | 23 |
22 _EXPECTED_PROCESSING_OPTIONS = u'\n'.join([ | 24 _EXPECTED_PROCESSING_OPTIONS = u'\n'.join([ |
23 u'usage: psteal_test.py', | 25 u'usage: psteal_test.py', |
24 u'', | 26 u'', |
25 u'Test argument parser.', | 27 u'Test argument parser.', |
26 u'']) | 28 u'']) |
27 | 29 |
30 _STORAGE_FILENAME_TEMPLATE = ur'\d{{8}}T\d{{6}}-{filename}.plaso' | |
31 | |
32 def testGenerateStorageFileName(self): | |
33 """Tests the _GenerateStorageFileName function.""" | |
34 test_tool = psteal_tool.PstealTool() | |
35 | |
36 test_tool._source_path = u'/test/storage/path' | |
aaronp
2017/07/26 22:06:10
Lots of duplicate code here. Seems like a candid
Joachim Metz
2017/07/27 05:43:55
I've added a reminder to https://github.com/log2ti
| |
37 storage_filename = test_tool._GenerateStorageFileName() | |
38 expected_storage_filename = self._STORAGE_FILENAME_TEMPLATE.format( | |
39 filename=u'path') | |
40 self.assertRegexpMatches(storage_filename, expected_storage_filename) | |
41 | |
42 test_tool._source_path = u'/test/storage/path/' | |
43 storage_filename = test_tool._GenerateStorageFileName() | |
44 expected_storage_filename = self._STORAGE_FILENAME_TEMPLATE.format( | |
45 filename=u'path') | |
46 self.assertRegexpMatches(storage_filename, expected_storage_filename) | |
47 | |
48 test_tool._source_path = u'/' | |
49 storage_filename = test_tool._GenerateStorageFileName() | |
50 expected_storage_filename = self._STORAGE_FILENAME_TEMPLATE.format( | |
51 filename=u'ROOT') | |
52 self.assertRegexpMatches(storage_filename, expected_storage_filename) | |
53 | |
54 test_tool._source_path = u'/foo/..' | |
55 storage_filename = test_tool._GenerateStorageFileName() | |
56 expected_storage_filename = self._STORAGE_FILENAME_TEMPLATE.format( | |
57 filename=u'ROOT') | |
58 self.assertRegexpMatches(storage_filename, expected_storage_filename) | |
59 | |
60 test_tool._source_path = u'foo/../bar' | |
61 storage_filename = test_tool._GenerateStorageFileName() | |
62 expected_storage_filename = self._STORAGE_FILENAME_TEMPLATE.format( | |
63 filename=u'bar') | |
64 self.assertRegexpMatches(storage_filename, expected_storage_filename) | |
65 | |
28 def testParseOptions(self): | 66 def testParseOptions(self): |
29 """Tests the ParseOptions function.""" | 67 """Tests the ParseOptions function.""" |
30 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 68 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
31 test_tool = psteal_tool.PstealTool(output_writer=output_writer) | 69 test_tool = psteal_tool.PstealTool(output_writer=output_writer) |
32 | 70 |
33 # Test when no source nor output file specified. | |
34 options = test_lib.TestOptions() | 71 options = test_lib.TestOptions() |
72 | |
73 # Test when the output file is missing. | |
74 expected_error = (u'Output format: dynamic requires an output file') | |
75 with self.assertRaisesRegexp(errors.BadConfigOption, expected_error): | |
76 test_tool.ParseOptions(options) | |
77 | |
78 options.write = u'dynamic.out' | |
79 | |
80 # Test when the source is missing. | |
35 expected_error = u'Missing source path.' | 81 expected_error = u'Missing source path.' |
36 with self.assertRaisesRegexp(errors.BadConfigOption, expected_error): | 82 with self.assertRaisesRegexp(errors.BadConfigOption, expected_error): |
37 test_tool.ParseOptions(options) | 83 test_tool.ParseOptions(options) |
38 | 84 |
39 # Test when the output file is missing. | 85 with shared_test_lib.TempDirectory() as temp_directory: |
40 options.source = self._GetTestFilePath([u'testdir']) | 86 options.source = self._GetTestFilePath([u'testdir']) |
41 expected_error = (u'Output format: dynamic requires an output file') | 87 options.write = os.path.join(temp_directory, u'dynamic.out') |
42 with self.assertRaisesRegexp(errors.BadConfigOption, expected_error): | 88 |
89 # Test when both source and output are specified. | |
43 test_tool.ParseOptions(options) | 90 test_tool.ParseOptions(options) |
44 | 91 |
45 # Test when the source is missing. | 92 with open(options.write, 'w') as file_object: |
46 options = test_lib.TestOptions() | 93 file_object.write(u'bogus') |
47 expected_error = u'Missing source path.' | 94 |
48 with shared_test_lib.TempDirectory() as temp_directory: | 95 # Test when output file already exists. |
49 options.analysis_output_file = os.path.join( | 96 expected_error = u'Output file already exists: {0:s}.'.format( |
50 temp_directory, u'unused_output.txt') | 97 options.write) |
51 with self.assertRaisesRegexp(errors.BadConfigOption, expected_error): | 98 with self.assertRaisesRegexp(errors.BadConfigOption, expected_error): |
52 test_tool.ParseOptions(options) | 99 test_tool.ParseOptions(options) |
53 | 100 |
54 # Test when both source and output are specified. | |
55 options = test_lib.TestOptions() | |
56 options.source = self._GetTestFilePath([u'testdir']) | |
57 with shared_test_lib.TempDirectory() as temp_directory: | |
58 options.analysis_output_file = os.path.join( | |
59 temp_directory, u'unused_output.txt') | |
60 test_tool.ParseOptions(options) | |
61 | |
62 def testParseArguments(self): | 101 def testParseArguments(self): |
63 """Tests the ParseArguments function""" | 102 """Tests the ParseArguments function""" |
64 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 103 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
65 test_tool = psteal_tool.PstealTool(output_writer=output_writer) | 104 test_tool = psteal_tool.PstealTool(output_writer=output_writer) |
66 | 105 |
67 # Test ParseArguments with no output file nor source. | 106 # Test ParseArguments with no output file nor source. |
68 result = test_tool.ParseArguments() | 107 result = test_tool.ParseArguments() |
69 self.assertFalse(result) | 108 self.assertFalse(result) |
70 output = output_writer.ReadOutput() | 109 output = output_writer.ReadOutput() |
71 expected_error = u'ERROR: Missing source path' | 110 expected_error = u'ERROR: Output format: dynamic requires an output file' |
72 self.assertIn(expected_error, output) | 111 self.assertIn(expected_error, output) |
73 | 112 |
74 def testExtractEventsFromSourceDirectory(self): | 113 def testExtractEventsFromSourceDirectory(self): |
75 """Tests the ExtractEventsFromSources function on a directory.""" | 114 """Tests the ExtractEventsFromSources function on a directory.""" |
76 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 115 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
77 test_tool = psteal_tool.PstealTool(output_writer=output_writer) | 116 test_tool = psteal_tool.PstealTool(output_writer=output_writer) |
78 | 117 |
79 options = test_lib.TestOptions() | 118 options = test_lib.TestOptions() |
80 options.quiet = True | 119 options.quiet = True |
81 options.status_view_mode = u'none' | 120 options.status_view_mode = u'none' |
82 options.source = self._GetTestFilePath([u'testdir']) | 121 options.source = self._GetTestFilePath([u'testdir']) |
83 | 122 |
84 with shared_test_lib.TempDirectory() as temp_directory: | 123 with shared_test_lib.TempDirectory() as temp_directory: |
85 options.analysis_output_file = os.path.join( | 124 options.write = os.path.join(temp_directory, u'unused_output.txt') |
86 temp_directory, u'unused_output.txt') | |
87 options.storage_file = os.path.join(temp_directory, u'storage.plaso') | 125 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
88 | 126 |
89 test_tool.ParseOptions(options) | 127 test_tool.ParseOptions(options) |
90 | 128 |
91 test_tool.ExtractEventsFromSources() | 129 test_tool.ExtractEventsFromSources() |
92 | 130 |
93 expected_output = [ | 131 expected_output = [ |
94 b'', | 132 b'', |
95 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 133 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
96 b'Source type\t: directory', | 134 b'Source type\t: directory', |
(...skipping 14 matching lines...) Expand all Loading... | |
111 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 149 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
112 test_tool = psteal_tool.PstealTool(output_writer=output_writer) | 150 test_tool = psteal_tool.PstealTool(output_writer=output_writer) |
113 | 151 |
114 options = test_lib.TestOptions() | 152 options = test_lib.TestOptions() |
115 options.credentials = [u'password:{0:s}'.format(self._BDE_PASSWORD)] | 153 options.credentials = [u'password:{0:s}'.format(self._BDE_PASSWORD)] |
116 options.quiet = True | 154 options.quiet = True |
117 options.source = self._GetTestFilePath([u'bdetogo.raw']) | 155 options.source = self._GetTestFilePath([u'bdetogo.raw']) |
118 options.status_view_mode = u'none' | 156 options.status_view_mode = u'none' |
119 | 157 |
120 with shared_test_lib.TempDirectory() as temp_directory: | 158 with shared_test_lib.TempDirectory() as temp_directory: |
121 options.analysis_output_file = os.path.join( | 159 options.write = os.path.join(temp_directory, u'unused_output.txt') |
122 temp_directory, u'unused_output.txt') | |
123 options.storage_file = os.path.join(temp_directory, u'storage.plaso') | 160 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
124 | 161 |
125 test_tool.ParseOptions(options) | 162 test_tool.ParseOptions(options) |
126 | 163 |
127 test_tool.ExtractEventsFromSources() | 164 test_tool.ExtractEventsFromSources() |
128 | 165 |
129 expected_output = [ | 166 expected_output = [ |
130 b'', | 167 b'', |
131 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 168 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
132 b'Source type\t: storage media image', | 169 b'Source type\t: storage media image', |
(...skipping 10 matching lines...) Expand all Loading... | |
143 """Tests the ExtractEventsFromSources function on a single partition.""" | 180 """Tests the ExtractEventsFromSources function on a single partition.""" |
144 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 181 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
145 test_tool = psteal_tool.PstealTool(output_writer=output_writer) | 182 test_tool = psteal_tool.PstealTool(output_writer=output_writer) |
146 | 183 |
147 options = test_lib.TestOptions() | 184 options = test_lib.TestOptions() |
148 options.quiet = True | 185 options.quiet = True |
149 options.status_view_mode = u'none' | 186 options.status_view_mode = u'none' |
150 options.source = self._GetTestFilePath([u'ímynd.dd']) | 187 options.source = self._GetTestFilePath([u'ímynd.dd']) |
151 | 188 |
152 with shared_test_lib.TempDirectory() as temp_directory: | 189 with shared_test_lib.TempDirectory() as temp_directory: |
153 options.analysis_output_file = os.path.join( | 190 options.write = os.path.join(temp_directory, u'unused_output.txt') |
154 temp_directory, u'unused_output.txt') | |
155 options.storage_file = os.path.join(temp_directory, u'storage.plaso') | 191 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
156 | 192 |
157 test_tool.ParseOptions(options) | 193 test_tool.ParseOptions(options) |
158 | 194 |
159 test_tool.ExtractEventsFromSources() | 195 test_tool.ExtractEventsFromSources() |
160 | 196 |
161 expected_output = [ | 197 expected_output = [ |
162 b'', | 198 b'', |
163 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 199 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
164 b'Source type\t: storage media image', | 200 b'Source type\t: storage media image', |
(...skipping 13 matching lines...) Expand all Loading... | |
178 test_tool = psteal_tool.PstealTool(output_writer=output_writer) | 214 test_tool = psteal_tool.PstealTool(output_writer=output_writer) |
179 | 215 |
180 options = test_lib.TestOptions() | 216 options = test_lib.TestOptions() |
181 options.partitions = u'all' | 217 options.partitions = u'all' |
182 options.quiet = True | 218 options.quiet = True |
183 options.status_view_mode = u'none' | 219 options.status_view_mode = u'none' |
184 # Note that the source file is a RAW (VMDK flat) image. | 220 # Note that the source file is a RAW (VMDK flat) image. |
185 options.source = self._GetTestFilePath([u'multi_partition_image.vmdk']) | 221 options.source = self._GetTestFilePath([u'multi_partition_image.vmdk']) |
186 | 222 |
187 with shared_test_lib.TempDirectory() as temp_directory: | 223 with shared_test_lib.TempDirectory() as temp_directory: |
188 options.analysis_output_file = os.path.join( | 224 options.write = os.path.join(temp_directory, u'unused_output.txt') |
189 temp_directory, u'unused_output.txt') | |
190 options.storage_file = os.path.join(temp_directory, u'storage.plaso') | 225 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
191 | 226 |
192 test_tool.ParseOptions(options) | 227 test_tool.ParseOptions(options) |
193 | 228 |
194 test_tool.ExtractEventsFromSources() | 229 test_tool.ExtractEventsFromSources() |
195 | 230 |
196 expected_output = [ | 231 expected_output = [ |
197 b'', | 232 b'', |
198 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 233 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
199 b'Source type\t: storage media image', | 234 b'Source type\t: storage media image', |
(...skipping 12 matching lines...) Expand all Loading... | |
212 test_tool = psteal_tool.PstealTool(output_writer=output_writer) | 247 test_tool = psteal_tool.PstealTool(output_writer=output_writer) |
213 | 248 |
214 options = test_lib.TestOptions() | 249 options = test_lib.TestOptions() |
215 options.quiet = True | 250 options.quiet = True |
216 options.single_process = True | 251 options.single_process = True |
217 options.status_view_mode = u'none' | 252 options.status_view_mode = u'none' |
218 options.source = self._GetTestFilePath([u'vsstest.qcow2']) | 253 options.source = self._GetTestFilePath([u'vsstest.qcow2']) |
219 options.vss_stores = u'all' | 254 options.vss_stores = u'all' |
220 | 255 |
221 with shared_test_lib.TempDirectory() as temp_directory: | 256 with shared_test_lib.TempDirectory() as temp_directory: |
222 options.analysis_output_file = os.path.join( | 257 options.write = os.path.join(temp_directory, u'unused_output.txt') |
223 temp_directory, u'unused_output.txt') | |
224 options.storage_file = os.path.join(temp_directory, u'storage.plaso') | 258 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
225 | 259 |
226 test_tool.ParseOptions(options) | 260 test_tool.ParseOptions(options) |
227 | 261 |
228 test_tool.ExtractEventsFromSources() | 262 test_tool.ExtractEventsFromSources() |
229 | 263 |
230 expected_output = [ | 264 expected_output = [ |
231 b'', | 265 b'', |
232 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 266 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
233 b'Source type\t: storage media image', | 267 b'Source type\t: storage media image', |
(...skipping 14 matching lines...) Expand all Loading... | |
248 """Tests the ExtractEventsFromSources function on a single file.""" | 282 """Tests the ExtractEventsFromSources function on a single file.""" |
249 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 283 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
250 test_tool = psteal_tool.PstealTool(output_writer=output_writer) | 284 test_tool = psteal_tool.PstealTool(output_writer=output_writer) |
251 | 285 |
252 options = test_lib.TestOptions() | 286 options = test_lib.TestOptions() |
253 options.quiet = True | 287 options.quiet = True |
254 options.status_view_mode = u'none' | 288 options.status_view_mode = u'none' |
255 options.source = self._GetTestFilePath([u'System.evtx']) | 289 options.source = self._GetTestFilePath([u'System.evtx']) |
256 | 290 |
257 with shared_test_lib.TempDirectory() as temp_directory: | 291 with shared_test_lib.TempDirectory() as temp_directory: |
258 options.analysis_output_file = os.path.join( | 292 options.write = os.path.join(temp_directory, u'unused_output.txt') |
259 temp_directory, u'unused_output.txt') | |
260 options.storage_file = os.path.join(temp_directory, u'storage.plaso') | 293 options.storage_file = os.path.join(temp_directory, u'storage.plaso') |
261 | 294 |
262 test_tool.ParseOptions(options) | 295 test_tool.ParseOptions(options) |
263 | 296 |
264 test_tool.ExtractEventsFromSources() | 297 test_tool.ExtractEventsFromSources() |
265 | 298 |
266 expected_output = [ | 299 expected_output = [ |
267 b'', | 300 b'', |
268 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), | 301 b'Source path\t: {0:s}'.format(options.source.encode(u'utf-8')), |
269 b'Source type\t: single file', | 302 b'Source type\t: single file', |
270 b'', | 303 b'', |
271 b'Processing started.', | 304 b'Processing started.', |
272 b'Processing completed.', | 305 b'Processing completed.', |
273 b'', | 306 b'', |
274 b''] | 307 b''] |
275 | 308 |
276 output = output_writer.ReadOutput() | 309 output = output_writer.ReadOutput() |
277 self.assertEqual(output.split(b'\n'), expected_output) | 310 self.assertEqual(output.split(b'\n'), expected_output) |
278 | 311 |
279 def testProcessStorage(self): | 312 def testProcessStorage(self): |
280 """Test the AnalyzeEvents function""" | 313 """Test the AnalyzeEvents function""" |
281 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') | 314 output_writer = test_lib.TestOutputWriter(encoding=u'utf-8') |
282 test_tool = psteal_tool.PstealTool(output_writer=output_writer) | 315 test_tool = psteal_tool.PstealTool(output_writer=output_writer) |
283 | 316 |
284 options = test_lib.TestOptions() | 317 options = test_lib.TestOptions() |
285 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) | 318 options.storage_file = self._GetTestFilePath([u'psort_test.json.plaso']) |
286 options.source = u'unused_source' | 319 options.source = u'unused_source' |
287 | 320 |
288 with shared_test_lib.TempDirectory() as temp_directory: | 321 with shared_test_lib.TempDirectory() as temp_directory: |
289 result_file_name = os.path.join(temp_directory, u'output.txt') | 322 options.write = os.path.join(temp_directory, u'output.txt') |
290 options.analysis_output_file = result_file_name | |
291 | 323 |
292 test_tool.ParseOptions(options) | 324 test_tool.ParseOptions(options) |
293 test_tool.AnalyzeEvents() | 325 test_tool.AnalyzeEvents() |
294 | 326 |
295 expected_output_file_name = self._GetTestFilePath( | 327 expected_output_file_name = self._GetTestFilePath( |
296 [u'end_to_end', u'dynamic.log']) | 328 [u'end_to_end', u'dynamic.log']) |
297 with open(expected_output_file_name, 'r') as file_object: | 329 with open(expected_output_file_name, 'r') as file_object: |
298 expected_output = file_object.read() | 330 expected_output = file_object.read() |
299 | 331 |
300 with open(result_file_name, 'r') as file_object: | 332 with open(options.write, 'r') as file_object: |
301 result_output = file_object.read() | 333 result_output = file_object.read() |
302 | 334 |
303 expected_output = sorted(expected_output.split(b'\n')) | 335 expected_output = sorted(expected_output.split(b'\n')) |
304 result_output = sorted(result_output.split(b'\n')) | 336 result_output = sorted(result_output.split(b'\n')) |
305 self.assertEqual(expected_output, result_output) | 337 self.assertEqual(expected_output, result_output) |
306 | 338 |
307 output = output_writer.ReadOutput() | 339 output = output_writer.ReadOutput() |
308 self.assertIn(u'Events processed : 38', output) | 340 self.assertIn(u'Events processed : 38', output) |
309 | 341 |
310 | 342 |
311 if __name__ == '__main__': | 343 if __name__ == '__main__': |
312 unittest.main() | 344 unittest.main() |
OLD | NEW |