Source code for pykiso.lib.connectors.cc_socket_can.cc_socket_can

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Can Communication Channel SocketCAN
***********************************

:module: cc_socket_can

:synopsis: CChannel implementation for CAN(fd) using SocketCAN

.. currentmodule:: cc_socket_can

"""

import logging
import platform
import time
from pathlib import Path
from typing import Union

import can
import can.bus

from pykiso import CChannel, Message
from pykiso.lib.connectors.cc_socket_can.socketcan_to_trc import (
    SocketCan2Trc,
    can,
)

MessageType = Union[Message, bytes]

log = logging.getLogger(__name__)


def os_name() -> str:
    """Return the name of the operating system the interpreter runs on.

    Thin wrapper around :func:`platform.system`.

    :return: os name such as 'Linux', 'Darwin', 'Java', 'Windows'
    """
    system_name = platform.system()
    return system_name
class CCSocketCan(CChannel):
    """CAN FD channel-adapter."""

    def __init__(
        self,
        channel: str = "vcan0",
        remote_id: int = None,
        is_fd: bool = True,
        enable_brs: bool = False,
        can_filters: list = None,
        is_extended_id: bool = False,
        receive_own_messages: bool = False,
        logging_activated: bool = False,
        log_path: str = None,
        log_name: str = None,
        **kwargs,
    ):
        """Initialize can channel settings.

        :param channel: the can interface name. (i.e. vcan0, can1, ..)
        :param remote_id: id used for transmission
        :param is_fd: should the Bus be initialized in CAN-FD mode
        :param enable_brs: sets the bitrate_switch flag to use higher
            transmission speed
        :param can_filters: iterable used to filter can id on reception
        :param is_extended_id: this flag controls the size of the
            arbitration_id field
        :param receive_own_messages: if set transmitted messages will be
            received
        :param logging_activated: boolean used to enable logfile creation
        :param log_path: trace directory path (absolute or relative)
        :param log_name: trace full name (without file extension)
        """
        super().__init__(**kwargs)
        self.channel = channel
        self.remote_id = remote_id
        self.is_fd = is_fd
        self.enable_brs = enable_brs
        self.can_filters = can_filters if can_filters else []
        self.is_extended_id = is_extended_id
        self.receive_own_messages = receive_own_messages
        self.logging_activated = logging_activated
        self.bus = None
        self.logger = None
        self.log_path = log_path
        self.log_name = log_name
        # Set a timeout to send the signal to the GIL to change thread.
        # In case of a multi-threading system, all tasks will be called
        # one after the other.
        self.timeout = 1e-6

        if self.logging_activated:
            # Avoid the case where the given trace directory is None: an
            # empty path resolves to the current working directory below.
            trace_dir = "" if self.log_path is None else self.log_path

            # If the given log path is not absolute, anchor it on the root
            # path (where pykiso is launched), otherwise take it as it is.
            dir_path = Path(trace_dir)
            if not dir_path.is_absolute():
                dir_path = (Path() / trace_dir).resolve()

            # If no specific logging file name is given take the default
            # one. The timestamp is formatted on its own so that a '%' in
            # the user-supplied name is never read as a strftime directive.
            timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")
            base_name = "CanLog" if self.log_name is None else self.log_name
            self.log_name = f"{timestamp}_{base_name}.trc"

            # If the directory doesn't exist fall back to the root path
            # (where pykiso is launched).
            self.log_path = (
                dir_path / self.log_name
                if dir_path.exists()
                else (Path() / self.log_name).resolve()
            )

        if self.enable_brs and not self.is_fd:
            log.warning(
                "Bitrate switch will have no effect because option is_fd is set to false."
            )

    def _cc_open(self) -> None:
        """Open a can bus channel, set filters for reception and, when
        logging is activated, start the trace logger.

        :raises OSError: if the current operating system is not Linux
        """
        if os_name() != "Linux":
            raise OSError("socketCAN is only available under linux.")

        self.bus = can.interface.Bus(
            interface="socketcan",
            channel=self.channel,
            can_filters=self.can_filters,
            fd=self.is_fd,
            receive_own_messages=self.receive_own_messages,
        )

        if self.logging_activated:
            log.info(f"Logging path for socketCAN set to {self.log_path} ")
            self.logger = SocketCan2Trc(self.channel, str(self.log_path))
            self.logger.start()

    def _cc_close(self) -> None:
        """Close the current can bus channel and release the log handler.

        Safe to call when the bus was never opened (or opening failed):
        in that case there is nothing to shut down.
        """
        if self.bus is not None:
            self.bus.shutdown()
            self.bus = None

        if self.logging_activated:
            # Drop this channel's reference to the trace logger.
            # NOTE(review): presumably SocketCan2Trc stops itself when it is
            # finalized -- confirm against socketcan_to_trc.
            del self.logger
            self.logger = None

    def _cc_send(
        self, msg: MessageType, remote_id: int = None, raw: bool = False
    ) -> None:
        """Send a CAN message at the configured id.

        If remote_id parameter is not given take the configured one. In
        addition, if raw is set to True the msg parameter is sent as it
        is, otherwise it is serialized first (test entity protocol
        format).

        :param msg: data to send
        :param remote_id: destination can id used
        :param raw: boolean used to select test entity protocol format
        """
        if remote_id is None:
            remote_id = self.remote_id

        _data = msg if raw else msg.serialize()

        can_msg = can.Message(
            arbitration_id=remote_id,
            data=_data,
            is_extended_id=self.is_extended_id,
            is_fd=self.is_fd,
            bitrate_switch=self.enable_brs,
        )
        self.bus.send(can_msg)

        log.debug(f"{self} sent CAN Message: {can_msg}, data: {_data}")

    def _cc_receive(
        self, timeout: float = 0.0001, raw: bool = False
    ) -> Union[Message, bytes, None]:
        """Receive a can message using configured filters.

        If raw parameter is set to True the received payload is returned
        as it is, otherwise it is parsed with ``Message.parse_packet``
        (test entity protocol format).

        :param timeout: timeout applied on reception; a falsy value
            (``0`` or ``None``) falls back to ``self.timeout``
        :param raw: boolean used to select test entity protocol format

        :return: tuple ``(payload, frame_id)`` containing the received
            data and the source can id, or ``(None, None)`` if nothing
            was received or an error occurred. Note that the return
            annotation only describes the payload element.
        """
        try:
            # Catch bus errors & rcv.data errors when no messages were
            # received.
            received_msg = self.bus.recv(timeout=timeout or self.timeout)
            if received_msg is None:
                return None, None

            frame_id = received_msg.arbitration_id
            payload = received_msg.data
            timestamp = received_msg.timestamp
            if not raw:
                payload = Message.parse_packet(payload)

            log.debug(
                "received CAN Message: {}, {}, {}".format(
                    frame_id, payload, timestamp
                )
            )
            return payload, frame_id
        except can.CanError as can_error:
            log.debug(f"encountered can error: {can_error}")
            return None, None
        except Exception:
            # Reception is best effort: log the failure with its traceback
            # and report "nothing received" to the caller.
            log.exception(f"encountered error while receiving message via {self}")
            return None, None