##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
uds_auxiliary
*************
:module: uds_auxiliary
:synopsis: Auxiliary used to handle Unified Diagnostic Service protocol
.. currentmodule:: uds_auxiliary
"""
import logging
import threading
import time
from contextlib import contextmanager
from typing import Iterator, List, Optional, Union
import can
from uds import IsoServices
from .common import uds_exceptions
from .common.uds_base_auxiliary import UdsBaseAuxiliary
from .common.uds_request import UDSCommands
from .common.uds_response import UdsResponse
from .common.uds_utils import get_uds_service
log = logging.getLogger(__name__)
[docs]class UdsAuxiliary(UdsBaseAuxiliary):
"""Auxiliary used to handle the UDS protocol on client (tester) side."""
errors = uds_exceptions
[docs] def transmit(self, data: bytes, req_id: int, extended: bool = False) -> None:
"""Transmit a message through ITF connector. This method is a
substitute to transmit method present in python-uds package.
:param data: data to send
:param req_id: CAN message identifier
:param extended: True if addressing mode is extended otherwise
False
"""
self.channel._cc_send(msg=data, remote_id=req_id, raw=True)
[docs] def send_uds_raw(
self,
msg_to_send: Union[bytes, List[int], tuple],
timeout_in_s: float = 6,
response_required: bool = True,
) -> Union[UdsResponse, bool]:
"""Send a UDS diagnostic request to the target ECU and check response.
:param msg_to_send: can uds raw bytes to be sent
:param timeout_in_s: not used, actual timeout in seconds for the response can be
configured with the P2_CAN_Client parameter in the config.ini file
(default value is 5s)
:param response_required: Wait for a response if True
:raise ResponseNotReceivedError: raised when no answer has been received
:raise Exception: raised when the raw message could not be send properly
:return: the raw uds response's bytes, or True if a response is
not expected and the command is properly sent otherwise
False
"""
try:
log.internal_info(
f"UDS request to send '{['0x{:02X}'.format(i) for i in msg_to_send]}'"
)
resp = self.uds_config.send(
msg_to_send,
responseRequired=response_required,
tpWaitTime=self.tp_waiting_time,
)
except Exception:
log.exception("Error while sending uds raw request")
return False
if resp is None:
if not response_required:
return True
else:
raise self.errors.ResponseNotReceivedError(msg_to_send)
resp_print = (
f"UDS response received {['0x{:02X}'.format(i) for i in resp]}"
if not isinstance(resp, bool)
else resp
)
log.internal_info(resp_print)
resp = UdsResponse(resp)
return resp
[docs] def check_raw_response_positive(self, resp: UdsResponse) -> Optional[bool]:
"""Check if the response is positive, raise an error if not
:param resp: raw response of uds request
:raise UnexpectedResponseError: raised when the answer is not the expected one
:return: True if response is positive
"""
if resp.is_negative:
log.internal_info(f"Negative response with NRC: {resp.nrc.name}")
raise self.errors.UnexpectedResponseError(resp)
return True
[docs] def check_raw_response_negative(self, resp: UdsResponse) -> Optional[bool]:
"""Check if the response is negative, raise an error if not
:param resp: raw response of uds request
:raise UnexpectedResponseError: raised when the answer is not the expected one
:return: True if response is negative
"""
if not resp.is_negative:
raise self.errors.UnexpectedResponseError(resp)
log.internal_info(f"Negative response with :{resp.nrc.name}")
return True
[docs] def send_uds_config(
self,
msg_to_send: dict,
timeout_in_s: float = 6,
) -> Union[dict, bool]:
"""Send UDS config to the target ECU.
:param msg_to_send: uds config to be sent
:param timeout_in_s: not used
:return: a dict containing the uds response, or True if a
response is not expected and the command is properly sent
otherwise False
"""
if not self.uds_config_enable:
log.error("Uds configured without ODX, sending Uds config impossible!")
return False
try:
uds_service_name = get_uds_service(msg_to_send["service"])
uds_service = getattr(self.uds_config, uds_service_name)
req_resp_data = uds_service(**msg_to_send["data"])
if req_resp_data is None:
req_resp_data = True
log.internal_info(f"UDS response received {req_resp_data}")
return req_resp_data
except AttributeError:
# Service not found, raised by getattr()
log.exception(
f"Could not send UDS config request: unknown service {uds_service_name}"
)
return False
except Exception:
# Data send failed
log.exception("Could not send UDS config request: sending failed")
return False
[docs] def hard_reset(self) -> Union[dict, UdsResponse]:
"""Allow power reset of the component
:return: response of the hard reset request
"""
return self.send_uds_raw(UDSCommands.ECUReset.HARD_RESET)
[docs] def force_ecu_reset(self) -> UdsResponse:
"""Allow power reset of the component
:return: response of the force ecu reset request
"""
if not self.uds_config_enable:
return self.send_uds_raw(UDSCommands.ECUReset.KEY_OFF_ON_RESET)
else:
ecu_hard_reset_req = {
"service": IsoServices.EcuReset,
"data": {"parameter": "ForceEcuReset"},
}
return self.send_uds_config(ecu_hard_reset_req)
[docs] def soft_reset(self) -> Union[dict, UdsResponse]:
"""Perform soft reset of the component, equivalent to a restart of application
:return: response of the soft reset request
"""
if not self.uds_config_enable:
return self.send_uds_raw(UDSCommands.ECUReset.SOFT_RESET)
else:
ecu_soft_reset_req = {
"service": IsoServices.EcuReset,
"data": {"parameter": "Soft Reset"},
}
return self.send_uds_config(ecu_soft_reset_req)
[docs] def read_data(self, parameter: str) -> Optional[Union[dict, bool]]:
"""UDS config command that allow data reading
:param parameter: data to be read
:return: a dict with uds config response
"""
if self.uds_config_enable:
req = {
"service": IsoServices.ReadDataByIdentifier,
"data": {
"parameter": parameter,
},
}
return self.send_uds_config(req)
else:
log.error("No uds config found")
return
[docs] def write_data(
self, parameter: str, value: Union[List[bytes], bytes]
) -> Optional[Union[dict, bool]]:
"""UDS config command that allow data writing
:param parameter: data to be set
:param value: new content of the data
:return: a dict with uds config response
"""
if self.uds_config_enable:
req = {
"service": IsoServices.WriteDataByIdentifier,
"data": {"parameter": parameter, "dataRecord": value},
}
return self.send_uds_config(req)
else:
log.error("No uds config found")
return
def _sender_run(self, period: int, stop_event: threading.Event) -> None:
"""send tester present at defined period until stopped
:param period: period in seconds to use for the cyclic sending of tester present
:param stop_event: event to set to stop the sending of tester present
"""
while not stop_event.is_set():
self.send_uds_raw(
UDSCommands.TesterPresent.TESTER_PRESENT_NO_RESPONSE,
response_required=False,
)
time.sleep(period)
[docs] @contextmanager
def tester_present_sender(self, period: int = 4) -> Iterator[None]:
"""Context manager that continuously sends tester present messages via UDS
:param period: period in seconds to use for the cyclic sending of tester present
"""
stop_event = threading.Event()
sender = threading.Thread(
name="TesterPresentSender",
target=self._sender_run,
args=(period, stop_event),
)
try:
yield sender.start()
finally:
stop_event.set()
sender.join()
def _receive_message(self, timeout_in_s: float) -> None:
"""This method is only used to populate the python-uds reception
buffer. When a message is received, invoke python-uds configured
callback to make it available for pyton-uds
:param timeout_in_s: timeout on reception.
"""
recv_response = self.channel.cc_receive(timeout=timeout_in_s, raw=True)
received_data = recv_response.get("msg")
arbitration_id = recv_response.get("remote_id")
if received_data is not None:
can_msg = can.Message(
data=received_data, arbitration_id=arbitration_id, is_extended_id=False
)
if arbitration_id == self.res_id:
self.uds_config.tp.callback_onReceive(can_msg)
def _run_command(self, cmd_message, cmd_data=None) -> Union[dict, bytes, bool]:
"""Not used."""
pass