##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
Thread based Auxiliary Interface
********************************
:module: thread_auxiliary
:synopsis: common thread based auxiliary interface
.. currentmodule:: thread_auxiliary
"""
import abc
import logging
import queue
import threading
import time
from typing import List, Optional
from pykiso.auxiliary import AuxiliaryCommon
from pykiso.test_setup.dynamic_loader import PACKAGE
from ..types import MsgType
log = logging.getLogger(__name__)
# Ensure lock and queue unique reference: needed because python will do
# a copy of the created object when the unittests are called
class AuxiliaryInterface(threading.Thread, AuxiliaryCommon):
    """Defines the Interface of all thread based auxiliaries.

    Auxiliaries get configured by the Test Coordinator, get instantiated by
    the TestCases and in turn use Connectors.

    Requests are exchanged with the auxiliary thread through two queues:
    ``queue_in`` carries the requests to the thread, ``queue_out`` carries
    the reports, command responses and received messages back.
    """

    def __init__(
        self,
        name: Optional[str] = None,
        is_proxy_capable: bool = False,
        is_pausable: bool = False,
        activate_log: Optional[List[str]] = None,
        auto_start: bool = True,
    ) -> None:
        """Auxiliary initialization.

        :param name: alias of the auxiliary instance
        :param is_proxy_capable: notify if the current auxiliary could
            be (or not) associated to a proxy-auxiliary.
        :param is_pausable: notify if the current auxiliary could be
            (or not) paused
        :param activate_log: names of the loggers to keep activated, all
            other external loggers are limited to WARNING level (see
            :meth:`initialize_loggers`)
        :param auto_start: determine if the auxiliary is automatically
            started (magic import) or manually (by user)
        """
        # Initialize thread class
        super().__init__()
        self.initialize_loggers(activate_log)
        # Save the name
        self.name = name
        # Define thread control attributes & methods
        self.lock = threading.RLock()
        self.queue_in = queue.Queue()
        self.queue_out = queue.Queue()
        self.stop_event = threading.Event()
        self.wait_event = threading.Event()
        self.is_proxy_capable = is_proxy_capable
        self.is_pausable = is_pausable
        # Create state
        self.is_instance = False
        self.auto_start = auto_start
        self._aux_copy = None

    def start(self) -> None:
        """Start the thread and create the auxiliary only if auto_start
        flag is False.

        The thread is only started when it is not already alive.
        """
        if not self.is_alive():
            super().start()
        if not self.auto_start:
            self.create_instance()

    @staticmethod
    def initialize_loggers(loggers: Optional[List[str]]) -> None:
        """Deactivate all external loggers except the specified ones.

        Loggers belonging to the package or whose name ends with
        ``auxiliary`` are never touched. Every other logger that is
        neither listed in ``loggers`` nor a parent/child (by name prefix)
        of a listed one is set to WARNING level.

        The given list is left unmodified.

        :param loggers: list of logger names to keep activated, the
            keyword ``all`` keeps every logger untouched
        """
        if loggers is None:
            loggers = []
        # keyword 'all' should keep all loggers to the configured level
        if "all" in loggers:
            log.warning(
                "All loggers are activated, this could lead to performance issues."
            )
            return
        # candidates for deactivation: everything except the package and
        # auxiliary loggers (placeholders are not real loggers)
        relevant_loggers = {
            name: logger
            for name, logger in logging.root.manager.loggerDict.items()
            if not (name.startswith(PACKAGE) or name.endswith("auxiliary"))
            and not isinstance(logger, logging.PlaceHolder)
        }
        # keep loggers related (name prefix in either direction) to the
        # specified ones
        related = {
            candidate
            for candidate in relevant_loggers
            for parent in loggers
            if candidate.startswith(parent) or parent.startswith(candidate)
        }
        # build the keep-set locally instead of extending the caller's list
        loggers_to_keep = related.union(loggers)
        # keep original level for specified loggers, limit all the others
        for logger_name in set(relevant_loggers) - loggers_to_keep:
            logging.getLogger(logger_name).setLevel(logging.WARNING)

    def create_instance(self) -> bool:
        """Create an auxiliary instance and ensure the communication to it.

        Post a creation request to the auxiliary thread and block until
        an item is available on the output queue.

        :return: item read from the output queue, normally the creation
            status enqueued by :meth:`run` (True - created / False - failed)
        """
        # The context manager releases the lock even if waiting on the
        # queue is interrupted by an exception
        with self.lock:
            # Trigger the internal requests
            self.queue_in.put("create_auxiliary_instance")
            # Wait until the request was processed
            report = self.queue_out.get()
        # Return the report
        return report

    def delete_instance(self) -> bool:
        """Delete an auxiliary instance and its communication to it.

        Post a deletion request to the auxiliary thread and block until
        an item is available on the output queue.

        :return: item read from the output queue, normally the deletion
            status enqueued by :meth:`run` (True - deleted / False - failed)
        """
        # The context manager releases the lock even if waiting on the
        # queue is interrupted by an exception
        with self.lock:
            # Trigger the internal requests
            self.queue_in.put("delete_auxiliary_instance")
            # Wait until the request was processed
            report = self.queue_out.get()
        # Return the report
        return report

    def run(self) -> None:
        """Run function of the auxiliary thread.

        Loop until ``stop_event`` is set: process at most one pending
        request per iteration, then forward a received message (if any)
        to the output queue. When the loop ends, the auxiliary instance
        is deleted if it still exists.
        """
        while not self.stop_event.is_set():
            # Step 1: Check if a request is available & process it
            request = ""
            # Check if a request was received
            if not self.queue_in.empty():
                request = self.queue_in.get_nowait()
            # Process the request
            if request == "create_auxiliary_instance" and not self.is_instance:
                # Call the internal instance creation method
                return_code = self._create_auxiliary_instance()
                # Based on the result set status:
                self.is_instance = return_code
                # Enqueue the result for the request caller
                self.queue_out.put(return_code)
            elif request == "delete_auxiliary_instance" and self.is_instance:
                # Call the internal instance delete method
                return_code = self._delete_auxiliary_instance()
                # Based on the result set status:
                self.is_instance = not return_code
                # Enqueue the result for the request caller
                self.queue_out.put(return_code)
            elif (
                isinstance(request, tuple)
                and self.is_instance
                and request[0] == "command"
            ):
                # If the instance is created, send the requested command
                # to the instance via the internal method
                _, cmd, data = request
                cmd_response = self._run_command(cmd, data)
                if cmd_response is not None:
                    self.queue_out.put(cmd_response)
            elif request == "abort" and self.is_instance:
                self.queue_out.put(self._abort_command())
            elif request != "":
                # A request was received but could not be processed
                # NOTE(review): nothing is enqueued on queue_out here, so a
                # create/delete request issued in the wrong state (e.g.
                # create while already created) leaves its caller waiting
                # on queue_out.get() - confirm this is intended.
                log.warning(f"Unknown request '{request}', will not be processed!")
                log.warning(f"Aux status: {self.__dict__}")
            # Step 2: Check if something was received from the aux instance
            # if instance was created
            if self.is_instance and not self.is_pausable:
                received_message = self._receive_message(timeout_in_s=0)
                # If yes, send it via the out queue
                if received_message is not None:
                    self.queue_out.put(received_message)
            # Free up cpu usage when auxiliary is suspended
            if not self.is_instance:
                time.sleep(0.050)
            # If auxiliary instance is created and is pausable, block until
            # wait_event is set
            if self.is_instance and self.is_pausable:
                self.wait_event.wait()
        # Thread stop command was set
        log.info("{} was stopped".format(self))
        # Delete auxiliary external instance if not done
        if self.is_instance:
            self._delete_auxiliary_instance()

    @abc.abstractmethod
    def _create_auxiliary_instance(self) -> bool:
        """Create the auxiliary instance with which we will communicate.

        :return: True - Successfully created / False - Failed by creation

        .. note: Errors should be logged via the logging with the right level
        """
        pass

    @abc.abstractmethod
    def _delete_auxiliary_instance(self) -> bool:
        """Delete the auxiliary instance with which we will communicate.

        :return: True - Successfully deleted / False - Failed deleting

        .. note: Errors should be logged via the logging with the right level
        """
        pass

    @abc.abstractmethod
    def _run_command(self, cmd_message: MsgType, cmd_data: bytes = None) -> MsgType:
        """Run a command for the auxiliary.

        :param cmd_message: command in form of a message to run
        :param cmd_data: payload data for the command

        :return: response to the command; when it is not None,
            :meth:`run` puts it on the output queue

        .. note: Errors should be logged via the logging with the right level
        """
        pass

    @abc.abstractmethod
    def _abort_command(self) -> bool:
        """Abort the sent command.

        :return: abort status, put on the output queue by :meth:`run`
        """
        pass

    @abc.abstractmethod
    def _receive_message(self, timeout_in_s: float) -> MsgType:
        """Defines what needs to be done as a receive message. Such as,
        what do I need to do to receive a message.

        :param timeout_in_s: How much time to block on the receive

        :return: message.Message - If one received / None - Else
        """
        pass