##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
UDS Auxiliary plugin
********************
:module: uds_auxiliary
:synopsis: implementation of existing UdsAuxiliary for Robot
framework usage.
.. currentmodule:: uds_auxiliary
"""
from typing import List, Optional, Union
from robot.api import logger
from robot.api.deco import keyword, library
from uds import IsoServices
from pykiso.lib.robot_framework.aux_interface import RobotAuxInterface
from ..auxiliaries.udsaux.uds_auxiliary import UdsAuxiliary as UdsAux
from ..auxiliaries.udsaux.uds_response import UdsResponse
@library(version="0.1.0")
class UdsAuxiliary(RobotAuxInterface):
    """Robot framework plugin for UdsAuxiliary.

    Every keyword resolves the targeted auxiliary from ``aux_alias``
    through ``_get_aux`` and forwards the call to the method of the
    same name on that auxiliary.
    """

    ROBOT_LIBRARY_SCOPE = "SUITE"

    def __init__(self):
        """Initialize attributes."""
        super().__init__(aux_type=UdsAux)

    @keyword(name="Send uds raw")
    def send_uds_raw(
        self, msg_to_send: bytes, aux_alias: str, timeout_in_s: float = 6
    ) -> Union[list, bool]:
        """Send a UDS diagnostic request to the target ECU.

        :param msg_to_send: can uds raw bytes to be sent
        :param aux_alias: auxiliary's alias
        :param timeout_in_s: maximum time used to wait for a response.

        :return: the raw uds response converted to a list, or True if a
            response is not expected and the command is properly sent
            otherwise False
        """
        aux = self._get_aux(aux_alias)
        msg = aux.send_uds_raw(msg_to_send, timeout_in_s)
        # A bool is a status flag, not a payload: hand it back untouched.
        if isinstance(msg, bool):
            return msg
        return list(msg)

    @keyword(name="Send uds config")
    def send_uds_config(
        self, msg_to_send: dict, aux_alias: str, timeout_in_s: float = 6
    ) -> Union[dict, bool]:
        """Send UDS config to the target ECU.

        The ``"service"`` entry of ``msg_to_send`` is given as a service
        name and translated to its identification number before sending.

        :param msg_to_send: uds config to be sent; it is not modified
        :param aux_alias: auxiliary's alias
        :param timeout_in_s: maximum time used to wait for a response

        :return: a dict containing the uds response, or True if a
            response is not expected and the command is properly sent
            otherwise False

        :raise KeyError: if ``msg_to_send`` has no ``"service"`` entry or
            the service name is unknown
        """
        # Work on a shallow copy: overwriting "service" in the caller's
        # dict would leave an int there, and re-sending the same config
        # would then fail the name lookup in get_service_id.
        config = dict(msg_to_send)
        config["service"] = self.get_service_id(config["service"])
        aux = self._get_aux(aux_alias)
        return aux.send_uds_config(config, timeout_in_s)

    @keyword(name="Check raw response positive")
    def check_raw_response_positive(
        self, resp: UdsResponse, aux_alias: str
    ) -> Optional[bool]:
        """Check if the response is positive, raise an error if not.

        :param resp: raw response of uds request
        :param aux_alias: auxiliary's alias

        :raise UnexpectedResponseError: raised when the answer is not
            the expected one

        :return: True if response is positive
        """
        aux = self._get_aux(aux_alias)
        return aux.check_raw_response_positive(resp)

    @keyword(name="Check raw response negative")
    def check_raw_response_negative(
        self, resp: UdsResponse, aux_alias: str
    ) -> Optional[bool]:
        """Check if the response is negative, raise an error if not.

        :param resp: raw response of uds request
        :param aux_alias: auxiliary's alias

        :raise UnexpectedResponseError: raised when the answer is not
            the expected one

        :return: True if response is negative
        """
        aux = self._get_aux(aux_alias)
        return aux.check_raw_response_negative(resp)

    @keyword(name="Soft reset")
    def soft_reset(self, aux_alias: str) -> Union[dict, List[int]]:
        """Perform soft reset of the component, equivalent to a restart
        of application.

        :param aux_alias: auxiliary's alias

        :return: whatever the auxiliary's ``soft_reset`` returns
        """
        aux = self._get_aux(aux_alias)
        return aux.soft_reset()

    @keyword(name="Hard reset")
    def hard_reset(self, aux_alias: str) -> Union[dict, List[int]]:
        """Allow power reset of the component.

        :param aux_alias: auxiliary's alias

        :return: whatever the auxiliary's ``hard_reset`` returns
        """
        aux = self._get_aux(aux_alias)
        return aux.hard_reset()

    @keyword(name="Force ecu reset")
    def force_ecu_reset(self, aux_alias: str) -> List[int]:
        """Force a reset of the ECU through the auxiliary's
        ``force_ecu_reset``.

        :param aux_alias: auxiliary's alias

        :return: whatever the auxiliary's ``force_ecu_reset`` returns
        """
        aux = self._get_aux(aux_alias)
        return aux.force_ecu_reset()

    @keyword(name="Read data")
    def read_data(self, parameter: str, aux_alias: str) -> Union[dict, bool]:
        """UDS config command that allow data reading.

        :param parameter: data to be read
        :param aux_alias: auxiliary's alias

        :return: a dict with uds config response
        """
        aux = self._get_aux(aux_alias)
        return aux.read_data(parameter)

    @keyword(name="Write data")
    def write_data(
        self, parameter: str, value: Union[List[bytes], bytes], aux_alias: str
    ) -> Union[dict, bool]:
        """UDS config command that allow data writing.

        :param parameter: data to be written
        :param value: new content of the data
        :param aux_alias: auxiliary's alias

        :return: a dict with uds config response
        """
        aux = self._get_aux(aux_alias)
        return aux.write_data(parameter, value)

    @staticmethod
    def get_service_id(service_name: str) -> int:
        """Return the uds service id.

        :param service_name: service's name (EcuReset,
            ReadDataByIdentifier ...)

        :return: corresponding service identification number

        :raise KeyError: if ``service_name`` is not a member name of
            ``IsoServices``
        """
        try:
            return IsoServices[service_name].value
        except KeyError as err:
            # Log to the Robot report before re-raising so the failure is
            # visible there; the exception type is kept for callers.
            logger.error(f"{service_name} doesn't exist", html=True)
            raise KeyError(f"{service_name} doesn't exist") from err