'''
VISA Emulation Layer
====================
Interface wrappers to use various interfaces as if they were VISA resources
without requiring an installed VISA library. Facilitates seamless transition
between physical inrefaces and operating systems.
'''
import time
import struct
import re
try:
import visa
visaMissing = False
except:
visaMissing = True
try:
import ctypes
ctypesMissing = False
except:
ctypesMissing = True
try:
import serial
serialMissing = False
except:
serialMissing = True
try:
import deps.RL1009.RL1009_SERIAL_CLASS as RL1009_SERIAL_CLASS
RL1009SerialMissing = False
except:
RL1009SerialMissing = True
try:
import deps.RL1009.RL1009_DLL_INTERFACE as RL1009_DLL_INTERFACE
RL1009DLLMissing = False
except:
RL1009DLLMissing = True
try:
import deps.vxi11.vxi11 as vxi11
vxi11Missing = False
except:
vxi11Missing = True
try:
import telnetlib
telnetlibMissing = False
except:
telnetlibMissing = True
try:
import deps.keithley_kxci.KXCIclient as KXCIclient
KXCIclientMissing = False
except:
KXCIclientMissing = True
class visaWrapperException(Exception):
pass
class visa_wrapper(object):
def __init__(self, address, timeout=5):
raise NotImplementedError('Interface Not Fully Implemented: __init__()')
def read(self):
raise NotImplementedError('Interface Not Fully Implemented: read()')
def write(self, message):
raise NotImplementedError('Interface Not Fully Implemented: write()')
def read_values(self):
raise NotImplementedError('Interface Not Fully Implemented: read_values()')
def read_values_binary(self, format_str='=B', byte_order='=', terminationCharacter=''):
'''Follows Definite Length Arbitrary Block format
ie ASCII header '#<heder_bytes_following><data_bytes_following><data0>...<dataN>
eg #40003<byte0><byte1><byte2><byte3>
format_str and byte_order are passed to struct library for to set word boundaries for unpacking and conversion to numeric types
https://docs.python.org/2/library/struct.html#format-strings'''
raise NotImplementedError('Interface Not Fully Implemented: read_values_binary()')
def ask(self, message):
self.write(message)
return self.read()
def ask_for_values(self, message):
self.write(message)
return self.read_values()
def ask_for_values_binary(self, message, format_str='B', byte_order='=', terminationCharacter=''):
'''Follows Definite Length Arbitrary Block format
ie ASCII header '#<heder_bytes_following><data_bytes_following><data0>...<dataN>
eg #40003<byte0><byte1><byte2><byte3>
format_str and byte_order are passed to struct library for to set word boundaries for unpacking and conversion to numeric types
https://docs.python.org/2/library/struct.html#format-strings'''
self.write(message)
return self.read_values_binary(format_str, byte_order, terminationCharacter)
def clear(self):
raise NotImplementedError('Interface Not Fully Implemented: clear()')
def clear_errors(self):
print "Interface Not Fully Implemented: clear_errors()'"
def trigger(self):
raise NotImplementedError('Interface Not Fully Implemented: trigger()')
def read_raw(self):
raise NotImplementedError('Interface Not Fully Implemented: read_raw()')
def resync(self):
'''flush buffers to resync after communication fault - usb-serial problem'''
return ''
def close(self):
pass
def __getTimeout(self):
raise NotImplementedError('Interface Not Fully Implemented: timeout')
def __setTimeout(self, timeout):
raise NotImplementedError('Interface Not Fully Implemented: timeout')
timeout = property(__getTimeout,__setTimeout)
def __getTerminationChars(self):
raise NotImplementedError('Interface Not Fully Implemented: term_chars')
def __setTerminationChars(self, term_chars):
raise NotImplementedError('Interface Not Fully Implemented: term_chars')
term_chars = property(__getTerminationChars,__setTerminationChars)
class visa_wrapper_serial(visa_wrapper):
def __init__(self, address, timeout = 5, baudrate = 9600):
if isinstance(address, serial.SerialBase):
self.ser = address
elif isinstance(address, telnetlib.Telnet): #TODO: migrate telnet library to use serial_for_url rfc2217://<host>:<port>[?<option>[&<option>]] class rfc2217.Serial
self.ser = address
else:
self.ser = serial.Serial()
self.ser.port = address # open the specified serial port
self.ser.timeout = timeout
self.ser.baudrate = baudrate
self.ser.open()
self.terminationCharacter = "\n" # readline inherits from "io" which doesn't support termination character
self.resync()
def readline(self):
#TODO: speed this up using buffered IO to wrap serial port
#readline() of ser doesn't work correctly
#https://docs.python.org/2/library/io.html#io.TextIOWrapper
#http://pyserial.readthedocs.org/en/latest/shortintro.html#eol
response = ""
while True:
char = self.ser.read(1)
response += char
if char == self.terminationCharacter:
break
elif char == "":
raise visaWrapperException("Serial timeout on port {}!".format(self.ser.port))
#print "Serial timeout on port {}!".format(self.ser.port)
return response
def read(self):
return self.readline().rstrip()
def write(self, message):
if self.terminationCharacter is not None:
message = message.rstrip() + self.terminationCharacter
self.ser.write(message)
def read_values(self):
return self.read.split(",").strip()
def read_values_binary(self, format_str='B', byte_order='=', terminationCharacter=''):
'''Follows Definite Length Arbitrary Block format
ie ASCII header '#<heder_bytes_following><data_bytes_following><data0>...<dataN>
eg #40003<byte0><byte1><byte2><byte3>
format_str is passed to struct library for to set word boundaries for unpacking and conversion to numeric types
https://docs.python.org/2/library/struct.html#format-strings
https://en.wikipedia.org/wiki/IEEE_754-1985
'B': default unsigned single byte
'b': signed single byte
'H': unsigned short 2-byte integer
'h': signed short 2-byte integer
'I': unsigned 4-byte integer
'i': signed 4-byte integer
'Q': unsigned long long 8-byte integer
'q': signed long long 8-byte integer
'f': 4-byte IEEE_754-1985 float
'd': 8-byte IEEE_754-1985 double precision float
'<n>s': An <n>-byte long string
byte_order sets endianness:
'=': native
'<': little-endian (LSByte first)
'>': big-endian (MSByte first)
'''
hash = self.ser.read(1)
while hash != '#':
if len(hash):
print 'Saw extra character code: {} in read_values_binary header'.format(ord(hash))
else: #timeout
raise visaWrapperException('Timeout in read_values_binary header')
hash = self.ser.read(1)
header_len = int(self.ser.read(1))
data_len = int(self.ser.read(header_len))
data = self.ser.read(data_len)
term = self.ser.read(len(terminationCharacter))
format_len = struct.calcsize(format_str)
format = byte_order + format_str * (data_len / format_len)
return struct.unpack(format, data)
def read_raw(self):
return self.readline()
def write_raw(self, message):
self.ser.write(message)
def flush(self):
print self.ser.flush()
def resync(self):
return self.ser.read(self.ser.inWaiting())
def close(self):
self.ser.close()
def __getTimeout(self):
return self.ser.timeout
def __setTimeout(self, timeout):
self.ser.timeout = timeout
timeout = property(__getTimeout,__setTimeout)
class visa_wrapper_tcp(visa_wrapper_serial):
def __init__(self, ip_address, port, timeout = 5):
port = serial.serial_for_url('socket://{}:{}'.format(ip_address,port),timeout=timeout)
visa_wrapper_serial.__init__(self, port)
def resync(self):
print 'TCP Resync in progress.'
resp_all = ''
try:
resp = self.readline()
resp_all += resp
while resp[-1:] == self.terminationCharacter:
resp = self.readline()
resp_all += resp
except visaWrapperException as e:
pass
except Exception as e:
raise e #what happened???
finally:
return resp_all
def __getTimeout(self):
return self.ser.timeout
def __setTimeout(self, timeout):
self.ser.timeout = timeout
timeout = property(__getTimeout,__setTimeout)
class visa_wrapper_telnet(visa_wrapper_serial):
#TODO?: migrate telnet library to use serial_for_url rfc2217://<host>:<port>[?<option>[&<option>]] class rfc2217.Serial
def __init__(self, ip_address, port, timeout = 5):
port = telnetlib.Telnet(ip_address,port,timeout=timeout)
self._timeout = timeout
visa_wrapper_serial.__init__(self, port)
def resync(self):
return self.ser.read_very_eager()
def readline(self):
response = self.ser.read_until(self.terminationCharacter, self._timeout)
if response[-1] != self.terminationCharacter:
print "Telnet timeout on port {}!".format(self.ser.port)
# prolly should raise exception here (I am Dave)
return response
def __getTimeout(self):
return self._timeout
def __setTimeout(self, timeout):
self._timeout = timeout
timeout = property(__getTimeout,__setTimeout)
class visa_wrapper_vxi11(visa_wrapper):
def __init__(self, address, timeout=5):
self.terminationCharacter = None
self.vxi_interface = vxi11.Instrument(address, term_char=self.terminationCharacter, timeout=60)
def read(self):
return self.vxi_interface.read()
def write(self, message):
self.vxi_interface.write(message)
def read_values(self):
#ascii transfer only
#see visa.py for binary parsing example
float_regex = re.compile(r"[-+]?(?:\d+(?:\.\d*)?|\d*\.\d+)"
"(?:[eE][-+]?\d+)?")
return [float(raw_value) for raw_value in
float_regex.findall(self.read())]
def ask(self, message):
return self.vxi_interface.ask(message)
def ask_for_values(self, message):
self.write(message)
return self.read_values()
def clear(self):
raise NotImplementedError('Interface Not Fully Implemented: clear()')
def trigger(self):
self.vxi_interface.trigger()
def read_raw(self):
self.vxi_interface.read_raw()
def resync(self):
'''flush buffers to resync after communication fault - usb-serial problem'''
pass
def close(self):
self.vxi_interface.close()
def open(self):
self.vxi_interface.open()
def __getTimeout(self):
return self.vxi_interface.io_timeout
def __setTimeout(self, timeout):
#not sure if this will actually take effect
self.vxi_interface.io_timeout = timeout
timeout = property(__getTimeout,__setTimeout)
class visa_wrapper_keithley_kxci(visa_wrapper):
def __init__(self, ip_address, port):
self.error_int = ctypes.POINTER(ctypes.c_int)()
self.ret_str = ctypes.create_string_buffer('\x00'*1024)
if KXCIclientMissing:
raise Exception('KXCIclient DLL missing. Try copying from deps/keithley_kxci to c:\windows\system')
KXCIclient.kxcilib.OpenKXCIConnection_C(ip_address,port,self.error_int)
self._timeout = 1 #not used
def write(self, message):
'''return value will be discarded'''
self.ask(message)
def ask(self, message):
KXCIclient.kxcilib.SendKXCICommand_C(message,self.ret_str,self.error_int)
return self.ret_str.value.rstrip()
def close(self):
KXCIclient.kxcilib.CloseKXCIConnection_C()
#KXCIclient.kxcilib.GetKXCISpollByte_C() not implemented
def __getTimeout(self):
'''not used'''
return self._timeout
def __setTimeout(self, timeout):
'''not used'''
self._timeout = timeout
timeout = property(__getTimeout,__setTimeout)
[docs]class visa_interface(visa_wrapper):
'''agilent visa strips trailing termination character, but NI VISA seems to leave them in response.'''
def __init__(self, address, timeout=5):
if visaMissing:
raise visaWrapperException('VISA library missing from this system')
elif "instrument" in dir(visa): # Old API from PyVISA rev < 1.5
self.visaInterface = visa.instrument(resource_name=address)#, timeout=timeout)
self.timeout_scale = 1
else: # Use new API PyVISA rev >= 1.5
self.visaInterface = visa.ResourceManager().open_resource(address)
self.timeout_scale = 1e-3
self.timeout = timeout
def read(self):
return self.visaInterface.read().rstrip()
def write(self, message):
self.visaInterface.write(message)
def read_values(self):
return self.visaInterface.read_values().rstrip()
def ask(self, message):
return self.visaInterface.ask(message).rstrip()
def ask_for_values(self, message):
return self.visaInterface.ask_for_values(message).rstrip()
def clear(self):
self.visaInterface.clear()
def trigger(self):
self.visaInterface.trigger()
def read_raw(self):
return self.visaInterface.read_raw()
def close(self):
self.visaInterface.close()
def __getTimeout(self):
return self.visaInterface.timeout * self.timeout_scale
def __setTimeout(self, timeout):
self.visaInterface.timeout = timeout / self.timeout_scale
def __delTimeout(self):
del self.visaInterface.timeout
timeout = property(__getTimeout, __setTimeout, __delTimeout)
def __getTerminationChars(self):
return self.visaInterface.term_chars
def __setTerminationChars(self, term_chars):
self.visaInterface.term_chars = term_chars
term_chars = property(__getTerminationChars,__setTerminationChars)
class rl1009_visa_wrapper_generic(visa_wrapper):
def __init__(self):
raise Exception('Instantiate serial or dll subclasses')
def __del__(self):
self.rl1009.set_timeouts(0,0)
def read(self):
return self.read_raw().rstrip() #remove whitespace to emulate pyvisa behavior
def read_values(self):
ascii_values = self.read().rstrip().split(',')
values = []
for value in ascii_values:
values.append(eval(value))
return values
def ask_for_values(self, message):
ascii_values = self.query(message).rstrip().split(',')
values = []
for value in ascii_values:
values.append(eval(value))
return values
def close(self):
self.rl1009.close_connection()
def set_timeout(self, timeout):
self.rl1009.set_timeouts(int(timeout*1000), int(timeout*1000))
def get_timeout(self):
return self.rl1009.get_timeouts()[0]/1000
timeout = property(get_timeout,set_timeout)
def _getTerminationChars(self):
return self.term_index[self.rl1009.get_terminator()]
def _setTerminationChars(self, term_chars):
try:
self.rl1009.set_terminator(self.term_index.index(term_chars))
except ValueError as e:
print "Termination characters restricted to ['','\\n','\\r','\\r\\n'] by RL1009 DLL"
raise e
term_chars = property(_getTerminationChars,_setTerminationChars)
class rl1009_visa_wrapper_serial(rl1009_visa_wrapper_generic):
#Static reference to single serial interface shared amongst all object instances of class
def __init__(self, GPIBAddress=None, timeout=5, serialPort=None):
if RL1009SerialMissing:
raise visaWrapperException('RL1009 Serial Driver class missing')
else:
if serialPort is not None:
self.rl1009 = RL1009_SERIAL_CLASS.RL1009(serialPort)
else:
raise visaWrapperException('Must specify serial port in constructor before/while creating interface')
self.GPIBAddress = GPIBAddress
self._my_timeout = timeout #multiple gpib interfaces can use the same rl1009 on so timeout must be kept outside the rl1009
self.term_index = ['','\n','\r','\r\n']
def write(self, message):
self.rl1009.set_address(self.GPIBAddress)
self.set_timeout(self._my_timeout)
self.rl1009.write(message)
time.sleep(0.030)
if self.rl1009.error == True:
raise visaWrapperException(self.rl1009.error_description())
def ask(self, message):
self.rl1009.set_address(self.GPIBAddress)
self.set_timeout(self._my_timeout)
response = self.rl1009.query(message).rstrip()
if self.rl1009.error == True:
raise visaWrapperException(self.rl1009.error_description())
return response
def clear(self):
self.set_timeout(self._my_timeout)
self.rl1009.device_clear() #clear_errors()
if self.rl1009.error == True:
raise visaWrapperException(self.rl1009.error_description())
def clear_errors(self):
self.set_timeout(self._my_timeout)
self.rl1009.clear_errors()
if self.rl1009.error == True:
raise visaWrapperException(self.rl1009.error_description())
def trigger(self):
self.set_timeout(self._my_timeout)
self.rl1009.group_trigger('%02d' % self.GPIBAddress)
if self.rl1009.error == True:
raise visaWrapperException(self.rl1009.error_description())
def read_raw(self):
self.rl1009.set_address(self.GPIBAddress)
self.set_timeout(self._my_timeout)
response = self.rl1009.read()
if self.rl1009.error == True:
raise visaWrapperException(self.rl1009.error_description())
return response
def __getGPIBAddress(self):
return self.__GPIBAddress
def __setGPIBAddress(self, GPIBAddress):
self.__GPIBAddress = GPIBAddress
GPIBAddress = property(__getGPIBAddress,__setGPIBAddress)
class rl1009_visa_wrapper_dll(rl1009_visa_wrapper_generic):
def __init__(self, GPIBAddress=None, timeout=5):
if RL1009DLLMissing:
raise visaWrapperException('RL1009 DLL Driver class missing')
else:
self.rl1009 = RL1009_DLL_INTERFACE.RL1009(address=GPIBAddress, rlib=None)
self.timeout = timeout
self.term_index = ['','\n','\r','\r\n']
def write(self, message):
self.rl1009.write(message)
if self.rl1009.gpib_error() == True:
raise visaWrapperException(self.rl1009.error_description())
def ask(self, message):
response = self.rl1009.query(message).rstrip()
if self.rl1009.gpib_error() == True:
raise visaWrapperException(self.rl1009.error_description())
return response
def clear(self):
self.rl1009.device_clear() #clear_error()
if self.rl1009.gpib_error() == True:
raise visaWrapperException(self.rl1009.error_description())
def clear_errors(self):
self.rl1009.clear_error()
if self.rl1009.gpib_error() == True:
raise visaWrapperException(self.rl1009.error_description())
def trigger(self):
self.rl1009.group_trigger('%02d' % self.GPIBAddress)
if self.rl1009.gpib_error() == True:
raise visaWrapperException(self.rl1009.error_description())
def read_raw(self):
response = self.rl1009.read()
if self.rl1009.gpib_error() == True:
raise visaWrapperException(self.rl1009.error_description())
return response
def __getGPIBAddress(self):
return self.rl1009.address
def __setGPIBAddress(self, GPIBAddress):
self.rl1009.address = GPIBAddress
GPIBAddress = property(__getGPIBAddress,__setGPIBAddress)