##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
Multiprocessing Proxy Auxiliary
*******************************
:module: mp_proxy_auxiliary
:synopsis: concrete implementation of a multiprocessing proxy auxiliary
This auxiliary simply spread all commands and received messages to all
connected auxiliaries. This auxiliary is only usable through mp proxy
connector.
.. code-block:: none
___________ ___________ ___________
| | | | ..... | |
| Aux 1 | | Aux 1 | | Aux n |
|___________| |___________| |___________|
| | |
| | |
___________ ___________ ___________
| | | | ..... | |
|Proxy_con 1| |Proxy_con 2| |Proxy_con n|
|___________| |___________| |___________|
| | |
| | |
| | |
_____________________________________________
| |
| Proxy Auxiliary |
|_____________________________________________|
|
|
_____________________________________________
| |
| Connector Channel |
|_____________________________________________|
.. currentmodule:: mp_proxy_auxiliary
"""
import collections
import logging
import multiprocessing
import sys
import time
from pathlib import Path
from typing import List, Optional, Tuple
from pykiso import AuxiliaryInterface, CChannel, MpAuxiliaryInterface
from pykiso.test_setup.config_registry import ConfigRegistry
from pykiso.test_setup.dynamic_loader import PACKAGE
# Immutable bundle of the trace settings: activation flag, target
# directory and base file name (in that order).
TraceOptions = collections.namedtuple(
    "TraceOptions",
    ["activate", "dir", "name"],
)

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class MpProxyAuxiliary(MpAuxiliaryInterface):
    """Proxy auxiliary for multi auxiliaries communication handling.

    Commands queued by the connected proxy connectors are sent over the
    single associated channel, and every message received on that channel
    is pushed to all proxy connectors.

    .. note:: this auxiliary version is using the multiprocessing
        auxiliary interface.
    """

    def __init__(
        self,
        com: CChannel,
        aux_list: List[str],
        activate_trace: bool = False,
        trace_dir: Optional[str] = None,
        trace_name: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Initialize attributes.

        :param com: Communication connector
        :param aux_list: list of auxiliary's alias
        :param activate_trace: True if the trace is activate otherwise
            False
        :param trace_dir: trace directory path (absolute or relative)
        :param trace_name: trace full name (without file extension)
        :param kwargs: passed as-is to the parent class constructor
        """
        self.channel = com
        self.trace_options = TraceOptions(
            activate=activate_trace, dir=trace_dir, name=trace_name
        )
        # resolved before the parent constructor runs
        self.proxy_channels = self.get_proxy_con(aux_list)
        # NOTE(review): presumably populated by initialize_loggers() at
        # process start-up (see run) -- confirm against the parent class
        self.logger = None
        self.aux_list = aux_list
        super().__init__(**kwargs)

    def _init_trace(
        self,
        logger,
        activate: bool,
        t_dir: Optional[str] = None,
        t_name: Optional[str] = None,
    ) -> None:
        """Initialize the logging trace for proxy auxiliary received
        message recording and process debugging.

        :param logger: base logger where a FileHandler is added
        :param activate: True if the trace is activate otherwise False
        :param t_dir: trace directory path (absolute or relative)
        :param t_name: trace full name (without file extension)
        """
        if not activate:
            return

        # Just avoid the case the given trace directory is None
        t_dir = "" if t_dir is None else t_dir

        # if the given log path is not absolute add root path
        # (where pykiso is launched) otherwise take it as it is
        dir_path = Path(t_dir)
        if not dir_path.is_absolute():
            dir_path = (Path() / t_dir).resolve()

        # if no specific logging file name is given take the default one.
        # The timestamp is formatted on its own so that a '%' contained in
        # the user-given name is never interpreted as a strftime directive.
        base_name = "proxy_logging" if t_name is None else t_name
        file_name = time.strftime("%Y-%m-%d_%H-%M-%S") + f"_{base_name}.log"

        # if path doesn't exists take root path (where pykiso is launched)
        if dir_path.exists():
            log_path = dir_path / file_name
        else:
            log_path = (Path() / file_name).resolve()

        # configure the file handler and create the trace file
        log_format = logging.Formatter("%(asctime)s : %(message)s")
        logger.info(f"create proxy trace file at {log_path}")
        handler = logging.FileHandler(log_path, "w+")
        handler.setFormatter(log_format)
        handler.setLevel(logging.DEBUG)
        logger.addHandler(handler)

    def get_proxy_con(self, aux_list: List[str]) -> Tuple[CChannel, ...]:
        """Retrieve all connector associated to all given existing Auxiliaries.

        If auxiliary alias exists but auxiliary instance was not created
        yet, create it immediately using ConfigRegistry _aux_cache.

        Unknown aliases are logged as errors and skipped.

        :param aux_list: list of auxiliary's alias

        :return: tuple containing all connectors associated to
            all given auxiliaries

        :raises NotImplementedError: if one of the auxiliaries is not
            proxy capable (see _check_compatibility)
        """
        channel_inst = []

        for aux_name in aux_list:
            aux_inst = sys.modules.get(f"{PACKAGE}.auxiliaries.{aux_name}")

            if aux_inst is None:
                # check if the given aux_name is in the available aux
                # alias list
                if aux_name not in ConfigRegistry.get_auxes_alias():
                    # the given auxiliary alias doesn't exist or refer to
                    # a invalid one
                    log.error(f"Auxiliary : {aux_name} doesn't exist")
                    continue

                log.warning(
                    f"Auxiliary : {aux_name} is not using import magic mechanism (pre-loaded)"
                )
                # load it using ConfigRegistry _aux_cache
                aux_inst = ConfigRegistry._linker._aux_cache.get_instance(aux_name)

            self._check_compatibility(aux_inst)
            channel_inst.append(aux_inst.channel)

        return tuple(channel_inst)

    @staticmethod
    def _check_compatibility(aux: AuxiliaryInterface) -> None:
        """Check if the given auxiliary is proxy compatible.

        :param aux: auxiliary instance to check

        :raises NotImplementedError: if is_proxy_capable flag is False
        """
        if not aux.is_proxy_capable:
            raise NotImplementedError(
                f"Auxiliary {aux} is not compatible with a proxy auxiliary"
            )

    def _create_auxiliary_instance(self) -> bool:
        """Open current associated channel.

        On failure the error is logged and stop() is called.

        :return: if channel creation is successful return True otherwise false
        """
        try:
            self.logger.info("Create auxiliary instance")
            self.logger.info("Enable channel")
            self.channel.open()
        except Exception:
            self.logger.exception("Error encouting during channel creation")
            self.stop()
            return False
        return True

    def _delete_auxiliary_instance(self) -> bool:
        """Close current associated channel and flush all queues.

        Any error raised while closing or flushing is logged, never
        propagated.

        :return: always True
        """
        try:
            self.logger.info("Delete auxiliary instance")
            self.channel.close()
            # in very rare case if a queue is populated, ITF could be
            # blocked in a infinite loop. This is a workaround, and not
            # the final implementation.
            self._empty_queue(self.queue_in)
            for conn in self.proxy_channels:
                self._empty_queue(conn.queue_in)
                self._empty_queue(conn.queue_out)
        except Exception:
            self.logger.exception("Error encouting during channel closure")
        return True

    @staticmethod
    def _empty_queue(queue: multiprocessing.Queue) -> None:
        """Empty a given queue by simply calling get until queue is
        empty.

        :param queue: queue to clear
        """
        # local import of the exception only: the ``queue`` parameter
        # shadows the standard ``queue`` module inside this function
        from queue import Empty

        while not queue.empty():
            try:
                queue.get_nowait()
            except Empty:
                # empty() and get_nowait() are not atomic together: the
                # queue may report an item that cannot be fetched. Stop
                # draining instead of letting the exception escape.
                break

    def _run_command(self) -> None:
        """Run all commands present in each proxy connectors queue in
        by sending it over current associated CChannel.

        In addition, all commands are dispatch to others auxiliaries
        using proxy connector queue out.
        """
        for conn in self.proxy_channels:
            # at most one pending command is handled per connector and
            # per call
            if not conn.queue_in.empty():
                args, kwargs = conn.queue_in.get()
                message = kwargs.get("msg")
                # only commands carrying a "msg" keyword are mirrored
                if message is not None:
                    self._dispatch_command(
                        message=message,
                        remote_id=kwargs.get("remote_id"),
                        con_use=conn,
                    )
                self.channel._cc_send(*args, **kwargs)

    def _dispatch_command(
        self, message: bytes, con_use: CChannel, remote_id: int
    ) -> None:
        """Dispatch the current command to others connected auxiliaries.

        This action is performed by populating the queue out from each
        proxy connectors.

        :param message: message to send
        :param con_use: current proxy connector where the command come from
        :param remote_id: if CAN is used, CAN frame ID on which
            the message will be sent
        """
        for conn in self.proxy_channels:
            # the emitting connector does not get its own command back
            if conn != con_use:
                conn.queue_out.put([message, remote_id])

    def _abort_command(self) -> bool:
        """Not Used.

        :return: always True
        """
        return True

    def _receive_message(self, timeout_in_s: float = 0) -> None:
        """When no request are sent this method is called by
        AuxiliaryInterface run method. At each message received, this
        method will populate each proxy connectors queue out.

        Any error raised while receiving or forwarding is logged, never
        propagated.

        :param timeout_in_s: maximum amount of time in second to wait
            for a message.
        """
        try:
            received_data, source = self.channel.cc_receive(
                timeout=timeout_in_s, raw=True
            )
            # if data are received, populate connected proxy connectors
            # queue out
            if received_data is not None:
                self.logger.debug(
                    f"raw data : {received_data.hex()} || source : {source} || channel : {self.channel.name}"
                )
                for conn in self.proxy_channels:
                    conn.queue_out.put([received_data, source])
        except Exception:
            self.logger.exception(
                f"encountered error while receiving message via {self.channel}"
            )

    def run(self) -> None:
        """Run function of the auxiliary process.

        Loops until the stop event is set: handles the instance
        creation/deletion requests found in queue_in, then, while the
        instance exists, sends the pending commands and polls the channel.
        """
        # initialize logging mechanism at process startup
        self.initialize_loggers()
        # initialize trace for message recording
        self._init_trace(self.logger, *self.trace_options)

        while not self.stop_event.is_set():
            # Step 1: Check if a request is available & process it
            request = ""
            # Check if a request was received
            if not self.queue_in.empty():
                request = self.queue_in.get_nowait()
            # Process the request
            if request == "create_auxiliary_instance" and not self.is_instance:
                # Call the internal instance creation method
                self.is_instance = self._create_auxiliary_instance()
                # Enqueue the result for the request caller
                self.queue_out.put(self.is_instance)
            elif request == "delete_auxiliary_instance" and self.is_instance:
                # Call the internal instance delete method
                self.is_instance = not (self._delete_auxiliary_instance())
                # Enqueue the result for the request caller
                self.queue_out.put(self.is_instance)
            elif request != "":
                # A request was received but could not be processed
                self.logger.warning(
                    f"Unknown request '{request}', will not be processed!"
                )
                self.logger.warning(f"Aux status: {self.__dict__}")

            # Step 2: Send stack command and propagate them to others
            # connected auxiliaires and check if something was received
            # if instance was created
            if self.is_instance:
                self._run_command()
                self._receive_message(timeout_in_s=0)
            # Free up cpu usage when auxiliary is suspended
            if not self.is_instance:
                time.sleep(0.050)

        # Stop command was set
        self.logger.info("{} was stopped".format(self))
        # Delete auxiliary external instance if not done
        if self.is_instance:
            self._delete_auxiliary_instance()