OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """gzip-based storage. | 2 """gzip-based storage. |
3 | 3 |
4 Only supports task storage at the moment. | 4 Only supports task storage at the moment. |
5 """ | 5 """ |
6 | 6 |
7 import gzip | 7 import gzip |
8 import os | 8 import os |
9 import time | 9 import time |
10 | 10 |
11 from plaso.lib import definitions | 11 from plaso.lib import definitions |
12 from plaso.lib import platform_specific | 12 from plaso.lib import platform_specific |
| 13 from plaso.storage import event_heaps |
13 from plaso.storage import identifiers | 14 from plaso.storage import identifiers |
14 from plaso.storage import interface | 15 from plaso.storage import interface |
15 | 16 |
16 | 17 |
17 class GZIPStorageFile(interface.BaseFileStorage): | 18 class GZIPStorageFile(interface.BaseFileStorage): |
18 """gzip-based storage file.""" | 19 """gzip-based storage file.""" |
19 | 20 |
20 # pylint: disable=abstract-method | 21 # pylint: disable=abstract-method |
21 | 22 |
22 _COMPRESSION_LEVEL = 9 | 23 _COMPRESSION_LEVEL = 9 |
23 | 24 |
24 _DATA_BUFFER_SIZE = 1 * 1024 * 1024 | 25 _DATA_BUFFER_SIZE = 1 * 1024 * 1024 |
25 | 26 |
26 def __init__(self, storage_type=definitions.STORAGE_TYPE_TASK): | 27 def __init__(self, storage_type=definitions.STORAGE_TYPE_TASK): |
27 """Initializes a storage. | 28 """Initializes a storage. |
28 | 29 |
29 Args: | 30 Args: |
30 storage_type (Optional[str]): storage type. | 31 storage_type (Optional[str]): storage type. |
31 | 32 |
32 Raises: | 33 Raises: |
33 ValueError: if the storage type is not supported. | 34 ValueError: if the storage type is not supported. |
34 """ | 35 """ |
35 if storage_type != definitions.STORAGE_TYPE_TASK: | 36 if storage_type != definitions.STORAGE_TYPE_TASK: |
36 raise ValueError(u'Unsupported storage type: {0:s}.'.format( | 37 raise ValueError(u'Unsupported storage type: {0:s}.'.format( |
37 storage_type)) | 38 storage_type)) |
38 | 39 |
39 super(GZIPStorageFile, self).__init__() | 40 super(GZIPStorageFile, self).__init__() |
40 self._attribute_containers = {} | 41 self._attribute_containers = {} |
41 self._gzip_file = None | 42 self._gzip_file = None |
| 43 self._number_of_containers = 0 |
42 | 44 |
43 def _AddAttributeContainer(self, attribute_container): | 45 def _AddAttributeContainer(self, attribute_container): |
44 """Adds an attribute container. | 46 """Adds an attribute container. |
45 | 47 |
46 Args: | 48 Args: |
47 attribute_container (AttributeContainer): attribute container. | 49 attribute_container (AttributeContainer): attribute container. |
48 """ | 50 """ |
49 container_type = attribute_container.CONTAINER_TYPE | 51 container_list = self._GetAttributeContainerList( |
50 if container_type not in self._attribute_containers: | 52 attribute_container.CONTAINER_TYPE) |
51 self._attribute_containers[container_type] = [] | |
52 | 53 |
53 self._attribute_containers[container_type].append(attribute_container) | 54 container_list.append(attribute_container) |
54 | 55 |
55 def _GetAttributeContainerList(self, container_type): | 56 def _GetAttributeContainerList(self, container_type): |
56 """Retrieves an attribute container list. | 57 """Retrieves an attribute container list. |
57 | 58 |
58 Args: | 59 Args: |
59 container_type (str): attribute container type. | 60 container_type (str): attribute container type. |
60 | 61 |
61 Returns: | 62 Returns: |
62 list[AttributeContainer]: attribute container list. | 63 list[AttributeContainer]: attribute container list. |
63 """ | 64 """ |
64 return self._attribute_containers.get(container_type, []) | 65 if container_type not in self._attribute_containers: |
| 66 self._attribute_containers[container_type] = [] |
| 67 |
| 68 return self._attribute_containers[container_type] |
65 | 69 |
66 def _OpenRead(self): | 70 def _OpenRead(self): |
67 """Opens the storage file for reading.""" | 71 """Opens the storage file for reading.""" |
68 # Do not use gzip.readlines() here since it can consume a large amount | 72 # Do not use gzip.readlines() here since it can consume a large amount |
69 # of memory. | 73 # of memory. |
70 data_buffer = self._gzip_file.read(self._DATA_BUFFER_SIZE) | 74 data_buffer = self._gzip_file.read(self._DATA_BUFFER_SIZE) |
71 while data_buffer: | 75 while data_buffer: |
72 lines = data_buffer.splitlines(True) | 76 lines = data_buffer.splitlines(True) |
73 data_buffer = b'' | 77 data_buffer = b'' |
74 for index, line in enumerate(lines): | 78 for index, line in enumerate(lines): |
75 if line.endswith(b'\n'): | 79 if line.endswith(b'\n'): |
76 attribute_container = self._DeserializeAttributeContainer( | 80 attribute_container = self._DeserializeAttributeContainer( |
77 u'attribute_container', line) | 81 u'attribute_container', line) |
78 self._AddAttributeContainer(attribute_container) | 82 self._AddAttributeContainer(attribute_container) |
79 else: | 83 else: |
80 data_buffer = b''.join(lines[index:]) | 84 data_buffer = b''.join(lines[index:]) |
81 data_buffer = data_buffer + self._gzip_file.read( | 85 data_buffer = data_buffer + self._gzip_file.read( |
82 self._DATA_BUFFER_SIZE) | 86 self._DATA_BUFFER_SIZE) |
83 | 87 |
84 def _WriteAttributeContainer(self, attribute_container): | 88 def _WriteAttributeContainer(self, attribute_container): |
85 """Writes an attribute container. | 89 """Writes an attribute container. |
86 | 90 |
87 Args: | 91 Args: |
88 attribute_container (AttributeContainer): attribute container. | 92 attribute_container (AttributeContainer): attribute container. |
| 93 """ |
| 94 identifier = identifiers.SerializedStreamIdentifier( |
| 95 1, self._number_of_containers) |
| 96 attribute_container.SetIdentifier(identifier) |
89 | 97 |
90 Raises: | 98 serialized_data = self._SerializeAttributeContainer(attribute_container) |
91 IOError: when the storage file is closed or read-only. | |
92 """ | |
93 if not self._is_open: | |
94 raise IOError(u'Unable to write to closed storage file.') | |
95 | 99 |
96 if self._read_only: | 100 self._number_of_containers += 1 |
97 raise IOError(u'Unable to write to read-only storage file.') | |
98 | 101 |
99 attribute_container_identifier = identifiers.SerializedStreamIdentifier( | 102 self._gzip_file.write(serialized_data) |
100 1, len(self._attribute_containers)) | |
101 attribute_container.SetIdentifier(attribute_container_identifier) | |
102 | |
103 attribute_container_data = self._SerializeAttributeContainer( | |
104 attribute_container) | |
105 self._gzip_file.write(attribute_container_data) | |
106 self._gzip_file.write(b'\n') | 103 self._gzip_file.write(b'\n') |
107 | 104 |
108 def AddAnalysisReport(self, analysis_report): | 105 def AddAnalysisReport(self, analysis_report): |
109 """Adds an analysis report. | 106 """Adds an analysis report. |
110 | 107 |
111 Args: | 108 Args: |
112 analysis_report (AnalysisReport): analysis report. | 109 analysis_report (AnalysisReport): analysis report. |
| 110 |
| 111 Raises: |
| 112 IOError: when the storage file is closed or read-only. |
113 """ | 113 """ |
| 114 self._RaiseIfNotWritable() |
| 115 |
114 self._WriteAttributeContainer(analysis_report) | 116 self._WriteAttributeContainer(analysis_report) |
115 | 117 |
116 def AddError(self, error): | 118 def AddError(self, error): |
117 """Adds an error. | 119 """Adds an error. |
118 | 120 |
119 Args: | 121 Args: |
120 error (ExtractionError): error. | 122 error (ExtractionError): error. |
| 123 |
| 124 Raises: |
| 125 IOError: when the storage file is closed or read-only. |
121 """ | 126 """ |
| 127 self._RaiseIfNotWritable() |
| 128 |
122 self._WriteAttributeContainer(error) | 129 self._WriteAttributeContainer(error) |
123 | 130 |
124 def AddEvent(self, event): | 131 def AddEvent(self, event): |
125 """Adds an event. | 132 """Adds an event. |
126 | 133 |
127 Args: | 134 Args: |
128 event (EventObject): event. | 135 event (EventObject): event. |
| 136 |
| 137 Raises: |
| 138 IOError: when the storage file is closed or read-only. |
129 """ | 139 """ |
| 140 self._RaiseIfNotWritable() |
| 141 |
 | 142 # TODO: change to no longer allow event_data_identifier to be None |
