Source code for pykiso.auxiliary

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Auxiliary common Interface Definition
*****************************************

:module: auxiliary
:synopsis: base auxiliary interface

.. currentmodule:: pykiso

"""
import abc
import enum
import functools
import logging
import queue
import threading
from enum import Enum, unique
from typing import Any, Callable, List, Optional

from .exceptions import AuxiliaryCreationError, AuxiliaryNotStarted
from .logging_initializer import add_internal_log_levels, initialize_loggers

log = logging.getLogger(__name__)


[docs]@unique class AuxCommand(Enum): """Contain all available auxiliary's commands.""" #: create auxiliary command id CREATE_AUXILIARY = enum.auto() #: delete auxiliary command id DELETE_AUXILIARY = enum.auto()
[docs]class AuxiliaryInterface(abc.ABC): """Common interface for all double threaded auxiliary. A so called << double threaded >> auxiliary, simply encapsulate two threads one for the reception and one for the transmmission. """ def __init__( self, name: str = None, is_proxy_capable: bool = False, connector_required: bool = True, activate_log: List[str] = None, tx_task_on=True, rx_task_on=True, auto_start: bool = True, ) -> None: """Initialize auxiliary attributes :param name: alias of the auxiliary instance :param is_proxy_capable: notify if the current auxiliary could be (or not) associated to a proxy-auxiliary. :param connector_required: define if a connector is required for this auxiliary. :param activate_log: loggers to deactivate :param tx_task_on: enable or not the tx thread :param rx_task_on: enable or not the rx thread :param auto_start: determine if the auxiliayry is automatically started (magic import) or manually (by user) """ initialize_loggers(activate_log) add_internal_log_levels() self.name = name self.is_proxy_capable = is_proxy_capable self.auto_start = auto_start self.lock = threading.RLock() self._stop_event = threading.Event() self.stop_tx = threading.Event() self.stop_rx = threading.Event() self.queue_in = queue.Queue() self.queue_out = queue.Queue() self.tx_task_on = tx_task_on self.rx_task_on = rx_task_on self.tx_thread = None self.rx_thread = None self.recv_timeout = 1 self.is_instance = False self.connector_required = connector_required
[docs] def run_command( self, cmd_message: Any, cmd_data: Any = None, blocking: bool = True, timeout_in_s: int = 5, timeout_result: Any = None, ) -> Any: """Send a request by transmitting it through queue_in and waiting for a response using queue_out. :param cmd_message: command request to the auxiliary :param cmd_data: data you would like to populate the command with :param blocking: If you want the command request to be blocking or not :param timeout_in_s: Number of time (in s) you want to wait for an answer :param timeout_result: Value to return when the command times out. Defaults to None. :raises pykiso.exceptions.AuxiliaryNotStarted: if a command is executed although the auxiliary was not started. :return: True if the request is correctly executed otherwise False """ # avoid a deadlock in case run_command is called within a _receive_message implementation # (for e.g. callback implementation) while delete_instance is being executed from the main thread if self._stop_event.is_set(): return timeout_result with self.lock: if not self.is_instance: raise AuxiliaryNotStarted(self.name) log.internal_debug( f"sending command '{cmd_message}' with payload {cmd_data} using {self.name} aux." ) response_received = timeout_result self.queue_in.put((cmd_message, cmd_data)) try: response_received = self.queue_out.get(blocking, timeout_in_s) log.internal_debug( f"reply to command '{cmd_message}' received: '{response_received}' in {self.name}" ) except queue.Empty: log.error( f"no reply received within time for command {cmd_message} for payload {cmd_data} using {self.name} aux." ) return response_received
[docs] def create_instance(self) -> bool: """Start auxiliary's running tasks and activities. :return: True if the auxiliary is created otherwise False :raises AuxiliaryCreationError: if instance creation failed """ log.internal_info(f"Creating instance of auxiliary {self.name}") with self.lock: # if the current aux is alive don't try to create it again if self.is_instance: log.internal_info(f"Auxiliary {self.name} is already created") return True is_created = self._create_auxiliary_instance() if not is_created: raise AuxiliaryCreationError(self.name) # start each auxiliary's tasks self._start_tx_task() self._start_rx_task() self.is_instance = True return is_created
[docs] def delete_instance(self) -> bool: """Stop auxiliary's running tasks and activities. :return: True if the auxiliary is deleted otherwise False """ log.internal_info(f"Deleting instance of auxiliary {self.name}") with self.lock: self._stop_event.set() # if the current aux is not alive don't try to delete it again if not self.is_instance: log.internal_info(f"Auxiliary {self.name} is already deleted") self._stop_event.clear() return True # stop each auxiliary's tasks self._stop_tx_task() self._stop_rx_task() is_deleted = self._delete_auxiliary_instance() if not is_deleted: log.error( f"Unexpected error occurred during deletion of auxiliary instance {self.name}" ) self.is_instance = False self._stop_event.clear() return is_deleted
def _start_tx_task(self) -> None: """Start transmission task.""" if self.tx_task_on is False: log.internal_debug("transmit task is not needed, don't start it") return task_name = f"{self.name}_tx" log.internal_debug("start transmit task %s", task_name) self.tx_thread = threading.Thread(name=task_name, target=self._transmit_task) self.tx_thread.start() def _start_rx_task(self) -> None: """Start reception task.""" if self.rx_task_on is False: log.internal_debug("reception task is not needed, don't start it") return task_name = f"{self.name}_rx" log.internal_debug("start reception task %s", task_name) self.rx_thread = threading.Thread(name=task_name, target=self._reception_task) self.rx_thread.start() def _stop_tx_task(self) -> None: """Stop transmission task.""" if self.tx_task_on is False: log.internal_debug("transmit task was not started, no need to stop it") return log.internal_debug(f"stop transmit task {self.name}_tx") self.queue_in.put((AuxCommand.DELETE_AUXILIARY, None)) self.stop_tx.set() self.tx_thread.join() self.stop_tx.clear() def _stop_rx_task(self) -> None: """Stop reception task.""" if self.rx_task_on is False: log.internal_debug("reception task was not started, no need to stop it") return log.internal_debug(f"stop reception task {self.name}_rx") self.stop_rx.set() self.rx_thread.join() self.stop_rx.clear()
[docs] def start(self) -> bool: """Force the auxiliary to start all running tasks and activities. .. warning:: due to the usage of create_instance if an issue occurred the exception AuxiliaryCreationError is raised. :return: True if the auxiliary is started otherwise False """ return self.create_instance()
[docs] def stop(self) -> bool: """Force the auxiliary to stop all running tasks and activities. :return: True if the auxiliary is stopped otherwise False """ return self.delete_instance()
[docs] def suspend(self) -> bool: """Supend current auxiliary's run. :return: True if the auxiliary is suspend otherwise False """ return self.delete_instance()
[docs] def resume(self) -> bool: """Resume current auxiliary's run. .. warning:: due to the usage of create_instance if an issue occurred the exception AuxiliaryCreationError is raised. :return: True if the auxiliary is resumed otherwise False """ return self.create_instance()
def _transmit_task(self) -> None: """Auxiliary transmission task. Simply call the child defined _run_command. """ while not self.stop_tx.is_set(): cmd, data = self.queue_in.get() # just stop the current Tx thread task if cmd == AuxCommand.DELETE_AUXILIARY: break self._run_command(cmd, data) def _reception_task(self) -> None: """Auxiliary reception task. Simply call the child defined _receive_message method. """ while not self.stop_rx.is_set(): self._receive_message(timeout_in_s=self.recv_timeout)
[docs] def wait_for_queue_out( self, blocking: bool = False, timeout_in_s: int = 0 ) -> Optional[Any]: """Wait for data from the queue out. :param blocking: True: wait for timeout to expire, False: return immediately :param timeout_in_s: if blocking, wait the defined time in seconds :return: data contained in the auxiliary's queue_out """ try: return self.queue_out.get(blocking, timeout_in_s) except queue.Empty: return None
[docs] def shutdown(self): """Uninitialize method. Will be called at the end of the test session.""" pass
@abc.abstractmethod def _create_auxiliary_instance(self) -> bool: """Common interface call at auxiliary creation. This method could be used to e.g initiate the communication using the attached connector. :return: True - Successfully created / False - Failed by creation """ @abc.abstractmethod def _delete_auxiliary_instance(self) -> bool: """Common interface call at auxiliary deletion. This method could be used to e.g terminate the current running communication... :return: True - Successfully deleted / False - Failed deleting """ @abc.abstractmethod def _run_command(self, cmd_message: Any, cmd_data: Optional[bytes]) -> None: """Run a command for the auxiliary. :param cmd_message: command to send :param cmd_data: payload data for the command """ @abc.abstractmethod def _receive_message(self, timeout_in_s: float) -> None: """Defines what needs to be done as a receive message. Such as, what do I need to do to receive a message. :param timeout_in_s: How much time to block on the receive """
[docs]def open_connector(func: Callable) -> Callable: """Open current associated auxiliary's channel. :param func: decorated method :return: inner decorated function """ @functools.wraps(func) def inner_open(self, *arg, **kwargs) -> bool: """Open the channel. :param args: positional arguments :param kwargs: named arguments :return: True if everything was successful otherwise False """ log.internal_info( f"Open {self.channel.__class__.__name__} channel {self.channel.name!r}" ) try: self.channel.open() return func(self, *arg, **kwargs) except Exception: log.exception( f"Unable to open {self.channel.__class__.__name__} channel communication for {self.channel.name!r}" ) return False return inner_open
[docs]def close_connector(func: Callable) -> Callable: """Close current associated auxiliary's channel. :param func: decorated method :return: inner decorated function """ @functools.wraps(func) def inner_close(self, *arg, **kwargs) -> bool: """Close the channel. :param args: positional arguments :param kwargs: named arguments :return: True if everything was successful otherwise False """ log.internal_info( f"Close {self.channel.__class__.__name__} channel {self.channel.name!r}" ) try: ret = func(self, *arg, **kwargs) self.channel.close() return ret except Exception: log.exception( f"Unable to close {self.channel.__class__.__name__} channel communication {self.channel.name!r}" ) return False return inner_close
[docs]def flash_target(func: Callable) -> Callable: """Flash firmware on the target, using associated auxiliary's flasher channel. :param func: decorated method :return: inner decorated function """ @functools.wraps(func) def inner_flash(self, *arg, **kwargs) -> bool: """Flash the device under test. :param args: positional arguments :param kwargs: named arguments :return: True if everything was successful otherwise False """ if self.flash is None and not self.is_instance: log.internal_debug("No flasher configured!") return func(self, *arg, **kwargs) try: log.internal_info("Flash target") with self.flash as flasher: flasher.flash() return func(self, *arg, **kwargs) except Exception: log.exception("Error occurred during flashing") return False return inner_flash