Source code for pykiso.lib.auxiliaries.can_auxiliary

##########################################################################
# Copyright (c) 2010-2023 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Can Auxiliary
*************

:module: can_auxiliary

:synopsis: Auxiliary enables you to read write can messages which are defined in a dbc file.

The goal of this module is to be able to receive and send signals which are defined in dbc file.

.. currentmodule:: can_auxiliary
"""

import logging
import time
from queue import Empty, Queue
from typing import Any, Optional

from pykiso import Message
from pykiso.can_message import CanMessage
from pykiso.can_parser import CanMessageParser
from pykiso.connector import CChannel
from pykiso.interfaces.dt_auxiliary import (
    DTAuxiliaryInterface,
    close_connector,
    open_connector,
)

log = logging.getLogger(__name__)


[docs]class CanAuxiliary(DTAuxiliaryInterface): """Auxiliary is used for reading and writing can messages defined in dbc file""" def __init__(self, com: CChannel, dbc_file: str, **kwargs): """Constructor. :param com: CChannel that supports raw communication over CAN """ self.tx_task_on = False super().__init__( is_proxy_capable=True, tx_task_on=True, rx_task_on=True, **kwargs ) self.recv_can_messages = {} self.channel = com path_to_dbc_file = dbc_file self.parser = CanMessageParser(path_to_dbc_file) self.can_messages = {} @open_connector def _create_auxiliary_instance(self) -> bool: """Open the connector communication. :return: True if the channel is correctly opened otherwise False """ log.internal_info("Auxiliary instance created") return True @close_connector def _delete_auxiliary_instance(self) -> bool: """Close the connector communication. :return: always True """ log.internal_info("Auxiliary instance deleted") return True def _receive_message(self, timeout_in_s: float = 0.1) -> None: """Trigger connector reception method and put received messages in the queue. :param timeout_in_s: maximum time in second to wait for a response """ try: rcv_data = self.channel.cc_receive(timeout=timeout_in_s) if rcv_data.get("msg") is not None: log.internal_debug(f"received message '{rcv_data}' from {self.channel}") message_name = self.parser.dbc.get_message_by_frame_id( rcv_data["remote_id"] ).name message_signals = self.parser.decode( rcv_data["msg"], rcv_data["remote_id"] ) message_timestamp = float(rcv_data.get("timestamp", 0)) old_can_message = self.can_messages.get(message_name, None) if old_can_message is None: self.can_messages[message_name] = Queue(maxsize=1) if not self.can_messages[message_name].empty(): self.can_messages[message_name].get_nowait() can_msg = CanMessage(message_name, message_signals, message_timestamp) self.can_messages[message_name].put(can_msg) except KeyError: log.exception("Specific message signal is not found in message") pass except (Exception): log.exception( f"encountered error while receiving message via {self.channel}" ) pass
[docs] def get_last_message(self, message_name: str) -> Optional[CanMessage]: """Get the last message which has been received on the bus. :param message_name: name of the message, you want to get :return: last can massage or return none if the message, or return none if message is not occur """ can_msg_queue = self.can_messages.get(message_name, None) if can_msg_queue is not None and not can_msg_queue.empty(): msg = can_msg_queue.get_nowait() self.can_messages[message_name].put_nowait(msg) return msg return None
[docs] def get_last_signal(self, message_name: str, signal_name: str) -> Optional[Any]: """Get the last message which has been received on the bus. :param message_name: name of the message, you want to get :return: last can massage or return none if the message or return none if message or signal is not occur """ can_msg_queue = self.can_messages.get(message_name, None) if can_msg_queue is not None and not can_msg_queue.empty(): last_can_message = can_msg_queue.get_nowait() if last_can_message is not None: self.can_messages[message_name].put_nowait(last_can_message) return last_can_message.signals.get(signal_name, None) return None
[docs] def wait_for_message( self, message_name: str, timeout: float = 0.2 ) -> dict[str, any]: """Get the last message with certain timeout in seconds. :param message_name: name of the message to receive :param timeout time to wait till a message receives in seconds :return: list of last can messages or None if no messages for this component """ can_msg_queue = self.can_messages.get(message_name, None) old_value = None if can_msg_queue is None: self.can_messages[message_name] = Queue(maxsize=1) if not self.can_messages[message_name].empty(): old_value = self.can_messages[message_name].get() try: new_msg = self.can_messages[message_name].get(timeout=timeout) self.can_messages[message_name].put_nowait(new_msg) return new_msg except Empty: if old_value is not None: self.can_messages[message_name].put_nowait(old_value) return None
[docs] def wait_to_match_message_with_signals( self, message_name: str, expected_signals: dict[str, any], timeout: float = 0.2 ) -> dict[str, any]: """Get first message which matches the patter of signals . :param message_name: name of the message to receive :param expected_signal: list of expected signals to match with message :param timeout time to wait till a message receives in seconds :return: list of last can messages or None if no messages for this component """ t1 = time.perf_counter() message_to_return = None is_msg_matched = False while time.perf_counter() - t1 < timeout and not is_msg_matched: expected_signals_names = expected_signals.keys() last_msg_queue = self.can_messages.get(message_name, None) if last_msg_queue is None: continue try: last_can_msg = last_msg_queue.get_nowait() except Empty: continue is_msg_matched = True for signal_name in expected_signals_names: if last_can_msg.signals[signal_name] != expected_signals[signal_name]: is_msg_matched = False break if is_msg_matched: message_to_return = last_can_msg self.can_messages[message_name].put_nowait(last_can_msg) break return message_to_return
[docs] def send_message(self, message: str, signals: dict[str, Any]) -> bool: """Send one message, the message need to be defined in the dbc file. :param message: name of the message to send. :param signals: dict of the signals of the message and their value. :return: True or False if the message has been successfully send or not. """ for signal, value in signals.items(): if isinstance(value, str): signals[signal] = int.from_bytes(value.encode("utf8"), byteorder="big") try: message = self.parser.dbc.get_message_by_name(message) except KeyError: raise ValueError(f"{message} is not a message defined in the DBC file.") msg_to_send = self.parser.encode(message, signals) self.channel.cc_send(msg_to_send[0], remote_id=msg_to_send[1])
def _run_command(self, cmd_message: str, cmd_data: bytes = None) -> bool: """Run the corresponding command. :param cmd_message: command type :param cmd_data: payload data to send over CChannel :return: True if command is executed otherwise False """ if cmd_message == "send": try: self.channel.cc_send(msg=cmd_data) except Exception: log.exception( f"encountered error while sending message '{cmd_data}' to {self.channel}" ) elif isinstance(cmd_message, Message): log.internal_debug(f"ignored command '{cmd_message} in {self}'") else: log.internal_warning(f"received unknown command '{cmd_message} in {self}'")