Source code for pykiso.lib.connectors.cc_serial

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Communication Channel Via Serial
********************************

:module: cc_serial

:synopsis: Serial communication channel

.. currentmodule:: cc_serial

"""


from enum import Enum, IntEnum
from typing import ByteString, Dict, Optional, Union

import serial

from pykiso import Message, connector

MessageType = Union[Message, bytes]


[docs]class ByteSize(IntEnum): FIVE_BITS = serial.FIVEBITS SIX_BITS = serial.SIXBITS SEVEN_BITS = serial.SEVENBITS EIGHT_BITS = serial.EIGHTBITS
[docs]class Parity(str, Enum): PARITY_NONE = serial.PARITY_NONE PARITY_EVEN = serial.PARITY_EVEN PARITY_ODD = serial.PARITY_ODD PARITY_MARK = serial.PARITY_MARK PARITY_SPACE = serial.PARITY_SPACE
[docs]class StopBits(IntEnum): STOPBITS_ONE = serial.STOPBITS_ONE STOPBITS_ONE_POINT_FIVE = serial.STOPBITS_ONE_POINT_FIVE STOPBITS_TWO = serial.STOPBITS_TWO
[docs]class CCSerial(connector.CChannel): """Connector for serial devices""" def __init__( self, port: str, baudrate: int = 9600, bytesize: ByteSize = ByteSize.EIGHT_BITS, parity: Parity = Parity.PARITY_NONE, stopbits: StopBits = StopBits.STOPBITS_ONE, timeout: Optional[float] = None, xonxoff: bool = False, rtscts: bool = False, write_timeout: Optional[float] = None, dsrdtr: bool = False, inter_byte_timeout: Optional[float] = None, exclusive: Optional[bool] = None, **kwargs ): """Init Serial settings :param port: Device name (e.g. "COM1" for Windows or "/dev/ttyACM0" for Linux) :param baudrate: Baud rate such as 9600 or 115200 etc, defaults to 9600 :param bytesize: Number of data bits. Possible values: FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS, defaults to EIGHTBITS :param parity: Enable parity checking. Possible values: PARITY_NONE, PARITY_EVEN, PARITY_ODD PARITY_MARK, PARITY_SPACE, defaults to PARITY_NONE :param stopbits: Number of stop bits. Possible values: STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO, defaults to STOPBITS_ONE :param timeout: Set a read timeout value in seconds, defaults to None :param xonxoff: Enable software flow contro, defaults to False :param rtscts: Enable hardware (RTS/CTS) flow control, defaults to False :param write_timeout: Set a write timeout value in seconds, defaults to None :param dsrdtr: Enable hardware (DSR/DTR) flow control, defaults to False :param inter_byte_timeout: Inter-character timeout, None to disable, defaults to None :param exclusive: Set exclusive access mode (POSIX only). A port cannot be opened in exclusive access mode if it is already open in exclusive access mode., defaults to None """ # Initialize the super class super().__init__(**kwargs) # Initialize the serial connection. Port None prevents automatic opening self.serial = serial.Serial( port=None, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=timeout, xonxoff=xonxoff, rtscts=rtscts, write_timeout=write_timeout, dsrdtr=dsrdtr, inter_byte_timeout=inter_byte_timeout, exclusive=exclusive, ) self.current_write_timeout = write_timeout self.serial.port = port
[docs] def _cc_open(self) -> None: """Open serial port""" self.serial.open()
[docs] def _cc_close(self) -> None: """Close serial port""" self.serial.close()
[docs] def _cc_send( self, msg: ByteString, raw: bool = True, timeout: float = None ) -> None: """Sends data to the serial port :param msg: data to send :param raw: unused :param timeout: write timeout in seconds. None sets it to blocking, defaults to None :raises: SerialTimeoutException - In case a write timeout is configured for the port and the time is exceeded. """ if timeout != self.current_write_timeout: self.current_write_timeout = timeout self.serial.write_timeout = timeout self.serial.write(msg)
[docs] def _cc_receive(self, timeout=0.00001, raw: bool = True) -> Dict[str, bytes]: """Read bytes from the serial port. Try to read one byte in blocking mode. After blocking read check remaining bytes and read them without a blocking call. :param timeout: timeout in seconds, 0 for non blocking read, defaults to 0.00001 :param raw: raw mode only, defaults to True :raises NotImplementedError: if raw is to True :return: received bytes """ if not raw: raise NotImplementedError() self.serial.timeout = timeout received = self.serial.read() # We have waited already so set the timeout to non blocking self.serial.timeout = 0 # Check how many bytes are still left and read them at once in_waiting = self.serial.in_waiting if in_waiting > 0: received += self.serial.read(in_waiting) return {"msg": received}