Source code for pykiso.lib.connectors.cc_udp_server

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Communication Channel via UDP server
************************************

:module: cc_udp_server

:synopsis: basic UDP server

.. currentmodule:: cc_udp_server

.. warning:: if multiple clients are connected to this server,
    ensure that each client has received all of its expected responses
    before the server receives another message. Responses are always
    sent to the sender of the most recently received message, so they
    may otherwise be sent to the wrong client.

"""
import logging
import socket
from typing import Union

from pykiso import Message, connector

log = logging.getLogger(__name__)


class CCUdpServer(connector.CChannel):
    """Connector channel used to set up an UDP server.

    The server replies to whichever peer sent the most recently received
    datagram: ``_cc_receive`` stores the sender address and ``_cc_send``
    sends to that stored address.
    """

    def __init__(self, dest_ip: str, dest_port: int, **kwargs):
        """Initialize attributes.

        :param dest_ip: IP address the server socket is bound to
        :param dest_port: port the server socket is bound to
        """
        super().__init__(**kwargs)
        self.dest_ip = dest_ip
        self.dest_port = dest_port
        self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Address of the last sender; filled in by _cc_receive and used as
        # the destination by _cc_send. None until a datagram was received.
        self.address = None
        # Buffer size handed to recvfrom for each datagram.
        self.max_msg_size = 256
        # Set a timeout to send the signal to the GIL to change thread.
        # In case of a multi-threading system, all tasks will be called one after the other.
        self.timeout = 1e-6

    def _cc_open(self) -> None:
        """Bind UDP socket with configured port and IP address."""
        # A closed socket reports -1 as file descriptor and cannot be bound
        # again, so create a fresh one when the channel is re-opened.
        if self.udp_socket.fileno() == -1:
            self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.udp_socket.bind((self.dest_ip, self.dest_port))
        # Log the bound address, not self.address (the last client address,
        # which is still None at this point).
        log.info(f"UDP socket open at address: {self.dest_ip}:{self.dest_port}")

    def _cc_close(self) -> None:
        """Close UDP socket."""
        log.info(f"UDP socket closed at address: {self.dest_ip}:{self.dest_port}")
        self.udp_socket.close()

    def _cc_send(self, msg: Union[bytes, Message], raw: bool = False) -> None:
        """Send back a UDP message to the previous sender.

        The destination is the address stored by the last successful
        ``_cc_receive`` call, so a message has to be received before
        anything can be sent.

        :param msg: message instance to serialize into bytes, or the
            bytes to send as-is when ``raw`` is True
        :param raw: if True, ``msg`` is sent without being serialized
        """
        if not raw:
            msg = msg.serialize()

        log.debug(f"UDP server send: {msg} at {self.address}")
        self.udp_socket.sendto(msg, self.address)

    def _cc_receive(
        self, timeout: float = 0.0000001, raw: bool = False
    ) -> Union[Message, bytes, None]:
        """Read message from UDP socket.

        :param timeout: timeout applied on receive event; a falsy value
            falls back to the channel's default ``self.timeout``
        :param raw: should the message be returned raw or should it be
            interpreted as a pykiso.Message?

        :return: Message if successful, otherwise none
        """
        self.udp_socket.settimeout(timeout or self.timeout)

        try:
            # Remember the sender so that _cc_send can answer it.
            msg_received, self.address = self.udp_socket.recvfrom(self.max_msg_size)

            if not raw:
                msg_received = Message.parse_packet(msg_received)

            log.debug(f"UDP server receives: {msg_received} at {self.address}")
        # catch the errors linked to the socket timeout without blocking
        except (BlockingIOError, socket.timeout):
            log.debug(f"encountered error while receiving message via {self}")
            return None
        # Exception instead of BaseException: KeyboardInterrupt and
        # SystemExit must not be swallowed by a receive poll.
        except Exception:
            log.exception(f"encountered error while receiving message via {self}")
            return None

        return msg_received