_can_auxiliary:

11. can_auxiliary

11.1. Collecting messages

The CanAuxiliary offer a context manager that can be used to collect all messages received while the context manager is used.

Example test script using the collect message context manager:

##########################################################################
# Copyright (c) 2010-2023 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Can Auxiliary example
*********************

:module: test_can

:synopsis: Example test show how send and receive can messages from can

.. currentmodule:: test_can
"""
import logging
import threading
import time

import pykiso
from pykiso.auxiliaries import can_aux1, can_aux2
from pykiso.lib.auxiliaries.can_auxiliary.can_auxiliary import CanAuxiliary
from pykiso.lib.auxiliaries.can_auxiliary.can_message import CanMessage

can_aux1: CanAuxiliary
can_aux2: CanAuxiliary


@pykiso.define_test_parameters(suite_id=1, case_id=1, aux_list=[can_aux1, can_aux2])
class CanAuxTest(pykiso.BasicTest):
    def test_send_and_receive_message(self):
        """
        Test send message and receive messages
        """
        logging.info(f"--------------- RUN: {self.test_suite_id}, {self.test_case_id} ---------------")

        send_t = threading.Thread(target=self.tx_thread, args=[can_aux1])

        send_t.start()
        time.sleep(0.3)
        # get last received message by message name
        recv_msg = can_aux2.get_last_message("Message_1")
        logging.info("Signal A = " + str(recv_msg.signals["signal_a"]))
        assert recv_msg.signals["signal_a"] == 4

        # get certain signal of last received message by message name
        recv_signal = can_aux2.get_last_signal("Message_1", "signal_a")
        logging.info("Received Signal = " + str(recv_signal))
        assert recv_signal == 4

        # wait certain time to receive specific message
        recv_msg = can_aux2.wait_for_message("Message_1", 1)
        logging.info("New msg with wait Signal A = " + str(recv_msg.signals["signal_a"]))
        assert recv_msg.signals == {"signal_a": 7, "signal_b": 8}
        send_t.join()

    def test_wait_for_expected_signals(self):
        """
        Send messages, and wait one of them to match the expected signals
        """
        logging.info(f"--------------- RUN: {self.test_suite_id}, {self.test_case_id} ---------------")

        send_t = threading.Thread(target=self.tx_thread, args=[can_aux1])

        send_t.start()
        time.sleep(0.3)
        recv_msg = can_aux2.wait_to_match_message_with_signals("Message_1", {"signal_a": 7}, 1)
        logging.info(f"Matched Message signals {recv_msg.signals}")
        assert recv_msg.signals == {"signal_a": 7, "signal_b": 8}
        send_t.join()

    def test_collect_messages_with_a_context_manager(self):
        """Collect all the messages that were received while the context manager is active"""
        logging.info(f"--------------- RUN: {self.test_suite_id}, {self.test_case_id} ---------------")

        send_t = threading.Thread(target=self.tx_thread, args=[can_aux1])

        send_t.start()
        time.sleep(0.1)
        with can_aux2.collect_messages():
            can_aux2.send_message("Message_1", {"signal_a": 1, "signal_b": 2})
            time.sleep(0.6)
        messages: list[CanMessage] = can_aux2.get_collected_messages()
        assert messages[0].name == "Message_1"
        assert messages[0].signals == {"signal_a": 4, "signal_b": 5}
        assert messages[1].name == "Message_1"
        assert messages[1].signals == {"signal_a": 7, "signal_b": 8}

        send_t.join()

    def tx_thread(self, can_aux):
        messages_to_send = [
            CanMessage("Message_1", {"signal_a": 1, "signal_b": 2}, 3),
            CanMessage("Message_1", {"signal_a": 4, "signal_b": 5}, 6),
            CanMessage("Message_1", {"signal_a": 7, "signal_b": 8}, 9),
        ]
        # send multiple messages one by one from can auxiliary with certain interval
        for message_to_send in messages_to_send:
            can_aux.send_message(message_to_send.name, message_to_send.signals)
            time.sleep(0.2)