Source code for marvis.node.docker

"""Docker containers in the simulation."""

import logging
import os
import threading

from nsenter import Namespace
import docker

from ..context import defer
from ..command_executor import DockerCommandExecutor
from .base import Node

logger = logging.getLogger(__name__)

def expand_volume_shorthand(key_value):
    """Normalise one volume entry into the form the Docker runtime expects.

    Parameters
    ----------
    key_value : tuple
        A ``(name_or_path, spec)`` pair, e.g. one item of ``dict.items()``.
        ``spec`` is either a string (the bind path inside the container)
        or an already expanded settings dictionary.

    Returns
    -------
    tuple
        The volume's name / path and a settings dictionary. A string
        ``spec`` is expanded to ``{'bind': spec, 'mode': 'rw'}``; any other
        ``spec`` is passed through unchanged.
    """
    source, settings = key_value
    if not isinstance(settings, str):
        # Already a full specification; nothing to expand.
        return (source, settings)
    return (source, {'bind': settings, 'mode': 'rw'})
def log_to_file(container, log_path, stdout=False, stderr=False):
    """Log the container's output.

    This opens a stream to the docker container's log output and writes it
    into a file. Every line is additionally forwarded to a Python logger
    named after the container.

    Parameters
    ----------
    container
        The container to read from. It must provide a ``name`` attribute
        and a ``logs(stdout=..., stderr=..., follow=..., stream=...)``
        method yielding ``bytes`` lines.
    log_path : str
        The file path to the log file.
    stdout : bool
        Whether stdout should be logged. Lines are logged with level
        ``INFO`` if this is set and with level ``ERROR`` otherwise.
    stderr : bool
        Whether stderr should be logged.
    """
    log = logging.getLogger(container.name)
    log.debug('Write log to %s', log_path)
    level = logging.INFO if stdout else logging.ERROR
    # Buffering is disabled (third argument 0) so that output reaches the
    # file as soon as the container emits it.
    with open(log_path, 'wb', 0) as log_file:
        for line in container.logs(stdout=stdout, stderr=stderr, follow=True, stream=True):
            # Container output is arbitrary bytes. Decode leniently so that
            # a single non-UTF-8 line cannot abort the whole log stream; the
            # file below still receives the raw, unmodified bytes.
            log.log(level, '%s', line.decode(errors='replace').strip())
            log_file.write(line)
    log.debug('Done logging')
