Source code for pykiso.lib.connectors.cc_socket_can.cc_socket_can

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Can Communication Channel SocketCAN
***********************************

:module: cc_socket_can

:synopsis: CChannel implementation for CAN(fd) using SocketCAN

.. currentmodule:: cc_socket_can

"""

import logging
import platform
import time
from pathlib import Path
from typing import Dict, Optional, Union

try:
    import can
    import can.bus
except ImportError as e:
    raise ImportError(
        f"{e.name} dependency missing, consider installing pykiso with 'pip install pykiso[can]'"
    )


from pykiso import CChannel, Message
from pykiso.lib.connectors.cc_socket_can.socketcan_to_trc import (
    SocketCan2Trc,
    can,
)

MessageType = Union[Message, bytes]

log = logging.getLogger(__name__)


[docs]def os_name() -> str: """Returns the system/OS name. :return: os name such as 'Linux', 'Darwin', 'Java', 'Windows' """ return platform.system()
[docs]class CCSocketCan(CChannel): """CAN FD channel-adapter.""" def __init__( self, channel: str = "vcan0", remote_id: int = None, is_fd: bool = True, enable_brs: bool = False, can_filters: list = None, is_extended_id: bool = False, receive_own_messages: bool = False, logging_activated: bool = False, log_path: str = None, log_name: str = None, **kwargs, ): """Initialize can channel settings. :param channel: the can interface name. (i.e. vcan0, can1, ..) :param remote_id: id used for transmission :param is_fd: should the Bus be initialized in CAN-FD mode :param enable_brs: sets the bitrate_switch flag to use higher transmission speed :param can_filters: iterable used to filter can id on reception :param is_extended_id: this flag controls the size of the arbitration_id field :param receive_own_messages: if set transmitted messages will be received :param logging_activated: boolean used to enable logfile creation :param log_path: trace directory path (absolute or relative) :param log_name: trace full name (without file extension) """ super().__init__(**kwargs) self.channel = channel self.remote_id = remote_id self.is_fd = is_fd self.enable_brs = enable_brs self.can_filters = can_filters if can_filters else [] self.is_extended_id = is_extended_id self.receive_own_messages = receive_own_messages self.logging_activated = logging_activated self.bus = None self.logger = None self.log_path = log_path self.log_name = log_name # Set a timeout to send the signal to the GIL to change thread. # In case of a multi-threading system, all tasks will be called one after the other. self.timeout = 1e-6 if self.logging_activated: # Just avoid the case the given trace directory is None self.log_path = "" if self.log_name is None else self.log_name # if the given log path is not absolute add root path # (where pykiso is launched) otherwise take it as it is dir_path = ( (Path() / self.log_path).resolve() if not Path(self.log_path).is_absolute() else Path(self.log_path) ) # if no specific logging file name is given take the default one self.log_name = ( time.strftime(f"%Y-%m-%d_%H-%M-%S_{self.log_name}.trc") if self.log_name is not None else time.strftime("%Y-%m-%d_%H-%M-%S_CanLog.trc") ) # if path doesn't exists take root path (where pykiso is launched) self.log_path = ( dir_path / self.log_name if dir_path.exists() else (Path() / self.log_name).resolve() ) if self.enable_brs and not self.is_fd: log.internal_warning( "Bitrate switch will have no effect because option is_fd is set to false." )
[docs] def _cc_open(self) -> None: """Open a can bus channel, set filters for reception and activate""" if not os_name() == "Linux": raise OSError("socketCAN is only available under linux.") self.bus = can.interface.Bus( interface="socketcan", channel=self.channel, can_filters=self.can_filters, fd=self.is_fd, receive_own_messages=self.receive_own_messages, ) if self.logging_activated: log.internal_info(f"Logging path for socketCAN set to {self.log_path} ") self.logger = SocketCan2Trc(self.channel, str(self.log_path)) self.logger.start()
[docs] def _cc_close(self) -> None: """Close the current can bus channel and close the log handler.""" self.bus.shutdown() self.bus = None if self.logging_activated: del self.logger self.logger = None
[docs] def _cc_send( self, msg: MessageType, remote_id: Optional[int] = None, **kwargs ) -> None: """Send a CAN message at the configured id. If remote_id parameter is not given take configured ones :param msg: data to send :param remote_id: destination can id used """ remote_id = remote_id or self.remote_id can_msg = can.Message( arbitration_id=remote_id, data=msg, is_extended_id=self.is_extended_id, is_fd=self.is_fd, bitrate_switch=self.enable_brs, ) self.bus.send(can_msg) log.internal_debug(f"{self} sent CAN Message: {can_msg}, data: {msg}")
[docs] def _cc_receive(self, timeout: float = 0.0001) -> Dict[str, Union[bytes, int]]: """Receive a can message using configured filters. :param timeout: timeout applied on reception :return: tuple containing the received data and the source can id """ try: # Catch bus errors & rcv.data errors when no messages where received received_msg = self.bus.recv(timeout=timeout or self.timeout) if received_msg is not None: frame_id = received_msg.arbitration_id payload = received_msg.data timestamp = received_msg.timestamp log.internal_debug( "received CAN Message: {}, {}, {}".format( frame_id, payload, timestamp ) ) return {"msg": payload, "remote_id": frame_id} else: return {"msg": None} except can.CanError as can_error: log.internal_debug(f"encountered can error: {can_error}") return {"msg": None} except Exception: log.exception(f"encountered error while receiving message via {self}") return {"msg": None}