Source code for PyICe.twoWireInterface

'''
SMBus Interface Hardware Drivers
================================
'''

import time, random, abc
from PyICe import visa_wrappers
try:
    import serial
except ImportError:
    pass
try:
    import smbus
    smbusMissing = False
except ImportError:
    smbusMissing = True

class twoWireInterface(object):
    '''Master I2C/SMBus class; all other i2c adapters inherit from this.

    All SMBus protocols are implemented generically with the I2C primitives
    (start, restart, stop, write, read_ack, read_nack).  Board specific
    subclasses should overload as necessary for increased performance.

    addr7 is the 7-bit chip address.  The 8-bit read/write addresses are
    computed locally.

    Conventions shared by every SMBus protocol method below:
      * arguments are validated (check_size) before the first bus operation,
        so an out-of-range argument never leaves a transaction half finished;
      * a primitive that reports failure (returns a false value) aborts the
        protocol by raising the matching i2c*Error exception;
      * *_pec variants append/verify the SMBus Packet Error Code (see pec()).
    '''
    __metaclass__ = abc.ABCMeta

    def close(self):
        '''close the underlying (serial) interface'''
        pass

    #I2C Generic Protocol Methods - Must be implemented in hardware/firmware specific classes
    @abc.abstractmethod
    def start(self):
        '''I2C Start - Falling SDA with SCL high.
        Returns True or False to indicate successful arbitration'''
        raise i2cUnimplementedError()

    def restart(self):
        '''I2C Re-Start - Falling SDA with SCL high between start and stop condition.
        Implement only if restart requires a different action than a normal
        start in underlying hardware.
        Returns True or False to indicate successful arbitration'''
        return self.start()

    @abc.abstractmethod
    def stop(self):
        '''I2C Stop - Rising SDA with SCL high.
        Returns True or False to indicate successful arbitration'''
        raise i2cUnimplementedError()

    @abc.abstractmethod
    def write(self, data8):
        '''Transmit 8 bits plus 9th acknowledge clock.
        Returns True or False to indicate slave acknowledge'''
        raise i2cUnimplementedError()

    @abc.abstractmethod
    def read_ack(self):
        '''Read 8 bits from slave transmitter and assert SDA during 9th acknowledge clock.
        Returns 8 bit data'''
        raise i2cUnimplementedError()

    @abc.abstractmethod
    def read_nack(self):
        '''Read 8 bits from slave transmitter and release SDA during 9th
        acknowledge clock to request end of transmission.
        Returns 8 bit data'''
        raise i2cUnimplementedError()

    def resync_communication(self):
        '''attempt to correct problems caused by dropped/duplicate characters
        in serial buffer, etc.
        Don't do anything here.  Method must be overloaded to implement
        hardware-specific recovery procedure.
        '''
        pass

    def set_frequency(self, frequency):
        '''Set the bus clock frequency.  Not implemented in the generic class;
        always raises i2cUnimplementedError unless overloaded.'''
        raise i2cUnimplementedError()

    ###Utilities###
    def scan(self):
        '''Find all devices on bus by checking acknowledge of each address in turn.

        Every even 8-bit (write) address from 0x00 to 0xFE is clocked out
        after a (repeated) start.  Returns the list of 8-bit write addresses
        that were acknowledged.  One stop is issued after the last probe so
        the bus is released when the scan is over.
        '''
        responses = []
        for addr in range(0, 255, 2):
            self.start()
            if self.write(addr):
                responses.append(addr)
        self.stop() #release the bus, otherwise the last start is left pending
        return responses

    def check_size(self, data, bits):
        '''make sure data fits within word of length "bits"

        Returns True when 0 <= data < 2**bits.  Raises AssertionError
        otherwise.  The exception is raised explicitly (not with an assert
        statement) so that the check survives "python -O".
        '''
        if not (data >= 0 and data < 2**bits):
            raise AssertionError('{!r} does not fit in an unsigned {}-bit word'.format(data, bits))
        return True

    def read_addr(self, addr7):
        '''compute 8-bit read address from 7-bit address'''
        self.check_size(addr7, 7)
        return (addr7 << 1) + 1

    def write_addr(self, addr7):
        '''compute 8-bit write address from 7-bit address'''
        self.check_size(addr7, 7)
        return addr7 << 1

    def low_byte(self, data16):
        '''Select lower byte from 16-bit data'''
        self.check_size(data16, 16)
        return data16 & 0xFF

    def high_byte(self, data16):
        '''Select upper byte from 16-bit data'''
        self.check_size(data16, 16)
        return (data16 >> 8) & 0xFF

    def word(self, lowByte, highByte):
        '''Return 16-bit value from two SMBus bytes'''
        return ((highByte & 0xFF) << 8) + (lowByte & 0xFF)

    def pec(self, byteList):
        '''Compute the SMBus Packet Error Code of byteList.

        byteList is an ordered list of every byte in the transaction
        including address, command code (subaddr) and data.
        Returns the 8-bit CRC as an int.

        http://smbus.org/specs/smbus20.pdf
        Each bus transaction requires a Packet Error Code (PEC) calculation by
        both the transmitter and receiver within each packet.  The PEC uses an
        8-bit cyclic redundancy check (CRC-8) of each read or write bus
        transaction to calculate a Packet Error Code (PEC).  The PEC may be
        calculated in any way that conforms to a CRC-8 represented by the
        polynomial, C(x) = x8 + x2 + x1 + 1 and must be calculated in the
        order of the bits as received.

        Calculating the PEC for transmission or reception is implemented in a
        method chosen by the device manufacturer.  It is possible to perform
        the check with low-cost hardware or firmware algorithm that could
        process the message bit-by-bit or with a byte-wise look-up table.
        The SMBus web page provides some example CRC-8 methods.

        The PEC is appended to the message as dictated by the protocols in
        section 5.5.  The PEC calculation includes all bytes in the
        transmission, including address, command and data.  The PEC
        calculation does not include ACK, NACK, START, STOP nor Repeated START
        bits.  This means that the PEC is computed over the entire message
        from the first START condition.

        Whether a device implements packet error checking may be determined by
        the specification revision code that is present in the
        SpecificationInfo() command for a Smart Battery, Smart Battery Charger
        or Smart Battery Selector.  See these individual specifications for
        exact revision coding identities.  It may also be discovered in the
        UDID, defined in section 5.6.1, for other devices.
        '''
        #http://en.wikipedia.org/wiki/Cyclic_redundancy_check
        #http://en.wikipedia.org/wiki/Computation_of_CRC
        #http://ghsi.de/CRC/
        #http://smbus.org/faq/crc8Applet.htm
        #http://www.hackersdelight.org/crc.pdf
        crc = 0
        poly = 0x07 #x^8 + x^2 + x^1 + 1, discard x^8 term
        for byte in byteList:
            #big endian -> msb goes in first
            crc ^= byte
            for cycle in range(8):
                crc <<= 1
                if crc & 0x100: #msb was set before left shift ( & 0x80)
                    crc ^= poly
            crc &= 0xFF #drop shifted-out bits so the accumulator stays one byte wide
        return int(crc & 0xFF)

    #SMBus Protocols implemented with I2C Primitives
    #Board specific subclasses should overload as necessary for increased performance
    #addr7 is the 7-bit chip address  The 8-bit read/write addresses are computed locally
    def quick_command_rd(self, addr7):
        '''Here, part of the slave address denotes the command - the R/W# bit.
        The R/W# bit may be used to simply turn a device function on or off,
        or enable/disable a low-power standby mode.  There is no data sent or
        received.
        The quick command implementation is good for very small devices that
        have limited support for the SMBus specification.  It also limits data
        on the bus for simple devices.'''
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError('I2C Error: Failed to Assert Start Signal during quick_command_rd')
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError('I2C Error: Slave read address failed to acknowledge during quick_command_rd')
        if not self.stop():
            raise i2cStartStopError('I2C Error: Failed to Assert Stop Signal during quick_command_rd')

    def quick_command_wr(self, addr7):
        '''Here, part of the slave address denotes the command - the R/W# bit.
        The R/W# bit may be used to simply turn a device function on or off,
        or enable/disable a low-power standby mode.  There is no data sent or
        received.
        The quick command implementation is good for very small devices that
        have limited support for the SMBus specification.  It also limits data
        on the bus for simple devices.'''
        write_address = self.write_addr(addr7)
        if not self.start():
            raise i2cStartStopError('I2C Error: Failed to Assert Start Signal during quick_command_wr')
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError('I2C Error: Slave write address failed to acknowledge during quick_command_wr')
        if not self.stop():
            raise i2cStartStopError('I2C Error: Failed to Assert Stop Signal during quick_command_wr')

    def send_byte(self, addr7, data8):
        '''A simple device may recognize its own slave address and accept up
        to 256 possible encoded commands in the form of a byte that follows
        the slave address.
        All or parts of the Send Byte may contribute to the command.  For
        example, the highest 7 bits of the command code might specify an
        access to a feature, while the least significant bit would tell the
        device to turn the feature on or off.  Or, a device may set the
        "volume" of its output based on the value it received from the Send
        Byte protocol.'''
        self.check_size(data8, 8)
        write_address = self.write_addr(addr7)
        if not self.start():
            raise i2cStartStopError('I2C Error: Failed to Assert Start Signal during send_byte')
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(data8):
            raise i2cDataAcknowledgeError()
        if not self.stop():
            raise i2cStartStopError('I2C Error: Failed to Assert Stop Signal during send_byte')

    def send_byte_pec(self, addr7, data8):
        '''send_byte with additional PEC byte written to slave.'''
        self.check_size(data8, 8)
        write_address = self.write_addr(addr7)
        pec = self.pec([write_address, data8])
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(data8):
            raise i2cDataAcknowledgeError()
        if not self.write(pec):
            raise i2cDataPECAcknowledgeError()
        if not self.stop():
            raise i2cStartStopError()

    def receive_byte(self, addr7):
        '''The Receive Byte is similar to a Send Byte, the only difference
        being the direction of data transfer.  A simple device may have
        information that the host needs.  It can do so with the Receive Byte
        protocol.  The same device may accept both Send Byte and Receive Byte
        protocols.  A NACK (a '1' in the ACK bit position) signifies the end
        of a read transfer.
        Returns the byte read.'''
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        data8 = self.read_nack()
        if not self.stop():
            raise i2cStartStopError()
        return data8

    def receive_byte_pec(self, addr7):
        '''receive_byte with additional PEC byte read from slave.
        Raises i2cPECError if the received PEC does not match.'''
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        data8 = self.read_ack()
        pec = self.read_nack()
        if not self.stop():
            raise i2cStartStopError()
        if pec != self.pec([read_address, data8]):
            raise i2cPECError("I2C Error: Failed PEC check")
        return data8

    def alert_response(self):
        '''Another optional signal is an interrupt line for devices that want
        to trade their ability to master for a pin.  SMBALERT# is a wired-AND
        signal just as the SMBCLK and SMBDAT signals are.  SMBALERT# is used
        in conjunction with the SMBus General Call Address.  Messages invoked
        with the SMBus are 2 bytes long.
        A slave-only device can signal the host through SMBALERT# that it
        wants to talk.  The host processes the interrupt and simultaneously
        accesses all SMBALERT# devices through the Alert Response Address
        (ARA).  Only the device(s) which pulled SMBALERT# low will acknowledge
        the Alert Response Address.  The host performs a modified Receive Byte
        operation.  The 7 bit device address provided by the slave transmit
        device is placed in the 7 most significant bits of the byte.  The
        eighth bit can be a zero or one.
        Returns 7 bit address of responding device.
        Returns None if no response to ARA.'''
        ara_read_address = self.read_addr(0xC)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(ara_read_address):
            self.stop()
            return None #no response to Alert Response Address
        data8 = self.read_nack()
        if not self.stop():
            raise i2cStartStopError()
        return data8 >> 1

    def alert_response_pec(self):
        '''Alert Response Query to SMBALERT# interrupt with Packet Error Check.
        Returns 7 bit address of responding device.
        Returns None if no response to ARA.
        Raises i2cPECError if the received PEC does not match.'''
        ara_read_address = self.read_addr(0xC)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(ara_read_address):
            self.stop()
            return None #no response to Alert Response Address
        data8 = self.read_ack()
        pec = self.read_nack()
        if not self.stop():
            raise i2cStartStopError()
        #same test as "CRC over message plus PEC is zero", written like the other *_pec readers
        if pec != self.pec([ara_read_address, data8]):
            raise i2cPECError("I2C Error: Failed PEC check")
        return data8 >> 1

    def write_byte(self, addr7, commandCode, data8):
        '''The first byte of a Write Byte access is the command code.  The
        next byte is the data to be written.  In this example the master
        asserts the slave device address followed by the write bit.  The
        device acknowledges and the master delivers the command code.  The
        slave again acknowledges before the master sends the data byte.  The
        slave acknowledges each byte, and the entire transaction is finished
        with a STOP condition.'''
        self.check_size(commandCode, 8)
        self.check_size(data8, 8)
        write_address = self.write_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.write(data8):
            raise i2cDataAcknowledgeError()
        if not self.stop():
            raise i2cStartStopError()

    def write_byte_pec(self, addr7, commandCode, data8):
        '''write_byte with additional PEC byte written to slave.'''
        self.check_size(commandCode, 8)
        self.check_size(data8, 8)
        write_address = self.write_addr(addr7)
        pec = self.pec([write_address, commandCode, data8])
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.write(data8):
            raise i2cDataAcknowledgeError()
        if not self.write(pec):
            raise i2cDataPECAcknowledgeError()
        if not self.stop():
            raise i2cStartStopError()

    def write_word(self, addr7, commandCode, data16):
        '''The first byte of a Write Word access is the command code.  The
        next two bytes are the data to be written.  In this example the master
        asserts the slave device address followed by the write bit.  The
        device acknowledges and the master delivers the command code.  The
        slave again acknowledges before the master sends the data word (low
        byte first).  The slave acknowledges each byte, and the entire
        transaction is finished with a STOP condition.'''
        self.check_size(commandCode, 8)
        dataLow = self.low_byte(data16)
        dataHigh = self.high_byte(data16)
        write_address = self.write_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.write(dataLow):
            raise i2cDataLowAcknowledgeError()
        if not self.write(dataHigh):
            raise i2cDataHighAcknowledgeError()
        if not self.stop():
            raise i2cStartStopError()

    def write_word_pec(self, addr7, commandCode, data16):
        '''write_word with additional PEC byte written to slave.'''
        self.check_size(commandCode, 8)
        dataLow = self.low_byte(data16)
        dataHigh = self.high_byte(data16)
        write_address = self.write_addr(addr7)
        pec = self.pec([write_address, commandCode, dataLow, dataHigh])
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.write(dataLow):
            raise i2cDataLowAcknowledgeError()
        if not self.write(dataHigh):
            raise i2cDataHighAcknowledgeError()
        if not self.write(pec):
            raise i2cDataPECAcknowledgeError()
        if not self.stop():
            raise i2cStartStopError()

    def read_byte(self, addr7, commandCode):
        '''Reading data is slightly more complicated than writing data.  First
        the host must write a command to the slave device.  Then it must
        follow that command with a repeated START condition to denote a read
        from that device's address.  The slave then returns one byte of data.
        Note that there is no STOP condition before the repeated START
        condition, and that a NACK signifies the end of the read transfer.
        Returns the byte read.'''
        self.check_size(commandCode, 8)
        write_address = self.write_addr(addr7)
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.restart():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        data8 = self.read_nack()
        if not self.stop():
            raise i2cStartStopError()
        return data8

    def read_byte_pec(self, addr7, commandCode):
        '''read_byte with additional PEC byte read from slave.
        Raises i2cPECError if the received PEC does not match.'''
        self.check_size(commandCode, 8)
        write_address = self.write_addr(addr7)
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.restart():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        data8 = self.read_ack()
        pec = self.read_nack()
        if not self.stop():
            raise i2cStartStopError()
        expected = self.pec([write_address, commandCode, read_address, data8])
        if pec != expected:
            raise i2cPECError('PEC Failure: expected 0x{:X} but got 0x{:X}'.format(expected, pec))
        return data8

    def read_word(self, addr7, commandCode):
        '''Reading data is slightly more complicated than writing data.  First
        the host must write a command to the slave device.  Then it must
        follow that command with a repeated START condition to denote a read
        from that device's address.  The slave then returns two bytes of data
        (low byte first).
        Note that there is no STOP condition before the repeated START
        condition, and that a NACK signifies the end of the read transfer.
        Returns the 16-bit word read.'''
        self.check_size(commandCode, 8)
        write_address = self.write_addr(addr7)
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.restart():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        dataLow = self.read_ack()
        dataHigh = self.read_nack()
        if not self.stop():
            raise i2cStartStopError()
        return self.word(lowByte=dataLow, highByte=dataHigh)

    def read_word_pec(self, addr7, commandCode):
        '''read_word with additional PEC byte read from slave.
        Raises i2cPECError if the received PEC does not match.'''
        self.check_size(commandCode, 8)
        write_address = self.write_addr(addr7)
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.restart():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        dataLow = self.read_ack()
        dataHigh = self.read_ack()
        pec = self.read_nack()
        if not self.stop():
            raise i2cStartStopError()
        expected = self.pec([write_address, commandCode, read_address, dataLow, dataHigh])
        if pec != expected:
            raise i2cPECError('PEC Failure: expected 0x{:X} but got 0x{:X}'.format(expected, pec))
        return self.word(lowByte=dataLow, highByte=dataHigh)

    def process_call(self, addr7, commandCode, data16):
        '''The process call is so named because a command sends data and waits
        for the slave to return a value dependent on that data.  The protocol
        is simply a Write Word followed by a Read Word without the Read-Word
        command field and the Write-Word STOP bit.
        Note that there is no STOP condition before the repeated START
        condition, and that a NACK signifies the end of the read transfer.
        Returns the 16-bit word read.'''
        self.check_size(commandCode, 8)
        dataLow = self.low_byte(data16)
        dataHigh = self.high_byte(data16)
        write_address = self.write_addr(addr7)
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.write(dataLow):
            raise i2cDataLowAcknowledgeError()
        if not self.write(dataHigh):
            raise i2cDataHighAcknowledgeError()
        if not self.restart():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        readLow = self.read_ack()
        readHigh = self.read_nack()
        if not self.stop():
            raise i2cStartStopError()
        return self.word(lowByte=readLow, highByte=readHigh)

    def process_call_pec(self, addr7, commandCode, data16):
        '''process_call with additional PEC byte read from slave.
        Raises i2cPECError if the received PEC does not match.'''
        self.check_size(commandCode, 8)
        dataLow = self.low_byte(data16)
        dataHigh = self.high_byte(data16)
        write_address = self.write_addr(addr7)
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.write(dataLow):
            raise i2cDataLowAcknowledgeError()
        if not self.write(dataHigh):
            raise i2cDataHighAcknowledgeError()
        if not self.restart():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        readLow = self.read_ack()
        readHigh = self.read_ack()
        pec = self.read_nack()
        if not self.stop():
            raise i2cStartStopError()
        expected = self.pec([write_address, commandCode, dataLow, dataHigh,
                             read_address, readLow, readHigh])
        if pec != expected:
            raise i2cPECError('PEC Failure: expected 0x{:X} but got 0x{:X}'.format(expected, pec))
        return self.word(lowByte=readLow, highByte=readHigh)

    def block_write(self, addr7, commandCode, dataByteList):
        '''The Block Write begins with a slave address and a write condition.
        After the command code the host issues a byte count which describes
        how many more bytes will follow in the message.  If a slave has 20
        bytes to send, the byte count field will have the value 20 (14h),
        followed by the 20 bytes of data.  The byte count does not include the
        PEC byte.  The byte count may not be 0.  A Block Read or Write is
        allowed to transfer a maximum of 32 data bytes.
        Raises i2cError if more than 32 data bytes are supplied.'''
        self.check_size(commandCode, 8)
        write_address = self.write_addr(addr7)
        byteCount = len(dataByteList)
        if byteCount > 32:
            raise i2cError("I2C Error: Block Write requires maximum 32 data bytes")
        for byte in dataByteList:
            self.check_size(byte, 8)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.write(byteCount):
            raise i2cDataAcknowledgeError()
        for byte in dataByteList:
            if not self.write(byte):
                raise i2cDataAcknowledgeError()
        if not self.stop():
            raise i2cStartStopError()

    def block_write_pec(self, addr7, commandCode, dataByteList):
        '''block_write with additional PEC byte written to slave.'''
        self.check_size(commandCode, 8)
        write_address = self.write_addr(addr7)
        byteCount = len(dataByteList)
        if byteCount > 32:
            raise i2cError("I2C Error: Block Write requires maximum 32 data bytes")
        byteList = [write_address, commandCode, byteCount]
        for byte in dataByteList:
            self.check_size(byte, 8)
            byteList.append(byte)
        pec = self.pec(byteList)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.write(byteCount):
            raise i2cDataAcknowledgeError()
        for byte in dataByteList:
            if not self.write(byte):
                raise i2cDataAcknowledgeError()
        if not self.write(pec):
            raise i2cDataPECAcknowledgeError()
        if not self.stop():
            raise i2cStartStopError()

    def block_read(self, addr7, commandCode):
        '''A Block Read differs from a block write in that the repeated START
        condition exists to satisfy the requirement for a change in the
        transfer direction.  A NACK immediately preceding the STOP condition
        signifies the end of the read transfer.
        Returns the list of data bytes read (the byte count is not included).
        Raises i2cError if the slave announces more than 32 bytes.
        NOTE: one byte is always read with NACK to terminate the transfer, so
        a slave announcing a byte count of 0 still yields a one element list.'''
        self.check_size(commandCode, 8)
        write_address = self.write_addr(addr7)
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.restart():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        byteCount = self.read_ack()
        if byteCount > 32:
            raise i2cError("I2C Error: Block Read requires maximum 32 data bytes")
        dataByteList = []
        for i in range(0, byteCount - 1):
            dataByteList.append(self.read_ack())
        dataByteList.append(self.read_nack()) #last data byte is NACKed
        if not self.stop():
            raise i2cStartStopError()
        return dataByteList

    def block_read_pec(self, addr7, commandCode):
        '''block_read with additional PEC byte read from slave.
        Raises i2cPECError if the received PEC does not match.'''
        self.check_size(commandCode, 8)
        write_address = self.write_addr(addr7)
        read_address = self.read_addr(addr7)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.restart():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        byteCount = self.read_ack()
        if byteCount > 32:
            raise i2cError("I2C Error: Block Read requires maximum 32 data bytes")
        dataByteList = []
        for i in range(0, byteCount):
            dataByteList.append(self.read_ack())
        pec = self.read_nack() #PEC is the NACKed byte, every data byte is ACKed
        if not self.stop():
            raise i2cStartStopError()
        expected = self.pec([write_address, commandCode, read_address, byteCount] + dataByteList)
        if pec != expected:
            raise i2cPECError('PEC Failure: expected 0x{:X} but got 0x{:X}'.format(expected, pec))
        return dataByteList

    def block_process_call(self, addr7, commandCode, dataByteListWrite):
        '''The block write-block read process call is a two-part message.  The
        call begins with a slave address and a write condition.  After the
        command code the host issues a write byte count (M) that describes how
        many more bytes will be written in the first part of the message.  If
        a master has 6 bytes to send, the byte count field will have the value
        6 (0000 0110b), followed by the 6 bytes of data.  The write byte count
        (M) cannot be zero.
        The second part of the message is a block of read data beginning with
        a repeated start condition followed by the slave address and a Read
        bit.  The next byte is the read byte count (N), which may differ from
        the write byte count (M).  The read byte count (N) cannot be zero.
        The combined data payload must not exceed 32 bytes.  The byte length
        restrictions of this process call are summarized as follows:
          * M >= 1 byte
          * N >= 1 byte
          * M + N <= 32 bytes
        The read byte count does not include the PEC byte.  The PEC is
        computed on the total message beginning with the first slave address
        and using the normal PEC computational rules.  It is highly
        recommended that a PEC byte be used with the Block Write-Block Read
        Process Call.
        Note that there is no STOP condition before the repeated START
        condition, and that a NACK signifies the end of the read transfer.
        Returns the list of data bytes read.
        Raises i2cError if either byte count violates the limits above.'''
        self.check_size(commandCode, 8)
        write_address = self.write_addr(addr7)
        read_address = self.read_addr(addr7)
        byteCountWrite = len(dataByteListWrite)
        if byteCountWrite > 31 or byteCountWrite < 1:
            #slave must return at least 1 byte and total limitation is 32
            raise i2cError("I2C Error: Block Process Call requires maximum 32 data bytes")
        for byte in dataByteListWrite:
            self.check_size(byte, 8)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.write(byteCountWrite):
            raise i2cDataAcknowledgeError()
        for byte in dataByteListWrite:
            if not self.write(byte):
                raise i2cDataAcknowledgeError()
        if not self.restart():
            raise i2cStartStopError()
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        byteCountRead = self.read_ack()
        if byteCountRead < 1 or (byteCountWrite + byteCountRead) > 32:
            raise i2cError("I2C Error: Block Process Call requires maximum 32 data bytes")
        dataByteListRead = []
        for i in range(0, byteCountRead - 1):
            dataByteListRead.append(self.read_ack())
        dataByteListRead.append(self.read_nack()) #last data byte is NACKed
        if not self.stop():
            raise i2cStartStopError()
        return dataByteListRead

    def block_process_call_pec(self, addr7, commandCode, dataByteListWrite):
        '''block write-block read process call with additional PEC byte read from slave.
        Raises i2cPECError if the received PEC does not match.'''
        self.check_size(commandCode, 8)
        write_address = self.write_addr(addr7)
        read_address = self.read_addr(addr7)
        byteCountWrite = len(dataByteListWrite)
        if byteCountWrite > 31 or byteCountWrite < 1:
            #slave must return at least 1 byte and total limitation is 32
            raise i2cError("I2C Error: Block Process Call requires maximum 32 data bytes")
        byteList = [write_address, commandCode, byteCountWrite]
        for byte in dataByteListWrite:
            self.check_size(byte, 8)
            byteList.append(byte)
        if not self.start():
            raise i2cStartStopError()
        if not self.write(write_address):
            raise i2cWriteAddressAcknowledgeError()
        if not self.write(commandCode):
            raise i2cCommandCodeAcknowledgeError()
        if not self.write(byteCountWrite):
            raise i2cDataAcknowledgeError()
        for byte in dataByteListWrite:
            if not self.write(byte):
                raise i2cDataAcknowledgeError()
        if not self.restart():
            raise i2cStartStopError()
        byteList.append(read_address)
        if not self.write(read_address):
            raise i2cReadAddressAcknowledgeError()
        byteCountRead = self.read_ack()
        byteList.append(byteCountRead)
        if byteCountRead < 1 or (byteCountWrite + byteCountRead) > 32:
            raise i2cError("I2C Error: Block Process Call requires maximum 32 data bytes")
        dataByteListRead = []
        for i in range(0, byteCountRead):
            dataByteListRead.append(self.read_ack())
        byteList.extend(dataByteListRead)
        pec = self.read_nack() #PEC is the NACKed byte, every data byte is ACKed
        if not self.stop():
            raise i2cStartStopError()
        expected = self.pec(byteList)
        if pec != expected:
            raise i2cPECError('PEC Failure: expected 0x{:X} but got 0x{:X}'.format(expected, pec))
        return dataByteListRead

    ###List reading aggregation commands###
    def _read_x_list(self, addr7, cc_list, rd_function):
        '''Return dictionary of read results.
        Reads each commandCode of cc_list in turn at chip address addr7 using
        rd_function protocol.  Keys are the command codes, values the data
        returned by rd_function(addr7, commandCode).
        '''
        cc_data = {}
        for cc in cc_list:
            cc_data[cc] = rd_function(addr7, cc)
        return cc_data

    def read_byte_list(self, addr7, cc_list):
        '''Return dictionary of read_byte results.
        Reads each commandCode of cc_list in turn at chip address addr7
        Overload this method to improve communication speed when the
        instrument supports it.
        '''
        return self._read_x_list(addr7, cc_list, self.read_byte)

    def read_byte_list_pec(self, addr7, cc_list):
        '''Return dictionary of read_byte_pec results.
        Reads each commandCode of cc_list in turn at chip address addr7
        Overload this method to improve communication speed when the
        instrument supports it.
        '''
        return self._read_x_list(addr7, cc_list, self.read_byte_pec)

    def read_word_list(self, addr7, cc_list):
        '''Return dictionary of read_word results.
        Reads each commandCode of cc_list in turn at chip address addr7
        Overload this method to improve communication speed when the
        instrument supports it.
        '''
        return self._read_x_list(addr7, cc_list, self.read_word)

    def read_word_list_pec(self, addr7, cc_list):
        '''Return dictionary of read_word_pec results.
        Reads each commandCode of cc_list in turn at chip address addr7
        Overload this method to improve communication speed when the
        instrument supports it.
        '''
        return self._read_x_list(addr7, cc_list, self.read_word_pec)
