# Source code for pykiso.lib.auxiliaries.udsaux.uds_auxiliary

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
uds_auxiliary
*************

:module: uds_auxiliary

:synopsis: Auxiliary used to handle Unified Diagnostic Service protocol

.. currentmodule:: uds_auxiliary

"""
import configparser
import logging
import threading
from pathlib import Path
from typing import List, Optional, Union

import can

from pykiso.connector import CChannel
from pykiso.interfaces.thread_auxiliary import AuxiliaryInterface

from . import uds_exceptions
from .uds_request import UDSCommands
from .uds_response import UdsResponse
from .uds_utils import get_uds_service

# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)

# Placeholders for the python-uds names. UdsAuxiliary.__new__ rebinds them
# through a ``global`` import the first time an instance is created, so the
# module itself can be imported without python-uds being available.
Uds = createUdsConnection = IsoServices = None


class UdsAuxiliary(AuxiliaryInterface):
    """Auxiliary used to handle UDS messages.

    Requests are issued through python-uds, whose transmit method is
    replaced by :meth:`transmit` so that frames go out via the ITF connector.
    """

    # UDS positive response SID = request SID + 0x40
    POSITIVE_RESPONSE_OFFSET = 0x40
    # alias to the uds_exceptions module, used below as self.errors.<Name>
    errors = uds_exceptions

    def __new__(cls, *args, **kwargs):
        """Handle class creation.

        Import python-uds at instantiation to avoid warnings due to missing
        drivers, and publish the imported names at module level.

        :raises ImportError: if python-uds is not installed on the current
            computer.
        """
        global Uds, createUdsConnection, IsoServices
        try:
            # thanks to the global declaration this import rebinds the
            # module-level names directly, no further assignment is needed
            from uds import IsoServices, Uds, createUdsConnection
        except ImportError as err:
            raise ImportError(
                "python-uds package missing, could not import it!"
            ) from err
        return super().__new__(cls)

    def __init__(
        self,
        com: CChannel,
        config_ini_path: str,
        odx_file_path: Optional[str] = None,
        **kwargs,
    ):
        """Initialize attributes.

        :param com: communication channel connector.
        :param config_ini_path: uds parameters file.
        :param odx_file_path: ecu diagnostic definition file.
        """
        self.channel = com
        # only a non-empty path is converted, a falsy value is stored as given
        self.odx_file_path = Path(odx_file_path) if odx_file_path else odx_file_path
        self.config_ini_path = Path(config_ini_path)

        config = configparser.ConfigParser()
        config.read(self.config_ini_path)
        # request/response identifiers are parsed as base-16 strings
        self.req_id = int(config.get("can", "defaultReqId"), 16)
        self.res_id = int(config.get("can", "defaultResId"), 16)

        self.uds_config_enable = False
        self.uds_config = None
        super().__init__(is_proxy_capable=True, **kwargs)

    def _create_auxiliary_instance(self) -> bool:
        """Open current associated channel and create the python-uds instance.

        The python-uds interface name and bus are chosen from the lowercased
        class name of the channel. Any failure (including an unsupported
        channel type) is logged, the auxiliary is stopped and False is
        returned.

        :return: True if channel creation is successful otherwise False
        """
        try:
            log.info("Create auxiliary instance")
            log.info("Enable channel")
            self.channel.open()

            channel_name = self.channel.__class__.__name__.lower()
            if "vectorcan" in channel_name:
                interface = "vector"
                bus = self.channel.bus
            elif "pcan" in channel_name:
                interface = "peak"
                bus = self.channel.bus
            elif "socketcan" in channel_name:
                interface = "socketcan"
                bus = self.channel.bus
            elif "ccproxy" in channel_name:
                # Just fake python-uds (when proxy auxiliary is used),
                # by setting bus to True no channel creation is
                # performed
                bus = True
                interface = "peak"
            else:
                # fail with an explicit message instead of hitting unbound
                # bus/interface variables further down
                raise ValueError(
                    "Unsupported channel for UDS auxiliary: "
                    f"{self.channel.__class__.__name__}"
                )

            # keyword arguments shared by both python-uds entry points
            uds_kwargs = {
                "configPath": self.config_ini_path,
                "bus": bus,
                "interface": interface,
                "reqId": self.req_id,
                "resId": self.res_id,
            }

            if self.odx_file_path:
                log.info("Create Uds Config connection with ODX")
                self.uds_config_enable = True
                self.uds_config = createUdsConnection(
                    self.odx_file_path, "", **uds_kwargs
                )
            else:
                log.info("Create Uds Config connection without ODX")
                self.uds_config_enable = False
                self.uds_config = Uds(**uds_kwargs)

            # replace transmit method from python-uds with a method
            # using ITF's connector
            self.uds_config.tp.overwrite_transmit_method(self.transmit)
            return True
        except Exception:
            log.exception("Error during channel creation")
            self.stop()
            return False

    def transmit(self, data: bytes, req_id: int, extended: bool = False) -> None:
        """Transmit a message through ITF connector.

        This method is a substitute to transmit method present in
        python-uds package.

        :param data: data to send
        :param req_id: CAN message identifier
        :param extended: True if addressing mode is extended otherwise
            False (accepted for signature compatibility, not forwarded to
            the connector)
        """
        self.channel._cc_send(msg=data, remote_id=req_id, raw=True)

    def _delete_auxiliary_instance(self) -> bool:
        """Disconnect python-uds and close current associated channel.

        :return: always True
        """
        log.info("Delete auxiliary instance")
        try:
            # uds_config is still None when instance creation failed early
            if self.uds_config is not None:
                self.uds_config.disconnect()
        finally:
            # release the channel even if python-uds fails to disconnect
            self.channel.close()
        return True

    def send_uds_raw(
        self,
        msg_to_send: Union[bytes, List[int], tuple],
        timeout_in_s: float = 6,
    ) -> Union[UdsResponse, bool]:
        """Send a UDS diagnostic request to the target ECU and check response.

        :param msg_to_send: can uds raw bytes to be sent
        :param timeout_in_s: not used

        :raise ResponseNotReceivedError: raised when no answer has been
            received (python-uds returned None)

        :return: the received response wrapped in an UdsResponse, or False
            if an exception occurred while sending (the exception is logged,
            not propagated)
        """
        try:
            log.info(
                f"UDS request to send '{['0x{:02X}'.format(i) for i in msg_to_send]}'"
            )
            resp = self.uds_config.send(msg_to_send)
        except Exception:
            log.exception("Error while sending uds raw request")
            return False

        if resp is None:
            raise self.errors.ResponseNotReceivedError(msg_to_send)

        # a boolean answer is logged as-is, anything else as a list of hex bytes
        resp_print = (
            f"UDS response received {['0x{:02X}'.format(i) for i in resp]}"
            if not isinstance(resp, bool)
            else resp
        )
        log.info(resp_print)
        # NOTE(review): a boolean response is also passed to UdsResponse here;
        # confirm that UdsResponse accepts it
        return UdsResponse(resp)

    def check_raw_response_positive(self, resp: UdsResponse) -> Optional[bool]:
        """Check if the response is positive, raise an error if not.

        :param resp: raw response of uds request

        :raise UnexpectedResponseError: raised when the answer is not the
            expected one

        :return: True if response is positive
        """
        if resp.is_negative:
            log.info(f"Negative response with NRC: {resp.nrc.name}")
            raise self.errors.UnexpectedResponseError(resp)
        return True

    def check_raw_response_negative(self, resp: UdsResponse) -> Optional[bool]:
        """Check if the response is negative, raise an error if not.

        :param resp: raw response of uds request

        :raise UnexpectedResponseError: raised when the answer is not the
            expected one

        :return: True if response is negative
        """
        if not resp.is_negative:
            raise self.errors.UnexpectedResponseError(resp)
        log.info(f"Negative response with :{resp.nrc.name}")
        return True

    def send_uds_config(
        self, msg_to_send: dict, timeout_in_s: float = 6
    ) -> Union[dict, bool]:
        """Send UDS config to the target ECU.

        :param msg_to_send: uds config to be sent, a dict with the keys
            "service" and "data"
        :param timeout_in_s: not used

        :return: the value returned by the python-uds service call, or True
            if that call returned None, otherwise False (no ODX configured,
            unknown service or failure while sending)
        """
        if not self.uds_config_enable:
            log.error("Uds configured without ODX, sending Uds config impossible!")
            return False

        try:
            uds_service_name = get_uds_service(msg_to_send["service"])
            try:
                uds_service = getattr(self.uds_config, uds_service_name)
            except AttributeError:
                # only the lookup itself means "unknown service"; an
                # AttributeError raised while sending is reported below
                log.exception(
                    f"Could not send UDS config request: unknown service {uds_service_name}"
                )
                return False

            req_resp_data = uds_service(**msg_to_send["data"])
            if req_resp_data is None:
                req_resp_data = True
            log.info(f"UDS response received {req_resp_data}")
            return req_resp_data
        except Exception:
            # Data send failed
            log.exception("Could not send UDS config request: sending failed")
            return False

    def hard_reset(self) -> Union[dict, UdsResponse]:
        """Allow power reset of the component.

        Always sent as a raw request
        (``UDSCommands.ECUReset.ECU_RESET_SW_INSTALLATION_STATE``).

        :return: response of the hard reset request
        """
        return self.send_uds_raw(UDSCommands.ECUReset.ECU_RESET_SW_INSTALLATION_STATE)

    def force_ecu_reset(self) -> Union[dict, UdsResponse]:
        """Allow power reset of the component.

        Sent as a raw request without ODX, otherwise through the UDS config.

        :return: response of the force ecu reset request
        """
        if not self.uds_config_enable:
            return self.send_uds_raw(UDSCommands.ECUReset.FORCE_ECU_RESET)

        ecu_hard_reset_req = {
            "service": IsoServices.EcuReset,
            "data": {"parameter": "ForceEcuReset"},
        }
        return self.send_uds_config(ecu_hard_reset_req)

    def soft_reset(self) -> Union[dict, UdsResponse]:
        """Perform soft reset of the component, equivalent to a restart of
        application.

        Sent as a raw request without ODX, otherwise through the UDS config.

        :return: response of the soft reset request
        """
        if not self.uds_config_enable:
            return self.send_uds_raw(UDSCommands.ECUReset.SOFT_RESET)

        ecu_soft_reset_req = {
            "service": IsoServices.EcuReset,
            "data": {"parameter": "Soft Reset"},
        }
        return self.send_uds_config(ecu_soft_reset_req)

    def read_data(self, parameter: str) -> Optional[Union[dict, bool]]:
        """UDS config command that allow data reading.

        :param parameter: data to be read

        :return: the result of send_uds_config, or None if no uds config
            (ODX) is available
        """
        if not self.uds_config_enable:
            log.error("No uds config found")
            return None

        req = {
            "service": IsoServices.ReadDataByIdentifier,
            "data": {
                "parameter": parameter,
            },
        }
        return self.send_uds_config(req)

    def write_data(
        self, parameter: str, value: Union[List[bytes], bytes]
    ) -> Optional[Union[dict, bool]]:
        """UDS config command that allow data writing.

        :param parameter: data to be set
        :param value: new content of the data

        :return: the result of send_uds_config, or None if no uds config
            (ODX) is available
        """
        if not self.uds_config_enable:
            log.error("No uds config found")
            return None

        req = {
            "service": IsoServices.WriteDataByIdentifier,
            "data": {"parameter": parameter, "dataRecord": value},
        }
        return self.send_uds_config(req)

    def _receive_message(self, timeout_in_s: float) -> None:
        """This method is only used to populate the python-uds reception
        buffer. When a message is received, invoke python-uds configured
        callback to make it available for python-uds.

        :param timeout_in_s: timeout on reception.
        """
        received_data, arbitration_id = self.channel.cc_receive(
            timeout=timeout_in_s, raw=True
        )
        # nothing received, or a frame whose identifier is not the
        # configured response id: nothing to hand over to python-uds
        if received_data is None or arbitration_id != self.res_id:
            return

        can_msg = can.Message(
            data=received_data, arbitration_id=arbitration_id, is_extended_id=False
        )
        self.uds_config.tp.callback_onReceive(can_msg)

    def _abort_command(self) -> None:
        """Not used."""
        pass

    def _run_command(self, cmd_message, cmd_data=None) -> Union[dict, bytes, bool]:
        """Not used."""
        pass