Source code for cohydra.command_executor.local

"""Execute commands on the simulation host."""
import logging
import subprocess
import threading

from . import util
from .base import CommandExecutor, ExitCode

[docs]def log_file(logger, level, file, logfile): with file, util.LogFile(logger, logfile) as log: for line in file: log.log(level, line.rstrip())
[docs]class LocalCommandExecutor(CommandExecutor): """The LocalCommandExecutor runs commands on the simulation host. *Warning:* This raises an execption if something goes wrong. Be sure to catch it or the simulation will stop. """ def __init__(self, name=None): super().__init__(name)
[docs] def execute(self, command, user=None, shell=None, stdout_logfile=None, stderr_logfile=None): if stdout_logfile is not None or stderr_logfile is not None: raise ValueError(f'LocalCommandExecutor does not implement logfiles') if user is not None: command = util.apply_user_and_shell(command, user=user, shell=shell) shell = None logger = self.get_logger() logger.debug('%s', command) process = subprocess.Popen( # pylint: disable=subprocess-run-check command, shell=False if shell is None else shell, encoding='utf8', stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) out_thread = threading.Thread(target=log_file, args=(logger, logging.INFO, process.stdout, stdout_logfile)) err_thread = threading.Thread(target=log_file, args=(logger, logging.ERROR, process.stderr, stderr_logfile)) out_thread.start() err_thread.start() code = process.wait() out_thread.join() err_thread.join() if code != 0: raise ExitCode(code, command)