OLD | NEW |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
3 """Tests for the SQLite plugin interface.""" | 3 """Tests for the SQLite plugin interface.""" |
4 | 4 |
5 import sys | 5 import sys |
6 import unittest | 6 import unittest |
7 | 7 |
| 8 from plaso.containers import time_events |
| 9 from plaso.lib import eventdata |
| 10 from plaso.lib import timelib |
8 from plaso.parsers import sqlite | 11 from plaso.parsers import sqlite |
9 from plaso.parsers.sqlite_plugins import interface | 12 from plaso.parsers.sqlite_plugins import interface |
10 | 13 |
11 from tests import test_lib as shared_test_lib | 14 from tests import test_lib as shared_test_lib |
12 from tests.parsers.sqlite_plugins import test_lib | 15 from tests.parsers.sqlite_plugins import test_lib |
13 | 16 |
14 | 17 |
15 class TestSQLitePlugin(interface.SQLitePlugin): | 18 class TestSQLitePlugin(interface.SQLitePlugin): |
16 """Convenience class for a test SQLite plugin.""" | 19 """Convenience class for a test SQLite plugin.""" |
17 | 20 |
18 NAME = u'test' | 21 NAME = u'test' |
19 | 22 |
20 QUERIES = [( | 23 QUERIES = [( |
21 u'SELECT Field1, Field2, Field3 FROM MyTable', u'ParseMyTableRow')] | 24 u'SELECT Field1, Field2, Field3 FROM MyTable', u'ParseMyTableRow')] |
22 | 25 |
23 REQUIRED_TABLES = frozenset([u'MyTable']) | 26 REQUIRED_TABLES = frozenset([u'MyTable']) |
24 | 27 |
| 28 SCHEMAS = [ |
| 29 {u'MyTable': |
| 30 u'CREATE TABLE "MyTable" ( `Field1` TEXT, `Field2` INTEGER, ' |
| 31 u'`Field3` BLOB )'}] |
| 32 |
25 def __init__(self): | 33 def __init__(self): |
26 """Initializes SQLite plugin.""" | 34 """Initializes SQLite plugin.""" |
27 super(TestSQLitePlugin, self).__init__() | 35 super(TestSQLitePlugin, self).__init__() |
28 self.results = [] | 36 self.results = [] |
29 | 37 |
30 def ParseMyTableRow(self, parser_mediator, row, **unused_kwargs): | 38 def ParseMyTableRow(self, parser_mediator, row, **unused_kwargs): |
31 """Parses a MyTable row. | 39 """Parses a MyTable row. |
32 | 40 |
33 Args: | 41 Args: |
34 parser_mediator: A parser mediator object (instance of ParserMediator). | 42 parser_mediator: A parser mediator object (instance of ParserMediator). |
35 row: The row resulting from the query. | 43 row: The row resulting from the query. |
36 """ | 44 """ |
37 file_entry = parser_mediator.GetFileEntry() | 45 file_entry = parser_mediator.GetFileEntry() |
38 path_spec = file_entry.path_spec | 46 path_spec = file_entry.path_spec |
39 location = path_spec.location | 47 location = path_spec.location |
40 from_wal = location.endswith(u'-wal') | 48 from_wal = location.endswith(u'-wal') |
41 # Note that pysqlite does not accept a Unicode string in row['string'] and | 49 # Note that pysqlite does not accept a Unicode string in row['string'] and |
42 # will raise "IndexError: Index must be int or string". | 50 # will raise "IndexError: Index must be int or string". |
43 # Also, Field3 needs to be converted to a string if Python 2 is used | 51 # Also, Field3 needs to be converted to a string if Python 2 is used |
44 # because it is a read-write buffer. | 52 # because it is a read-write buffer. |
45 field3 = row['Field3'] | 53 field3 = row['Field3'] |
46 if sys.version_info[0] < 3: | 54 if sys.version_info[0] < 3: |
47 field3 = str(field3) | 55 field3 = str(field3) |
48 self.results.append(((row['Field1'], row['Field2'], field3), from_wal)) | 56 self.results.append(((row['Field1'], row['Field2'], field3), from_wal)) |
49 | 57 |
| 58 event = time_events.TimestampEvent( |
| 59 timelib.Timestamp.NONE_TIMESTAMP, eventdata.EventTimestamp.NOT_A_TIME, |
| 60 data_type=u'fake') |
| 61 event.field1 = row['Field1'] |
| 62 event.field2 = row['Field2'] |
| 63 event.field3 = field3 |
| 64 event.from_wal = location.endswith(u'-wal') |
| 65 parser_mediator.ProduceEvent(event) |
| 66 |
50 | 67 |
51 class SQLiteInterfaceTest(test_lib.SQLitePluginTestCase): | 68 class SQLiteInterfaceTest(test_lib.SQLitePluginTestCase): |
52 """Tests for the SQLite plugin interface.""" | 69 """Tests for the SQLite plugin interface.""" |
53 | 70 |
54 @shared_test_lib.skipUnlessHasTestFile([u'wal_database.db']) | 71 @shared_test_lib.skipUnlessHasTestFile([u'wal_database.db']) |
55 @shared_test_lib.skipUnlessHasTestFile([u'wal_database.db-wal']) | 72 @shared_test_lib.skipUnlessHasTestFile([u'wal_database.db-wal']) |
56 def testProcessWithWAL(self): | 73 def testProcessWithWAL(self): |
57 """Tests the Process function on a database with WAL file.""" | 74 """Tests the Process function on a database with WAL file.""" |
58 plugin_object = TestSQLitePlugin() | 75 plugin_object = TestSQLitePlugin() |
59 cache = sqlite.SQLiteCache() | 76 cache = sqlite.SQLiteCache() |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
94 ((u'Committed Text 3', 4, b'None'), False), | 111 ((u'Committed Text 3', 4, b'None'), False), |
95 ((u'Committed Text 4', 5, b'None'), False), | 112 ((u'Committed Text 4', 5, b'None'), False), |
96 ((u'Deleted Text 2', 6, b'None'), False), | 113 ((u'Deleted Text 2', 6, b'None'), False), |
97 ((u'Committed Text 5', 7, b'None'), False), | 114 ((u'Committed Text 5', 7, b'None'), False), |
98 ((u'Committed Text 6', 8, b'None'), False), | 115 ((u'Committed Text 6', 8, b'None'), False), |
99 ((u'Committed Text 7', 9, b'None'), False), | 116 ((u'Committed Text 7', 9, b'None'), False), |
100 ((u'Unhashable Row 1', 10, b'Binary Text!\x01\x02\x03'), False)] | 117 ((u'Unhashable Row 1', 10, b'Binary Text!\x01\x02\x03'), False)] |
101 | 118 |
102 self.assertEqual(expected_results, plugin_object.results) | 119 self.assertEqual(expected_results, plugin_object.results) |
103 | 120 |
| 121 @shared_test_lib.skipUnlessHasTestFile([u'wal_database.db']) |
| 122 @shared_test_lib.skipUnlessHasTestFile([u'wal_database.db-wal']) |
| 123 def testSchemaMatching(self): |
| 124 """Tests the Schema matching capabilities.""" |
| 125 plugin_object = TestSQLitePlugin() |
| 126 cache = sqlite.SQLiteCache() |
| 127 |
| 128 # Test matching schema. |
| 129 storage_writer = self._ParseDatabaseFileWithPlugin( |
| 130 [u'wal_database.db'], plugin_object, cache=cache) |
| 131 self.assertTrue(storage_writer.events) |
| 132 for event in storage_writer.events: |
| 133 self.assertTrue(event.schema_match) |
| 134 |
| 135 # Test schema change with WAL. |
| 136 wal_file = self._GetTestFilePath([u'wal_database.db-wal']) |
| 137 storage_writer = self._ParseDatabaseFileWithPlugin( |
| 138 [u'wal_database.db'], plugin_object, cache=cache, wal_path=wal_file) |
| 139 self.assertTrue(storage_writer.events) |
| 140 for event in storage_writer.events: |
| 141 if event.from_wal: |
| 142 self.assertFalse(event.schema_match) |
| 143 else: |
| 144 self.assertTrue(event.schema_match) |
| 145 |
| 146 # Add schema change from WAL file and test again. |
| 147 plugin_object.SCHEMAS.append( |
| 148 {u'MyTable': |
| 149 u'CREATE TABLE "MyTable" ( `Field1` TEXT, `Field2` INTEGER, `Field3` ' |
| 150 u'BLOB , NewField TEXT)', |
| 151 u'NewTable': |
| 152 u'CREATE TABLE NewTable(NewTableField1 TEXT, NewTableField2 TEXT)'}) |
| 153 storage_writer = self._ParseDatabaseFileWithPlugin( |
| 154 [u'wal_database.db'], plugin_object, cache=cache, wal_path=wal_file) |
| 155 self.assertTrue(storage_writer.events) |
| 156 for event in storage_writer.events: |
| 157 self.assertTrue(event.schema_match) |
| 158 |
| 159 # Test without original schema. |
| 160 del plugin_object.SCHEMAS[0] |
| 161 storage_writer = self._ParseDatabaseFileWithPlugin( |
| 162 [u'wal_database.db'], plugin_object, cache=cache, wal_path=wal_file) |
| 163 self.assertTrue(storage_writer.events) |
| 164 for event in storage_writer.events: |
| 165 if event.from_wal: |
| 166 self.assertTrue(event.schema_match) |
| 167 else: |
| 168 self.assertFalse(event.schema_match) |
| 169 |
104 | 170 |
105 if __name__ == '__main__': | 171 if __name__ == '__main__': |
106 unittest.main() | 172 unittest.main() |
OLD | NEW |