| 143 # after refactoring every parser to generate event data. |
| 144 event_data_identifier = event.GetEventDataIdentifier() |
| 145 if event_data_identifier: |
| 146 if not isinstance( |
| 147 event_data_identifier, identifiers.SerializedStreamIdentifier): |
| 148 raise IOError(u'Unsupported event data identifier type: {0:s}'.format( |
| 149 type(event_data_identifier))) |
| 150 |
| 151 event.event_data_stream_number = event_data_identifier.stream_number |
| 152 event.event_data_entry_index = event_data_identifier.entry_index |
| 153 |
130 self._WriteAttributeContainer(event) | 154 self._WriteAttributeContainer(event) |
131 | 155 |
| 156 def AddEventData(self, event_data): |
| 157 """Adds event data. |
| 158 |
| 159 Args: |
| 160 event_data (EventData): event data. |
| 161 |
| 162 Raises: |
| 163 IOError: when the storage file is closed or read-only. |
| 164 """ |
| 165 self._RaiseIfNotWritable() |
| 166 |
| 167 self._WriteAttributeContainer(event_data) |
| 168 |
132 def AddEventSource(self, event_source): | 169 def AddEventSource(self, event_source): |
133 """Adds an event source. | 170 """Adds an event source. |
134 | 171 |
135 Args: | 172 Args: |
136 event_source (EventSource): event source. | 173 event_source (EventSource): event source. |
| 174 |
| 175 Raises: |
| 176 IOError: when the storage file is closed or read-only. |
137 """ | 177 """ |
| 178 self._RaiseIfNotWritable() |
| 179 |
138 self._WriteAttributeContainer(event_source) | 180 self._WriteAttributeContainer(event_source) |
139 | 181 |
140 def AddEventTag(self, event_tag): | 182 def AddEventTag(self, event_tag): |
141 """Adds an event tag. | 183 """Adds an event tag. |
142 | 184 |
143 Args: | 185 Args: |
144 event_tag (EventTag): event tag. | 186 event_tag (EventTag): event tag. |
145 | 187 |
146 Raises: | 188 Raises: |
147 IOError: if the event tag event identifier type is not supported. | 189 IOError: when the storage file is closed or read-only or |
| 190 if the event tag event identifier type is not supported. |
148 """ | 191 """ |
| 192 self._RaiseIfNotWritable() |
| 193 |
149 event_identifier = event_tag.GetEventIdentifier() | 194 event_identifier = event_tag.GetEventIdentifier() |
150 if not isinstance( | 195 if not isinstance( |
151 event_identifier, identifiers.SerializedStreamIdentifier): | 196 event_identifier, identifiers.SerializedStreamIdentifier): |
152 raise IOError(u'Unsupported event identifier type: {0:s}'.format( | 197 raise IOError(u'Unsupported event identifier type: {0:s}'.format( |
153 type(event_identifier))) | 198 type(event_identifier))) |
154 | 199 |
155 event_tag.event_stream_number = event_identifier.stream_number | 200 event_tag.event_stream_number = event_identifier.stream_number |
156 event_tag.event_entry_index = event_identifier.entry_index | 201 event_tag.event_entry_index = event_identifier.entry_index |
157 | 202 |
158 self._WriteAttributeContainer(event_tag) | 203 self._WriteAttributeContainer(event_tag) |
(...skipping 21 matching lines...) Expand all Loading... |
180 return iter(self._GetAttributeContainerList(u'analysis_report')) | 225 return iter(self._GetAttributeContainerList(u'analysis_report')) |
181 | 226 |
182 def GetErrors(self): | 227 def GetErrors(self): |
183 """Retrieves the errors. | 228 """Retrieves the errors. |
184 | 229 |
185 Returns: | 230 Returns: |
186 generator(ExtractionError): error generator. | 231 generator(ExtractionError): error generator. |
187 """ | 232 """ |
188 return iter(self._GetAttributeContainerList(u'extraction_error')) | 233 return iter(self._GetAttributeContainerList(u'extraction_error')) |
189 | 234 |
| 235 def GetEventData(self): |
| 236 """Retrieves event data. |
| 237 |
| 238 Returns: |
| 239 generator(EventData): event data generator. |
| 240 """ |
| 241 return iter(self._GetAttributeContainerList(u'event_data')) |
| 242 |
| 243 def GetEventDataByIdentifier(self, identifier): |
| 244 """Retrieves specific event data. |
| 245 |
| 246 Args: |
| 247 identifier (AttributeContainerIdentifier): event data identifier. |
| 248 |
| 249 Returns: |
| 250 EventData: event data or None if not available. |
| 251 """ |
| 252 raise NotImplementedError() |
| 253 |
190 def GetEvents(self): | 254 def GetEvents(self): |
191 """Retrieves the events. | 255 """Retrieves the events. |
192 | 256 |
193 Returns: | 257 Returns: |
194 generator(EventObject): event generator. | 258 generator(EventObject): event generator. |
195 """ | 259 """ |
196 return iter(self._GetAttributeContainerList(u'event')) | 260 return iter(self._GetAttributeContainerList(u'event')) |
197 | 261 |
198 def GetEventSources(self): | 262 def GetEventSources(self): |
199 """Retrieves the event sources. | 263 """Retrieves the event sources. |
200 | 264 |
201 Returns: | 265 Returns: |
202 generator(EventSource): event source generator. | 266 generator(EventSource): event source generator. |
203 """ | 267 """ |
204 return iter(self._GetAttributeContainerList(u'event_source')) | 268 return iter(self._GetAttributeContainerList(u'event_source')) |
205 | 269 |
206 def GetEventTags(self): | 270 def GetEventTags(self): |
207 """Retrieves the event tags. | 271 """Retrieves the event tags. |
208 | 272 |
209 Yields: | 273 Yields: |
210 EventTag: event tag. | 274 EventTag: event tag. |
211 """ | 275 """ |
212 for event_tag in iter(self._GetAttributeContainerList(u'event_tag')): | 276 for event_tag in iter(self._GetAttributeContainerList(u'event_tag')): |
213 event_identifier = identifiers.SerializedStreamIdentifier( | 277 event_identifier = identifiers.SerializedStreamIdentifier( |
214 event_tag.event_stream_number, event_tag.event_entry_index) | 278 event_tag.event_stream_number, event_tag.event_entry_index) |
215 event_tag.SetEventIdentifier(event_identifier) | 279 event_tag.SetEventIdentifier(event_identifier) |
216 | 280 |
217 yield event_tag | 281 yield event_tag |
218 | 282 |
219 # TODO: time_range is currently not operational, nor that events are | |
220 # returned in chronological order. Fix this. | |
221 def GetSortedEvents(self, time_range=None): | 283 def GetSortedEvents(self, time_range=None): |
222 """Retrieves the events in increasing chronological order. | 284 """Retrieves the events in increasing chronological order. |
223 | 285 |
224 Args: | 286 Args: |
225 time_range (Optional[TimeRange]): time range used to filter events | 287 time_range (Optional[TimeRange]): time range used to filter events |
226 that fall in a specific period. | 288 that fall in a specific period. |
227 | 289 |
228 Returns: | 290 Returns: |
229 generator(EventObject): event generator. | 291 generator(EventObject): event generator. |
230 """ | 292 """ |
231 return iter(self._GetAttributeContainerList(u'event')) | 293 event_heap = event_heaps.EventHeap() |
| 294 |
| 295 for event in self._GetAttributeContainerList(u'event'): |
| 296 if (time_range and ( |
| 297 event.timestamp < time_range.start_timestamp or |
| 298 event.timestamp > time_range.end_timestamp)): |
| 299 continue |
| 300 |
| 301 event_heap.PushEvent(event) |
| 302 |
| 303 return iter(event_heap.PopEvents()) |
232 | 304 |
233 def HasAnalysisReports(self): | 305 def HasAnalysisReports(self): |
234 """Determines if a storage contains analysis reports. | 306 """Determines if a storage contains analysis reports. |
235 | 307 |
236 Returns: | 308 Returns: |
237 bool: True if the storage contains analysis reports. | 309 bool: True if the storage contains analysis reports. |
238 """ | 310 """ |
239 return len(self._GetAttributeContainerList(u'analysis_report')) > 0 | 311 return len(self._GetAttributeContainerList(u'analysis_report')) > 0 |
240 | 312 |
241 def HasErrors(self): | 313 def HasErrors(self): |
242 """Determines if a storage contains extraction errors. | 314 """Determines if a storage contains extraction errors. |
243 | 315 |
244 Returns: | 316 Returns: |
245 bool: True if the storage contains extraction errors. | 317 bool: True if the storage contains extraction errors. |
246 """ | 318 """ |
247 return len(self._GetAttributeContainerList(u'extraction_error')) > 0 | 319 return len(self._GetAttributeContainerList(u'extraction_error')) > 0 |
248 | 320 |
249 def HasEventTags(self): | 321 def HasEventTags(self): |
250 """Determines if a storage contains event tags. | 322 """Determines if a storage contains event tags. |
251 | 323 |
252 Returns: | 324 Returns: |
253 bool: True if the storage contains event tags. | 325 bool: True if the storage contains event tags. |
254 """ | 326 """ |
 255 return len(self._GetAttributeContainerList(u'event_tags')) > 0 | 327 return len(self._GetAttributeContainerList(u'event_tag')) > 0 |
