##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
Communication Channel Via Uart
********************************
:module: cc_uart
:synopsis: Uart communication channel
.. currentmodule:: cc_uart
"""
import struct
import time
import serial
from pykiso import connector, message
[docs]class IncompleteCCMsgError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
[docs]class CCUart(connector.CChannel):
"""UART implementation of the coordination channel."""
headerSize = 8
payloadLengthIndex = 7
WAITING_FOR_START = 0
RECEIVING_HEADER = 1
RECEIVING_PAYLOAD = 2
RECEIVED_DONE = 3
START = 0xC0
ESC = 0xDB
ESC_START = 0xDC
ESC_ESC = 0xDD
def __init__(self, serialPort, baudrate=9600, **kwargs):
# Initialize the super class
super().__init__(**kwargs)
# Initialize the serial connection
self.serial = serial.Serial(timeout=1)
self.serial.port = serialPort
self.serial.baudrate = baudrate
self.serial.paritiy = serial.PARITY_NONE
# Set a timeout to send the signal to the GIL to change thread.
# In case of a multi-threading system, all tasks will be called one after the other.
self.timeout = 1e-6
[docs] def _cc_open(self):
self.serial.open()
[docs] def _cc_close(self):
self.serial.close()
[docs] def _cc_send(self, msg):
rawPacket = msg.serialize()
# Use CRC to verify content
crc = self._calculate_crc32(rawPacket)
rawPacket = struct.pack(">H", crc) + rawPacket # Force big endian notation
self._send_using_slip(rawPacket)
[docs] def _cc_receive(self, timeout=0.00001, raw=False):
if raw:
raise NotImplementedError()
self.serial.timeout = timeout or self.timeout
receivingState = self.WAITING_FOR_START
bytesToRead = 10
rawPacket = []
while bytesToRead > 0:
singleByteRead = self.serial.read(1)
if 0 == len(singleByteRead):
return None
if self.WAITING_FOR_START == receivingState:
if self.START == singleByteRead[0]:
receivingState = self.RECEIVING_HEADER
else:
if self.START == singleByteRead[0]:
bytesToRead = 10
rawPacket = []
receivingState = self.RECEIVING_HEADER
# TODO: dei9bue something went wrong
else:
bytesToRead -= 1
if self.ESC == singleByteRead[0]:
singleByteRead = self.serial.read(1)
if self.ESC_START == singleByteRead[0]:
rawPacket.append(self.START)
elif self.ESC_ESC == singleByteRead[0]:
rawPacket.append(self.ESC)
else:
rawPacket.append(singleByteRead[0])
if 0 == bytesToRead:
if self.RECEIVING_HEADER == receivingState:
bytesToRead = singleByteRead[0]
if bytesToRead:
receivingState = self.RECEIVING_PAYLOAD
else:
receivingState = self.RECEIVED_DONE
elif self.RECEIVING_PAYLOAD == receivingState:
receivingState = self.RECEIVED_DONE
if receivingState == self.RECEIVED_DONE:
expectedCRC = ((rawPacket[0] & 0xFF) << 8) + (rawPacket[1] & 0xFF)
rawPacket = rawPacket[2 : len(rawPacket)]
calculatedCRC = self._calculate_crc32(rawPacket)
if calculatedCRC != expectedCRC:
return None
# Reception was a success, we need now to convert the list into a array of bytes
binary_message = b"".join(struct.pack("B", value) for value in rawPacket)
return message.Message.parse_packet(binary_message)
def _send_using_slip(self, rawPacket):
self.serial.write(self.START.to_bytes(1, byteorder="big"))
for aByte in rawPacket:
if aByte == self.START:
self.serial.write(self.ESC.to_bytes(1, byteorder="big"))
self.serial.write(self.ESC_START.to_bytes(1, byteorder="big"))
elif aByte == self.ESC:
self.serial.write(self.ESC.to_bytes(1, byteorder="big"))
self.serial.write(self.ESC_ESC.to_bytes(1, byteorder="big"))
else:
self.serial.write(aByte.to_bytes(1, byteorder="big"))
time.sleep(0.01)
self.serial.flushOutput()
return
def _calculate_crc32(self, buffer):
crc = 0
CRC_BIT_SHIFT_4 = 4
CRC_BIT_SHIFT_5 = 5
CRC_BIT_SHIFT_8 = 8
CRC_BIT_SHIFT_12 = 12
CRC_BYTE_MASK = 0xFF
for aByte in buffer:
crc = ((crc >> (CRC_BIT_SHIFT_8)) | (crc << (CRC_BIT_SHIFT_8))) & 0xFFFF
crc = (crc ^ (aByte & CRC_BYTE_MASK)) & 0xFFFF
crc = (crc ^ (crc & (CRC_BYTE_MASK)) >> (CRC_BIT_SHIFT_4)) & 0xFFFF
crc = (crc ^ ((crc << (CRC_BIT_SHIFT_12)) & 0xFFFF)) & 0xFFFF
crc = (crc ^ (crc & (CRC_BYTE_MASK)) << (CRC_BIT_SHIFT_5)) & 0xFFFF
return crc & 0xFFFF