class i2c_dummy(twoWireInterface):
    '''Stand-in interface for testing without any hardware.

    No actual communication occurs: every bus primitive only waits for the
    configured delay (seconds, default 0) and then reports success or
    returns a random byte.
    '''
    def __init__(self, delay=0):
        self._delay = delay

    def _wait(self):
        '''Sleep for the configured per-operation delay.'''
        time.sleep(self._delay)

    def _random_byte(self):
        '''Wait, then return a random integer in 0..255.'''
        self._wait()
        return random.randint(0, 255)

    def start(self):
        '''No bus activity. Always returns True (transaction successful).'''
        self._wait()
        return True

    def stop(self):
        '''No bus activity. Always returns True (transaction successful).'''
        self._wait()
        return True

    def write(self, data8):
        '''No bus activity; data8 is ignored. Always returns True (acknowledged).'''
        self._wait()
        return True

    def read_ack(self):
        '''No bus activity. Returns a random 8-bit value.'''
        return self._random_byte()

    def read_nack(self):
        '''No bus activity. Returns a random 8-bit value.'''
        return self._random_byte()

    def resync_communication(self):
        '''Nothing to recover; only waits for the configured delay.'''
        self._wait()
[docs]class i2c_buspirate(twoWireInterface): '''dangerous prototypes bus pirate communication board''' def __init__(self,interface_raw_serial): self.ser = interface_raw_serial self.__init_i2c() self.commands = {} self.commands['start'] = '\x02' #responds 0x01 self.commands['stop'] = '\x03' #responds 0x01 self.commands['read'] = '\x04' #responds with byte self.commands['ack'] = '\x06' #responds 0x01 self.commands['nack'] = '\x07' #responds 0x01 self.commands['writeread'] = '\x08' self.commands['writebyte'] = '\x10' #responds 0x01 then 0x00 ACK or 0x01 NACK self.commands['writeword'] = '\x11' #responds 0x01 then (0x00 ACK or 0x01 NACK) for each byte self.commands['write3'] = '\x12' #responds 0x01 then (0x00 ACK or 0x01 NACK) for each byte self.commands['write4'] = '\x13' #responds 0x01 then (0x00 ACK or 0x01 NACK) for each byte def __del__(self): self.ser.close() def __init_i2c(self): self.ser.write('\n'*10) #exit any menus self.ser.write('#') #reset self.ser.read(self.ser.inWaiting()) #get into binary mode resp = '' tries = 0 while (resp != 'BBIO1'): tries += 1 if tries > 20: raise i2cMasterError('Buspirate failed to enter binary mode after 20 attempts') print 'Buspirate entering binary mode attempt {}: '.format(tries), self.ser.write('\x00') #enter binary mode time.sleep(0.05) resp = self.ser.read(self.ser.inWaiting()) print resp #get into i2c mode self.ser.write('\x02') #enter binary i2c mode time.sleep(0.05) resp = self.ser.read(self.ser.inWaiting()) if resp != 'I2C1': raise i2cMasterError('Buspirate failed to enter I2C mode: {}'.format(resp)) #set voltage levels self.ser.write('\x4C') #power and pullups on resp = self.ser.read(1) if resp != '\x01': raise i2cMasterError('Buspirate failed to enable supply and pullups: {}'.format(resp)) #vpullup select not yet implemented 7/16/2013. 
3.3V shorted to Vpu on board temporarily #self.ser.write('\x51') #3.3v pullup #resp = self.ser.read(1) #if resp != '\x01': # raise i2cMasterError('Buspirate failed to set pullup voltage to 3.3v: {}'.format(resp)) self.ser.write('\x63') #speed 400kHz resp = self.ser.read(1) if resp != '\x01': raise i2cMasterError('Buspirate failed to set bus speedto 400kHz: {}'.format(resp))
[docs] def close(self): '''close the underlying (serial) interface''' self.ser.close()
[docs] def start(self): '''I2C Start - Falling SDA with SCL high. Returns True or False to indicate successful arbitration''' self.ser.write(self.commands['start']) resp = self.ser.read(1) if resp == '\x01': return True else: return False
[docs] def stop(self): '''I2C Start - Rising SDA with SCL high. Returns True or False to indicate successful arbitration''' self.ser.write(self.commands['stop']) resp = self.ser.read(1) if resp == '\x01': return True else: return False
[docs] def write(self,data8): '''Transmit 8 bits plus 9th acknowledge clock. Returns True or False to indicate slave acknowledge''' self.ser.write(bytearray([self.commands['writebyte'], data8])) #buspirate returns 0x01 to acknowledge command #then returns 0x00 for ACK or 0x01 for NACK resp = self.ser.read(2) if resp == '\x01\x00': return True elif resp == '\x01\x01': return False else: raise i2cMasterError('Buspirate write unxepected communication: {}'.format(resp))
[docs] def read_ack(self): '''Read 8 bits from slave transmitter and assert SDA during 9th acknowledge clock. Returns 8 bit data''' #read returns byte #ack returns 0x01 self.ser.write(bytearray([self.commands['read'], self.commands['ack']])) resp = self.ser.read(2) if len(resp) != 2 or resp[1] != '\x01': raise i2cMasterError('Buspirate unexpected response to read_nack: {}'.format(resp)) return bytearray(resp)[0]
[docs] def read_nack(self): '''Read 8 bits from slave transmitter and release SDA during 9th acknowledge clock to request end of transmission. Returns 8 bit data''' #read returns byte #nack returns 0x01 self.ser.write(bytearray([self.commands['read'], self.commands['nack']])) resp = self.ser.read(2) if len(resp) != 2 or resp[1] != '\x01': raise i2cMasterError('Buspirate unexpected response to read_nack: {}'.format(resp)) return bytearray(resp)[0]
# Re-run the private __init_i2c() setup sequence of this adapter; provided as the hook for re-synchronising communication.
def resync_communication(self): self.__init_i2c()
def send_byte(self, addr7, data8):
    '''SMBus Send Byte: address the slave for writing and send one byte.

    The single byte that follows the slave address is the whole message;
    a simple device may interpret all or part of it as an encoded command.

    addr7 is the 7-bit chip address; data8 is the byte to send.

    Returns None.
    Raises i2cMasterError unless the adapter replies with the single
    byte 0x01.
    '''
    frame = bytearray([
        self.commands['writeread'],
        0x00, 0x02,  # presumably the 16-bit write count (address + data) - confirm against Bus Pirate docs
        0x00, 0x00,  # presumably the 16-bit read count (nothing to read back)
        self.write_addr(addr7),
        data8,
    ])
    self.ser.write(frame)
    resp = self.ser.read(1)
    # Bytes literal: same value as '\x01' on Python 2, matches pyserial's
    # bytes reply on Python 3.
    if resp != b'\x01':
        raise i2cMasterError('Buspirate unexpected response to send_byte: {}'.format(resp))
