Source code for cohydra.mobility_input.sumo

"""SUMO co-simulation."""

import logging
import os
import sys
import threading
import time
from math import hypot
from xmlrpc.server import SimpleXMLRPCServer

if 'SUMO_HOME' in os.environ:
    SUMO_HOME = os.environ['SUMO_HOME']
    sys.path.append(os.path.join(SUMO_HOME, 'tools'))
    os.environ['PATH'] += os.pathsep + os.path.join(SUMO_HOME, 'bin')

import traci

from .mobility_input import MobilityInput
import threading

logger = logging.getLogger(__name__)

class RPCServer:
    """Expose the ``traci`` module over XML-RPC from a background thread.

    Creating an instance immediately starts a daemon thread that binds a
    :class:`xmlrpc.server.SimpleXMLRPCServer` to ``connection_details`` and
    serves requests until :meth:`stop` is called or the process exits.

    .. warning::
        The ``traci`` module is registered with ``allow_dotted_names=True``.
        The Python documentation warns that this option lets clients reach
        module globals and may allow arbitrary code execution. Only bind the
        server to an address on a trusted, closed network.

    Parameters
    ----------
    connection_details : tuple
        The ``(host, port)`` address the XML-RPC server binds to.
    """

    def __init__(self, connection_details):
        # Guards the hand-over of the server object between run() and stop().
        self._lock = threading.Lock()
        # Set by run() once the server is ready to serve; None before that
        # and after stop().
        self._server = None
        self._running = True
        self.connection_details = connection_details

        # Daemon thread: the RPC server must never keep the process alive.
        self._thread = threading.Thread(target=self.run, args=())
        self._thread.daemon = True
        self._thread.start()

    def run(self):
        """Create the XML-RPC server and serve requests until stopped.

        This is the target of the background thread started by the
        constructor. The listening socket is always closed on exit.
        """
        server = SimpleXMLRPCServer(self.connection_details, allow_none=True)
        try:
            # SECURITY: see the class docstring regarding allow_dotted_names.
            server.register_instance(traci, allow_dotted_names=True)
            with self._lock:
                if not self._running:
                    # stop() was called before the server became ready.
                    return
                self._server = server
            server.serve_forever()
        finally:
            server.server_close()

    def stop(self):
        """Stop serving requests and release the listening socket.

        Safe to call more than once and before the server is ready.
        """
        with self._lock:
            self._running = False
            server, self._server = self._server, None
        if server is not None:
            # Returns once serve_forever() has left its loop; run() then
            # closes the socket in its ``finally`` clause.
            server.shutdown()
