Source code for pykiso.lib.auxiliaries.instrument_control_auxiliary.instrument_control_auxiliary

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
InstrumentControl Auxiliary
***************************

:module: instrument_control_auxiliary

:synopsis: Auxiliary used to communicate via a VISA connector using the
    SCPI protocol.

.. currentmodule:: instrument_control_auxiliary

"""
import logging
import queue
import re
import time
from typing import Any, List, Optional, Tuple, Union

from pykiso import CChannel
from pykiso.auxiliary import AuxiliaryInterface, close_connector

from .lib_scpi_commands import LibSCPI

log = logging.getLogger(__name__)


[docs]class InstrumentControlAuxiliary(AuxiliaryInterface): """Auxiliary used to communicate via a VISA connector using the SCPI protocol. """ def __init__( self, com: CChannel, instrument="", write_termination="\n", output_channel: int = None, **kwargs, ): """Constructor. :param com: VISAChannel that supports VISA communication :param instrument: name of the instrument currently in use (will be used to adapt the SCPI commands) :param write_termination: write termination character :param output_channel: output channel to use on the instrument currently in use (if more than one) """ super().__init__( is_proxy_capable=False, tx_task_on=False, rx_task_on=False, **kwargs ) self.channel = com self.instrument = instrument self.write_termination = write_termination self.output_channel = output_channel self.helpers = LibSCPI(self, self.instrument)
[docs] def write( self, write_command: str, validation: Tuple[str, Union[str, List[str]]] = None ) -> str: """Send a write request to the instrument. :param write_command: command to send :param validation: contain validation criteria apply on the response """ log.internal_debug(f"Sending a write request in {self} for {write_command}") return self.handle_write(write_command, validation)
[docs] def handle_write( self, write_command: str, validation: Tuple[str, Union[str, List[str]]] = None ) -> str: """Send a write request to the instrument and then returns if the value was successfully written. A query is sent immediately after the writing and the answer is compared to the expected one. :param write_command: write command to send :param validation: tuple of the form (validation command (str), expected output (str or list of str)) :return: status message depending on the command validation: SUCCESS, FAILURE or NO_VALIDATION """ log.internal_debug(f"Sending a write request in {self} for {write_command}") # Send the message with the termination character self.channel.cc_send(msg=write_command + self.write_termination) if validation is not None: # Check that the writing request was successfully performed on the instrument validation_query, validation_expected_output = validation if isinstance(validation_expected_output, str): validation_expected_output = [validation_expected_output] # Send the validation query with a delay between the previous write and this query request # to make sure that the instrument has had sufficient time to complete the operation time.sleep(0.1) validation_query_response = self.handle_query(validation_query) # Check that the responses matches the expected response if ( isinstance(validation_query_response, str) and validation_query_response != "" ): # Check the tag VALUE{} in the expected output and adapt the validation process match = re.compile(r"VALUE{([+-]?\d+[\.\d]*)}").findall( validation_expected_output[0] ) if match: # tag "VALUE{value}" detected. This tag can be used when we want to ensure that # a numerical parameter was correctly set on the instrument if float(match[0]) == float(validation_query_response.split()[0]): write_success = True else: write_success = False else: # compare the expected output with the result of the validation query command if validation_query_response in validation_expected_output: write_success = True else: write_success = False if write_success is True: log.internal_debug( f"Write request {write_command} successful after verification" ) return "SUCCESS" else: log.internal_warning( f"Write request {write_command} failed! Validation query response was different than expected." ) return "FAILURE" else: log.internal_warning( f"Write request {write_command} failed! No response received for the validation query." ) return "FAILURE" else: log.internal_debug( f"Write request {write_command} processed without validation" ) return "NO_VALIDATION"
[docs] def read(self) -> Union[str, bool]: """Send a read request to the instrument. :return: received response from instrument otherwise empty string """ log.internal_debug(f"Sending a read request in {self}") return self.handle_read()
[docs] def handle_read(self) -> Optional[str]: """Handle read command by calling associated connector cc_receive. :return: received response from instrument otherwise empty string """ response = self.channel.cc_receive() response_data = response.get("msg") if isinstance(response_data, bytes): response_data = response_data.decode().strip() return response_data
[docs] def query(self, query_command: str) -> Union[bytes, str]: """Send a query request to the instrument. Uses the 'query' method of the channel if available, uses 'cc_send' and 'cc_receive' otherwise. :param query_command: query command to send :return: Response message, None if the request expired with a timeout. """ log.internal_debug(f"Sending a query request in {self}) for {query_command}") return self.handle_query(query_command)
[docs] def handle_query(self, query_command: str) -> Optional[str]: """Send a query request to the instrument. Uses the 'query' method of the channel if available, uses 'cc_send' and 'cc_receive' otherwise. :param query_command: query command to send :return: Response message, None if the request expired with a timeout. """ if hasattr(self.channel, "query"): response = self.channel.query(query_command + self.write_termination) return response.get("msg") else: self.channel.cc_send(msg=query_command + self.write_termination) time.sleep(0.05) response = self.channel.cc_receive() response_data = response.get("msg") if isinstance(response_data, bytes): response_data = response_data.decode().strip() return response_data
def _create_auxiliary_instance(self) -> bool: """Open the connector. :return: always True """ log.internal_info("Create auxiliary instance") try: # Open instrument log.internal_info("Open instrument") self.channel.open() # Enable remote control log.internal_info("Enable remote control") command, validation = self.helpers.get_command( cmd_tag="REMOTE_CONTROL", cmd_type="write", cmd_validation=["REMOTE", "ON", "1"], ) self.handle_write(f"{command} ON".strip(), validation) # Select channel if needed if self.output_channel is not None: log.internal_info(f"Select channel {self.output_channel}") command, validation = self.helpers.get_command( cmd_tag="OUTPUT_CHANNEL", cmd_type="write", cmd_validation=f"{self.output_channel}", ) self.handle_write( f"{command} {self.output_channel}".strip(), validation ) else: log.internal_info("Using default output channel.") except Exception: log.exception("Unable to safely open the instrument.") return False return True @close_connector def _delete_auxiliary_instance(self) -> bool: """Close the connector communication. :return: True if the connectors is closed otherwise False """ log.internal_info("Auxiliary instance deleted") return True def _run_command(self, cmd_message: Any, cmd_data: Optional[bytes]) -> None: """Not used. Simply respect the interface. """ def _receive_message(self, timeout_in_s: float) -> None: """Not used. Simply respect the interface. """