# Source code for pykiso.lib.connectors.cc_visa

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Communication Channel using VISA protocol
*****************************************

:module: cc_visa

:synopsis: VISA communication channel to communicate with instruments using the SCPI protocol.

.. currentmodule:: cc_visa

"""

import abc
import logging

import pyvisa

from pykiso import CChannel
from pykiso.types import MsgType

log = logging.getLogger(__name__)


class VISAChannel(CChannel):
    """VISA interface for devices communicating with SCPI.

    Base channel holding the PyVISA resource manager and a message-based
    resource. Subclasses must implement ``_cc_open`` to open the actual
    instrument.
    """

    def __init__(self, **kwargs):
        """Initialize channel settings.

        :param kwargs: named arguments forwarded to the parent class
            constructor
        """
        # Use PyVISA Resource Manager with PyVISA-py backend
        self.ResourceManager = pyvisa.ResourceManager("@py")
        # Instantiate message-based resource:
        self.resource_name = "messagebased_resource"
        self.resource = pyvisa.resources.messagebased.MessageBasedResource(
            self.ResourceManager, self.resource_name
        )
        super().__init__(**kwargs)

    @abc.abstractmethod
    def _cc_open(self) -> None:
        """Open an instrument"""
        pass

    def _cc_close(self) -> None:
        """Close the currently held resource."""
        log.info(f"Close VISA resource: {self.resource_name}")
        self.resource.close()

    def _process_request(self, request: str, request_data: str = "") -> str:
        """Send a SCPI read or query request.

        Any exception derived from ``Exception`` raised while processing
        the request is logged and results in an empty string being returned.

        :param request: command request to the instrument ("read" or
            "query"); any other value is logged as unknown
        :param request_data: command payload (used for query requests only)

        :return: stripped response message from the instrument, or an
            empty string if the request is unknown or failed.
        """
        request_response = ""
        try:
            if request == "read":
                log.debug(f"Reading {self.resource_name} ")
                request_response = self.resource.read().strip()
            elif request == "query":
                log.debug(f"Querying {request_data} to {self.resource_name}")
                request_response = self.resource.query(request_data).strip()
            else:
                log.warning(f"Unknown request '{request}'!")
        except pyvisa.errors.InvalidSession:
            log.exception(
                f"Request {request}:{request_data} failed! Invalid session (resource might be closed)."
            )
        except pyvisa.errors.VisaIOError:
            log.exception(
                f"Request {request}:{request_data} failed! Timeout expired before operation completed."
            )
        except Exception as e:
            log.exception(f"Request {request}:{request_data} failed!\n{e}")
        else:
            log.debug(f"Response received: {request_response}")

        # Return after the try statement rather than from a ``finally``
        # clause: a ``return`` inside ``finally`` would silently discard any
        # in-flight exception, including KeyboardInterrupt and SystemExit.
        return str(request_response)

    def _cc_send(self, msg: MsgType, raw: bool = False) -> None:
        """Send a write request to the instrument.

        :param msg: message to send
        :param raw: if True, the message is decoded (``msg.decode()``)
            before being written; if False it is written as is
        """
        if raw:
            msg = msg.decode()

        log.debug(f"Writing {msg} to {self.resource_name}")
        self.resource.write(msg)

    def _cc_receive(self, timeout: float = 0.1, raw: bool = False) -> str:
        """Send a read request to the instrument.

        :param timeout: time in second to wait for reading a message
            (not used by this implementation)
        :param raw: if True, the response is returned encoded
            (``str.encode()``), otherwise it is returned as a string

        :return: the received response message, or an empty value if the
            read request failed (e.g. expired with a timeout).
        """
        if raw:
            return self._process_request("read").encode()
        else:
            return self._process_request("read")

    def query(self, query_command: str) -> str:
        """Send a query request to the instrument.

        :param query_command: query command to send

        :return: response message, or an empty string if the request
            failed (e.g. expired with a timeout).
        """
        return self._process_request("query", query_command)
class VISASerial(VISAChannel):
    """VISA channel talking to an instrument over a serial link."""

    def __init__(self, serial_port: int, baud_rate=9600, **kwargs):
        """Set up the serial-specific channel attributes.

        :param serial_port: COM port to use to connect to the instrument
        :param baud_rate: baud rate used to communicate with the instrument
        :param kwargs: named arguments forwarded to the parent class
            constructor
        """
        super().__init__(**kwargs)
        # Override the resource name and resource object set by the parent
        # constructor with their serial counterparts.
        self.resource_name = f"ASRL{serial_port}::INSTR"
        self.baud_rate = baud_rate
        serial_resource = pyvisa.resources.serial.SerialInstrument(
            self.ResourceManager, self.resource_name
        )
        self.resource = serial_resource

    def _cc_open(self) -> None:
        """Open the serial instrument.

        :raises ConnectionRefusedError: if the resource is not listed by
            the resource manager, or if it is already opened
        """
        log.info(f"Open VISA resource: {self.resource_name}")
        manager = self.ResourceManager

        # Guard 1: the resource has to be known to the resource manager
        # (check done for Serial only).
        if self.resource_name not in manager.list_resources():
            raise ConnectionRefusedError(
                f"The resource named {self.resource_name} is unavailable and cannot be opened."
            )

        # Guard 2: refuse to open a resource that is already opened.
        opened_names = {
            opened.resource_name for opened in manager.list_opened_resources()
        }
        if self.resource_name in opened_names:
            raise ConnectionRefusedError(
                f"The resource named {self.resource_name} is opened and cannot be opened again."
            )

        self.resource = manager.open_resource(
            self.resource_name, baud_rate=self.baud_rate
        )
class VISATcpip(VISAChannel):
    """VISA channel talking to a remote instrument over TCPIP."""

    def __init__(self, ip_address: str, protocol="INSTR", **kwargs):
        """Set up the TCPIP-specific channel attributes.

        :param ip_address: target instrument's ip address
        :param protocol: communication protocol to use
        :param kwargs: named arguments forwarded to the parent class
            constructor
        """
        super().__init__(**kwargs)
        # Override the resource name and resource object set by the parent
        # constructor with their TCPIP counterparts.
        self.ip_address = ip_address
        self.protocol = protocol
        self.resource_name = f"TCPIP::{ip_address}::{protocol}"
        tcpip_resource = pyvisa.resources.tcpip.TCPIPInstrument(
            self.ResourceManager, self.resource_name
        )
        self.resource = tcpip_resource

    def _cc_open(self) -> None:
        """Open the remote instrument through the resource manager."""
        log.info(f"Open VISA resource: {self.resource_name}")
        opened = self.ResourceManager.open_resource(self.resource_name)
        self.resource = opened