Source code for pykiso.lib.connectors.cc_usb

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Communication Channel Via Usb
********************************

:module: cc_usb

:synopsis: Usb communication channel

.. currentmodule:: cc_usb


.. warning: Still under test
"""

import serial

from pykiso.lib.connectors import cc_uart


[docs]class CCUsb(cc_uart.CCUart): def __init__(self, serial_port): super().__init__(serial_port, baudrate=9600)
[docs] def _cc_send(self, msg, raw=False): if raw: raise NotImplementedError() raw_packet = msg.serialize() crc = self._calculate_crc32(raw_packet) raw_packet.insert(0, ((crc >> 8) & 0xFF)) raw_packet.insert(1, (crc & 0xFF)) self._send_using_slip(raw_packet)
def _send_using_slip(self, raw_packet): slip_raw_packet = [] slip_raw_packet.append(self.START) for a_byte in raw_packet: if a_byte == self.START: slip_raw_packet.append(self.ESC) slip_raw_packet.append(self.ESC_START) elif a_byte == self.ESC: slip_raw_packet.append(self.ESC) slip_raw_packet.append(self.ESC_ESC) else: slip_raw_packet.append(a_byte) self.serial.write(bytearray(slip_raw_packet)) self.serial.flushOutput() return #################### todo TO DELETE IF NOT NEEDED ANYMORE ####################### def CCwaitAfterReboot(self, retry_count, reboot_time): import time self.CCclose() for i in range(retry_count): try: time.sleep(reboot_time) self.CCopen() break except (serial.SerialException, serial.serialutil.SerialTimeoutException): self.logger.debug( "Unable to connect to USB port after reboot." + "Retrying after " + reboot_time + " seconds" )