Left: | ||
Right: |
LEFT | RIGHT |
---|---|
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The storage interface classes.""" | 2 """The storage interface classes.""" |
3 | 3 |
4 from __future__ import unicode_literals | 4 from __future__ import unicode_literals |
5 | 5 |
6 import abc | 6 import abc |
7 import os | 7 import os |
8 import shutil | 8 import shutil |
9 import tempfile | 9 import tempfile |
10 | 10 |
(...skipping 931 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
942 being flushed (written) to the storage. | 942 being flushed (written) to the storage. |
943 | 943 |
944 Args: | 944 Args: |
945 time_range (Optional[TimeRange]): time range used to filter events | 945 time_range (Optional[TimeRange]): time range used to filter events |
946 that fall in a specific period. | 946 that fall in a specific period. |
947 | 947 |
948 Yields: | 948 Yields: |
949 EventObject: event. | 949 EventObject: event. |
950 """ | 950 """ |
951 | 951 |
952 def FinalizeProcessedTaskStorage(self, unused_task): | |
953 """Finalizes a proccessed a task storage. | |
954 | |
955 Args: | |
956 task (Task): task. | |
957 | |
958 Raises: | |
959 NotImplementedError: since there is no implementation. | |
960 """ | |
961 raise NotImplementedError() | |
962 | |
952 @abc.abstractmethod | 963 @abc.abstractmethod |
953 def Open(self): | 964 def Open(self): |
954 """Opens the storage writer.""" | 965 """Opens the storage writer.""" |
955 | 966 |
956 def PrepareMergeTaskStorage(self, unused_task): | 967 def PrepareMergeTaskStorage(self, unused_task): |
957 """Prepares a task storage for merging. | 968 """Prepares a task storage for merging. |
958 | |
959 Args: | |
960 task (Task): task. | |
961 | |
962 Raises: | |
963 NotImplementedError: since there is no implementation. | |
964 """ | |
965 raise NotImplementedError() | |
966 | |
967 def PrepareToMergeTaskStorage(self, unused_task): | |
968 """Prepares a task storage for pending merge. | |
onager
2018/04/01 19:58:26
This prepares a task storage for preparing to merg
Joachim Metz
2018/04/02 05:01:08
Changed
| |
969 | 969 |
970 Args: | 970 Args: |
971 task (Task): task. | 971 task (Task): task. |
972 | 972 |
973 Raises: | 973 Raises: |
974 NotImplementedError: since there is no implementation. | 974 NotImplementedError: since there is no implementation. |
975 """ | 975 """ |
976 raise NotImplementedError() | 976 raise NotImplementedError() |
977 | 977 |
978 @abc.abstractmethod | 978 @abc.abstractmethod |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1048 Args: | 1048 Args: |
1049 session (Session): session the storage changes are part of. | 1049 session (Session): session the storage changes are part of. |
1050 output_file (str): path to the output file. | 1050 output_file (str): path to the output file. |
1051 storage_type (Optional[str]): storage type. | 1051 storage_type (Optional[str]): storage type. |
1052 task(Optional[Task]): task. | 1052 task(Optional[Task]): task. |
1053 """ | 1053 """ |
1054 super(StorageFileWriter, self).__init__( | 1054 super(StorageFileWriter, self).__init__( |
1055 session, storage_type=storage_type, task=task) | 1055 session, storage_type=storage_type, task=task) |
1056 self._merge_task_storage_path = '' | 1056 self._merge_task_storage_path = '' |
1057 self._output_file = output_file | 1057 self._output_file = output_file |
1058 self._processed_task_storage_path = '' | |
1058 self._storage_file = None | 1059 self._storage_file = None |
1059 self._task_storage_path = None | 1060 self._task_storage_path = None |
1060 self._to_merge_task_storage_path = '' | |
1061 | 1061 |
1062 @abc.abstractmethod | 1062 @abc.abstractmethod |
1063 def _CreateStorageFile(self): | 1063 def _CreateStorageFile(self): |
1064 """Creates a storage file. | 1064 """Creates a storage file. |
1065 | 1065 |
1066 Returns: | 1066 Returns: |
1067 BaseStorageFile: storage file. | 1067 BaseStorageFile: storage file. |
1068 """ | 1068 """ |
1069 | 1069 |
1070 @abc.abstractmethod | 1070 @abc.abstractmethod |
(...skipping 24 matching lines...) Expand all Loading... | |
1095 | 1095 |
1096 Args: | 1096 Args: |
1097 task (Task): task. | 1097 task (Task): task. |
1098 | 1098 |
1099 Returns: | 1099 Returns: |
1100 str: path of a task storage file file in the merge directory. | 1100 str: path of a task storage file file in the merge directory. |
1101 """ | 1101 """ |
1102 filename = '{0:s}.plaso'.format(task.identifier) | 1102 filename = '{0:s}.plaso'.format(task.identifier) |
1103 return os.path.join(self._merge_task_storage_path, filename) | 1103 return os.path.join(self._merge_task_storage_path, filename) |
1104 | 1104 |
1105 def _GetProcessedStorageFilePath(self, task): | |
1106 """Retrieves the path of a task storage file in the processed directory. | |
1107 | |
1108 Args: | |
1109 task (Task): task. | |
1110 | |
1111 Returns: | |
1112 str: path of a task storage file in the processed directory. | |
1113 """ | |
1114 filename = '{0:s}.plaso'.format(task.identifier) | |
1115 return os.path.join(self._processed_task_storage_path, filename) | |
1116 | |
1105 def _GetTaskStorageFilePath(self, task): | 1117 def _GetTaskStorageFilePath(self, task): |
1106 """Retrieves the path of a task storage file in the temporary directory. | 1118 """Retrieves the path of a task storage file in the temporary directory. |
1107 | 1119 |
1108 Args: | 1120 Args: |
1109 task (Task): task. | 1121 task (Task): task. |
1110 | 1122 |
1111 Returns: | 1123 Returns: |
1112 str: path of a task storage file in the temporary directory. | 1124 str: path of a task storage file in the temporary directory. |
1113 """ | 1125 """ |
1114 filename = '{0:s}.plaso'.format(task.identifier) | 1126 filename = '{0:s}.plaso'.format(task.identifier) |
1115 return os.path.join(self._task_storage_path, filename) | 1127 return os.path.join(self._task_storage_path, filename) |
1116 | |
1117 def _GetToMergeTaskStorageFilePath(self, task): | |
1118 """Retrieves the path of a task storage file in the to merge directory. | |
1119 | |
1120 Args: | |
1121 task (Task): task. | |
1122 | |
1123 Returns: | |
1124 str: path of a task storage file in the to merge directory. | |
1125 """ | |
1126 filename = '{0:s}.plaso'.format(task.identifier) | |
1127 return os.path.join(self._to_merge_task_storage_path, filename) | |
1128 | 1128 |
1129 def _UpdateCounters(self, event): | 1129 def _UpdateCounters(self, event): |
1130 """Updates the counters. | 1130 """Updates the counters. |
1131 | 1131 |
1132 Args: | 1132 Args: |
1133 event: an event (instance of EventObject). | 1133 event: an event (instance of EventObject). |
1134 """ | 1134 """ |
1135 self._session.parsers_counter['total'] += 1 | 1135 self._session.parsers_counter['total'] += 1 |
1136 | 1136 |
1137 # Here we want the name of the parser or plugin not the parser chain. | 1137 # Here we want the name of the parser or plugin not the parser chain. |
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1255 Returns: | 1255 Returns: |
1256 bool: True if the task is ready to be merged. | 1256 bool: True if the task is ready to be merged. |
1257 | 1257 |
1258 Raises: | 1258 Raises: |
1259 IOError: if the storage type is not supported or | 1259 IOError: if the storage type is not supported or |
1260 if the temporary path for the task storage does not exist. | 1260 if the temporary path for the task storage does not exist. |
1261 """ | 1261 """ |
1262 if self._storage_type != definitions.STORAGE_TYPE_SESSION: | 1262 if self._storage_type != definitions.STORAGE_TYPE_SESSION: |
1263 raise IOError('Unsupported storage type.') | 1263 raise IOError('Unsupported storage type.') |
1264 | 1264 |
1265 if not self._to_merge_task_storage_path: | 1265 if not self._processed_task_storage_path: |
1266 raise IOError('Missing to merge task storage path.') | 1266 raise IOError('Missing processed task storage path.') |
1267 | 1267 |
1268 to_merge_storage_file_path = self._GetToMergeTaskStorageFilePath(task) | 1268 processed_storage_file_path = self._GetProcessedStorageFilePath(task) |
1269 | 1269 |
1270 try: | 1270 try: |
1271 stat_info = os.stat(to_merge_storage_file_path) | 1271 stat_info = os.stat(processed_storage_file_path) |
1272 except (IOError, OSError): | 1272 except (IOError, OSError): |
1273 return False | 1273 return False |
1274 | 1274 |
1275 task.storage_file_size = stat_info.st_size | 1275 task.storage_file_size = stat_info.st_size |
1276 return True | 1276 return True |
1277 | 1277 |
1278 def Close(self): | 1278 def Close(self): |
1279 """Closes the storage writer. | 1279 """Closes the storage writer. |
1280 | 1280 |
1281 Raises: | 1281 Raises: |
1282 IOError: when the storage writer is closed. | 1282 IOError: when the storage writer is closed. |
1283 """ | 1283 """ |
1284 self._RaiseIfNotWritable() | 1284 self._RaiseIfNotWritable() |
1285 | 1285 |
1286 self._storage_file.Close() | 1286 self._storage_file.Close() |
1287 self._storage_file = None | 1287 self._storage_file = None |
1288 | 1288 |
1289 def CreateTaskStorage(self, task): | 1289 def CreateTaskStorage(self, task): |
1290 """Creates a task storage. | 1290 """Creates a task storage. |
1291 | 1291 |
1292 The task storage is used to store attributes created by the task. | 1292 The task storage is used to store attributes created by the task. |
1293 | 1293 |
1294 Args: | 1294 Args: |
1295 task(Task): task. | 1295 task(Task): task. |
1296 | 1296 |
1297 Returns: | 1297 Returns: |
1298 StorageWriter: storage writer. | 1298 StorageWriter: storage writer. |
1299 | 1299 |
1300 Raises: | 1300 Raises: |
1301 IOError: if the storage type is not supported or | 1301 IOError: if the storage type is not supported. |
1302 if the temporary path for the task storage does not exist. | |
1303 """ | 1302 """ |
1304 if self._storage_type != definitions.STORAGE_TYPE_SESSION: | 1303 if self._storage_type != definitions.STORAGE_TYPE_SESSION: |
1305 raise IOError('Unsupported storage type.') | 1304 raise IOError('Unsupported storage type.') |
1306 | |
1307 if not self._task_storage_path: | |
1308 raise IOError('Missing task storage path.') | |
1309 | 1305 |
1310 storage_file_path = self._GetTaskStorageFilePath(task) | 1306 storage_file_path = self._GetTaskStorageFilePath(task) |
1311 return self._CreateTaskStorageWriter(storage_file_path, task) | 1307 return self._CreateTaskStorageWriter(storage_file_path, task) |
1312 | 1308 |
1313 def GetEvents(self): | 1309 def GetEvents(self): |
1314 """Retrieves the events. | 1310 """Retrieves the events. |
1315 | 1311 |
1316 Returns: | 1312 Returns: |
1317 generator(EventObject): event generator. | 1313 generator(EventObject): event generator. |
1318 | 1314 |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1374 """ | 1370 """ |
1375 if not self._storage_file: | 1371 if not self._storage_file: |
1376 raise IOError('Unable to read from closed storage writer.') | 1372 raise IOError('Unable to read from closed storage writer.') |
1377 | 1373 |
1378 event_source = self._storage_file.GetEventSourceByIndex( | 1374 event_source = self._storage_file.GetEventSourceByIndex( |
1379 self._written_event_source_index) | 1375 self._written_event_source_index) |
1380 if event_source: | 1376 if event_source: |
1381 self._written_event_source_index += 1 | 1377 self._written_event_source_index += 1 |
1382 return event_source | 1378 return event_source |
1383 | 1379 |
1380 def GetProcessedTaskIdentifiers(self): | |
1381 """Identifers for tasks which have been processed. | |
1382 | |
1383 Returns: | |
1384 list[str]: task identifiers that are processed. | |
1385 | |
1386 Raises: | |
1387 IOError: if the storage type is not supported or | |
1388 if the temporary path for the task storage does not exist. | |
1389 """ | |
1390 if self._storage_type != definitions.STORAGE_TYPE_SESSION: | |
1391 raise IOError('Unsupported storage type.') | |
1392 | |
1393 if not self._processed_task_storage_path: | |
1394 raise IOError('Missing processed task storage path.') | |
1395 | |
1396 return [ | |
1397 path.replace('.plaso', '') | |
1398 for path in os.listdir(self._processed_task_storage_path)] | |
1399 | |
1384 def GetSortedEvents(self, time_range=None): | 1400 def GetSortedEvents(self, time_range=None): |
1385 """Retrieves the events in increasing chronological order. | 1401 """Retrieves the events in increasing chronological order. |
1386 | 1402 |
1387 This includes all events written to the storage including those pending | 1403 This includes all events written to the storage including those pending |
1388 being flushed (written) to the storage. | 1404 being flushed (written) to the storage. |
1389 | 1405 |
1390 Args: | 1406 Args: |
1391 time_range (Optional[TimeRange]): time range used to filter events | 1407 time_range (Optional[TimeRange]): time range used to filter events |
1392 that fall in a specific period. | 1408 that fall in a specific period. |
1393 | 1409 |
1394 Returns: | 1410 Returns: |
1395 generator(EventObject): event generator. | 1411 generator(EventObject): event generator. |
1396 | 1412 |
1397 Raises: | 1413 Raises: |
1398 IOError: when the storage writer is closed. | 1414 IOError: when the storage writer is closed. |
1399 """ | 1415 """ |
1400 if not self._storage_file: | 1416 if not self._storage_file: |
1401 raise IOError('Unable to read from closed storage writer.') | 1417 raise IOError('Unable to read from closed storage writer.') |
1402 | 1418 |
1403 return self._storage_file.GetSortedEvents(time_range=time_range) | 1419 return self._storage_file.GetSortedEvents(time_range=time_range) |
1404 | 1420 |
1405 def GetTaskIdentifiersReadyForMerge(self): | 1421 def FinalizeProcessedTaskStorage(self, task): |
1406 """Retrieves the task identifiers which are ready to be merged. | 1422 """Finalized a proccessed a task storage. |
onager
2018/04/01 19:58:26
-the
Joachim Metz
2018/04/02 05:01:08
Done.
| |
1407 | 1423 |
1408 Returns: | 1424 Moves the task storage file from its temporary directory to the processed |
1409 list[str]: task identifiers that are ready to be merged. | 1425 directory. |
1426 | |
1427 Args: | |
1428 task (Task): task. | |
1410 | 1429 |
1411 Raises: | 1430 Raises: |
1412 IOError: if the storage type is not supported or | 1431 IOError: if the storage type is not supported or |
1413 if the temporary path for the task storage does not exist. | 1432 if the storage file cannot be renamed. |
1414 """ | 1433 """ |
1415 if self._storage_type != definitions.STORAGE_TYPE_SESSION: | 1434 if self._storage_type != definitions.STORAGE_TYPE_SESSION: |
1416 raise IOError('Unsupported storage type.') | 1435 raise IOError('Unsupported storage type.') |
1417 | 1436 |
1418 if not self._to_merge_task_storage_path: | 1437 storage_file_path = self._GetTaskStorageFilePath(task) |
1419 raise IOError('Missing to merge task storage path.') | 1438 processed_storage_file_path = self._GetProcessedStorageFilePath(task) |
1420 | |
1421 return [ | |
1422 path.replace('.plaso', '') | |
1423 for path in os.listdir(self._to_merge_task_storage_path)] | |
1424 | |
1425 def Open(self): | |
1426 """Opens the storage writer. | |
1427 | |
1428 Raises: | |
1429 IOError: if the storage writer is already opened. | |
1430 """ | |
1431 if self._storage_file: | |
1432 raise IOError('Storage writer already opened.') | |
1433 | |
1434 self._storage_file = self._CreateStorageFile() | |
1435 | |
1436 if self._serializers_profiler: | |
1437 self._storage_file.SetSerializersProfiler(self._serializers_profiler) | |
1438 | |
1439 if self._storage_profiler: | |
1440 self._storage_file.SetStorageProfiler(self._storage_profiler) | |
1441 | |
1442 self._storage_file.Open(path=self._output_file, read_only=False) | |
1443 | |
1444 self._first_written_event_source_index = ( | |
1445 self._storage_file.GetNumberOfEventSources()) | |
1446 self._written_event_source_index = self._first_written_event_source_index | |
1447 | |
1448 def PrepareMergeTaskStorage(self, task): | |
1449 """Prepares a task storage for merging. | |
1450 | |
1451 Moves the task storage file from to merge directory to the merge directory. | |
onager
2018/04/01 19:58:26
See note about terminology, this is a very confusi
Joachim Metz
2018/04/02 05:01:08
Ack, reworded.
| |
1452 | |
1453 Args: | |
1454 task (Task): task. | |
1455 | |
1456 Raises: | |
1457 IOError: if the storage type is not supported or | |
1458 if the temporary path for the task storage does not exist. | |
onager
2018/04/01 19:58:26
or if the storage file can't be renamed.
Joachim Metz
2018/04/02 05:01:08
Done.
| |
1459 """ | |
1460 if self._storage_type != definitions.STORAGE_TYPE_SESSION: | |
1461 raise IOError('Unsupported storage type.') | |
1462 | |
1463 if not self._task_storage_path: | |
1464 raise IOError('Missing task storage path.') | |
1465 | |
1466 merge_storage_file_path = self._GetMergeTaskStorageFilePath(task) | |
1467 to_merge_storage_file_path = self._GetToMergeTaskStorageFilePath(task) | |
1468 | |
1469 task.storage_file_size = os.path.getsize(to_merge_storage_file_path) | |
1470 | 1439 |
1471 try: | 1440 try: |
1472 os.rename(to_merge_storage_file_path, merge_storage_file_path) | 1441 os.rename(storage_file_path, processed_storage_file_path) |
1473 except OSError as exception: | |
1474 raise IOError(( | |
1475 'Unable to rename task storage file: {0:s} with error: ' | |
1476 '{1:s}').format(to_merge_storage_file_path, exception)) | |
1477 | |
1478 def PrepareToMergeTaskStorage(self, task): | |
onager
2018/04/01 19:58:26
Having "PrepareMerge" and "PrepareToMerge" methods
Joachim Metz
2018/04/02 05:01:08
Done.
| |
1479 """Prepares a task storage for pending merge. | |
1480 | |
1481 Moves the task storage file from its temporary directory to the to merge | |
1482 directory. | |
1483 | |
1484 Args: | |
1485 task (Task): task. | |
1486 | |
1487 Raises: | |
1488 IOError: if the storage type is not supported or | |
1489 if the temporary path for the task storage does not exist. | |
onager
2018/04/01 19:58:26
+or if the storage file can't be renamed.
Joachim Metz
2018/04/02 05:01:08
Done.
| |
1490 """ | |
1491 if self._storage_type != definitions.STORAGE_TYPE_SESSION: | |
1492 raise IOError('Unsupported storage type.') | |
1493 | |
1494 if not self._task_storage_path: | |
1495 raise IOError('Missing task storage path.') | |
1496 | |
1497 storage_file_path = self._GetTaskStorageFilePath(task) | |
1498 to_merge_storage_file_path = self._GetToMergeTaskStorageFilePath(task) | |
1499 | |
1500 try: | |
1501 os.rename(storage_file_path, to_merge_storage_file_path) | |
1502 except OSError as exception: | 1442 except OSError as exception: |
1503 raise IOError(( | 1443 raise IOError(( |
1504 'Unable to rename task storage file: {0:s} with error: ' | 1444 'Unable to rename task storage file: {0:s} with error: ' |
1505 '{1:s}').format(storage_file_path, exception)) | 1445 '{1:s}').format(storage_file_path, exception)) |
1506 | 1446 |
1447 def Open(self): | |
1448 """Opens the storage writer. | |
1449 | |
1450 Raises: | |
1451 IOError: if the storage writer is already opened. | |
1452 """ | |
1453 if self._storage_file: | |
1454 raise IOError('Storage writer already opened.') | |
1455 | |
1456 self._storage_file = self._CreateStorageFile() | |
1457 | |
1458 if self._serializers_profiler: | |
1459 self._storage_file.SetSerializersProfiler(self._serializers_profiler) | |
1460 | |
1461 if self._storage_profiler: | |
1462 self._storage_file.SetStorageProfiler(self._storage_profiler) | |
1463 | |
1464 self._storage_file.Open(path=self._output_file, read_only=False) | |
1465 | |
1466 self._first_written_event_source_index = ( | |
1467 self._storage_file.GetNumberOfEventSources()) | |
1468 self._written_event_source_index = self._first_written_event_source_index | |
1469 | |
1470 def PrepareMergeTaskStorage(self, task): | |
1471 """Prepares a task storage for merging. | |
1472 | |
1473 Moves the task storage file from the processed directory to the merge | |
1474 directory. | |
1475 | |
1476 Args: | |
1477 task (Task): task. | |
1478 | |
1479 Raises: | |
1480 IOError: if the storage type is not supported or | |
1481 if the storage file cannot be renamed. | |
1482 """ | |
1483 if self._storage_type != definitions.STORAGE_TYPE_SESSION: | |
1484 raise IOError('Unsupported storage type.') | |
1485 | |
1486 merge_storage_file_path = self._GetMergeTaskStorageFilePath(task) | |
1487 processed_storage_file_path = self._GetProcessedStorageFilePath(task) | |
1488 | |
1489 task.storage_file_size = os.path.getsize(processed_storage_file_path) | |
1490 | |
1491 try: | |
1492 os.rename(processed_storage_file_path, merge_storage_file_path) | |
1493 except OSError as exception: | |
1494 raise IOError(( | |
1495 'Unable to rename task storage file: {0:s} with error: ' | |
1496 '{1:s}').format(processed_storage_file_path, exception)) | |
1497 | |
1507 def ReadPreprocessingInformation(self, knowledge_base): | 1498 def ReadPreprocessingInformation(self, knowledge_base): |
1508 """Reads preprocessing information. | 1499 """Reads preprocessing information. |
1509 | 1500 |
1510 The preprocessing information contains the system configuration which | 1501 The preprocessing information contains the system configuration which |
1511 contains information about various system specific configuration data, | 1502 contains information about various system specific configuration data, |
1512 for example the user accounts. | 1503 for example the user accounts. |
1513 | 1504 |
1514 Args: | 1505 Args: |
1515 knowledge_base (KnowledgeBase): is used to store the preprocessing | 1506 knowledge_base (KnowledgeBase): is used to store the preprocessing |
1516 information. | 1507 information. |
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1584 if self._task_storage_path: | 1575 if self._task_storage_path: |
1585 raise IOError('Task storage path already exists.') | 1576 raise IOError('Task storage path already exists.') |
1586 | 1577 |
1587 output_directory = os.path.dirname(self._output_file) | 1578 output_directory = os.path.dirname(self._output_file) |
1588 self._task_storage_path = tempfile.mkdtemp(dir=output_directory) | 1579 self._task_storage_path = tempfile.mkdtemp(dir=output_directory) |
1589 | 1580 |
1590 self._merge_task_storage_path = os.path.join( | 1581 self._merge_task_storage_path = os.path.join( |
1591 self._task_storage_path, 'merge') | 1582 self._task_storage_path, 'merge') |
1592 os.mkdir(self._merge_task_storage_path) | 1583 os.mkdir(self._merge_task_storage_path) |
1593 | 1584 |
1594 self._to_merge_task_storage_path = os.path.join( | 1585 self._processed_task_storage_path = os.path.join( |
1595 self._task_storage_path, 'to_merge') | 1586 self._task_storage_path, 'processed') |
1596 os.mkdir(self._to_merge_task_storage_path) | 1587 os.mkdir(self._processed_task_storage_path) |
1597 | 1588 |
1598 def StopTaskStorage(self, abort=False): | 1589 def StopTaskStorage(self, abort=False): |
1599 """Removes the temporary path for the task storage. | 1590 """Removes the temporary path for the task storage. |
1600 | 1591 |
1601 The results of tasks will be lost on abort. | 1592 The results of tasks will be lost on abort. |
1602 | 1593 |
1603 Args: | 1594 Args: |
1604 abort (bool): True to indicate the stop is issued on abort. | 1595 abort (bool): True to indicate the stop is issued on abort. |
1605 | 1596 |
1606 Raises: | 1597 Raises: |
1607 IOError: if the storage type is not supported or | 1598 IOError: if the storage type is not supported. |
1608 if the temporary path for the task storage does not exist. | |
1609 """ | 1599 """ |
1610 if self._storage_type != definitions.STORAGE_TYPE_SESSION: | 1600 if self._storage_type != definitions.STORAGE_TYPE_SESSION: |
1611 raise IOError('Unsupported storage type.') | 1601 raise IOError('Unsupported storage type.') |
1612 | |
1613 if not self._task_storage_path: | |
1614 raise IOError('Missing task storage path.') | |
1615 | 1602 |
1616 if os.path.isdir(self._merge_task_storage_path): | 1603 if os.path.isdir(self._merge_task_storage_path): |
1617 if abort: | 1604 if abort: |
1618 shutil.rmtree(self._merge_task_storage_path) | 1605 shutil.rmtree(self._merge_task_storage_path) |
1619 else: | 1606 else: |
1620 os.rmdir(self._merge_task_storage_path) | 1607 os.rmdir(self._merge_task_storage_path) |
1621 | 1608 |
1609 if os.path.isdir(self._processed_task_storage_path): | |
1610 if abort: | |
1611 shutil.rmtree(self._processed_task_storage_path) | |
1612 else: | |
1613 os.rmdir(self._processed_task_storage_path) | |
1614 | |
1622 if os.path.isdir(self._task_storage_path): | 1615 if os.path.isdir(self._task_storage_path): |
1623 if abort: | 1616 if abort: |
1624 shutil.rmtree(self._task_storage_path) | 1617 shutil.rmtree(self._task_storage_path) |
1625 else: | 1618 else: |
1626 os.rmdir(self._task_storage_path) | 1619 os.rmdir(self._task_storage_path) |
1627 | 1620 |
1628 if os.path.isdir(self._to_merge_task_storage_path): | |
1629 if abort: | |
1630 shutil.rmtree(self._to_merge_task_storage_path) | |
1631 else: | |
1632 os.rmdir(self._to_merge_task_storage_path) | |
1633 | |
1634 self._merge_task_storage_path = None | 1621 self._merge_task_storage_path = None |
1622 self._processed_task_storage_path = None | |
1635 self._task_storage_path = None | 1623 self._task_storage_path = None |
1636 self._to_merge_task_storage_path = None | |
1637 | 1624 |
1638 def WritePreprocessingInformation(self, knowledge_base): | 1625 def WritePreprocessingInformation(self, knowledge_base): |
1639 """Writes preprocessing information. | 1626 """Writes preprocessing information. |
1640 | 1627 |
1641 Args: | 1628 Args: |
1642 knowledge_base (KnowledgeBase): contains the preprocessing information. | 1629 knowledge_base (KnowledgeBase): contains the preprocessing information. |
1643 | 1630 |
1644 Raises: | 1631 Raises: |
1645 IOError: if the storage type does not support writing preprocessing | 1632 IOError: if the storage type does not support writing preprocessing |
1646 information or when the storage writer is closed. | 1633 information or when the storage writer is closed. |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1712 IOError: if the storage type is not supported or | 1699 IOError: if the storage type is not supported or |
1713 when the storage writer is closed. | 1700 when the storage writer is closed. |
1714 """ | 1701 """ |
1715 self._RaiseIfNotWritable() | 1702 self._RaiseIfNotWritable() |
1716 | 1703 |
1717 if self._storage_type != definitions.STORAGE_TYPE_TASK: | 1704 if self._storage_type != definitions.STORAGE_TYPE_TASK: |
1718 raise IOError('Unsupported storage type.') | 1705 raise IOError('Unsupported storage type.') |
1719 | 1706 |
1720 task_start = self._task.CreateTaskStart() | 1707 task_start = self._task.CreateTaskStart() |
1721 self._storage_file.WriteTaskStart(task_start) | 1708 self._storage_file.WriteTaskStart(task_start) |
LEFT | RIGHT |