OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """Merge reader for SQLite storage files.""" | 2 """Merge reader for SQLite storage files.""" |
3 | 3 |
4 from __future__ import unicode_literals | 4 from __future__ import unicode_literals |
5 | 5 |
6 import os | 6 import os |
7 import sqlite3 | 7 import sqlite3 |
| 8 import zlib |
8 | 9 |
9 from plaso.containers import errors | 10 from plaso.containers import errors |
10 from plaso.containers import event_sources | 11 from plaso.containers import event_sources |
11 from plaso.containers import events | 12 from plaso.containers import events |
12 from plaso.containers import reports | 13 from plaso.containers import reports |
13 from plaso.containers import tasks | 14 from plaso.containers import tasks |
| 15 from plaso.lib import definitions |
14 from plaso.storage import identifiers | 16 from plaso.storage import identifiers |
15 from plaso.storage import interface | 17 from plaso.storage import interface |
16 | 18 |
17 | 19 |
18 class SQLiteStorageMergeReader(interface.StorageFileMergeReader): | 20 class SQLiteStorageMergeReader(interface.StorageFileMergeReader): |
19 """SQLite-based storage file reader for merging.""" | 21 """SQLite-based storage file reader for merging.""" |
20 | 22 |
21 _CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE | 23 _CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE |
22 _CONTAINER_TYPE_EVENT = events.EventObject.CONTAINER_TYPE | 24 _CONTAINER_TYPE_EVENT = events.EventObject.CONTAINER_TYPE |
23 _CONTAINER_TYPE_EVENT_DATA = events.EventData.CONTAINER_TYPE | 25 _CONTAINER_TYPE_EVENT_DATA = events.EventData.CONTAINER_TYPE |
24 _CONTAINER_TYPE_EVENT_SOURCE = event_sources.EventSource.CONTAINER_TYPE | 26 _CONTAINER_TYPE_EVENT_SOURCE = event_sources.EventSource.CONTAINER_TYPE |
25 _CONTAINER_TYPE_EVENT_TAG = events.EventTag.CONTAINER_TYPE | 27 _CONTAINER_TYPE_EVENT_TAG = events.EventTag.CONTAINER_TYPE |
26 _CONTAINER_TYPE_EXTRACTION_ERROR = errors.ExtractionError.CONTAINER_TYPE | 28 _CONTAINER_TYPE_EXTRACTION_ERROR = errors.ExtractionError.CONTAINER_TYPE |
27 _CONTAINER_TYPE_TASK_COMPLETION = tasks.TaskCompletion.CONTAINER_TYPE | 29 _CONTAINER_TYPE_TASK_COMPLETION = tasks.TaskCompletion.CONTAINER_TYPE |
28 _CONTAINER_TYPE_TASK_START = tasks.TaskStart.CONTAINER_TYPE | 30 _CONTAINER_TYPE_TASK_START = tasks.TaskStart.CONTAINER_TYPE |
29 | 31 |
30 _CONTAINER_TYPES = ( | 32 _CONTAINER_TYPES = ( |
31 _CONTAINER_TYPE_EVENT_SOURCE, | 33 _CONTAINER_TYPE_EVENT_SOURCE, |
32 _CONTAINER_TYPE_EVENT_DATA, | 34 _CONTAINER_TYPE_EVENT_DATA, |
33 _CONTAINER_TYPE_EVENT, | 35 _CONTAINER_TYPE_EVENT, |
34 _CONTAINER_TYPE_EVENT_TAG, | 36 _CONTAINER_TYPE_EVENT_TAG, |
35 _CONTAINER_TYPE_EXTRACTION_ERROR, | 37 _CONTAINER_TYPE_EXTRACTION_ERROR, |
36 _CONTAINER_TYPE_ANALYSIS_REPORT) | 38 _CONTAINER_TYPE_ANALYSIS_REPORT) |
37 | 39 |
| 40 _ADD_CONTAINER_TYPE_METHODS = { |
| 41 _CONTAINER_TYPE_ANALYSIS_REPORT: '_AddAnalysisReport', |
| 42 _CONTAINER_TYPE_EVENT: '_AddEvent', |
| 43 _CONTAINER_TYPE_EVENT_DATA: '_AddEventData', |
| 44 _CONTAINER_TYPE_EVENT_SOURCE: '_AddEventSource', |
| 45 _CONTAINER_TYPE_EVENT_TAG: '_AddEventTag', |
| 46 _CONTAINER_TYPE_EXTRACTION_ERROR: '_AddError', |
| 47 } |
| 48 |
38 _TABLE_NAMES_QUERY = ( | 49 _TABLE_NAMES_QUERY = ( |
39 'SELECT name FROM sqlite_master WHERE type = "table"') | 50 'SELECT name FROM sqlite_master WHERE type = "table"') |
40 | 51 |
41 def __init__(self, storage_writer, path): | 52 def __init__(self, storage_writer, path): |
42 """Initializes a storage merge reader. | 53 """Initializes a storage merge reader. |
43 | 54 |
44 Args: | 55 Args: |
45 storage_writer (StorageWriter): storage writer. | 56 storage_writer (StorageWriter): storage writer. |
46 path (str): path to the input file. | 57 path (str): path to the input file. |
47 | 58 |
48 Raises: | 59 Raises: |
49 IOError: if the input file cannot be opened. | 60 IOError: if the input file cannot be opened. |
| 61 RuntimeError: if an add container method is missing. |
50 """ | 62 """ |
51 super(SQLiteStorageMergeReader, self).__init__(storage_writer) | 63 super(SQLiteStorageMergeReader, self).__init__(storage_writer) |
52 self._active_container_type = None | 64 self._active_container_type = None |
53 self._active_cursor = None | 65 self._active_cursor = None |
| 66 self._add_active_container_method = None |
| 67 self._add_container_type_methods = {} |
| 68 self._compression_format = definitions.COMPRESSION_FORMAT_NONE |
54 self._connection = None | 69 self._connection = None |
55 self._container_types = None | 70 self._container_types = None |
56 self._cursor = None | 71 self._cursor = None |
57 self._event_data_identifier_mappings = {} | 72 self._event_data_identifier_mappings = {} |
58 self._path = path | 73 self._path = path |
59 | 74 |
60 def _AddAttributeContainer(self, attribute_container): | 75 # Create a runtime lookup table for the add container type method this |
61 """Adds a single attribute container to the storage writer. | 76 # approach prevents having to create a series of if-else check for the |
| 77 # container types. Also we need to generate the table at runtime seeing |
| 78 # there are no forard function declarations in Python. |
| 79 for container_type, method_name in self._ADD_CONTAINER_TYPE_METHODS.items(): |
| 80 method = getattr(self, method_name, None) |
| 81 if not method: |
| 82 raise RuntimeError( |
| 83 'Add method missing for container type: {0:s}'.format( |
| 84 container_type)) |
| 85 |
| 86 self._add_container_type_methods[container_type] = method |
| 87 |
| 88 def _AddAnalysisReport(self, analysis_report): |
| 89 """Adds an analysis report. |
62 | 90 |
63 Args: | 91 Args: |
64 attribute_container (AttributeContainer): container | 92 analysis_report (AnalysisReport): analysis report. |
| 93 """ |
| 94 self._storage_writer.AddAnalysisReport(analysis_report) |
65 | 95 |
66 Raises: | 96 def _AddError(self, error): |
67 RuntimeError: if the attribute container type is not supported. | 97 """Adds an error. |
| 98 |
| 99 Args: |
| 100 error (ExtractionError): error. |
68 """ | 101 """ |
69 container_type = attribute_container.CONTAINER_TYPE | 102 self._storage_writer.AddError(error) |
70 if container_type == self._CONTAINER_TYPE_EVENT_SOURCE: | |
71 self._storage_writer.AddEventSource(attribute_container) | |
72 | 103 |
73 elif container_type == self._CONTAINER_TYPE_EVENT_DATA: | 104 def _AddEvent(self, event): |
74 identifier = attribute_container.GetIdentifier() | 105 """Adds an event. |
75 lookup_key = identifier.CopyToString() | |
76 | 106 |
77 self._storage_writer.AddEventData(attribute_container) | 107 Args: |
| 108 event (EventObject): event. |
| 109 """ |
| 110 if hasattr(event, 'event_data_row_identifier'): |
| 111 event_data_identifier = identifiers.SQLTableIdentifier( |
| 112 self._CONTAINER_TYPE_EVENT_DATA, |
| 113 event.event_data_row_identifier) |
| 114 lookup_key = event_data_identifier.CopyToString() |
78 | 115 |
79 identifier = attribute_container.GetIdentifier() | 116 event_data_identifier = self._event_data_identifier_mappings[lookup_key] |
80 self._event_data_identifier_mappings[lookup_key] = identifier | 117 event.SetEventDataIdentifier(event_data_identifier) |
81 | 118 |
82 elif container_type == self._CONTAINER_TYPE_EVENT: | 119 # TODO: add event identifier mappings for event tags. |
83 if hasattr(attribute_container, 'event_data_row_identifier'): | |
84 event_data_identifier = identifiers.SQLTableIdentifier( | |
85 self._CONTAINER_TYPE_EVENT_DATA, | |
86 attribute_container.event_data_row_identifier) | |
87 lookup_key = event_data_identifier.CopyToString() | |
88 | 120 |
89 event_data_identifier = self._event_data_identifier_mappings[lookup_key] | 121 self._storage_writer.AddEvent(event) |
90 attribute_container.SetEventDataIdentifier(event_data_identifier) | |
91 | 122 |
92 # TODO: add event identifier mappings for event tags. | 123 def _AddEventData(self, event_data): |
| 124 """Adds event data. |
93 | 125 |
94 self._storage_writer.AddEvent(attribute_container) | 126 Args: |
| 127 event_data (EventData): event data. |
| 128 """ |
| 129 identifier = event_data.GetIdentifier() |
| 130 lookup_key = identifier.CopyToString() |
95 | 131 |
96 elif container_type == self._CONTAINER_TYPE_EVENT_TAG: | 132 self._storage_writer.AddEventData(event_data) |
97 self._storage_writer.AddEventTag(attribute_container) | |
98 | 133 |
99 elif container_type == self._CONTAINER_TYPE_EXTRACTION_ERROR: | 134 identifier = event_data.GetIdentifier() |
100 self._storage_writer.AddError(attribute_container) | 135 self._event_data_identifier_mappings[lookup_key] = identifier |
101 | 136 |
102 elif container_type == self._CONTAINER_TYPE_ANALYSIS_REPORT: | 137 def _AddEventSource(self, event_source): |
103 self._storage_writer.AddAnalysisReport(attribute_container) | 138 """Adds an event source. |
104 | 139 |
105 elif container_type not in ( | 140 Args: |
106 self._CONTAINER_TYPE_TASK_COMPLETION, self._CONTAINER_TYPE_TASK_START): | 141 event_source (EventSource): event source. |
107 raise RuntimeError('Unsupported container type: {0:s}'.format( | 142 """ |
108 container_type)) | 143 self._storage_writer.AddEventSource(event_source) |
| 144 |
| 145 def _AddEventTag(self, event_tag): |
| 146 """Adds an event tag. |
| 147 |
| 148 Args: |
| 149 event_tag (EventTag): event tag. |
| 150 """ |
| 151 self._storage_writer.AddEventTag(event_tag) |
| 152 |
| 153 def _Close(self): |
| 154 """Closes the task storage after reading.""" |
| 155 self._connection.close() |
| 156 self._connection = None |
| 157 self._cursor = None |
| 158 |
| 159 def _GetContainerTypes(self): |
| 160 """Retrieves the container types to merge. |
| 161 |
| 162 Container types not defined in _CONTAINER_TYPES are ignored and not merged. |
| 163 |
| 164 Specific container types reference other container types, such |
| 165 as event referencing event data. The names are ordered to ensure the |
| 166 attribute containers are merged in the correct order. |
| 167 |
| 168 Returns: |
| 169 list[str]: names of the container types to merge. |
| 170 """ |
| 171 self._cursor.execute(self._TABLE_NAMES_QUERY) |
| 172 table_names = [row[0] for row in self._cursor.fetchall()] |
| 173 |
| 174 return [ |
| 175 table_name for table_name in self._CONTAINER_TYPES |
| 176 if table_name in table_names] |
| 177 |
| 178 def _Open(self): |
| 179 """Opens the task storage for reading.""" |
| 180 self._connection = sqlite3.connect( |
| 181 self._path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) |
| 182 self._cursor = self._connection.cursor() |
| 183 |
| 184 def _ReadStorageMetadata(self): |
| 185 """Reads the task storage metadata.""" |
| 186 query = 'SELECT key, value FROM metadata' |
| 187 self._cursor.execute(query) |
| 188 |
| 189 metadata_values = {row[0]: row[1] for row in self._cursor.fetchall()} |
| 190 |
| 191 self._compression_format = metadata_values['compression_format'] |
| 192 |
| 193 def _PrepareForNextContainerType(self): |
| 194 """Prepares for the next container type. |
| 195 |
| 196 This method prepares the task storage for merging the next container type. |
| 197 It set the active container type, its add method and active cursor |
| 198 accordingly. |
| 199 """ |
| 200 self._active_container_type = self._container_types.pop(0) |
| 201 |
| 202 self._add_active_container_method = self._add_container_type_methods.get( |
| 203 self._active_container_type) |
| 204 |
| 205 query = 'SELECT _identifier, _data FROM {0:s}'.format( |
| 206 self._active_container_type) |
| 207 self._cursor.execute(query) |
| 208 |
| 209 self._active_cursor = self._cursor |
109 | 210 |
110 def MergeAttributeContainers( | 211 def MergeAttributeContainers( |
111 self, callback=None, maximum_number_of_containers=0): | 212 self, callback=None, maximum_number_of_containers=0): |
112 """Reads attribute containers from a task storage file into the writer. | 213 """Reads attribute containers from a task storage file into the writer. |
113 | 214 |
114 Args: | 215 Args: |
115 callback (function[StorageWriter, AttributeContainer]): function to call | 216 callback (function[StorageWriter, AttributeContainer]): function to call |
116 after each attribute container is deserialized. | 217 after each attribute container is deserialized. |
117 maximum_number_of_containers (Optional[int]): maximum number of | 218 maximum_number_of_containers (Optional[int]): maximum number of |
118 containers to merge, where 0 represent no limit. | 219 containers to merge, where 0 represent no limit. |
119 | 220 |
120 Returns: | 221 Returns: |
121 bool: True if the entire task storage file has been merged. | 222 bool: True if the entire task storage file has been merged. |
122 | 223 |
123 Raises: | 224 Raises: |
| 225 RuntimeError: if the add method for the active attribute container |
| 226 type is missing. |
124 OSError: if the task storage file cannot be deleted. | 227 OSError: if the task storage file cannot be deleted. |
125 """ | 228 """ |
126 if not self._cursor: | 229 if not self._cursor: |
127 self._connection = sqlite3.connect( | 230 self._Open() |
128 self._path, | 231 self._ReadStorageMetadata() |
129 detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) | 232 self._container_types = self._GetContainerTypes() |
130 self._cursor = self._connection.cursor() | |
131 | |
132 self._cursor.execute(self._TABLE_NAMES_QUERY) | |
133 table_names = [row[0] for row in self._cursor.fetchall()] | |
134 | |
135 # Remove container types not stored in the storage file but keep | |
136 # the container types list in order. | |
137 self._container_types = list(self._CONTAINER_TYPES) | |
138 for name in set(self._CONTAINER_TYPES).difference(table_names): | |
139 self._container_types.remove(name) | |
140 | 233 |
141 number_of_containers = 0 | 234 number_of_containers = 0 |
142 while self._active_cursor or self._container_types: | 235 while self._active_cursor or self._container_types: |
143 if not self._active_cursor: | 236 if not self._active_cursor: |
144 self._active_container_type = self._container_types.pop(0) | 237 self._PrepareForNextContainerType() |
145 | |
146 query = 'SELECT _identifier, _data FROM {0:s}'.format( | |
147 self._active_container_type) | |
148 self._cursor.execute(query) | |
149 | |
150 self._active_cursor = self._cursor | |
151 | 238 |
152 if maximum_number_of_containers > 0: | 239 if maximum_number_of_containers > 0: |
153 number_of_rows = maximum_number_of_containers - number_of_containers | 240 number_of_rows = maximum_number_of_containers - number_of_containers |
154 rows = self._active_cursor.fetchmany(size=number_of_rows) | 241 rows = self._active_cursor.fetchmany(size=number_of_rows) |
155 else: | 242 else: |
156 rows = self._active_cursor.fetchall() | 243 rows = self._active_cursor.fetchall() |
157 | 244 |
158 if not rows: | 245 if not rows: |
159 self._active_cursor = None | 246 self._active_cursor = None |
160 continue | 247 continue |
161 | 248 |
162 for row in rows: | 249 for row in rows: |
163 identifier = identifiers.SQLTableIdentifier( | 250 identifier = identifiers.SQLTableIdentifier( |
164 self._active_container_type, row[0]) | 251 self._active_container_type, row[0]) |
165 | 252 |
166 serialized_data = row[1] | 253 if self._compression_format == definitions.COMPRESSION_FORMAT_ZLIB: |
| 254 serialized_data = zlib.decompress(row[1]) |
| 255 else: |
| 256 serialized_data = row[1] |
167 | 257 |
168 attribute_container = self._DeserializeAttributeContainer( | 258 attribute_container = self._DeserializeAttributeContainer( |
169 self._active_container_type, serialized_data) | 259 self._active_container_type, serialized_data) |
170 attribute_container.SetIdentifier(identifier) | 260 attribute_container.SetIdentifier(identifier) |
171 | 261 |
172 if self._active_container_type == self._CONTAINER_TYPE_EVENT_TAG: | 262 if self._active_container_type == self._CONTAINER_TYPE_EVENT_TAG: |
173 event_identifier = identifiers.SQLTableIdentifier( | 263 event_identifier = identifiers.SQLTableIdentifier( |
174 self._CONTAINER_TYPE_EVENT, | 264 self._CONTAINER_TYPE_EVENT, |
175 attribute_container.event_row_identifier) | 265 attribute_container.event_row_identifier) |
176 attribute_container.SetEventIdentifier(event_identifier) | 266 attribute_container.SetEventIdentifier(event_identifier) |
177 | 267 |
178 del attribute_container.event_row_identifier | 268 del attribute_container.event_row_identifier |
179 | 269 |
180 if callback: | 270 if callback: |
181 callback(self._storage_writer, attribute_container) | 271 callback(self._storage_writer, attribute_container) |
182 | 272 |
183 self._AddAttributeContainer(attribute_container) | 273 self._add_active_container_method(attribute_container) |
184 | 274 |
185 number_of_containers += 1 | 275 number_of_containers += 1 |
186 | 276 |
187 if (maximum_number_of_containers > 0 and | 277 if (maximum_number_of_containers > 0 and |
188 number_of_containers >= maximum_number_of_containers): | 278 number_of_containers >= maximum_number_of_containers): |
189 return False | 279 return False |
190 | 280 |
191 self._connection.close() | 281 self._Close() |
192 self._connection = None | |
193 self._cursor = None | |
194 | 282 |
195 os.remove(self._path) | 283 os.remove(self._path) |
196 | 284 |
197 return True | 285 return True |
OLD | NEW |