Source code for pykiso.lib.auxiliaries.udsaux.common.uds_callback

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Helper classes for UDS callback registration
********************************************

:module: uds_callback

:synopsis: This module defines classes for the definition of
    UDS callbacks to be registered by the
    :py:class:`~pykiso.lib.auxiliaries.uds_aux.uds_server_auxiliary.UdsServerAuxiliary`_
    along with callbacks for functional requests (TransferData service).

.. currentmodule:: uds_callback

"""

from __future__ import annotations

import logging
import time
import typing
from dataclasses import dataclass
from itertools import cycle
from typing import Callable, ClassVar, List, Optional, Tuple, Union

from uds import IsoServices

if typing.TYPE_CHECKING:
    from pykiso.lib.auxiliaries.udsaux import UdsServerAuxiliary


log = logging.getLogger(__name__)


[docs]@dataclass class UdsCallback: """Class used to store information in order to configure and send a response to the specified UDS request. :param request: request from the client that should be responded to. :param response: full UDS response to send. If not set, respond with a basic positive response with the specified response_data. :param response_data: UDS data to send. If not set, respond with a basic positive response containing no data. :param data_length: optional length of the data to send in the response if the data has a fixed length (zero-padded). :param callback: custom callback function to register. The callback function must have two positional arguments: the received request as a list of integers and the UdsServerAuxiliary instance. """ POSITIVE_RESPONSE_OFFSET: ClassVar[int] = 0x40 # e.g. 0x1003 or [0x10, 0x03] request: Union[int, List[int]] # e.g. 0x5003 or [0x50, 0x03] response: Optional[Union[int, List[int]]] = None # e.g. 0x1011 or b'DATA' response_data: Optional[Union[int, bytes]] = None # specify zero-padding data_length: Optional[int] = None # custom callback callback: Optional[Callable[[List[int], UdsServerAuxiliary], None]] = None def __post_init__(self): """Parse the provided parameters to create a fully formed UDS response.""" self.call_count = 0 if isinstance(self.request, int): self.request = list(self.int_to_bytes(self.request)) if isinstance(self.response, int): self.response = list(self.int_to_bytes(self.response)) elif self.response is None: # create positive response based on request self.response = [ self.request[0] + self.POSITIVE_RESPONSE_OFFSET ] + self.request[1:] if self.response_data is not None: # convert response_data and append it to the response self.response_data = ( list(self.int_to_bytes(self.response_data)) if isinstance(self.response_data, int) else list(self.response_data) ) self.response.extend(self.response_data) if self.data_length is not None: # pad with zeros to reach the expected length self.response.extend( [0x00] * (self.data_length - len(self.response_data)) ) def __call__(self, received_request: List[int], aux: UdsServerAuxiliary) -> None: """Trigger the callback and increase the call count. If a custom callback function is defined call it, otherwise send the registered response to the client. :param received_request: the received UDS request as a list of integers :param aux: the UdsServerAuxiliary instance for which the callback is registered """ self.call_count += 1 if self.callback is not None: self.callback(received_request, aux) else: aux.send_response(self.response) @staticmethod def int_to_bytes(integer: int) -> bytes: return integer.to_bytes((integer.bit_length() + 7) // 8, "big")
[docs]@dataclass class UdsDownloadCallback(UdsCallback): """UDS Callback for DownloadData handling on server-side. In comparison to the base UdsCallback, this callback fixes the request tu 0x34 (RequestDownload) and the custom callback to execute in order to handle the data transfer. :param stmin: minimum separation time between two received consecutive frames. """ TRANSFER_TIMEOUT: ClassVar[float] = 1 MAX_TRANSFER_SIZE: ClassVar[int] = 0xFFFF request: Union[int, List[int]] = IsoServices.RequestDownload stmin: float = 0 def __post_init__(self): super().__post_init__() self.callback = self.handle_data_download self.transfer_successful = False self.transferred_data_size = 0
[docs] @staticmethod def get_transfer_size(download_request: List[int]) -> int: """Extract the size of the data to download from the passed DownloadData request. :param download_request: the received DownloadData request. :return: the expected download size. """ addr_len_format_id_idx = 2 address_and_length_format_identifier = download_request[addr_len_format_id_idx] # high nibble of address_and_length_format_identifier nb_bytes_memory_size_param = address_and_length_format_identifier >> 4 # low nibble of address_and_length_format_identifier nb_bytes_memory_address_param = address_and_length_format_identifier & 0x0F data_size_start_index = addr_len_format_id_idx + nb_bytes_memory_address_param data_size_end_index = data_size_start_index + nb_bytes_memory_size_param memory_size = download_request[ data_size_start_index + 1 : data_size_end_index + 1 ] data_transfer_size = int.from_bytes(bytes(memory_size), byteorder="big") - 1 return data_transfer_size
[docs] @staticmethod def get_first_frame_data_length(first_frame: List[int]) -> Tuple[int, int]: """Extract the expected data size to receive from a first frame (ISO TP) and the start index of the contained UDS request. :param first_frame: received first frame. :return: tuple of two integers corresponding to the expected data length to receive and the resulting start index of the UDS request. """ first_frame_data_len = ((first_frame[0] & 0x0F) << 8) + first_frame[1] data_len_start_index = uds_data_start_index = 2 if first_frame_data_len == 0: uds_data_start_index = 6 first_frame_data_len = int.from_bytes( bytes(first_frame[data_len_start_index:uds_data_start_index]), "big" ) return first_frame_data_len, uds_data_start_index
[docs] def make_request_download_response(self, max_transfer_size: int = None): """Build a positive response for a RequestDownload request based on the maximum transfer size. :param max_transfer_size: maximum transfer size. If not set, use the default one configured inside the callback class (0xFFFF). :return: the UDS RequestDownload positive response. """ max_transfer_size = max_transfer_size or self.MAX_TRANSFER_SIZE max_nb_of_block_length = self.int_to_bytes(max_transfer_size) length_format_identifier = len(max_nb_of_block_length) << 4 request_download_positive_response = [ IsoServices.RequestDownload + self.POSITIVE_RESPONSE_OFFSET, length_format_identifier, *list(max_nb_of_block_length), ] return request_download_positive_response
[docs] def handle_data_download( self, download_request: List[int], aux: UdsServerAuxiliary ) -> None: """Handle a download request from the client. This method handles the entire download functional unit composed of: - sending the appropriate RequestDownload response to the received request - waiting for the initial TransferData request and sending the ISO TP flow control frame - receiving the data blocks until the transfer is finished - sending the resulting TransferData positive response :param download_request: DownloadData request received from the client. :param aux: UdsServerAuxiliary instance used by the callback to handle data reception and transmission. """ self.transfer_successful = False self.transferred_data_size = 0 # send RequestDownload response request_download_response = self.make_request_download_response() aux.send_response(request_download_response) log.internal_info( f"Sent response to request {aux.format_data(download_request)}: {aux.format_data(request_download_response)}" ) # get expected transfer data size from DownloadData request expected_transfer_size = self.get_transfer_size(download_request) transfer_size = 0 transfer_start_time = time.perf_counter() # handle data transfer while ( not aux.stop_rx.is_set() and transfer_size < expected_transfer_size and time.perf_counter() - transfer_start_time < self.TRANSFER_TIMEOUT ): # wait for initial transfer data request transfer_request = aux.receive() if transfer_request is None: continue # decode PCI to extract the block data length expected_data_len, pci_len = self.get_first_frame_data_length( transfer_request ) uds_transfer_request = transfer_request[pci_len:] service_id, sequence_number, *block_data = uds_transfer_request if not service_id == IsoServices.TransferData: log.error( f"Expected TransferData request from ECU, got: {bytes(transfer_request).hex()}" ) return # send flow control with configured STmin aux.send_flow_control(stmin=self.stmin) # expected PCI sequence of the received frames transfer_data_pci_sequence = cycle(tuple(range(0x21, 0x30)) + (0x20,)) # initial received block data length -> remove UDS service bytes block_data_len = len(block_data) elapsed_time = 0 block_start_time = time.perf_counter() # receive block data while ( not aux.stop_rx.is_set() and block_data_len < (expected_data_len - 1) and elapsed_time < self.TRANSFER_TIMEOUT ): data = aux.receive() if data is None: time.sleep(1e-4) elapsed_time = time.perf_counter() - block_start_time continue # reset data reception timeout block_start_time, elapsed_time = time.perf_counter(), 0 # verify received PCI and increase receive length if one was missed expected_pci = next(transfer_data_pci_sequence) if data[0] != expected_pci: block_data_len += len(data) - 1 log.internal_warning( f"Consecutive frame missed: " f"expected PCI {hex(expected_pci)}, got {hex(data[0])}" ) block_data_len += len(data) - 1 transfer_size += block_data_len log.internal_info( f"Block number {sequence_number}: " f"Received {transfer_size} B out of {expected_transfer_size} B" ) # send TransferData positive response with current sequence number success_response = [ IsoServices.TransferData + self.POSITIVE_RESPONSE_OFFSET, sequence_number, ] aux.send_response(success_response) # reset transfer timer transfer_start_time = time.perf_counter() self.transfer_successful = transfer_size >= expected_transfer_size self.transferred_data_size = transfer_size