# Source code for PyICe.lab_core

'''
Channel and Threading Core Framework
====================================

changes to this file should be minimal!

'''

import sys
sys.path.append('..')
from PyICe import logo
logo.display()
from PyICe import lab_interfaces
from PyICe import lab_utils
import sqlite3
import Queue
import re
import thread
import traceback
import multiprocessing.managers
import multiprocessing
import time
import atexit
import collections
import datetime
import numbers


class delegator(object):
    '''Base class for a read delegator; this is the lowest level class in the library.

    A delegator reads a list of channels on their behalf and keeps the list of
    interfaces that are locked while it does so. Every instance starts out as
    its own delegator. You will probably never use it directly.
    '''

    def __init__(self):
        self.set_delegator(self)
        self._threadable = True
        self._interfaces = []

    def set_delegator(self, delegator):
        '''set the object that performs delegated reads for this one'''
        self._delegator = delegator

    def get_delegator(self):
        '''return the object set by set_delegator() (self by default)'''
        return self._delegator

    def set_allow_threading(self, state=True):
        '''set the flag reported by threadable()'''
        # the flag used to be forced to True regardless of the argument
        self._threadable = state

    def threadable(self):
        '''return the flag last set by set_allow_threading() (True by default)'''
        return self._threadable

    def resolve_delegator(self):
        '''follow the delegator chain until an object that delegates to itself is found'''
        if self._delegator == self:
            return self._delegator
        return self._delegator.resolve_delegator()

    def add_interface(self, interface):
        '''register an interface to be locked/unlocked by lock_interfaces()/unlock_interfaces()'''
        self._interfaces.append(interface)

    def get_interfaces(self):
        '''return the set of interfaces of the resolved delegator'''
        if self.get_delegator() == self:
            return set(self._interfaces)
        return self.resolve_delegator().get_interfaces()

    def lock_interfaces(self):
        '''call lock() on every interface added with add_interface()'''
        for interface in self._interfaces:
            interface.lock()

    def unlock_interfaces(self):
        '''call unlock() on every interface added with add_interface()'''
        for interface in self._interfaces:
            interface.unlock()

    def write_delegated_channel_list(self, channel_value_list):
        '''write each channel to its corresponding value while the interfaces are locked.

        (NOT IMPLEMENTED YET) OVERLOAD THIS FUNCTION
        channel_value_list is a list of (channel, value) tuples.
        '''
        # lock stays inside the try, as before, so a partially completed
        # lock_interfaces() is still followed by unlock_interfaces()
        try:
            self.lock_interfaces()
            for (channel, value) in channel_value_list:
                channel.write(value)
        finally:
            # finally (rather than except Exception) also releases on
            # KeyboardInterrupt and keeps the original traceback intact
            self.unlock_interfaces()

    def _read_delegated_channel_list(self, channel_list):
        '''lock the interfaces, call read_delegated_channel_list() and return its result'''
        try:
            self.lock_interfaces()
            return self.read_delegated_channel_list(channel_list)
        finally:
            self.unlock_interfaces()

    def read_delegated_channel_list(self, channel_list):
        '''read every channel in channel_list and return the data keyed by channel name.

        OVERLOAD THIS FUNCTION
        takes a list of channels
        returns an ordered dictionary of read data by channel name
        '''
        results = collections.OrderedDict()
        for channel in channel_list:
            results[channel.get_name()] = channel.read_without_delegator()
        return results
class channel(delegator):
    '''This is the basic channel object, it can be read and/or written.

    Attributes can also be stored in it, for example channel number in a
    multi channel instrument.
    '''

    def __init__(self, name, read_function=None, write_function=None):
        '''create a channel called name.

        At most one of read_function / write_function may be given. A channel
        with neither is a dummy channel that stores the last written value.
        Raises Exception if both functions are supplied and
        ChannelNameException if name is not a valid channel name.
        '''
        delegator.__init__(self)
        self.set_name(name)
        if read_function and write_function:
            raise Exception('There may only be a single read OR write function')
        self._read = read_function
        self._write = write_function
        self._value = None
        self._attributes = collections.OrderedDict()
        self.set_read_access(True)
        self._category = None
        self.set_write_delay(0)
        self._write_min = None
        self._write_max = None
        self._write_callbacks = []
        self._read_callbacks = []
        if write_function:
            self.set_write_access(True)   # write channel
        elif read_function:
            self.set_write_access(False)  # read channel
        else:
            self.set_write_access(True)   # dummy channels
        self.set_description("No Description")
        self.set_display_format_str()

    def __str__(self):
        return "channel Object: {}".format(self.get_name())

    def get_name(self):
        '''return channel name'''
        return self.name

    def set_name(self, name):
        '''rename channel. Raises ChannelNameException if name is not a valid identifier-like string.'''
        name = str(name)
        if not re.match("[_A-Za-z][_a-zA-Z0-9]*$", name):
            raise ChannelNameException('Bad Channel Name "{}"'.format(name))
        self.name = name

    def set_write_delay(self, delay):
        '''sets the automatic delay in seconds after channel write'''
        self._write_delay = delay

    def get_write_delay(self):
        '''return automatic delay after channel write'''
        return self._write_delay

    def get_description(self):
        '''return channel description string.'''
        return self._description

    def set_description(self, description):
        '''sets the channel description. argument is string'''
        self._description = description

    def read(self):
        '''read the channel through its resolved delegator and return the data.

        Raises ChannelAccessException if the channel is not readable.
        '''
        if not self.is_readable():
            raise ChannelAccessException('Read a non-readable channel')
        try:
            self.lock_interfaces()
            return self.resolve_delegator()._read_delegated_channel_list([self])[self.name]
        finally:
            # always release, and let the original exception/traceback propagate
            self.unlock_interfaces()

    def read_without_delegator(self):
        '''read directly, bypassing the delegator.

        do not use this function unless you are the delegator.
        Returns the cached value when there is no read function, otherwise the
        read function's result. Read callbacks are called with (channel, result).
        '''
        self.lock_interfaces()
        try:
            if not self._read:
                result = self._value
            else:
                try:
                    result = self._read()
                except Exception:
                    print("Read error in channel {}".format(self.name))
                    print(traceback.format_exc())
                    raise
            for callback in self._read_callbacks:
                callback(self, result)
            return result
        finally:
            # also covers a callback raising, which previously left the interfaces locked
            self.unlock_interfaces()

    def write(self, value):
        '''write value to the channel and return it.

        Checks the min/max write limits (skipped when value is None), calls the
        write function if there is one, waits for the write delay, calls the
        write callbacks with (channel, value) and caches the value.
        Raises ChannelAccessException if the channel is not writeable and
        ChannelValueException if value is outside the write limits.
        '''
        if not self.is_writeable():
            raise ChannelAccessException('Wrote a non-writeable channel')
        self.lock_interfaces()
        try:
            if value is not None:
                if self._write_min is not None and value < self._write_min:
                    raise ChannelValueException('Cannot write {} to {}. Minimum is {}.'.format(value, self.name, self._write_min))
                if self._write_max is not None and value > self._write_max:
                    raise ChannelValueException('Cannot write {} to {}. Maximum is {}.'.format(value, self.name, self._write_max))
            if self._write is not None:
                try:
                    self._write(value)
                except Exception:
                    print("Write error in channel {}".format(self.name))
                    print(traceback.format_exc())
                    raise
            if self._write_delay:
                # long delays go through lab_utils.egg_timer instead of a plain sleep
                if self._write_delay > 5:
                    lab_utils.egg_timer(self._write_delay)
                else:
                    time.sleep(self._write_delay)
            for callback in self._write_callbacks:
                callback(self, value)
            self._value = value
            return value
        finally:
            # single release point; previously a failing callback or delay kept the lock
            self.unlock_interfaces()

    def write_unformatted(self, value):
        '''bypass unformatting stub. Only useful for integer and register channels. intended for use by GUI.'''
        self.write(value)

    def _set_value(self, value):
        '''private method to set channel cached value without actually calling _write or any checking for writability, limits, etc.'''
        self._value = value
        return value

    def set_attribute(self, attribute_name, value):
        '''set attribute_name to value. value can be retrieved later with get_attribute(attribute_name)'''
        self._attributes[attribute_name] = value
        return self

    def get_attribute(self, attribute_name):
        '''retrieve value previously set with set_attribute(attribute_name, value).

        Raises ChannelAttributeError if attribute_name was never set.
        '''
        if attribute_name not in self._attributes:
            raise ChannelAttributeError('Channel "{}" has no attribute "{}"'.format(self.name, attribute_name))
        return self._attributes[attribute_name]

    def get_attributes(self):
        '''return dictionary of all channel attributes previously set with set_attribute(attribute_name, value)'''
        return dict(self._attributes)

    def set_category(self, category):
        '''each channel may be a member of a single category for sorting purposes.

        category must be a str, otherwise TypeError is raised.
        '''
        if not isinstance(category, str):
            raise TypeError("Category must be a string")
        self._category = category
        return self

    def get_category(self):
        '''returns category membership. should be a string.'''
        return self._category

    def is_readable(self):
        '''return register readability boolean'''
        return self._readable

    def set_read_access(self, readable=True):
        '''set or unset register read access'''
        self._readable = readable
        self.set_attribute("readable", readable)
        return self

    def is_writeable(self):
        '''return register writability boolean'''
        return self._writeable

    def set_write_access(self, writeable=True):
        '''set or unset register write access'''
        self._writeable = writeable
        self.set_attribute("writeable", writeable)
        return self

    def set_max_write_limit(self, max):
        '''set channel's maximum writable value. None disables limit'''
        if max is None:
            self._write_max = None
        else:
            try:
                self._write_max = float(max)
            except (TypeError, ValueError):
                raise Exception('Maximum value for a channels write must be a number')
        return self

    def set_min_write_limit(self, min):
        '''set channel's minimum writable value. None disables limit'''
        if min is None:
            self._write_min = None
        else:
            try:
                self._write_min = float(min)
            except (TypeError, ValueError):
                raise Exception('Minimum value for a channels write must be a number')
        return self

    def get_max_write_limit(self, formatted=False):
        '''return max writable channel value. formatted is accepted for signature parity with get_min_write_limit and ignored here.'''
        return self._write_max

    def get_min_write_limit(self, formatted=False):
        '''return min writable channel value. formatted is ignored here.'''
        return self._write_min

    def format_display(self, data):
        '''converts data to string according to string formatting rule set by self.set_display_format_str()'''
        return self._display_format_str.format(data)

    def set_display_format_str(self, fmt_str='', prefix='', suffix=''):
        '''format string to alter how data is displayed.

        example '3.2f', '04X', '#06X', '#18b', '.2%'
        prefix will be displayed immediately before the channel data, example '0x'
        suffix will be displayed immediately after the channel data, example ' A' or ' V'
        '''
        self._display_format_str = u'{prefix}{{:{fmt_str}}}{suffix}'.format(prefix=prefix, fmt_str=fmt_str, suffix=suffix)
        return self

    def set_display_format_function(self, function):
        '''abandon string formatting and pass data through custom user-supplied function'''
        # stored on the instance, so it is called as function(data) without self
        self.format_display = function
        return self

    def add_write_callback(self, write_callback):
        '''Adds a write callback. This is a function that will be called any time the channel is written.
        the callback function takes two arguments (channel_object, data)'''
        self._write_callbacks.append(write_callback)
        return self

    def add_read_callback(self, read_callback):
        '''Adds a read callback. This is a function that will be called any time the channel is read.
        the callback function takes two arguments (channel_object, data)'''
        self._read_callbacks.append(read_callback)
        return self
