##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
Acroname Control Auxiliary
**************************
:module: acroname_auxiliary
:synopsis: Auxiliary used to control acroname usb hubs.
.. currentmodule:: acroname_auxiliary
"""
import logging
from typing import Any, Optional
import brainstem
from brainstem.result import Result
from pykiso.auxiliary import AuxiliaryInterface
from pykiso.types import MsgType
# Module-level logger used by everything defined in this file.
log = logging.getLogger(__name__)

# Human-readable description for each numeric error code; looked up by
# code when a failed result has to be reported in the log.
ERROR_MESSAGES = {
    0: "No Error occurred.",
    1: "Memory allocation/de-allocation error.",
    2: "Invalid parameters given.",
    3: "Entity, module or information not found.",
    4: "File name is to long.",
    5: "Module or resource is currently busy.",
    6: "An Input/Output error occurred.",
    7: "Invalid Mode or mode not accessible for current state.",
    8: "Write error occurred.",
    9: "Read error occurred.",
    10: "Unexpected end of file encountered.",
    11: "Resource not ready.",
    12: "Insufficient permissions.",
    13: "Request is outside of valid range.",
    14: "Size is incorrect for resource.",
    15: "Buffer was overrun or will be.",
    16: "Unable to parse command.",
    17: "Configuration is invalid.",
    18: "Timeout occurred.",
    19: "Could not initialize resource.",
    20: "Version mismatch",
    21: "Functionality unavailable or unimplemented.",
    22: "Duplicate request received.",
    23: "Request was canceled",
    24: "packet was invalid or had invalid contents.",
    25: "connection is no longer valid, or was closed.",
    26: "Requested entity does not exist.",
    27: "Command to short, not enough data to parse.",
    28: "Entity is not available, or does not exist.",
    29: "Option for given entity is invalid.",
    30: "Error allocating or acquiring a resource.",
    31: "Media not found or not available.",
    32: "Unknown error encountered.",
}
class AcronameAuxiliary(AuxiliaryInterface):
    """Auxiliary used to control acroname usb hubs.

    Port voltages and currents are exchanged with the hub in micro-units
    and converted with the ``MICROVOLT_TO_UNIT`` / ``MICROAMP_TO_UNIT``
    factor tables below.
    """

    # Factor a microvolt value is multiplied by to express it in the keyed unit.
    MICROVOLT_TO_UNIT = {"uV": 1, "mV": 1e-3, "V": 1e-6}
    # Factor a microamp value is multiplied by to express it in the keyed unit.
    MICROAMP_TO_UNIT = {"uA": 1, "mA": 1e-3, "A": 1e-6}

    def __init__(self, serial_number: Optional[str] = None, **kwargs):
        """Constructor

        :param serial_number: serial number to connect to as hex string.
            Example "0x66F4859B". A value that is not a string is stored
            unchanged.
        :param kwargs: forwarded to the AuxiliaryInterface constructor
        """
        super().__init__(
            is_proxy_capable=False,
            tx_task_on=False,
            rx_task_on=False,
            connector_required=False,
            **kwargs,
        )
        # Hex strings are parsed once here so the connect call gets a number.
        self.serial_number = (
            int(serial_number, 16) if isinstance(serial_number, str) else serial_number
        )
        self.stem = brainstem.stem.USBHub2x4()

    def _create_auxiliary_instance(self) -> bool:
        """Open the connector. Stop auxiliary if not possible.

        :return: True if successful, False if the hub could not be connected
        """
        log.internal_info("Create auxiliary instance")
        result = self.stem.discoverAndConnect(
            brainstem.link.Spec.USB, self.serial_number
        )
        if result != Result.NO_ERROR:
            log.error("Could not connect to usb hub acroname")
            self.eval_result(result)
            return False

        serial = self.stem.system.getSerialNumber()
        log.internal_info(
            "Connected to USBStem with serial number: 0x%08X" % serial.value
        )
        return True

    def _delete_auxiliary_instance(self) -> bool:
        """Close the connector.

        A failure to disconnect is logged and otherwise ignored.

        :return: always True
        """
        log.internal_info("Delete auxiliary instance")
        try:
            self.stem.disconnect()
        except Exception:
            # Best effort: teardown must not fail because the hub is gone.
            log.exception("Unable to close usb hub acroname.")
        return True

    @staticmethod
    def eval_result(result: int) -> None:
        """Log error message if exist from acroname Result object.

        :param result: result code to evaluate
        """
        if result == Result.NO_ERROR:
            return
        # Fall back to a generic text so that a code missing from the table
        # is still reported instead of raising KeyError while logging.
        message = ERROR_MESSAGES.get(result, f"Unknown error code {result}.")
        log.error(f"Error occurred: {message}")

    def set_port_enable(self, port: int) -> int:
        """Enable power and data lines for a USB port.

        :param port: the USB port number
        :return: brainstem error code. 0 if no error.
        """
        result = self.stem.usb.setPortEnable(port)
        self.eval_result(result)
        return result

    def set_port_disable(self, port: int) -> int:
        """Disable power and data lines for a USB port.

        :param port: the USB port number
        :return: brainstem error code. 0 if no error.
        """
        result = self.stem.usb.setPortDisable(port)
        self.eval_result(result)
        return result

    def get_port_current(self, port: int, unit: str = "A") -> Optional[float]:
        """Get the current through the power line for selected usb port.

        :param port: the USB port number
        :param unit: unit of the result in "uA", "mA" or "A". Default "A"
        :return: port current for given unit. None if unit is not supported.
        """
        reading = self.stem.usb.getPortCurrent(port)
        self.eval_result(reading.error)
        try:
            factor = self.MICROAMP_TO_UNIT[unit]
        except KeyError:
            log.error(
                f"Unit '{unit}' is not supported. Current value will be set to None."
            )
            return None
        return reading.value * factor

    def get_port_voltage(self, port: int, unit: str = "V") -> Optional[float]:
        """Get the voltage of the selected usb port.

        :param port: the USB port number
        :param unit: unit of the result in "uV", "mV" or "V". Default "V"
        :return: port voltage for given unit. None if unit is not supported.
        """
        reading = self.stem.usb.getPortVoltage(port)
        self.eval_result(reading.error)
        try:
            factor = self.MICROVOLT_TO_UNIT[unit]
        except KeyError:
            log.error(
                f"Unit '{unit}' is not supported. Voltage value will be set to None."
            )
            return None
        return reading.value * factor

    def get_port_current_limit(self, port: int, unit: str = "A") -> Optional[float]:
        """Get the current limit for the port.

        :param port: the USB port number
        :param unit: unit of the result in "uA", "mA" or "A". Default "A"
        :return: port current limit for given unit. None if unit is not supported.
        """
        reading = self.stem.usb.getPortCurrentLimit(port)
        self.eval_result(reading.error)
        try:
            factor = self.MICROAMP_TO_UNIT[unit]
        except KeyError:
            log.error(
                f"Unit '{unit}' is not supported. Port current value will be set to None."
            )
            return None
        return reading.value * factor

    def set_port_current_limit(
        self, port: int, amps: float, unit: str = "A"
    ) -> Optional[int]:
        """Set the current limit for the port. If the set limit is not achievable,
        devices will round down to the nearest available current limit setting.

        :param port: the USB port number
        :param amps: value of the current limit, expressed in ``unit``
        :param unit: unit for the value to set: "uA", "mA" or "A". Default "A"
        :return: brainstem error code. 0 if no error. None if unit is not
            supported (the limit is left unchanged in that case).
        """
        try:
            factor = self.MICROAMP_TO_UNIT[unit]
        except KeyError:
            log.error(f"Unit '{unit}' is not supported. Port value won't be changed")
            return None
        # Round rather than truncate: the float division can land just below
        # the intended integer (x.9999999), and truncation would then request
        # one microamp less than asked for.
        microamp_value = int(round(amps / factor))
        result = self.stem.usb.setPortCurrentLimit(port, microamp_value)
        self.eval_result(result)
        return result

    def _run_command(self, cmd_message: Any, cmd_data: Optional[bytes]) -> None:
        """Not used.

        Simply respect the interface.
        """

    def _receive_message(self, timeout_in_s: float) -> None:
        """Not used.

        Simply respect the interface.
        """