##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
Communication Channel Via segger j-link
***************************************
:module: cc_rtt_segger
:synopsis: channel used to enable RTT communication using Segger J-Link debugger.
Additionally, RTT logs can be captured by setting the rtt_log_path parameter
on the specified channel.
.. currentmodule:: cc_rtt_segger
"""
import functools
import logging
import threading
import time
from pathlib import Path
from typing import Optional, Union
import pylink
from pykiso import connector
from pykiso.message import Message
log = logging.getLogger(__name__)
[docs]def _need_connection(f):
"""Decorator to check the JLink is connected and raises an error otherwise"""
@functools.wraps(f)
def check_before_execution(self, *args, **kwargs):
"""Check if Jlink is opened, else raise error
:raises pylink.JLinkException: if Jlink not available
"""
if not self.jlink.opened():
raise pylink.JLinkException("You need to connect to a JLink first")
return f(self, *args, **kwargs)
return check_before_execution
[docs]def _need_rtt(f):
"""Decorator to check that a RTT connection has been configured and raises an error otherwise"""
@functools.wraps(f)
def check_before_execution(self, *args, **kwargs):
"""Check if Jlink is opened and RTT configured, else raise errors
:raises pylink.JLinkException: if RTT config not done or Jlink not available
"""
if not self.jlink.opened():
raise pylink.JLinkException("You need to connect to a JLink first")
if not self.rtt_configured:
raise pylink.JLinkException("You need to configure RTT first")
return f(self, *args, **kwargs)
return check_before_execution
[docs]class CCRttSegger(connector.CChannel):
"""Channel using RTT to communicate through Segger J-Link debugger."""
def __init__(
self,
serial_number: int = None,
chip_name: str = "STM32L562QE",
speed: int = 4000,
block_address: int = 0x2003F800,
verbose: bool = False,
tx_buffer_idx: int = 3,
rx_buffer_idx: int = 0,
rtt_log_path: Optional[str] = None,
rtt_log_buffer_idx: int = 0,
rtt_log_speed: float = 1000,
connection_timeout: int = 5,
**kwargs,
):
"""Initialize attributes.
:param serial_number: optional segger debugger serial number (required if many connected)
:param chip_name: microcontoller name (STM....)
:param speed: communication speed in Hz
:param block_address: start address to start RTT communication
:param tx_buffer_idx: buffer index used for transmission
:param rx_buffer_idx: buffer index used for reception
:param verbose: boolean indicating if J-Link connection should be verbose in logging
:param rtt_log_path: path to the folder where the RTT log file should be stored
:param rtt_log_buffer_idx: buffer index used for RTT logging
:param rtt_log_speed: number of log per second to be pulled (manage the CPU load for logging)
None value fetch log at the CPU's speed. Default 1000 logs/s
:param connection_timeout: available time (in seconds) to open the connection
"""
super().__init__(**kwargs)
self.serial_number = serial_number if isinstance(serial_number, int) else None
self.chip_name = chip_name
self.speed = speed
self.block_address = block_address
self.tx_buffer_idx = tx_buffer_idx
self.rx_buffer_idx = rx_buffer_idx
self.verbose = verbose
self.jlink = None
self.connection_timeout = connection_timeout
self.rtt_log_buffer_idx = rtt_log_buffer_idx
self.rx_buffer_size = None
# initialize rtt logging specific parameters
self.rtt_configured = False
self._is_running = False
self.rtt_log_refresh_time = round(1 / rtt_log_speed, 6) if rtt_log_speed else 0
self.rtt_log_thread = threading.Thread(target=self.receive_log)
self.rtt_log_path = rtt_log_path
self.rtt_log = logging.getLogger(f"{__name__}.RTT")
if self.rtt_log_path is not None:
self.rtt_log_buffer_size = 0
self.rtt_log_path = Path(rtt_log_path)
rtt_fh = logging.FileHandler(self.rtt_log_path / "rtt.log")
rtt_fh.setLevel(logging.DEBUG)
self.rtt_log.setLevel(logging.DEBUG)
self.rtt_log.addHandler(rtt_fh)
[docs] def read_target_memory(
self, addr: int, num_units: int, zone: str = None, nbits: int = 32
) -> Optional[list]:
"""Read the given target's memory units and the given address.
.. note:: The optional ``zone`` specifies a memory zone to
access to read from, e.g. ``IDATA``, ``DDATA``, or ``CODE``.
.. warning:: The given number of bits, if provided, must be
either ``8``, ``16``, or ``32``. If not provided, always
reads 32 bits.
:param addr: start address to read from
:param num_units: number of units to read
:param zone: optional memory zone name to access
:param nbits: number of bits to use for each unit
:return: List of units read from the target.
"""
mem_vals = None
try:
mem_vals = self.jlink.memory_read(addr, num_units, zone, nbits)
except pylink.errors.JLinkException:
log.exception(f"encountered error while reading memory at {hex(addr)}")
except ValueError:
log.exception("wrong number of bits given : must be 8, 16 or 32 bits")
return mem_vals
[docs] def _cc_open(self) -> None:
"""Connect debugger/microcontroller.
This method proceed to the following actions :
- create a JLink class instance
- connect to the debugger(using open method)
- set debugger interface to SWD
- connect debugger to the specified chip
- start RTT communication
- start RTT Logging the specified channel if activated
:raise JLinkRTTException: if connection timeout occurred.
"""
self.jlink = pylink.JLink()
# connect to J-Link debugger
if not self.jlink.opened():
self.jlink.open(self.serial_number)
log.info(f"connection made with J-Link debugger {self.serial_number}")
else:
log.debug("connection to J-Link already started")
# set target interface to SWD
self.jlink.set_tif(pylink.enums.JLinkInterfaces.SWD)
# connect debugger to the specified target
self.jlink.connect(
chip_name=self.chip_name, speed=self.speed, verbose=self.verbose
)
log.debug(
f"connection to chip {self.chip_name} performed at speed {self.speed}Hz"
)
# reset debugger if halted
if self.jlink.halted():
log.info(
f"J-Link is halted, reset target and wait for {self.connection_timeout}s"
)
self.jlink.reset(halt=False)
time.sleep(self.connection_timeout)
# start rtt at the specified address
self.jlink.rtt_start(self.block_address)
log.info(f"RTT communication started at address {self.block_address}")
t_start = time.perf_counter()
while True:
try:
num_up = self.jlink.rtt_get_num_up_buffers()
num_down = self.jlink.rtt_get_num_down_buffers()
log.debug(
f"RTT started. Found {num_up} up and {num_down} down channels."
)
# get rx buffer size
rx_buffer = self.jlink.rtt_get_buf_descriptor(self.rx_buffer_idx, True)
self.rx_buffer_size = rx_buffer.SizeOfBuffer
log.debug(
f"Maximum size for a received message set to {self.rx_buffer_size}"
)
self.rtt_configured = True
break
except pylink.errors.JLinkRTTException:
time.sleep(0.1)
# Exit the while loop once timeout is reached
if time.perf_counter() > (t_start + self.connection_timeout):
raise
# start rtt logging thread on buffer index rtt_log_buffer_idx
if self.rtt_log_path is not None:
try:
rtt_log_buffer = self.jlink.rtt_get_buf_descriptor(
self.rtt_log_buffer_idx, True
)
self.rtt_log_buffer_size = rtt_log_buffer.SizeOfBuffer
if self.rtt_log_buffer_size == 0:
raise ValueError
log.debug(f"RTT log buffer size is {self.rtt_log_buffer_size} bytes")
except ValueError:
log.debug("Read RTT log buffer size is 0, defaulting to 1kB")
self.rtt_log_buffer_size = 1024
except pylink.errors.JLinkRTTException as e:
log.error(f"Could not get RTT log buffer size: {e}")
if self.rtt_log_buffer_idx not in range(num_up + 1):
log.error(f"Buffer index must be at most {num_up}")
self.rtt_log_buffer_idx = 0
self.rtt_log_buffer_size = 1024
finally:
self._is_running = True
self.rtt_log_thread.start()
log.info("RTT logging started")
[docs] def _cc_close(self) -> None:
"""Close current RTT communication in use."""
if self.jlink is not None:
self._is_running = False
self.jlink.rtt_stop()
self.jlink.close()
log.info("RTT communication closed")
[docs] def _cc_send(self, msg: Message or bytes, raw: bool = False) -> None:
"""Send message using the corresponding RTT buffer.
:param msg: message to send, should be Message type or bytes.
:param raw: if raw is True simply send it as it is, otherwise apply serialization
"""
try:
if not raw:
msg = msg.serialize()
msg = list(msg)
bytes_written = self.jlink.rtt_write(self.tx_buffer_idx, msg)
log.debug(
"===> message sent (RTT) on buffer %d: %s, number of bytes written: %d",
self.tx_buffer_idx,
msg,
bytes_written,
)
except Exception:
log.exception(
f"ERROR occurred while sending {len(msg)} bytes on buffer {self.tx_buffer_idx}"
)
[docs] def _cc_receive(
self, timeout: float = 0.1, raw: bool = False
) -> Union[Message, bytes, None]:
"""Read message from the corresponding RTT buffer.
:param timeout: timeout applied on receive event
:param raw: if raw is True return raw bytes, otherwise Message type like
:return: Message or raw bytes if successful, otherwise None
"""
is_timeout = False
# maximum amount of bytes to read out
size = self.rx_buffer_size if raw else Message().header_size
t_start = time.perf_counter()
# rtt_read is not a blocking method due to this fact a while loop is used
# to act like a blocking ones.
while not is_timeout:
try:
# Read the message header or all of the buffer
msg_received = self.jlink.rtt_read(self.rx_buffer_idx, size)
# if a message is received
if msg_received:
if not raw:
# Read the payload and CRC
msg_received += self.jlink.rtt_read(
self.rx_buffer_idx,
msg_received[-1] + Message().crc_byte_size,
)
# Parse the bytes list into bytes string
msg_received = bytes(msg_received)
log.debug(
"<=== message received (RTT) on buffer %d: %s, number of bytes read: %d",
self.rx_buffer_idx,
msg_received,
len(msg_received),
)
if not raw:
msg_received = Message.parse_packet(msg_received)
break
except Exception:
log.exception(
f"encountered error while receiving message via {self} on buffer {self.rx_buffer_idx}"
)
return None
# Exit the while loop once timeout is reached
if time.perf_counter() > (t_start + timeout):
is_timeout = True
msg_received = None
return msg_received
[docs] @_need_rtt
def receive_log(self) -> None:
"""Receive RTT log messages from the corresponding RTT buffer."""
while self._is_running:
# receive at most rtt_log_buffer_size of RTT logs
log_msg = self.jlink.rtt_read(
self.rtt_log_buffer_idx, self.rtt_log_buffer_size
)
if log_msg:
self.rtt_log.debug(bytes(log_msg).decode())
time.sleep(self.rtt_log_refresh_time) # reduce resource consumption
[docs] @_need_connection
def reset_target(self, wait_time: int = 100, halt: bool = False) -> None:
"""Reset target via JLink.
:param wait_time: Amount of milliseconds to delay after reset
:param halt: if the CPU should halt after reset
"""
logging.info("Reset Target")
self.jlink.enable_reset_pulls_reset()
self.jlink.reset(wait_time, halt)