# Source code for PyICe.visa_wrappers

'''
VISA Emulation Layer
====================

Interface wrappers to use various interfaces as if they were VISA resources
without requiring an installed VISA library. Facilitates seamless transition 
between physical interfaces and operating systems.
'''

import time
import struct
import re
try:
    import visa
    visaMissing = False
except:
    visaMissing = True
try:
    import ctypes
    ctypesMissing = False
except:
    ctypesMissing = True
try:
    import serial
    serialMissing = False
except:
    serialMissing = True

try:
    import deps.RL1009.RL1009_SERIAL_CLASS as RL1009_SERIAL_CLASS
    RL1009SerialMissing = False
except:
    RL1009SerialMissing = True

try:
    import deps.RL1009.RL1009_DLL_INTERFACE as RL1009_DLL_INTERFACE
    RL1009DLLMissing = False
except:
    RL1009DLLMissing = True

try:
    import deps.vxi11.vxi11 as vxi11
    vxi11Missing = False
except:
    vxi11Missing = True
    
try:
    import telnetlib
    telnetlibMissing = False
except:
    telnetlibMissing = True

try:
    import deps.keithley_kxci.KXCIclient as KXCIclient
    KXCIclientMissing = False
except:
    KXCIclientMissing = True

class visaWrapperException(Exception):
    '''Error raised by the wrapper classes in this module, e.g. for a missing
    driver library or a communication timeout.'''

class visa_wrapper(object):
    '''Base class describing the VISA-like interface emulated by the wrappers in this module.

    The transport primitives (read, write, read_values, ...) raise
    NotImplementedError here and are meant to be overridden by subclasses.
    The compound ask*() methods are built from those primitives, so a subclass
    that implements write() and the matching read*() method inherits a working
    ask*() method.  clear_errors(), resync() and close() have harmless defaults.
    '''
    def __init__(self, address, timeout=5):
        raise NotImplementedError('Interface Not Fully Implemented: __init__()')
    def read(self):
        '''Overridden by subclasses to return a response.'''
        raise NotImplementedError('Interface Not Fully Implemented: read()')
    def write(self, message):
        '''Overridden by subclasses to send message.'''
        raise NotImplementedError('Interface Not Fully Implemented: write()')
    def read_values(self):
        '''Overridden by subclasses to return a response parsed into a list of values.'''
        raise NotImplementedError('Interface Not Fully Implemented: read_values()')
    def read_values_binary(self, format_str='B', byte_order='=', terminationCharacter=''):
        '''Follows Definite Length Arbitrary Block format
        ie ASCII header '#<header_bytes_following><data_bytes_following><data0>...<dataN>
        eg #40004<byte0><byte1><byte2><byte3>
        format_str and byte_order are passed to the struct library to set word boundaries for unpacking and conversion to numeric types
        https://docs.python.org/2/library/struct.html#format-strings'''
        # format_str defaults to 'B' (no byte-order prefix) to agree with
        # ask_for_values_binary(); the byte order is supplied separately.
        raise NotImplementedError('Interface Not Fully Implemented: read_values_binary()')
    def ask(self, message):
        '''Write message, then return the result of read().'''
        self.write(message)
        return self.read()
    def ask_for_values(self, message):
        '''Write message, then return the result of read_values().'''
        self.write(message)
        return self.read_values()
    def ask_for_values_binary(self, message, format_str='B', byte_order='=', terminationCharacter=''):
        '''Write message, then return the result of read_values_binary().

        Follows Definite Length Arbitrary Block format
        ie ASCII header '#<header_bytes_following><data_bytes_following><data0>...<dataN>
        eg #40004<byte0><byte1><byte2><byte3>
        format_str and byte_order are passed to the struct library to set word boundaries for unpacking and conversion to numeric types
        https://docs.python.org/2/library/struct.html#format-strings'''
        self.write(message)
        return self.read_values_binary(format_str, byte_order, terminationCharacter)
    def clear(self):
        raise NotImplementedError('Interface Not Fully Implemented: clear()')
    def clear_errors(self):
        '''Print a notice instead of raising; subclasses may override.'''
        print("Interface Not Fully Implemented: clear_errors()")
    def trigger(self):
        raise NotImplementedError('Interface Not Fully Implemented: trigger()')
    def read_raw(self):
        raise NotImplementedError('Interface Not Fully Implemented: read_raw()')
    def resync(self):
        '''flush buffers to resync after communication fault - usb-serial problem'''
        return ''
    def close(self):
        '''Default close does nothing.'''
        pass
    # The accessor pairs below are placeholders; subclasses replace the
    # timeout / term_chars properties with working ones.
    def __getTimeout(self):
        raise NotImplementedError('Interface Not Fully Implemented: timeout')
    def __setTimeout(self, timeout):
        raise NotImplementedError('Interface Not Fully Implemented: timeout')
    timeout = property(__getTimeout,__setTimeout)
    def __getTerminationChars(self):
        raise NotImplementedError('Interface Not Fully Implemented: term_chars')
    def __setTerminationChars(self, term_chars):
        raise NotImplementedError('Interface Not Fully Implemented: term_chars')
    term_chars = property(__getTerminationChars,__setTerminationChars)

