OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 """The task manager.""" | 2 """The task manager.""" |
3 | 3 |
4 from __future__ import unicode_literals | 4 from __future__ import unicode_literals |
5 | 5 |
6 import collections | 6 import collections |
7 import heapq | 7 import heapq |
8 import threading | 8 import threading |
9 import time | 9 import time |
10 | 10 |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
97 | 97 |
98 A task being tracked by the manager must be in exactly one of the | 98 A task being tracked by the manager must be in exactly one of the |
99 following states: | 99 following states: |
100 | 100 |
101 * abandoned: a task assumed to be abandoned because a tasks that has been | 101 * abandoned: a task assumed to be abandoned because a tasks that has been |
102 queued or was processing exceeds the maximum inactive time. | 102 queued or was processing exceeds the maximum inactive time. |
103 * queued: the task is waiting for a worker to start processing it. It is also | 103 * queued: the task is waiting for a worker to start processing it. It is also |
104 possible that a worker has already completed the task, but no status | 104 possible that a worker has already completed the task, but no status |
105 update was collected from the worker while it processed the task. | 105 update was collected from the worker while it processed the task. |
106 * processing: a worker is processing the task. | 106 * processing: a worker is processing the task. |
107 * pending_merge: a worker has completed processing the task and the | 107 * processed: a worker has completed processing the task, but it is not ready |
108 results are ready to be merged with the session storage. | 108 to be merged into the session storage. |
| 109 * pending_merge: the task has been processed and is ready to be merged with |
| 110 the session storage. |
109 * merging: tasks that are being merged by the engine. | 111 * merging: tasks that are being merged by the engine. |
110 | 112 |
111 Once the engine reports that a task is completely merged, it is removed | 113 Once the engine reports that a task is completely merged, it is removed |
112 from the task manager. | 114 from the task manager. |
113 | 115 |
114 Tasks are considered "pending" when there is more work that needs to be done | 116 Tasks are considered "pending" when there is more work that needs to be done |
115 to complete these tasks. Pending applies to tasks that are: | 117 to complete these tasks. Pending applies to tasks that are: |
116 * not abandoned; | 118 * not abandoned; |
117 * abandoned, but need to be retried. | 119 * abandoned, but need to be retried. |
118 """ | 120 """ |
(...skipping 213 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
332 Raises: | 334 Raises: |
333 KeyError: if the task was not processing, queued or abandoned. | 335 KeyError: if the task was not processing, queued or abandoned. |
334 """ | 336 """ |
335 with self._lock: | 337 with self._lock: |
336 task = self._tasks_processing.get(task_identifier, None) | 338 task = self._tasks_processing.get(task_identifier, None) |
337 if not task: | 339 if not task: |
338 task = self._tasks_queued.get(task_identifier, None) | 340 task = self._tasks_queued.get(task_identifier, None) |
339 if not task: | 341 if not task: |
340 task = self._tasks_abandoned.get(task_identifier, None) | 342 task = self._tasks_abandoned.get(task_identifier, None) |
341 if not task: | 343 if not task: |
342 raise KeyError('Status of task {0:s} is unknown.'.format( | 344 raise KeyError('Status of task {0:s} is unknown'.format( |
343 task_identifier)) | 345 task_identifier)) |
344 | 346 |
345 return task | 347 return task |
346 | 348 |
347 def GetStatusInformation(self): | 349 def GetStatusInformation(self): |
348 """Retrieves status information about the tasks. | 350 """Retrieves status information about the tasks. |
349 | 351 |
350 Returns: | 352 Returns: |
351 TasksStatus: tasks status information. | 353 TasksStatus: tasks status information. |
352 """ | 354 """ |
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
469 if is_abandoned: | 471 if is_abandoned: |
470 del self._tasks_abandoned[task.identifier] | 472 del self._tasks_abandoned[task.identifier] |
471 | 473 |
472 if is_abandoned: | 474 if is_abandoned: |
473 logger.warning( | 475 logger.warning( |
474 'Previously abandoned task {0:s} is now pending merge.'.format( | 476 'Previously abandoned task {0:s} is now pending merge.'.format( |
475 task.identifier)) | 477 task.identifier)) |
476 else: | 478 else: |
477 logger.debug('Task {0:s} is pending merge.'.format(task.identifier)) | 479 logger.debug('Task {0:s} is pending merge.'.format(task.identifier)) |
478 | 480 |
479 def UpdateTasksAsPendingMerge(self, mergeable_tasks): | |
480 """Updates the task manager to reflect that tasks are ready to be merged. | |
481 | |
482 Args: | |
483 mergeable_tasks (list[Task]): tasks that are ready to be merged. | |
484 | |
485 Raises: | |
486 KeyError: if a task was not processing or abandoned. | |
487 """ | |
488 for task in mergeable_tasks: | |
489 self.UpdateTaskAsPendingMerge(task) | |
490 | |
491 def UpdateTaskAsProcessingByIdentifier(self, task_identifier): | 481 def UpdateTaskAsProcessingByIdentifier(self, task_identifier): |
492 """Updates the task manager to reflect the task is processing. | 482 """Updates the task manager to reflect the task is processing. |
493 | 483 |
494 Args: | 484 Args: |
495 task_identifier (str): unique identifier of the task. | 485 task_identifier (str): unique identifier of the task. |
496 | 486 |
497 Raises: | 487 Raises: |
498 KeyError: if the task is not known to the task manager. | 488 KeyError: if the task is not known to the task manager. |
499 """ | 489 """ |
500 with self._lock: | 490 with self._lock: |
(...skipping 20 matching lines...) Expand all Loading... |
521 task_abandoned.UpdateProcessingTime() | 511 task_abandoned.UpdateProcessingTime() |
522 return | 512 return |
523 | 513 |
524 if task_identifier in self._tasks_pending_merge: | 514 if task_identifier in self._tasks_pending_merge: |
525 # No need to update the processing time, as this task is already | 515 # No need to update the processing time, as this task is already |
526 # finished processing and is just waiting for merge. | 516 # finished processing and is just waiting for merge. |
527 return | 517 return |
528 | 518 |
529 # If we get here, we don't know what state the tasks is in, so raise. | 519 # If we get here, we don't know what state the tasks is in, so raise. |
530 raise KeyError('Status of task {0:s} is unknown.'.format(task_identifier)) | 520 raise KeyError('Status of task {0:s} is unknown.'.format(task_identifier)) |
OLD | NEW |