def receive_byte(self, addr7):
    '''SMBus Receive Byte: address the slave for reading and fetch one byte.

    This is the mirror image of Send Byte; only the direction of the data
    transfer differs.

    addr7 is the 7-bit chip address.

    Returns the received byte as an integer.
    Raises i2cMasterError if the reply is not exactly two bytes or its
    first (status) byte is not 0x01.
    '''
    frame = bytearray([
        self.commands['writeread'],
        0x00, 0x01,  # presumably the 16-bit write count (read address only) - confirm against Bus Pirate docs
        0x00, 0x01,  # presumably the 16-bit read count (one data byte)
        self.read_addr(addr7),
    ])
    self.ser.write(frame)
    resp = self.ser.read(2)
    # Integer comparison works for both a Python 2 str and Python 3 bytes reply.
    reply = bytearray(resp)
    if len(reply) != 2 or reply[0] != 0x01:
        raise i2cMasterError('Buspirate unexpected response to receive_byte: {}'.format(resp))
    return reply[1]
def write_byte(self, addr7, commandCode, data8):
    '''SMBus Write Byte: write address, command code, then one data byte.

    addr7 is the 7-bit chip address, commandCode the register/command byte
    and data8 the byte to store.

    Returns None.
    Raises i2cMasterError unless the adapter replies with the single
    byte 0x01.
    '''
    frame = bytearray([
        self.commands['writeread'],
        0x00, 0x03,  # presumably the 16-bit write count (address + command + data) - confirm against Bus Pirate docs
        0x00, 0x00,  # presumably the 16-bit read count (nothing to read back)
        self.write_addr(addr7),
        commandCode,
        data8,
    ])
    self.ser.write(frame)
    resp = self.ser.read(1)
    # Bytes literal: same value as '\x01' on Python 2, matches pyserial's
    # bytes reply on Python 3.
    if resp != b'\x01':
        raise i2cMasterError('Buspirate unexpected response to write_byte: {}'.format(resp))
def write_word(self, addr7, commandCode, data16):
    '''SMBus Write Word: write address, command code, then a 16-bit word (low byte first).

    addr7 is the 7-bit chip address, commandCode the register/command byte
    and data16 the word to store; it is split with self.low_byte() and
    self.high_byte().

    Returns None.
    Raises i2cMasterError unless the adapter replies with the single
    byte 0x01.
    '''
    frame = bytearray([
        self.commands['writeread'],
        0x00, 0x04,  # presumably the 16-bit write count (address + command + two data bytes) - confirm against Bus Pirate docs
        0x00, 0x00,  # presumably the 16-bit read count (nothing to read back)
        self.write_addr(addr7),
        commandCode,
        self.low_byte(data16),
        self.high_byte(data16),
    ])
    self.ser.write(frame)
    resp = self.ser.read(1)
    # Bytes literal: same value as '\x01' on Python 2, matches pyserial's
    # bytes reply on Python 3.
    if resp != b'\x01':
        raise i2cMasterError('Buspirate unexpected response to write_word: {}'.format(resp))
def read_byte(self, addr7, commandCode):
    '''SMBus Read Byte: write the command code, repeated START, then read one byte.

    The whole transaction is sent to the adapter as one ten-command frame
    and the ten reply bytes are validated position by position. There is no
    STOP before the repeated START, and the single data byte is NACKed to
    end the transfer.

    addr7 is the 7-bit chip address; commandCode is the register/command byte.

    Returns the data byte (reply position 7) as an integer.
    Raises i2cMasterError for a short reply or an unexpected adapter status,
    i2cWriteAddressAcknowledgeError, i2cCommandCodeAcknowledgeError or
    i2cReadAddressAcknowledgeError when the corresponding bus byte was not
    acknowledged.
    '''
    cmd = self.commands
    self.ser.write(bytearray([
        cmd['start'],
        cmd['writeword'], self.write_addr(addr7), commandCode,
        cmd['start'],
        cmd['writebyte'], self.read_addr(addr7),
        cmd['read'], cmd['nack'],
        cmd['stop'],
    ]))
    resp = self.ser.read(10)
    # Integer byte values compare identically for a Python 2 str and a
    # Python 3 bytes reply; resp itself is kept for the error messages.
    reply = bytearray(resp)
    if len(reply) != 10:
        raise i2cMasterError('Buspirate short response to read_byte: {}'.format(resp))
    if reply[0] != 0x01:  # start
        raise i2cMasterError('Buspirate read_byte start failure: {}'.format(resp))
    if reply[1] != 0x01:  # write command response
        raise i2cMasterError('Buspirate unexpected response to read_byte: {}'.format(resp))
    if reply[2] != 0x00:  # write addr ack
        raise i2cWriteAddressAcknowledgeError('Buspirate read_byte write address acknowledge failure: {}'.format(resp))
    if reply[3] != 0x00:  # command code ack
        raise i2cCommandCodeAcknowledgeError('Buspirate read_byte command code acknowledge failure: {}'.format(resp))
    if reply[4] != 0x01:  # (re)start
        raise i2cMasterError('Buspirate read_byte restart failure: {}'.format(resp))
    if reply[5] != 0x01:  # write command response
        raise i2cMasterError('Buspirate unexpected response to read_byte: {}'.format(resp))
    if reply[6] != 0x00:  # read addr ack
        raise i2cReadAddressAcknowledgeError('Buspirate read_byte read address acknowledge failure: {}'.format(resp))
    # reply[7] is the data byte
    if reply[8] != 0x01:  # nack command response
        raise i2cMasterError('Buspirate unexpected response to read_byte: {}'.format(resp))
    if reply[9] != 0x01:  # stop
        raise i2cMasterError('Buspirate unexpected read_byte stop failure: {}'.format(resp))
    return reply[7]
def read_word(self, addr7, commandCode):
    '''SMBus Read Word: write the command code, repeated START, then read two bytes.

    The whole transaction is sent to the adapter as one twelve-command frame
    and the twelve reply bytes are validated position by position. The first
    data byte (low byte) is ACKed, the second (high byte) is NACKed to end
    the transfer.

    addr7 is the 7-bit chip address; commandCode is the register/command byte.

    Returns self.word(lowByte=..., highByte=...) built from reply positions
    7 and 9.
    Raises i2cMasterError for a short reply or an unexpected adapter status,
    i2cWriteAddressAcknowledgeError, i2cCommandCodeAcknowledgeError or
    i2cReadAddressAcknowledgeError when the corresponding bus byte was not
    acknowledged.
    '''
    cmd = self.commands
    self.ser.write(bytearray([
        cmd['start'],
        cmd['writeword'], self.write_addr(addr7), commandCode,
        cmd['start'],
        cmd['writebyte'], self.read_addr(addr7),
        cmd['read'], cmd['ack'],
        cmd['read'], cmd['nack'],
        cmd['stop'],
    ]))
    resp = self.ser.read(12)
    # Integer byte values compare identically for a Python 2 str and a
    # Python 3 bytes reply; resp itself is kept for the error messages.
    reply = bytearray(resp)
    if len(reply) != 12:
        raise i2cMasterError('Buspirate short response to read_word: {}'.format(resp))
    if reply[0] != 0x01:  # start
        raise i2cMasterError('Buspirate read_word start failure: {}'.format(resp))
    if reply[1] != 0x01:  # write command response
        raise i2cMasterError('Buspirate unexpected response to read_word: {}'.format(resp))
    if reply[2] != 0x00:  # write addr ack
        raise i2cWriteAddressAcknowledgeError('Buspirate read_word write address acknowledge failure: {}'.format(resp))
    if reply[3] != 0x00:  # command code ack
        raise i2cCommandCodeAcknowledgeError('Buspirate read_word command code acknowledge failure: {}'.format(resp))
    if reply[4] != 0x01:  # (re)start
        raise i2cMasterError('Buspirate read_word restart failure: {}'.format(resp))
    if reply[5] != 0x01:  # write command response
        raise i2cMasterError('Buspirate unexpected response to read_word: {}'.format(resp))
    if reply[6] != 0x00:  # read addr ack
        raise i2cReadAddressAcknowledgeError('Buspirate read_word read address acknowledge failure: {}'.format(resp))
    # reply[7] is the LSByte
    if reply[8] != 0x01:  # ack command response
        raise i2cMasterError('Buspirate unexpected response to read_word: {}'.format(resp))
    # reply[9] is the MSByte
    if reply[10] != 0x01:  # nack command response
        raise i2cMasterError('Buspirate unexpected response to read_word: {}'.format(resp))
    if reply[11] != 0x01:  # stop
        raise i2cMasterError('Buspirate read_word stop failure: {}'.format(resp))
    return self.word(lowByte=reply[7], highByte=reply[9])
