Source code for pykiso.lib.connectors.cc_process

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Process Channel
***************

:module: cc_process

:synopsis: CChannel implementation for process execution.

The CCProcess channel provides functionality to start a process and
to communicate with it.

.. currentmodule:: cc_process

"""

import logging
import queue
import subprocess
import threading
from dataclasses import dataclass
from typing import IO, ByteString, List, Optional, Union

from pykiso.connector import CChannel

log = logging.getLogger(__name__)

MessageType = Union[str, ByteString]


[docs]class CCProcessError(BaseException): ...
[docs]@dataclass class ProcessMessage: """Holds the data that is read from the process""" # Stream name: stdout or stderr stream: str # Data as bytes or string data: Union[str, bytes]
[docs]@dataclass class ProcessExit: """Contains information about process exit""" exit_code: int
[docs]class CCProcess(CChannel): """Channel to run processes""" def __init__( self, shell: bool = False, pipe_stderr: bool = False, pipe_stdout: bool = True, pipe_stdin: bool = False, text: bool = True, cwd: Optional[str] = None, env: Optional[str] = None, encoding: Optional[str] = None, executable: Optional[str] = None, args: List[str] = [], **kwargs, ): """Initialize a process :param shell: Start process through shell :param pipe_stderr: Pipe stderr for reading with this connector :param pipe_stdout: Pipe stdout for reading with this connector :param pipe_stdin: Pipe stdin for writing with this connector :param text: Read/write stdout, stdin, stderr in binary mode :param cwd: The current working directory for the new process :param env: Environment variables for the new process :param encoding: Encoding to use in text mode :param executable: The path of the executable for the process :param args: Process arguments """ super().__init__(**kwargs) self._shell = shell self._pipe_stderr = pipe_stderr self._pipe_stdout = pipe_stdout self._pipe_stdin = pipe_stdin self._encoding = encoding self._executable = executable self._args = args self._text = text self._cwd = cwd self._env = env self._process: Optional[subprocess.Popen] = None self._queue_in: Optional[queue.Queue[Union[ProcessMessage, ProcessExit]]] = None self._stdout_thread: Optional[threading.Thread] = None self._stderr_thread: Optional[threading.Thread] = None self._lock = threading.Lock() self._finished_threads_count = 0 # Buffer for messages that where read from the process but not yet returned by _cc_receive self._buffer: List[Union[ProcessMessage, ProcessExit]] = []
[docs] def start(self, executable: Optional[str] = None, args: Optional[List[str]] = None): """Start a process :param executable: The executable path. Default to path specified in yaml if not given. :param args: The process arguments. Default to arguments specified in yaml if not given. :raises CCProcessError: Process is already running """ if self._process is not None and self._process.returncode is None: raise CCProcessError(f"Process is already running: {self._executable}") self._cleanup() self._finished_threads_count = 0 self._queue_in = queue.Queue() self._process = subprocess.Popen( ([executable] if executable is not None else [self._executable]) + (args if args is not None else self._args), stderr=subprocess.PIPE if self._pipe_stderr else None, stdout=subprocess.PIPE if self._pipe_stdout else None, stdin=subprocess.PIPE if self._pipe_stdin else None, shell=self._shell, text=self._text, encoding=self._encoding, cwd=self._cwd, env=self._env, ) # nosec B602 Since we only provide an interface to the user to popen, we accept the risk of a vulnerablility to various shell injection attacks. if self._pipe_stdout: self._stdout_thread = self._start_read_thread( self._process.stdout, "stdout" ) if self._pipe_stderr: self._stderr_thread = self._start_read_thread( self._process.stderr, "stderr" )
[docs] def _start_read_thread(self, stream: IO, name: str) -> threading.Thread: """Start a read thread :param stream: The stream to read from :param name: The name of the stream :return: The thread object """ thread = threading.Thread( name=f"cc_process_{name}", target=self._read_thread, args=(stream, name) ) thread.start() return thread
[docs] def _read_thread(self, stream: IO, name: str) -> None: """Thread for reading data from stdout or stderr :param stream: The stream to read from :param name: The name of the stream """ try: while True: if self._text: data = stream.readline() else: data = stream.read(1) if len(data) == 0: break self._queue_in.put(ProcessMessage(name, data)) finally: with self._lock: self._finished_threads_count += 1 if self._finished_threads_count == int(self._pipe_stdout) + int( self._pipe_stderr ): # ProcessExit marks the termination of all read threads self._queue_in.put(ProcessExit(self._process.wait()))
[docs] def _cc_close(self) -> None: """Close the channel.""" self._cleanup()
[docs] def _cc_send(self, msg: MessageType, **kwargs) -> None: """Execute process commands or write data to stdin :param msg: data to send :raises CCProcessError: Stdin pipe is not enabled """ if isinstance(msg, dict) and msg.get("command") == "start": self.start(msg.get("executable"), msg.get("args")) elif self._pipe_stdin: if self._process is None: raise CCProcessError("Process is not running.") log.internal_debug(f"write stdin: {msg}") self._process.stdin.write(msg) self._process.stdin.flush() else: raise CCProcessError("Can not send to stdin because pipe is not enabled.")
[docs] def _cleanup(self) -> None: """Cleanup threads and process objects""" if self._process is not None: # Terminate the process if still running self._process.terminate() try: self._process.wait(5) except subprocess.TimeoutExpired: log.internal_warning( f"Process {self._executable} could not be terminated" ) self._process.kill() # Wait for the threads to finish if self._stdout_thread is not None: self._stdout_thread.join() self._stdout_thread = None if self._stderr_thread is not None: self._stderr_thread.join() self._stderr_thread = None if self._queue_in is not None: self._queue_in = None
[docs] def _cc_open(self) -> None: """Implement abstract method""" pass
[docs] def _read_existing(self) -> Optional[ProcessMessage]: """Read buffered messages that where already received from the process. Messages from the same stream are combined. This is only used in binary mode. :return: Existing messages """ messages = self._buffer # Get all messages from the process that are available while not self._queue_in.empty(): messages.append(self._queue_in.get_nowait()) i = 1 # Find messages from the same stream(first entry in the tuple) as the first message while ( i < len(messages) and not isinstance(messages[0], ProcessExit) and not isinstance(messages[i], ProcessExit) and messages[0].stream == messages[i].stream ): i += 1 # Save the remaining messages for next time messages, self._buffer = messages[:i], messages[i:] # Process only messages from the same stream messages = messages[:i] if len(messages) == 0: return None if isinstance(messages[0], ProcessExit): return messages[0] # Join messages return ProcessMessage(messages[0].stream, b"".join([x.data for x in messages]))
[docs] @staticmethod def _create_message_dict(msg: Union[ProcessMessage, ProcessExit]) -> dict: """Create a dict from an entry in the process queue :param msg: The message to convert :return: The dictionary """ if isinstance(msg, ProcessMessage): ret = {"msg": {msg.stream: msg.data}} elif isinstance(msg, ProcessExit): ret = {"msg": {"exit": msg.exit_code}} return ret
[docs] def _cc_receive(self, timeout: float = 0.0001) -> MessageType: """Receive messages :param timeout: Time to wait in seconds for a message to be received :param size: unused return The received message """ if self._queue_in is None: return {"msg": None} # Get message from the queue try: read = self._queue_in.get(True, timeout) except queue.Empty: # Queue is empty, but there might be previously received messages when in binary mode existing = None if self._text else self._read_existing() if existing is not None: ret = CCProcess._create_message_dict(existing) else: ret = {"msg": None} return ret if not isinstance(read, ProcessExit): # A message was received if self._text: # Just return that message when in text mode ret = CCProcess._create_message_dict(read) else: # Add message to the buffer and join messages for binary mode self._buffer.append(read) existing = self._read_existing() ret = CCProcess._create_message_dict(existing) return ret else: # Process has exited self._cleanup() return CCProcess._create_message_dict(read)