How to create a connector
On pykiso view, a connector is the communication medium between the auxiliary and the device under test. It can consist of two different blocks:
a communication channel
a flasher
Thus, its goal is to implement the sending and reception of data on the lower level protocol layers (e.g. CAN, UART, UDP).
Both can be used as context managers as they inherit the common interface
Connector
.
Note
Many already implemented connectors are available under pykiso.lib.connectors
.
and can easily be adapted for new implementations.
Communication Channel
In order to facilitate the implementation of a communication channel and to ensure
the compatibility with different auxiliaries, pykiso provides a common
interface CChannel
.
This interface enforces the implementation of the following methods:
_cc_open()
: open the communication. Does not take any argument._cc_close()
: close the communication. Does not take any argument._cc_send()
: send data if the communication is open. Requires one positional argumentmsg
._cc_receive()
: receive data if the communication is open. Requires one positional argumenttimeout
.
Class definition and instanciation
To create a new communication channel, the first step is to define its class and constructor.
Let’s admin that the following code is added to a file called my_connector.py:
import pykiso
MyCommunicationChannel(pykiso.CChannel):
def __init__(self, arg1, arg2, kwarg1 = "default"):
...
Then, if this CChannel has to be used within a test, the test configuration file will derive from its location and constructor parameters:
connectors:
my_chan:
# provide the constructor parameters
config:
# arg1 and arg2 are mandatory as we defined them as positional arguments
arg1: value for positional argument arg1
arg2: value for positional argument arg2
# kwarg1 is optional as we defined it as a keyword argument with a default value
kwarg1: different value for keyword argument kwarg1
# let pykiso know which class we want to instantiate with the provided parameters
type: path/to/my_connector.py:MyCommunicationChannel
Note
If this file is located inside an installable package my_package
,
the type will become type: my_package.my_connector:MyCommunicationChannel
.
Interface completion
If the code above is left as such, it won’t be usable as a connector as the communication channel’s abstract methods aren’t implemented.
Therefore, all four methods _cc_open
, _cc_close
, _cc_send
and
_cc_receive
need to be implemented.
In order to complete the code above, let’s assume that a module my_connection_module implements the communication logic.
The connector then becomes:
from my_connection_module import Connection
import pykiso
MyCommunicationChannel(pykiso.CChannel):
def __init__(self, arg1, arg2, kwarg1 = "default"):
# Connection class could be anything, like serial.Serial or socket.socket
self.my_connection = Connection(arg1, arg2)
def _cc_open(self):
self.my_connection.open()
def _cc_close(self):
self.my_connection.close()
def _cc_send(self, data: bytes):
self.my_connection.send(data_bytes)
def _cc_receive(self, timeout) -> Optional[bytes]:
received_data = self.my_connection.receive(timeout=timeout)
if received_data:
return received_data
Note
The API used in this example for the fictive my_connection module entirely depends on the used module.
Parameters and return values
In order to stay compatible and usable by the attached auxiliary, the created connector has to respect certain rules (in addition to the CChannel base class interface):
rule 1 : _cc_receive concrete implementation has to return a dictionary containing at least the key “msg”. If more than the data received is return, for example the CAN ID from the emitter, just put it in the return dictionary.
see the example below with the cc_pcan_can connector and the return of the remote_id value :
def _cc_receive(
self, timeout: float = 0.0001
) -> Dict[str, Union[MessageType, int]]:
"""Receive a can message using configured filters.
:param timeout: timeout applied on reception
:return: the received data and the source can id
"""
try: # Catch bus errors & rcv.data errors when no messages where received
received_msg = self.bus.recv(timeout=timeout or self.timeout)
if received_msg is not None:
frame_id = received_msg.arbitration_id
payload = received_msg.data
timestamp = received_msg.timestamp
log.internal_debug(f"received CAN Message: {frame_id}, {payload}, {timestamp}")
return {"msg": payload, "remote_id": frame_id}
else:
return {"msg": None}
except can.CanError as can_error:
log.internal_debug(f"encountered can error: {can_error}")
return {"msg": None}
except Exception:
log.exception(f"encountered error while receiving message via {self}")
return {"msg": None}
rule 2 : additional arguments associated to _cc_send concret implementation has to be named arguments and used the **kwargs
see example below with the cc_pcan_can connector and the additional remote_id parameter:
def _cc_send(self, msg: MessageType, **kwargs) -> None:
"""Send a CAN message at the configured id.
If remote_id parameter is not given take configured ones
:param msg: data to send
:param kwargs: named arguments
"""
_data = msg
remote_id = kwargs.get("remote_id")
if remote_id is None:
remote_id = self.remote_id
Flasher
pykiso provides a common interface for flashers Flasher
that aims to be as simple as possible.
It only consists of 3 methods to implement:
open()
: open the communication with the flashing hardware if any (for e.g. JTAG flashing) and perform any preliminaly actionflash()
: perform all actions to flash the target deviceclose()
: close the communication with the flashing hardware.
Note
To ensure that a Flasher is closed after being opened, it should be used as a context manager (see Auxiliary implementation example).