##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
Lauterbach Flasher
******************
:module: flash_lauterbach
:synopsis: used to flash through lauterbach probe.
.. currentmodule:: flash_lauterbach
.. warning:: Still under test
"""
import ctypes
import enum
import logging
import subprocess
import time
from pykiso import connector
# Module-level logger named after this module.
# NOTE(review): the code below calls log.internal_info / internal_debug /
# internal_warning, which are not standard logging.Logger methods; presumably
# they are registered by pykiso's logging setup -- confirm before using this
# module outside of pykiso.
log = logging.getLogger(__name__)
@enum.unique
class ScriptState(enum.IntEnum):
    """Use to determine script command execution.

    The members are compared against the integer state written by
    ``T32_GetPracticeState`` while ``LauterbachFlasher.flash`` waits for
    the flash script to end.
    """

    # initial value used before the first state request has been answered
    UNKNOWN = -1
    # value that ends the wait loop in ``LauterbachFlasher.flash``
    NOT_RUNNING = 0
    RUNNING = 1
    DIALOG_OPEN = 2
@enum.unique
class MessageLineState(enum.IntEnum):
    """Use to determine Message reading command.

    The members are compared against the status value written by
    ``T32_GetMessage`` when ``LauterbachFlasher.flash`` evaluates the
    flash verdict; both values are treated as a failed flash.
    """

    ERROR = 2
    ERROR_INFO = 16
class LauterbachFlasher(connector.Flasher):
    """Connector used to flash through one and only one
    Lauterbach probe using Trace32 as remote API.
    """

    def __init__(
        self,
        t32_exc_path: str = None,
        t32_config: str = None,
        t32_script_path: str = None,
        t32_api_path: str = None,
        port: str = None,
        node: str = "localhost",
        packlen: str = "1024",
        device: int = 1,
        **kwargs,
    ):
        """Initialize attributes with configuration data.

        :param t32_exc_path: full path of Trace32 app to execute
        :param t32_config: full path of Trace32 configuration file
        :param t32_script_path: full path to .cmm flash script to execute
        :param t32_api_path: full path of remote api
        :param port: port number used for UDP communication
        :param node: node name (default localhost)
        :param packlen: data pack length for UDP communication (default 1024)
        :param device: configure device number given by Trace32 (default 1)
        :param kwargs: forwarded unchanged to the parent class initializer
        """
        self.t32_script_path = t32_script_path
        self.t32_api_path = t32_api_path
        # argument list handed to subprocess.Popen in open()
        self.t32_start_args = [t32_exc_path, "-c", t32_config]
        # channel settings are kept as strings because they are encoded
        # to bytes and passed to T32_Config in open()
        self.port = str(port)
        self.node = str(node)
        self.packlen = str(packlen)
        self.device = device
        # set by open(): Popen handle of the Trace32 app and loaded API library
        self.t32_process = None
        self.t32_api = None
        # seconds to sleep after spawning Trace32 before configuring the channel
        self.loadup_wait_time = 5
        # number of failed T32_GetPracticeState requests tolerated during flash()
        self.allowed_t32_errors = 10
        super().__init__(self.t32_script_path, **kwargs)

    def open(self) -> None:
        """Open UDP socket between ITF and Trace32 loaded app.

        The open command leads to the following sub-tasks execution:

        - Load remote API using ctypes
        - Close a previous Trace32 instance if one answers
        - Open a Trace32 app
        - Configure UDP channel (Port/buffer size...)
        - Open UDP connection and attach to the configured device

        :raise Exception: if the remote API cannot be loaded, the Trace32
            process cannot be started or the connection cannot be opened.
        """
        # load Trace32 remote api
        try:
            self.t32_api = ctypes.cdll.LoadLibrary(self.t32_api_path)
        except Exception:
            log.exception("Unable to load Trace32 remote API")
            raise
        log.internal_info("Trace32 remote API loaded")

        # Looking for an active Trace32 process and close it.
        # NOTE(review): this T32_Init call is made before any T32_Config
        # call, so it does not use the node/port configured on this
        # instance -- confirm this is intended.
        if self.t32_api.T32_Init() == 0:
            # Quit properly the previous T32 instance
            cmd_status = self.t32_api.T32_Cmd("QUIT".encode("latin-1"))
            log.internal_debug(
                f"Previous process Trace32 closed with exit code:{cmd_status}"
            )
        # Properly exit t32 api. Otherwise subsequent commands will fail
        self.t32_api.T32_Exit()

        # open a new Trace32 process
        try:
            self.t32_process = subprocess.Popen(self.t32_start_args)
        except Exception:
            log.exception("Unable to open Trace32")
            raise
        log.internal_debug(
            f"Trace32 process open with arguments {self.t32_start_args}"
        )

        # wait until Trace32 app is launched
        time.sleep(self.loadup_wait_time)

        # channel configuration
        self.t32_api.T32_Config(b"NODE=", self.node.encode("utf-8"))
        self.t32_api.T32_Config(b"PORT=", self.port.encode("utf-8"))
        self.t32_api.T32_Config(b"PACKLEN=", self.packlen.encode("utf-8"))

        # open UDP connection with Trace32
        if self.t32_api.T32_Init() != 0:
            log.fatal(f"Unable to connect on port : {self.port}")
            raise Exception(f"Unable to connect on port : {self.port}")
        self.t32_api.T32_Attach(self.device)
        log.internal_info(f"ITF connected on {self.node}:{self.port}")

    def flash(self) -> None:
        """Flash software using configured .cmm script.

        The Flash command leads to the following sub-tasks execution :

        - Send to Trace32 CD.DO internal command (execute script)
        - Wait until script is finished
        - Send the SYStem.RESet command
        - Get script execution verdict

        :raise Exception: if Trace32 error occurred during flash.
        :raise RuntimeError: if too many state requests failed while
            waiting for the script to end.
        """
        # read actual flash script content for logging purpose
        with open(file=self.t32_script_path, mode="r") as script:
            log.internal_debug(
                f"flash using script at {self.t32_script_path}, with the following commands : {script.read()}"
            )

        # run flash script
        cmd = f"CD.DO {self.t32_script_path}"
        log.internal_info(f"Call T32_Cmd {cmd}")
        request_state = self.t32_api.T32_Cmd(cmd.encode("utf-8"))
        if request_state < 0:
            log.fatal("TRACE32 Remote API communication error")
            raise Exception("TRACE32 Remote API communication error")

        # wait until script's end
        self._wait_for_script_end()

        # reset target; the return code is not evaluated, the verdict is
        # taken from the message read below
        cmd = "SYStem.RESet"
        log.internal_info(f"Call T32_Cmd {cmd}")
        self.t32_api.T32_Cmd(cmd.encode("utf-8"))

        # get script execution verdict
        message_state = ctypes.c_uint16(-1)
        message = ctypes.create_string_buffer(256)
        request_state = self.t32_api.T32_GetMessage(
            ctypes.byref(message), ctypes.byref(message_state)
        )
        # never let an undecodable message hide the actual flash verdict
        msg = message.value.decode("utf-8", errors="replace")

        # NOTE(review): the status is compared by equality; if Trace32 can
        # report a combination of status values this check would miss it
        # -- verify against the remote API documentation.
        flash_failed = request_state != 0 or message_state.value in (
            MessageLineState.ERROR,
            MessageLineState.ERROR_INFO,
        )
        if flash_failed:
            log.fatal(
                f"An error occurred during flash,state : {message_state.value} -> {msg}"
            )
            raise Exception(
                f"An error occurred during flash,state : {message_state.value} -> {msg}"
            )
        log.internal_info("flash procedure successful")

    def _wait_for_script_end(self) -> None:
        """Poll the script state once per second until it is NOT_RUNNING.

        :raise RuntimeError: if the number of failed state requests reaches
            ``allowed_t32_errors``.
        """
        script_state = ctypes.c_int(ScriptState.UNKNOWN)
        error_count = 0
        while script_state.value != ScriptState.NOT_RUNNING:
            request_state = self.t32_api.T32_GetPracticeState(
                ctypes.byref(script_state)
            )
            log.internal_debug(
                f"Called T32_GetPracticeState. request_state: {request_state} "
                f"script_state: {script_state.value} request error: {error_count}"
            )
            if request_state != 0:
                # failed requests are accumulated over the whole wait,
                # they are not reset by a later successful request
                error_count += 1
                if error_count >= self.allowed_t32_errors:
                    raise RuntimeError(
                        f"Error during lauterbach script run. Script: {self.t32_script_path}\n"
                        f"Abort execution because lauterbach was unresponsive for {error_count} times"
                    )
            time.sleep(1)

    def close(self) -> None:
        """Close UDP socket and shut down Trace32 App.

        Safe to call after a failed :meth:`open`: the steps that need the
        remote API or the Trace32 process are skipped when those were
        never created.
        """
        if self.t32_api is not None:
            # Close Trace32 application
            self.t32_api.T32_Cmd("QUIT".encode("latin-1"))
            # close communication with Trace32
            exit_state = self.t32_api.T32_Exit()
            log.internal_info(f"Disconnect from Trace32 with state {exit_state}")

        if self.t32_process is None:
            # no Trace32 process was started by this instance
            return

        # Process has to quit properly.
        # Otherwise lauterbach needs a power reset to rerun t32mppc.
        try:
            self.t32_process.wait(timeout=5)
        except Exception:
            # best effort: only report, do not fail the teardown
            log.internal_warning("Trace32 failed to exit properly")