class i2c_pic(twoWireInterface):
    '''communication class to simplify talking to dave's external i2c interface firmware on George's development board (pic18F4553 and similar)
    requires pySerial
    '''

    def __init__(self, interface_raw_serial):
        '''Keep the raw serial port and switch the PIC into its external interface menu.'''
        self.ser = interface_raw_serial
        self.__init_i2c()

    def __del__(self):
        self.ser.close()

    def __init_i2c(self):
        '''Flush pending input, select the external interface menu and send the initial bus sequence.

        Raises i2cMasterError if the start-of-text character (0x02) is not
        seen within 10000 reads or a read returns nothing.
        '''
        self.ser.read(self.ser.inWaiting())  # throw away whatever is already buffered
        self.ser.write("\x03")  # ctl-c
        for _ in range(6):
            self.ser.write("\r")  # carriage return
        self.ser.write("e")  # select the external interface
        loopcount = 0
        char = self.ser.read(1)
        while char != "\x02":  # read characters until start of text
            if loopcount > 10000:
                raise i2cMasterError('Failed to set PIC to external interface menu: loopcount')
            if char == '':
                raise i2cMasterError('Failed to set PIC to external interface menu: timeout')
            loopcount += 1
            char = self.ser.read(1)
        self.start()
        self.write(0xE8)
        self.write(0xAA)
        self.stop()

    def close(self):
        '''close the underlying (serial) interface'''
        self.ser.close()

    def resync_communication(self):
        '''Repeat the initialisation sequence performed at construction.'''
        self.__init_i2c()

    #implement i2c primitives
    def start(self):
        '''Send "s" and verify the echoed reply. Returns True; raises i2cMasterError on a bad reply.'''
        self.ser.write("s")
        ret_str = self.ser.read(4).decode()
        if len(ret_str) < 2:
            raise i2cMasterError("I2C Error: Bad Start")
        if len(ret_str) >= 3 and ret_str[2] != "S":
            raise i2cMasterError("I2C Error: Bad Start")
        # The Pic doesn't know if bus arbitration succeeded - failure to return start indicates USB failure
        return True

    def stop(self):
        '''Send "p" and verify the reply begins with "P". Returns True; raises i2cMasterError otherwise.'''
        self.ser.write("p")
        ret_str = self.ser.read(2).decode()
        if len(ret_str) < 1 or ret_str[0] != "P":
            raise i2cMasterError("I2C Error: Bad Stop")
        # The Pic doesn't know if bus arbitration succeeded - failure to return stop indicates USB failure
        return True

    def write(self, data8):
        '''Send data8 (masked to 8 bits) as two hex digits.

        Returns True when the fourth reply character is "K", False when it
        is "N"; raises i2cMasterError for anything else.
        '''
        data8 = int(data8) & 0xFF
        self.ser.write(hex(data8)[2:].rjust(2, "0"))
        ret_str = self.ser.read(5).decode()
        if len(ret_str) < 4 or ret_str[3] not in ("K", "N"):
            raise i2cMasterError("I2C Error: Bad Write")
        return ret_str[3] == "K"

    def read_ack(self):
        '''Send "RK" and return the byte parsed from the two hex digits of the reply.

        Raises i2cMasterError unless the reply is exactly three characters
        ending in a space.
        '''
        self.ser.write("RK")
        ret_str = self.ser.read(3).decode()
        # Length is tested first so a short (timed-out) reply is reported as
        # i2cMasterError instead of escaping as an IndexError.
        if len(ret_str) != 3 or ret_str[2] != " ":
            raise i2cMasterError("I2C Error: Bad communication during read_ack: {}".format(ret_str))
        return int(ret_str[:2], 16)

    def read_nack(self):
        '''Send "RN" and return the byte parsed from the two hex digits of the reply.

        Raises i2cMasterError unless the reply is exactly three characters
        ending in a space.
        '''
        self.ser.write("RN")
        ret_str = self.ser.read(3).decode()
        # Length is tested first so a short (timed-out) reply is reported as
        # i2cMasterError instead of escaping as an IndexError.
        if len(ret_str) != 3 or ret_str[2] != " ":
            raise i2cMasterError("I2C Error: Bad communication during read_nack: {}".format(ret_str))
        return int(ret_str[:2], 16)

    #overload for faster access
    def read_word(self, addr7, commandCode):
        '''faster way to do an smbus read word

        The complete transaction is sent as one command string and the
        31-character reply is checked at fixed offsets.

        Returns the 16-bit word (second data byte in the high half).
        Raises i2cMasterError on a short reply and the matching
        acknowledge error when a "K" is missing at offset 7, 12 or 21.
        '''
        addr_w = hex(self.write_addr(addr7))[2:].rjust(2, "0")
        addr_r = hex(self.read_addr(addr7))[2:].rjust(2, "0")
        commandCode = hex(commandCode)[2:].rjust(2, "0")
        self.ser.write("s" + addr_w + commandCode + "s" + addr_r + "RKRNP")
        ret_str = self.ser.read(31)
        if len(ret_str) != 31:
            raise i2cMasterError("I2C Error: Short response to read_word:{}".format(ret_str))
        if ret_str[7] != "K":
            raise i2cWriteAddressAcknowledgeError("I2C Error: Write address acknowledge fail during read_word")
        if ret_str[12] != "K":
            raise i2cCommandCodeAcknowledgeError("I2C Error: Command code acknowledge fail during read_word")
        if ret_str[21] != "K":
            raise i2cReadAddressAcknowledgeError("I2C Error: Read address acknowledge fail during read_word")
        lsb = int(ret_str[23:25], 16)
        msb = int(ret_str[26:28], 16)
        return (msb << 8) + lsb
class i2c_scpi(twoWireInterface):
    '''communication class to simplify talking to atmega32u4 with Steve/Eric SCPI firmware
    requires pySerial
    '''

    def __init__(self, visa_interface):
        self.interface = visa_interface
        self._cc_list = []  # command-code list last sent with :TWIBlock:SET

    def __del__(self):
        self.interface.close()

    @staticmethod
    def _hex_str(value, width=2):
        '''Return value as lowercase hex digits without the "0x" prefix, zero-padded to width.'''
        return hex(value)[2:].rjust(width, "0")

    def _query_exact(self, write_str, length, label):
        '''Write write_str, read one line and require it to be exactly length characters.

        label is the operation name used in the error text.
        Raises i2cMasterError for a shorter or longer reply.
        '''
        self.interface.write(write_str)
        ret_str = self.interface.readline()
        if len(ret_str) < length:
            raise i2cMasterError("I2C Error: Short Response to {}: {}".format(label, ret_str))
        if len(ret_str) > length:
            raise i2cMasterError("I2C Error: Long Response to {}: {}".format(label, ret_str))
        return ret_str

    def init_i2c(self):
        '''Reset the TWI port and drain any stale lines waiting on the interface.'''
        self.reset_twi()
        time.sleep(0.1)
        #modified this to remove the pyserial inWaiting, now its a plain visa interface
        timeout = self.interface.timeout
        self.interface.timeout = 0.02
        try:
            while len(self.interface.readline()):
                pass
        except visa_wrappers.visaWrapperException:
            # deliberate: a VISA error (e.g. read timeout) simply ends the drain loop
            pass
        finally:
            # restore the caller's timeout even if an unexpected exception escapes
            self.interface.timeout = timeout

    def resync_communication(self):
        '''Wait one second, then run init_i2c() again.'''
        print("***** Attempting RE-SYNC")
        time.sleep(1.0)
        self.init_i2c()

    def close(self):
        '''close the underlying (serial) interface'''
        self.interface.close()

    def bus_scan(self):
        '''Send 'W?;' and return the comma-separated entries of the "(@...)" reply as a list of strings.'''
        self.interface.write('W?;')
        return self.interface.readline().rstrip().lstrip('(@').rstrip(')').split(',')

    def port_status(self):
        '''Return a dict with the stripped replies to the TWSR and TWCR queries.'''
        results = {}
        self.interface.write(':I2C:PORT:TWSR?;')
        results['TWSR'] = self.interface.readline().strip()
        self.interface.write(':I2C:PORT:TWCR?;')
        results['TWCR'] = self.interface.readline().strip()
        return results

    def reset_twi(self):
        '''Send the port reset command.'''
        self.interface.write(':I2C:PORT:RST;')

    def set_frequency(self, frequency):
        '''Program the TWBR divider for the requested SCL frequency (Hz) and reset the port.

        The divider is computed from a fixed 16 MHz clock and must fit in
        0..255 (AssertionError otherwise). The frequency actually obtained
        is printed.
        '''
        FCLK = 16e6
        TWBR = int(round(((FCLK / frequency - 16) / 2)))
        assert TWBR <= 255 and TWBR >= 0
        self.interface.write(':I2C:PORT:TWBR {};'.format(TWBR))
        self.reset_twi()
        print("Setting I2C port to {}Hz".format(FCLK / (16 + 2 * TWBR)))

    ###I2C Primitives###
    def start(self):
        '''Issue a START. Returns True; raises i2cMasterError on a short reply, i2cStartStopError if the reply does not begin with "S".'''
        self.interface.write(':S?;')
        ret_str = self.interface.readline()
        if len(ret_str) < 2:
            raise i2cMasterError("I2C Error: Short Response to start: {}, check and/or cycle COM port.".format(ret_str))
        if ret_str[0] != "S":
            raise i2cStartStopError("I2C Error: Bad Start: {}".format(ret_str))
        return True

    def stop(self):
        '''Issue a STOP. Returns True; raises i2cMasterError on a short reply, i2cStartStopError if the reply does not begin with "P".'''
        self.interface.write(':P?;')
        ret_str = self.interface.readline()
        if len(ret_str) < 2:
            raise i2cMasterError("I2C Error: Short Response to stop: {}".format(ret_str))
        if ret_str[0] != "P":
            raise i2cStartStopError("I2C Error: Bad Stop: {}".format(ret_str))
        return True

    def write(self, data8):
        '''Transmit data8 (masked to 8 bits). Returns True when the reply begins with "K".'''
        data8 = int(data8) & 0xFF
        self.interface.write(':0X?{};'.format(self._hex_str(data8)))
        ret_str = self.interface.readline()
        if len(ret_str) < 2:
            raise i2cMasterError("I2C Error: Short Response to write: {}".format(ret_str))
        return ret_str[0] == "K"

    def read_ack(self):
        '''Read one byte and acknowledge it. Returns the byte parsed from the first two hex digits of the reply.'''
        ret_str = self._query_exact(':RK?;', 4, 'read_ack')
        if ret_str[3] != "\n":
            raise i2cMasterError("I2C Error: Bad Response to read_ack: {}".format(ret_str))
        return int(ret_str[:2], 16)

    def read_nack(self):
        '''Read one byte without acknowledging it. Returns the byte parsed from the first two hex digits of the reply.'''
        ret_str = self._query_exact(':RN?;', 4, 'read_nack')
        if ret_str[3] != "\n":
            raise i2cMasterError("I2C Error: Bad Response to read_nack: {}".format(ret_str))
        return int(ret_str[:2], 16)

    ###SMBus Overloads###
    def read_word(self, addr7, commandCode):
        '''faster way to do an smbus read word'''
        addr_w = self._hex_str(self.write_addr(addr7))
        commandCode = self._hex_str(commandCode)
        ret_str = self._query_exact('SMB:RW?(@{},{});'.format(addr_w, commandCode), 11, 'read_word')
        if ret_str[2] != "1":
            raise i2cAcknowledgeError("I2C Acknowledge Error reading command code:{}. Resp={}".format(commandCode, ret_str))
        return int(ret_str[4:8], 16)

    def read_word_pec(self, addr7, commandCode):
        '''faster way to do an smbus read word, verifying the packet error check byte

        Raises i2cPECError when self.pec() over the whole transaction,
        received PEC included, is non-zero.
        '''
        addr_w = self._hex_str(self.write_addr(addr7))
        commandCodeStr = self._hex_str(commandCode)
        ret_str = self._query_exact('SMB:RW:PEC?(@{},{});'.format(addr_w, commandCodeStr), 14, 'read_word')
        if ret_str[2] != "1":
            raise i2cAcknowledgeError("I2C Acknowledge Error reading command code:{}. Resp={}".format(commandCode, ret_str))
        pec = int(ret_str[9:11], 16)
        lsb = int(ret_str[6:8], 16)
        msb = int(ret_str[4:6], 16)
        transaction = [self.write_addr(addr7), commandCode, self.read_addr(addr7), lsb, msb]
        if self.pec(transaction + [pec]):
            raise i2cPECError("I2C Error: read_word Failed PEC check. Received:{} Expected:{}".format(pec, self.pec(transaction)))
        return int(ret_str[4:8], 16)

    def read_byte(self, addr7, commandCode):
        '''faster way to do an smbus read byte'''
        addr_w = self._hex_str(self.write_addr(addr7))
        commandCode = self._hex_str(commandCode)
        ret_str = self._query_exact(':SMB:RB?(@{},{});'.format(addr_w, commandCode), 9, 'read_byte')
        if ret_str[2] != "1":
            raise i2cAcknowledgeError("I2C Acknowledge Error reading command code:{}. Resp={}".format(commandCode, ret_str))
        return int(ret_str[4:6], 16)

    def read_byte_pec(self, addr7, commandCode):
        '''faster way to do an smbus read byte, verifying the packet error check byte

        Raises i2cPECError when self.pec() over the whole transaction,
        received PEC included, is non-zero.
        '''
        addr_w = self._hex_str(self.write_addr(addr7))
        commandCodeStr = self._hex_str(commandCode)
        ret_str = self._query_exact(':SMB:RB:PEC?(@{},{});'.format(addr_w, commandCodeStr), 12, 'read_byte')
        if ret_str[2] != "1":
            raise i2cAcknowledgeError("I2C Acknowledge Error reading command code:{}. Resp={}".format(commandCode, ret_str))
        data8 = int(ret_str[4:6], 16)
        pec = int(ret_str[7:9], 16)
        transaction = [self.write_addr(addr7), commandCode, self.read_addr(addr7), data8]
        if self.pec(transaction + [pec]):
            raise i2cPECError("I2C Error: read_byte Failed PEC check. Received:{} Expected:{}".format(pec, self.pec(transaction)))
        return data8

    def write_byte(self, addr7, commandCode, data8):
        '''faster way to do an smbus write byte'''
        data8 = int(data8) & 0xFF
        addr_w = self._hex_str(self.write_addr(addr7))
        commandCode = self._hex_str(commandCode)
        data_w = self._hex_str(data8)
        ret_str = self._query_exact(':SMB:WB?(@{},{},{});'.format(addr_w, commandCode, data_w), 6, 'write_byte')
        if ret_str[2] != "1":
            raise i2cAcknowledgeError("I2C Acknowledge Error writing command code:{}. Resp={}".format(commandCode, ret_str))
        return True

    def write_byte_pec(self, addr7, commandCode, data8):
        '''faster way to do an smbus write byte, appending a packet error check byte'''
        data8 = int(data8) & 0xFF
        addr_w = self._hex_str(self.write_addr(addr7))
        commandCodeStr = self._hex_str(commandCode)
        data_w = self._hex_str(data8)
        pec = self._hex_str(self.pec([self.write_addr(addr7), commandCode, data8]))
        ret_str = self._query_exact(':SMB:WB:PEC?(@{},{},{},{});'.format(addr_w, commandCodeStr, data_w, pec), 6, 'write_byte')
        if ret_str[2] != "1":
            raise i2cAcknowledgeError("I2C Error in write_byte_pec (Possible PEC failure): {} ({},{},{})".format(ret_str, addr7, commandCode, data8))
        return True

    def write_word(self, addr7, commandCode, data16):
        '''faster way to do an smbus write word'''
        data16 = int(data16) & 0xFFFF
        addr_w = self._hex_str(self.write_addr(addr7))
        commandCode = self._hex_str(commandCode)
        data_w = self._hex_str(data16, 4)
        ret_str = self._query_exact(':SMB:WW?(@{},{},{});'.format(addr_w, commandCode, data_w), 6, 'write_word')
        if ret_str[2] != "1":
            raise i2cAcknowledgeError("I2C Acknowledge Error writing command code:{}. Resp={}".format(commandCode, ret_str))
        return True

    def write_word_pec(self, addr7, commandCode, data16):
        '''faster way to do an smbus write word, appending a packet error check byte'''
        data16 = int(data16) & 0xFFFF
        addr_w = self._hex_str(self.write_addr(addr7))
        commandCodeStr = self._hex_str(commandCode)
        data_w = self._hex_str(data16, 4)
        pec = self._hex_str(self.pec([self.write_addr(addr7), commandCode, self.low_byte(data16), self.high_byte(data16)]))
        ret_str = self._query_exact(':SMB:WW:PEC?(@{},{},{},{});'.format(addr_w, commandCodeStr, data_w, pec), 6, 'write_word')
        if ret_str[2] != "1":
            raise i2cAcknowledgeError("I2C Error in write_word_pec (Possible PEC failure): {} ({},{},{})".format(ret_str, addr7, commandCode, data16))
        return True

    def alert_response(self):
        '''Query the SMBus Alert Response Address (ARA).

        Only devices that pulled SMBALERT# low acknowledge the ARA; the
        responding device places its 7-bit address in the upper seven bits
        of the returned byte.

        Returns 7 bit address of responding device.
        Returns None if no response to ARA'''
        ret_str = self._query_exact(':SMB:ARA?;', 9, 'alert_response')
        if ret_str[2] != "1":
            return None  #no response
        return int(ret_str[4:6], 16) >> 1

    def alert_response_pec(self):
        '''Alert Response Query to SMBALERT# interrupt with Packet Error Check
        Returns 7 bit address of responding device.
        Returns None if no response to ARA'''
        ret_str = self._query_exact(':SMB:ARA:PEC?;', 12, 'alert_response_pec')
        if ret_str[2] != "1":
            return None
        resp_addr = int(ret_str[4:6], 16)
        pec = int(ret_str[7:9], 16)
        if self.pec([self.read_addr(0xC), resp_addr, pec]):
            raise i2cPECError("I2C Error: ARA Failed PEC check")
        return resp_addr >> 1

    def _read_list(self, cmd_str, fmt_str, pec, addr7, cc_list):
        '''Read a list of command codes in one binary block transfer.

        cmd_str is the trigger string sent ahead of the hex write address,
        fmt_str is 'H' (words) or 'B' (bytes), pec selects packet error
        checking and cc_list holds the command codes (duplicates removed,
        read in sorted order).

        Returns a dict mapping each command code to the value read.
        Raises i2cIOError on a VISA error, i2cMasterError when the leading
        error bytes are non-zero and i2cPECError on a PEC mismatch.
        '''
        pec_str = "B" if pec else ""
        cc_list = sorted(set(cc_list))
        if set(self._cc_list) != set(cc_list):
            # only re-send the command-code list when it differs from the one last sent
            self.interface.write(":TWIBlock:SET " + ",".join("{}".format(cc) for cc in cc_list))
            self._cc_list = cc_list[:]  #copy the list values
        write_str = '{}{}'.format(cmd_str, self._hex_str(self.write_addr(addr7)))
        try:
            data = self.interface.ask_for_values_binary(write_str, format_str="{}{}".format(fmt_str, pec_str), byte_order='>', terminationCharacter='')
        except visa_wrappers.visaWrapperException as e:
            raise i2cIOError("VISA error: {}".format(e))
        if fmt_str == 'H':
            #the cfgpro emits two error code bytes at the beginning of its response; both are absorbed into data[0] by unsigned short format
            #unfortunately, with PEC the error is aligned with the second and third (fake PEC) byte locations. The first byte is always 0.
            if pec:
                if data[0] != 0 or data[1] != 0:
                    raise i2cMasterError("I2C communication error at command code:{}. Error count:{}".format(data[2] | data[3], data[0] | data[1]))
            else:
                if data[0] != 0:
                    raise i2cMasterError("I2C communication error at command code:{}. Error count:{}".format(data[1], data[0]))
            data = data[1:]
        elif fmt_str == 'B':
            #the cfgpro emits two error code bytes at the beginning of its response; they remain in data[0:1] with unsigned char format
            if data[0] != 0 or data[1] != 0:
                raise i2cMasterError("I2C communication error at command code:{}. Error count:{}".format(data[2] << 8 | data[3], data[0] << 8 | data[1]))
            data = data[2:]
        else:
            raise Exception("Implementation incomplete")
        results = {}
        if not pec:
            for cc, value in zip(cc_list, data):
                results[cc] = value
            return results
        if fmt_str == 'H':
            #fake PEC at [0] for acknowledge status flag
            data_list = data[1::2]
            pec_list = data[2::2]
        else:
            #'B': data already aligned correctly
            data_list = data[0::2]
            pec_list = data[1::2]
        for cc, value, pec_byte in zip(cc_list, data_list, pec_list):
            transaction = [self.write_addr(addr7), cc, self.read_addr(addr7)]
            if fmt_str == 'H':
                transaction += [self.low_byte(value), self.high_byte(value)]
            else:
                transaction += [value]
            if self.pec(transaction + [pec_byte]):
                raise i2cPECError('PEC failure at address: {}, command code: {}. Read: {}. Expected: {}'.format(addr7, cc, pec_byte, self.pec(transaction)))
            results[cc] = value
        return results

    def read_word_list(self, addr7, cc_list):
        '''Block-read words; the binary trigger byte skips the SCPI parser (SCPI form: :TWIBlock:GET:Word:BIN?).'''
        return self._read_list(cmd_str='\xE7', fmt_str='H', pec=False, addr7=addr7, cc_list=cc_list)

    def read_word_list_pec(self, addr7, cc_list):
        '''Block-read words with PEC; the binary trigger byte skips the SCPI parser (SCPI form: :TWIBlock:GET:Word:PEC:BIN?).'''
        return self._read_list(cmd_str='\xE8', fmt_str='H', pec=True, addr7=addr7, cc_list=cc_list)

    def read_byte_list(self, addr7, cc_list):
        '''Block-read bytes; the binary trigger byte skips the SCPI parser (SCPI form: :TWIBlock:GET:BYTE:BIN?).'''
        return self._read_list(cmd_str='\xE5', fmt_str='B', pec=False, addr7=addr7, cc_list=cc_list)

    def read_byte_list_pec(self, addr7, cc_list):
        '''Block-read bytes with PEC; the binary trigger byte skips the SCPI parser (SCPI form: :TWIBlock:GET:BYTE:PEC:BIN?).'''
        return self._read_list(cmd_str='\xE6', fmt_str='B', pec=True, addr7=addr7, cc_list=cc_list)