class DockerNode(Node):
    """A DockerNode represents a docker container.

    Parameters
    ----------
    name : str
        The name of the node (and container).
        It must consist only of *alphanumeric characters* and :code:`-`, :code:`_` and :code:`.`.
    docker_image : str
        The name of the docker image to use. If not specified, `docker_build_dir` must be set.
    docker_build_dir : str
        The context directory (absolute or relative) to execute the build in.
    dockerfile : str, default: 'Dockerfile'
        The (absolute or relative) path to the Dockerfile.
    pull : bool
        Whether to always pull the image specified in `docker_image`.
    cpus : float
        The number of virtual CPUs to assign (1.0 meaning 1 vCPU).
    memory : str
        The amount of memory to allow the container to use. **Example:** `'128m'`.
    devices : list
        Devices that should be attached to the container.
        **Example:** `/dev/ttyUSB0:/dev/ttyUSB0`.
    command : str
        An optional command to override the standard command on container start
        specified by the Dockerfile.
    volumes : dict
        A dictionary of volumes.
        Each entry has a name or (absolute) path as key and settings or
        an absolute path inside the container as value.
        See :code:`examples/volumes_and_ports.py`.
    exposed_ports : dict
        A dictionary of port mappings.
        The key is the container internal port and the value can be an exposed port
        or a list of ports.
    environment_variables : dict or list
        A dictionary of environment variables or a list of environment variables.
        If a list is specified each item should be in the form :code:`'KEY=VALUE'`.

    Raises
    ------
    Exception
        If neither `docker_image` nor `docker_build_dir` is given.
    """

    def __init__(self, name, docker_image=None, docker_build_dir=None, dockerfile='Dockerfile',
                 pull=False, cpus=0.0, memory=None, devices=None, command=None, volumes=None,
                 exposed_ports=None, environment_variables=None):
        super().__init__(name)
        #: The docker image to use.
        self.docker_image = docker_image
        #: The context to build the image in.
        self.docker_build_dir = docker_build_dir
        #: The path to the Dockerfile.
        self.dockerfile = dockerfile
        #: Enforce pulling the image from a registry
        self.pull = pull
        #: The number of vCPUs.
        self.cpus = cpus
        #: The amount of memory for the container.
        self.memory = memory
        #: List of devices to attach to container
        self.devices = devices
        #: The startup command.
        self.command = command
        #: The volumes for the container.
        self.volumes = dict(map(expand_volume_shorthand, volumes.items())) if volumes else None
        #: Ports to expose on the host.
        self.exposed_ports = exposed_ports if exposed_ports is not None else dict()
        #: Environment variables in the container.
        self.environment_variables = environment_variables

        #: The container instance.
        self.container = None
        #: The PID of the container.
        self.container_pid = None

        if docker_build_dir is None and docker_image is None:
            raise Exception('Please specify Docker image or build directory')

        #: The executor for running commands in the container.
        #: This is useful for a scripted :class:`.Workflow`.
        self.command_executor = None

    @property
    def docker_image_tag(self):
        """A tag for the container's image during build time.

        Returns
        -------
        str
            The computed tag.
        """
        return f'ns3-{self.name}'

    def wants_ip_stack(self):
        """Return :code:`True` unconditionally for docker nodes."""
        return True

    def prepare(self, simulation):
        """This runs a setup on network interfaces and starts the container.

        If the image cannot be built, the error is logged and the container
        is not started.

        Parameters
        ----------
        simulation
            The simulation; its :code:`log_directory` and :code:`hosts`
            attributes are passed on to :func:`start_docker_container`.
        """
        logger.info('Preparing node %s', self.name)
        try:
            self.build_docker_image()
        except docker.errors.BuildError as exception:
            logger.error('Could not build docker container "%s": %s', self.name, exception)
            return
        except TypeError:
            logger.error('docker_build_dir for "%s" is incorrect or not existing ', self.name)
            return
        else:
            self.start_docker_container(simulation.log_directory, simulation.hosts)
            self.setup_host_interfaces()

    @staticmethod
    def _split_image_reference(reference):
        """Split an image reference into repository and tag (or digest).

        The split happens at the *last* colon and only if the part behind it
        contains no slash, so that a registry port
        (:code:`localhost:5000/image`) is not mistaken for a tag. A digest
        reference (:code:`image@sha256:...`) is split at the :code:`@`.

        Parameters
        ----------
        reference : str
            The image reference, e.g. :code:`'registry:5000/image:1.0'`.

        Returns
        -------
        tuple
            The repository and the tag. The tag defaults to :code:`'latest'`.
        """
        repo, at_sign, digest = reference.rpartition('@')
        if at_sign and digest:
            return repo, digest
        repo, colon, tag = reference.rpartition(':')
        if not colon or '/' in tag:
            # No tag present; a colon (if any) belongs to the registry port.
            return reference, 'latest'
        return repo, tag or 'latest'

    def build_docker_image(self):
        """Build the image for the container.

        Without a `docker_image` the image is built from `docker_build_dir`.
        With an image name, a local image is used if present (unless `pull`
        is set) and the image is pulled otherwise. The resulting image is
        tagged with :attr:`docker_image_tag`.
        """
        client = docker.from_env()

        if self.docker_image is None:
            logger.info('Building docker image: %s/%s', self.docker_build_dir, self.dockerfile)
            self.docker_image = client.images.build(
                path=self.docker_build_dir,
                dockerfile=self.dockerfile,
                rm=True,
                nocache=False,
            )[0]
        elif isinstance(self.docker_image, str):
            if not self.pull:
                try:
                    self.docker_image = client.images.get(self.docker_image)
                except docker.errors.ImageNotFound:
                    # Not available locally: fall through to pulling below.
                    pass
            # Still a string means no local image was resolved (or pulling is enforced).
            if isinstance(self.docker_image, str):
                repo, tag = self._split_image_reference(self.docker_image)
                logger.info('Pulling docker image: %s, tag %s', repo, tag)
                self.docker_image = client.images.pull(repo, tag=tag)

        self.docker_image.tag(self.docker_image_tag)

    def start_docker_container(self, log_directory, hosts=None):
        """Start the docker container.

        All docker containers are labeled with "ns-3" as the creator.

        Parameters
        ----------
        log_directory : str
            The path to the directory to put log files in.
        hosts : dict, optional
            A dictionary with hostnames as keys and IP addresses (a list) as value.
            If omitted, no extra host entries are added.
        """
        logger.info('Starting docker container: %s', self.name)
        client = docker.from_env()

        # `hosts` defaults to None, so guard before iterating.
        extra_hosts = [
            f'{name}:{address}'
            for name, addresses in (hosts or {}).items()
            for address in addresses
        ]

        self.container = client.containers.run(
            self.docker_image_tag,
            name=self.name,
            hostname=self.name,
            labels={"created-by": "ns-3"},
            remove=True,
            auto_remove=True,
            detach=True,
            privileged=True,
            nano_cpus=int(self.cpus * 1e9),
            mem_limit=0 if self.memory is None else self.memory,
            devices=self.devices,
            command=self.command,
            extra_hosts=extra_hosts,
            volumes=self.volumes,
            ports=self.exposed_ports,
            environment=self.environment_variables,
        )
        defer(f'stop docker container {self.name}', self.stop_docker_container)

        # One logging thread per output stream, each writing to its own file.
        for stream in ('stdout', 'stderr'):
            log_file_path = os.path.join(log_directory, f'{self.name}.{stream}.log')
            threading.Thread(target=log_to_file, args=(self.container, log_file_path),
                             kwargs={stream: True}).start()

        low_level_client = docker.APIClient()
        self.container_pid = low_level_client.inspect_container(self.container.id)['State']['Pid']

        self.command_executor = DockerCommandExecutor(self.name, self.container)

    def stop_docker_container(self):
        """Stop the container.

        Logs an error and returns if there is no running container. Otherwise
        the container is stopped and the container related attributes are reset.
        """
        if self.container is None:
            # There is no container object here, so report the node's own name.
            logger.error('Could not stop docker container "%s". '
                         'Container stopped already or failed to start', self.name)
            return
        logger.info('Stopping docker container: %s', self.container.name)
        try:
            self.container.stop(timeout=1)
        except docker.errors.APIError:
            logger.error('Could not stop docker container "%s". '
                         'Container stopped already or failed to start', self.container.name)
        self.container = None
        self.container_pid = None
        self.command_executor = None

    def setup_host_interfaces(self):
        """Setup the interfaces (bridge, tap, VETH pair) on the host and connect them to the container."""
        for name, interface in self.interfaces.items():
            interface.setup_bridge()
            interface.connect_tap_to_bridge()
            interface.setup_veth_pair({
                'ifname': name,
                "net_ns_fd": f"/proc/{self.container_pid}/ns/net"
            })

            # Get container's namespace and setup the interface in the container
            with Namespace(self.container_pid, 'net'):
                interface.setup_veth_container_end(name)