##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
Proxy Channel
*************
:module: cc_proxy
:synopsis: CChannel implementation for multi-auxiliary usage.
The CCProxy channel was created to enable the connection of
multiple auxiliaries to one and only one CChannel. This CChannel
has to be used with a so-called proxy auxiliary.
.. currentmodule:: cc_proxy
"""
from __future__ import annotations
import logging
import queue
import threading
from typing import TYPE_CHECKING, Any, Callable
from pykiso.connector import CChannel
if TYPE_CHECKING:
    # Only needed for annotations: typing.TYPE_CHECKING is False at runtime,
    # so these imports are seen by static type checkers but never executed.
    from pykiso.lib.auxiliaries.proxy_auxiliary import ProxyAuxiliary
    from pykiso.types import ProxyReturn

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class CCProxy(CChannel):
    """Proxy CChannel to bind multiple auxiliaries to a single 'physical' CChannel.

    Instance attributes set in ``__init__``:

    * ``queue_out``: ``None`` until :py:meth:`_cc_open` replaces it with a
      ``queue.Queue``; reset to ``None`` by :py:meth:`_cc_close`.
      :py:meth:`_cc_receive` reads from it.
    * ``timeout``: blocking time in seconds used by :py:meth:`_cc_receive`
      when waiting on ``queue_out``.
    * ``_lock``: guards modifications of ``_tx_callback``.
    * ``_tx_callback``: callable invoked by :py:meth:`_cc_send`, or ``None``
      when no callback is attached.
    """

    # keep an uninitialized proxy variable at class-level to fulfill the existence conditions.
    # when a CCProxy instance will then set it, it will only be set at instance level but stay
    # uninitialized at class-level
    _proxy: ProxyAuxiliary | None = None
    _physical_channel: CChannel | None = None

    def __init__(self, **kwargs: Any) -> None:
        """Initialize attributes.

        :param kwargs: named arguments forwarded unchanged to the parent
            class constructor.
        """
        super().__init__(**kwargs)
        self.queue_out = None
        self.timeout = 1
        self._lock = threading.Lock()
        self._tx_callback = None

    @staticmethod
    def _callable_name(func: Callable) -> str:
        """Return a printable name for a callable, for logging purposes.

        :param func: the callable to describe.

        :return: ``func.__name__`` when available, otherwise ``repr(func)``
            (e.g. for ``functools.partial`` objects, which have no
            ``__name__``).
        """
        return getattr(func, "__name__", repr(func))

    def _bind_channel_info(self, proxy_aux: ProxyAuxiliary) -> None:
        """Bind a :py:class:`~pykiso.lib.auxiliaries.proxy_auxiliary.ProxyAuxiliary`
        instance that is instantiated in order to handle the connection of
        multiple auxiliaries to a single communication channel.

        This allows to access the real communication channel's attributes
        and hides the underlying proxy setup.

        :param proxy_aux: the proxy auxiliary instance that is holding the
            real communication channel.
        """
        self._proxy = proxy_aux
        self._physical_channel = proxy_aux.channel

    def __getattr__(self, name: str) -> Any:
        """Implement getattr to retrieve attributes from the real channel attached
        to the underlying :py:class:`~pykiso.lib.auxiliaries.proxy_auxiliary.ProxyAuxiliary`.

        Python only calls ``__getattr__`` once the regular attribute lookup
        has failed, so attributes defined on the proxy channel itself are
        never routed through here.

        :param name: name of the attribute to get.

        :raises AttributeError: if the attribute is not part of the real
            channel instance or if the real channel hasn't been bound to
            this proxy channel yet.

        :return: the found attribute value.
        """
        if self._physical_channel is None:
            # no real channel bound yet: behave like a regular missing attribute
            raise AttributeError(
                f"{self.__class__.__name__} object has no attribute '{name}'"
            )
        # the real channel is read while holding the proxy auxiliary's lock
        with self._proxy.lock:
            return getattr(self._physical_channel, name)

    def detach_tx_callback(self) -> None:
        """Detach the current callback."""
        with self._lock:
            log.internal_warning("reset current attached transmit callback!")
            self._tx_callback = None

    def attach_tx_callback(self, func: Callable) -> None:
        """Attach a callback to the _cc_send method.

        If a callback is already attached it is replaced and a warning is
        logged.

        :param func: function to call when _cc_send is called
        """
        with self._lock:
            new_name = self._callable_name(func)
            log.internal_debug(f"attached function {new_name}")
            if self._tx_callback is not None:
                log.internal_warning(
                    f"function {new_name} will replace current transmit callback "
                    f"{self._callable_name(self._tx_callback)}"
                )
            self._tx_callback = func

    def open(self) -> None:
        """Open the channel through the parent class, then increment the
        bound proxy auxiliary's open connection counter (if one is bound).
        """
        super().open()
        if self._proxy is not None:
            # will start the proxy_auxiliary if this is the first CCProxy to be opened
            self._proxy._open_connections += 1

    def close(self) -> None:
        """Close the channel through the parent class, then decrement the
        bound proxy auxiliary's open connection counter (if one is bound).
        """
        super().close()
        if self._proxy is not None:
            # will stop the proxy_auxiliary if this is the last CCProxy to be closed
            self._proxy._open_connections -= 1

    def _cc_open(self) -> None:
        """Open proxy channel by creating a fresh output queue."""
        log.internal_info("Open proxy channel")
        self.queue_out = queue.Queue()

    def _cc_close(self) -> None:
        """Close proxy channel by dropping the output queue."""
        log.internal_debug("Close proxy channel")
        self.queue_out = None

    def _cc_send(self, *args: Any, **kwargs: Any) -> None:
        """Call the attached ProxyAuxiliary's transmission callback
        with the provided arguments.

        Nothing is sent when no callback is attached.

        :param args: positional arguments to pass to the callback
        :param kwargs: named arguments to pass to the callback
        """
        log.internal_debug(f"put at proxy level: {args} {kwargs}")
        # Read the attribute once: a concurrent detach_tx_callback() could
        # otherwise reset it to None between the check and the call.
        callback = self._tx_callback
        if callback is not None:
            # call the attached ProxyAuxiliary's run_command method
            callback(self, *args, **kwargs)

    def _cc_receive(self, timeout: float = 0.1) -> ProxyReturn:
        """Depopulate the queue out of the proxy connector.

        :param timeout: not used, the blocking time is taken from
            ``self.timeout`` instead.

        :return: the next item taken from ``queue_out``; if nothing is
            available before ``self.timeout`` elapses, ``{"msg": None}``
        """
        try:
            return_response = self.queue_out.get(True, self.timeout)
        except queue.Empty:
            return {"msg": None}
        log.internal_debug(f"received at proxy level : {return_response}")
        return return_response