[docs]class i2c_scpi_sp(twoWireInterface): '''communication class to simplify talking to atmega32u4 softport with Steve/Eric SCPI firmware requires pySerial ''' def __init__(self,visa_interface, portnum, sclpin, sdapin, pullup_en): '''create I2C software port out of any GPIO pins visa_interface''' self.interface = visa_interface self.cmd = ":I2CSP{}".format(portnum) self.configure_twi(sclpin = sclpin, sdapin = sdapin, pullup_en = pullup_en) def __del__(self): self.interface.close() def init_i2c(self): time.sleep(0.1) #modified this to remove the pyserial inWaiting, now its a plain visa interface timeout = self.interface.timeout self.interface.timeout = 0.02 while len(self.interface.readline()): pass self.interface.timeout = timeout def resync_communication(self): pass
[docs] def close(self): '''close the underlying (serial) interface''' self.interface.close()
def bus_scan(self): self.interface.write('{}:W?;'.format(self.cmd)) return self.interface.readline().rstrip().lstrip('(@').rstrip(')').split(',') def port_status(self): results = {} return results def configure_twi(self, sclpin, sdapin, pullup_en): self.interface.write('{}:PORT:SDApin {};'.format(self.cmd, sdapin)) self.interface.write('{}:PORT:SCLpin {};'.format(self.cmd, sclpin)) if pullup_en: self.interface.write('{}:PORT:PUEN;'.format(self.cmd)) else: self.interface.write('{}:PORT:EN;'.format(self.cmd)) ###I2C Primitives### def start(self): self.interface.write('{}:S?;'.format(self.cmd)) ret_str = self.interface.readline() if len(ret_str) < 2: raise i2cMasterError("I2C Error: Short Response to start: {}, check and/or cycle COM port.".format(ret_str)) elif ret_str[0] != "S" : raise i2cStartStopError("I2C Error: Bad Start: {}".format(ret_str)) return (ret_str[0] == "S") def stop(self): self.interface.write('{}:P?;'.format(self.cmd)) ret_str = self.interface.readline() if len(ret_str) < 2: raise i2cMasterError("I2C Error: Short Response to stop: {}".format(ret_str)) elif ret_str[0] != "P" : raise i2cStartStopError("I2C Error: Bad Stop: {}".format(ret_str)) return (ret_str[0] == "P") def write(self,data8): data8 = int(data8) & 0xFF write_str_b = hex(data8)[2:].rjust(2,"0") write_str = ':0X?{};'.format((write_str_b)) self.interface.write('{}{}'.format(self.cmd, write_str)) ret_str = self.interface.readline() if len(ret_str) < 2: raise i2cMasterError("I2C Error: Short Response to write: {}".format(ret_str)) return (ret_str[0] == "K") def read_ack(self): self.interface.write('{}:RK?;'.format(self.cmd)) ret_str = self.interface.readline() if len(ret_str) < 4: raise i2cMasterError("I2C Error: Short Response to read_ack: {}".format(ret_str)) if len(ret_str) > 4: raise i2cMasterError("I2C Error: Long Response to read_ack: {}".format(ret_str)) if ret_str[3] != "\n": raise i2cMasterError("I2C Error: Bad Response to read_ack: {}".format(ret_str)) return int(ret_str[:2],16) 
def read_nack(self): self.interface.write('{}:RN?;'.format(self.cmd)) ret_str = self.interface.readline() if len(ret_str) < 4: raise i2cMasterError("I2C Error: Short Response to read_nack: {}".format(ret_str)) if len(ret_str) > 4: raise i2cMasterError("I2C Error: Long Response to read_nack: {}".format(ret_str)) if ret_str[3] != "\n": raise i2cMasterError("I2C Error: Bad Response to read_nack: {}".format(ret_str)) return int(ret_str[:2],16) ###SMBus Overloads###
[docs] def read_word(self,addr7,commandCode): '''faster way to do an smbus read word''' addr_w = hex(self.write_addr(addr7))[2:].rjust(2,"0") addr_r = hex(self.read_addr(addr7))[2:].rjust(2,"0") commandCode = hex(commandCode)[2:].rjust(2,"0") write_str = ':SMB:RW?(@{},{});'.format(addr_w,commandCode) self.interface.write('{}{}'.format(self.cmd, write_str)) #time.sleep(0.001) ret_str = self.interface.readline() if len(ret_str) < 11: raise i2cMasterError("I2C Error: Short Response to read_word: {}".format(ret_str)) if len(ret_str) > 11: raise i2cMasterError("I2C Error: Long Response to read_word: {}".format(ret_str)) if ret_str[2] != "1": raise i2cAcknowledgeError("I2C Acknowledge Error reading command code:{}. Resp={}".format(commandCode,ret_str)) word = ret_str[4:8] data16 = int(word,16) return data16
[docs] def read_byte(self,addr7,commandCode): '''faster way to do an smbus read byte''' addr_w = hex(self.write_addr(addr7))[2:].rjust(2,"0") addr_r = hex(self.read_addr(addr7))[2:].rjust(2,"0") commandCode = hex(commandCode)[2:].rjust(2,"0") write_str = ':SMB:RB?(@{},{});'.format(addr_w,commandCode) self.interface.write('{}{}'.format(self.cmd, write_str)) ret_str = self.interface.readline() if len(ret_str) < 9: raise i2cMasterError("I2C Error: Short Response to read_byte: {}".format(ret_str)) if len(ret_str) > 9: raise i2cMasterError("I2C Error: Long Response to read_byte: {}".format(ret_str)) if ret_str[2] != "1": raise i2cAcknowledgeError("I2C Acknowledge Error reading command code:{}. Resp={}".format(commandCode,ret_str)) word = ret_str[4:6] data8 = int(word,16) return data8
[docs] def write_byte(self,addr7,commandCode,data8): '''faster way to do an smbus write byte''' data8 = int(data8) & 0xFF addr_w = hex(self.write_addr(addr7))[2:].rjust(2,"0") commandCode = hex(commandCode)[2:].rjust(2,"0") data_w = hex(data8)[2:].rjust(2,"0") write_str = ':SMB:WB?(@{},{},{});'.format(addr_w,commandCode,data_w) self.interface.write('{}{}'.format(self.cmd, write_str)) ret_str = self.interface.readline() if len(ret_str) < 6: raise i2cMasterError("I2C Error: Short Response to write_byte: {}".format(ret_str)) if len(ret_str) > 6: raise i2cMasterError("I2C Error: Long Response to write_byte: {}".format(ret_str)) if ret_str[2] != "1": raise i2cAcknowledgeError("I2C Acknowledge Error writing command code:{}. Resp={}".format(commandCode,ret_str)) return True
[docs] def write_word(self,addr7,commandCode,data16): '''faster way to do an smbus write word''' data16 = int(data16) & 0xFFFF addr_w = hex(self.write_addr(addr7))[2:].rjust(2,"0") commandCode = hex(commandCode)[2:].rjust(2,"0") data_w = hex(data16)[2:].rjust(4,"0") write_str = ':SMB:WW?(@{},{},{});'.format(addr_w,commandCode,data_w) self.interface.write('{}{}'.format(self.cmd, write_str)) ret_str = self.interface.readline() if len(ret_str) < 6: raise i2cMasterError("I2C Error: Short Response to write_word: {}".format(ret_str)) if len(ret_str) > 6: raise i2cMasterError("I2C Error: Long Response to write_word: {}".format(ret_str)) if ret_str[2] != "1": raise i2cAcknowledgeError("I2C Acknowledge Error writing command code:{}. Resp={}".format(commandCode,ret_str)) return True
def write_softport_speed(self, sclk_freq):
    '''Program the soft-port clock delay so SCL runs near sclk_freq (in Hz).

    The delay count is derived from the clock period using
    period = counts * 2.625e-6 + 24.374e-6 seconds, rounded to the nearest count.
    Raises ValueError when the count falls outside [0..255] and SyntaxError
    when the instrument's ERR? reply does not have "0" as its second character.
    '''
    fixed_overhead = 24.374e-6
    per_count = 2.625e-6

    def frequency_for(delay_counts):
        return 1./(delay_counts * per_count + fixed_overhead)

    def limits_text():
        # Built only when an error is actually raised, as before.
        return "Softport delay must be in [0..255], frequency range is limited to {} to {}".format(frequency_for(255), frequency_for(0))

    counts = int(round((1./sclk_freq - fixed_overhead) / per_count))
    if not 0 <= counts <= 255:
        raise ValueError(limits_text())
    self.interface.write('{}:PORT:DELAY {}'.format(self.cmd, counts))
    if self.interface.ask('ERR?')[1] != "0":
        raise SyntaxError(limits_text())