class ChannelException(Exception):
    '''Common base class of the channel error types defined below.'''


class ChannelAccessException(ChannelException):
    '''Channel was read while not readable, or written while not writeable.'''


class ChannelNameException(ChannelException):
    '''Channel name was rejected as malformed.'''


class ChannelAttributeError(ChannelException):
    '''Requested channel attribute has not been set.'''


class ChannelValueException(ChannelException):
    '''Value to be written lies outside the channel's write limits.'''
class integer_channel(channel):
    '''channel limited to writing integer values.

    Adds presets and formats but retains channel class's read/write restrictions.
    '''

    def __init__(self, name, size, read_function=None, write_function=None):
        '''create an integer channel that is size bits wide.

        size must be an integer >= 1. Write limits are set to 0 .. 2**size-1.
        '''
        channel.__init__(self, name, read_function=read_function, write_function=write_function)
        assert isinstance(size, numbers.Integral)
        assert size >= 1
        self._size = size
        self.set_min_write_limit(0)
        self.set_max_write_limit(2**size-1)
        self.set_attribute("size", size)
        self.set_attribute("min", 0)
        self.set_attribute("max", 2**size-1)
        self._formats = collections.OrderedDict()
        self._presets = collections.OrderedDict()
        self._presets_reverse = collections.OrderedDict()
        self._format = None
        self._use_presets_read = False
        self._use_presets_write = True
        self._add_default_formats()

    def __str__(self):
        return "integer_channel Object: {}".format(self.get_name())

    def _add_default_formats(self):
        '''install 'dec', 'hex', 'bin' and 'signed dec' formats, or True/False presets for single-bit channels'''
        def check_sign(data):
            assert isinstance(data, numbers.Number)
            if data < 0:
                raise ValueError('Negative binary/hex values not allowed.')
            return data
        if self._size > 1:
            self.add_format('dec', str, int)
            # floor division keeps the hex digit count an integer on Python 3 as well
            self.add_format('hex', lambda data: '0x{{:0{}X}}'.format((self._size-1)//4+1).format(check_sign(data)), lambda data: int(str(data), 16))
            self.add_format('bin', lambda data: '0b{{:0{}b}}'.format(self._size).format(check_sign(data)), lambda data: int(str(data), 2))
            self.add_format('signed dec', str, int, signed=True)
            # No default 'Minimum'/'Maximum' presets: the "min"/"max" attributes are wrong
            # for two's-comp channels, and signed data is handled by formatting.
        else:
            # single bit default presets
            self._add_preset('True', True)
            self._add_preset('False', False)

    def get_max_write_limit(self, formatted=False):
        '''return max writable channel value, or None if the limit is disabled.

        If formatted, return max writeable value in transformed units according to
        set_format(format) active format.
        '''
        if self._write_max is None:
            return None
        # limits are stored as floats; the integer formatters need an int
        limit = int(self._write_max)
        if formatted:
            # format() requires use_presets; the two-argument call always raised TypeError
            return self.format(limit, self._format, use_presets=False)
        return limit

    def get_min_write_limit(self, formatted=False):
        '''return min writable channel value, or None if the limit is disabled.

        If formatted, return min writeable value in transformed units according to
        set_format(format) active format.
        '''
        if self._write_min is None:
            return None
        limit = int(self._write_min)
        if formatted:
            return self.format(limit, self._format, use_presets=False)
        return limit

    def add_preset(self, preset_name, preset_value):
        '''Adds a preset named preset_name with value preset_value'''
        if self._size == 1 and len(self._presets) == 2 and 'True' in self._presets and 'False' in self._presets:
            # remove True/False presets if custom presets are added
            self._presets = collections.OrderedDict()
            self._presets_reverse = collections.OrderedDict()
        self._add_preset(preset_name, preset_value)
        return self

    def _add_preset(self, preset_name, preset_value):
        '''store a preset in both lookup directions.

        Raises Exception on a duplicate name; a duplicate value only prints a warning
        and the newest name wins the reverse lookup.
        '''
        if preset_name in self._presets:
            raise Exception('Preset name duplicated: {}.'.format(preset_name))
        if preset_value in self._presets_reverse:
            print('WARNING: Preset value: {} of register: {} ambiguous name lookup:[{}, {}].'.format(preset_value, self.get_name(), preset_name, self._presets_reverse[preset_value]))
        self._presets[preset_name] = preset_value
        self._presets_reverse[preset_value] = preset_name

    def set_format(self, format):
        '''set active transformation format. reads and writes happen in transformed (real) units instead of native integer.
        Set to None to disable formatting.'''
        if format is not None and format not in self.get_formats():
            raise Exception('Invalid format "{}" for {}'.format(format, self.get_name()))
        self._format = format
        return self

    def get_format(self):
        '''return active format as set by set_format(format)'''
        return self._format

    def use_presets_read(self, bool):
        '''enable replacement of integer value with named enum when reading channel'''
        self._use_presets_read = bool
        return self

    def use_presets_write(self, bool):
        '''enable replacement of named enum with integer value when writing channel'''
        self._use_presets_write = bool
        return self

    def using_presets_read(self):
        '''return boolean denoting last setting of use_presets_read()'''
        return self._use_presets_read

    def using_presets_write(self):
        '''return boolean denoting last setting of use_presets_write()'''
        return self._use_presets_write

    def get_presets(self):
        '''Returns a list of preset names'''
        return list(self._presets.keys())

    def get_presets_dict(self):
        '''Returns a dictionary of preset names and value'''
        return collections.OrderedDict(self._presets)

    def add_format(self, format_name, format_function, unformat_function, signed=False, units=''):
        '''Add a format to this register.

        Formats convert a raw number into a more meaningful string and vice versa.
        Formats can be generic hex, bin, etc, or can be more complicated.
        format_name is the name of the format
        format_function transforms from integer data to real
        unformat_function transforms from real data to integer
        format_function and unformat_function should be reversible
        signed treats integer data as two's complement with self.size bit width
        units optionally appended to formatted (real) data when displayed by GUI
        '''
        self._formats[format_name] = {}
        if signed:
            self._formats[format_name]['format_function'] = lambda x: format_function(self.twosComplementToSigned(x))
            self._formats[format_name]['unformat_function'] = lambda x: self.signedToTwosComplement(unformat_function(x))
        else:
            self._formats[format_name]['format_function'] = format_function
            self._formats[format_name]['unformat_function'] = unformat_function
        self._formats[format_name]['units'] = units
        return self

    def remove_format(self, format_name):
        '''remove format_name from dictionary of available formats'''
        del self._formats[format_name]

    def get_formats(self):
        '''Return a list of format_names associated with this register.
        The elements of the returned list may be passed to the format or unformat methods'''
        return list(self._formats.keys())

    def format(self, data, format, use_presets):
        '''take in integer data, pass through specified formatting function, and return string/real representation.

        None, True and False are returned unchanged. With use_presets, a matching
        preset name takes priority over the format.
        Raises RegisterFormatException for an unknown format name.
        '''
        if data is None:
            return None
        if data is True:
            return True
        if data is False:
            return False
        if use_presets:
            try:
                return self._presets_reverse[data]
            except KeyError:
                pass
        if format is not None:
            if format not in self._formats:
                raise RegisterFormatException('Register {} has no format {}'.format(self.name, format))
            return self._formats[format]['format_function'](data)
        return data

    def unformat(self, string, format, use_presets):
        '''take in formatted string / real, pass through specified unformatting function, and return integer representation.

        None is returned unchanged. With use_presets, a matching preset name takes
        priority over the format. Without a format, the value is converted to
        bool/int/float when possible and otherwise returned as-is with a warning.
        Raises RegisterFormatException for an unknown format name.
        '''
        if string is None:
            return None
        if use_presets:
            try:
                return self._presets[string]
            except KeyError:
                pass
        if format is not None:
            if format not in self._formats:
                raise RegisterFormatException('Register {} has no format {}'.format(self.name, format))
            unformat_function = self._formats[format]['unformat_function']
            try:
                return unformat_function(string)
            except Exception as e:
                # formats intended for real data will get switched to strings in the GUI. Make an attempt here to fix.
                if string is None or isinstance(string, (int, float)):
                    raise
                try:
                    retry_data = int(string, 0)  # automatically select base
                except (TypeError, ValueError):
                    try:
                        retry_data = float(string)
                    except (TypeError, ValueError) as e2:
                        print(e2)
                        raise e
                return unformat_function(retry_data)
        if string is True or string == 'True':
            return True
        if string is False or string == 'False':
            return False
        try:
            as_float = float(string)
            if as_float != int(as_float):
                return as_float
            try:
                # direct conversion keeps full precision for large integers
                return int(string)
            except (TypeError, ValueError):
                # e.g. "3.0": int() rejects the text form, but the value is integral
                return int(as_float)
        except (TypeError, ValueError, OverflowError):
            print("WARNING: Channel: {} write data: {} failed numeric conversion.".format(self.get_name(), string))
            return string

    def get_units(self, format):
        """return real units string for specified format. ex 'A' or 'V'"""
        return self._formats[format]['units']

    def format_read(self, raw_data):
        '''transform from integer to real units according to using_presets_read() and active format'''
        if self._format or self._use_presets_read:
            return self.format(raw_data, self._format, self._use_presets_read)
        return raw_data

    def format_write(self, value):
        '''transform from real units to integer according to using_presets_write() and active format'''
        return self.unformat(value, self._format, self._use_presets_write)

    def twosComplementToSigned(self, binary):
        '''transform two's complement formatted binary number to signed integer.
        Requires register's size attribute to be set in __init__'''
        return lab_utils.twosComplementToSigned(binary, self._size)

    def signedToTwosComplement(self, signed):
        '''transform signed integer to two's complement formatted binary number.
        Requires register's size attribute to be set in __init__'''
        return lab_utils.signedToTwosComplement(signed, self._size)

    def write(self, value):
        '''write intercept to apply formats/presets before channel write.

        Coerce to integer and warn about rounding error. Also accepts None.
        Returns the value passed on to channel.write().
        '''
        if self._size == 1 and (value is True or value is False):
            # single-bit channels accept booleans directly, bypassing presets/formats
            return channel.write(self, value)
        if value is None:
            int_data = None
        else:
            raw_data = self.format_write(value)
            int_data = int(round(raw_data))
            if int_data != raw_data:
                print("WARNING: Channel: {} write: {} unformatted to: {} and rounded to: {}.".format(self.get_name(), value, raw_data, int_data))
        channel.write(self, int_data)
        return int_data

    def write_unformatted(self, value):
        '''bypass unformatting. intended for use by GUI.

        Rounds to integer with a warning if needed and returns the written value.
        '''
        if self._size == 1 and (value is True or value is False):
            # single-bit channels accept booleans directly
            return channel.write(self, value)
        if value is not None:
            int_data = int(round(value))
            if int_data != value:
                print("WARNING: Channel: {} write_unformatted: {} rounded to: {}.".format(self.get_name(), value, int_data))
            value = int_data
        channel.write(self, value)
        return value

    def read_without_delegator(self):
        '''read intercept to apply formats/presets to channel (raw) read'''
        # NOTE(review): unlike channel.read_without_delegator, this override never
        # locks/unlocks the interfaces itself - confirm that is intentional.
        if not self._read:
            result = self.format_read(self._value)
        else:
            try:
                result = self.format_read(self._read())
            except Exception:
                print("Read error in channel {}".format(self.name))
                print(traceback.format_exc())
                raise
        for callback in self._read_callbacks:
            callback(self, result)
        return result
class register(integer_channel):
    '''Register primitive superclass, inherits from integer_channel and channel.

    Allows read and write access to integer data with presets and formats.
    '''

    def __init__(self, name, size, read_function=None, write_function=None):
        '''if subclass overloads __init__, it should also call this one'''
        # The channel constructor rejects a read and a write function together,
        # so only the read function goes through it; the write function is
        # attached afterwards.
        integer_channel.__init__(self, name=name, size=size, read_function=read_function)
        if not write_function:
            return
        self._write = write_function
        self.set_write_access()

    def __str__(self):
        label = "Register Object: {}"
        return label.format(self.get_name())
class RegisterFormatException(ChannelException):
    pass

class channel_group(object):
    '''Named, ordered collection of channels and nested channel_groups.

    Channels held directly are stored by name in an OrderedDict; sub groups
    are kept in a list and searched in order when a channel name is resolved.
    Reads are dispatched to each channel's delegator, optionally through a
    pool of worker threads (see start_threads).'''
    def __init__(self, name='Unnamed Channel Group'):
        self.set_name(name)
        self._channel_dict = collections.OrderedDict() # a dictionary of channel objects, keyed by name, contained by this channel_group
        self._sub_channel_groups = [] # a list of other groups contained by this object
        self._threaded = False
        self._partial_delegation_results = collections.OrderedDict()
        self._self_delegation_channels = []
    def __str__(self):
        return "channel_group Object: {}".format(self.get_name())
    def __iter__(self):
        '''Iterate over channel objects (not names).'''
        for channel in self.get_all_channels_list():
            yield channel #this is inconsistent with dictionaries, which yield their keys when iterated!
    def __contains__(self, key):
        '''True if key is one of the channel objects reachable from this group.'''
        return key in self.get_all_channels_list()
    def __getitem__(self,channel_name):
        return self.get_channel(channel_name)
    def get_name(self):
        return self._name
    def set_name(self,name):
        self._name = str(name)
    def sort(self, deep=True, **kwargs):
        '''Re-order the directly held channels. kwargs are passed to sorted() applied to
        (name, channel) tuples; the default key sorts by channel name. If deep, sub groups are sorted too.'''
        if 'key' not in kwargs:
            kwargs['key'] = lambda kv_tuple: kv_tuple[0] #sort by channel name by default
        self._channel_dict = collections.OrderedDict(sorted(self._channel_dict.items(), **kwargs))
        if deep: #should this go deep and sort sub channel groups too?
            for scg in self._sub_channel_groups:
                scg.sort(**kwargs)
    def add(self,channel_or_group):
        '''Add a channel, or attach a channel_group as a sub group. Raises TypeError for anything else.'''
        if isinstance(channel_or_group,channel):
            self._add_channel(channel_or_group)
        elif isinstance(channel_or_group,channel_group):
            self._add_sub_channel_group(channel_or_group)
        else:
            raise TypeError('\nAttempted to add something other than a channel or channel_group to a channel_group')
    def _add_channel(self,channel_object):
        '''Store channel_object under its name (warning if the name is re-defined) and return it.'''
        if not isinstance(channel_object,channel):
            raise Exception('\nAttempted to add a non-channel to a channel_group')
        if channel_object.get_name() in self._channel_dict:
            print("WARNING: Re-defined channel {}".format(channel_object.get_name()))
        self._channel_dict[channel_object.get_name()] = channel_object
        return channel_object
    def merge_in_channel_group(self,channel_group_object):
        '''merges in a channel group'''
        if not isinstance(channel_group_object,channel_group):
            raise Exception('\nAttempted to merge a non-channel_group to a channel_group')
        for channel_object in channel_group_object:
            self._add_channel(channel_object)
    def _add_sub_channel_group(self,channel_group_object):
        '''Attach channel_group_object as a sub group.
        Raises if one of its channel names is already used here by a different channel object.'''
        if not isinstance(channel_group_object,channel_group):
            raise Exception('\nAttempted to add a "{}" to a channel_group as a sub group'.format(channel_group_object))
        channel_name_conflicts = set(self._channel_dict.keys()) & set(channel_group_object.get_all_channel_names())
        for channel_name_conflict in channel_name_conflicts:
            #the same channel object shared by both groups is not a conflict
            #(the previous test compared a channel with the group itself and so was always true)
            if self._channel_dict[channel_name_conflict] is not channel_group_object.get_channel(channel_name_conflict):
                raise Exception('\nChannel name conflict for "{}"'.format(channel_name_conflict))
        self._sub_channel_groups.append(channel_group_object)
    def read(self,channel_name):
        return self.read_channel(channel_name)
    def write(self,channel_name,value):
        return self.write_channel(channel_name,value)
    def read_channel(self,channel_name):
        '''Read a single channel by name. Raises ChannelAccessException if the name cannot be resolved.'''
        channel = self._resolve_channel(channel_name)
        if channel is None:
            raise ChannelAccessException('\nUnable to read channel "{}", did you create it or is it a typo?'.format(channel_name))
        return self.read_channel_list([channel])[channel_name]
    def read_channels(self,item_list):
        '''item list is a list of channel objects, names or channel_groups'''
        channel_list = self.resolve_channel_list(item_list)
        return self.read_channel_list(channel_list)
    def write_channel(self,channel_name,value):
        return self.get_channel(channel_name).write(value)
    def get_channel(self,channel_name):
        '''Return the channel object called channel_name. Raises ChannelAccessException if it cannot be resolved.'''
        channel = self._resolve_channel(channel_name)
        if channel is None:
            raise ChannelAccessException('\nUnable to get channel "{}", did you create it or is it a typo?'.format(channel_name))
        return channel
    def get_flat_channel_group(self,name=None):
        '''returns a channel_group directly containing all channels this one can resolve'''
        if name is None:
            name = '{}_flattened'.format(self.get_name())
        new_group = channel_group(name)
        new_group.merge_in_channel_group(self)
        return new_group
    def _resolve_channel(self,channel_name):
        '''Return the channel called channel_name, searching own channels first and then sub groups in order; None if absent.'''
        if channel_name in self._channel_dict:
            return self._channel_dict[channel_name]
        for sub_channel_group in self._sub_channel_groups:
            channel = sub_channel_group._resolve_channel(channel_name)
            if channel is not None:
                return channel
        return None
    def get_all_channels_dict(self):
        #returns a dictionary of all channels by name
        all_channels = collections.OrderedDict(self._channel_dict)
        for sub_channel_group in self._sub_channel_groups:
            all_channels.update(sub_channel_group.get_all_channels_dict())
        return all_channels
    def get_all_channel_names(self):
        return self.get_all_channels_dict().keys()
    def get_all_channels_list(self):
        return self.get_all_channels_dict().values()
    def read_channel_list(self,channel_list):
        '''Read a list of channel objects and return an OrderedDict of results keyed by channel name.'''
        # create lists of threadable and non-threadable channels
        threadable_channels = []
        non_threadable_channels = []
        self._partial_delegation_results = collections.OrderedDict()
        self._self_delegation_channels = []
        for channel in channel_list:
            if channel.resolve_delegator() is self:
                # if the user asked the delegator directly to read do not thread
                # this is also useful for the masters caching mode to do it last
                self._self_delegation_channels.append(channel)
            elif not channel.threadable():
                non_threadable_channels.append(channel)
            elif not channel.resolve_delegator().threadable():
                non_threadable_channels.append(channel)
            else:
                threadable_channels.append(channel)
        if self._threaded:
            results = self._read_channels_threaded(threadable_channels)
        else:
            results = self._read_channels_non_threaded(threadable_channels)
        results.update(self._read_channels_non_threaded(non_threadable_channels))
        if len(self._self_delegation_channels):
            #channels can only resolve to self when self is also a delegator (e.g. a master),
            #which is what provides _read_delegated_channel_list
            self._partial_delegation_results.update(results)
            results.update( self._read_delegated_channel_list(self._self_delegation_channels) )
            self._partial_delegation_results = collections.OrderedDict()
            self._self_delegation_channels = [] #keep this a list, matching __init__ and the reset above
        return results
    def _read_channels_non_threaded(self,channel_list):
        '''Have each distinct delegator read its share of channel_list, in the calling thread.'''
        #get a dictionary of delegators for list of channel objects
        delegator_list = []
        for channel in channel_list:
            delegator_list.append( channel.resolve_delegator() )
        # remove all duplicates
        delegator_list = list(set(delegator_list))
        #have each delegator read its channels
        results = collections.OrderedDict()
        for delegator in delegator_list:
            # for each delegator get the list of channels it is responsible for
            channel_delegation_list = []
            for channel in channel_list:
                if delegator == channel.resolve_delegator():
                    channel_delegation_list.append(channel)
            results.update( delegator._read_delegated_channel_list(channel_delegation_list) )
        return results
    def _read_channels_threaded(self,channel_list):
        '''Queue one work unit per interface group for the worker threads, then read whatever could not be threaded directly.'''
        # dont read threaded unless i know how to group interfaces for theads (only interface_factory's know this, ie a master
        if not hasattr(self,'group_com_nodes_for_threads_filter'):
            return self._read_channels_non_threaded(channel_list)
        #get a dictionary of delegator's by channel name
        delegator_list = [ channel.resolve_delegator() for channel in channel_list]
        # remove all duplicates
        delegator_list = list(set(delegator_list))
        # build a list of interfaces that will be used in this read
        interfaces = []
        for delegator in delegator_list:
            for interface in delegator.get_interfaces():
                interfaces.append(interface)
        remaining_delegators = []
        interface_thread_groups = self.group_com_nodes_for_threads_filter(interfaces)
        work_units = 0
        if len(interface_thread_groups):
            for interface_group in interface_thread_groups:
                #build a group of delegators for each potential thread
                delegator_group = []
                remaining_delegators = []
                for delegator in delegator_list:
                    interfaces = delegator.get_interfaces()
                    # a delegator without interfaces cannot be threaded since I dont know how it works
                    if len(interfaces) and interfaces.issubset(interface_group):
                        delegator_group.append(delegator)
                    else:
                        remaining_delegators.append(delegator)
                #build a list of channels for that group of delegators to read
                delegator_groups_channel_list = []
                for delegator in delegator_group: #this is where the channel reads become unordered...
                    for channel in channel_list:
                        if channel.resolve_delegator() == delegator:
                            delegator_groups_channel_list.append(channel)
                #start the threaded read here
                #not threaded yet
                work_units += 1
                #send channels to thread pool
                self._read_queue.put(delegator_groups_channel_list)
                delegator_list = remaining_delegators
        else:
            remaining_delegators = delegator_list
        results = self.get_threaded_results(work_units) #group all the thread results here
        # find the channels for any delegators that couldn't be threaded
        delegator_groups_channel_list = []
        for delegator in remaining_delegators:
            for channel in channel_list:
                if channel.resolve_delegator() == delegator:
                    delegator_groups_channel_list.append(channel)
        results.update( self._read_channels_non_threaded(delegator_groups_channel_list) )
        return results
    def start_threads(self,number):
        '''Start `number` worker threads that service the read queue. Raises if threads were already started.'''
        if self._threaded == False:
            self._threaded = True
            self._threads = number
            self._read_queue = Queue.Queue()
            self._read_results_queue = Queue.Queue()
            for i in range(number):
                thread.start_new_thread(self.threaded_read_function, ())
        else:
            raise Exception('Threads already started, do not start again')
    def threaded_read_function(self):
        '''Worker loop: take a channel list from the read queue, read it, and post either the results or the exception raised.'''
        while self._threaded:
            try:
                channel_list = self._read_queue.get(block=True)
            except Queue.Empty: #shouldn't get here
                pass
            else:
                try:
                    results = self._read_channels_non_threaded(channel_list)
                except Exception as e:
                    print(traceback.format_exc())
                    self._read_results_queue.put(e)
                else:
                    self._read_results_queue.put(results)
    def get_threaded_results(self,number):
        '''Collect `number` work-unit results from the worker threads into one OrderedDict.
        If any worker posted an exception, the first one received is raised after all units have been collected.'''
        results = collections.OrderedDict()
        first_error = None
        for i in range(number):
            thread_results = self._read_results_queue.get(block=True)
            if isinstance(thread_results,Exception):
                #keep draining: results left in the queue would be returned by the next, unrelated read
                if first_error is None:
                    first_error = thread_results
            else:
                results.update(thread_results)
        if first_error is not None:
            raise first_error
        return results
    def read_all_channels(self, categories=None, exclusions=None):
        '''read all readable channels in channel group and return orderd dictionary of results. Optionally filter by list of categories.
        exclusions is an optional list of channels, channel names or channel_groups to leave out.'''
        channels = [ channel for channel in self.get_all_channels_list() if channel.is_readable() and (categories is None or channel.get_category() in categories)]
        if exclusions:
            excluded = self.resolve_channel_list(exclusions).get_all_channels_list()
            #filter rather than list.remove() so excluding a channel that was never selected (e.g. write-only) is harmless
            channels = [ channel for channel in channels if channel not in excluded ]
        return collections.OrderedDict(sorted(self.read_channel_list(channels).items(), key=lambda t: t[0])) #sort results by channel name
    def remove_channel(self,channel):
        #note this delete will only remove from this channel_group, not from children
        channel_name = channel.get_name()
        if channel_name not in self._channel_dict:
            raise Exception('Channel "{}" is not a member of {}'.format(channel_name,self.get_name()))
        del self._channel_dict[channel_name]
    def remove_channel_group(self,channel_group_to_remove):
        removed_channels = channel_group_to_remove.get_all_channels_list()
        for removed_channel in removed_channels:
            self.remove_channel(removed_channel)
    def remove_channel_by_name(self,channel_name):
        channel = self.get_channel(channel_name)
        self.remove_channel(channel)
    def remove_all_channels_and_sub_groups(self):
        self._channel_dict = collections.OrderedDict()
        self._sub_channel_groups = []
    def remove_sub_channel_group(self,sub_channel_group):
        self._sub_channel_groups.remove(sub_channel_group)
    def remove_category(self, category):
        #note this delete will only remove from this channel_group, not from children
        #iterate a copy of the directly held channels only: remove_channel() raises for channels that live in sub groups
        for channel in list(self._channel_dict.values()):
            if channel.get_category() == category:
                self.remove_channel(channel)
    def debug_print(self,indent=" "):
        '''Print the channels and sub groups of this group, recursively.'''
        for ch in self._channel_dict.values():
            d = ""
            if ch.get_delegator() is not ch:
                d = "(delegated by {})".format(ch.resolve_delegator())
            print("{} {} {}".format(indent,ch,d))
        for sub_channel_group in self._sub_channel_groups:
            print("{} {}".format(indent,sub_channel_group))
            sub_channel_group.debug_print("{} ".format(indent) )
    #remove the excluded items from the scan list
    def remove_channel_list(self,item_list):
        channel_list = self.resolve_channel_list(item_list)
        for channel in channel_list:
            self.remove_channel(channel)
    def resolve_channel_list(self,item_list):
        '''takes a list of channels, channel_names, or channel_groups and produces a single channel group'''
        ch_group = channel_group()
        for item in item_list:
            if isinstance(item,str):
                ch_group._add_channel(self.get_channel(item))
            elif isinstance(item,channel):
                ch_group._add_channel(item)
            elif isinstance(item,channel_group):
                ch_group.merge_in_channel_group(item)
            else:
                raise Exception('Unknown input {}'.format(item_list))
        return ch_group
    def clone(self):
        #this builds a flattened group and tries to reconnect to the remote_channels
        channels = self.get_all_channels_list()
        new_channels = []
        remote_channel_group_clients = set()
        for ch in channels:
            if isinstance(ch,remote_channel):
                remote_channel_group_clients.add(ch.get_delegator())
            else:
                new_channels.append(ch)
        for rcgc in remote_channel_group_clients:
            clone_rcgc = rcgc.clone()
            for ch in channels:
                if ch.get_delegator() == rcgc:
                    new_channels.append(clone_rcgc[ch.get_name()])
        new_group = channel_group(self.get_name() + '_cloned')
        for ch in new_channels:
            new_group._add_channel(ch)
        return new_group
    def write_html(self, file_name=None, verbose=True, sort_categories=False):
        '''return html document string and optionally write to file_name
        if verbose, include tables of presets and attributes for each channel
        if sort_categories, group channel names first by category before alphabetical sort of channel name'''
        txt = '<!DOCTYPE html/>'
        txt += '<HTML>\n'
        txt += '<HEAD>\n'
        txt += '<META charset="UTF-8"/>\n'
        txt += '<TITLE>\n'
        txt += 'Secret Channel Decoder Ring\n'
        txt += '</TITLE>\n'
        txt += '<STYLE>\n'
        txt += ' table, th, td {\n'
        txt += ' border: 1px solid black;\n'
        txt += ' border-collapse: collapse;\n'
        txt += ' }\n'
        txt += '</STYLE>\n'
        txt += '</HEAD>\n'
        txt += '<BODY>\n'
        txt += '<TABLE>\n'
        txt += '<TR>\n'
        txt += '<TH>Channel Name</TH>\n'
        txt += '<TH>Category</TH>\n'
        txt += '<TH>Description</TH>\n'
        txt += '</TR>\n'
        channels = sorted(self.get_all_channels_list(), key=lambda ch: ch.get_name())
        if sort_categories:
            channels = sorted(channels, key=lambda ch: ch.get_category()) #stable sort preserves name sort above when tied
        for channel in channels:
            txt += '<TR>\n'
            txt += '<TD>{}</TD>\n'.format(channel.get_name())
            txt += '<TD>{}</TD>\n'.format(channel.get_category())
            txt += '<TD>\n'
            txt += '<P>\n'
            txt += u'{}\n'.format(channel.get_description())
            txt += '</P>\n'
            if verbose: #add presets and attributes
                txt += '<P style="margin-left: 40px">\n'
                try:
                    if len(channel.get_presets()):
                        ps_dict = channel.get_presets_dict()
                        txt += '<TABLE>\n'
                        txt += '<TR><TH>Presets:</TH></TR>\n'
                        for ps in channel.get_presets():
                            txt += '<TR><TD>{}: {}</TD></TR>\n'.format(ps, ps_dict[ps])
                        txt += '</TABLE>'
                except Exception: #Only integer_channels and registers can have presets
                    pass
                if len(channel.get_attributes()):
                    txt += '<BR/>\n'
                    txt += '<TABLE>\n'
                    txt += '<TR><TH>Attributes:</TH></TR>\n'
                    for attrib in sorted(channel.get_attributes()):
                        txt += '<TR><TD>{}: {}</TD></TR>\n'.format(attrib, channel.get_attribute(attrib))
                    txt += '</TABLE>'
                txt += '</P>\n'
            txt += '</TD>\n'
            txt += '</TR>\n'
        txt += '</TABLE>\n'
        txt += '</BODY>\n'
        txt += '</HTML>\n'
        if file_name is not None:
            with open(file_name, 'w') as f: #the with block closes the file
                f.write(txt.encode('utf-8'))
        return txt
class instrument(channel_group):
    '''Superclass for all lab instruments
    To add an instrument to a lab_bench object, it must inherit from instrument or one of its specialized subclasses
    Rules for adding instruments:
    1) extend instrument class
    2) call the instrument classes __init__ from its __init__
    3) contain an add_channel (and/or add_channel_XXXXX) methods that:
        a) create a channel object with a:
            1) name
            2) EITHER a read_function or write_function
        b) call the _add_channel method with that channel as an argument
    4) has a name attribute which is a meaningful string about the instrument
    '''
    def __init__(self,name):
        '''Overload method in instrument specific class, the __init__ method should call instrument.__init__(self,name).
        if an instrument uses an interface, it must call an add_interface_ function from this class of the appropriate type'''
        channel_group.__init__(self,name)
        self._interfaces = []
        if not hasattr(self,'_base_name'):
            #_base_name is the default category given to channels added through _add_channel
            self._base_name = "unnamed instrument"
    def add_channel(self,channel_name):
        '''Usage: Add channel name to instrument. For multi-channel instruments,
        typically also takes a second argument representing the physical
        channel of the instrument. May also take channel configuration arguments specific to the instrument.

        Operation: This method should create the channel object then call self._add_channel(channel) to add it to the internal channel group

        Method must be overloaded by each instrument driver.
        '''
        raise Exception('Add channel method not implemented for instrument {}'.format(self.get_name()))
    def get_error(self):
        '''Return the first error from the instrument. Overload in scpi_instrument or the actual instrument class'''
        return 'Error checking not implemented for this instrument'
    def get_errors(self):
        '''Return a list of all errors from the instrument. Overload in scpi_instrument or the actual instrument class'''
        return ['Error checking not implemented for this instrument']
    def _add_interface(self,interface):
        self._interfaces.append(interface)
    def get_interface(self, num=0):
        '''Return the num-th interface added to this instrument (the first by default).'''
        return self._interfaces[num]
    def set_category(self, category_name, update_existing_channels=False):
        '''Set the default category for channels added from now on; optionally re-categorize the existing channels too.'''
        self._base_name = category_name
        if update_existing_channels:
            for channel in self:
                channel.set_category(category_name)
    def _extend_interface_timeout(self,interface,timeout):
        '''Raise interface.timeout to timeout, never lower it. A timeout of None leaves the interface untouched.'''
        #explicit None test: comparing None with a number only works by accident on Python 2 and is a TypeError on Python 3
        if timeout is not None and timeout > interface.timeout:
            # only increase a timeout if it is shared it may be to fast for the others
            interface.timeout = timeout
    def add_interface_visa(self,interface_visa,timeout=None):
        '''Register a visa interface with this instrument, optionally extending its timeout.'''
        if not isinstance(interface_visa,lab_interfaces.interface_visa):
            raise Exception('Interface must be a visa interface,, interface is {}'.format(interface_visa))
        self._extend_interface_timeout(interface_visa,timeout)
        self._add_interface(interface_visa)
    def add_interface_raw_serial(self,interface_raw_serial,timeout=None,baudrate=None):
        '''Register a raw serial interface with this instrument, optionally extending its timeout and setting its baudrate.'''
        if not isinstance(interface_raw_serial,lab_interfaces.interface_raw_serial):
            raise Exception('Interface must be a raw serial interface, interface is {}'.format(interface_raw_serial))
        self._extend_interface_timeout(interface_raw_serial,timeout)
        if baudrate:
            interface_raw_serial.baudrate = baudrate
        self._add_interface(interface_raw_serial)
    def add_interface_twi(self,interface_twi,timeout=None):
        '''Register a twi interface with this instrument, optionally extending its timeout.'''
        if not isinstance(interface_twi,lab_interfaces.interface_twi):
            raise Exception('Interface must be a twi interface, interface is {}'.format(interface_twi))
        self._extend_interface_timeout(interface_twi,timeout)
        self._add_interface(interface_twi)
    def add_interface_spi(self,interface_spi,timeout=None,baudrate=None):
        '''Register an spi interface with this instrument, optionally extending its timeout.'''
        # NOTE(review): baudrate is accepted but not applied here, unlike add_interface_raw_serial - confirm this is intended
        if not isinstance(interface_spi,lab_interfaces.interface_spi):
            raise Exception('Interface must be an spi interface, interface is {}'.format(interface_spi))
        self._extend_interface_timeout(interface_spi,timeout)
        self._add_interface(interface_spi)
    def _add_channel(self,channel):
        #overload _add_channel to do some automatic repetive tasks before
        # letting channel_group do the rest
        for interface in self._interfaces:
            channel.add_interface(interface)
        if channel.get_category() is None:
            channel.set_category(self._base_name)
        channel_group._add_channel(self,channel)
        return channel
class scpi_instrument(instrument):
    """SCPI Instrument Base Class. Implements methods common to all SCPI instruments

    Instruments which adhere to the SCPI specification should inherit from the
    scpi_instrument class rather than the instrument class.
    """
    def get_error(self):
        '''Return the first error from the SCPI instrument. +0 is the errorcode for no error'''
        return self.error()
    def get_errors(self):
        '''Return a list of all accumulated SCPI instrument errors.
        Errors are read until a response whose code (the text before the first comma) is '+0' or '0';
        that final "no error" response is included in the list.'''
        errors = []
        while(True):
            response = self.get_error()
            errors.append(response)
            if response.split(",")[0] in ('+0', '0'):
                return errors
    def beep(self):
        '''Send a beep command.'''
        self.get_interface().write("SYST:BEEP")
    def clear_status(self):
        '''Send the *CLS command.'''
        self.get_interface().write("*CLS")
    def display_clear(self):
        """Clear the instrument display"""
        self.get_interface().write("DISP:TEXT:CLE")
    def display_off(self):
        '''Turn the instrument display off'''
        self.get_interface().write('DISP OFF')
    def display_on(self):
        '''Turn the instrument display on'''
        self.get_interface().write('DISP ON')
    def display_text(self,text=""):
        '''Display text on instrument front panel'''
        command="DISP:TEXT '"+text+"'"
        self.get_interface().write(command)
    def enable_serial_polling(self):
        '''Enable the instrument to report operation complete via serial polling'''
        self.clear_status() #clear the status register
        self.get_interface().write("*ESE 1") #enable the operation complete bit in the event register
        self.get_interface().write("*SRE 32") #enable the event register to update the status register
    def error(self):
        '''Get error message.'''
        return self.get_interface().ask("SYST:ERROR?")
    def operation_complete(self):
        '''Query *OPC? and return the response.
        The query is issued once; the retry-on-empty-response logic that used to follow
        the return statement was unreachable and has been removed.'''
        return self.get_interface().ask('*OPC?')
    def fetch(self):
        '''Send FETCH? query.'''
        return self.get_interface().ask("FETCH?")
    def init(self):
        '''Send INIT command.'''
        self.get_interface().write("INIT")
    def initiate_measurement(self,enable_polling=False):
        '''Initiate a measurement'''
        if enable_polling:
            self.enable_serial_polling() #enable serial polling
            self.clear_status() #clear the status register
            self.operation_complete()
            self.init()
            self.get_interface().write("*OPC") #enable operation complete update to the status register
        else:
            self.operation_complete()
            self.init()
    def read_measurement(self):
        '''Send FETCH? query.'''
        return self.get_interface().ask("FETCH?")
    def reset(self):
        '''Send the *RST command.'''
        self.get_interface().write("*RST")
    def trigger(self):
        '''Send the *TRG command.'''
        self.get_interface().write('*TRG')
    def identify(self):
        '''Send the *IDN? command.'''
        return self.get_interface().ask('*IDN?')
class remote_channel_group_server(object):
    # this class takes a channel groups and makes it remotely accessible
    # it currently does not support changing channels after creation
    def __init__(self,channel_group_object,address='localhost',port=5001,authkey='ltc_lab'):
        self.channel_group = channel_group_object
        class channel_group_manager(multiprocessing.managers.BaseManager):
            pass
        channel_group_manager.register('channel')
        #get_channel results are handed out as 'channel' proxies
        channel_group_manager.register('get_channel_server',callable=lambda: self.channel_group, method_to_typeid = {'get_channel':'channel'} )
        self.cgm = channel_group_manager(address=(address,port),authkey=authkey)
    def serve_forever(self):
        '''Start serving the channel group; does not return.'''
        print("Launching remote server listening at address {}:{}".format(self.cgm.address[0],self.cgm.address[1]))
        server = self.cgm.get_server()
        server.serve_forever()

class remote_channel(channel):
    # this handles both registers and channels
    #names copied from the proxy onto this object when the proxy has them
    methods_to_proxy = ['__str__',
                        #integer_channel methods:
                        'add_format',
                        'add_preset',
                        'format',
                        'format_read',
                        'format_write',
                        'get_format',
                        'get_formats',
                        'get_max_write_limit',
                        'get_min_write_limit',
                        'get_presets',
                        'get_presets_dict',
                        'get_units',
                        #'read_without_delegator',
                        'remove_format',
                        'set_format',
                        'signedToTwosComplement',
                        'twosComplementToSigned',
                        'unformat',
                        'use_presets_read',
                        'use_presets_write',
                        'using_presets_read',
                        'using_presets_write',
                        'write',
                        'write_unformatted',
                        #channel methods:
                        'add_read_callback',
                        'add_write_callback',
                        'format_display',
                        'get_attribute',
                        'get_attributes',
                        'get_category',
                        'get_description',
                        'get_name',
                        'get_write_delay',
                        'is_readable',
                        'is_writeable',
                        'read',
                        'set_attribute',
                        'set_category',
                        'set_description',
                        'set_display_format_function',
                        'set_display_format_str',
                        'set_max_write_limit',
                        'set_min_write_limit',
                        'set_name',
                        'set_read_access',
                        'set_write_access',
                        'set_write_delay',
                        #delegator methods omitted
                        ]
    def __init__(self,proxy_channel,parent_delegator):
        #this intentially does not call the init of channel,just delegator
        delegator.__init__(self)
        self.set_delegator(parent_delegator)
        # NOTE(review): this stores the delegator class itself, not parent_delegator - confirm that is intended
        self._proxy_delegator = delegator
        for method in self.methods_to_proxy:
            if hasattr( proxy_channel, method):
                setattr(self, method, getattr(proxy_channel, method))

class remote_channel_group_client(channel_group,delegator):
    '''Client side of remote_channel_group_server: connects to the server and adds
    one remote_channel per channel name the server reports.'''
    def __init__(self, address='localhost',port=5001,authkey='ltc_lab'):
        '''Connect to the server at address:port. Raises RemoteChannelGroupException if the port cannot be reached.'''
        self._address = address
        self._port = port
        self._authkey=authkey
        channel_group.__init__(self,'remote_channel @ {}:{}'.format(address,port))
        delegator.__init__(self)
        class channel_group_manager(multiprocessing.managers.BaseManager):
            pass
        channel_group_manager.register('channel')
        channel_group_manager.register('get_channel_server')
        #check if the port is open, there doesn't seem to be a way to time out so check first
        import socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            result = sock.connect_ex((address,port))
        finally:
            #the probe connection is only a reachability test; close it so it is not leaked
            sock.close()
        if result: #the port is not open
            raise RemoteChannelGroupException('Unable to connect: {}'.format(result))
        self.cgm = channel_group_manager(address=(address,port),authkey=authkey)
        self.cgm.connect()
        self.server = self.cgm.get_channel_server()
        names = self.server.get_all_channel_names()
        for i in names:
            ch = self.server.get_channel(i)
            self._add_channel(remote_channel(ch,self))
    def read_delegated_channel_list(self,channel_list):
        '''Read the given channels in a single request to the server, by name.'''
        channel_names = [ch.get_name() for ch in channel_list]
        return self.server.read_channels(channel_names)
    def clone(self):
        '''Return a new client connected to the same address, port and authkey.'''
        return remote_channel_group_client(self._address,self._port,self._authkey)

class RemoteChannelGroupException(ChannelException):
    pass
class channel_master(channel_group,delegator):
    '''Master channel collection. There is typically only one.
    It replaces the old lab_bench as the main point of interaction with channels.
    Channels and channel_groups (instruments) may be added to it.
    It also creates dummy and virtual channels and adds them to its collection.
    It also supports virtual_caching channels; these can use cached data if
    available during logging or other multiple channel read.'''
    def __init__(self,name=None):
        '''Create the master and start its pool of 24 read threads.
        Without a name, the default object repr (minus the angle brackets) is used.'''
        if name is None:
            default_repr = object.__str__(self)
            name = default_repr[1:-1] #remove Python <> because Qt interpret's them as HTML tags
        channel_group.__init__(self, name)
        delegator.__init__(self)
        self._caching_mode = 0
        self._verbose = False
        self._read_callbacks = []
        self._write_callbacks = []
        self.start_threads(24)
    def set_verbose(self,verbose=True):
        '''Turn progress printing on or off.'''
        self._verbose = verbose
    def add(self,channel_or_group):
        '''Add a channel or channel_group, announcing it when verbose.'''
        channel_group.add(self,channel_or_group)
        if not self._verbose:
            return
        print('Added: {}'.format(channel_or_group.get_name()))
[docs] def add_channel_virtual(self,channel_name,read_function=None,write_function=None,integer_size=None): '''Adds a channel named channel_name. Channel may have a read_function or a write_function but not both. If write_function is supplied, the write function is called with the value when written, and the last written value is returned when read. If read_function is supplied, this channel returns the return of read_function when read. If integer_size is not None, creates in integer_channel instead of a channel. integer_size should specify the number of data bits. Integer channels can add presets, formats.''' if integer_size is not None: new_channel = integer_channel(channel_name,size=integer_size,read_function=read_function,write_function=write_function) else: new_channel = channel(channel_name,read_function=read_function,write_function=write_function) new_channel.set_description(self.get_name() + ': ' + self.add_channel_virtual.__doc__) new_channel.set_category('Virtual') return self._add_channel(new_channel)
[docs] def add_channel_virtual_caching(self,channel_name,read_function=None,write_function=None): '''Adds a channel named channel_name. Channel may have a read_function or a write_function but not both. If write_function is supplied, the write function is called with the value when written, and the last written value is returned when read. If read_function is supplied, this channel returns the return of read_function when read. If the read_function calls the creating channel_master's read_channel on another channel, a cached value may be used if part of a multi-channel channel read. This can improve logging speed in some cases.''' new_channel = channel(channel_name,read_function=read_function,write_function=write_function) new_channel.set_delegator(self) new_channel.set_description(self.get_name() + ': ' + self.add_channel_virtual_caching.__doc__) new_channel.set_category('Virtual') return self._add_channel(new_channel)
[docs] def add_channel_dummy(self,channel_name,integer_size=None): '''Add a named dummy channel. This can be used if a single physical instrument channel is externally multiplexed to multiple measurement nodes. The user can store the multiple measurement results from a single instrument into multiple dummy channels. Also it is useful for logging test conditions.''' if integer_size is not None: new_channel = integer_channel(channel_name,size=integer_size) else: new_channel = channel(channel_name) new_channel.set_description(self.get_name() + ': ' + self.add_channel_dummy.__doc__) new_channel.set_category('Virtual') return self._add_channel(new_channel)
[docs] def add_channel_delta_timer(self,channel_name,reciprocal=False): '''Add a named timer channel. Returns the time elapsed since the prior channel read. Optionally, compute 1/time to return frequency instead.''' class timer(object): def __init__(self, reciprocal): self.reciprocal = reciprocal self.last_time = datetime.datetime.utcnow() def __call__(self): self.this_time = datetime.datetime.utcnow() elapsed = self.this_time - self.last_time self.last_time = self.this_time if not reciprocal: return elapsed.total_seconds() #return native dimedelta instead? else: try: return 1/elapsed.total_seconds() except ZeroDivisionError: #too fast? return None new_channel = channel(channel_name, read_function=timer(reciprocal)) new_channel.set_description(self.get_name() + ': ' + self.add_channel_delta_timer.__doc__) new_channel.set_category('Virtual') return self._add_channel(new_channel)
[docs] def add_channel_total_timer(self,channel_name): '''Add a named timer channel. Returns the time elapsed since first channel read.''' class timer(object): def __init__(self): self.beginning = None def __call__(self): if self.beginning is None: self.beginning = datetime.datetime.utcnow() return (datetime.datetime.utcnow() - self.beginning).total_seconds() #return native dimedelta instead? new_channel = channel(channel_name, read_function=timer()) new_channel.set_description(self.get_name() + ': ' + self.add_channel_total_timer.__doc__) new_channel.set_category('Virtual') return self._add_channel(new_channel)
[docs] def add_channel_counter(self,channel_name, **kwargs): '''Add a named timer channel. Returns zero the first time channel is read and increments by one each time thereafter.''' class counter(object): def __init__(self, init=0, inc=1): self.inc = inc self.count = init - self.inc def __call__(self): self.count += self.inc return self.count new_channel = channel(channel_name, read_function=counter(**kwargs)) new_channel.set_description(self.get_name() + ': ' + self.add_channel_counter.__doc__) new_channel.set_category('Virtual') return self._add_channel(new_channel)
def read_channel(self,channel_name):
    '''Read one channel by name and return its value.

    While caching mode is active, a value already present in the partial
    delegation results is returned instead of reading again, and a fresh read of
    a self-delegated channel is stored there. Read callbacks are only invoked
    when caching mode is not active.'''
    if self._verbose:
        print("Reading Channel (via channel_master.read_channel): {}".format(channel_name))
    channel = self.get_channel(channel_name)
    if self._caching_mode:
        if channel_name in self._partial_delegation_results:
            value = self._partial_delegation_results[channel_name]
            if self._verbose:
                print("Reading Channel: {} from previous results: {}".format(channel_name,value))
            return value
        value = channel.read()
        if self._verbose:
            print("cache miss {} read: {}".format(channel_name,value))
        if channel in self._self_delegation_channels:
            self._partial_delegation_results[channel_name] = value
        return value
    value = channel.read()
    if self._verbose:
        print(" {} read: {}".format(channel_name,value))
    #calling the observer is done here so its in the secondary thread, if threading
    for notify in self._read_callbacks:
        notify({channel_name: value})
    return value

def read_channel_list(self,channel_list):
    '''Read a list of channels through channel_group.read_channel_list, pass the
    results to every read callback, and return the results.'''
    if self._verbose:
        print("Reading channel list: ")
        for entry in channel_list:
            print("\t{}".format(entry))
    results = channel_group.read_channel_list(self,channel_list)
    for notify in self._read_callbacks:
        notify(results)
    return results
def write_channel(self,channel_name,value):
    '''Delegates channel write to the appropriate registered instrument.

    Every write callback is invoked with {channel_name: value} before the write
    itself is performed; the return value of channel_group.write_channel is
    passed back to the caller.'''
    if self._verbose:
        print("Writing Channel {}, to {}".format(channel_name,value))
    for notify in self._write_callbacks:
        # a fresh dictionary per callback, so one callback cannot alter what the next one sees
        notify({channel_name: value})
    return channel_group.write_channel(self,channel_name,value)
def read_delegated_channel_list(self,channel_list):
    '''Read every channel in channel_list and return an OrderedDict of values keyed by channel name.

    If caching mode is already active (nested call), channels missing from the
    partial delegation results are read with read_without_delegator() and cached.
    Otherwise caching mode is enabled for the duration of the call, channels are
    read with read(), and the cache is discarded once caching mode drops back to
    zero. The caching mode counter is restored even if a read raises, so a failed
    read cannot leave the master stuck in caching mode with stale results.'''
    results = collections.OrderedDict()
    if self._caching_mode:
        for channel in channel_list:
            name = channel.get_name()
            if name not in self._partial_delegation_results:
                self._partial_delegation_results[name] = channel.read_without_delegator()
            results[name] = self._partial_delegation_results[name]
        return results
    self._caching_mode += 1
    try:
        for channel in channel_list:
            name = channel.get_name()
            if name not in self._partial_delegation_results:
                self._partial_delegation_results[name] = channel.read()
            results[name] = self._partial_delegation_results[name]
    finally:
        # always undo the increment above, whether or not a read failed
        self._caching_mode -= 1
        if self._caching_mode == 0:
            self._partial_delegation_results = collections.OrderedDict()
    return results

def serve(self,address='localhost',port=5001,authkey='ltc_lab'):
    '''Create a remote_channel_group_server for this master and call its serve_forever().'''
    rcgs = remote_channel_group_server(self,address,port,authkey)
    rcgs.serve_forever()

def attach(self, address='localhost',port=5001,authkey='ltc_lab'):
    '''Create a remote_channel_group_client and add it to this master.
    Returns True on success. If the client raises RemoteChannelGroupException,
    the exception is printed and False is returned.'''
    try:
        rcgc = remote_channel_group_client(address,port,authkey)
    except RemoteChannelGroupException as e:
        print(e)
        return False
    self.add(rcgc)
    return True

def background_gui(self,cfg_file='default.guicfg'):
    '''Start the passive GUI launcher in a new thread and return immediately.'''
    thread.start_new_thread( self._gui_launcher_passive, (cfg_file,) )

def gui(self,cfg_file='default.guicfg'):
    '''Run the GUI launcher in the calling thread.'''
    self._gui_launcher(cfg_file)
def add_read_callback(self,read_callback):
    '''Register a read callback.
    The function will be called any time a channel(s) is read and should accept one
    argument: the dictionary of results.
    If it is not important to group results by each batch read, consider adding a
    callback to an individual channel instead.'''
    callbacks = self._read_callbacks
    callbacks.append(read_callback)
def remove_read_callback(self, read_callback):
    '''Unregister a read callback previously added with add_read_callback().
    Raises ValueError (from list.remove) if the callback is not registered.'''
    callbacks = self._read_callbacks
    callbacks.remove(read_callback)
def add_write_callback(self,write_callback):
    '''Register a write callback.
    The function will be called any time a channel is written and should accept one
    argument: the dictionary of results. In this case, the dictionary will only
    contain a key,value pair for the single channel that was written.
    For more flexibility, consider adding a callback to an individual channel instead.'''
    callbacks = self._write_callbacks
    callbacks.append(write_callback)
def remove_write_callback(self, write_callback):
    '''Unregister a write callback previously added with add_write_callback().
    Raises ValueError (from list.remove) if the callback is not registered.'''
    callbacks = self._write_callbacks
    callbacks.remove(write_callback)

def _gui_launcher_passive(self,cfg_file):
    '''Build the GUI application in passive mode, feed it every read and write
    through the callback lists, and run its event loop.'''
    import lab_gui #this cannot be imported in the main thread
    app = lab_gui.ltc_lab_gui_app(self,passive=True,cfg_file=cfg_file)
    self.add_read_callback(app.passive_data)
    self.add_write_callback(app.passive_data)
    app.exec_()

def _gui_launcher(self,cfg_file):
    '''Build the GUI application in non-passive mode and run its event loop.'''
    import lab_gui #this cannot be imported in the main thread
    app = lab_gui.ltc_lab_gui_app(self,passive=False,cfg_file=cfg_file)
    app.exec_()

def get_dummy_clone(self):
    '''Return a new channel_master holding one dummy channel per channel of this
    master, with the same name and category. Dummy channels whose original has a
    truthy _read attribute are written to 0.'''
    clone = channel_master()
    for original in self:
        name = original.get_name()
        clone.add_channel_dummy(name)
        clone[name].set_category(original.get_category())
        if original._read:
            clone[name].write(0)
    return clone
class master(channel_master,lab_interfaces.interface_factory):
    '''A channel_master that is also an interface_factory.'''
    def __init__(self,name=None):
        channel_master.__init__(self,name)
        lab_interfaces.interface_factory.__init__(self)


class logger(master):
    '''Reads channels and stores each batch of readings as a row of a SQLite table through a logger_backend.'''
    def __init__(self, channel_master_or_group=None, database="data_log.sqlite", use_threads=True):
        '''channel_master_or_group is an object containing all channels of interest for logging; channels that are not readable are dropped.
        If it is a channel_master, channel reads are routed through it; otherwise the logger reads through itself.
        database is the filename the sqlite database will be stored in.
        use_threads is passed on to logger_backend.'''
        master.__init__(self,name='logger')
        if channel_master_or_group is not None:
            self.merge_in_channel_group(channel_master_or_group)
        # iterate over a snapshot: channels are removed from self inside the loop
        for channel in list(self):
            if not channel.is_readable():
                self.remove_channel(channel)
        # if the object given is a channel_master, read through it
        if isinstance(channel_master_or_group,channel_master):
            self.master = channel_master_or_group
        else:
            self.master = self
        self._backend = logger_backend(database=database, use_threads=use_threads)
        self._database = database
        atexit.register(self.stop)
        self._table_name = None
        self._log_callbacks = []
        self._previously_logged_data = None
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()
        return None
    def stop(self):
        '''close sqlite database connection.'''
        self._backend.stop()
    def add_channel(self,channel_object):
        '''add an existing channel object to the logger.'''
        self._add_channel(channel_object)
    def append_table(self,table_name):
        '''make table_name the active table, creating it and/or adding columns as needed to match the channels known to the logger.'''
        self._table_name = table_name
        columns = self.get_all_channel_names()
        return self._backend.append_table(table_name,columns)
    def new_table(self,table_name,replace_table=False,warn=False):
        '''create a new table with columns matching channels known to logger instance
        if replace_table == 'copy', copy previously collected data to a new table with the date and time appended to the table name before overwriting
        if replace_table, allow previously collected data to be overwritten
        if not replace_table, raise an exception (or print to screen instead if warn) rather than overwrite existing data'''
        self._table_name = table_name
        columns = self.get_all_channel_names()
        return self._backend.new_table(table_name,columns,replace_table,warn)
    def switch_table(self,table_name):
        '''make table_name the active table without creating or altering it.'''
        self._table_name = table_name
        return self._backend.switch_table(table_name)
    def copy_table(self,old_table, new_table):
        '''copy all rows of old_table into a newly created table new_table.'''
        return self._backend.copy_table(old_table,new_table)
    def get_database(self):
        return self._database
    def get_table_name(self):
        return self._table_name
    def _fetch_channel_data(self,exclusions):
        '''read all readable, non-excluded channels and return the results with 'rowid' and 'datetime' entries added.'''
        scan_list = self.get_flat_channel_group('scan_list')
        #only log channels that are readable
        for channel in scan_list.get_all_channels_list():
            if not channel.is_readable():
                scan_list.remove_channel(channel)
        #remove the excluded items from the scan list
        scan_list.remove_channel_list(exclusions)
        channel_data = self.master.read_channel_list(scan_list)
        #add additional database columns
        channel_data['rowid'] = None
        channel_data['datetime'] = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        return channel_data
    def log(self,exclusions=[]):
        '''measure all non-excluded channels. Channels may be excluded by name, channel_group(instrument), or directly.
        Returns a dictionary of what it logged.'''
        self._backend.check_exception()
        data = self._fetch_channel_data(exclusions)
        self._backend.store(data)
        self._previously_logged_data = data
        for callback in self._log_callbacks:
            callback(data)
        return data
    def log_if_changed(self, log_exclusions=[], compare_exclusions=[]):
        '''like log(), but only stores data to database if data in at least one channel/column has changed.
        log_exclusions is a list of logger channels which will not be read nor stored in the database.
        compare_exclusions is a list of logger channels (names or channel objects) which will not be used to decide if data has changed but which will be read and stored in the database if something else changed.
        rowid and datetime are automatically excluded from change comparison.
        returns channel data if logged, else None'''
        self._backend.check_exception()
        data = self._fetch_channel_data(log_exclusions)
        changed = True
        if self._previously_logged_data is not None:
            old_data = dict(self._previously_logged_data)
            new_data = dict(data)
            ignored = ['rowid', 'datetime']
            for item in compare_exclusions:
                if isinstance(item,str):
                    ignored.append(item)
                elif isinstance(item,channel):
                    ignored.append(item.get_name())
                else:
                    raise Exception('Unknown compare exclusion type: {} {}'.format(type(item),item))
            for name in ignored:
                # pop with a default: an excluded column may be absent from either row
                # (for example when it is also listed in log_exclusions)
                old_data.pop(name, None)
                new_data.pop(name, None)
            changed = old_data != new_data
        if not changed:
            return None
        self._backend.store(data)
        self._previously_logged_data = data
        #skip callbacks if data unchanged?
        for callback in self._log_callbacks:
            callback(data)
        return data
    def log_data(self, data_dictionary):
        '''log previously collected data.
        data_dictionary should have channel name keys.
        set up logger and table using logger.add_data_channels()
        alternately, data_dictionary can be an iterable containing dictionaries, each representing a single row.
        NOTE: a 'rowid' entry (and a 'datetime' entry if missing) is written into the supplied dictionary.'''
        self._backend.check_exception()
        try:
            data_dictionary['rowid'] = None
            if data_dictionary.get('datetime', None) is None:
                data_dictionary['datetime'] = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
            self._backend.store(data_dictionary)
            self._previously_logged_data = data_dictionary
        except TypeError:
            #not a mapping type, assume iterable of mapping
            for row in data_dictionary:
                assert len(row.keys()) #try to prevent infinite recursion with malformed data
                self.log_data(row)
    def add_data_channels(self, data_dictionary):
        '''prepare logger channel group with fake channels matching data_dictionary keys.
        call before logger.new_table().
        use to log previously collected data in conjunction with logger.log_data()
        the logger must not contain any channels yet.'''
        assert len(self.get_all_channel_names()) == 0
        def read_disable():
            raise Exception('Attempted to read fake channel designed to be used with logger.log_data()')
        for key in data_dictionary:
            fake_channel = channel(key, read_function=read_disable)
            self.add_channel(fake_channel)
    def add_log_callback(self,log_callback):
        '''Adds a log callback. This is a function that will be called after log() or log_if_changed() has stored a row.
        the callback function should accept one argument: the dictionary of logged data.'''
        self._log_callbacks.append(log_callback)
    def remove_log_callback(self, log_callback):
        self._log_callbacks.remove(log_callback)
    def get_master(self):
        return self.master
    def flush(self):
        '''commit pending transactions and block until database thread queue is empty.'''
        self._backend.sync_threads()
    def set_journal_mode(self, journal_mode='WAL', synchronous='NORMAL', timeout_ms=10000):
        '''configure database connection for more reliable concurrent read/write operations with high data throughput or large data sets.
        Three options are individually configurable:
        journal_mode changes the default rollback journal to other methods which read-lock the database less often and for less time.
        WAL (write-ahead log) is usually the best choice. Reading and writing are independent and will never block each other.
        https://www.sqlite.org/pragma.html#pragma_journal_mode
        https://www.sqlite.org/wal.html
        synchronous changes how SQLite waits to confirm that data has been safely written to the disk platter surface.
        Relaxing this from FULL to NORMAL to OFF will increase commit speed with an increasing risk of data corruption if power is lost or the computer crashes at an inopportune time.
        Use with caution. Usually WAL alone will correct most access problems.
        https://www.sqlite.org/pragma.html#pragma_synchronous
        timeout_ms changes the amount of time SQLite will wait for access to a locked database before giving up and failing.
        Timeouts are much less likely in WAL mode compared to normal rollback journal mode.
        https://www.sqlite.org/pragma.html#pragma_busy_timeout
        '''
        self._backend.execute("PRAGMA locking_mode = NORMAL;")
        self._backend.execute("PRAGMA busy_timeout = {};".format(timeout_ms))
        journal_mode = journal_mode.upper()
        if journal_mode not in ["DELETE","TRUNCATE","PERSIST","MEMORY","WAL","OFF"]:
            raise Exception('Valid arguments to journal_mode are "DELETE", "TRUNCATE", "PERSIST", "MEMORY", "WAL", and "OFF". See https://www.sqlite.org/pragma.html#pragma_journal_mode')
        self._backend.execute("PRAGMA journal_mode = {};".format(journal_mode))
        if journal_mode == "WAL":
            self._backend.execute("PRAGMA wal_autocheckpoint=100;")
        synchronous = synchronous.upper()
        if synchronous not in ["OFF","NORMAL","FULL","EXTRA"]:
            raise Exception('Valid arguments to synchronous are "OFF", "NORMAL", "FULL", and "EXTRA". See https://www.sqlite.org/pragma.html#pragma_synchronous')
        self._backend.execute("PRAGMA synchronous = {};".format(synchronous))
    def optimize(self):
        '''Defragment database file, reducing file size and speeding future queries.
        Also re-runs query plan optimizer to speed future queries.
        WARNING: May take a long time to complete when operating on a large database.
        WARNING: May re-order rowid's
        '''
        self._backend.execute("VACUUM;")
        self._backend.execute("ANALYZE;")
        self.flush()


class logger_backend(object):
    '''Owns the sqlite3 connection used by logger.
    With use_threads=True all database work is queued as callables and executed by a single worker thread;
    otherwise it is executed directly in the calling thread.'''
    def __init__(self,database="data_log.sqlite", use_threads=True):
        self.table_name = None
        self._use_thread = use_threads
        self._max_lock_time = datetime.timedelta(seconds=1)
        self._thread_exception = None
        self._run = True
        self._stopped = False
        if self._use_thread:
            self.storage_queue = Queue.Queue()
            self._thread = thread.start_new_thread(self._db_thread, ())
            # the connection is opened by the worker thread, which also performs all later database work
            self.storage_queue.put(lambda: self._connect_db(database))
        else:
            self._connect_db(database)
        atexit.register(self.stop)
    def _dispatch(self, function):
        '''queue function for the worker thread if threading is enabled, otherwise call it immediately.'''
        if self._use_thread:
            self.storage_queue.put(function)
        else:
            function()
    def _close(self):
        '''checkpoint, restore the DELETE journal mode and close the connection. Errors are printed, not raised.'''
        #print "Closing database connection."
        try:
            self.conn.execute("PRAGMA wal_checkpoint(RESTART);") #no effect if DB not in write-ahead log journal mode
            self.conn.execute("PRAGMA journal_mode=DELETE;")
            self.conn.close()
        except sqlite3.ProgrammingError as e: #can't execute if connection previously closed
            print(e)
        except Exception as e:
            print("Unhandled exception in _close!")
            print(e)
    def sync_threads(self):
        '''commit; in threaded mode also block until the queue is drained, then raise any exception recorded by the worker thread.'''
        if self._use_thread:
            self.storage_queue.put(self._commit)
            self.storage_queue.join()
            self.check_exception()
        else:
            self._commit()
    def check_exception(self):
        self._check_exception()
    def _check_exception(self):
        '''raise, in the calling thread, the exception recorded by the worker thread (if any).'''
        if self._thread_exception is not None:
            raise self._thread_exception
    def execute(self, sql_query, *params):
        '''not currently capable of returning the query result through the thread queue
        useful for setting up the database with PRAGMA commands.'''
        self._dispatch(lambda: self._execute(sql_query, *params))
    def _execute(self, sql_query, *params):
        return self.conn.execute(sql_query, params)
    def _connect_db(self,database):
        self.db = database
        self.conn = sqlite3.connect(self.db)
        self.cur = self.conn.cursor()
    def _record_thread_exception(self, e):
        '''print the active traceback and remember the first worker thread exception for check_exception().
        Must be called from inside an except block.'''
        print(traceback.format_exc())
        if self._thread_exception is None:
            self._thread_exception = e
    def _next_function(self):
        '''commit work that has been pending too long, then return the next queued callable, blocking if the queue is empty.
        Any exception raised here occurs before an item has been taken off the queue.'''
        if self.lock_time is not None and (datetime.datetime.utcnow()-self.lock_time) > self._max_lock_time:
            self._commit()
            self.lock_time = None
            #print 'max lock timed out'
        try:
            return self.storage_queue.get(block=False)
        except Queue.Empty:
            pass
        # the queue is idle: take the opportunity to commit before blocking
        if self.lock_time is not None:
            try:
                self.conn.commit() #not self._commit to avoid infinite retry
                self.conn.execute("PRAGMA wal_checkpoint(PASSIVE);") #no effect if DB not in write-ahead log journal mode
            except sqlite3.OperationalError:
                print("Opportunistic commit failed. Not retrying.")
            else:
                self.lock_time = None
        return self.storage_queue.get(block=True)
    def _db_thread(self):
        '''worker thread main loop: execute queued callables until _stop() clears self._run.
        Exceptions raised by queued work are recorded rather than re-raised, so the thread keeps servicing the queue
        (join() and stop() cannot hang on a dead worker) and the error is reported by check_exception() in the calling thread.'''
        self.lock_time = None
        while self._run:
            try:
                function = self._next_function()
            except Exception as e:
                # housekeeping commit failed; nothing was dequeued, so no task_done() is owed
                self._record_thread_exception(e)
                self.lock_time = None
                continue
            try:
                if self.lock_time is None:
                    self.lock_time = datetime.datetime.utcnow()
                function()
            except Exception as e:
                self._record_thread_exception(e)
            finally:
                self.storage_queue.task_done()
        self._stopped = True
    def store(self,data):
        '''store one row; data maps column names to values.'''
        self._dispatch(lambda: self._store(data))
    def _new_table(self,table_name,columns,replace_table=False,warn=False):
        '''create new table in the sqlite database with a column for each channel.
        If the table cannot be created (it probably exists):
        replace_table == 'copy' copies it to a time-stamped table, drops it and creates it again;
        any other true replace_table drops it and creates it again (deletes data!);
        otherwise a message is printed if warn, else the sqlite3.OperationalError is re-raised.'''
        table_name = str(table_name).replace(" ","_")
        try:
            self._create_table(table_name,columns)
            self._commit()
        except sqlite3.OperationalError:
            if replace_table == 'copy':
                try:
                    self._copy_table(table_name, '{}_{}'.format(table_name, datetime.datetime.utcnow().strftime('%Y_%m_%dT%H_%M_%SZ')))
                    self.cur.execute("DROP TABLE {};".format(table_name))
                    self._commit()
                except sqlite3.OperationalError as copy_error:
                    print(copy_error)
                    print("Table not copied (may not exist): {}".format(table_name))
                self._create_table(table_name,columns)
                self._commit()
            elif replace_table:
                try:
                    self.cur.execute("DROP TABLE {};".format(table_name))
                    self._commit()
                except sqlite3.OperationalError as drop_error:
                    print(drop_error)
                    print("Table not dropped (may not exist): {}".format(table_name))
                self._create_table(table_name,columns)
                self._commit()
            elif warn:
                # report the table that was requested, not the previously active self.table_name
                print('Table name {} creation failed. Table probably exists. Rename table, change table name argument, or call with replace_table argument=True'.format(table_name))
            else:
                raise
    def _create_table(self,table_name,columns):
        '''create the actual sql table and commit to database.
        Called by _new_table(). Columns are created in sorted order after rowid and datetime.'''
        self.columns = list(columns)
        self.columns.sort()
        column_definitions = ["rowid INTEGER PRIMARY KEY", "datetime DATETIME"]
        column_definitions.extend("{} NUMERIC".format(column) for column in self.columns)
        txt = "CREATE TABLE " + table_name + " ( " + ", ".join(column_definitions) + " );"
        self.cur.execute(txt)
        self._commit()
        self.table_name = table_name
    def _store(self,data,num=0):
        '''match data dictionary keys to table column names and insert new row into table.
        num counts the retries already made after sqlite3.OperationalError.'''
        if self.table_name is None:
            raise Exception("Need to create a table before logging")
        q = ("?," * len(data))[:-1]
        def db_clean(column_data):
            '''help database to store lists, dictionaries and any other future datatype that doesn't fit natively'''
            if isinstance(column_data,(list,dict)):
                return str(column_data)
            return column_data
        values = tuple([db_clean(column) for column in data.values()])
        sql = "INSERT INTO {} {} VALUES ({});".format(self.table_name, str(tuple(data.keys())), q)
        try:
            self.cur.execute(sql,values)
        except sqlite3.OperationalError as e:
            # the first few failures are retried silently
            if num > 2:
                print(e)
                print("Try {} failed. Trying again...".format(num))
            time.sleep(0.01)
            self._store(data,num=num+1) #keep trying forever
    def copy_table(self,old_table,new_table):
        self._check_name(new_table)
        self._check_exception()
        self._dispatch(lambda: self._copy_table(old_table,new_table))
        self.sync_threads()
    def _copy_table(self, old_table, new_table):
        self.conn.execute('CREATE TABLE {} AS SELECT * FROM {};'.format(new_table, old_table))
    def _commit(self):
        '''commit the current transaction, retrying for as long as sqlite3.OperationalError is raised.'''
        try:
            self.conn.commit()
        except sqlite3.OperationalError as e:
            print(e)
            print("Trying commit again...")
            self._commit()
    def switch_table(self,table_name):
        self._check_name(table_name)
        self._check_exception()
        self._dispatch(lambda: self._switch_table(table_name))
        self.sync_threads()
    def _switch_table(self,table_name):
        self.table_name = table_name
    def append_table(self,table_name,columns):
        self._check_name(table_name)
        self._check_exception()
        self._dispatch(lambda: self._append_table(table_name,columns))
        self.sync_threads()
    def _append_table(self,table_name,columns):
        '''make table_name the active table, creating it if necessary and adding any of columns it lacks.'''
        try:
            self._new_table(table_name,columns,replace_table=False,warn=False)
            self._commit()
        except sqlite3.OperationalError:
            # deliberate: creation fails when the table already exists, which is the normal case when appending
            pass
        self._switch_table(table_name)
        for column in columns:
            try:
                self.cur.execute("ALTER TABLE {} ADD {} NUMERIC;".format(self.table_name, column))
                self._commit()
            except sqlite3.OperationalError:
                # deliberate: the column could not be added (presumably because it already exists)
                pass
            else:
                print("Added column: {} to table: {}".format(column, self.table_name))
    def _check_name(self,name):
        '''raise an Exception unless name is made only of letters, digits and underscores and does not start with a digit.'''
        if not re.match("[_A-Za-z][_a-zA-Z0-9]*$",name):
            raise Exception('Bad Table Name "{}"'.format(name))
    def new_table(self,table_name,columns,replace_table=False,warn=False):
        self._check_name(table_name)
        self._check_exception()
        self._dispatch(lambda: self._new_table(table_name,columns,replace_table,warn))
        self.sync_threads()
    def stop(self):
        '''commit and close the database connection; in threaded mode also wait for the worker thread to finish.
        Safe to call more than once (it is registered with atexit by both logger and logger_backend).'''
        if self._stopped:
            return
        if self._use_thread:
            self.storage_queue.put(self._stop)
            while not self._stopped: #change to self.storage_queue.join()?
                time.sleep(0.001) # poll without spinning a CPU core
        else:
            self._stop()
            # there is no worker thread to set the flag in unthreaded mode
            self._stopped = True
    def _stop(self):
        '''commit, close the connection and tell the worker loop to exit.'''
        try:
            self._commit()
        except sqlite3.ProgrammingError as e: #can't commit if connection previously closed
            print(e)
        except Exception as e:
            print("Unhandled exception in _stop!")
            print(e)
        finally:
            self._close()
            self._run = False


if __name__ == "__main__":
    def print_it(x):
        print(x)
    #test of threaded delegation
    lb = master()
    if not lb.attach(address='localhost'):
        print("creating fake channels")
        # make 4 communication nodes, a-d, a and b are root level interfaces, b is not thread safe, c and d are downstream of b
        # NOTE(review): the comment above says b is not thread safe, but b is marked thread safe below - confirm which is intended
        ia = lb.get_dummy_interface(name='a')
        ia.set_com_node_thread_safe(True)
        ib = lb.get_dummy_interface(name='b')
        ib.set_com_node_thread_safe(True)
        ic = lb.get_dummy_interface(parent=ib,name='c')
        id_ = lb.get_dummy_interface(parent=ib,name='d') # trailing underscore avoids shadowing the builtin id()
        #create some dummy channels using these interfaces
        ch1_ia = channel('ch1_ia', read_function = lambda: time.sleep(3) )
        ch1_ia.add_interface(ia)
        ch2_ic = channel('ch2_ic', read_function = lambda: time.sleep(1) )
        ch2_ic.add_interface(ic)
        ch3_id = channel('ch3_id', read_function = lambda: time.sleep(1) )
        ch3_id.add_interface(id_)
        ch4_id = channel('ch4_id', read_function = lambda: time.sleep(1) )
        #ch4_id.set_delegator(ch3_id)
        ch4_id.add_interface(id_)
        lb._add_channel(ch1_ia)
        lb._add_channel(ch2_ic)
        lb._add_channel(ch3_id)
        lb._add_channel(ch4_id)
        lb.add_channel_dummy('dummy')
        lb.write('dummy',"dummydata")
        lb.add_channel_virtual('virtual_print',write_function= lambda x: print_it(x))
        print("new_logger")
        lb.gui()
        # do not rebind the name 'logger': the class is instantiated again further down
        test_logger = logger(lb)
        test_logger.new_table("test_table",replace_table=True)
        test_logger.set_journal_mode()
        test_logger.log()
        test_logger.log()
        #lb.background_gui()
        #lb.serve(address='localhost')
        print("done")
    else:
        print("did not create any channels")
    tstart = time.time()
    data = lb.read_all_channels()
    print("read took {}".format(time.time()-tstart))
    print(data)
    lgr = logger(lb)
    lgr.new_table('test',replace_table=True)
    lgr.log()
    lgr.log()
    #lgr.gui()
    #lb.gui()