Source code for pykiso.auxiliary

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Auxiliary common Interface Definition
*****************************************

:module: auxiliary
:synopsis: base auxiliary interface

.. currentmodule:: pykiso

"""
import abc
import enum
import functools
import logging
import queue
import threading
from enum import Enum, unique
from typing import Any, Callable, List, Optional

from typing_extensions import Self

from pykiso.test_setup.config_registry import ConfigRegistry

from .exceptions import AuxiliaryCreationError, AuxiliaryNotStarted
from .logging_initializer import add_internal_log_levels, initialize_loggers

log = logging.getLogger(__name__)


[docs]@unique class AuxCommand(Enum): """Contain all available auxiliary's commands.""" #: create auxiliary command id CREATE_AUXILIARY = enum.auto() #: delete auxiliary command id DELETE_AUXILIARY = enum.auto()
[docs]class AuxiliaryInterface(abc.ABC): """Common interface for all double threaded auxiliary. A so called << double threaded >> auxiliary, simply encapsulate two threads one for the reception and one for the transmmission. """
[docs] @classmethod def get_instance(cls, name: str) -> Self: """Experimental - Get an auxiliary instance by its name.""" auxiliary = ConfigRegistry.get_aux_by_alias(name) # Verify if the auxiliary is of the right type if not isinstance(auxiliary, cls): raise ValueError(f"Requested auxiliary {name} is not of type {cls}") return auxiliary
def __init__( self, name: str = None, is_proxy_capable: bool = False, connector_required: bool = True, activate_log: List[str] = None, tx_task_on=True, rx_task_on=True, auto_start: bool = True, ) -> None: """Initialize auxiliary attributes :param name: alias of the auxiliary instance :param is_proxy_capable: notify if the current auxiliary could be (or not) associated to a proxy-auxiliary. :param connector_required: define if a connector is required for this auxiliary. :param activate_log: loggers to deactivate :param tx_task_on: enable or not the tx thread :param rx_task_on: enable or not the rx thread :param auto_start: determine if the auxiliayry is automatically started (magic import) or manually (by user) """ initialize_loggers(activate_log) add_internal_log_levels() self.name = name self.is_proxy_capable = is_proxy_capable self.auto_start = auto_start self.lock = threading.RLock() self.rx_lock = threading.Lock() self._stop_event = threading.Event() self.stop_tx = threading.Event() self.stop_rx = threading.Event() self.queue_in = queue.Queue() self.queue_out = queue.Queue() self.tx_task_on = tx_task_on self.rx_task_on = rx_task_on self.tx_thread = None self.rx_thread = None self.recv_timeout = 1 self.is_instance = False self.connector_required = connector_required
[docs] def run_command( self, cmd_message: Any, cmd_data: Any = None, blocking: bool = True, timeout_in_s: int = 5, timeout_result: Any = None, ) -> Any: """Send a request by transmitting it through queue_in and waiting for a response using queue_out. :param cmd_message: command request to the auxiliary :param cmd_data: data you would like to populate the command with :param blocking: If you want the command request to be blocking or not :param timeout_in_s: Number of time (in s) you want to wait for an answer :param timeout_result: Value to return when the command times out. Defaults to None. :raises pykiso.exceptions.AuxiliaryNotStarted: if a command is executed although the auxiliary was not started. :return: True if the request is correctly executed otherwise False """ # avoid a deadlock in case run_command is called within a _receive_message implementation # (for e.g. callback implementation) while delete_instance is being executed from the main thread if self._stop_event.is_set(): return timeout_result with self.lock: if not self.is_instance: raise AuxiliaryNotStarted(self.name) log.internal_debug(f"sending command '{cmd_message}' with payload {cmd_data} using {self.name} aux.") response_received = timeout_result self.queue_in.put((cmd_message, cmd_data)) try: response_received = self.queue_out.get(blocking, timeout_in_s) log.internal_debug(f"reply to command '{cmd_message}' received: '{response_received}' in {self.name}") except queue.Empty: log.error( f"no reply received within time for command {cmd_message} for payload {cmd_data} using {self.name} aux." ) return response_received
[docs] def create_instance(self) -> bool: """Start auxiliary's running tasks and activities. :return: True if the auxiliary is created otherwise False :raises AuxiliaryCreationError: if instance creation failed """ log.internal_info(f"Creating instance of auxiliary {self.name}") with self.lock: # if the current aux is alive don't try to create it again if self.is_instance: log.internal_info(f"Auxiliary {self.name} is already created") return True is_created = self._create_auxiliary_instance() if not is_created: raise AuxiliaryCreationError(self.name) # start each auxiliary's tasks self._start_tx_task() self._start_rx_task() self.is_instance = True return is_created
[docs] def delete_instance(self) -> bool: """Stop auxiliary's running tasks and activities. :return: True if the auxiliary is deleted otherwise False """ log.internal_info(f"Deleting instance of auxiliary {self.name}") with self.lock: self._stop_event.set() # if the current aux is not alive don't try to delete it again if not self.is_instance: log.internal_info(f"Auxiliary {self.name} is already deleted") self._stop_event.clear() return True # stop each auxiliary's tasks self._stop_tx_task() self._stop_rx_task() is_deleted = self._delete_auxiliary_instance() if not is_deleted: log.error(f"Unexpected error occurred during deletion of auxiliary instance {self.name}") self.is_instance = False self._stop_event.clear() # Reset queue so no old commands are processed when restarting the auxiliary self.queue_in = queue.Queue() self.queue_out = queue.Queue() return is_deleted
def _start_tx_task(self) -> None: """Start transmission task.""" if self.tx_task_on is False: log.internal_debug("transmit task is not needed, don't start it") return task_name = f"{self.name}_tx" log.internal_debug("start transmit task %s", task_name) # Any created thread should disappear after main-thread exit self.tx_thread = threading.Thread(name=task_name, target=self._transmit_task, daemon=True) self.tx_thread.start() def _start_rx_task(self) -> None: """Start reception task.""" if self.rx_task_on is False: log.internal_debug("reception task is not needed, don't start it") return with self.rx_lock: task_name = f"{self.name}_rx" log.internal_debug("start reception task %s", task_name) # Any created thread should disappear after main-thread exit self.rx_thread = threading.Thread(name=task_name, target=self._reception_task, daemon=True) self.rx_thread.start() def _stop_tx_task(self) -> None: """Stop transmission task.""" if self.tx_task_on is False: log.internal_debug("transmit task was not started, no need to stop it") return log.internal_debug(f"stop transmit task {self.name}_tx") self.queue_in.put((AuxCommand.DELETE_AUXILIARY, None)) self.stop_tx.set() self.tx_thread.join() self.stop_tx.clear() def _stop_rx_task(self) -> None: """Stop reception task.""" if self.rx_task_on is False: log.internal_debug("reception task was not started, no need to stop it") return with self.rx_lock: log.internal_debug(f"stop reception task {self.name}_rx") self.stop_rx.set() self.rx_thread.join() self.stop_rx.clear()
[docs] def start(self) -> bool: """Force the auxiliary to start all running tasks and activities. .. warning:: due to the usage of create_instance if an issue occurred the exception AuxiliaryCreationError is raised. :return: True if the auxiliary is started otherwise False """ return self.create_instance()
[docs] def stop(self) -> bool: """Force the auxiliary to stop all running tasks and activities. :return: True if the auxiliary is stopped otherwise False """ return self.delete_instance()
def __enter__(self) -> Self: """Context manager entry point""" if self.start(): return self else: raise AuxiliaryNotStarted(f"Failed to start auxiliary {self.name}") def __exit__(self, type, value, traceback): """Context manager exit point""" stop_status = self.stop() if traceback: log.error(f"Error occurred during auxiliary {self.name} execution: {type=}, {value=}, {traceback=}") if not stop_status: raise RuntimeError(f"Failed to stop auxiliary {self.name}")
[docs] def suspend(self) -> bool: """Supend current auxiliary's run. :return: True if the auxiliary is suspend otherwise False """ return self.delete_instance()
[docs] def resume(self) -> bool: """Resume current auxiliary's run. .. warning:: due to the usage of create_instance if an issue occurred the exception AuxiliaryCreationError is raised. :return: True if the auxiliary is resumed otherwise False """ return self.create_instance()
def _transmit_task(self) -> None: """Auxiliary transmission task. Simply call the child defined _run_command. """ while not self.stop_tx.is_set(): cmd, data = self.queue_in.get() # just stop the current Tx thread task if cmd == AuxCommand.DELETE_AUXILIARY: break self._run_command(cmd, data) def _reception_task(self) -> None: """Auxiliary reception task. Simply call the child defined _receive_message method. """ while not self.stop_rx.is_set(): self._receive_message(timeout_in_s=self.recv_timeout)
[docs] def wait_for_queue_out(self, blocking: bool = False, timeout_in_s: int = 0) -> Optional[Any]: """Wait for data from the queue out. :param blocking: True: wait for timeout to expire, False: return immediately :param timeout_in_s: if blocking, wait the defined time in seconds :return: data contained in the auxiliary's queue_out """ try: return self.queue_out.get(blocking, timeout_in_s) except queue.Empty: return None
[docs] def shutdown(self): """Uninitialize method. Will be called at the end of the test session.""" pass
@abc.abstractmethod def _create_auxiliary_instance(self) -> bool: """Common interface call at auxiliary creation. This method could be used to e.g initiate the communication using the attached connector. :return: True - Successfully created / False - Failed by creation """ @abc.abstractmethod def _delete_auxiliary_instance(self) -> bool: """Common interface call at auxiliary deletion. This method could be used to e.g terminate the current running communication... :return: True - Successfully deleted / False - Failed deleting """ @abc.abstractmethod def _run_command(self, cmd_message: Any, cmd_data: Optional[bytes]) -> None: """Run a command for the auxiliary. :param cmd_message: command to send :param cmd_data: payload data for the command """ @abc.abstractmethod def _receive_message(self, timeout_in_s: float) -> None: """Defines what needs to be done as a receive message. Such as, what do I need to do to receive a message. :param timeout_in_s: How much time to block on the receive """
[docs]def open_connector(func: Callable) -> Callable: """Open current associated auxiliary's channel. :param func: decorated method :return: inner decorated function """ @functools.wraps(func) def inner_open(self, *arg, **kwargs) -> bool: """Open the channel. :param args: positional arguments :param kwargs: named arguments :return: True if everything was successful otherwise False """ log.internal_info(f"Open {self.channel.__class__.__name__} channel {self.channel.name!r}") try: self.channel.open() return func(self, *arg, **kwargs) except Exception: log.exception( f"Unable to open {self.channel.__class__.__name__} channel communication for {self.channel.name!r}" ) return False return inner_open
[docs]def close_connector(func: Callable) -> Callable: """Close current associated auxiliary's channel. :param func: decorated method :return: inner decorated function """ @functools.wraps(func) def inner_close(self, *arg, **kwargs) -> bool: """Close the channel. :param args: positional arguments :param kwargs: named arguments :return: True if everything was successful otherwise False """ log.internal_info(f"Close {self.channel.__class__.__name__} channel {self.channel.name!r}") try: ret = func(self, *arg, **kwargs) self.channel.close() return ret except Exception: log.exception( f"Unable to close {self.channel.__class__.__name__} channel communication {self.channel.name!r}" ) return False return inner_close
[docs]def flash_target(func: Callable) -> Callable: """Flash firmware on the target, using associated auxiliary's flasher channel. :param func: decorated method :return: inner decorated function """ @functools.wraps(func) def inner_flash(self, *arg, **kwargs) -> bool: """Flash the device under test. :param args: positional arguments :param kwargs: named arguments :return: True if everything was successful otherwise False """ if self.flash is None and not self.is_instance: log.internal_debug("No flasher configured!") return func(self, *arg, **kwargs) try: log.internal_info("Flash target") with self.flash as flasher: flasher.flash() return func(self, *arg, **kwargs) except Exception: log.exception("Error occurred during flashing") return False return inner_flash