Source code for pykiso.lib.auxiliaries.example_test_auxiliary

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Example Auxiliary
*****************

:module: example_test_auxiliary

:synopsis: Example auxiliary that simulates a normal test run without
    ever doing anything.

.. currentmodule:: example_test_auxiliary


.. warning:: Still under test
"""
import logging

# Import the framework modules
from pykiso import AuxiliaryInterface, message

log = logging.getLogger(__name__)


[docs]class ExampleAuxiliary(AuxiliaryInterface): """Example of an auxiliary implementation.""" def __init__( self, name=None, com=None, flash=None, **kwargs, ) -> None: """Constructor. :param name: Alias of the auxiliary instance :param com: Communication connector :param flash: flash connector """ super().__init__(name=name, **kwargs) self.channel = com self.flash = flash self.options = kwargs log.debug(f"com is {com}") log.debug(f"flash is {flash}") def _create_auxiliary_instance(self) -> bool: """Open current associated channel and send a ping command. :return: if channel creation is successful and ping command is answered return True otherwise False """ log.info("Create auxiliary instance") # Create an instance (e.g. flash it) -> Placeholder # Enable the communication with it log.info("Enable channel") self.channel.open() # Pingpong return_code = self._send_and_wait_ack(message.Message(), 0.5, 2) # Return output return return_code def _delete_auxiliary_instance(self) -> bool: """Close the connector communication. :return: always True """ log.info("Delete auxiliary instance") # Close the communication with it self.channel.close() return True def _run_command( self, cmd_message: message.Message, cmd_data: bytes = None ) -> bool: """Run the corresponding command. :param cmd_message: command type :param cmd_data: payload data to send over CChannel :return: True if ACK is received otherwise False """ log.debug("Send test request: {}".format(cmd_message)) return self._send_and_wait_ack(cmd_message, 0.5, 2) def _abort_command(self) -> bool: """Send an abort command and reset the auxiliary if needed. :return: True if ACK is received otherwise False """ log.info("Send abort request") # Try a soft abort msg = message.Message( message.MessageType.COMMAND, message.MessageCommandType.ABORT ) # TODO verify if this generation can happened somewhere else result = self._send_and_wait_ack(msg, 0.5, 2) # If not successful, do hard reset if result is False: self._delete_auxiliary_instance() self._create_auxiliary_instance() return result def _receive_message(self, timeout_in_s: float) -> message.Message: """Get message from the device under test. :param timeout_in_s: Time in ms to wait for one try :return: receive message """ # Read message on the channel received_message = self.channel.cc_receive(timeout_in_s) if received_message is not None: # Send ack self.channel._cc_send( msg=received_message.generate_ack_message(message.MessageAckType.ACK) ) # Return message return received_message def _send_and_wait_ack( self, message_to_send: message.Message, timeout: float, tries: int ) -> bool: """Send via the channel a message and wait for an acknowledgement. :param message_to_send: Message you want to send out :param timeout: Time in ms to wait for one try :param tries: Number of tries to send the message :return: True - Ack, False - Nack """ number_of_tries = 0 result = False while number_of_tries < tries: log.debug("Run send try n:" + str(number_of_tries)) # Increase number of tries number_of_tries += 1 # Send the message self.channel.cc_send(msg=message_to_send) # Wait until we get something back received_message = self.channel.cc_receive(timeout) # Check the outcome if received_message is None: continue # Next try will occur else: if message_to_send.check_if_ack_message_is_matching(received_message): log.debug("{} sent".format(message_to_send)) result = True break else: log.warning( "Received {} not matching {}!".format( received_message, message_to_send ) ) continue # A NACK got received # TODO later on we should have "not supported" and ignore it than. # Must be ">" because number_of_tries is increased at the beginning of the while loop above if number_of_tries > tries: log.warning( f"Exceeded the number of tries ({tries}) for sending the message" ) return result