##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
Proxy Auxiliary
***************
:module: proxy_auxiliary
:synopsis: auxiliary use to connect multiple auxiliaries on a unique connector.
This auxiliary simply spread all commands and received messages to all connected
auxiliaries. This auxiliary is only usable through proxy connector.
.. code-block:: none
___________ ___________ ___________
| | | | ..... | |
| Aux 1 | | Aux 1 | | Aux n |
|___________| |___________| |___________|
| | |
| | |
___________ ___________ ___________
| | | | ..... | |
|Proxy_con 1| |Proxy_con 2| |Proxy_con n|
|___________| |___________| |___________|
| | |
| | |
| | |
_____________________________________________
| |
| Proxy Auxiliary |
|_____________________________________________|
|
|
_____________________________________________
| |
| Connector Channel |
|_____________________________________________|
.. currentmodule:: proxy_auxiliary
"""
import logging
import queue
import sys
import time
from pathlib import Path
from typing import List, Optional, Tuple
from pykiso import CChannel
from pykiso.auxiliary import (
AuxiliaryInterface,
close_connector,
open_connector,
)
from pykiso.lib.connectors.cc_proxy import CCProxy
from pykiso.test_setup.config_registry import ConfigRegistry
from pykiso.test_setup.dynamic_loader import PACKAGE
log = logging.getLogger(__name__)
[docs]class ProxyAuxiliary(AuxiliaryInterface):
"""Proxy auxiliary for multi auxiliaries communication handling."""
def __init__(
self,
com: CChannel,
aux_list: List[str],
activate_trace: bool = False,
trace_dir: Optional[str] = None,
trace_name: Optional[str] = None,
**kwargs,
):
"""Initialize attributes.
:param com: Communication connector
:param aux_list: list of auxiliary's alias
:param activate_trace: log all received messages in a
dedicated trace file or not
:param trace_dir: where to place the trace
:param trace_name: trace's file name
"""
super().__init__(
is_proxy_capable=True, tx_task_on=False, rx_task_on=True, **kwargs
)
self.channel = com
self._open_count = 0
self.logger = self._init_trace(activate_trace, trace_dir, trace_name)
self.proxy_channels = self.get_proxy_con(aux_list)
def _count_open_proxy_channels(self) -> int:
"""
Get the number of proxy channels connected to this auxiliary
that are currently open.
"""
return len(
[
ccproxy
for ccproxy in self.proxy_channels
if isinstance(ccproxy.queue_out, queue.Queue)
]
)
@property
def _open_connections(self) -> int:
"""A counter monitoring the number of attached running auxiliaries.
.. warning::
Do not set this manually as it can easily result in unexpected behaviours.
:getter: returns the number of currently attached and running auxiliaries.
:setter: set by ``CCProxy`` instances when being opened or closed.
When no open connection remains and the counter falls back to 0,
the ``ProxyAuxiliary`` will be stopped and the attached 'physical'
communication channel closed.
When the first connection is opened and the counter is increased
from 0 to 1, the ``ProxyAuxiliary`` will be started and the attached
'physical' communication channel opened.
"""
with self.lock:
return self._open_count
@_open_connections.setter
def _open_connections(self, value: int):
# locking shouldn't be necessary but we're never too paranoid
with self.lock:
# on the original proxy setup, the auxiliaries are created before the proxy
# therefore, when the first auxiliary will be stopped, the counter value will be -1
if value < 0:
value = self._count_open_proxy_channels()
last_connection_closed = value == 0 and self._open_count == 1
first_connection_opened = value == 1 and self._open_count == 0
self._open_count = value
# stop proxy if the last attached auxiliary was stopped
if last_connection_closed and self.is_instance:
self.delete_instance()
# start proxy if the first attached auxiliary is started
elif first_connection_opened and not self.is_instance:
self.create_instance()
@staticmethod
def _init_trace(
activate: bool, t_dir: Optional[str] = None, t_name: Optional[str] = None
) -> logging.Logger:
"""Initialize the logging trace for proxy auxiliary received
message recording.
:param activate: True if the trace is activate otherwise False
:param t_dir: trace directory path (absolute or relative)
:param t_name: trace full name (without file extension)
:return : created logger containing the configured
FileHander otherwise default logger
"""
logger = log
if not activate:
return logger
# Just avoid the case the given trace directory is None
t_dir = "" if t_dir is None else t_dir
# if the given log path is not absolute add root path
# (where pykiso is launched) otherwise take it as it is
dir_path = (
(Path() / t_dir).resolve() if not Path(t_dir).is_absolute() else Path(t_dir)
)
# if no specific logging file name is given take the default one
t_name = (
time.strftime(f"%Y-%m-%d_%H-%M-%S_{t_name}.log")
if t_name is not None
else time.strftime("%Y-%m-%d_%H-%M-%S_proxy_logging.log")
)
# if path doesn't exists take root path (where pykiso is launched)
log_path = (
dir_path / t_name if dir_path.exists() else (Path() / t_name).resolve()
)
# configure the file handler and create the trace file
log_format = logging.Formatter("%(asctime)s : %(message)s")
log.internal_info(f"create proxy trace file at {log_path}")
handler = logging.FileHandler(log_path, "w+")
handler.setFormatter(log_format)
# create logger and set the log level to DEBUG
logger = logging.getLogger(f"{__name__}.PROXY")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
return logger
[docs] def get_proxy_con(self, aux_list: List[str]) -> Tuple[CCProxy, ...]:
"""Retrieve all connector associated to all given existing Auxiliaries.
If auxiliary alias exists but auxiliary instance was not created
yet, create it immediately using ConfigRegistry _aux_cache.
:param aux_list: list of auxiliary aliases or instances.
:return: tuple containing all connectors associated to
all given auxiliaries.
"""
channel_inst: List[CCProxy] = []
for aux in aux_list:
# aux_list can contain a auxiliary instance just grab the
# channel
if isinstance(aux, AuxiliaryInterface):
self._check_aux_compatibility(aux)
channel_inst.append(aux.channel)
continue
# check the system module in order to get the auxiliary
# instance
aux_inst = sys.modules.get(f"{PACKAGE}.auxiliaries.{aux}")
if aux_inst is not None:
self._check_aux_compatibility(aux_inst)
channel_inst.append(aux_inst.channel)
# check if the given aux_name is in the available aux
# alias list
elif aux in ConfigRegistry.get_auxes_alias():
log.internal_info(
f"Auxiliary '{aux}' is not using import magic mechanism (pre-loaded)"
)
# load it using ConfigRegistry _aux_cache
aux_inst = ConfigRegistry.get_aux_by_alias(aux)
self._check_aux_compatibility(aux_inst)
channel_inst.append(aux_inst.channel)
# the given auxiliary alias doesn't exist or refer to a
# invalid one
else:
log.error(f"Auxiliary '{aux}' doesn't exist")
# Check if auxes/connectors are compatible with the proxy aux
self._check_channels_compatibility(channel_inst)
# Finally bind the physical channel to the proxy channels to
# share its API to the user's auxiliaries
for channel in channel_inst:
channel._bind_channel_info(self)
return tuple(channel_inst)
@staticmethod
def _check_aux_compatibility(aux: AuxiliaryInterface) -> None:
"""Check if the given auxiliary is proxy compatible.
:param aux: auxiliary instance to check
:raises NotImplementedError: if is_proxy_capable flag is False
"""
if not aux.is_proxy_capable:
raise NotImplementedError(
f"Auxiliary {aux} is not compatible with a proxy auxiliary!"
)
@staticmethod
def _check_channels_compatibility(channels: List[CChannel]) -> None:
"""Check if all associated channels are compatible.
:param channels: all channels collected by the proxy aux
:raises TypeError: if the connector is not an instance of
CCProxy
"""
for channel in channels:
if not isinstance(channel, CCProxy):
raise TypeError(f"Channel {channel} is not compatible!")
def _dispatch_tx_method_to_channels(self) -> None:
"""Attached public run_command method to all connected proxy
channels.
.. note:: This method use the thread safe method implemented by
each proxy channel (attach_tx_callback).
"""
for conn in self.proxy_channels:
conn.attach_tx_callback(self.run_command)
def _remove_tx_method_from_channels(self) -> None:
"""Detach public run_command method from all connected proxy
channels.
.. note:: This method use the thread safe method implemented by
each proxy channel (detach_tx_callback).
"""
for conn in self.proxy_channels:
conn.detach_tx_callback()
@open_connector
def _create_auxiliary_instance(self) -> bool:
"""Open current associated channel and dispatch tx method.
:return: if channel creation is successful return True otherwise
False
"""
self._dispatch_tx_method_to_channels()
log.internal_info("Auxiliary instance created")
return True
@close_connector
def _delete_auxiliary_instance(self) -> bool:
"""Close current associated channel.
:return: if channel deletion is successful return True otherwise
False
"""
self._remove_tx_method_from_channels()
log.internal_info("Auxiliary instance deleted")
return True
[docs] def run_command(self, conn: CChannel, *args: tuple, **kwargs: dict) -> None:
"""Transmit an incoming request from a linked proxy channel
to the proxy auxiliary's channel.
:param conn: current proxy channel instance which the command
comes from
:param args: postional arguments
:param args: named arguments
"""
with self.lock:
self._run_command(conn, *args, **kwargs)
def _run_command(self, conn: CChannel, *args: tuple, **kwargs: dict) -> None:
"""Send the request coming from the given proxy channel and
dispatch it to the other linked proxy channels.
:param conn: current proxy channel instance which the command
comes from
:param args: postional arguments
:param args: named arguments
In addition, all commands are dispatch to others auxiliaries
using proxy connector queue out.
"""
self.channel.cc_send(*args, **kwargs)
self._dispatch_command(con_use=conn, **kwargs)
def _dispatch_command(self, con_use: CChannel, **kwargs: dict):
"""Dispatch the current command to others connected auxiliaries.
This action is performed by populating the queue out from each
proxy connectors.
:param con_use: current proxy channel instance which the command
comes from
:param kwargs: named arguments
"""
for conn in self.proxy_channels:
if conn != con_use and conn.queue_out is not None:
conn.queue_out.put(kwargs)
def _receive_message(self, timeout_in_s: float = 0) -> None:
"""Get a message from the associated channel and dispatch it to
the linked proxy channels.
.. note:: this method is called by the rx thread task from the
inherited interface class
:param timeout_in_s: maximum amount of time (seconds) to wait
for a message
"""
try:
recv_response = self.channel.cc_receive(timeout=timeout_in_s)
received_data = recv_response.get("msg")
# if data are received, populate connector's queue_out
if received_data is not None:
self.logger.debug(
"received response : data %s || channel : %s",
received_data.hex(),
self.channel.name,
)
for conn in self.proxy_channels:
if conn.queue_out is not None:
conn.queue_out.put(recv_response)
except Exception:
log.exception(
f"encountered error while receiving message via {self.channel}"
)