Source code for pykiso.lib.auxiliaries.proxy_auxiliary

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Proxy Auxiliary
***************

:module: proxy_auxiliary

:synopsis: auxiliary used to connect multiple auxiliaries to a single connector.

This auxiliary simply forwards all commands and received messages to all connected
auxiliaries. It is only usable through a proxy connector.

.. code-block:: none

     ___________   ___________         ___________
    |           | |           | ..... |           |
    |   Aux 1   | |   Aux 2   |       |   Aux n   |
    |___________| |___________|       |___________|
          |             |                   |
          |             |                   |
     ___________   ___________         ___________
    |           | |           | ..... |           |
    |Proxy_con 1| |Proxy_con 2|       |Proxy_con n|
    |___________| |___________|       |___________|
          |             |                   |
          |             |                   |
          |             |                   |
     _____________________________________________
    |                                             |
    |               Proxy Auxiliary               |
    |_____________________________________________|
                        |
                        |
     _____________________________________________
    |                                             |
    |               Connector Channel             |
    |_____________________________________________|

.. currentmodule:: proxy_auxiliary

"""

import logging
import sys
import time
from pathlib import Path
from typing import List, Optional, Tuple, Union

from pykiso import AuxiliaryInterface, CChannel
from pykiso.test_setup.config_registry import ConfigRegistry
from pykiso.test_setup.dynamic_loader import PACKAGE

log = logging.getLogger(__name__)


class ProxyAuxiliary(AuxiliaryInterface):
    """Proxy auxiliary for multi auxiliaries communication handling.

    Every command queued by a connected proxy connector is sent over the
    single real channel and mirrored to the other proxy connectors; every
    message received on the real channel is handed to all proxy connectors.
    """

    def __init__(
        self,
        com: CChannel,
        aux_list: List[str],
        activate_trace: bool = False,
        trace_dir: Optional[str] = None,
        trace_name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize attributes.

        :param com: Communication connector
        :param aux_list: list of auxiliary's alias
        :param activate_trace: True to record received messages in a
            dedicated trace file
        :param trace_dir: trace directory path (absolute or relative)
        :param trace_name: trace file name (without file extension)
        """
        self.channel = com
        self.logger = ProxyAuxiliary._init_trace(activate_trace, trace_dir, trace_name)
        self.proxy_channels = self.get_proxy_con(aux_list)
        super().__init__(**kwargs)

    @staticmethod
    def _init_trace(
        activate: bool, t_dir: Optional[str] = None, t_name: Optional[str] = None
    ) -> logging.Logger:
        """Initialize the logging trace for proxy auxiliary received
        message recording.

        :param activate: True if the trace is activate otherwise False
        :param t_dir: trace directory path (absolute or relative)
        :param t_name: trace full name (without file extension)

        :return: created logger containing the configured FileHandler
            otherwise default logger
        """
        logger = log

        if not activate:
            return logger

        # Just avoid the case the given trace directory is None
        t_dir = "" if t_dir is None else t_dir
        # if the given log path is not absolute add root path
        # (where pykiso is launched) otherwise take it as it is
        dir_path = (
            (Path() / t_dir).resolve() if not Path(t_dir).is_absolute() else Path(t_dir)
        )
        # Format the timestamp on its own: the user-supplied name must not
        # be part of the strftime format, otherwise any '%' it contains
        # would be interpreted as a format directive.
        timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")
        # if no specific logging file name is given take the default one
        t_name = (
            f"{timestamp}_{t_name}.log"
            if t_name is not None
            else f"{timestamp}_proxy_logging.log"
        )
        # if path doesn't exists take root path (where pykiso is launched)
        log_path = (
            dir_path / t_name if dir_path.exists() else (Path() / t_name).resolve()
        )

        # configure the file handler and create the trace file
        log_format = logging.Formatter("%(asctime)s : %(message)s")
        log.info(f"create proxy trace file at {log_path}")
        handler = logging.FileHandler(log_path, "w+")
        handler.setFormatter(log_format)
        # create logger and set the log level to DEBUG
        logger = logging.getLogger(f"{__name__}.PROXY")
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)

        return logger

    def get_proxy_con(
        self, aux_list: List[Union[str, AuxiliaryInterface]]
    ) -> Tuple[CChannel, ...]:
        """Retrieve all connector associated to all given existing Auxiliaries.

        If auxiliary alias exists but auxiliary instance was not created
        yet, create it immediately using ConfigRegistry _aux_cache.

        :param aux_list: list of auxiliary's alias (or auxiliary instances)

        :return: tuple containing all connectors associated to
            all given auxiliaries

        :raises NotImplementedError: if one of the resolved auxiliaries is
            not proxy capable
        """
        channel_inst = []

        for aux in aux_list:
            # aux_list can contain a auxiliary instance just grab the
            # channel
            if isinstance(aux, AuxiliaryInterface):
                self._check_compatibility(aux)
                channel_inst.append(aux.channel)
                continue
            # check the system module in order to get the auxiliary
            # instance
            aux_inst = sys.modules.get(f"{PACKAGE}.auxiliaries.{aux}")
            if aux_inst is not None:
                self._check_compatibility(aux_inst)
                channel_inst.append(aux_inst.channel)
            # check if the given aux_name is in the available aux
            # alias list
            elif aux in ConfigRegistry.get_auxes_alias():
                log.warning(
                    f"Auxiliary : {aux} is not using import magic mechanism (pre-loaded)"
                )
                # load it using ConfigRegistry _aux_cache
                aux_inst = ConfigRegistry._linker._aux_cache.get_instance(aux)
                self._check_compatibility(aux_inst)
                channel_inst.append(aux_inst.channel)
            # the given auxiliary alias doesn't exist or refer to a
            # invalid one
            else:
                log.error(f"Auxiliary : {aux} doesn't exist")

        return tuple(channel_inst)

    @staticmethod
    def _check_compatibility(aux: AuxiliaryInterface) -> None:
        """Check if the given auxiliary is proxy compatible.

        :param aux: auxiliary instance to check

        :raises NotImplementedError: if is_proxy_capable flag is False
        """
        if not aux.is_proxy_capable:
            raise NotImplementedError(
                f"Auxiliary {aux} is not compatible with a proxy auxiliary"
            )

    def _create_auxiliary_instance(self) -> bool:
        """Open current associated channel.

        On failure the error is logged and the auxiliary is stopped.

        :return: if channel creation is successful return True
            otherwise false
        """
        try:
            log.info("Create auxiliary instance")
            log.info("Enable channel")
            self.channel.open()
            return True
        except Exception as e:
            log.exception(f"Error encouting during channel creation, reason : {e}")
            self.stop()
            return False

    def _delete_auxiliary_instance(self) -> bool:
        """Close current associated channel.

        Errors raised while closing are logged and not re-raised.

        :return: always True
        """
        try:
            log.info("Delete auxiliary instance")
            self.channel.close()
        except Exception as e:
            log.exception(f"Error encouting during channel closure, reason : {e}")
        # Deliberately not returned from a ``finally`` clause: doing so
        # would silently discard KeyboardInterrupt/SystemExit raised while
        # the channel is being closed.
        return True

    def _run_command(self) -> None:
        """Run all commands present in each proxy connectors queue in
        by sending it over current associated CChannel.

        In addition, all commands are dispatch to others auxiliaries
        using proxy connector queue out.
        """
        for conn in self.proxy_channels:
            if not conn.queue_in.empty():
                args, kwargs = conn.queue_in.get()
                message = kwargs.get("msg")
                # only commands carrying a "msg" keyword are mirrored to
                # the other proxy connectors
                if message is not None:
                    self._dispatch_command(
                        message=message,
                        remote_id=kwargs.get("remote_id"),
                        con_use=conn,
                    )
                self.channel._cc_send(*args, **kwargs)

    def _dispatch_command(self, message: bytes, con_use: CChannel, remote_id: int):
        """Dispatch the current command to others connected auxiliaries.

        This action is performed by populating the queue out from each
        proxy connectors.

        :param message: message to send
        :param con_use: current proxy connector where the command come
            from
        :param remote_id: if CAN is used, CAN frame ID on which the
            message will be sent
        """
        for conn in self.proxy_channels:
            # the originating connector does not get its own command back
            if conn != con_use:
                conn.queue_out.put([message, remote_id])

    def _abort_command(self) -> bool:
        """Not Used.

        :return: always True
        """
        return True

    def _receive_message(self, timeout_in_s: float = 0) -> None:
        """When no request are sent this method is called by AuxiliaryInterface run
        method. At each message received, this method will populate each
        proxy connectors queue out.

        Any error raised while receiving is logged and not re-raised.

        :param timeout_in_s: maximum amount of time in second to wait
            for a message.
        """
        try:
            received_data, source = self.channel.cc_receive(
                timeout=timeout_in_s, raw=True
            )
            # if data are received, populate connected proxy connectors
            # queue out
            if received_data is not None:
                self.logger.debug(
                    f"raw data : {received_data.hex()} || source : {source} || channel : {self.channel.name}"
                )
                for conn in self.proxy_channels:
                    conn.queue_out.put([received_data, source])
        except Exception:
            log.exception(
                f"encountered error while receiving message via {self.channel}"
            )

    def run(self) -> None:
        """Run function of the auxiliary thread.

        Loops until the stop event is set: handles at most one pending
        create/delete request per iteration, then, if the instance
        exists, forwards queued commands and polls the channel once.
        """
        while not self.stop_event.is_set():
            # Step 1: Check if a request is available & process it
            request = ""
            # Check if a request was received
            if not self.queue_in.empty():
                request = self.queue_in.get_nowait()
            # Process the request
            if request == "create_auxiliary_instance" and not self.is_instance:
                # Call the internal instance creation method
                return_code = self._create_auxiliary_instance()
                # Based on the result set status:
                self.is_instance = return_code
                # Enqueue the result for the request caller
                self.queue_out.put(return_code)
            elif request == "delete_auxiliary_instance" and self.is_instance:
                # Call the internal instance delete method
                return_code = self._delete_auxiliary_instance()
                # Based on the result set status:
                self.is_instance = not return_code
                # Enqueue the result for the request caller
                self.queue_out.put(return_code)
            elif request != "":
                # A request was received but could not be processed
                log.warning(f"Unknown request '{request}', will not be processed!")
                log.warning(f"Aux status: {self.__dict__}")

            # Step 2: Send stack command and propagate them to others
            # connected auxiliaires and check if something was received
            # if instance was created
            if self.is_instance:
                self._run_command()
                self._receive_message(timeout_in_s=0)

        # Thread stop command was set
        log.info("{} was stopped".format(self))
        # Delete auxiliary external instance if not done
        if self.is_instance:
            self._delete_auxiliary_instance()