"""Contexts to tear down the simulation."""
import logging
import collections
import threading
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ThreadLocalStack:
    """A stack whose contents are kept separately for every thread.

    The backing list is stored on a :class:`threading.local` object, so items
    pushed by one thread are not visible from another one.
    """

    def __init__(self):
        # Thread-local storage; the list itself is created lazily per thread.
        self.data = threading.local()

    @property
    def stack(self):
        """Return the calling thread's list, creating it on first access."""
        try:
            return self.data.stack
        except AttributeError:
            # First access from this thread: no list has been attached yet.
            self.data.stack = []
            return self.data.stack

    def push(self, item):
        """Put ``item`` on top of the calling thread's stack."""
        self.stack.append(item)

    def pop(self):
        """Remove and return the topmost item.

        Raises
        ------
        IndexError
            If the calling thread's stack is empty.
        """
        return self.stack.pop()

    def top(self):
        """Return the topmost item without removing it, or ``None`` if empty."""
        items = self.stack
        if not items:
            return None
        return items[-1]
class Context:
    """A context can be used for deferring function calls.

    In this project, it is used for deferring teardowns of the simulation.

    This base class only keeps track of the active contexts (one stack per
    thread) and counts failed cleanups; :meth:`defer`, :meth:`cancel` and
    :meth:`cleanup` have to be provided by subclasses.

    You can use it like this:

    .. code-block:: python

        with SimpleContext() as ctx:
            defer('Call afunction', afunction, args)
            defer('Call another function', anotherfunction, args)
            ctx.cleanup()
    """

    # Stack of the contexts entered (via ``with``) and not yet left.
    __stack = ThreadLocalStack()

    @staticmethod
    def current():
        """Return the current context.

        Returns
        -------
        Context or None
            The innermost context entered by the calling thread, or ``None``
            if no context is active.
        """
        return Context.__stack.top()

    def __init__(self):
        #: The number of failed cleanups.
        self.fails = 0

    def defer(self, item):
        """Store a :class:`.DeferredItem` for running it later.

        Parameters
        ----------
        item : :class:`.DeferredItem`
            The function to execute later.

        Raises
        ------
        NotImplementedError
            Always; subclasses must override this method.
        """
        raise NotImplementedError

    def cancel(self, item):
        """Cancel a specific item of the current context.

        Parameters
        ----------
        item : :class:`.DeferredItem`
            The function to cancel.

        Raises
        ------
        NotImplementedError
            Always; subclasses must override this method.
        """
        raise NotImplementedError

    def cleanup(self):
        """Do whatever is needed to cleanup the context.

        Raises
        ------
        NotImplementedError
            Always; subclasses must override this method.
        """
        raise NotImplementedError

    def add_error(self, err: Exception):  # pylint: disable=unused-argument
        """Record a failed cleanup.

        Currently only the failure counter is incremented; the error itself
        is not stored (to be implemented).

        Parameters
        ----------
        err : Exception
            The error that occurred during the cleanup.
        """
        self.fails += 1

    def __enter__(self):
        """Make this context the current one and return it."""
        Context.__stack.push(self)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Remove this context from the stack of active contexts.

        Raises
        ------
        AssertionError
            If the context on top of the stack is not this one.
        """
        # Pop outside of an ``assert`` statement: asserts are stripped when
        # Python runs with ``-O``, which would silently skip the pop as well
        # and leave this context on the stack forever.
        popped = Context.__stack.pop()
        if popped is not self:
            raise AssertionError(
                'Invalid context stack, not the same number of push and pop operations'
            )
class DeferredItem:
    """A function call that has been recorded for later execution.

    Parameters
    ----------
    ctx
        The context the item belongs to.
    name
        A name for this item (for logging purposes).
    func
        The function to defer.
    args : list
        The positional arguments to be passed to the function.
    kwargs : dict
        The keyword arguments to be passed to the function.
    """

    def __init__(self, ctx: Context, name: str, func: callable, args, kwargs):
        #: The context this item belongs to; it receives cancellations and errors.
        self.ctx = ctx
        #: Human-readable name / description, used in log messages.
        self.name = name
        #: The callable to invoke on cleanup.
        self.func = func
        #: Positional arguments handed to the callable.
        self.args = args
        #: Keyword arguments handed to the callable.
        self.kwargs = kwargs

    def cancel(self):
        """Ask the owning context to drop this item."""
        owner = self.ctx
        owner.cancel(self)

    def cleanup(self):
        """Invoke the stored callable with the stored arguments.

        Any :class:`Exception` raised by the call is logged and reported to
        the owning context via ``add_error`` instead of being propagated.
        """
        try:
            target = self.func
            target(*self.args, **self.kwargs)
        except Exception as failure:  # pylint: disable=broad-except
            logger.error('Failed to cleanup deferred item %s: %s', self.name, failure)
            self.ctx.add_error(failure)

    def __str__(self):
        """Return the item's name."""
        return self.name