# needs to be moved to lab, not twi
class i2c_scpi_testhook(i2c_scpi):
    '''i2c_scpi adapter extended with test-hook pin control and a secondary (aux) I2C port.

    Also provides lab.py-style add_channel/read_channel/write_channel methods that
    map channel names onto physical pins.
    '''
    def __init__(self, serial_port):
        i2c_scpi.__init__(self, serial_port)
        self.channels = {}
        self.name = "i2c_scpi_testhook at {}".format(serial_port) #what is the str repr of a serial port object?

    ###lab.py instrument driver methods###
    def add_channel(self, channel_name, pin_name):
        '''Register channel_name as an alias for physical pin pin_name.
        Writing the channel changes the pin state.
        Valid pin_names are: HOOK1, HOOK2, HOOK3, HOOK4,
        PAD_TP_1, PAD_TP_2, PAD_TP_3, PAD_TP_4, PAD_TP_5, PAD_TP_6, PAD_TP_7, PAD_TP_8, PAD_TP_9, GPIO'''
        self.channels[channel_name] = self.check_name(pin_name)

    def read_channel(self, channel_name):
        '''Return the state read back from the channel's pin (not necessarily what it was set to).
        Possible responses are: 0,1,Z,P where Z is high-z and P is weak pullup'''
        pin = self.channels[channel_name]
        return self.read_pin(pin) #perhaps this should return what the pin was set to, not what was read-back (wired-nor)?

    def write_channel(self, channel_name, value):
        '''Drive the channel's pin to value.
        Valid values are: 0,1,Z,P where Z is high-z and P is weak pullup'''
        pin = self.channels[channel_name]
        self.set_pin(pin, value)

    ###purple-board specific hardware driver methods###
    def set_dvcc(self, voltage):
        '''Send a 6-bit code for voltage over the aux I2C port.
        voltage is clamped to [0.8, 5] and scaled so that 5 corresponds to code 63.'''
        self.aux_start()
        self.aux_write(0xE8)
        clamped = max(min(voltage, 5), 0.8)
        code = int(clamped/5.0*63.0) & 0x3F
        self.aux_write(code)
        self.aux_stop()

    def set_pin(self, pin_name, value):
        '''Set a pin and verify the state the instrument echoes back.
        Pins are: HOOK1, HOOK2, HOOK3, HOOK4,
        PAD_TP_1, PAD_TP_2, PAD_TP_3, PAD_TP_4, PAD_TP_5, PAD_TP_6, PAD_TP_7, PAD_TP_8, PAD_TP_9, GPIO
        Value is 0,1,Z,P where Z is high-z and P is weak pullup.
        Raises Exception for an unrecognised value and i2cMasterError for a
        malformed or mismatching reply.
        '''
        if value in (True, 1, '1'):
            value = 1
        elif value in (False, 0, '0'):
            value = 0
        elif value not in ('Z', 'z', 'P', 'p'):
            raise Exception('Bad value argument passed for pin: {}'.format(value))
        pin_name = self.check_name(pin_name)
        #set the pin and read back its state to make sure there were no usb communication problems
        command = ':SETPin(@{},{});:SETPin?(@{});'.format(pin_name,value,pin_name)
        self.interface.write(command)
        reply = self.interface.read(3)
        reply_length = len(reply)
        if reply_length < 3:
            raise i2cMasterError("I2C Error: Short Response to set_pin: {}".format(reply))
        if reply_length > 3:
            raise i2cMasterError("I2C Error: Long Response to set_pin: {}".format(reply))
        if reply[1:] != '\r\n':
            raise i2cMasterError("I2C Error: Bad Response to set_pin: {}".format(reply))
        if reply[0] != str(value).upper():
            raise i2cMasterError("I2C Error: Failed to set_pin: {}".format(reply))

    def read_pin(self, pin_name):
        '''Query a pin and return the single state character from the reply.
        Raises i2cMasterError when the reply is not one character followed by CR LF.'''
        pin_name = self.check_name(pin_name)
        self.interface.write(':SETPin?(@{});'.format(pin_name))
        reply = self.interface.read(3)
        reply_length = len(reply)
        if reply_length < 3:
            raise i2cMasterError("I2C Error: Short Response to read_pin: {}".format(reply))
        if reply_length > 3:
            raise i2cMasterError("I2C Error: Long Response to read_pin: {}".format(reply))
        if reply[1:] != '\r\n':
            raise i2cMasterError("I2C Error: Bad Response to read_pin: {}".format(reply))
        return reply[0]

    def check_name(self, name):
        '''Return name upper-cased if it is a known pin, otherwise raise Exception.'''
        known_pins = ('HOOK1', 'HOOK2', 'HOOK3', 'HOOK4',
                      'PAD_TP_1', 'PAD_TP_2', 'PAD_TP_3', 'PAD_TP_4', 'PAD_TP_5',
                      'PAD_TP_6', 'PAD_TP_7', 'PAD_TP_8', 'PAD_TP_9', 'GPIO')
        candidate = name.upper()
        if candidate not in known_pins:
            raise Exception('Invalid pin name {}'.format(name))
        return candidate

    ###Secondary I2C Port Primitives###
    def aux_start(self):
        '''Issue a start on the aux port; returns True when the reply begins with "S".'''
        self.interface.write(':I2CAux:S?;')
        reply = self.interface.readline()
        if len(reply) < 2:
            raise i2cMasterError("I2C Error: Short Response to aux port start: {}".format(reply))
        if reply[0] != "S" :
            raise i2cStartStopError("I2C Error: Bad aux port Start: {}".format(reply))
        return (reply[0] == "S")

    def aux_stop(self):
        '''Issue a stop on the aux port; returns True when the reply begins with "P".'''
        self.interface.write(':I2CAux:P?;')
        reply = self.interface.readline()
        if len(reply) < 2:
            raise i2cMasterError("I2C Error: Short Response to aux port stop: {}".format(reply))
        if reply[0] != "P" :
            raise i2cStartStopError("I2C Error: Bad aux port Stop: {}".format(reply))
        return (reply[0] == "P")

    def aux_write(self, data8):
        '''Write the low 8 bits of data8 on the aux port; returns True when the reply begins with "K".'''
        data8 = int(data8) & 0xFF
        payload = hex(data8)[2:].zfill(2)
        self.interface.write(':I2CAux:0X?{};'.format(payload))
        reply = self.interface.readline()
        if len(reply) < 2:
            raise i2cMasterError("I2C Error: Short Response to aux port write: {}".format(reply))
        return (reply[0] == "K")

    def aux_read_ack(self):
        '''Send the aux port RK? query and return the byte parsed from the first two reply characters.'''
        self.interface.write(':I2CAux:RK?;')
        reply = self.interface.readline()
        reply_length = len(reply)
        if reply_length < 4:
            raise i2cMasterError("I2C Error: Short Response to aux port read_ack: {}".format(reply))
        if reply_length > 4:
            raise i2cMasterError("I2C Error: Long Response to aux port read_ack: {}".format(reply))
        if reply[3] != "\n":
            raise i2cMasterError("I2C Error: Bad Response to aux port read_ack: {}".format(reply))
        return int(reply[:2],16)

    def aux_read_nack(self):
        '''Send the aux port RN? query and return the byte parsed from the first two reply characters.'''
        self.interface.write(':I2CAux:RN?;')
        reply = self.interface.readline()
        reply_length = len(reply)
        if reply_length < 4:
            raise i2cMasterError("I2C Error: Short Response to aux port read_nack: {}".format(reply))
        if reply_length > 4:
            raise i2cMasterError("I2C Error: Long Response to aux port read_nack: {}".format(reply))
        if reply[3] != "\n":
            raise i2cMasterError("I2C Error: Bad Response to aux port read_nack: {}".format(reply))
        return int(reply[:2],16)
class i2c_dc590(twoWireInterface):
    '''Generic DC590/Linduino Driver.

    Note that DC590 will not communicate unless it detects pullups on the auxiliary i2c port.
    Linduino does not have the above limitation.

    interface_stream must provide write(text) and read(count). This class indexes
    the value returned by read(): item 0 is the received data and item 1 is treated
    as an "unexpected extra data" flag (NOTE(review): inferred from the
    "Long response" errors below - confirm against the stream wrapper).
    '''
    def __init__(self, interface_stream):
        self.iface = interface_stream
        self.init_i2c()

    def __del__(self):
        self.close()

    def init_i2c(self):
        '''Wait out the bootloader, flush the link, enable isolated power and select I2C mode.
        An i2cMasterError from the mode switch is printed rather than propagated.'''
        time.sleep(2.5) #Linduino bootloader delay!
        self.iface.write('\n'*10)
        time.sleep(2.5) #Linduino bootloader delay!
        # Single-argument print(...) behaves the same as a Python 2 statement and a Python 3 call.
        print('DC590 init response: {}'.format(self.iface.read(None)[0])) #discard any responses
        self.iface.write('O') #Enable isolated power
        try:
            self.i2c_mode()
        except i2cMasterError as e:
            print(e)

    def i2c_mode(self):
        '''Switch DC590 I2C/SPI mux to I2C.
        Raises i2cMasterError if the adapter sends anything back.'''
        self.iface.write('MI') #Switch to isolated I2C Mode
        time.sleep(0.1)
        leftover = self.iface.read(None)[0]
        if len(leftover):
            raise i2cMasterError('Error switching DC590 to I2C Mode. Unexpected data in buffer:{}'.format(leftover))

    def _hex_str(self, integer):
        '''return integer formatted correctly for transmission over DC590 serial link'''
        return hex(integer)[2:].rjust(2,"0").upper()

    def _flush_extra(self):
        '''Wait briefly, then return whatever else the adapter has queued (used in error messages).'''
        time.sleep(.1)
        return self.iface.read(None)[0]

    def start(self):
        '''Send the start command. Always returns True.'''
        self.iface.write('s') #no response expected
        return True

    def stop(self):
        '''Send the stop command. Always returns True.'''
        self.iface.write('p') #no response expected
        return True

    def write(self, data8):
        '''Transmit the low 8 bits of data8.
        Returns True when the adapter stays silent and False when it answers "N" (no acknowledge).
        Raises i2cMasterError on "X" or any other reply.'''
        data8 = int(data8) & 0xFF
        self.iface.write('S' + self._hex_str(data8))
        time.sleep(0.02) #how long do we have to wait to see if the dc590 is going to "N"ack
        ret_str = self.iface.read(None)[0]
        if len(ret_str) == 1 and ret_str[0] == 'N':
            return False #failed acknowledge
        elif len(ret_str) == 1 and ret_str[0] == 'X':
            raise i2cMasterError('DC590 EEPROM detection failed, communications disabled: {}'.format(ret_str))
        elif len(ret_str) == 0:
            return True
        else:
            raise i2cMasterError('Bad response to DC590 write command: {}'.format(ret_str))

    def read_ack(self):
        '''Read one byte with the "Q" command and return it as an int.
        Raises i2cMasterError on a short or long reply.'''
        self.iface.write("Q")
        resp = self.iface.read(2)
        ret_str = resp[0].decode()
        if len(ret_str) != 2:
            raise i2cMasterError('Short response to DC590 read_ack command, EEPROM Present?: {}'.format(ret_str))
        if resp[1]:
            raise i2cMasterError('Long response to DC590 read_ack command: {} then {}'.format(ret_str, self._flush_extra()))
        return int(ret_str,16)

    def read_nack(self):
        '''Read one byte with the "R" command and return it as an int.
        Raises i2cMasterError on a short or long reply.'''
        self.iface.write("R")
        resp = self.iface.read(2)
        ret_str = resp[0].decode()
        if len(ret_str) != 2:
            raise i2cMasterError('Short response to DC590 read_nack command, EEPROM Present?: {}'.format(ret_str))
        if resp[1]:
            raise i2cMasterError('Long response to DC590 read_nack command: {} then {}'.format(ret_str, self._flush_extra()))
        return int(ret_str,16)

    ###SMBus Overloads
    def read_byte(self, addr7, commandCode):
        '''SMBus Read Byte sent as one command string. Returns 8-bit data from slave.'''
        self.check_size(addr7,7)
        self.check_size(commandCode,8)
        byteList = [self.write_addr(addr7), commandCode, self.read_addr(addr7)]
        write_str = 'sS{}S{}sS{}Rp'.format(*map(self._hex_str, byteList))
        self.iface.write(write_str)
        resp = self.iface.read(2)
        ret_str = resp[0]
        if len(ret_str) != 2:
            raise i2cMasterError('Short response to DC590 read_byte command, EEPROM Present?: {}'.format(ret_str))
        if resp[1]:
            raise i2cMasterError('Long response to DC590 read_byte command: {} then {}'.format(ret_str, self._flush_extra()))
        return int(ret_str,16)

    def read_byte_pec(self, addr7, commandCode):
        '''SMBus Read Byte Protocol with Packet Error Checking.
        Slave device address specified in 7-bit format.
        Returns 8-bit data from slave.
        Raises i2cAcknowledgeError on "N", i2cMasterError on "X" or a short/long
        reply, and i2cPECError when the PEC over the whole message is non-zero.'''
        self.check_size(addr7,7)
        self.check_size(commandCode,8)
        byteList = [self.write_addr(addr7), commandCode, self.read_addr(addr7)]
        write_str = 'sS{}S{}sS{}QRp'.format(*map(self._hex_str, byteList))
        self.iface.write(write_str)
        resp = self.iface.read(4)
        ret_str = resp[0]
        if (len(ret_str) != 4):
            raise i2cMasterError('Short response to DC590 read_byte_pec command, EEPROM Present?: %s' % ret_str)
        # Status letters must be checked before hex parsing; otherwise int() fails
        # with a bare ValueError and the real cause is lost.
        if ('N' in ret_str):
            raise i2cAcknowledgeError('DC590 read_byte_pec failed acknowledge: {} then {}'.format(ret_str, self._flush_extra()))
        if ('X' in ret_str):
            raise i2cMasterError('DC590 EEPROM detection failed, communications disabled: {} then {}'.format(ret_str, self._flush_extra()))
        byteList.append(int(ret_str[0:2],16))
        byteList.append(int(ret_str[2:4],16))
        if self.pec(byteList):
            raise i2cPECError('DC590 read_byte_pec command failed PEC')
        if resp[1]:
            raise i2cMasterError('Long response to DC590 read_byte_pec command: {} then {}'.format(ret_str, self._flush_extra()))
        return byteList[-2]

    def read_word(self, addr7, commandCode):
        '''SMBus Read Word sent as one command string. Returns 16-bit data from slave
        (first received byte is the low byte).'''
        self.check_size(addr7,7)
        self.check_size(commandCode,8)
        byteList = [self.write_addr(addr7), commandCode, self.read_addr(addr7)]
        write_str = 'sS{}S{}sS{}QRp'.format(*map(self._hex_str, byteList))
        self.iface.write(write_str)
        resp = self.iface.read(4)
        ret_str = resp[0]
        if len(ret_str) != 4:
            raise i2cMasterError('Short response to DC590 read_word command, EEPROM Present?: {}'.format(ret_str))
        if 'N' in ret_str:
            raise i2cError('DC590 read_word failed acknowledge: {} then {}'.format(ret_str, self._flush_extra()))
        if 'X' in ret_str:
            raise i2cMasterError('DC590 EEPROM detection failed, communications disabled: {} then {}'.format(ret_str, self._flush_extra()))
        if resp[1]:
            raise i2cMasterError('Long response to DC590 read_word command: {} then {}'.format(ret_str, self._flush_extra()))
        return self.word(lowByte=int(ret_str[0:2],16), highByte=int(ret_str[2:4],16))

    def read_word_pec(self, addr7, commandCode):
        '''SMBus Read Word Protocol with Packet Error Checking.
        Slave device address specified in 7-bit format.
        Returns 16-bit data from slave.
        Raises i2cAcknowledgeError on "N", i2cMasterError on "X" or a short/long
        reply, and i2cPECError when the received PEC byte does not match.'''
        self.check_size(addr7,7)
        self.check_size(commandCode,8)
        byteList = [self.write_addr(addr7), commandCode, self.read_addr(addr7)]
        write_str = 'sS{}S{}sS{}QQRp'.format(*map(self._hex_str, byteList))
        self.iface.write(write_str)
        resp = self.iface.read(6)
        ret_str = resp[0]
        if (len(ret_str) != 6):
            raise i2cMasterError('Short response to DC590 read_word_pec command, EEPROM Present?: %s' % ret_str)
        # Status letters must be checked before hex parsing (see read_byte_pec).
        if ('N' in ret_str):
            raise i2cAcknowledgeError('DC590 read_word_pec failed acknowledge: {} then {}'.format(ret_str, self._flush_extra()))
        if ('X' in ret_str):
            raise i2cMasterError('DC590 EEPROM detection failed, communications disabled: {} then {}'.format(ret_str, self._flush_extra()))
        lowByte = int(ret_str[0:2],16)
        highByte = int(ret_str[2:4],16)
        byteList.append(lowByte)
        byteList.append(highByte)
        if int(ret_str[4:6],16) != self.pec(byteList):
            raise i2cPECError('DC590 read_word_pec command failed PEC')
        if resp[1]:
            raise i2cMasterError('Long response to DC590 read_word_pec command: {} then {}'.format(ret_str, self._flush_extra()))
        return self.word(lowByte=lowByte, highByte=highByte)

    def write_byte(self, addr7, commandCode, data8):
        '''SMBus Write Byte sent as one command string. Any reply is treated as an error.'''
        self.check_size(addr7,7)
        self.check_size(commandCode,8)
        self.check_size(data8,8)
        byteList = [self.write_addr(addr7), commandCode, data8]
        write_str = 'sS{}S{}S{}p'.format(*map(self._hex_str, byteList))
        self.iface.write(write_str)
        ret_str = self.iface.read(None)[0]
        if len(ret_str) != 0:
            raise i2cError('Response: {} from DC590 write_byte command'.format(ret_str))

    def write_byte_pec(self, addr7, commandCode, data8):
        '''SMBus Write Byte with a trailing PEC byte. Any reply is treated as an error.'''
        self.check_size(addr7,7)
        self.check_size(commandCode,8)
        self.check_size(data8,8)
        byteList = [self.write_addr(addr7), commandCode, data8]
        byteList.append(self.pec(byteList))
        write_str = 'sS{}S{}S{}S{}p'.format(*map(self._hex_str, byteList))
        self.iface.write(write_str)
        ret_str = self.iface.read(None)[0]
        if len(ret_str) != 0:
            raise i2cError('Bad response: {} from DC590 write_byte_pec command'.format(ret_str))

    def write_word(self, addr7, commandCode, data16):
        '''SMBus Write Word (low byte sent first). Any reply is treated as an error.'''
        self.check_size(addr7,7)
        self.check_size(commandCode,8)
        self.check_size(data16,16)
        byteList = [self.write_addr(addr7), commandCode, self.low_byte(data16), self.high_byte(data16)]
        write_str = 'sS{}S{}S{}S{}p'.format(*map(self._hex_str, byteList))
        self.iface.write(write_str)
        ret_str = self.iface.read(None)[0]
        if len(ret_str) != 0:
            raise i2cError('Response: {} from DC590 write_word command'.format(ret_str))

    def write_word_pec(self, addr7, commandCode, data16):
        '''SMBus Write Word with a trailing PEC byte. Any reply is treated as an error.'''
        self.check_size(addr7,7)
        self.check_size(commandCode,8)
        self.check_size(data16,16)
        byteList = [self.write_addr(addr7), commandCode, self.low_byte(data16), self.high_byte(data16)]
        byteList.append(self.pec(byteList))
        write_str = 'sS{}S{}S{}S{}S{}p'.format(*map(self._hex_str, byteList))
        self.iface.write(write_str)
        ret_str = self.iface.read(None)[0]
        if len(ret_str) != 0:
            raise i2cError('Bad response: {} from DC590 write_word_pec command'.format(ret_str))

    def send_byte(self, addr7, data8):
        '''SMBus Send Byte (address plus one data byte). Any reply is treated as an error.'''
        self.check_size(addr7,7)
        self.check_size(data8,8)
        byteList = [self.write_addr(addr7), data8]
        write_str = 'sS{}S{}p'.format(*map(self._hex_str, byteList))
        self.iface.write(write_str)
        ret_str = self.iface.read(None)[0]
        if len(ret_str) != 0:
            raise i2cError('Response: {} from DC590 send_byte command'.format(ret_str))

    def resync_communication(self):
        '''Discard anything waiting in the receive buffer.'''
        self.iface.read(None)
