Source code for pykiso.lib.connectors.cc_virtual_can

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Virtual Can Communication Channel
*********************************

:module: cc_virtual_can

:synopsis: CChannel implementation for victual CAN using UDP multicast API from python-can

.. currentmodule:: cc_virtual_can

"""
import logging
from typing import Dict, Optional, Union

from pykiso import connector

try:
    import can
    from can.interfaces.udp_multicast.bus import UdpMulticastBus
except ImportError as e:
    raise ImportError(f"{e.name} dependency missing, consider installing pykiso with 'pip install pykiso[can]'")

log = logging.getLogger(__name__)


[docs]class CCVirtualCan(connector.CChannel): def __init__( self, channel: UdpMulticastBus = UdpMulticastBus.DEFAULT_GROUP_IPv4, receive_own_messages: bool = False, is_fd: bool = True, enable_brs: bool = False, is_extended_id: bool = False, **kwargs, ): super().__init__(**kwargs) self.channel = channel self.interface = "udp_multicast" self.receive_own_messages = receive_own_messages self.is_fd = is_fd self.enable_brs = enable_brs self.is_extended_id = is_extended_id self._is_open = False self.timeout = 1e-6 self.config = { "interface": self.interface, "channel": self.channel, "preserve_timestamps": False, "receive_own_messages": self.receive_own_messages, "fd": self.is_fd, } @property def is_open(self): return self._is_open
[docs] def _cc_open(self) -> None: """Open a can bus channel.""" if self.is_open: log.info(f"{self} is already open") return self.bus = can.Bus(**self.config) self._is_open = True
[docs] def _cc_close(self) -> None: """Close the current can bus channel.""" self.bus.shutdown() self.bus = None self._is_open = False
[docs] def _cc_send(self, msg: bytes, remote_id: Optional[int] = None, **kwargs) -> None: """Send a CAN message at the configured id. If remote_id parameter is not given take configured ones :param msg: data to send :param remote_id: destination can id used :param kwargs: named arguments """ remote_id = remote_id or self.remote_id can_msg = can.Message( arbitration_id=remote_id, data=msg, is_fd=self.is_fd, dlc=len(msg), is_extended_id=self.is_extended_id, bitrate_switch=self.enable_brs, check=True, ) self.bus.send(can_msg) log.internal_debug("%s sent CAN Message: %s, data: %s", self, can_msg, msg)
[docs] def _cc_receive(self, timeout: float = 0.0001) -> Dict[str, Union[bytes, int, None]]: """Receive a can message using configured filters. :param timeout: timeout applied on reception :return: the received data and the source can id """ try: received_msg = self.bus.recv(timeout=timeout or self.timeout) if received_msg is not None: frame_id = received_msg.arbitration_id payload = received_msg.data timestamp = received_msg.timestamp log.internal_debug( "received CAN Message: %s, %s, %s", frame_id, payload, timestamp, ) return { "msg": payload, "remote_id": frame_id, "timestamp": timestamp, } else: return {"msg": None} except can.CanError as can_error: log.internal_info("encountered CAN error while receiving message: %s", can_error) return {"msg": None} except Exception: log.exception(f"encountered error while receiving message via {self}") return {"msg": None}