# Source code for pykiso.lib.auxiliaries.ykush_auxiliary

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Ykush Auxiliary
***************

:module: ykush_auxiliary

:synopsis: Auxiliary that can power on and off ports on an Ykush device.

.. currentmodule:: ykush_auxiliary

"""
import logging
from contextlib import contextmanager
from enum import IntEnum
from typing import Any, List, Optional, Tuple

import hid

from pykiso.auxiliary import AuxiliaryInterface

log = logging.getLogger(__name__)

# --- USB identification -------------------------------------------------
# Vendor ID used to recognise a YKUSH device during HID enumeration
YKUSH_USB_VID = 0x04D8
# Product IDs in normal operation mode, in this order:
# YKUSH beta, YKUSH, YKUSH3, YKUSHXS
YKUSH_USB_PID_LIST = (0x0042, 0xF2F7, 0xF11B, 0xF0CD)

# --- USB communication --------------------------------------------------
YKUSH_USB_TIMEOUT = 1000  # read timeout, in milliseconds
YKUSH_USB_PACKET_SIZE = 64
YKUSH_USB_PACKET_PAYLOAD_SIZE = 20

# --- Protocol -----------------------------------------------------------
# Status value that marks a reply as successful
YKUSH_PROTO_OK_STATUS = 1

# --- Port states --------------------------------------------------------
YKUSH_PORT_STATE_ON = 1
YKUSH_PORT_STATE_OFF = 0
YKUSH_PORT_STATE_ERROR = 255
# Human readable name of each port state
YKUSH_PORT_STATE_DICT = {
    YKUSH_PORT_STATE_OFF: "OFF",
    YKUSH_PORT_STATE_ON: "ON",
    YKUSH_PORT_STATE_ERROR: "ERROR",
}


class YkushError(Exception):
    """Base class of every Ykush specific exception."""
class YkushDeviceNotFound(YkushError):
    """Error raised when no Ykush device can be found."""
class YkushStatePortNotRetrieved(YkushError):
    """Error raised when the state of a port cannot be read back."""
class YkushSetStateError(YkushError):
    """Error raised when a port could not be switched on or off."""
class YkushPortNumberError(YkushError):
    """Error raised when the requested port number does not exist."""
class PortState(IntEnum):
    """Power state of a single port."""

    # port is powered off
    OFF = 0
    # port is powered on
    ON = 1
class YkushAuxiliary(AuxiliaryInterface):
    """Auxiliary used to power on and off the ports of an Ykush device.

    The device is located once, at construction time. Every exchange with
    it afterwards opens a fresh HID handle on the stored path and closes it
    again (see :meth:`_open_and_close_device`).
    """

    def __init__(self, serial_number: Optional[str] = None, **kwargs):
        """Initialize the attributes and locate the device.

        :param serial_number: serial number of the device to connect to;
            when it is not given the first Ykush device found is used,
            defaults to None.
        :param kwargs: forwarded unchanged to the parent class constructor

        :raises YkushDeviceNotFound: if no device is found or if no device
            has the given serial number.
        """
        super().__init__(
            is_proxy_capable=False,
            tx_task_on=False,
            rx_task_on=False,
            connector_required=False,
            **kwargs,
        )
        self._ykush_device = None
        self._product_id = None
        self._path = None
        self.connect_device(serial_number)
        self.number_of_port = self.get_number_of_port()

    def _create_auxiliary_instance(self) -> bool:
        """Power on all ports at the start.

        :return: True if successful
        """
        log.internal_info("Create auxiliary instance")
        self.set_all_ports_on()
        return True

    def _delete_auxiliary_instance(self) -> bool:
        """Power on all ports to restore them.

        :return: always True
        """
        self.set_all_ports_on()
        log.internal_info("Auxiliary instance deleted")
        return True

    def connect_device(self, serial: Optional[str] = None):
        """Find an Ykush device and remember its path and product id.

        Without a serial number the first Ykush device enumerated is
        selected; when several devices are plugged in, give the serial
        number of the wanted one.

        :param serial: serial number of the device, defaults to None

        :raises YkushDeviceNotFound: if no ykush device is found or none
            has the requested serial number
        """
        serials_found = []
        self._path = None

        # try to locate a device
        for device in hid.enumerate(0, 0):
            if (
                device["vendor_id"] != YKUSH_USB_VID
                or device["product_id"] not in YKUSH_USB_PID_LIST
            ):
                continue
            # every serial is collected so that the error message can list them
            serials_found.append(device["serial_number"])
            # keep the FIRST match only, as documented
            if self._path is None and (
                serial is None or serial == device["serial_number"]
            ):
                self._product_id = device["product_id"]
                self._path = device["path"]

        if self._path is None:
            if not serials_found:
                raise YkushDeviceNotFound(
                    "Could not connect to a ykush hub, no device was found."
                )
            raise YkushDeviceNotFound(
                f"The serial numbers available are : {serials_found}\n"
                f"No device was found with the serial number {serial}\n"
            )

        # Open the device once to make sure it is reachable, then release
        # the handle: every later exchange opens its own handle.
        with self._open_and_close_device():
            pass

    @contextmanager
    def _open_and_close_device(self):
        """Context manager to open and close the device every time we send
        a message, otherwise we will get an empty message every time in
        response.
        """
        self._ykush_device = hid.device()
        self._ykush_device.open_path(self._path)
        try:
            yield
        finally:
            self._ykush_device.close()

    def check_port_number(self, port_number: int):
        """Check if the port indicated is a port of the device.

        :param port_number: port number to check, the first port is 1

        :raises YkushPortNumberError: if the device has no port with this
            number
        """
        if port_number not in range(1, self.number_of_port + 1):
            raise YkushPortNumberError(
                f"The port number {port_number} is not valid for the device,"
                f" it has only {self.number_of_port} ports"
            )

    @staticmethod
    def get_str_state(state: int) -> str:
        """Return the name of a state.

        :param state: 1 (port on), 0 (port off)

        :return: ON, OFF
        """
        return YKUSH_PORT_STATE_DICT[state]

    def get_serial_number_string(self) -> str:
        """Return the device serial number string."""
        with self._open_and_close_device():
            return self._ykush_device.get_serial_number_string()

    def get_number_of_port(self) -> int:
        """Return the number of ports on the ykush device.

        :return: 1 for the YKUSHXS product id (0xF0CD), 3 for any other
        """
        return 1 if self._product_id == 0xF0CD else 3

    def get_port_state(self, port_number: int) -> PortState:
        """Return the state of a specific port.

        :param port_number: number of the port, the first port is 1

        :raises YkushPortNumberError: if the port number is not valid
        :raises YkushStatePortNotRetrieved: if the state couldn't be
            retrieved

        :return: 0 if the port is off, 1 if the port is on
        """
        self.check_port_number(port_number)
        return self.get_all_ports_state()[port_number - 1]

    def get_all_ports_state(self) -> List[PortState]:
        """Return the state of all the ports.

        :raises YkushStatePortNotRetrieved: the states couldn't be retrieved

        :return: list with 0 if a port is off, 1 if on
        """
        if self.get_firmware_version()[0] >= 1:
            # one request returns the status followed by one byte per port
            recvbytes = self._raw_sendreceive([0x2A])[: self.number_of_port + 1]
            if recvbytes[0] != YKUSH_PROTO_OK_STATUS:
                raise YkushStatePortNotRetrieved(
                    "The states of the ports couldn't be retrieved"
                )
            return [
                PortState.ON if p > 0x10 else PortState.OFF for p in recvbytes[1:]
            ]

        # firmware glitch workaround: ask for each port separately
        list_state = []
        for port_number in range(1, self.number_of_port + 1):
            status, port_state = self._raw_sendreceive([0x20 | port_number])[:2]
            if status != YKUSH_PROTO_OK_STATUS:
                raise YkushStatePortNotRetrieved(
                    f"The state of the port {port_number} couldn't be retrieved"
                )
            list_state.append(PortState.ON if port_state > 0x10 else PortState.OFF)
        return list_state

    def get_firmware_version(self) -> Tuple[int, int]:
        """Return a tuple with YKUSH firmware version in format
        (major, minor).
        """
        status, major, minor = self._raw_sendreceive([0xF0])[:3]
        if status == YKUSH_PROTO_OK_STATUS:
            self._firmware_major_version = major
            self._firmware_minor_version = minor
        else:
            # early devices will not recognize it, figure it out from serial
            # (the serial is read only once: each read opens the device)
            self._firmware_major_version = 1
            serial = self.get_serial_number_string()
            if "YK2" in serial:
                self._firmware_minor_version = 2
            elif "YKD2" in serial:
                self._firmware_minor_version = 255
            else:
                self._firmware_minor_version = 0
        return self._firmware_major_version, self._firmware_minor_version

    def set_port_state(self, port_number: int, state: int):
        """Set a specific port On or Off.

        :param port_number: number of the port, the first port is 1
        :param state: 1 (port on), 0 (port off)

        :raises YkushPortNumberError: if the port number is not valid
        :raises YkushSetStateError: if the operation had an error
        """
        str_state = self.get_str_state(state)
        self.check_port_number(port_number)
        # Resolved before the command is sent, so that a state that is not
        # ON or OFF fails without switching anything.
        expected_state = PortState[str_state]

        self._raw_sendreceive(
            [(0x10 if state == YKUSH_PORT_STATE_ON else 0x0) | port_number]
        )

        try:
            state_port = self.get_port_state(port_number)
        except YkushStatePortNotRetrieved as e:
            raise YkushSetStateError(
                f"The state of the action to power {str_state} couldn't be"
                " confirmed because the state of the port can't be retrieved"
            ) from e

        if state_port != expected_state:
            raise YkushSetStateError(
                f"There was an error trying to power {str_state.lower()} the port,"
                f" the port {port_number} is {state_port}"
            )

    def set_port_on(self, port_number: int):
        """Power on a specific port.

        :param port_number: number of the port, the first port is 1

        :raises YkushSetStateError: if the operation had an error
        """
        self.set_port_state(port_number, state=YKUSH_PORT_STATE_ON)

    def set_port_off(self, port_number: int):
        """Power off a specific port.

        :param port_number: number of the port, the first port is 1

        :raises YkushSetStateError: if the operation had an error
        """
        self.set_port_state(port_number, state=YKUSH_PORT_STATE_OFF)

    def set_all_ports(self, state: int):
        """Power off or on all YKUSH ports.

        :param state: state wanted 1 for On, 0 for Off

        :raises YkushSetStateError: if the operation had an error
        """
        str_state = self.get_str_state(state)
        # Resolved before the command is sent, so that a state that is not
        # ON or OFF fails without switching anything.
        expected_state = PortState[str_state]

        self._raw_sendreceive([(0x1A if state == YKUSH_PORT_STATE_ON else 0x0A)])

        try:
            states_port = self.get_all_ports_state()
        except YkushStatePortNotRetrieved as e:
            raise YkushSetStateError(
                f"The state of the action to power {str_state} couldn't be"
                " confirmed because the state of the ports can't be retrieved"
            ) from e

        # every port has to be in the requested state
        if any(port_state != expected_state for port_state in states_port):
            raise YkushSetStateError(
                "There was an error during the power "
                f"{str_state.lower()}, "
                f"the ports have the following states : {states_port}"
            )

    def set_all_ports_on(self):
        """Power on all YKUSH downstream ports.

        :raises YkushSetStateError: if the operation had an error
        """
        self.set_all_ports(state=YKUSH_PORT_STATE_ON)

    def set_all_ports_off(self):
        """Power off all YKUSH downstream ports.

        :raises YkushSetStateError: if the operation had an error
        """
        self.set_all_ports(state=YKUSH_PORT_STATE_OFF)

    def is_port_on(self, port_number: int) -> bool:
        """Check if a port is on.

        :param port_number: number of the port, the first port is 1

        :return: True if a port is on, else False
        """
        return bool(self.get_port_state(port_number))

    def is_port_off(self, port_number: int) -> bool:
        """Check if a port is off.

        :param port_number: number of the port, the first port is 1

        :return: True if a port is off, else False
        """
        return not bool(self.get_port_state(port_number))

    def _raw_sendreceive(self, packetarray: List[int]) -> List[int]:
        """Send a message to the device and get the returned message.

        The given bytes are placed twice at the start of the packet and the
        remainder is filled with zeros before it is written to the device.

        :param packetarray: packet to send to do an operation

        :return: the first YKUSH_USB_PACKET_PAYLOAD_SIZE bytes of the
            response, or that many 0xFF bytes when no or too short a
            response was read
        """
        with self._open_and_close_device():
            packetarray = packetarray * 2 + [0x00] * (
                YKUSH_USB_PACKET_SIZE - len(packetarray)
            )
            self._ykush_device.write(packetarray)
            recvpacket = self._ykush_device.read(
                max_length=YKUSH_USB_PACKET_SIZE + 1, timeout_ms=YKUSH_USB_TIMEOUT
            )

        # if not None return the bytes we actually need
        if recvpacket is None or len(recvpacket) < YKUSH_USB_PACKET_PAYLOAD_SIZE:
            return [0xFF] * YKUSH_USB_PACKET_PAYLOAD_SIZE
        return recvpacket[:YKUSH_USB_PACKET_PAYLOAD_SIZE]

    def _run_command(self, cmd_message: Any, cmd_data: Optional[bytes]) -> None:
        """Not used.

        Simply respect the interface.
        """

    def _receive_message(self, timeout_in_s: float) -> None:
        """Not used.

        Simply respect the interface.
        """