Left: | ||
Right: |
LEFT | RIGHT |
---|---|
1 #!/usr/bin/python | |
1 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
2 """A helper script used to get and format the database schema for sqlite | 3 """Script to extract the database schema from SQLite database files. |
3 plugins. The resulting schema will be placed into your clipboard which can then | 4 |
5 The resulting schema will be placed into your clipboard which can then | |
4 be pasted directly into your plugin. | 6 be pasted directly into your plugin. |
5 | 7 |
6 Requires jinja2 and pyperclip python modules. | 8 This script requires the pyperclip Python module. |
7 """ | 9 """ |
8 | 10 |
11 from __future__ import print_function | |
9 import argparse | 12 import argparse |
10 import os | 13 import os |
11 import jinja2 # pylint: disable=import-error | 14 import sys |
15 import textwrap | |
16 | |
12 import pyperclip # pylint: disable=import-error | 17 import pyperclip # pylint: disable=import-error |
13 | 18 |
14 from plaso.parsers import sqlite | 19 # Change PYTHONPATH to include plaso. |
20 sys.path.insert(0, u'.') | |
21 | |
22 from plaso.parsers import sqlite # pylint: disable=wrong-import-position | |
15 | 23 |
16 | 24 |
17 def _existing_path(path): | 25 class SQLiteSchemaExtractor(object): |
18 """Validates an existing file path.""" | 26 """SQLite database file schema extractor.""" |
19 if not os.path.exists(path): | |
20 raise argparse.ArgumentTypeError( | |
21 u'Could not find path: {0:s}'.format(path)) | |
22 return path | |
23 | 27 |
28 def GetDatabaseSchema(self, database_path, wal_path=None): | |
29 """Retrieves schema from given database. | |
24 | 30 |
25 def GetDatabaseSchema(database_path, wal_path=None): | 31 Args: |
26 """Retrieves schema from given database. | 32 database_path (str): file path to database. |
33 wal_path (Optional[str]): file path to WAL file. | |
27 | 34 |
28 Args: | 35 Returns: |
29 database_path (str): file path to database. | 36 dict[str, str]: schema as an SQL query per table name. |
30 wal_path (Optional[str]): file path to wal file. | 37 """ |
38 database = sqlite.SQLiteDatabase('database.db') | |
31 | 39 |
32 Returns: | 40 with open(database_path, u'rb') as file_object: |
33 database schema as a dictionary of {table_name: SQLCommand} | |
34 """ | |
35 database = sqlite.SQLiteDatabase('database.db') | |
36 with open(database_path, u'rb') as file_object: | |
37 if wal_path: | |
38 wal_file_object = open(wal_path, u'rb') | |
39 else: | |
40 wal_file_object = None | 41 wal_file_object = None |
42 if wal_path: | |
43 wal_file_object = open(wal_path, u'rb') | |
41 | 44 |
42 try: | 45 try: |
43 database.Open(file_object, wal_file_object=wal_file_object) | 46 database.Open(file_object, wal_file_object=wal_file_object) |
44 schema = database.schema | 47 schema = database.schema |
45 finally: | |
46 database.Close() | |
47 if wal_file_object: | |
48 wal_file_object.close() | |
49 return schema | |
50 | 48 |
49 finally: | |
50 database.Close() | |
51 | 51 |
52 def GetFormattedSchema(schema): | 52 if wal_file_object: |
53 """Formats schema into a properly wordwrapped string. | 53 wal_file_object.close() |
54 | 54 |
55 Args: | 55 return schema |
56 schema (dict): database schema | |
57 | 56 |
58 Returns: | 57 def FormatSchema(self, schema): |
59 formatted string | 58 """Formats a schema into a word-wrapped string. |
60 """ | 59 |
61 schema = { | 60 Args: |
62 table: u' '.join(query.split()).replace(u'\'', u'\\\'') | 61 schema (dict[str, str]): schema as an SQL query per table name. |
63 for table, query in schema.items()} | 62 |
64 env = jinja2.Environment(trim_blocks=True, lstrip_blocks=True) | 63 Returns: |
65 template = env.from_string(''' | 64 str: schema formatted as word-wrapped string. |
66 {% for table, query in schema|dictsort %} | 65 """ |
67 {% if loop.first %} | 66 textwrapper = textwrap.TextWrapper() |
68 {u'{{ table }}': | 67 textwrapper.break_long_words = False |
69 {% else %} | 68 textwrapper.drop_whitespace = True |
70 u'{{ table }}': | 69 textwrapper.width = 80 - (10 + 4) |
71 {% endif %} | 70 |
72 u'{{ query|wordwrap(67, wrapstring=" '\n u'") }}' | 71 lines = [] |
73 {%- if not loop.last -%} | 72 table_index = 1 |
74 , | 73 number_of_tables = len(schema) |
75 {% else -%} | 74 for table_name, query in schema.items(): |
dc3.plaso
2017/05/09 18:52:44
The original Jinja2 template was using dictsort to
Joachim Metz
2017/05/28 12:02:46
Acknowledged.
| |
76 } | 75 line = u' u\'{0:s}\': ('.format(table_name) |
77 {%- endif %} | 76 lines.append(line) |
78 {%- endfor %}''') | 77 |
79 return template.render(schema=schema) | 78 query = query.replace(u'\'', u'\\\'') |
79 query = textwrapper.wrap(query) | |
80 query = [u'{0:s}u\'{1:s} \''.format(u' ' * 10, line) for line in query] | |
81 | |
82 if table_index == number_of_tables: | |
83 query[-1] = u'{0:s}\')}}]'.format(query[-1][:-2]) | |
84 else: | |
85 query[-1] = u'{0:s}\'),'.format(query[-1][:-2]) | |
86 | |
87 lines.extend(query) | |
88 table_index += 1 | |
89 | |
90 return u'\n'.join(lines) | |
80 | 91 |
81 | 92 |
82 if __name__ == u'__main__': | 93 if __name__ == u'__main__': |
83 arg_parser = argparse.ArgumentParser() | 94 argument_parser = argparse.ArgumentParser() |
84 arg_parser.add_argument( | 95 |
85 u'database_path', type=_existing_path, | 96 argument_parser.add_argument( |
97 u'database_path', type=str, | |
86 help=u'The path to the database file to extract schema from.') | 98 help=u'The path to the database file to extract schema from.') |
87 arg_parser.add_argument( | |
88 u'wal_path', type=_existing_path, nargs=u'?', default=None, | |
89 help=u'Optional path to a wal file to commit into the database.') | |
90 options = arg_parser.parse_args() | |
91 | 99 |
92 database_schema = GetDatabaseSchema(options.database_path, options.wal_path) | 100 argument_parser.add_argument( |
93 database_schema = GetFormattedSchema(database_schema) | 101 u'wal_path', type=str, nargs=u'?', default=None, |
102 help=u'Optional path to a WAL file to commit into the database.') | |
103 | |
104 options = argument_parser.parse_args() | |
105 | |
106 if not os.path.exists(options.database_path): | |
107 print(u'No such database file: {0:s}'.format(options.database_path)) | |
108 sys.exit(1) | |
109 | |
110 if options.wal_path and not os.path.exists(options.wal_path): | |
111 print(u'No such WAL file: {0:s}'.format(options.wal_path)) | |
112 sys.exit(1) | |
113 | |
114 extractor = SQLiteSchemaExtractor() | |
115 | |
116 database_schema = extractor.GetDatabaseSchema( | |
117 options.database_path, options.wal_path) | |
118 | |
119 database_schema = extractor.FormatSchema(database_schema) | |
120 | |
94 pyperclip.copy(database_schema) | 121 pyperclip.copy(database_schema) |
122 | |
123 sys.exit(0) | |
LEFT | RIGHT |