# Source code for PyICe.spi_instrument
'''
Channel Wrapper for SPI Devices
===============================
'''
from PyICe.lab_core import instrument, delegator, integer_channel
from PyICe import spi_interface
class spiInstrument(instrument, delegator):
    '''Instrument wrapper for basic linear shift register SPI port.

    Each bit field of the supplied shift register(s) is exposed as an integer
    channel. Channel values are gathered and exchanged with the SPI slave in a
    single transaction by _transceive().
    Not appropriate for context-sensitive (sub addressed) memory.'''

    def __init__(self, name, spiInterface, write_shift_register=None, read_shift_register=None):
        '''Create one channel for each bit field of the supplied shift registers.

        Specify at least one of (write_shift_register, read_shift_register) arguments.
        If read data has the same meaning as write data (memory readback), send the same shift register object to the write_shift_register and read_shift_register arguments.
        If both (write_shift_register, read_shift_register) arguments are specified, they must be of the same length.

        name -- base name; used in the instrument name and in warning messages.
        spiInterface -- spi_interface.spiInterface instance used for the transactions.
        write_shift_register -- spi_interface.shift_register whose bit fields become writable channels, or None.
        read_shift_register -- spi_interface.shift_register whose bit fields become read channels, or None.

        Raises Exception if neither shift register is given or if their lengths differ.
        Raises AssertionError if an argument is not of the expected type.'''
        delegator.__init__(self)
        instrument.__init__(self, '{} SPI instrument wrapper'.format(name))
        self._base_name = name
        assert isinstance(spiInterface, spi_interface.spiInterface)
        self.spi_interface = spiInterface
        self.write_shift_register = write_shift_register
        self.read_shift_register = read_shift_register
        self.dummy_write_value = 0  # data to shift in if SPI is read-only
        self._transceive_enabled = True
        if self.write_shift_register is None and self.read_shift_register is None:
            raise Exception('spiInstrument must specify at least one of write_shift_register, read_shift_register')
        if self.write_shift_register is self.read_shift_register:
            # Same object passed for both directions: every bit field name would
            # be registered twice, so build a renamed copy for the read side.
            self.read_shift_register = spi_interface.shift_register()
            for bf in read_shift_register:
                new_name = '{}_readback'.format(bf)
                print("WARNING: {} bit field: {} readback renamed to: {} to avoid duplicated channel name.".format(name, bf, new_name))
                self.read_shift_register.add_bit_field(new_name, read_shift_register[bf])
        elif self.write_shift_register is not None and self.read_shift_register is not None:
            if len(self.write_shift_register) != len(self.read_shift_register):
                raise Exception('spiInstrument write_shift_register, read_shift_register must be of equal length')
        if self.write_shift_register is not None:
            assert isinstance(self.write_shift_register, spi_interface.shift_register)
            for bf in self.write_shift_register:
                # channel_name=bf binds the current field name at definition time;
                # a plain closure would see only the last value of bf.
                write_ch = integer_channel(name=bf,
                                           size=self.write_shift_register[bf],
                                           write_function=lambda write_data, channel_name=bf: self._transceive(write_channel_name=channel_name, data=write_data))
                write_ch.set_delegator(self)
                self._add_channel(write_ch)
        if self.read_shift_register is not None:
            assert isinstance(self.read_shift_register, spi_interface.shift_register)
            for bf in self.read_shift_register:
                # Reads are expected to be served through read_delegated_channel_list();
                # _dummy_read raises if it is ever called directly.
                read_ch = integer_channel(name=bf, size=self.read_shift_register[bf], read_function=self._dummy_read)
                read_ch.set_delegator(self)
                self._add_channel(read_ch)

    def add_channel_transceive_enable(self, channel_name):
        '''Add channel to enable/disable SPI port communication.
        This can be used to serially change multiple bit fields before sending the data to the SPI slave with a single transaction.
        Note that communication is disabled indepent of this setting if not all writable bit fields have been initialized.
        Also note that after communication is enabled, a SPI transceive will not take place until a bit field is read or written.'''
        # The docstring above is reused verbatim as the channel description
        # below, so its wording is runtime text and is left unchanged here.
        # Returns whatever self._add_channel() returns for the new channel.
        trans_en_ch = integer_channel(name=channel_name, size=1, write_function=lambda enable: setattr(self, '_transceive_enabled', enable))
        trans_en_ch.set_description(self.get_name() + ': ' + self.add_channel_transceive_enable.__doc__)
        trans_en_ch.write(self._transceive_enabled)
        return self._add_channel(trans_en_ch)

    def _empty_read_data(self):
        '''Return a dict mapping every read bit field name to None (empty dict if there is no read shift register).'''
        if self.read_shift_register is None:
            return {}
        return {bf: None for bf in self.read_shift_register}

    def _transceive(self, write_channel_name=None, data=None, no_transceive=False):
        '''Collect all writable channel values and, if possible, perform one SPI transaction.

        write_channel_name -- name of the channel being written, or None for a pure read.
        data -- new value for write_channel_name; overrides the stored channel value.
        no_transceive -- if True, skip the SPI transaction unconditionally.

        The transaction is also skipped when communication has been disabled
        or when any writable channel still holds None. In every skipped case
        all read bit fields are reported as None.

        Returns a dict of channel name -> value containing the write data
        updated with the read data.'''
        write_data = {}
        # NOTE(review): a channel created by add_channel_transceive_enable() is
        # presumably writable as well, so its value would be included here and
        # passed on to pack() -- confirm pack() tolerates the extra key.
        for channel in self:
            if channel.is_writeable():
                write_data[channel.get_name()] = channel.read_without_delegator()
        if write_channel_name is not None:
            write_data[write_channel_name] = data
        if no_transceive or not self._transceive_enabled:
            # skip SPI transaction
            read_data = self._empty_read_data()
        elif None in write_data.values():
            # skip SPI transaction
            print("Deferring {} SPI write until all writable channels are assigned values.".format(write_channel_name))
            for ch in write_data:
                print('\t{}:{}'.format(ch, write_data[ch]))
            read_data = self._empty_read_data()
        else:
            # doing SPI transaction
            if self.write_shift_register is None:
                # Read-only port: clock out the dummy value for the full length
                # of the read shift register.
                miso = self.spi_interface.transceive(self.dummy_write_value, len(self.read_shift_register))
            else:
                miso = self.spi_interface.transceive(*self.write_shift_register.pack(write_data))
            if self.read_shift_register is not None:
                read_data = self.read_shift_register.unpack(miso)
            else:
                read_data = {}
        merged_data = {}
        merged_data.update(write_data)
        merged_data.update(read_data)
        return merged_data

    def _dummy_read(self):
        '''Placeholder read function for read channels; always raises.'''
        raise Exception("Shouldn't ever get here...")

    def read_delegated_channel_list(self, channels):
        '''private

        Read a group of channels with at most one SPI transaction.

        A transaction is requested only if at least one of the channels is not
        writable; otherwise the stored write values are returned without any
        bus traffic.

        channels -- channels of this instrument to read.

        Returns a dict of channel name -> value for the requested channels.'''
        needs_transaction = any(not channel.is_writeable() for channel in channels)
        spi_data = self._transceive(no_transceive=not needs_transaction)
        return {channel.get_name(): spi_data[channel.get_name()] for channel in channels}