Source code for pykiso.lib.auxiliaries.communication_auxiliary

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
CommunicationAuxiliary
**********************

:module: communication_auxiliary

:synopsis: Auxiliary used to send raw bytes via a connector instead of
    pykiso.Messages

.. currentmodule:: communication_auxiliary


"""

import logging

from pykiso import AuxiliaryInterface, CChannel, Message

log = logging.getLogger(__name__)


[docs]class CommunicationAuxiliary(AuxiliaryInterface): """Auxiliary used to send raw bytes via a connector instead of pykiso.Messages.""" def __init__(self, com: CChannel, **kwargs): """Constructor. :param com: CChannel that supports raw communication """ super().__init__(is_proxy_capable=True, **kwargs) self.channel = com
[docs] def send_message(self, raw_msg: bytes) -> bool: """Send a raw message (bytes) via the communication channel. :param raw_msg: message to send :return: True if command was executed otherwise False """ return self.run_command("send", raw_msg, timeout_in_s=None)
[docs] def receive_message( self, blocking: bool = True, timeout_in_s: float = None ) -> bytes: """Receive a raw message. :param blocking: wait for message till timeout elapses? :param timeout_in_s: maximum time in second to wait for a response :returns: raw message """ log.debug( f"retrieving message in {self} (blocking={blocking}, timeout={timeout_in_s})" ) msg = self.wait_and_get_report(blocking=blocking, timeout_in_s=timeout_in_s) log.debug(f"retrieved message '{msg}' in {self}") return msg
def _create_auxiliary_instance(self) -> bool: """Open the connector communication. :return: always True """ state = False log.info("Create auxiliary instance") log.info("Enable channel") try: self.channel.open() state = True except Exception: log.exception("Unable to open channel communication") self.stop() return state def _delete_auxiliary_instance(self) -> bool: """Close the connector communication. :return: always True """ log.info("Delete auxiliary instance") try: self.channel.close() except Exception: log.exception("Unable to close channel communication") return True def _run_command(self, cmd_message: bytes, cmd_data: bytes = None) -> bool: """Run the corresponding command. :param cmd_message: command type :param cmd_data: payload data to send over CChannel :return: True if command is executed otherwise False """ if cmd_message == "send": try: self.channel.cc_send(msg=cmd_data, raw=True) return True except Exception: log.exception( f"encountered error while sending message '{cmd_data}' to {self.channel}" ) elif isinstance(cmd_message, Message): log.debug(f"ignored command '{cmd_message} in {self}'") return True else: log.warning(f"received unknown command '{cmd_message} in {self}'") return False def _abort_command(self) -> None: """No-op since we don't wait for ACKs""" pass def _receive_message(self, timeout_in_s: float) -> bytes: """No-op since it's handled in _run_command :param timeout_in_s: not used :return: received message """ try: rcv_data = self.channel.cc_receive(timeout=0, raw=True) log.debug(f"received message '{rcv_data}' from {self.channel}") return rcv_data except Exception: log.exception( f"encountered error while receiving message via {self.channel}" ) return None