OLD | NEW |
1 import os | 1 import os |
2 import logging | 2 import logging |
3 import shutil | 3 import shutil |
4 import tempfile | 4 import tempfile |
5 | 5 |
6 from twisted.internet.defer import ( | 6 from twisted.internet.defer import ( |
7 inlineCallbacks, DeferredLock, DeferredList, returnValue) | 7 inlineCallbacks, DeferredLock, DeferredList, returnValue) |
8 | 8 |
9 from juju.errors import CharmUpgradeError | 9 from juju.errors import CharmUpgradeError |
10 from juju.hooks.invoker import Invoker | 10 from juju.hooks.invoker import Invoker |
(...skipping 342 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
353 if not self._running: | 353 if not self._running: |
354 self._log.debug("stop service-rel watcher, discarding changes") | 354 self._log.debug("stop service-rel watcher, discarding changes") |
355 self._watching_relation_memberships = False | 355 self._watching_relation_memberships = False |
356 raise StopWatcher() | 356 raise StopWatcher() |
357 | 357 |
358 self._log.debug("processing relations changed") | 358 self._log.debug("processing relations changed") |
359 yield self._process_service_changes(old_relations, new_relations) | 359 yield self._process_service_changes(old_relations, new_relations) |
360 finally: | 360 finally: |
361 self._run_lock.release() | 361 self._run_lock.release() |
362 | 362 |
| 363 def _sort_relations(self, rel_ids, rels, invert=False): |
| 364 """ Sort a set of relations. |
| 365 |
| 366 We process peer relations first when adding, and last when |
| 367 removing, else deferring to creation order. |
| 368 """ |
| 369 rel_ids = list(rel_ids) |
| 370 |
| 371 def _sort(x, y): |
| 372 xr, yr = rels[x].relation_role, rels[y].relation_role |
| 373 if xr == yr: |
| 374 return cmp(x, y) |
| 375 elif xr == "peer": |
| 376 return -1 |
| 377 elif yr == "peer": |
| 378 return 1 |
| 379 return cmp(x, y) |
| 380 |
| 381 rel_ids.sort(_sort) |
| 382 |
| 383 if invert: |
| 384 return list(reversed(rel_ids)) |
| 385 return rel_ids |
| 386 |
363 @inlineCallbacks | 387 @inlineCallbacks |
364 def _process_service_changes(self, old_relations, new_relations): | 388 def _process_service_changes(self, old_relations, new_relations): |
365 """Add and remove unit lifecycles per the service relations Determine. | 389 """Add and remove unit lifecycles per the service relations Determine. |
366 """ | 390 """ |
367 # Calculate delta between zookeeper state and our stored state. | 391 # Calculate delta between zookeeper state and our stored state. |
368 new_relations = dict( | 392 new_relations = dict( |
369 (service_relation.internal_relation_id, service_relation) | 393 (service_relation.internal_relation_id, service_relation) |
370 for service_relation in new_relations) | 394 for service_relation in new_relations) |
371 | 395 |
372 if old_relations: | 396 if old_relations: |
373 old_relations = dict( | 397 old_relations = dict( |
374 (service_relation.internal_relation_id, service_relation) | 398 (service_relation.internal_relation_id, service_relation) |
375 for service_relation in old_relations) | 399 for service_relation in old_relations) |
376 | 400 |
377 added = set(new_relations.keys()) - set(self._relations.keys()) | 401 added = self._sort_relations( |
378 removed = set(self._relations.keys()) - set(new_relations.keys()) | 402 set(new_relations.keys()) - set(self._relations.keys()), |
| 403 new_relations) |
| 404 print 'add', added, [new_relations[a] for a in added] |
| 405 |
| 406 removed = self._sort_relations( |
| 407 set(self._relations.keys()) - set(new_relations.keys()), |
| 408 self._relations, |
| 409 invert=True) |
| 410 print 'removed', removed |
| 411 |
379 # Could this service be a principal container? | 412 # Could this service be a principal container? |
380 is_principal = not (yield self._service.is_subordinate()) | 413 is_principal = not (yield self._service.is_subordinate()) |
381 | 414 |
382 # Once we know a relation is departed, *immediately* stop running | 415 # Once we know a relation is departed, *immediately* stop running |
383 # its hooks. We can't really handle the case in which a hook is | 416 # its hooks. We can't really handle the case in which a hook is |
384 # *already* running, but we can at least make sure it doesn't run | 417 # *already* running, but we can at least make sure it doesn't run |
385 # any *more* hooks (which could have been queued in the past, but | 418 # any *more* hooks (which could have been queued in the past, but |
386 # not yet executed).# This isn't *currently* an exceptionally big | 419 # not yet executed).# This isn't *currently* an exceptionally big |
387 # deal, because: | 420 # deal, because: |
388 # | 421 # |
389 # (1) The ZK state won't actually be deleted, so an inappropriate | 422 # (1) The ZK state won't actually be deleted, so an inappropriate |
390 # hook will still run happily. | 423 # hook will still run happily. |
391 # (2) Even if the state is deleted, and the hook errors out, the | 424 # (2) Even if the state is deleted, and the hook errors out, the |
392 # only actual consequence is that we'll eventually run the | 425 # only actual consequence is that we'll eventually run the |
393 # error_depart transition rather than depart or down_depart. | 426 # error_depart transition rather than depart or down_depart. |
394 # | 427 # |
395 # However, (1) will certainly change in the future, and (2) is not | 428 # However, (1) will certainly change in the future, and (2) is not |
396 # necessarily a watertight guarantee. | 429 # necessarily a watertight guarantee. |
397 for relation_id in removed: | 430 for relation_id in removed: |
398 yield self._relations[relation_id].lifecycle.stop() | 431 yield self._relations[relation_id].lifecycle.stop() |
399 | 432 |
400 # Actually depart old relations. | 433 # Actually depart old relations. |
401 for relation_id in removed: | 434 for relation_id in removed: |
402 workflow = self._relations.pop(relation_id) | 435 workflow = self._relations.pop(relation_id) |
403 with (yield workflow.lock()): | 436 with (yield workflow.lock()): |
404 yield workflow.transition_state("departed") | 437 yield workflow.transition_state("departed") |
405 self._store_relations() | 438 self._store_relations() |
406 | 439 |
407 # Process new relations. | 440 # Process new relations |
408 for relation_id in added: | 441 for relation_id in added: |
409 service_relation = new_relations[relation_id] | 442 service_relation = new_relations[relation_id] |
410 yield self._add_relation(service_relation) | 443 yield self._add_relation(service_relation) |
411 if (is_principal and service_relation.relation_scope == "container")
: | 444 if (is_principal |
412 self._add_subordinate_unit(service_relation) | 445 and service_relation.relation_scope == "container"): |
| 446 yield self._add_subordinate_unit(service_relation) |
413 yield self._store_relations() | 447 yield self._store_relations() |
414 | 448 |
415 @inlineCallbacks | 449 @inlineCallbacks |
416 def _add_relation(self, service_relation): | 450 def _add_relation(self, service_relation): |
417 try: | 451 try: |
418 unit_relation = yield service_relation.get_unit_state( | 452 unit_relation = yield service_relation.get_unit_state( |
419 self._unit) | 453 self._unit) |
420 except UnitRelationStateNotFound: | 454 except UnitRelationStateNotFound: |
421 # This unit has not yet been assigned a unit relation state, | 455 # This unit has not yet been assigned a unit relation state, |
422 # Go ahead and add one. | 456 # Go ahead and add one. |
(...skipping 324 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
747 | 781 |
748 if not self._error_handler: | 782 if not self._error_handler: |
749 raise | 783 raise |
750 self._log.info( | 784 self._log.info( |
751 "Invoked error handler for %s hook", hook_name) | 785 "Invoked error handler for %s hook", hook_name) |
752 yield self._error_handler(change, e) | 786 yield self._error_handler(change, e) |
753 returnValue(False) | 787 returnValue(False) |
754 else: | 788 else: |
755 yield self._run_lock.release() | 789 yield self._run_lock.release() |
756 returnValue(True) | 790 returnValue(True) |
OLD | NEW |