Source code for cohydra.workflow

"""Allow fault-injection by executing a sequence of instructions."""

import logging
import threading

from aexpr import aexpr

logger = logging.getLogger(__name__)

[docs]class Workflow: """A workflow is a contains a list of commands for planned execution during the simulation. This is based on the work of Arne Boockmeyer's `dasylab-testbed <https://gitlab.hpi.de/arne.boockmeyer/dasylab-testbed/>`_. Note ---- Every :class:`Workflow` creates a thread. Example ------- Please use it with a decorator provided by the :class:`.Scenario`: .. code-block:: python @scenario.workflow def workflow_function_name(workflow): do_something() workflow.sleep(5) do_another_thing() Parameters ---------- task : callable The function to be executed in the workflow. """ def __init__(self, task): #: An event indicating when to stop the workflow thread. self.stop_event = threading.Event() #: These events are waiting on some condition to come true. self.current_waiting_events = [] #: The function being executed by the workflow thread. self.task = task
[docs] def stop(self): """Stop the workflow. **Warning:** Do not call this manually. This is done by the :class:`.Simulation`. """ logger.debug('Stopping workflow.') self.stop_event.set() for event in self.current_waiting_events: event.set()
[docs] def start(self): """Start the workflow. **Warning:** Do not call this manually. This is done by the :class:`.Simulation`. """ thread = threading.Thread(target=self.task, args=(self,)) thread.start()
def __stopped(self): raise Exception('Simulation stopped') def __check_stop(self): if self.stop_event.is_set(): self.__stopped()
[docs] def sleep(self, duration): """Sleep and wait. Parameters ---------- duration : float The duration to sleep in seconds. """ logger.debug('Sleep for %gs.', duration) close = self.stop_event.wait(duration) logger.debug('Slept for %ds or stopped.', duration) if close is True: self.__stopped()
[docs] def wait_until(self, expression, expected_result, global_variables, local_variables): """Wait until an expression is equal to a specific value. Like :func:`sleep` this is a blocking call. Parameters ---------- expression The expression to evaluate. **Warning:** not all expressions are supported. See https://github.com/active-expressions/active-expressions-static-python for further infos. expected_result The value to compare againt. global_variables In order to work properly, you need to pass the global variable space with :code:`globals()`. local_variables In order to work properly, you need to pass the global variable space with :code:`locals()`. """ logger.debug('Waiting for condition.') wait_event = threading.Event() aexpr(expression, global_variables, local_variables)\ .on_change(lambda obs, old, new: wait_event.set() if new == expected_result else None) self.current_waiting_events.append(wait_event) wait_event.wait() self.current_waiting_events.remove(wait_event) logger.debug('Finished waiting for condition.') self.__check_stop()
[docs] def wait_until_true(self, expression, global_variables, local_variables): """Wait until an expression is equal to True. Like :func:`sleep` this is a blocking call. Parameters ---------- expression The expression to evaluate. **Warning:** not all expressions are supported. See https://github.com/active-expressions/active-expressions-static-python for further infos. global_variables In order to work properly, you need to pass the global variable space with :code:`globals()`. local_variables In order to work properly, you need to pass the global variable space with :code:`locals()`. """ logger.debug('Waiting for condition to come true.') self.wait_until(expression, True, global_variables, local_variables)