class SUMOMobilityInput(MobilityInput):
    """SUMOMobilityInput is an interface to the SUMO simulation environment.

    This mobility input supports two modes:

    * | **Remote Mode**: In this mode the testbed connects to an external already running SUMO instance.
      | You configure the host and port where the SUMO server is running via the ``sumo_host`` and ``sumo_port`` argument.
    * | **Local Mode**: In this mode the testbed starts a locally installed version of SUMO.
      | You configure the simulation via the ``sumo_cmd`` and ``config_path`` argument.
        If SUMO is not installed globally you need to set the ``SUMO_HOME`` environment variable.
      | **Warning: This does not work when using cohydra in the prebuilt docker containers.**
        For instructions on how to use cohydra without Docker,
        please refer to :ref:`Local Installation Without Docker` and :ref:`Install SUMO On Simulation Host`.

    The mode is selected by ``config_path``: remote mode is used when it is
    ``None``, local mode otherwise.

    Parameters
    ----------
    name : str
        The name of the MobilityInput.
    steps : int
        The number of steps to run the SUMO simulation.
    sumo_host : str
        The host on which the SUMO simulation is running.
    sumo_port : int
        The TraCI port.
    sumo_cmd : str
        The command to start sumo when using the local mode (default: ``sumo``).
    config_path : str
        The path to the simulation configuration (.cfg).
    step_length : float
        The length of each simulation step in seconds (default: 1).
        It only has effect in the local mode.
    rpc_server : tuple
        Starts an RPC server if value is not None.
        The tuple needs two elements: (ip, port).
    """

    # Upper bound (seconds) that destroy() waits for the stepping thread to
    # finish its current step before the TraCI connection is closed anyway.
    _SHUTDOWN_TIMEOUT = 5.0

    def __init__(self, name="SUMO External Simulation", steps=1000, sumo_host='localhost', sumo_port=8813,
                 sumo_cmd="sumo", config_path=None, step_length=1, rpc_server=None):
        super().__init__(name)
        #: The host on which the SUMO simulation is running.
        #:
        #: When running on a devcontainer, this is probably ``localhost``.
        self.sumo_host = sumo_host
        #: The TraCI port.
        #:
        #: Can be specified on the server with the ``--remote-port`` option.
        self.sumo_port = sumo_port
        #: The command to start sumo locally
        self.sumo_cmd = sumo_cmd
        #: The path to the simulation scenario configuration.
        self.config_path = config_path
        #: The number of steps to simulate.
        self.steps = steps
        #: The length of every simulation step
        self.step_length = step_length
        #: The number of steps that have been simulated so far.
        self.step_counter = 0
        #: The connection details of the RPC server
        self.rpc_server = rpc_server
        # Signals the stepping thread to stop and interrupts its pause
        # between two steps.
        self._stop_event = threading.Event()
        # The thread started by start(); None until then.
        self._thread = None

    def prepare(self, simulation):
        """Connect to SUMO server.

        Connects to a running SUMO instance when ``config_path`` is ``None``
        and starts a local SUMO process otherwise. Resets the step counter
        and, if ``rpc_server`` is set, starts the RPC server.

        Parameters
        ----------
        simulation
            Not used by this mobility input.
        """
        logger.info('Starting SUMO for simulation "%s".', self.name)
        if self.config_path is None:
            traci.init(host=self.sumo_host, port=self.sumo_port)
        else:
            traci.start([self.sumo_cmd, "--step-length", str(self.step_length), '-c', self.config_path])
        self.step_counter = 0
        self._stop_event.clear()

        if self.rpc_server is not None:
            logger.info("Starting RPC server on %s:%d", self.rpc_server[0], self.rpc_server[1])
            RPCServer(self.rpc_server)

    def start(self):
        """Start a thread stepping through the sumo simulation.

        After every simulation step the position of each mapped node is
        updated. Nodes whose position cannot be determined are skipped.
        """
        logger.info('Starting SUMO stepping for %s.', self.name)

        def run_sumo():
            try:
                while self.step_counter < self.steps and not self._stop_event.is_set():
                    traci.simulationStep()
                    # Update positions:
                    for node in self.node_mapping:
                        position = self.__get_position_of_node(node)
                        if position is None:
                            # Unknown node or object type; already logged.
                            continue
                        x, y, z = position
                        node.set_position(x, y, z)
                    self.step_counter = self.step_counter + 1
                    # Pause for one step length, but wake up immediately
                    # when destroy() requests a stop.
                    self._stop_event.wait(traci.simulation.getDeltaT())
            except traci.exceptions.FatalTraCIError:
                logger.warning('Something went wrong with SUMO for %s. Maybe the connection was closed.', self.name)

        thread = threading.Thread(target=run_sumo)
        thread.start()
        self._thread = thread

    def add_node_to_mapping(self, node, sumo_vehicle_id, obj_type="vehicle"):
        """Map a node to an object of the SUMO simulation.

        Parameters
        ----------
        node
            The node whose position shall follow the SUMO object.
        sumo_vehicle_id : str
            The id of the object in SUMO.
        obj_type : str
            The kind of the SUMO object. Positions can be resolved for
            ``"vehicle"`` (default), ``"person"`` and ``"junction"``.
        """
        self.node_mapping[node] = (sumo_vehicle_id, obj_type)

    def __get_position_of_node(self, node):
        """Return the ``(x, y, z)`` position of the SUMO object mapped to ``node``.

        Returns ``None`` (after logging a warning) if the node is not mapped
        or its object type is not supported.
        """
        if node not in self.node_mapping:
            logger.warning('Unknown node %s', node.name)
            return None

        sumo_id, obj_type = self.node_mapping[node]
        if obj_type == "person":
            return traci.person.getPosition3D(sumo_id)
        if obj_type == "vehicle":
            return traci.vehicle.getPosition3D(sumo_id)
        if obj_type == "junction":
            # Junction has no support for 3D positions
            x, y = traci.junction.getPosition(sumo_id)
            return x, y, 0.0

        logger.warning('Unknown type %s', obj_type)
        return None

    def destroy(self):
        """Stop SUMO.

        Asks the stepping thread to stop, waits a bounded time for it to
        finish its current step and then closes the TraCI connection.
        """
        logger.info('Trying to close SUMO for %s.', self.name)
        # Trigger abort of loop.
        self.step_counter = self.steps
        self._stop_event.set()

        # Give the stepping thread a chance to leave its current step so the
        # connection is not closed underneath it.
        thread = self._thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=self._SHUTDOWN_TIMEOUT)

        traci.close()