def defer(name, func, *args, **kwargs):
    """Defer a function call.

    The function call will be assigned to the current context. If no context
    is active, a warning is logged and the item is handed to a new
    :class:`NoContext`, whose ``defer`` does nothing.

    Parameters
    ----------
    name : str or None
        A name for the deferred call (for logging purposes). If ``None``,
        the qualified name of ``func`` is used, falling back to its ``repr``
        for callables without ``__qualname__``.
    func : callable
        The function to execute.
    *args
        Positional arguments to be passed to ``func``.
    **kwargs
        Keyword arguments to be passed to ``func``.

    Returns
    -------
    DeferredItem
        The item that was created for the call.
    """
    if name is None:
        # Not every callable has ``__qualname__`` (e.g. ``functools.partial``).
        name = getattr(func, '__qualname__', repr(func))

    ctx = Context.current()
    if ctx is None:
        # Use the module logger rather than ``logging.warning``: the latter
        # logs on the root logger and implicitly calls ``basicConfig()``.
        logger.warning('No context available to defer %s', name)
        ctx = NoContext()

    item = DeferredItem(ctx, name, func, args, kwargs)
    ctx.defer(item)
    return item
class NoContext(Context):
    """The NoContext is a Null-Object and therefore **does nothing**.

    It does not do any cleanups. This is useful if you do not want your simulated
    containers to be torn down.
    """

    def defer(self, item):
        """Ignore ``item``; nothing is stored."""

    def cancel(self, item):
        """Ignore ``item``; there is nothing to cancel."""

    def cleanup(self):
        """Do nothing; no items are ever stored by this context."""
class SimpleContext(Context):
    """The simple context executes deferred items like it is intended.

    Items are executed in reverse order of their registration: the item that
    was deferred last is cleaned up first.
    """

    def __init__(self):
        super().__init__()
        #: The deque, the DeferredItems are stored in.
        self.dequeue = collections.deque()

    def defer(self, item):
        """Store ``item`` so that it is executed before all earlier items.

        Parameters
        ----------
        item : :class:`.DeferredItem`
            The function to execute later.
        """
        logger.debug('Added deferred item: %s', item)
        self.dequeue.appendleft(item)

    def cancel(self, item):
        """Remove ``item`` so that it is not executed on cleanup.

        Parameters
        ----------
        item : :class:`.DeferredItem`
            The function to cancel.

        Raises
        ------
        ValueError
            If ``item`` is not stored in this context.
        """
        logger.debug('Removed deferred item: %s', item)
        self.dequeue.remove(item)

    def cleanup(self):
        """Execute and discard all stored items, most recently deferred first."""
        logger.info('Cleanup %d items', len(self.dequeue))
        while self.dequeue:
            item = self.dequeue.popleft()
            logger.debug('Cleanup deferred item: %s', item)
            item.cleanup()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Leave the context and run all deferred items."""
        try:
            super().__exit__(exc_type, exc_value, exc_traceback)
        finally:
            # Run the deferred teardowns even if leaving the context stack
            # failed; otherwise the stored items would never be executed.
            self.cleanup()