256 | 328 |
| 329 # pylint: disable=arguments-differ |
257 def Open(self, path=None, read_only=True, **unused_kwargs): | 330 def Open(self, path=None, read_only=True, **unused_kwargs): |
258 """Opens the storage. | 331 """Opens the storage. |
259 | 332 |
260 Args: | 333 Args: |
261 path (Optional[str]): path of the storage file. | 334 path (Optional[str]): path of the storage file. |
262 read_only (Optional[bool]): True if the file should be opened in | 335 read_only (Optional[bool]): True if the file should be opened in |
263 read-only mode. | 336 read-only mode. |
264 | 337 |
265 Raises: | 338 Raises: |
266 IOError: if the storage file is already opened. | 339 IOError: if the storage file is already opened. |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
302 Args: | 375 Args: |
303 session_start (SessionStart): session start information. | 376 session_start (SessionStart): session start information. |
304 """ | 377 """ |
305 raise NotImplementedError() | 378 raise NotImplementedError() |
306 | 379 |
307 def WriteTaskCompletion(self, task_completion): | 380 def WriteTaskCompletion(self, task_completion): |
308 """Writes task completion information. | 381 """Writes task completion information. |
309 | 382 |
310 Args: | 383 Args: |
311 task_completion (TaskCompletion): task completion information. | 384 task_completion (TaskCompletion): task completion information. |
| 385 |
| 386 Raises: |
| 387 IOError: when the storage file is closed or read-only. |
312 """ | 388 """ |
| 389 self._RaiseIfNotWritable() |
| 390 |
313 self._WriteAttributeContainer(task_completion) | 391 self._WriteAttributeContainer(task_completion) |
314 | 392 |
315 def WriteTaskStart(self, task_start): | 393 def WriteTaskStart(self, task_start): |
316 """Writes task start information. | 394 """Writes task start information. |
317 | 395 |
318 Args: | 396 Args: |
319 task_start (TaskStart): task start information. | 397 task_start (TaskStart): task start information. |
| 398 |
| 399 Raises: |
| 400 IOError: when the storage file is closed or read-only. |
320 """ | 401 """ |
| 402 self._RaiseIfNotWritable() |
| 403 |
321 self._WriteAttributeContainer(task_start) | 404 self._WriteAttributeContainer(task_start) |
322 | 405 |
323 | 406 |
324 class GZIPStorageMergeReader(interface.FileStorageMergeReader): | 407 class GZIPStorageMergeReader(interface.FileStorageMergeReader): |
325 """Class that implements a gzip-based storage file reader for merging.""" | 408 """Class that implements a gzip-based storage file reader for merging.""" |
326 | 409 |
327 _DATA_BUFFER_SIZE = 1 * 1024 * 1024 | 410 _DATA_BUFFER_SIZE = 1 * 1024 * 1024 |
328 _MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS = 4 | 411 _MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS = 4 |
329 _LOCKED_FILE_SLEEP_TIME = 0.5 | 412 _LOCKED_FILE_SLEEP_TIME = 0.5 |
330 | 413 |
(...skipping 17 matching lines...) Expand all Loading... |
348 if attempt == (self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS - 1): | 431 if attempt == (self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS - 1): |
349 raise | 432 raise |
350 time.sleep(self._LOCKED_FILE_SLEEP_TIME) | 433 time.sleep(self._LOCKED_FILE_SLEEP_TIME) |
351 | 434 |
352 if platform_specific.PlatformIsWindows(): | 435 if platform_specific.PlatformIsWindows(): |
353 file_handle = gzip_file.fileno() | 436 file_handle = gzip_file.fileno() |
354 platform_specific.DisableWindowsFileHandleInheritance(file_handle) | 437 platform_specific.DisableWindowsFileHandleInheritance(file_handle) |
355 | 438 |
356 super(GZIPStorageMergeReader, self).__init__(storage_writer) | 439 super(GZIPStorageMergeReader, self).__init__(storage_writer) |
357 self._data_buffer = None | 440 self._data_buffer = None |
| 441 self._event_data_identifier_mappings = {} |
358 self._gzip_file = gzip_file | 442 self._gzip_file = gzip_file |
| 443 self._number_of_containers = 0 |
359 self._path = path | 444 self._path = path |
360 | 445 |
361 def _AddAttributeContainer(self, attribute_container): | 446 def _AddAttributeContainer(self, attribute_container): |
362 """Adds a single attribute container to the storage writer. | 447 """Adds a single attribute container to the storage writer. |
363 | 448 |
364 Args: | 449 Args: |
365 attribute_container (AttributeContainer): container | 450 attribute_container (AttributeContainer): container |
366 | 451 |
367 Raises: | 452 Raises: |
368 RuntimeError: if the attribute container type is not supported. | 453 RuntimeError: if the attribute container type is not supported. |
369 """ | 454 """ |
370 container_type = attribute_container.CONTAINER_TYPE | 455 container_type = attribute_container.CONTAINER_TYPE |
371 if container_type == u'event_source': | 456 if container_type == u'event_source': |
372 self._storage_writer.AddEventSource(attribute_container) | 457 self._storage_writer.AddEventSource(attribute_container) |
373 | 458 |
| 459 elif container_type == u'event_data': |
| 460 identifier = attribute_container.GetIdentifier() |
| 461 lookup_key = identifier.CopyToString() |
| 462 |
| 463 self._storage_writer.AddEventData(attribute_container) |
| 464 |
| 465 identifier = attribute_container.GetIdentifier() |
| 466 self._event_data_identifier_mappings[lookup_key] = identifier |
| 467 |
374 elif container_type == u'event': | 468 elif container_type == u'event': |
| 469 if (hasattr(attribute_container, u'event_data_stream_number') and |
| 470 hasattr(attribute_container, u'event_data_entry_index')): |
| 471 event_data_identifier = identifiers.SerializedStreamIdentifier( |
| 472 attribute_container.event_data_stream_number, |
| 473 attribute_container.event_data_entry_index) |
| 474 lookup_key = event_data_identifier.CopyToString() |
| 475 |
| 476 event_data_identifier = self._event_data_identifier_mappings[lookup_key] |
| 477 attribute_container.SetEventDataIdentifier(event_data_identifier) |
| 478 |
| 479 del attribute_container.event_data_stream_number |
| 480 del attribute_container.event_data_entry_index |
| 481 |
| 482 # TODO: add event identifier mappings for event tags. |
| 483 |
375 self._storage_writer.AddEvent(attribute_container) | 484 self._storage_writer.AddEvent(attribute_container) |
376 | 485 |
377 elif container_type == u'event_tag': | 486 elif container_type == u'event_tag': |
378 event_identifier = identifiers.SerializedStreamIdentifier( | 487 event_identifier = identifiers.SerializedStreamIdentifier( |
379 attribute_container.event_stream_number, | 488 attribute_container.event_stream_number, |
380 attribute_container.event_entry_index) | 489 attribute_container.event_entry_index) |
381 attribute_container.SetEventIdentifier(event_identifier) | 490 attribute_container.SetEventIdentifier(event_identifier) |
382 | 491 |
383 self._storage_writer.AddEventTag(attribute_container) | 492 self._storage_writer.AddEventTag(attribute_container) |
384 | 493 |
(...skipping 17 matching lines...) Expand all Loading... |
402 Returns: | 511 Returns: |
403 bool: True if the entire task storage file has been merged. | 512 bool: True if the entire task storage file has been merged. |
404 | 513 |
405 Raises: | 514 Raises: |
406 OSError: if the task storage file cannot be deleted. | 515 OSError: if the task storage file cannot be deleted. |
407 """ | 516 """ |
408 if not self._data_buffer: | 517 if not self._data_buffer: |
409 # Do not use gzip.readlines() here since it can consume a large amount | 518 # Do not use gzip.readlines() here since it can consume a large amount |
410 # of memory. | 519 # of memory. |
411 self._data_buffer = self._gzip_file.read(self._DATA_BUFFER_SIZE) | 520 self._data_buffer = self._gzip_file.read(self._DATA_BUFFER_SIZE) |
| 521 self._number_of_containers = 0 |
412 | 522 |
413 number_of_containers = 0 | 523 number_of_containers = 0 |
414 while self._data_buffer: | 524 while self._data_buffer: |
415 lines = self._data_buffer.splitlines(True) | 525 lines = self._data_buffer.splitlines(True) |
416 self._data_buffer = b'' | 526 self._data_buffer = b'' |
417 for index, line in enumerate(lines): | 527 for index, line in enumerate(lines): |
418 if not line.endswith(b'\n'): | 528 if not line.endswith(b'\n'): |
419 self._data_buffer = b''.join(lines[index:]) | 529 self._data_buffer = b''.join(lines[index:]) |
420 continue | 530 continue |
421 | 531 |
| 532 identifier = identifiers.SerializedStreamIdentifier( |
| 533 1, self._number_of_containers) |
| 534 |
422 attribute_container = self._DeserializeAttributeContainer( | 535 attribute_container = self._DeserializeAttributeContainer( |
423 u'attribute_container', line) | 536 u'attribute_container', line) |
| 537 attribute_container.SetIdentifier(identifier) |
| 538 |
424 self._AddAttributeContainer(attribute_container) | 539 self._AddAttributeContainer(attribute_container) |
| 540 |
| 541 self._number_of_containers += 1 |
425 number_of_containers += 1 | 542 number_of_containers += 1 |
426 | 543 |
427 if (maximum_number_of_containers > 0 and | 544 if (maximum_number_of_containers > 0 and |
428 number_of_containers >= maximum_number_of_containers): | 545 number_of_containers >= maximum_number_of_containers): |
429 self._data_buffer = b''.join(lines[index+1:]) | 546 self._data_buffer = b''.join(lines[index+1:]) |
430 return False | 547 return False |
431 | 548 |
432 additional_data_buffer = self._gzip_file.read(self._DATA_BUFFER_SIZE) | 549 additional_data_buffer = self._gzip_file.read(self._DATA_BUFFER_SIZE) |
433 self._data_buffer = b''.join([self._data_buffer, additional_data_buffer]) | 550 self._data_buffer = b''.join([self._data_buffer, additional_data_buffer]) |
434 | 551 |
(...skipping 18 matching lines...) Expand all Loading... |
453 | 570 |
454 def __init__(self, path): | 571 def __init__(self, path): |
455 """Initializes a storage reader. | 572 """Initializes a storage reader. |
456 | 573 |
457 Args: | 574 Args: |
458 path (str): path to the input file. | 575 path (str): path to the input file. |
459 """ | 576 """ |
460 super(GZIPStorageFileReader, self).__init__(path) | 577 super(GZIPStorageFileReader, self).__init__(path) |
461 self._storage_file = GZIPStorageFile() | 578 self._storage_file = GZIPStorageFile() |
462 self._storage_file.Open(path=path) | 579 self._storage_file.Open(path=path) |
OLD | NEW |