class visa_wrapper_serial(visa_wrapper):
    '''VISA-like wrapper around a pyserial port.

    Outgoing messages are terminated with self.terminationCharacter (newline
    unless changed after construction) and incoming lines are read up to that
    same character.
    '''
    def __init__(self, address, timeout = 5, baudrate = 9600):
        '''address is either an existing serial.SerialBase / telnetlib.Telnet
        object, which is used as-is, or a port name that is opened here with
        the given timeout and baudrate.

        Raises visaWrapperException if a port name is given but pyserial could
        not be imported.
        '''
        # The *Missing flags guard the isinstance checks so that an optional
        # library which failed to import produces a clear error instead of a
        # NameError.
        if not serialMissing and isinstance(address, serial.SerialBase):
            self.ser = address
        elif not telnetlibMissing and isinstance(address, telnetlib.Telnet): #TODO: migrate telnet library to use serial_for_url rfc2217://<host>:<port>[?<option>[&<option>]] class rfc2217.Serial
            self.ser = address
        elif serialMissing:
            raise visaWrapperException('pyserial library missing from this system')
        else:
            self.ser = serial.Serial()
            self.ser.port = address  # open the specified serial port
            self.ser.timeout = timeout
            self.ser.baudrate = baudrate
            self.ser.open()
        self.terminationCharacter = "\n" # readline inherits from "io" which doesn't support termination character
        self.resync()
    def readline(self):
        '''Read one character at a time up to and including the termination character.

        Raises visaWrapperException if a read returns nothing (timeout) before
        the termination character is seen.
        '''
        #TODO: speed this up using buffered IO to wrap serial port
        #readline() of ser doesn't work correctly
        #https://docs.python.org/2/library/io.html#io.TextIOWrapper
        #http://pyserial.readthedocs.org/en/latest/shortintro.html#eol
        chars = []
        while True:
            char = self.ser.read(1)
            if char == "":
                raise visaWrapperException("Serial timeout on port {}!".format(self.ser.port))
            chars.append(char)
            if char == self.terminationCharacter:
                return "".join(chars)
    def read(self):
        '''Return one line with trailing whitespace (including the terminator) removed.'''
        return self.readline().rstrip()
    def write(self, message):
        '''Send message; unless terminationCharacter is None, trailing
        whitespace is replaced by exactly one termination character.'''
        if self.terminationCharacter is not None:
            message = message.rstrip() + self.terminationCharacter
        self.ser.write(message)
    def read_values(self):
        '''Return one comma-separated response as a list of whitespace-stripped strings.

        The fields are not converted to numbers.
        '''
        return [field.strip() for field in self.read().split(",")]
    def _read_exact(self, count, description):
        '''Read exactly count bytes or raise visaWrapperException on a short (timed out) read.'''
        data = self.ser.read(count)
        if len(data) < count:
            raise visaWrapperException('Timeout in read_values_binary {}'.format(description))
        return data
    def read_values_binary(self, format_str='B', byte_order='=', terminationCharacter=''):
        '''Follows Definite Length Arbitrary Block format
        ie ASCII header '#<header_bytes_following><data_bytes_following><data0>...<dataN>
        eg #40004<byte0><byte1><byte2><byte3>

        format_str is passed to the struct library to set word boundaries for unpacking and conversion to numeric types
        https://docs.python.org/2/library/struct.html#format-strings
        https://en.wikipedia.org/wiki/IEEE_754-1985
        'B': default unsigned single byte
        'b': signed single byte

        'H': unsigned short 2-byte integer
        'h': signed short 2-byte integer

        'I': unsigned 4-byte integer
        'i': signed 4-byte integer

        'Q': unsigned long long 8-byte integer
        'q': signed long long 8-byte integer

        'f': 4-byte IEEE_754-1985 float
        'd': 8-byte IEEE_754-1985 double precision float

        '<n>s': An <n>-byte long string

        byte_order sets endianness:
        '=': native
        '<': little-endian (LSByte first)
        '>': big-endian (MSByte first)

        Returns the tuple produced by struct.unpack.  Raises
        visaWrapperException if the port times out before the whole block has
        been received.
        '''
        # Skip (and report) anything that precedes the '#' block marker.
        marker = self.ser.read(1)
        while marker != '#':
            if not len(marker): #timeout
                raise visaWrapperException('Timeout in read_values_binary header')
            print('Saw extra character code: {} in read_values_binary header'.format(ord(marker)))
            marker = self.ser.read(1)
        header_len = int(self._read_exact(1, 'header'))
        data_len = int(self._read_exact(header_len, 'header'))
        data = self._read_exact(data_len, 'data')
        # Consume the trailing terminator (if one is expected); its value is not checked.
        self.ser.read(len(terminationCharacter))
        # Size one item with the same byte order used for unpacking, so that
        # native alignment/size rules cannot disagree with the unpack format.
        item_len = struct.calcsize(byte_order + format_str)
        return struct.unpack(byte_order + format_str * (data_len // item_len), data)
    def read_raw(self):
        '''Return one line including its termination character.'''
        return self.readline()
    def write_raw(self, message):
        '''Send message exactly as given, without adding a termination character.'''
        self.ser.write(message)
    def flush(self):
        '''Call flush() on the underlying port.'''
        self.ser.flush()
    def resync(self):
        '''Read and return whatever is currently waiting in the input buffer.'''
        return self.ser.read(self.ser.inWaiting())
    def close(self):
        self.ser.close()
    def __getTimeout(self):
        return self.ser.timeout
    def __setTimeout(self, timeout):
        self.ser.timeout = timeout
    timeout = property(__getTimeout,__setTimeout)
    
class visa_wrapper_tcp(visa_wrapper_serial):
    '''Serial-style wrapper over a raw TCP socket opened through pyserial's socket:// URL handler.'''
    def __init__(self, ip_address, port, timeout = 5):
        port = serial.serial_for_url('socket://{}:{}'.format(ip_address,port),timeout=timeout)
        visa_wrapper_serial.__init__(self, port)
    def resync(self):
        '''Read complete lines until a read times out and return everything read.

        A visaWrapperException from readline() (its timeout signal) ends the
        drain normally.  Any other exception propagates to the caller; the
        previous "return" inside "finally" discarded it silently.
        '''
        print('TCP Resync in progress.')
        lines = []
        try:
            while True:
                line = self.readline()
                lines.append(line)
                if line[-1:] != self.terminationCharacter:
                    break
        except visaWrapperException:
            pass # timeout: nothing more to drain
        return ''.join(lines)
    def __getTimeout(self):
        return self.ser.timeout
    def __setTimeout(self, timeout):
        self.ser.timeout = timeout
    timeout = property(__getTimeout,__setTimeout)

class visa_wrapper_telnet(visa_wrapper_serial):
    '''Serial-style wrapper over a telnetlib.Telnet connection.'''
    #TODO?: migrate telnet library to use serial_for_url rfc2217://<host>:<port>[?<option>[&<option>]] class rfc2217.Serial
    def __init__(self, ip_address, port, timeout = 5):
        port = telnetlib.Telnet(ip_address,port,timeout=timeout)
        # Must be set before the parent constructor runs; readline() uses it.
        self._timeout = timeout
        visa_wrapper_serial.__init__(self, port)
    def resync(self):
        '''Return whatever can be read from the connection without blocking.'''
        return self.ser.read_very_eager()
    def readline(self):
        '''Read up to and including the termination character, or until the timeout expires.

        On timeout a message is printed and the partial (possibly empty)
        response is returned; no exception is raised.
        '''
        response = self.ser.read_until(self.terminationCharacter, self._timeout)
        # Slice instead of indexing so an empty response (nothing received
        # before the timeout) does not raise IndexError.
        if response[-1:] != self.terminationCharacter:
            print("Telnet timeout on port {}!".format(self.ser.port))
            # prolly should raise exception here (I am Dave)
        return response
    def __getTimeout(self):
        return self._timeout
    def __setTimeout(self, timeout):
        self._timeout = timeout
    timeout = property(__getTimeout,__setTimeout)

class visa_wrapper_vxi11(visa_wrapper):
    '''VISA-like wrapper around a vxi11.Instrument.'''
    # Matches decimal numbers with optional sign, fraction and exponent.
    # Compiled once for the class instead of on every read_values() call.
    _float_regex = re.compile(r"[-+]?(?:\d+(?:\.\d*)?|\d*\.\d+)(?:[eE][-+]?\d+)?")
    def __init__(self, address, timeout=5):
        self.terminationCharacter = None
        # NOTE(review): the timeout argument is not used; the instrument is
        # always created with timeout=60.  Left unchanged because callers may
        # rely on the longer value - confirm which is intended.
        self.vxi_interface = vxi11.Instrument(address, term_char=self.terminationCharacter, timeout=60)
    def read(self):
        return self.vxi_interface.read()
    def write(self, message):
        self.vxi_interface.write(message)
    def read_values(self):
        '''Return every number found in one response as a list of floats.'''
        #ascii transfer only
        #see visa.py for binary parsing example
        return [float(raw_value) for raw_value in
            self._float_regex.findall(self.read())]
    def ask(self, message):
        return self.vxi_interface.ask(message)
    def ask_for_values(self, message):
        self.write(message)
        return self.read_values()
    def clear(self):
        raise NotImplementedError('Interface Not Fully Implemented: clear()')
    def trigger(self):
        self.vxi_interface.trigger()
    def read_raw(self):
        '''Return the result of the instrument's read_raw() (the value was previously dropped).'''
        return self.vxi_interface.read_raw()
    def resync(self):
        '''flush buffers to resync after communication fault - usb-serial problem

        Nothing is flushed for this interface; an empty string is returned to
        match the base class.'''
        return ''
    def close(self):
        self.vxi_interface.close()
    def open(self):
        self.vxi_interface.open()
    def __getTimeout(self):
        return self.vxi_interface.io_timeout
    def __setTimeout(self, timeout):
        #not sure if this will actually take effect
        self.vxi_interface.io_timeout = timeout
    timeout = property(__getTimeout,__setTimeout)

class visa_wrapper_keithley_kxci(visa_wrapper):
    '''VISA-like wrapper around the Keithley KXCI client DLL.

    Only write(), ask() and close() are implemented; every command goes
    through SendKXCICommand_C, which always produces a response string.
    '''
    def __init__(self, ip_address, port):
        '''Open the KXCI connection.

        Raises visaWrapperException (an Exception subclass) if the KXCIclient
        module could not be imported.
        '''
        # Check for the driver first so a missing DLL is reported before any
        # ctypes objects are built.
        if KXCIclientMissing:
            raise visaWrapperException('KXCIclient DLL missing.  Try copying from deps/keithley_kxci to c:\\windows\\system')
        # NOTE(review): calling a ctypes pointer type with no argument creates
        # a NULL pointer - confirm the DLL accepts NULL for its error argument.
        self.error_int = ctypes.POINTER(ctypes.c_int)()
        # 1024 zero bytes plus the terminating NUL; same size and contents as
        # create_string_buffer('\x00'*1024) but also valid where str is not bytes.
        self.ret_str = ctypes.create_string_buffer(1024 + 1)
        KXCIclient.kxcilib.OpenKXCIConnection_C(ip_address,port,self.error_int)
        self._timeout = 1 #not used
    def write(self, message):
        '''return value will be discarded'''
        self.ask(message)
    def ask(self, message):
        '''Send message and return the response with trailing whitespace removed.'''
        KXCIclient.kxcilib.SendKXCICommand_C(message,self.ret_str,self.error_int)
        return self.ret_str.value.rstrip()
    def close(self):
        KXCIclient.kxcilib.CloseKXCIConnection_C()
    #KXCIclient.kxcilib.GetKXCISpollByte_C() not implemented
    def __getTimeout(self):
        '''not used'''
        return self._timeout
    def __setTimeout(self, timeout):
        '''not used'''
        self._timeout = timeout
    timeout = property(__getTimeout,__setTimeout)
        
class visa_interface(visa_wrapper):
    '''Wrapper around a real PyVISA resource.

    agilent visa strips trailing termination character, but NI VISA seems to
    leave them in response.  String responses are therefore right-stripped
    here so both behave the same.
    '''
    def __init__(self, address, timeout=5):
        '''Open the VISA resource at address and apply timeout.

        Raises visaWrapperException if the visa module could not be imported.
        '''
        if visaMissing:
            raise visaWrapperException('VISA library missing from this system')
        if hasattr(visa, "instrument"):
            # Old API from PyVISA rev < 1.5
            self.visaInterface = visa.instrument(resource_name=address)#, timeout=timeout)
            self.timeout_scale = 1
        else:
            # Use new API PyVISA rev >= 1.5
            self.visaInterface = visa.ResourceManager().open_resource(address)
            # The new API's timeout is scaled by 1e-3 relative to this
            # wrapper's timeout (i.e. the resource works in thousandths).
            self.timeout_scale = 1e-3
        self.timeout = timeout
    def read(self):
        return self.visaInterface.read().rstrip()
    def write(self, message):
        self.visaInterface.write(message)
    def read_values(self):
        '''Return the parsed values from the VISA resource unchanged.'''
        # The result is a list of values, not a string, so it must not be
        # rstrip()ped (doing so raised AttributeError).
        return self.visaInterface.read_values()
    def ask(self, message):
        return self.visaInterface.ask(message).rstrip()
    def ask_for_values(self, message):
        '''Send message and return the parsed values from the VISA resource unchanged.'''
        # List result: see read_values().
        return self.visaInterface.ask_for_values(message)
    def clear(self):
        self.visaInterface.clear()
    def trigger(self):
        self.visaInterface.trigger()
    def read_raw(self):
        return self.visaInterface.read_raw()
    def close(self):
        self.visaInterface.close()
    def __getTimeout(self):
        return self.visaInterface.timeout * self.timeout_scale
    def __setTimeout(self, timeout):
        self.visaInterface.timeout = timeout / self.timeout_scale
    def __delTimeout(self):
        del self.visaInterface.timeout
    timeout = property(__getTimeout, __setTimeout, __delTimeout)
    def __getTerminationChars(self):
        return self.visaInterface.term_chars
    def __setTerminationChars(self, term_chars):
        self.visaInterface.term_chars = term_chars
    term_chars = property(__getTerminationChars,__setTerminationChars)
class rl1009_visa_wrapper_generic(visa_wrapper):
    '''Behaviour shared by the RL1009 serial and DLL wrappers.

    Subclasses must set self.rl1009 (the driver object) and self.term_index
    (the list of terminator strings, indexed by the driver's terminator code)
    and implement write/ask/read_raw/clear/clear_errors/trigger.
    '''
    def __init__(self):
        raise Exception('Instantiate serial or dll subclasses')
    def __del__(self):
        # self.rl1009 does not exist if the subclass constructor raised before
        # creating the driver; skip the call instead of raising here.
        rl1009 = getattr(self, 'rl1009', None)
        if rl1009 is not None:
            rl1009.set_timeouts(0,0)
    def read(self):
        return self.read_raw().rstrip() #remove whitespace to emulate pyvisa behavior
    @staticmethod
    def _parse_values(response):
        '''Split a comma-separated response and evaluate each field.'''
        # SECURITY NOTE: eval() executes whatever text the instrument sends.
        # Kept so that every literal the original accepted still parses, but
        # it must only be used with trusted instruments.
        values = []
        for value in response.rstrip().split(','):
            values.append(eval(value))
        return values
    def read_values(self):
        '''Return one comma-separated response as a list of evaluated values.'''
        return self._parse_values(self.read())
    def ask_for_values(self, message):
        '''Send message and return the comma-separated response as a list of evaluated values.'''
        # Uses ask(); the wrapper classes define no query() method, so the
        # previous self.query(message) raised AttributeError.
        return self._parse_values(self.ask(message))
    def close(self):
        self.rl1009.close_connection()
    def set_timeout(self, timeout):
        '''Set both driver timeouts; timeout is multiplied by 1000 and truncated to an int.'''
        self.rl1009.set_timeouts(int(timeout*1000), int(timeout*1000))
    def get_timeout(self):
        '''Return the first driver timeout divided by 1000.'''
        # Float division so sub-unit timeouts are not truncated to zero.
        return self.rl1009.get_timeouts()[0]/1000.0
    timeout = property(get_timeout,set_timeout)
    def _getTerminationChars(self):
        return self.term_index[self.rl1009.get_terminator()]
    def _setTerminationChars(self, term_chars):
        try:
            self.rl1009.set_terminator(self.term_index.index(term_chars))
        except ValueError:
            print("Termination characters restricted to ['','\\n','\\r','\\r\\n'] by RL1009 DLL")
            raise
    term_chars = property(_getTerminationChars,_setTerminationChars)

class rl1009_visa_wrapper_serial(rl1009_visa_wrapper_generic):
    '''RL1009 wrapper that talks to the adapter through RL1009_SERIAL_CLASS.'''
    #Static reference to single serial interface shared amongst all object instances of class
    # NOTE(review): this constructor creates a new RL1009 object per instance;
    # any sharing must happen inside RL1009_SERIAL_CLASS - confirm.
    def __init__(self, GPIBAddress=None, timeout=5, serialPort=None):
        if RL1009SerialMissing:
            raise visaWrapperException('RL1009 Serial Driver class missing')
        if serialPort is None:
            raise visaWrapperException('Must specify serial port in constructor before/while creating interface')
        self.rl1009 = RL1009_SERIAL_CLASS.RL1009(serialPort)
        self.GPIBAddress = GPIBAddress
        self._my_timeout = timeout #multiple gpib interfaces can use the same rl1009 on so timeout must be kept outside the rl1009
        self.term_index = ['','\n','\r','\r\n']
    def _check_error(self):
        '''Raise visaWrapperException with the driver's description if its error flag is set.'''
        if self.rl1009.error == True:
            raise visaWrapperException(self.rl1009.error_description())
    def write(self, message):
        self.rl1009.set_address(self.GPIBAddress)
        self.set_timeout(self._my_timeout)
        self.rl1009.write(message)
        time.sleep(0.030)
        self._check_error()
    def ask(self, message):
        self.rl1009.set_address(self.GPIBAddress)
        self.set_timeout(self._my_timeout)
        response = self.rl1009.query(message).rstrip()
        self._check_error()
        return response
    def clear(self):
        self.set_timeout(self._my_timeout)
        self.rl1009.device_clear() #clear_errors()
        self._check_error()
    def clear_errors(self):
        self.set_timeout(self._my_timeout)
        self.rl1009.clear_errors()
        self._check_error()
    def trigger(self):
        self.set_timeout(self._my_timeout)
        self.rl1009.group_trigger('%02d' % self.GPIBAddress)
        self._check_error()
    def read_raw(self):
        self.rl1009.set_address(self.GPIBAddress)
        self.set_timeout(self._my_timeout)
        response = self.rl1009.read()
        self._check_error()
        return response
    def __getGPIBAddress(self):
        return self.__GPIBAddress
    def __setGPIBAddress(self, GPIBAddress):
        self.__GPIBAddress = GPIBAddress
    GPIBAddress = property(__getGPIBAddress,__setGPIBAddress)

class rl1009_visa_wrapper_dll(rl1009_visa_wrapper_generic):
    '''RL1009 wrapper that talks to the adapter through RL1009_DLL_INTERFACE.'''
    def __init__(self, GPIBAddress=None, timeout=5):
        if RL1009DLLMissing:
            raise visaWrapperException('RL1009 DLL Driver class missing')
        self.rl1009 = RL1009_DLL_INTERFACE.RL1009(address=GPIBAddress, rlib=None)
        self.timeout = timeout
        self.term_index = ['','\n','\r','\r\n']
    def _check_error(self):
        '''Raise visaWrapperException with the driver's description if gpib_error() reports an error.'''
        if self.rl1009.gpib_error() == True:
            raise visaWrapperException(self.rl1009.error_description())
    def write(self, message):
        self.rl1009.write(message)
        self._check_error()
    def ask(self, message):
        response = self.rl1009.query(message).rstrip()
        self._check_error()
        return response
    def clear(self):
        self.rl1009.device_clear() #clear_error()
        self._check_error()
    def clear_errors(self):
        self.rl1009.clear_error()
        self._check_error()
    def trigger(self):
        self.rl1009.group_trigger('%02d' % self.GPIBAddress)
        self._check_error()
    def read_raw(self):
        response = self.rl1009.read()
        self._check_error()
        return response
    # The GPIB address lives on the driver object for the DLL variant.
    def __getGPIBAddress(self):
        return self.rl1009.address
    def __setGPIBAddress(self, GPIBAddress):
        self.rl1009.address = GPIBAddress
    GPIBAddress = property(__getGPIBAddress,__setGPIBAddress)