How to create a connector

From pykiso's point of view, a connector is the communication medium between the auxiliary and the device under test. It can consist of two different blocks:

  • a communication channel

  • a flasher

Thus, its goal is to implement the sending and receiving of data on the lower-level protocol layers (e.g. CAN, UART, UDP).

Both blocks inherit from the common interface Connector and can therefore be used as context managers.

Note

Many ready-made connectors are available under pykiso.lib.connectors and can easily be adapted for new implementations.

Communication Channel

To make communication channels easier to implement and to keep them compatible with the different auxiliaries, pykiso provides the common interface CChannel.

This interface enforces the implementation of the following methods:

  • _cc_open(): open the communication. Does not take any argument.

  • _cc_close(): close the communication. Does not take any argument.

  • _cc_send(): send data if the communication is open. Requires one positional argument msg and one keyword argument raw, which indicates whether the data is sent as-is or has to be serialized first.

  • _cc_receive(): receive data if the communication is open. Requires one positional argument timeout and one keyword argument raw, which indicates whether the received data is returned as-is or has to be deserialized first.

Class definition and instantiation

To create a new communication channel, the first step is to define its class and constructor.

Let’s assume that the following code is added to a file called my_connector.py:

import pykiso

class MyCommunicationChannel(pykiso.CChannel):

    def __init__(self, arg1, arg2, kwarg1="default"):
        ...

Then, if this CChannel has to be used within a test, its entry in the test configuration file follows from the file's location and the constructor parameters:

connectors:
  my_chan:
    # provide the constructor parameters
    config:
      # arg1 and arg2 are mandatory as we defined them as positional arguments
      arg1: value for positional argument arg1
      arg2: value for positional argument arg2
      # kwarg1 is optional as we defined it as a keyword argument with a default value
      kwarg1: different value for keyword argument kwarg1
    # let pykiso know which class we want to instantiate with the provided parameters
    type: path/to/my_connector.py:MyCommunicationChannel

Note

If this file is located inside an installable package my_package, the type will become type: my_package.my_connector:MyCommunicationChannel.
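To illustrate how the two fields of a connector entry relate to the class, the sketch below splits a type string into a location and a class name, then passes the config mapping to the constructor as keyword arguments. It is not pykiso's actual loader: split_type and the stand-in class are hypothetical and only serve the illustration.

```python
from typing import Tuple


def split_type(type_string: str) -> Tuple[str, str]:
    """Split '<location>:<ClassName>' on the last colon (hypothetical helper)."""
    location, _, class_name = type_string.rpartition(":")
    if not location or not class_name:
        raise ValueError(f"expected '<location>:<ClassName>', got {type_string!r}")
    return location, class_name


class MyCommunicationChannel:
    """Stand-in with the same constructor signature as the class above."""

    def __init__(self, arg1, arg2, kwarg1="default"):
        self.arg1, self.arg2, self.kwarg1 = arg1, arg2, kwarg1


# Mirrors the 'config' mapping of the YAML file, with the optional kwarg1 left out.
config = {"arg1": "first value", "arg2": "second value"}

location, class_name = split_type("path/to/my_connector.py:MyCommunicationChannel")
# Each key of the config mapping becomes a keyword argument of the constructor.
channel = MyCommunicationChannel(**config)
```

Because kwarg1 is missing from the mapping, the instance falls back to the default value declared in the constructor. Omitting arg1 or arg2 would raise a TypeError instead.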

Interface completion

Left as is, the class above cannot be used as a connector because the abstract methods of the communication channel are not implemented.

Therefore, all four methods _cc_open, _cc_close, _cc_send and _cc_receive need to be implemented.
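The effect of leaving these methods unimplemented can be reproduced with Python's standard abc module. ChannelInterface below is a simplified stand-in for CChannel, assumed here to declare the four methods as abstract. It is not pykiso's actual class.

```python
import abc


class ChannelInterface(abc.ABC):
    """Simplified stand-in for pykiso.CChannel (assumption, not the real class)."""

    @abc.abstractmethod
    def _cc_open(self): ...

    @abc.abstractmethod
    def _cc_close(self): ...

    @abc.abstractmethod
    def _cc_send(self, msg, raw=False): ...

    @abc.abstractmethod
    def _cc_receive(self, timeout, raw=False): ...


class IncompleteChannel(ChannelInterface):
    """Only defines a constructor, like the first version of the connector."""

    def __init__(self, arg1, arg2, kwarg1="default"):
        self.arg1, self.arg2, self.kwarg1 = arg1, arg2, kwarg1


try:
    IncompleteChannel("a", "b")
    error_message = None
except TypeError as exc:
    # Python refuses to instantiate a class with unimplemented abstract methods.
    error_message = str(exc)
```

The error is raised at instantiation time, so a missing method shows up as soon as the connector is created rather than in the middle of a test run.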

In order to complete the code above, let’s assume that a module my_connection_module implements the communication logic.

The connector then becomes:

from typing import Union

import pykiso

# Connection and Data stand for whatever the fictitious module provides
from my_connection_module import Connection, Data


class MyCommunicationChannel(pykiso.CChannel):

    def __init__(self, arg1, arg2, kwarg1="default"):
        # initialize the base class before adding connector-specific attributes
        super().__init__()
        # Connection class could be anything, like serial.Serial or socket.socket
        self.my_connection = Connection(arg1, arg2)

    def _cc_open(self):
        self.my_connection.open()

    def _cc_close(self):
        self.my_connection.close()

    def _cc_send(self, msg: Union[Data, bytes], raw=False):
        # raw data is sent as-is, anything else is serialized first
        if raw:
            data_bytes = msg
        else:
            data_bytes = msg.serialize()
        self.my_connection.send(data_bytes)

    def _cc_receive(self, timeout, raw=False):
        received_data = self.my_connection.receive(timeout=timeout)
        if not received_data:
            # nothing was received before the timeout expired
            return None
        if raw:
            return received_data
        return Data.deserialize(received_data)

Note

The API used in this example for the fictitious my_connection_module depends entirely on the module that is actually used.
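To try out the raw switch without hardware or a pykiso installation, the following sketch replaces the connection with an in-memory queue. LoopbackConnection, Data and LoopbackChannel are hypothetical names. In a real connector the channel class would inherit from pykiso.CChannel.

```python
from collections import deque
from typing import Optional, Union


class Data:
    """Hypothetical message type with a trivial UTF-8 serialization."""

    def __init__(self, text: str):
        self.text = text

    def serialize(self) -> bytes:
        return self.text.encode("utf-8")

    @classmethod
    def deserialize(cls, raw_bytes: bytes) -> "Data":
        return cls(raw_bytes.decode("utf-8"))


class LoopbackConnection:
    """Hypothetical connection that hands back whatever was sent."""

    def __init__(self):
        self.is_open = False
        self._buffer = deque()

    def open(self):
        self.is_open = True

    def close(self):
        self.is_open = False

    def send(self, payload: bytes):
        self._buffer.append(payload)

    def receive(self, timeout: float) -> Optional[bytes]:
        # Nothing to wait for in memory, so the timeout is ignored.
        return self._buffer.popleft() if self._buffer else None


class LoopbackChannel:
    """Same four methods as above; a real channel would subclass pykiso.CChannel."""

    def __init__(self):
        self.my_connection = LoopbackConnection()

    def _cc_open(self):
        self.my_connection.open()

    def _cc_close(self):
        self.my_connection.close()

    def _cc_send(self, msg: Union[Data, bytes], raw: bool = False):
        self.my_connection.send(msg if raw else msg.serialize())

    def _cc_receive(self, timeout: float, raw: bool = False):
        received = self.my_connection.receive(timeout=timeout)
        if received is None:
            return None
        return received if raw else Data.deserialize(received)


channel = LoopbackChannel()
channel._cc_open()
channel._cc_send(Data("ping"))                          # serialized before sending
raw_reply = channel._cc_receive(timeout=0.1, raw=True)  # bytes, left untouched
channel._cc_send(b"pong", raw=True)                     # sent as-is
parsed_reply = channel._cc_receive(timeout=0.1)         # deserialized into Data
channel._cc_close()
```

Returning None when nothing arrives keeps the raw and non-raw paths symmetric, so callers only have to handle one "no data" case.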

Flasher

pykiso provides a common interface for flashers, Flasher, that aims to be as simple as possible.

It consists of only three methods to implement:

  • open(): open the communication with the flashing hardware, if any (e.g. for JTAG flashing), and perform any preliminary action.

  • flash(): perform all actions to flash the target device.

  • close(): close the communication with the flashing hardware.

Note

To ensure that a Flasher is closed after being opened, it should be used as a context manager (see Auxiliary implementation example).
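The following sketch shows why the context-manager form is the safer choice. FlasherInterface is a simplified stand-in for pykiso's Flasher, assumed to call open() on entry and close() on exit. RecordingFlasher is a hypothetical implementation that only records the calls it receives.

```python
class FlasherInterface:
    """Simplified stand-in for pykiso's Flasher (assumption, not the real class)."""

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not swallow exceptions raised inside the block


class RecordingFlasher(FlasherInterface):
    """Hypothetical flasher that records which methods were called."""

    def __init__(self, fail=False):
        self.fail = fail
        self.calls = []

    def open(self):
        self.calls.append("open")

    def flash(self):
        self.calls.append("flash")
        if self.fail:
            raise RuntimeError("flashing failed")

    def close(self):
        self.calls.append("close")


with RecordingFlasher() as flasher:
    flasher.flash()

failing = RecordingFlasher(fail=True)
try:
    with failing:
        failing.flash()
except RuntimeError:
    pass  # close() has still been called by __exit__
```

In both cases the recorded sequence ends with close(), even when flash() raises, which is the guarantee the note above refers to.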