Source code for PyICe.twi_instrument

'''
Channel Wraper for SMBus Compliant Devices
==========================================

Can automatically populate channels/reisters from XML description
'''
import lab_core
import twoWireInterface
import xml.etree.ElementTree as ET
import lab_utils
from scipy.interpolate import UnivariateSpline
#this is kept separate from lab_instruments because its not completely general purpose


class twi_instrument(lab_core.instrument,lab_core.delegator):
    def __init__(self,interface_twi,except_on_i2cInitError=True,except_on_i2cCommError=False,retry_count=5,PEC=False):  
        lab_core.instrument.__init__(self,name=None)
        lab_core.delegator.__init__(self)
        self.add_interface_twi(interface_twi)
        self._interface = interface_twi
        self._PEC = PEC
        self.except_on_i2cInitError = except_on_i2cInitError
        self.except_on_i2cCommError = except_on_i2cCommError
        self.retry_count = retry_count
        self.formatters = {}
        self._constants = {}
        self._streaming_enabled = False
        self._previous_command_codes = []
    def add_register(self,name,addr7,command_code,size,offset,word_size,is_readable):
        extract = lambda data : self._extract(data,size,offset)
        write = lambda data : self._read_merge_write(data,addr7,command_code,size,offset,word_size,is_readable)
        new_register = lab_core.register(name, size,
                                read_function=lambda: self.read_and_apply_function(command_code,extract),
                                write_function= write)
        new_register.set_delegator(self)
        self._add_channel(new_register)
    def read_delegated_channel_list(self,register_list):
        # this only supports single chip address now, it could easily be changed
        # it also only supports a single word size / read type at a time
        # it also only supports read_word and read_byte
        start_streaming = False
        addr7 = register_list[0].get_attribute('chip_address7')
        protocol = register_list[0].get_attribute("protocol")
        command_codes = []
        for register in register_list:
            if register.is_readable():
                command_codes.append(register.get_attribute('command_code'))
            else:
                raise lab_core.ChannelAccessException('Read a non-readable channel')
        #filter only unique command codes
        command_codes = list(set(command_codes))
        self._cc_data = {}
        if len(command_codes) == 1:
            if protocol == "read_write_word":
                if self._PEC:
                    function = lambda: self._interface.read_word_pec(addr7,command_code)
                else:
                    function = lambda: self._interface.read_word(addr7,command_code)
            elif protocol == "read_write_byte":
                if self._PEC:
                    function = lambda: self._interface.read_byte_pec(addr7,command_code)
                else:
                    function = lambda: self._interface.read_byte(addr7,command_code)
            else:
                raise Exception('Unknown protocol "{}"'.format(protocol))
            for command_code in command_codes:
                self._cc_data[command_code] = self._twi_try_function( function )
        else:
            if protocol == "read_write_word":
                if self._PEC:
                    function = lambda: self._interface.read_word_list_pec(addr7, command_codes)
                else:
                    if not self._streaming_enabled or command_codes != self._previous_command_codes:
                        self._previous_command_codes = command_codes
                        function = lambda: self._interface.read_word_list(addr7, command_codes)
                        if self._streaming_enabled:
                            start_streaming = True
                    else:
                        function = lambda: self._interface.read_streaming_word_list()
            elif protocol == "read_write_byte":
                if self._PEC:
                    function = lambda: self._interface.read_byte_list_pec(addr7, command_codes)
                else:
                    function = lambda: self._interface.read_byte_list(addr7, command_codes)
            else:
                raise Exception('Unknown protocol "{}"'.format(protocol))
            self._cc_data = self._twi_try_function( function )
            if self._cc_data is None:
                # Returning a dict mapping command_codes to None values
                # makes more sense than just returning None.  --FL 1/20/17
                self._cc_data = dict([(cc, None) for cc in command_codes])
        if start_streaming:
            self._interface.enable_streaming_word_list()
        results = {}
        for register in register_list:
            results[register.get_name()] = register.read_without_delegator()
        return results
    def _read_raw(self,command_code):
        return self._cc_data[command_code]
    def read_and_apply_function(self,command_code,function):
        data = function(self._read_raw(command_code))
        return data
    def _twi_try_function(self,function):
        try_count = self.retry_count + 1
        while True:
            try_count -= 1
            try:
                return function()
            except (twoWireInterface.i2cError,twoWireInterface.i2cMasterError) as e:
                print e
                try:
                    self._interface.resync_communication()
                except (twoWireInterface.i2cError,twoWireInterface.i2cMasterError) as init_err:
                    if (self.except_on_i2cInitError):
                        raise init_err
                    else:
                        print init_err
                if try_count <= 0:
                    if (self.except_on_i2cCommError):
                        raise e
                    else:
                        print "{}:  twi transaction failed: {}".format(self.get_name(), e)
                        return None
    def _extract(self,data,size,offset):
        if data == None:
            return None
        mask = (2**size-1) << offset
        return (data & mask)>> offset
    def _read_merge_write(self,data,addr7,command_code,size,offset,word_size,is_readable):
        data = int(data)
        if word_size == 16:
            if self._PEC:
                function = lambda: self._interface.read_word_pec(addr7,command_code)
            else:
                function = lambda: self._interface.read_word(addr7,command_code)
            
        elif word_size == 8:
            if self._PEC:
                function = lambda: self._interface.read_byte_pec(addr7,command_code)
            else:
                function = lambda: self._interface.read_byte(addr7,command_code)
        else:
            raise Exception('Unknown protocol word size {}'.format(word_size))
        #read the data
        if size == word_size:
            old_data = 0
        elif not is_readable:
            old_data = 0
        else:
            old_data = self._twi_try_function(function)
            if old_data == None:
                print "i2c_write pre-read failed, not writing"
                return
        assert word_size >= offset+size
        mask = (2**size-1) << offset
        mask_inv = (2**word_size-1) ^ mask
        old_data = old_data & mask_inv
        if data > 2**size-1:
            oversize_data = data
            data = 2**size-1
            print "Data {} doesn't fit into register of size {}! Clipping at {}".format(oversize_data, size, data)
        elif data < 0:
            oversize_data = data
            data = 0
            print "Negative data {} not valid for register! Clipping at {}".format(oversize_data, data)
        data = (data << offset) & mask
        data = data | old_data
        data &= 2**word_size-1 #necessary with offset/size assert?
        #write the data
        if word_size == 16:
            if self._PEC:
                function = lambda: self._interface.write_word_pec(addr7,command_code,data)
            else:
                function = lambda: self._interface.write_word(addr7,command_code,data)
        elif word_size == 8:
            if self._PEC:
                function = lambda: self._interface.write_byte_pec(addr7,command_code,data)
            else:
                function = lambda: self._interface.write_byte(addr7,command_code,data)
        self._twi_try_function(function)
    def populate_from_file(self ,xml_file, format_dict={}, access_list=[], use_case=None, channel_prefix="", channel_suffix=""):
        '''
        xml_register parsing accepts xml input complying with the following DTD (register_map.dtd):

    <!-- Visit http://en.wikipedia.org/wiki/Document_Type_Definition for an excellent explanation of DTD syntax -->
    <!ELEMENT register_map (chip+, use*, format_definitions?)>
    <!ELEMENT chip (description, address+, command_code*)>
    <!ELEMENT address EMPTY>
    <!ELEMENT command_code (description?, access+, bit_field+)>
    <!ELEMENT access EMPTY>
    <!ELEMENT bit_field (description, default?, preset*, format*)>
    <!ELEMENT description (#PCDATA)>
    <!ELEMENT default (#PCDATA)>
    <!ELEMENT preset (description?)>
    <!ELEMENT format EMPTY>
    <!ELEMENT use (category+)>
    <!ELEMENT category (#PCDATA)>
    <!ELEMENT format_definitions (format_definition+)>
    <!ELEMENT format_definition (description, transformed_units?, piecewise_linear_points?)>
    <!ELEMENT transformed_units (#PCDATA)>
    <!ELEMENT piecewise_linear_points (point,point+)>
    <!ELEMENT point EMPTY>
    <!ATTLIST chip name CDATA #REQUIRED word_size CDATA #REQUIRED>
    <!ATTLIST address address_7bit CDATA #REQUIRED>
    <!ATTLIST command_code name ID #REQUIRED value CDATA #REQUIRED>
    <!ATTLIST bit_field name ID #REQUIRED size CDATA #REQUIRED offset CDATA #REQUIRED category CDATA #REQUIRED>
    <!ATTLIST access mode CDATA #REQUIRED type (read | write) #REQUIRED>
    <!ATTLIST preset name CDATA #REQUIRED value CDATA #REQUIRED>
    <!ATTLIST format name IDREF #REQUIRED>
    <!ATTLIST use name CDATA #REQUIRED>
    <!ATTLIST format_definition name ID #REQUIRED signed (True | False | 1 | 0) #REQUIRED>
    <!ATTLIST point native CDATA #REQUIRED transformed CDATA #REQUIRED>
    '''
        xml_reg_map = ET.parse(xml_file).getroot()
        chip = xml_reg_map.find("./chip")
        addr7 = lab_utils.str2num(chip.find("./address").attrib["address_7bit"])
        chip_name = chip.attrib["name"]
        self.set_name(chip_name)
        word_size = lab_utils.str2num(chip.attrib["word_size"])
        # extract all the xml formats into self._xml_formats
        self._xml_formats = {}
        for fmt_def in xml_reg_map.findall('.//format_definition'):
            name = fmt_def.attrib['name']
            desc = fmt_def.find('./description').text
            units = fmt_def.find('./transformed_units').text
            signed = fmt_def.attrib['signed'] in ["True","true","1"] 
            points = fmt_def.findall('./piecewise_linear_points/point')
            xlist = []
            ylist = []
            for xpoints in points:
                xlist.append(xpoints.attrib['native'])
            for ypoints in points:
                ylist.append(ypoints.attrib['transformed'])
            points = zip(xlist, ylist)
            self._xml_formats[name] = {'points': points, 'description': desc, 'signed': signed, 'units': units}
        #extract constant definitions
        for constant in xml_reg_map.findall('.//constant_definition'):
            self._constants[constant.attrib['name']] = constant.attrib['value']
        # generate actual formats using the xml formats
        self._update_xml_formatters()
        #Check which bit fields are allowed to become channels. use_case=None results in no filtering.
        if use_case is None:
            self.categories = None
        else:
            self.categories = []
            for category in xml_reg_map.findall("./use[@name='{}']/category".format(use_case)):
                self.categories.append(category.text)
        #now extract the bit fields
        for physical_register in chip.findall("./command_code"):
            command_code = lab_utils.str2num(physical_register.attrib['value'])
            is_readable = False
            is_writable = False
            for access in physical_register.findall("./access"):
                if access.attrib['mode'] in access_list or access_list == []:
                    if access.attrib['type'] == 'read':
                        is_readable = True
                    if access.attrib['type'] == 'write':
                        is_writable = True
            if not (is_writable or is_readable):
                continue
            # MAYBE INCLUDE FULL REGISTER ACCESS HERE
            for bit_field in physical_register.findall('./bit_field'):
                #tag attributes
                name = channel_prefix + bit_field.attrib['name'] + channel_suffix
                category = bit_field.attrib['category']
                size = lab_utils.str2num(bit_field.attrib['size'])
                offset = lab_utils.str2num(bit_field.attrib['offset'])
                default = bit_field.find('./default')
                if default is not None:
                    default = lab_utils.str2num(default.text)
                if self.categories is not None and category not in self.categories:
                    continue #filter out unauthorized categories for this use_case
                self.add_register(name,addr7,command_code,size,offset,word_size,is_readable)
                register = self[name]
                register.set_category(category)
                register.set_attribute("default",default)
                register.set_attribute("chip_address7",addr7)
                register.set_read_access(is_readable)
                register.set_write_access(is_writable)
                if word_size == 16:
                    register.set_attribute("protocol","read_write_word")
                elif word_size == 8:
                    register.set_attribute("protocol","read_write_byte")
                register.set_attribute("offset",offset)
                register.set_attribute("command_code",command_code)
                #add presets
                for preset in bit_field.findall('./preset'):
                    register.add_preset(preset.attrib['name'], lab_utils.str2num(preset.attrib['value']))
                #add additional user formats
                for format in bit_field.findall('./format'):
                    format_name = format.attrib['name']
                    format_definition = [definition for definition in xml_reg_map.findall(".//format_definition") if definition.attrib['name'] == format_name]
                    if format_name in format_dict and not len(format_definition):
                        register.add_format(format_name, format_dict[format_name]['format'], format_dict[format_name]['unformat']) 
                        if len(format_definition):
                            print "Warning: format dict being used instead of XML for {}".format(format_name)
                    elif len(format_definition):
                        register.add_format(format_name,self.formatters[format_name]['format'],self.formatters[format_name]['unformat'],self.formatters[format_name]['signed'],self.formatters[format_name]['units'])
                    else:
                        raise Exception('Format {} undefined in format_dict and in XML'.format(format_name))
                description = bit_field.find('./description')
                if (description is not None):
                    register.set_description(description.text)
    def create_format(self, format_name, format_function, unformat_function, signed=False, description=None, units=''):
        '''Create a new format definition or modify an existing definition.

        format_function should take a single argument of integer raw data from the register and return a version of the data scaled to appropriate units.
        unformat_function should take a single argument of data in real units and return an integer version of the data scaled to the register LSB weight.
        If the data is signed in two's-complement format, set signed=True.
        After creating format, use set_active_format method to make the new format active.
        '''
        self.formatters[format_name] = {'format': format_function, 'unformat': unformat_function, 'description': description, 'signed': signed, 'units': units}
    def set_constant(self, constant, value):
        '''Sets the constants found in the datasheet used by the formatters to convert from real world values to digital value and back.'''
        self._constants[constant] = value
        self._update_xml_formatters()
    def get_constant(self,constant):
        '''Sets the constants found in the datasheet used by the formatters to convert from real world values to digital value and back.'''
        return self._constants[constant]
    def list_constants(self):
        '''Returns the list of constants found in the datasheet used by the formatters to convert from real world values to digital value and back.'''
        return self._constants
    def _update_xml_formatters(self):
        self.create_format( format_name = 'None',
                            format_function = lambda x:x,
                            unformat_function = lambda x:x,
                            signed = False,
                            description = '''No formatting applied to data.''',
                            units = '')  
        for fmt_name in self._xml_formats:
            xlist, ylist = zip(*self._xml_formats[fmt_name]["points"])
            self.create_format( format_name = fmt_name,
                                format_function = self._transform_from_points(xlist, ylist, "format"),
                                unformat_function = self._transform_from_points(xlist, ylist, "unformat"),
                                signed = self._xml_formats[fmt_name]["signed"],
                                description = self._xml_formats[fmt_name]["description"],
                                units = self._xml_formats[fmt_name]["units"])
    def _transform_from_points(self, xlist, ylist, direction):
        '''Used internally to convert from register values to real world values and back again.'''
        x_evaled = []
        y_evaled = []
        only_constants = {}
        only_constants.update(self._constants)
        only_constants = {key:float(eval(value)) for key, value in self._constants.iteritems()} #add eval to allow expressions within XML constants
        for xpoint in xlist:
            x_evaled.append(eval(xpoint, only_constants))
        for ypoint in ylist:
            y_evaled.append(eval(ypoint, only_constants))
        if direction == "format":
            z = sorted(zip(x_evaled, y_evaled), key = lambda x: x[0])
            return lambda x: None if x is None else float(UnivariateSpline(x = zip(*z)[0], y = zip(*z)[1], k=1, s = 0)(x))
        elif direction == "unformat":
            z = sorted(zip(x_evaled, y_evaled), key = lambda x: x[1])
            return lambda x: int(round(UnivariateSpline(x = zip(*z)[1], y = zip(*z)[0], k=1, s = 0)(float(x))))
        else:
            print("'transform_from_points()' requires one of either: 'format' or 'unformat'")
            return

[docs]class twi_instrument_dummy(twi_instrument): '''use for formatters, etc without having to set up a master and physical hardware.''' def __init__(self): lab_core.instrument.__init__(self,name=None) self.formatters = {} self._constants = {}
if __name__ == "__main__": import lab_core m = lab_core.master() twi_interface = m.get_twi_dummy_interface() twi = twi_instrument(twi_interface) twi.populate_from_file("./xml_registers/EXAMPLE/LTC3350.xml", format_dict={}, access_list=['user'], use_case="demo")