Source code for pykiso.lib.connectors.cc_fdx_lauterbach

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Communication Channel Via lauterbach
************************************

:module: cc_fdx_lauterbach

:synopsis: CChannel implementation for lauterbach(FDX)

.. currentmodule:: cc_fdx_lauterbach

"""

import ctypes
import enum
import logging
import subprocess
import time
from typing import Dict, Optional, Union

from pykiso import connector
from pykiso.message import Message

log = logging.getLogger(__name__)


[docs]class PracticeState(enum.IntEnum): """Available state for any scripts loaded into TRACE32.""" UNKNOWN = -1 NOT_RUNNING = 0 RUNNING = 1 DIALOG_OPEN = 2
[docs]class CCFdxLauterbach(connector.CChannel): """Lauterbach connector using the FDX protocol.""" def __init__( self, t32_exc_path: str = None, t32_config: str = None, t32_main_script_path: str = None, t32_reset_script_path: str = None, t32_fdx_clr_buf_script_path: str = None, t32_in_test_reset_script_path: str = None, t32_api_path: str = None, port: str = None, node: str = "localhost", packlen: str = "1024", device: int = 1, **kwargs, ): """Constructor: initialize attributes with configuration data. :param t32_exc_path: full path of Trace32 app to execute :param t32_config: full path of Trace32 configuration file :param t32_main_script_path: full path to the main cmm script to execute :param t32_reset_script_path: full path to the reset cmm script to execute :param t32_fdx_clr_buf_script_path: full path to the FDX reset cmm script to execute :param t32_in_test_reset_script_path: full path to the board reset cmm script to execute :param t32_api_path: full path of remote api :param port: port number used for UDP communication :param node: node name (default localhost) :param packlen: data pack length for UDP communication (default 1024) :param device: configure device number given by Trace32 (default 1) """ self.t32_main_script_path = t32_main_script_path self.t32_reset_script_path = t32_reset_script_path self.t32_fdx_clr_buf_script_path = t32_fdx_clr_buf_script_path self.t32_in_test_reset_script_path = t32_in_test_reset_script_path self.t32_api_path = t32_api_path self.t32_start_args = [t32_exc_path, "-c", t32_config] self.port = str(port) self.node = str(node) self.packlen = str(packlen) self.device = device self.t32_process = None self.t32_api = None self.loadup_wait_time = 4 self.fdxin = -1 self.fdxout = -1 self.reset_flag = False self.safe_reset_flag = False self.allowed_t32_errors = 10 # Initialize the super class super().__init__(**kwargs)
[docs] def load_script(self, script_path: str): """Load a cmm script. :param script_path: cmm file path :return: error status """ err = 0 # Run the script err = self.t32_api.T32_Cmd(f"DO {script_path}".encode()) if err != 0: log.error(f"Error '{err}' while loading the cmm file: {script_path}") return err log.internal_info(f"Loading the cmm file: {script_path}") # Check whether the scrip we just launched has completed or not. state = ctypes.c_int(PracticeState.UNKNOWN) error_count = 0 while not state.value == PracticeState.NOT_RUNNING: err = self.t32_api.T32_GetPracticeState(ctypes.byref(state)) if err != 0: error_count += 1 if error_count >= self.allowed_t32_errors: log.error( f"Abort execution because lauterbach was unresponsive for {error_count} times" ) break time.sleep(0.05) # 50 ms pause in each loop if err != 0: log.error("Error occurred while checking the script state") return err
[docs] def _cc_open(self) -> bool: """Load the Trace32 library, open the application and open the FDX channels (in/out). :return: True if Trace32 is correctly open otherwise False """ lauterbach_open_state = False # Load Trace32 remote api library try: self.t32_api = ctypes.CDLL(self.t32_api_path) log.internal_debug("Trace32 remote API loaded") except Exception as e: log.exception(f"Unable to open Trace32: {e}") return lauterbach_open_state # Looking for an active Trace32 process and close it if self.t32_api.T32_Init() == 0: # Quit properly the previous T32 instance self.t32_api.T32_Cmd("QUIT".encode("latin-1")) log.internal_debug("Previous process Trace32 closed") # Open a new Trace32 process try: self.t32_process = subprocess.Popen(self.t32_start_args) log.internal_debug( f"Trace32 process open with arguments {self.t32_start_args}" ) except Exception: log.exception("Unable to open Trace32") return lauterbach_open_state # Wait until Trace32 app is launched time.sleep(self.loadup_wait_time) # Set channel configuration self.t32_api.T32_Config(b"NODE=", self.node.encode("utf-8")) self.t32_api.T32_Config(b"PORT=", self.port.encode("utf-8")) self.t32_api.T32_Config(b"PACKLEN=", self.packlen.encode("utf-8")) # Open UDP connection with Trace32 self.t32_api.T32_Init() self.t32_api.T32_Attach(self.device) # Ping if self.t32_api.T32_Ping() == 0: log.internal_debug(f"ITF connected on {self.node}:{self.port}") else: log.fatal(f"Unable to connect on port :{self.port}") return lauterbach_open_state # Clear the FDX buffer if script provided if self.load_script(self.t32_fdx_clr_buf_script_path) == 0: log.internal_info(f"script {self.t32_fdx_clr_buf_script_path} loaded") # Load the cmm script if self.load_script(self.t32_main_script_path) == 0: log.internal_debug(f"script {self.t32_main_script_path} loaded") else: log.fatal(f"Unable to load {self.t32_main_script_path}") return lauterbach_open_state # Get FDX receiver channel self.fdxin = self.t32_api.T32_Fdx_Open(b"FdxTestSendBuffer", b"r") if self.fdxin == -1: log.fatal("No FDXin buffer") return lauterbach_open_state # Get FDX sender channel self.fdxout = self.t32_api.T32_Fdx_Open(b"FdxTestReceiveBuffer", b"w") if self.fdxout == -1: log.fatal("No FDXout buffer") return lauterbach_open_state # Run the script self.start() lauterbach_open_state = True return lauterbach_open_state
[docs] def _cc_close(self) -> None: """Close FDX connection, UDP socket and shut down Trace32 App.""" self.t32_api.T32_Stop() # Close FDX receiver communication fdxin_state = self.t32_api.T32_Fdx_Close(self.fdxin) log.internal_debug( f"Disconnected from FDX {self.fdxin} with state {fdxin_state}" ) # Close FDX sender communication fdxout_state = self.t32_api.T32_Fdx_Close(self.fdxout) log.internal_debug( f"Disconnected from FDX {self.fdxout} with state {fdxout_state}" ) # Reset Target reset_cpu_state = self.t32_api.T32_ResetCPU() log.internal_debug(f"Reset the CPU with state {reset_cpu_state}") # Reset the target if script provided if self.t32_reset_script_path is not None: self.load_script(self.t32_reset_script_path) # Close Trace32 application self.t32_api.T32_Cmd("QUIT".encode("latin-1"))
[docs] def _cc_send(self, msg: bytes, **kwargs) -> int: """Sends a message using FDX channel. :param msg: message :param kwargs: not used :return: poll length """ log.internal_debug(f"===> {msg}") log.internal_debug(f"Sent on channel {self.fdxout}") # Create and fill the buffer with the message buffer = ctypes.pointer(ctypes.create_string_buffer(len(msg))) buffer.contents.raw = msg # Send the message poll_len = self.t32_api.T32_Fdx_SendPoll(self.fdxout, buffer, 1, len(msg)) if poll_len <= 0: log.exception( f"ERROR occurred while sending {len(msg)} bytes on {self.fdxout}" ) return poll_len
[docs] def _cc_receive(self, timeout: float = 0.1) -> Dict[str, Union[bytes, str, None]]: """Receive message using the FDX channel. :return: message """ # Add a small delay to allow other functions to execute time.sleep(0.1) received_msg = None if self.reset_flag: # If the Reset function is called, do not attempt to read messages return {"msg": received_msg} self.safe_reset_flag = False # Get the current time to process the timeout t_start = time.perf_counter() # Ensure to enter into the while loop (at least one time) if timeout is set to 0 is_timeout = False # Check if a message has been received within the timeout while not is_timeout: # Create the buffer buffer = ctypes.pointer( ctypes.create_string_buffer(4096) ) # MaxSize = [0 ; 4096] # Check if msg available poll_len = self.t32_api.T32_Fdx_ReceivePoll( self.fdxin, buffer, len(buffer.contents[0]), int(len(buffer.contents) / len(buffer.contents[0])), ) # Check if T32 api got an error if poll_len < 0: log.error( f"ERROR occurred while listening channel {self.fdxin} with buffer: {buffer.contents.value}" ) break # Check if a message has been received elif poll_len > 0: log.internal_info(f"Message size: {poll_len}") log.internal_info( f"<=== {Message.parse_packet(buffer.contents.raw[:poll_len])}" ) log.internal_debug(f"Received on channel {self.fdxin}") received_msg = Message.parse_packet(buffer.contents.raw[:poll_len]) break # Exit the while loop once timeout is reached if time.perf_counter() > (t_start + timeout): is_timeout = True self.safe_reset_flag = True # No message received return {"msg": received_msg}
[docs] def start(self) -> None: """Override clicking on "go" in the Trace32 application. The channel must have been successfully opened (Trace32 application opened and script loaded). """ self.t32_api.T32_Go()
[docs] def reset_board(self) -> None: """Executes the board reset.""" log.internal_debug(f"In Reset Function safety flag is: {self.safe_reset_flag}") log.internal_debug(f"In Reset Function reset flag is: {self.reset_flag}") while not self.safe_reset_flag: log.internal_debug(f"Safety flag in while loop is: {self.safe_reset_flag}") time.sleep(0.3) self.reset_flag = True log.internal_debug("Do the board reset") self.t32_api.T32_Stop() # Close FDX receiver communication fdxin_state = self.t32_api.T32_Fdx_Close(self.fdxin) log.internal_debug( f"Disconnected from FDX {self.fdxin} with state {fdxin_state}" ) # Close FDX sender communication fdxout_state = self.t32_api.T32_Fdx_Close(self.fdxout) log.internal_debug( f"Disconnected from FDX {self.fdxout} with state {fdxout_state}" ) # Reset Target reset_cpu_state = self.t32_api.T32_ResetCPU() log.internal_debug(f"Reset the CPU with state {reset_cpu_state}") # Run reset script if self.t32_in_test_reset_script_path is not None: self.load_script(self.t32_in_test_reset_script_path) # Get FDX receiver channel self.fdxin = self.t32_api.T32_Fdx_Open(b"FdxTestSendBuffer", b"r") if self.fdxin == -1: log.fatal("No FDXin buffer") # Get FDX sender channel self.fdxout = self.t32_api.T32_Fdx_Open(b"FdxTestReceiveBuffer", b"w") if self.fdxout == -1: log.fatal("No FDXout buffer") self.t32_api.T32_Go() self.reset_flag = False log.internal_debug("Reset finished")