class i2c_dc590_list_read(i2c_dc590):
    '''use with Steve M's linduino sketch extension DC590ListRead implementing ganged register reads
    todo: add PEC support to .ino and Python.'''
    def __init__(self, interface_stream):
        i2c_dc590.__init__(self,interface_stream)
        # Last chip address / command-code list sent to the adapter; None until first use.
        self.addr_7 = None
        self.dc590_cc_list = None

    def _set_chip_address(self, addr7):
        '''Send the 8-bit write address to the adapter, but only when addr7 changed.'''
        if self.addr_7 == addr7:
            return
        self.addr_7 = addr7
        self.iface.write("a{address:02X}".format(address=addr7<<1))

    def _set_command_codes(self, cc_list):
        '''Send a 256-entry '0'/'1' map of the wanted command codes, but only when cc_list changed.'''
        if self.dc590_cc_list == cc_list:
            return
        self.dc590_cc_list = cc_list
        bitmap = ''.join('1' if cc in self.dc590_cc_list else '0' for cc in range(256))
        self.iface.write("A" + bitmap + ';')

    def _unpack_word_list(self, response, PEC):
        '''Split response into a {commandCode: word} dict.
        Each entry is high byte then low byte (plus a PEC byte when PEC is set), two hex digits each.
        Raises i2cPECError when an entry fails its PEC check.'''
        results = {}
        for index,cc in enumerate(self.dc590_cc_list):
            if PEC:
                base = index*6
                highByte = int(response[base:base+2],16)
                lowByte = int(response[base+2:base+4],16)
                pecByte = int(response[base+4:base+6],16)
                message = [self.write_addr(self.addr_7),cc,self.read_addr(self.addr_7),lowByte,highByte,pecByte]
                if self.pec(message):
                    raise i2cPECError('Linduino read_word_list_pec command failed PEC')
            else:
                base = index*4
                lowByte = int(response[base+2:base+4],16)
                highByte = int(response[base:base+2],16)
            results[cc] = self.word(lowByte=lowByte, highByte=highByte)
        return results

    def set_frequency(self, frequency):
        '''Select the I2C clock: 100000 sends 'f', 400000 sends 'F'; anything else raises Exception.'''
        command = {1e5: 'f', 4e5: 'F'}.get(float(frequency))
        if command is None:
            raise Exception('Set I2C frequency to either 100000Hz or 400000Hz')
        self.iface.write(command)

    def enable_streaming_word_list(self):
        '''Turn on streaming I2C data
        Uses the chip address and commandCode list set up with previous read_{word,byte}_list() command.
        Not really compatible with PyICe except in I2C-only stand alone data logger mode.
        '''
        self.iface.write('C')

    def disable_streaming(self):
        '''Turn off streaming I2C data and flush buffers.'''
        self.iface.write(';')
        time.sleep(1) #need to wait for buffers?
        # Single-argument print(...) behaves the same as a Python 2 statement and a Python 3 call.
        print("Extra stuff:")
        print(self.iface.read(None))

    def read_streaming_word_list(self):
        '''Return dictionary of single read results.
        Expects streaming to have been previously enabled using enable_streaming_word_list()
        This method must be called often enough in the main loop to keep buffers from overflowing.
        The main loop should spend time here on blocking IO as data fills the serial buffer if the computer is fast enough to keep up.
        Not really compatible with PyICe except in I2C-only stand alone data logger mode because of this blocking IO speed requirement.
        '''
        expected = len(self.dc590_cc_list) * 4 + 2 # 4 hex digits per command code + 2 for ";\n"
        text = self.iface.read(expected)[0]
        if len(text) < expected:
            if text[-3:] == "N;\n":
                raise i2cAcknowledgeError("I2C list read acknowledge Error: {}".format(text))
            raise i2cMasterError("Short response to DC590 read_word_list command: {}".format(text))
        return self._unpack_word_list(text, PEC=False)

    def read_streaming_word_list_pec(self):
        '''Incomplete stream read handler.'''
        raise Exception('Unimplemented.')

    def read_streaming_byte_list(self):
        '''Incomplete stream read handler.'''
        raise Exception('Unimplemented.')

    def read_streaming_byte_list_pec(self):
        '''Incomplete stream read handler.'''
        raise Exception('Unimplemented.')

    def read_word_list(self, addr7, cc_list, PEC=False):
        '''Return dictionary of read_word results.
        Reads each commandCode of cc_list in turn at chip address addr7
        This method overloads twoWireInterface.read_word_list() to improve communication speed when the instrument supports it.
        '''
        self._set_chip_address(addr7)
        self._set_command_codes(sorted(list(set(cc_list))))
        entries = len(self.dc590_cc_list)
        if PEC:
            self.iface.write("Ec")
            expected = entries * 6 + 2 # 6 hex digits per command code (word + PEC) + 2 for ";\n"
        else:
            self.iface.write("ec")
            expected = entries * 4 + 2 # 4 hex digits per command code + 2 for ";\n"
        resp = self.iface.read(expected)
        text = resp[0]
        if len(text) < expected:
            if text[-3:] == "N;\n":
                raise i2cAcknowledgeError("I2C list read acknowledge Error: {}".format(text))
            raise i2cMasterError("Short response to DC590 read_word_list command: {}".format(text))
        if resp[1]:
            raise i2cMasterError("Long response to DC590 read_word_list command: {} then {}".format(text, self.iface.read(None)))
        return self._unpack_word_list(text, PEC)

    def read_word_list_pec(self, addr7, cc_list):
        '''Return dictionary of read_word results.
        Reads each commandCode of cc_list in turn at chip address addr7
        This method overloads twoWireInterface.read_word_list_pec() to improve communication speed when the instrument supports it.
        '''
        return self.read_word_list(addr7, cc_list, PEC=True)

    def read_byte_list(self, addr7, cc_list, PEC=False):
        '''Return dictionary of read_byte results.
        Reads each commandCode of cc_list in turn at chip address addr7
        This method overloads twoWireInterface.read_byte_list() to improve communication speed when the instrument supports it.
        '''
        self._set_chip_address(addr7)
        self._set_command_codes(sorted(list(set(cc_list))))
        entries = len(self.dc590_cc_list)
        if PEC:
            self.iface.write("Eb")
            expected = entries * 4 + 2 # 4 hex digits per command code (byte + PEC) + 2 for ";\n"
        else:
            self.iface.write("eb")
            expected = entries * 2 + 2 # 2 hex digits per command code + 2 for ";\n"
        resp = self.iface.read(expected)
        text = resp[0]
        if len(text) < expected:
            if text[-3:] == "N;\n":
                raise i2cAcknowledgeError("I2C list read acknowledge Error: {}".format(text))
            raise i2cMasterError("Short response to DC590 read_byte_list command: {}".format(text))
        results = {}
        for index,cc in enumerate(self.dc590_cc_list):
            if PEC:
                base = index*4
                data = int(text[base:base+2],16)
                pec = int(text[base+2:base+4],16)
                if self.pec([self.write_addr(self.addr_7),cc,self.read_addr(self.addr_7),data,pec]):
                    raise i2cPECError('Linduino read_byte_list_pec command failed PEC')
                results[cc] = data
            else:
                base = index*2
                results[cc] = int(text[base:base+2],16)
        # The surplus-data check intentionally stays after unpacking, as in the original flow.
        if resp[1]:
            raise i2cMasterError("Long response to DC590 read_byte_list command: {} then {}".format(text, self.iface.read(None)))
        return results

    def read_byte_list_pec(self, addr7, cc_list):
        '''Return dictionary of read_byte results.
        Reads each commandCode of cc_list in turn at chip address addr7
        This method overloads twoWireInterface.read_byte_list_pec() to improve communication speed when the instrument supports it.
        '''
        return self.read_byte_list(addr7, cc_list, PEC=True)
class i2c_kernel(twoWireInterface):
    '''communication class using Linux kernel driver

    requires py-smbus library provided as part of lm-sensors project i2c-tools library
    you may need to go to PyICe/deps/i2c-tools and build i2c-tools and py-smbus on your machine before this will work (Make/gcc)
    alternatively, install python-smbus and i2c-tools from your distribution's package manager (apt-get,yum,opkg,port,etc).
    note that the library names for the various protocols do not match the SMBus protocol specification.
    '''
    #http://wiki.erazor-zone.de/wiki:linux:python:smbus:doc
    def __init__(self, bus_number):
        '''Open kernel I2C bus bus_number with PEC initially disabled.
        check /sys/bus/i2c/devices/i2c* to see which memory location corresponds to which filesystem mapping
        Raises Exception when the smbus module could not be imported and
        i2cIOError when the PEC flag cannot be set on the new bus object.'''
        self.bus_number = bus_number
        if smbusMissing:
            raise Exception(
                "python-smbus (and maybe i2c-tools) is missing on this computer. "
                "Install with your distribution's package manager or build from source. "
                "Linux only."
            )
        self.bus = smbus.SMBus(self.bus_number)
        try:
            self.bus.pec = False
        except Exception as err:
            raise i2cIOError(err)
    #no i2c primitives are available using this library!
    #protocol-specific access
def quick_command_wr(self, addr7):
    '''SMBus Quick Command: only the address byte is sent, so the R/W# bit itself
    is the command and no data is transferred in either direction.

    Issues write_quick on the kernel bus with PEC disabled and returns whatever
    the driver returns. Any driver exception is re-raised as i2cIOError.'''
    self.check_size(addr7, 7)
    self.bus.pec = False
    try:
        outcome = self.bus.write_quick(addr7)
    except Exception as err:
        raise i2cIOError(err)
    return outcome
def send_byte(self, addr7, data8):
    '''SMBus Send Byte: a single data byte follows the slave address, with no command code.
    The slave is free to interpret all or part of that byte as an encoded command.

    Sent with PEC disabled. Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (data8, 8)):
        self.check_size(value, bits)
    self.bus.pec = False
    try:
        self.bus.write_byte(addr7, data8)
    except Exception as err:
        raise i2cIOError(err)
def send_byte_pec(self, addr7, data8):
    '''send_byte with additional PEC byte written to slave.
    Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (data8, 8)):
        self.check_size(value, bits)
    self.bus.pec = True
    try:
        self.bus.write_byte(addr7, data8)
    except Exception as err:
        raise i2cIOError(err)
def receive_byte(self, addr7):
    '''SMBus Receive Byte: the mirror image of Send Byte, with the slave returning
    one byte and no command code involved. A NACK ends the read transfer.

    Performed with PEC disabled; returns the byte reported by the driver.
    Any driver exception is re-raised as i2cIOError.'''
    self.check_size(addr7, 7)
    self.bus.pec = False
    try:
        received = self.bus.read_byte(addr7)
    except Exception as err:
        raise i2cIOError(err)
    return received
def receive_byte_pec(self, addr7):
    '''receive_byte with additional PEC byte read from slave.
    Any driver exception is re-raised as i2cIOError.'''
    self.check_size(addr7, 7)
    self.bus.pec = True
    try:
        received = self.bus.read_byte(addr7)
    except Exception as err:
        raise i2cIOError(err)
    return received
def alert_response(self):
    '''Query the SMBus Alert Response Address (0xC) after an SMBALERT# interrupt.

    SMBALERT# is an optional wired-AND interrupt line; only the device(s) pulling it
    low acknowledge the Alert Response Address. The responding device places its
    7-bit address in the upper seven bits of the returned byte, so the byte is
    shifted right by one before being returned.

    Returns 7 bit address of responding device.
    NOTE: a missing response is not turned into None here; any failure of the
    underlying receive is re-raised as i2cIOError.'''
    self.bus.pec = False
    try:
        raw = self.receive_byte(0xC)
        return raw >> 1
    except Exception as err:
        #probably not right: expect nack if no SMBALERT...
        raise i2cIOError(err)
def alert_response_pec(self):
    '''Alert Response Query to SMBALERT# interrupt with Packet Error Check.

    Reads the Alert Response Address (0xC) through receive_byte_pec so the PEC
    setting is actually in force during the transfer. The plain receive_byte()
    switches PEC back off before touching the bus, which silently dropped the
    error check.

    Returns 7 bit address of responding device (received byte shifted right by one).
    NOTE: a missing response is not turned into None here; any failure of the
    underlying receive is re-raised as i2cIOError.'''
    self.bus.pec = True
    try:
        return self.receive_byte_pec(0xC) >> 1
    except Exception as e:
        #probably not right: expect nack if no SMBALERT...
        raise i2cIOError(e)
def write_byte(self, addr7, commandCode, data8):
    '''SMBus Write Byte: slave address with write bit, then the command code, then
    one data byte. The slave acknowledges each byte and a STOP ends the transaction.

    Sent with PEC disabled. Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (commandCode, 8), (data8, 8)):
        self.check_size(value, bits)
    self.bus.pec = False
    try:
        self.bus.write_byte_data(addr7, commandCode, data8)
    except Exception as err:
        raise i2cIOError(err)
def write_byte_pec(self, addr7, commandCode, data8):
    '''write_byte with additional PEC byte written to slave.
    Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (commandCode, 8), (data8, 8)):
        self.check_size(value, bits)
    self.bus.pec = True
    try:
        self.bus.write_byte_data(addr7, commandCode, data8)
    except Exception as err:
        raise i2cIOError(err)
def write_word(self, addr7, commandCode, data16):
    '''SMBus Write Word: slave address with write bit, then the command code, then
    a 16-bit data word (low byte first). The slave acknowledges each byte and a
    STOP ends the transaction.

    Sent with PEC disabled. Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (commandCode, 8), (data16, 16)):
        self.check_size(value, bits)
    self.bus.pec = False
    try:
        self.bus.write_word_data(addr7, commandCode, data16)
    except Exception as err:
        raise i2cIOError(err)
def write_word_pec(self, addr7, commandCode, data16):
    '''write_word with additional PEC byte written to slave.
    Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (commandCode, 8), (data16, 16)):
        self.check_size(value, bits)
    self.bus.pec = True
    try:
        self.bus.write_word_data(addr7, commandCode, data16)
    except Exception as err:
        raise i2cIOError(err)
def read_byte(self, addr7, commandCode):
    '''SMBus Read Byte: the host first writes the command code, then issues a repeated
    START (no STOP in between) addressed for reading, and the slave returns one byte.
    A NACK marks the end of the read transfer.

    Performed with PEC disabled; returns the byte reported by the driver.
    Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (commandCode, 8)):
        self.check_size(value, bits)
    self.bus.pec = False
    try:
        received = self.bus.read_byte_data(addr7, commandCode)
    except Exception as err:
        raise i2cIOError(err)
    return received
def read_byte_pec(self, addr7, commandCode):
    '''read_byte with additional PEC byte read from slave.
    Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (commandCode, 8)):
        self.check_size(value, bits)
    self.bus.pec = True
    try:
        received = self.bus.read_byte_data(addr7, commandCode)
    except Exception as err:
        raise i2cIOError(err)
    return received
def read_word(self, addr7, commandCode):
    '''SMBus Read Word: the host first writes the command code, then issues a repeated
    START (no STOP in between) addressed for reading, and the slave returns two bytes.
    A NACK marks the end of the read transfer.

    Performed with PEC disabled; returns the word reported by the driver.
    Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (commandCode, 8)):
        self.check_size(value, bits)
    self.bus.pec = False
    try:
        received = self.bus.read_word_data(addr7, commandCode)
    except Exception as err:
        raise i2cIOError(err)
    return received
def read_word_pec(self, addr7, commandCode):
    '''read_word with additional PEC byte read from slave.
    Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (commandCode, 8)):
        self.check_size(value, bits)
    self.bus.pec = True
    try:
        received = self.bus.read_word_data(addr7, commandCode)
    except Exception as err:
        raise i2cIOError(err)
    return received
def process_call(self, addr7, commandCode, data16):
    '''SMBus Process Call: send a data word and receive a word that depends on it.
    On the wire this is a Write Word followed by a Read Word, minus the Read Word
    command field and the Write Word STOP bit; a repeated START separates the two
    halves and a NACK ends the read transfer.

    Performed with PEC disabled; returns the value reported by the driver.
    Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (commandCode, 8), (data16, 16)):
        self.check_size(value, bits)
    self.bus.pec = False
    try:
        answer = self.bus.process_call(addr7, commandCode, data16)
    except Exception as err:
        raise i2cIOError(err)
    return answer
def process_call_pec(self, addr7, commandCode, data16):
    '''process_call with additional PEC byte read from slave.
    Any driver exception is re-raised as i2cIOError.'''
    for value, bits in ((addr7, 7), (commandCode, 8), (data16, 16)):
        self.check_size(value, bits)
    self.bus.pec = True
    try:
        answer = self.bus.process_call(addr7, commandCode, data16)
    except Exception as err:
        raise i2cIOError(err)
    return answer
def block_write(self, addr7, commandCode, dataByteList):
    '''SMBus Block Write: slave address with write bit, the command code, a byte count,
    then that many data bytes. The byte count excludes the PEC byte, may not be 0,
    and a block transfer carries at most 32 data bytes.

    Every entry of dataByteList is size-checked as an 8-bit value before sending.
    Sent with PEC disabled. Any driver exception is re-raised as i2cIOError.'''
    self.check_size(addr7, 7)
    self.check_size(commandCode, 8)
    for item in dataByteList:
        self.check_size(item, 8)
    #self.bus.write_i2c_block_data(addr7, commandCode, dataByteList)
    self.bus.pec = False
    try:
        self.bus.write_block_data(addr7, commandCode, dataByteList)
    except Exception as err:
        raise i2cIOError(err)
def block_write_pec(self,addr7,commandCode,dataByteList):
    '''SMBus Block Write with Packet Error Checking enabled.

    Same transaction as block_write, with an additional PEC byte written to the slave.
    addr7 (7 bits), commandCode (8 bits) and every entry of dataByteList
    (8 bits each) are passed through self.check_size first. Sets
    self.bus.pec = True, then calls self.bus.write_block_data. Returns None.
    Any Exception raised by the bus call is re-raised as i2cIOError.
    '''
    for value, bit_width in ((addr7, 7), (commandCode, 8)):
        self.check_size(value, bit_width)
    for payload_byte in dataByteList:
        self.check_size(payload_byte, 8)
    # Disabled alternative kept from the original for reference:
    #self.bus.write_i2c_block_data(addr7, commandCode, dataByteList)
    self.bus.pec = True
    try:
        self.bus.write_block_data(addr7, commandCode, dataByteList)
    except Exception as bus_error:
        raise i2cIOError(bus_error)
def block_read(self,addr7,commandCode):
    '''SMBus Block Read without Packet Error Checking.

    Unlike a block write, a block read contains a repeated START condition to
    reverse the transfer direction. A NACK immediately before the STOP
    condition marks the end of the read transfer.

    addr7 (7 bits) and commandCode (8 bits) are passed through self.check_size
    first. Sets self.bus.pec = False, then returns whatever
    self.bus.read_block_data returns. Any Exception raised by the bus call is
    re-raised as i2cIOError.
    '''
    for value, bit_width in ((addr7, 7), (commandCode, 8)):
        self.check_size(value, bit_width)
    self.bus.pec = False
    try:
        block = self.bus.read_block_data(addr7, commandCode)
    except Exception as bus_error:
        raise i2cIOError(bus_error)
    else:
        return block
def block_read_pec(self,addr7,commandCode):
    '''SMBus Block Read with Packet Error Checking enabled.

    Same transaction as block_read, with an additional PEC byte read from the slave.
    addr7 (7 bits) and commandCode (8 bits) are passed through self.check_size
    first. Sets self.bus.pec = True, then returns whatever
    self.bus.read_block_data returns. Any Exception raised by the bus call is
    re-raised as i2cIOError.
    '''
    for value, bit_width in ((addr7, 7), (commandCode, 8)):
        self.check_size(value, bit_width)
    self.bus.pec = True
    try:
        block = self.bus.read_block_data(addr7, commandCode)
    except Exception as bus_error:
        raise i2cIOError(bus_error)
    else:
        return block
def block_process_call(self,addr7,commandCode,dataByteListWrite):
    '''SMBus Block Write-Block Read Process Call without Packet Error Checking.

    This is a two-part message. Part one begins with the slave address and a
    write condition; after the command code the host sends a write byte count
    (M) followed by M data bytes (e.g. 6 bytes -> count 0000 0110b, then the
    6 data bytes). Part two is a block of read data: a repeated START, the
    slave address with the Read bit, then a read byte count (N), which may
    differ from M.

    Length restrictions:
        * M >= 1 byte
        * N >= 1 byte
        * M + N <= 32 bytes

    The read byte count excludes the PEC byte. The PEC is computed over the
    whole message starting from the first slave address, using the normal PEC
    rules; using a PEC byte with this protocol is highly recommended. There is
    no STOP condition before the repeated START, and a NACK marks the end of
    the read transfer.

    addr7 (7 bits), commandCode (8 bits) and every entry of dataByteListWrite
    (8 bits each) are passed through self.check_size first. Sets
    self.bus.pec = False, then returns whatever self.bus.block_process_call
    returns. Any Exception raised by the bus call is re-raised as i2cIOError.
    '''
    for value, bit_width in ((addr7, 7), (commandCode, 8)):
        self.check_size(value, bit_width)
    for payload_byte in dataByteListWrite:
        self.check_size(payload_byte, 8)
    self.bus.pec = False
    try:
        reply_block = self.bus.block_process_call(addr7, commandCode, dataByteListWrite)
    except Exception as bus_error:
        raise i2cIOError(bus_error)
    else:
        return reply_block
def block_process_call_pec(self,addr7,commandCode,dataByteListWrite):
    '''SMBus Block Write-Block Read Process Call with Packet Error Checking enabled.

    Same transaction as block_process_call, with an additional PEC byte read from the slave.
    addr7 (7 bits), commandCode (8 bits) and every entry of dataByteListWrite
    (8 bits each) are passed through self.check_size first. Sets
    self.bus.pec = True, then returns whatever self.bus.block_process_call
    returns. Any Exception raised by the bus call is re-raised as i2cIOError.
    '''
    for value, bit_width in ((addr7, 7), (commandCode, 8)):
        self.check_size(value, bit_width)
    for payload_byte in dataByteListWrite:
        self.check_size(payload_byte, 8)
    self.bus.pec = True
    try:
        reply_block = self.bus.block_process_call(addr7, commandCode, dataByteListWrite)
    except Exception as bus_error:
        raise i2cIOError(bus_error)
    else:
        return reply_block
class i2cError(Exception):
    '''Root of the I2C exception hierarchy.

    Do not raise this class directly; raise a more specific subclass instead.
    The optional constructor argument is stored on self.value, and str() of
    the exception is repr() of that stored value.
    '''

    def __init__(self, value=None):
        self.value = value

    def __str__(self):
        return '{0!r}'.format(self.value)
class i2cUnimplementedError(i2cError, NotImplementedError):
    '''The requested feature has not been implemented yet. Write it if you want it!

    Inherits from both i2cError and the builtin NotImplementedError, so it is
    caught by handlers for either.
    '''
class i2cMasterError(i2cError):
    '''Unexpected response from the I2C interface (master) device itself.

    Possible causes include dropped USB packets, a serial error, an unpowered
    master, or other 'computer' problems. Communication problems with a device
    on the two-wire bus are excluded from this category.
    '''
class i2cStartStopError(i2cError):
    '''Failed to assert Start or Stop signals - detection is not supported by all hardware.'''
class i2cAcknowledgeError(i2cError):
    '''The I2C master device responded as expected, but a slave device did not
    acknowledge its address or the data bytes that followed.'''
class i2cAddressAcknowledgeError(i2cAcknowledgeError):
    '''The I2C master device responded as expected, but the slave device did
    not acknowledge its address.'''
class i2cWriteAddressAcknowledgeError(i2cAddressAcknowledgeError):
    '''The I2C master device responded as expected, but the slave device did
    not acknowledge its write address.'''
class i2cReadAddressAcknowledgeError(i2cAddressAcknowledgeError):
    '''The I2C master device responded as expected, but the slave device did
    not acknowledge its read address.'''
class i2cCommandCodeAcknowledgeError(i2cAcknowledgeError):
    '''The I2C master device responded as expected, but the slave device did
    not acknowledge the command code.'''
class i2cDataAcknowledgeError(i2cAcknowledgeError):
    '''The I2C master device and the slave address responded as expected, but
    the slave device did not acknowledge the data bytes that followed.'''
class i2cDataLowAcknowledgeError(i2cDataAcknowledgeError):
    '''The I2C master device and the slave address responded as expected, but
    the slave device did not acknowledge the low data byte.'''
class i2cDataHighAcknowledgeError(i2cDataAcknowledgeError):
    '''The I2C master device and the slave address responded as expected, but
    the slave device did not acknowledge the high data byte.'''
class i2cDataPECAcknowledgeError(i2cDataAcknowledgeError):
    '''The I2C master device and the slave address responded as expected, but
    the slave device did not acknowledge the PEC data byte.'''
class i2cPECError(i2cError):
    '''The I2C master responded as expected and the slave acknowledged every
    byte, but the SMBus packet error check could not be reconciled.'''
class i2cIOError(i2cError):
    '''I2C bus communication failure that is not otherwise specified.'''
# Script entry point. Running this module directly does nothing: the body is a
# bare string and `pass`. The commented-out lines are a manual hardware test
# kept for reference (Python 2 print syntax); each `foo = ...` line selects a
# different serial-attached adapter class before a single read_word is tried.
if __name__ == '__main__':
    '''test code'''
    pass
    ## import serial
    ## myport = serial.Serial('com13', timeout=1)
    ## foo = i2c_dc590_serial(myport)
    ## myport = serial.Serial('com15', timeout=5)
    ## foo = i2c_scpi(myport)
    ## myport = serial.Serial('com9', timeout=5)
    ## foo = i2c_pic(myport)
    ## try:
    ##     print hex(foo.read_word(0x74, 0xaa)) #fan controller
    ## except i2cError as e:
    ##     print e