
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Miscellaneous Utilities
=======================

Includes:

- database access tools
- servo delay loop
- string sanitization
- filters and other numeric transforms
- floating point number utilities
- email/SMS notification
'''

import smtplib
from email.mime.text import MIMEText #send email/sms message from script
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import time, datetime
import sqlite3
import re
import urllib
import operator
import math
import os
import atexit
import collections
import sys
import struct
import threading
import numbers
import csv
import Queue

try:
    import numpy, scipy, scipy.signal, scipy.interpolate
except ImportError as e:
    print "Warning: NumPy/SciPy import failed."
    print e

class delay_loop(object):
    '''make constant loop delay independent of loop processing time by measuring
    time at beginning and end of loop and adding extra delay as necessary'''
    def __init__(self, strict=False, begin=True, no_drift=True):
        '''Set strict to True to raise an Exception if loop time is longer than requested delay.
        Timer will automatically begin when the object is instantiated if begin=True.
        To start the timer only when ready, set begin=False and call the begin() method to start it.
        If no_drift=True, the delay loop will manage loop time over-runs by debiting the extra
        time from the next cycle. This ensures long-term time stability at the expense of
        increased jitter. Windows task switching can add multi-ms uncertainty to each delay()
        call, which can accumulate if not accounted for. Set no_drift=False to ignore time
        over-runs when computing the next delay time.
        '''
        self.strict = strict
        self.no_drift = no_drift
        self.count = 0
        self.begin_time = None
        self.delay_time = None
        self.loop_time = None #last loop time for margin diagnostics.
        if begin:
            self.begin()
    def __call__(self, seconds):
        return self.delay(seconds)
    def get_count(self):
        '''returns total number of times the delay() method has been called'''
        return self.count
    def get_total_time(self):
        '''returns total number of seconds since the timer began'''
        return (datetime.datetime.utcnow() - self.start_time).total_seconds()
    def begin(self, offset=0):
        '''make note of begin time for loop measurement. Use offset to adjust the
        begin time in case of overrun on the last cycle.'''
        if self.begin_time is None:
            self.start_time = datetime.datetime.utcnow()
        self.begin_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=offset)
    def delay(self, seconds):
        '''delay extra time to make loop time constant.
        returns actual delay time achieved'''
        if self.begin_time is None:
            raise Exception('Call begin() method before delay().')
        self.count += 1
        elapsed_time = datetime.datetime.utcnow() - self.begin_time
        self.delay_time = (datetime.timedelta(seconds=seconds) - elapsed_time).total_seconds()
        if (self.delay_time < 0):
            if (self.strict == True):
                raise Exception('Loop processing longer than requested delay by {:3.3f} seconds at {}.'.format(-self.delay_time, datetime.datetime.now()))
            else:
                print 'Warning! Loop processing longer than requested delay by {:3.3f} seconds at {}.'.format(-self.delay_time, datetime.datetime.now())
        else:
            time.sleep(self.delay_time)
        self.loop_time = (datetime.datetime.utcnow() - self.begin_time).total_seconds()
        if self.no_drift:
            self.begin(offset=seconds - self.loop_time) #restart timer for next loop cycle; use offset to correct for overrun.
        else:
            self.begin(offset=0) #restart timer for next loop cycle; ignore overrun.
        return self.delay_time
    def time_remaining(self, loop_time):
        '''Use this in a while loop to perform another function for duration loop_time.
        Test the result for 0 or less.'''
        if self.begin_time is None:
            raise Exception('Call begin() method before time_remaining().')
        remaining_time = loop_time - (datetime.datetime.utcnow() - self.begin_time).total_seconds()
        if remaining_time <= 0:
            self.begin(offset=remaining_time) #restart timer for next loop cycle; use offset to correct for overrun.
        return remaining_time
    def delay_margin(self):
        '''Return extra time remaining (i.e. sleep time) from the last call to delay().'''
        return self.delay_time
    def achieved_loop_time(self):
        '''Return previous actual achieved loop time (including any overrun).'''
        return self.loop_time
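
# Usage sketch (illustrative; not part of the original module): pace a polling
# loop at a fixed 1 s period regardless of how long the work takes each pass.
# read_sensor() is a hypothetical stand-in for per-cycle work.
#
#     timer = delay_loop()
#     for _ in range(60):
#         read_sensor()      #variable-duration work
#         timer.delay(1.0)   #sleep out the remainder of the 1 s budget
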
class threaded_writer(object):
    '''helper to perform some task in parallel with a test script at a fixed rate'''
    class stop_thread(threading.Thread):
        '''Thread extended to have a stop() method. Threads cannot be restarted
        after stopping. Make a new one to restart.'''
        def __init__(self, stop_event, stopped_event, queue, group=None, target=None, name=None, args=(), kwargs={}):
            self.stop_event = stop_event        #command to stop thread
            self.stopped_event = stopped_event  #notification that thread has stopped itself.
            self.queue = queue
            threading.Thread.__init__(self, group=group, target=target, name=name, args=args, kwargs=kwargs)
        def stop(self):
            '''stop thread. thread cannot be restarted.'''
            self.stop_event.set()
        def set_time_interval(self, time_interval):
            self.queue.put(("time_interval", time_interval))
    def __init__(self, verbose=False):
        self.verbose = verbose
        self._threads = [] #check stopped_event whenever inspecting elements of this list to find out which threads have already stopped.
    def _check_threads(self):
        '''remove terminated threads from internal list'''
        for thread in self._threads[:]:
            if thread.stopped_event.is_set():
                self._threads.remove(thread)
    def stop_all(self):
        '''stop all threads. threads cannot be restarted.'''
        self._check_threads()
        for thread in self._threads[:]:
            thread.stop()
            self._threads.remove(thread)
    def connect_channel(self, channel_name, time_interval, sequence=None, start=True, address='localhost', port=5001, authkey='ltc_lab'):
        '''Write each element of sequence in turn to channel_name, waiting time_interval between writes.
        If sequence is None, periodically read and re-write the channel as a keepalive.
        Thread safety is provided by the remote channel server infrastructure. The first
        thread must call master.serve() and the test script should call master.attach().
        '''
        from PyICe import lab_core
        m = lab_core.master()
        m.attach(address, port, authkey)
        if sequence is None:
            return self.add_function(lambda channel_name=channel_name: m.write(channel_name, m.read(channel_name)), time_interval, start)
        else:
            class sequencer(object):
                def __init__(self):
                    self.sequence = self.generator(sequence)
                def generator(self, sequence):
                    for i in sequence:
                        yield i
                def __call__(self):
                    m.write(channel_name, self.sequence.next())
            return self.add_function(sequencer(), time_interval, start)
    def add_function(self, function, time_interval, start=True):
        '''Periodically execute function. No thread safety. Use caution with shared
        interfaces, or use separate remote channel clients with each function.
        See connect_channel() above.
        '''
        stop_event = threading.Event()
        stopped_event = threading.Event()
        queue = Queue.Queue()
        thread = self.stop_thread(stop_event, stopped_event, queue, target=lambda: self._task(function, time_interval, stop_event, stopped_event, queue), name=None)
        if start:
            thread.start()
        self._threads.append(thread)
        return thread
    def _task(self, function, time_interval, stop_event, stopped_event, queue):
        '''thread handling loop. processes input Event to request thread termination
        and sends an event back when the thread terminates.'''
        dly = delay_loop()
        params = {}
        params['time_interval'] = time_interval
        while not stop_event.is_set(): #add ability to pass external message to terminate thread???
            try:
                attr = queue.get_nowait()
                if self.verbose:
                    print "Writing {} to {}".format(attr[1], attr[0])
                params[attr[0]] = attr[1]
            except Queue.Empty:
                pass
            if self.verbose:
                print "Executing {} at time {}".format(function, datetime.datetime.utcnow())
            try:
                function()
            except StopIteration as e:
                if self.verbose:
                    print "Thread {} terminating - reached end of sequence at time {}".format(function, datetime.datetime.utcnow())
                stopped_event.set()
                return
            dly.delay(params['time_interval'])
        if self.verbose:
            print "Thread {} terminating - received stop event at time {}".format(function, datetime.datetime.utcnow())
        stopped_event.set()
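
# Usage sketch (illustrative; not part of the original module): run a background
# keepalive alongside the main script. poke_watchdog() and run_test() are
# hypothetical user functions.
#
#     tw = threaded_writer(verbose=False)
#     thread = tw.add_function(poke_watchdog, time_interval=0.5)
#     run_test()      #main test script work
#     tw.stop_all()   #terminate background thread(s); they cannot be restarted
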
def floatRange(start, stop=None, step=None):
    '''Returns a list of numbers similar to the Python range() builtin but supports floats.
    start is inclusive; stop is exclusive. When called with a single argument,
    start=0 and the argument becomes stop.'''
    return numpy.arange(start, stop, step).tolist()
def floatRangeInc(start, stop=None, step=None):
    '''Same as floatRange, but inclusive of the last value'''
    return floatRange(start, stop, step) + [stop]
def logRange(start, stop, stepsPerDecade=None, stepsPerOctave=None):
    '''log step range function similar to the Python built-in range()'''
    if (stepsPerDecade is not None and stepsPerOctave is None):
        stepsize = 10**(1.0/stepsPerDecade) #possible divide by zero!
    elif (stepsPerDecade is None and stepsPerOctave is not None):
        stepsize = 2**(1.0/stepsPerOctave) #possible divide by zero!
    else:
        raise Exception('Must call logRange function with exactly one of the (stepsPerDecade, stepsPerOctave) arguments')
    point = float(start)
    r = []
    while (point < stop):
        r.append(point)
        point *= stepsize
    return r
def decadeListRange(decadePoints, decades):
    '''log step range function similar to the Python built-in range().
    Accepts a list of points in a single decade and repeats these points
    over the specified number of decades.'''
    r = []
    exp = 0
    while (decades > 0):
        r.extend(map(lambda x: x*10**exp, decadePoints))
        decades -= 1
        exp += 1
    return r
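
# Illustrative values for the range helpers above (assuming the NumPy import succeeded):
#
#     floatRange(0, 1, 0.25)                #[0.0, 0.25, 0.5, 0.75]
#     floatRangeInc(0, 1, 0.25)             #[0.0, 0.25, 0.5, 0.75, 1]
#     logRange(1, 1000, stepsPerDecade=1)   #[1.0, 10.0, 100.0]
#     decadeListRange([1, 2, 5], 2)         #[1, 2, 5, 10, 20, 50]
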
class email(object):
    '''sends email to specified destination from dedicated gmail account, or account of your choosing'''
    def __init__(self, destination, login='ltlabboston', pw='PythonLab', domain='gmail.com'):
        '''destination is the recipient's email address'''
        self.destination = destination
        self.login = login
        self.pw = pw
        self.sender = login + '@' + domain
    def send(self, body, subject=None, attachments=[]):
        '''compose MIME message with proper headers and send'''
        if len(attachments) == 0:
            message = MIMEText(body, _charset="utf-8")
        else:
            message = MIMEMultipart('mixed')
            message.attach(MIMEText(body, _charset="utf-8"))
            for attachment in attachments:
                with open(attachment, "rb") as filebytes:
                    part = MIMEApplication(filebytes.read())
                part.add_header('content-disposition', 'attachment', filename=os.path.basename(attachment))
                message.attach(part)
        if (subject is not None):
            message['Subject'] = subject
        message['To'] = self.destination
        message['From'] = self.sender
        server = smtplib.SMTP('smtp.gmail.com:587')
        server.ehlo()
        server.starttls()
        server.ehlo()
        server.login(self.login, self.pw)
        server.sendmail(self.sender, self.destination, message.as_string())
        server.quit()
class sms(email):
    '''Extends the email class to send SMS messages through several carriers' email-to-SMS gateways'''
    def __init__(self, mobile_number, carrier, login='ltlabboston', pw='PythonLab', domain='gmail.com'):
        '''carrier is 'verizon', 'tmobile', 'att', 'sprint', or 'nextel' '''
        sms_email = ''
        for digit in str(mobile_number):
            if digit.isdigit(): #remove dashes, dots, spaces, and whatever other non-digits came in
                sms_email += digit
        sms_email = sms_email.lstrip('1') #remove country code
        if (len(sms_email) != 10):
            raise Exception('mobile_number argument must be a 10-digit phone number with area code')
        carrier = carrier.lower()
        if (carrier == 'verizon'):
            sms_email += '@vtext.com'
        elif (carrier == 't-mobile' or carrier == 'tmobile'):
            sms_email += '@tmomail.net'
        elif (carrier == 'att' or carrier == 'at&t'):
            sms_email += '@txt.att.net'
        elif (carrier == 'sprint'):
            sms_email += '@messaging.sprintpcs.com'
        elif (carrier == 'nextel'):
            sms_email += '@page.nextel.com'
        else:
            #look up additional sms email gateways here: http://en.wikipedia.org/wiki/List_of_SMS_gateways
            raise Exception('carrier argument must be "verizon", "t-mobile", "att", "sprint", or "nextel" unless you add your carrier to the list')
        email.__init__(self, sms_email, login, pw, domain)
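
# Usage sketch (illustrative; recipient address and phone number are placeholders):
#
#     email('someone@example.com').send('Test complete.', subject='lab status')
#     sms('555-123-4567', carrier='verizon').send('Test complete.')
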
def str2num(str_in):
    if isinstance(str_in, int) or isinstance(str_in, float) or str_in is None:
        return str_in
    try:
        return int(str_in, 0) #automatically select base
    except ValueError:
        try:
            return float(str_in)
        except ValueError as e:
            print "string failed to convert both to integer (automatic base selection) and float: {}".format(str_in)
            raise e

def clean_c(s):
    s = s.replace("\t","_")          #0x09
    s = s.replace(" ","_")           #0x20
    s = s.replace("!","_BANG_")      #0x21
    s = s.replace('"',"_DQT_")       #0x22
    s = s.replace("#","_PND_")       #0x23
    s = s.replace("$","_DOL_")       #0x24
    s = s.replace("%","_PER_")       #0x25
    s = s.replace("&","_AND_")       #0x26
    s = s.replace("'","_SQT_")       #0x27
    s = s.replace("(","_OPNP_")      #0x28
    s = s.replace(")","_CLSP_")      #0x29
    s = s.replace("*","_MUL_")       #0x2A
    s = s.replace("+","_PLS_")       #0x2B
    s = s.replace(",","_COMA_")      #0x2C
    s = s.replace("-","_MNS_")       #0x2D
    s = s.replace(".","p")           #0x2E
    s = s.replace("/","_DIV_")       #0x2F
    s = s.replace(":","_CLN_")       #0x3A
    s = s.replace(";","_SCLN_")      #0x3B
    s = s.replace("<","_LSS_THN_")   #0x3C
    s = s.replace("=","_EQLS_")      #0x3D
    s = s.replace(">","_GRTR_THN_")  #0x3E
    s = s.replace("?","_QUES_")      #0x3F
    s = s.replace("@","_AT_")        #0x40
    s = s.replace("[","_OPNS_")      #0x5B
    s = s.replace("\\","_SLSH_")     #0x5C
    s = s.replace("]","_CLSS_")      #0x5D
    s = s.replace("^","_CAR_")       #0x5E
    #0x5F is '_'
    s = s.replace("`","_GRAVE_")     #0x60
    s = s.replace("{","_OPNC_")      #0x7B
    s = s.replace("|","_OR_")        #0x7C
    s = s.replace("}","_CLSC_")      #0x7D
    s = s.replace("~","_TIL_")       #0x7E
    if s[0].isdigit():
        s = "_" + s
    for c in s: #all characters 0x3A-0x40 and 0x5B-0x60 already replaced above.
        if ord(c) > 127:
            raise Exception('TODO?: escape non-ascii character found in: {}'.format(s))
        if ord(c) < 0x30:
            raise Exception('Ascii control character code point {} found in: {}'.format(ord(c), s))
        if ord(c) > 0x7A:
            raise Exception('Ascii non-alphanumeric character code point {} found in: {}'.format(ord(c), s))
    return s

def remove_non_ascii(text):
    out = ''
    for c in text:
        if ord(c) > 127:
            c = "(REMOVED_NON_ASCII)"
        out += c
    return out

def remove_html(text):
    if text is not None:
        text = re.sub('<[^<]+?>', '', text)
    return text
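
# Illustrative values for the string utilities above:
#
#     str2num('0x10')          #16 (automatic base selection)
#     str2num('1.5e3')         #1500.0
#     clean_c('3.3V/5V sel')   #'_3p3V_DIV_5V_sel'
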
def swap_endian(word, elementCount, elementSize=8):
    '''reverse endianness of multi-byte word.
    elementCount is the number of bytes, or other atomic memory blocks if not of elementSize 8 bits.
    To reverse bit order, set elementCount to the number of bits and set elementSize to 1.'''
    assert word < 2**(elementSize*elementCount)
    assert word >= 0
    reversed_word = 0x00
    mask = 2**elementSize-1
    while elementCount > 0:
        reversed_word ^= (word & mask) << (elementSize*(elementCount-1))
        word = word >> elementSize
        elementCount -= 1
    return reversed_word
def signedToTwosComplement(signed, bitCount):
    '''take a Python int and convert to two's complement representation using the specified number of bits'''
    assert signed < 2**(bitCount-1)
    assert signed >= -1 * 2**(bitCount-1)
    if signed < 0:
        signed += 2**bitCount
    signed &= 2**bitCount-1
    return signed
def twosComplementToSigned(binary, bitCount):
    '''take a two's complement number with the specified number of bits and convert to Python int representation'''
    assert binary < 2**bitCount
    assert binary >= 0
    if binary >= 2**(bitCount-1):
        binary -= 2**bitCount
    return binary
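
# Illustrative values for the bit utilities above:
#
#     swap_endian(0x1234, elementCount=2)                  #0x3412 (byte swap)
#     swap_endian(0b1101, elementCount=4, elementSize=1)   #0b1011 (bit reversal)
#     signedToTwosComplement(-1, bitCount=8)               #0xFF (255)
#     twosComplementToSigned(0xFF, bitCount=8)             #-1
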
def isclose(a, b, rel_tol=1e-9, abs_tol=0.0):
    #backported from 3.5
    #https://github.com/PythonCHB/close_pep/blob/master/isclose.py
    #https://www.python.org/dev/peps/pep-0485/
    #https://docs.python.org/3/library/math.html#math.isclose
    #alternative tests here: https://github.com/PythonCHB/close_pep/blob/master/is_close.py
    """returns True if a is close in value to b. False otherwise

    :param a: one of the values to be tested
    :param b: the other value to be tested
    :param rel_tol=1e-9: The relative tolerance -- the amount of error
                         allowed, relative to the absolute value of the
                         larger input value.
    :param abs_tol=0.0: The minimum absolute tolerance level -- useful
                        for comparisons to zero.

    NOTES: -inf, inf and NaN behave similarly to the IEEE 754 Standard. That
    is, NaN is not close to anything, even itself. inf and -inf are only
    close to themselves.

    The function can be used with any type that supports comparison,
    subtraction and multiplication, including Decimal, Fraction, and Complex.
    Complex values are compared based on their absolute value.

    See PEP-0485 for a detailed description.
    """
    if a == b: #short-circuit exact equality
        return True
    if rel_tol < 0.0 or abs_tol < 0.0:
        raise ValueError('error tolerances must be non-negative')
    #use abs() so it will work with complex or float
    if math.isinf(abs(a)) or math.isinf(abs(b)):
        #This includes the case of two infinities of opposite sign, or
        #one infinity and one finite number. Two infinities of opposite sign
        #would otherwise have an infinite relative tolerance.
        return False
    diff = abs(b - a)
    return (((diff <= abs(rel_tol * b)) and #DJS change from weak to strong symmetry so that argument order doesn't matter
             (diff <= abs(rel_tol * a))) or
            (diff <= abs_tol))
def unit_least_precision(val, increasing=True):
    '''return positive increment/decrement to the next representable floating point number above/below val'''
    if increasing:
        return float_next(val) - val
    else:
        return val - float_prior(val)
def float_next(val):
    '''return the next Python double precision floating point number larger than val.'''
    #algorithm copied from Boost: http://www.boost.org/doc/libs/1_45_0/boost/math/special_functions/next.hpp
    assert not math.isinf(val)
    assert not math.isnan(val)
    assert not val >= sys.float_info.max
    if val == 0:
        return sys.float_info.epsilon*sys.float_info.min #denorm min
    frac, expon = math.frexp(val)
    if frac == -0.5:
        expon -= 1 #reduce exponent when val is a power of two, and negative.
    diff = math.ldexp(1, expon - sys.float_info.mant_dig)
    if diff == 0:
        diff = sys.float_info.epsilon*sys.float_info.min #denorm min
    return val + diff
def float_prior(val):
    '''return the next Python double precision floating point number smaller than val.'''
    #algorithm copied from Boost: http://www.boost.org/doc/libs/1_45_0/boost/math/special_functions/next.hpp
    assert not math.isinf(val)
    assert not math.isnan(val)
    assert not val <= -sys.float_info.max
    if val == 0:
        return -sys.float_info.epsilon*sys.float_info.min #denorm min
    frac, expon = math.frexp(val)
    if frac == 0.5:
        expon -= 1 #when val is a power of two we must reduce the exponent
    diff = math.ldexp(1, expon - sys.float_info.mant_dig)
    if diff == 0:
        diff = sys.float_info.epsilon*sys.float_info.min #denorm min
    return val - diff
def float_distance(x, y):
    '''return signed difference between x and y expressed as the number of intervening representable floating point numbers.'''
    #Boost library algorithm port: http://www.boost.org/doc/libs/1_45_0/boost/math/special_functions/next.hpp
    if x > y:
        return -float_distance(y, x)
    elif x == y:
        return 0
    elif x == 0:
        return 1 + abs(float_distance(math.copysign(1,y) * sys.float_info.epsilon*sys.float_info.min, y)) #denorm min
    elif y == 0:
        return 1 + abs(float_distance(math.copysign(1,x) * sys.float_info.epsilon*sys.float_info.min, x)) #denorm min
    elif math.copysign(1,x) != math.copysign(1,y):
        return 2 + abs(float_distance(math.copysign(1,x) * sys.float_info.epsilon*sys.float_info.min, x)) + abs(float_distance(math.copysign(1,y) * sys.float_info.epsilon*sys.float_info.min, y)) #denorm min
    #should have same sign now
    elif x < 0:
        return float_distance(-y, -x)
    assert x >= 0
    assert y >= x
    #Note that if x is a denorm then the usual formula fails because we actually have fewer than mant_dig significant bits in the representation:
    expon = math.frexp(x)[1]
    if expon < sys.float_info.min_exp:
        expon = sys.float_info.min_exp
    upper = math.ldexp(1, expon)
    result = 0
    expon = sys.float_info.mant_dig - expon #For floating-point types, mant_dig is the number of digits in the mantissa.
    #If y is greater than upper, then we *must* split the calculation as the size of the ULP changes with each order of magnitude change:
    if(y > upper):
        result = float_distance(upper, y)
        y = upper
    #Use compensated double-double addition to avoid rounding errors in the subtraction:
    X = x - y
    Z = X - x
    Y = (x - (X - Z)) - (y + Z)
    if X < 0:
        X = -X
        Y = -Y
    result += math.ldexp(X, expon) + math.ldexp(Y, expon)
    #Result must be an integer:
    assert(result == math.floor(result))
    return int(result)
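
# Illustrative values for the floating point utilities above (IEEE-754 double precision):
#
#     float_next(1.0)                            #1.0000000000000002
#     unit_least_precision(1.0)                  #2.220446049250313e-16 (one ULP at 1.0)
#     float_distance(1.0, 1.0000000000000002)    #1
#     isclose(0.1 + 0.2, 0.3, rel_tol=1e-9)      #True
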
def bounded(value, min_value=None, max_value=None, key=None):
    kwargs = {}
    if key is not None:
        kwargs['key'] = key
    if min_value is not None and max_value is not None:
        return max(min(value, max_value, **kwargs), min_value, **kwargs)
    elif min_value is None and max_value is not None:
        return min(value, max_value, **kwargs)
    elif min_value is not None and max_value is None:
        return max(value, min_value, **kwargs)
    return value
class csv_writer(object):
    '''shared functions for higher level interfaces'''
    def __init__(self):
        self.column_data_t = collections.namedtuple('column_setup', ['query_name','display_name','transform','format','query_function'])
        self.no_transform = lambda x: x
        self.columns = []
        self.comments = []
    def _format_header(self):
        header_txt = u''
        for comment in self.comments:
            header_txt += comment + '\n'
        for column in self.columns:
            header_txt += u"{},".format(column.display_name)
        return header_txt[:-1] + '\n'
    def _format_output(self, data, column_setup_tuple):
        '''format just one element of a data row'''
        if column_setup_tuple.query_function is not None:
            data = column_setup_tuple.query_function()
        data = column_setup_tuple.transform(data)
        return '{},'.format(column_setup_tuple.format).format(data)
    def add_comment(self, comment_str, comment_character='#'):
        '''add comment line(s) to the top of the output file. Live Graph treats '@' as a
        'description line' and '#' as a 'comment line'. Neither has any effect on the
        data interpretation.'''
        self.comments.append(u'{}{}'.format(comment_character, comment_str))
    def add_elapsed_seconds(self, display_name='elapsed_seconds', format=''):
        '''computes elapsed seconds since first row of table'''
        self._add_elapsed_time(display_name=display_name, format=format, transform=self.no_transform)
    def add_elapsed_minutes(self, display_name='elapsed_minutes', format=''):
        '''computes elapsed minutes since first row of table'''
        self._add_elapsed_time(display_name=display_name, format=format, transform=lambda x: x/60.0)
    def add_elapsed_hours(self, display_name='elapsed_hours', format=''):
        '''computes elapsed hours since first row of table'''
        self._add_elapsed_time(display_name=display_name, format=format, transform=lambda x: x/3600.0)
    def add_elapsed_days(self, display_name='elapsed_days', format=''):
        '''computes elapsed days since first row of table'''
        self._add_elapsed_time(display_name=display_name, format=format, transform=lambda x: x/86400.0)
    def _add_elapsed_time(self, *args, **kwargs):
        raise NotImplementedError('Elapsed time not implemented')
    def add_column(self, query_name, display_name=None, format='', transform=None, query_function=None):
        '''add a single column to the output file. Provides more customization options than adding a list of columns.
        transform is a Python function applied to the query results before formatting.
        format is a format string to alter the column data. Ex: 3.2f.
        query_function is a function that returns data directly from Python rather than from an external data source. Ex: time.
        '''
        if display_name is None:
            display_name = query_name
        format = '{{:{}}}'.format(format)
        if transform is None:
            transform = self.no_transform
        self.columns.append(self.column_data_t(display_name=display_name, query_name=query_name, transform=transform, format=format, query_function=query_function))
    def add_columns(self, column_list, format=''):
        '''Shortcut method to add multiple data columns at once.
        column_list selects additional data columns to output.
        format is a format string to alter the column data. Ex: 3.2f.
        For more flexibility, add columns individually using the add_column() method.'''
        for column in column_list:
            self.add_column(column, format=format)
class csv_logger(csv_writer):
    '''set up columns, then pass the results dictionary from a logger or channel group to the write() method.
    Can be used to provide automated test script 'marching waves' with a program such as
    Live Graph (http://www.live-graph.org/).
    '''
    def __init__(self, output_file, encoding='utf-8'):
        csv_writer.__init__(self)
        self.output_file = output_file
        self.encoding = encoding
        self.header_written = False
        self.f = open(self.output_file, 'w', 1) #line buffered
        atexit.register(self.__del__)
        self._row_id = -1
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        print "__exit__ closing CSV filehandle: {}".format(self.output_file)
        self.f.close()
        return None
    def __del__(self):
        print "__del__ closing CSV filehandle: {}".format(self.output_file)
        self.f.close()
    def _row_count(self):
        '''private method used by rowid column.'''
        self._row_id += 1
        return self._row_id
    def add_timestamps(self):
        '''add rowid and datetime fields to output'''
        csv_writer.add_column(self, query_name=None, display_name='rowid', query_function=self._row_count)
        csv_writer.add_column(self, query_name=None, display_name='datetime', query_function=datetime.datetime.now, transform=lambda t: datetime.datetime.strftime(t, "%Y-%m-%dT%H:%M:%S.%fZ")) #e.g. 2015-10-20T21:54:17.000000Z
    def _add_elapsed_time(self, display_name, format, transform):
        csv_writer.add_column(self, query_name=None, display_name=display_name, format=format, query_function=datetime.datetime.now, transform=lambda t: transform((t-self._time_zero).total_seconds()))
    def add_column(self, channel):
        '''set up output to include channel object as a column data source'''
        csv_writer.add_column(self, query_name=channel.get_name(), transform=channel.format_display)
    def add_columns(self, channel_list):
        '''set up output to include channel objects in channel_list as column data sources'''
        for channel in channel_list:
            self.add_column(channel)
    def write(self, channel_data):
        '''write selected columns with data supplied in the channel_data dictionary'''
        if not self.header_written:
            self.f.write(self._format_header().encode(self.encoding))
            self._time_zero = datetime.datetime.now() #for elapsed time computation
            self.header_written = True
        row_txt = ''
        for column in self.columns:
            if column.query_function is not None:
                row_txt += self._format_output(None, column)
            elif column.query_name in channel_data:
                row_txt += self._format_output(channel_data[column.query_name], column)
            else:
                raise Exception('Data for column: {} not provided to write() method.'.format(column.display_name))
        self.f.write((row_txt[:-1] + '\n').encode(self.encoding))
        self.f.flush() #just in case line buffering doesn't work
        return channel_data
    def register_logger_callback(self, logger):
        '''register this csv_logger instance with a lab_core.logger instance for automatic data plotting'''
        logger.add_log_callback(self.write)
    def unregister_logger_callback(self, logger, close_file=True):
        '''clean up in case lab_core.logger will be re-used for a new test.'''
        logger.remove_log_callback(self.write)
        if close_file:
            self.f.close()
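
# Usage sketch (illustrative; 'logger' is an existing lab_core.logger instance and
# 'channel_list' is a hypothetical list of PyICe channel objects):
#
#     cw = csv_logger('live_data.csv')
#     cw.add_timestamps()
#     cw.add_elapsed_seconds()
#     cw.add_columns(channel_list)
#     cw.register_logger_callback(logger)   #each logger.log() now appends a csv row
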
class sqlite_to_csv(csv_writer):
    '''Formats data stored in an SQLite database so that it can be browsed interactively.
    Use a program like Live Graph (www.live-graph.org) or KST (kst-plot.kde.org) to visualize data.'''
    def __init__(self, table_name, database_file='data_log.sqlite'):
        '''table_name is the database table containing the selected data columns.
        database_file is the sqlite file containing table_name.'''
        csv_writer.__init__(self)
        self.table_name = table_name
        self.conn = sqlite3.connect(database_file)
        self.cursor = self.conn.cursor()
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.conn.close()
        return None
    def add_timestamps(self):
        '''Add rowid and datetime columns to csv output.'''
        self.add_column('rowid')
        self.add_column('datetime')
    def _add_elapsed_time(self, display_name, format, transform):
        self.cursor.execute('SELECT strftime("%s",datetime) FROM {} LIMIT 1'.format(self.table_name))
        self.add_column(query_name='strftime("%s",datetime) - {}'.format(self.cursor.fetchone()[0]), display_name=display_name, format=format, transform=transform)
    def write(self, output_file, append=False, encoding='utf-8'):
        '''write previously selected column data to output_file.'''
        query_txt = ''
        for column in self.columns:
            query_txt += u"{},".format(column.query_name)
        query_txt = query_txt[:-1]
        with open(output_file, 'a' if append else 'w') as f:
            f.write(self._format_header().encode(encoding))
            for row in self.cursor.execute('SELECT {} FROM {}'.format(query_txt, self.table_name)):
                row_txt = ''
                for cidx, column in enumerate(row):
                    row_txt += self._format_output(column, self.columns[cidx])
                f.write((row_txt[:-1] + '\n').encode(encoding))
        print 'Output written to {}'.format(output_file)
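
# Usage sketch (illustrative; 'bench_data' and the column names are placeholders):
#
#     with sqlite_to_csv('bench_data', database_file='data_log.sqlite') as conv:
#         conv.add_timestamps()
#         conv.add_elapsed_hours()
#         conv.add_columns(['vout', 'iout'], format='3.6f')
#         conv.write('bench_data.csv')
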
class sqlite_to_easylog(sqlite_to_csv):
    '''Wrapper to produce the specific format required by the Easy Log Graph software.
    Formats data stored in an SQLite database so that it can be browsed interactively.
    Use EasyLogGraph (http://www.lascarelectronics.com/data-logger/easylogger-software.php) to visualize data.'''
    def __init__(self, chart_name, table_name, y1_axis_units='V', y2_axis_units='A', database_file='data_log.sqlite'):
        '''chart_name will appear at the top of the graph.
        table_name is the sqlite database table name.
        y1_axis_units controls the left-side y-axis label.
        y2_axis_units controls the right-side y-axis label.
        database_file is the filename of the sqlite database.'''
        self.y1_axis_units = y1_axis_units
        self.y2_axis_units = y2_axis_units
        sqlite_to_csv.__init__(self, table_name, database_file)
        sqlite_to_csv.add_column(self, query_name='rowid', display_name=chart_name)
        sqlite_to_csv.add_column(self, query_name='datetime', display_name='Time') #the position of these fields is important
    def add_comment(self, *args, **kwargs):
        raise Exception("Comment lines don't seem to be allowed in EasyLog files.")
    def add_column(self, query_name, second_y_axis=False, display_name=None, format='', transform=None):
        '''query_name is the name of the sqlite column.
        second_y_axis is a boolean. Setting it to True places data on the right-side y-axis scale.
        display_name, if not None, sets the csv column header title differently from the database column name.
        format controls appearance of queried data. Ex: "3.2f".'''
        if display_name is None:
            display_name = query_name
        display_name = "{} ({})".format(display_name, self.y2_axis_units if second_y_axis else self.y1_axis_units) #Data goes to first or second y-axis based on parenthesized units in column heading
        sqlite_to_csv.add_column(self, query_name=query_name, display_name=display_name, format=format, transform=transform)
    def add_columns(self, column_list, second_y_axis=False, format=''):
        '''adds a list of sqlite column names at once.
        All columns will be placed on the left-side y-axis scale unless second_y_axis is True.
        format controls appearance of queried data. Ex: "3.2f".'''
        for column in column_list:
            self.add_column(query_name=column, second_y_axis=second_y_axis, format=format)
    # def _add_elapsed_time(self, *args, **kwargs):
    #     raise Exception("Elapsed time not supported yet. Is it really needed?")
    def write(self, output_file, append=False):
        '''write queried data to output_file after column setup is complete'''
        sqlite_to_csv.write(self, output_file=output_file, append=append, encoding='mbcs') #windows ANSI codepage
class sqlite_data(collections.Sequence): #collections.Iterable to disable slicing?
    '''Produce an iterable object returning a row sequence, where each column within
    each row is accessible by either column name or position.
    table_name can be an expression returning a synthetic non-table relation.
    '''
    def __init__(self, table_name=None, database_file='data_log.sqlite', timezone=None):
        if timezone is None:
            self.timezone = UTC()
        else:
            self.timezone = timezone
        sqlite3.register_converter("DATETIME", self.convert_timestring)
        self.conn = sqlite3.connect(database_file, detect_types=sqlite3.PARSE_DECLTYPES) #automatically convert datetime column to Python datetime object
        self.conn.row_factory = sqlite3.Row #index row data tuple by column name
        self.table_name = table_name
        self.sql_query = None
        self.params = []
        if table_name is not None:
            self.sql_query = "SELECT * from {}".format(table_name) #rowid omitted because it's now an explicitly stored column from the logger
    def convert_timestring(self, time_string):
        # return datetime.datetime.strptime(time_string,'%Y-%m-%d %H:%M:%S').replace(tzinfo=UTC()).astimezone(self.timezone) #old format
        return datetime.datetime.strptime(time_string, '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=UTC()).astimezone(self.timezone)
    def __getitem__(self, key):
        '''implement sequence behavior.'''
        subs = {}
        if isinstance(key, slice):
            if key.start is None:
                subs['start'] = 0
            else:
                subs['start'] = key.start
            if key.stop is None:
                subs['limit'] = -1 #no limit
            else:
                if subs['start'] >= key.stop:
                    raise Exception('Reverse iteration not supported.')
                subs['limit'] = key.stop - subs['start']
            if key.step is not None:
                raise Exception('Slice step not supported.')
            fetch = sqlite3.Cursor.fetchall
        else:
            subs['start'] = key
            subs['limit'] = 1
            fetch = sqlite3.Cursor.fetchone
        return fetch(self.conn.execute(self.sql_query + " LIMIT {limit} OFFSET {start};".format(**subs), self.params))
    def __iter__(self):
        '''implement iterable behavior.'''
        return self.conn.execute(self.sql_query, self.params)
    def __len__(self):
        '''return number of rows returned by the sql query.
        WARNING: Inefficient.'''
        #this is hard because the iterable doesn't actually know its length.
        #self.cursor.rowcount doesn't work; it returns -1 when the database isn't modified.
        #not very efficient for a big dataset!
        return len(self.conn.execute(self.sql_query, self.params).fetchall())
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.conn.close()
    def get_column_names(self):
        '''return tuple of column names.
        Column names can be used for future queries or used to select a column from query row results.'''
        if self.sql_query is None:
            raise Exception('table_name not specified')
        return self.conn.execute(self.sql_query, self.params).fetchone().keys()
    def get_column_types(self):
        '''Return dictionary of data types stored in each column.
        Note that SQLite does not enforce types within a column, nor does the PyICe logger.
        The types of data stored in the first row will be returned, which may not match
        data stored elsewhere in the relation. Used by numpy array conversion to define
        data stride.'''
        cursor = self.conn.execute(self.sql_query, self.params).fetchone()
        return collections.OrderedDict([(k, type(cursor[k])) for k in cursor.keys()])
    def get_distinct(self, column_name, table_name=None):
        '''return one copy of each value (set) in the specified column.
        table_name can be an expression returning a synthetic non-table relation.'''
        if table_name is None:
            table_name = self.table_name
        if table_name is None:
            raise Exception('table_name not specified')
        data = self.conn.execute("SELECT DISTINCT {} from {};".format(column_name, table_name)).fetchall()
        return tuple(sorted(zip(*data)[0]))
    def query(self, sql_query, *params):
        '''return iterable with query results.
        columns within each row can be accessed by column name or by position.'''
        self.sql_query = sql_query
        self.params = params
        return self.conn.execute(self.sql_query, self.params)
    def zip(self):
        '''return query data transposed into a list of column tuples.'''
        return zip(*self)
    def csv(self, output_file, elapsed_time_columns=False, append=False, encoding='utf-8'):
        '''write data to csv output_file. Set output_file to None to just return the csv string.'''
        output_txt = ""
        datetime_col = None
        for pos, column in enumerate(self.get_column_names()):
            output_txt += '{},'.format(column)
            if elapsed_time_columns and column == 'datetime':
                output_txt += 'elapsed_time,'
                output_txt += 'elapsed_seconds,'
                datetime_col = pos
        output_txt = '{}\n'.format(output_txt[:-1])
        start_time = None
        for row in self:
            for pos, column in enumerate(row):
                output_txt += '{},'.format(column)
                if start_time is None and pos == datetime_col:
                    start_time = column
                if elapsed_time_columns and pos == datetime_col:
                    output_txt += '{},'.format(column - start_time) #elapsed_time
                    output_txt += '{},'.format((column-start_time).total_seconds()) #elapsed_seconds
            output_txt = '{}\n'.format(output_txt[:-1])
        if output_file is not None:
            with open(output_file, 'a' if append else 'w') as f:
                f.write(output_txt.encode(encoding))
            print 'Output written to {}'.format(output_file)
        return output_txt
    def xlsx(self, output_file, elapsed_time_columns=False):
        '''write data to excel output_file.'''
        import xlsxwriter
        workbook = xlsxwriter.Workbook(output_file, {'constant_memory': True,
                                                     'strings_to_formulas': False,
                                                     'strings_to_urls': False,
                                                     'strings_to_numbers': False,
                                                    })
        formats = {'default':    workbook.add_format({'font_name': 'Courier New', 'font_size': 9}),
                   'date_time':  workbook.add_format({'num_format': 'yyyy/mm/dd hh:mm:ss.000', 'font_name': 'Courier New', 'font_size': 9}),
                   'delta_time': workbook.add_format({'num_format': 'hh:mm:ss.000', 'font_name': 'Courier New', 'font_size': 9}),
                   'row_shade':  workbook.add_format({'bg_color': '#D0FFD0'}),
                  }
        worksheet = workbook.add_worksheet(self.table_name)
        column_width_pad = 1.0
        filter_button_width = 3
        row_idx = 0
        col_idx = 0
        datetime_col = None
        for column in self.get_column_names():
            worksheet.write(row_idx, col_idx, column)
            worksheet.set_column(col_idx, col_idx, len(column) * column_width_pad + filter_button_width, formats['default'])
            if column == 'datetime':
                datetime_col = col_idx
                if elapsed_time_columns:
                    worksheet.write(row_idx, col_idx + 1, 'elapsed_time')
                    worksheet.set_column(col_idx + 1, col_idx + 1, len('elapsed_time') * column_width_pad + filter_button_width, formats['default'])
                    worksheet.write(row_idx, col_idx + 2, 'elapsed_seconds')
                    worksheet.set_column(col_idx + 2, col_idx + 2, len('elapsed_seconds') * column_width_pad + filter_button_width, formats['default'])
                    col_idx += 2
            col_idx += 1
        column_count = col_idx - 1
        worksheet.freeze_panes(1, 0) #Keep header line at top
        row_idx += 1
        col_idx = 0
        start_time = None
        for row in self:
            for column in row:
                if col_idx == datetime_col:
                    worksheet.write_datetime(row_idx, col_idx, column.replace(tzinfo=None), formats['date_time']) #Excel can't deal with timezone-aware datetimes. Output Zulu time.
                    worksheet.set_column(col_idx, col_idx, max(len(formats['date_time'].num_format) * column_width_pad, worksheet.col_sizes[col_idx]))
                else:
                    worksheet.write(row_idx, col_idx, column)
                    try:
                        worksheet.set_column(col_idx, col_idx, max(len(column) * column_width_pad, worksheet.col_sizes[col_idx]))
                    except TypeError:
                        pass #numbers don't have length
                if start_time is None and col_idx == datetime_col:
                    start_time = column
                if elapsed_time_columns and col_idx == datetime_col:
                    worksheet.write_datetime(row_idx, col_idx + 1, column - start_time, formats['delta_time']) #elapsed_time
                    worksheet.set_column(col_idx + 1, col_idx + 1, max(len(formats['delta_time'].num_format) * column_width_pad, worksheet.col_sizes[col_idx + 1]))
                    worksheet.write(row_idx, col_idx + 2, (column-start_time).total_seconds()) #elapsed_seconds
                    col_idx += 2
                col_idx += 1
            row_idx += 1
            col_idx = 0
        worksheet.autofilter(0, 0, row_idx - 1, column_count)
        worksheet.conditional_format(1, 0, row_idx - 1, column_count, {'type': 'formula',
                                                                      'criteria': '=MOD(SUBTOTAL(3,$A$1:$A2),2)=0',
                                                                      'format': formats['row_shade']})
        icon_path = os.path.join(os.path.dirname(__file__), "tssop.png")
        worksheet.insert_image(row_idx + 2, 1, icon_path, {'y_offset': 70})
        try:
            import io, urllib2
            url = 'https://www.python.org/static/community_logos/python-powered-h-140x182.png'
            image_data = io.BytesIO(urllib2.urlopen(url).read())
            worksheet.insert_image(row_idx + 2, 5, url, {'image_data': image_data,
                                                         'x_scale': 1.5,
                                                         'y_scale': 1.5,
                                                        })
        except Exception as e:
            print "INFO: Python logo insertion failed."
            #print e
        worksheet.protect('', {'sort': True,
                               'autofilter': True,
                               'format_cells': True,
                               'format_columns': True,
                               'format_rows': True,
                              })
        workbook.close()
        print 'Output written to {}'.format(output_file)
    def to_list(self):
        '''return copy of data in a list object'''
        return [row for row in self]
    def numpy_recarray(self, force_float_dtype=False, data_types=None):
        '''return NumPy record array containing the data.
        Rows can be accessed by index, ex arr[2]. Columns can be accessed by column
        name attribute, ex arr.vbat. Use with the data filtering, smoothing,
        compressing, etc. matrix operations provided by SciPy and
        lab_utils.vector_transform, lab_utils.decimate.
        Use automatic column names, but force the data type to float, with the
        force_float_dtype boolean argument.
        Override automatic column names and data types (taken from the first row) by
        specifying a data_types iterable of (column_name, example_contents) for each
        column matching query order.
        http://docs.scipy.org/doc/numpy-1.10.1/reference/generated/numpy.recarray.html
        '''
        if force_float_dtype and data_types is None:
            dtype = numpy.dtype([(key, type(float())) for key in self.get_column_types()])
        elif force_float_dtype and data_types is not None:
            raise Exception('Specify only one of the force_float_dtype, data_types arguments.')
        elif data_types is None:
            dtype = numpy.dtype([(k, v) for k, v in self.get_column_types().iteritems()])
        else:
            dtype = numpy.dtype([(column_name, type(example_contents)) for column_name, example_contents in data_types])
        arr = numpy.array([tuple(row) for row in self], dtype)
        return arr.view(numpy.recarray)
    def column_query(self, column_list):
        '''return partial query string separating column names with comma characters.'''
        query_txt = ''
        for column in column_list:
            query_txt += '{},'.format(column)
        return query_txt[:-1]
    def time_delta_query(self, time_div=1, column_name=None):
        '''return partial query string which will compute fractional delta seconds from
        the first entry in the table as a column. Feed back into query() to get an
        elapsed time column.
        Ex: "SELECT rowid, {}, * FROM ...".format(sqlite_data_obj.time_delta_query())
        Use time_div to convert from seconds to your choice of time scale; for example,
        time_div=3600 would be hours.'''
        if column_name is None:
            if time_div == 0.001:
                column_name = "elapsed_milliseconds"
            elif time_div == 1:
                column_name = "elapsed_seconds"
            elif time_div == 60:
                column_name = "elapsed_minutes"
            elif time_div == 3600:
                column_name = "elapsed_hours"
            elif time_div == 86400:
                column_name = "elapsed_days"
            elif time_div == 31536000:
                column_name = "elapsed_years"
            else:
                column_name = "elapsed_time"
        frac_s_str = "strftime('%s',datetime)+strftime('%f',datetime)-strftime('%S',datetime)"
        if self.table_name is None:
            raise Exception('table_name not specified')
        first_time = self.conn.execute("SELECT {} FROM {} ORDER BY rowid ASC;".format(frac_s_str, self.table_name)).fetchone()[0]
        return "({}-{})/{} AS {}".format(frac_s_str, first_time, time_div, column_name)
    def filter_change(self, column_name_list, table_name=None, first_row=False, preceding_row=False):
        '''return tuple of rowid values where any column in column_name_list changed value.
        The result tuple can be fed into a new query("SELECT ... WHERE rowid in {}".format(sqlite_data_obj.filter_change())).
        If table_name is omitted, the instance default will be used.
        Setting preceding_row to True will also return the rowid before the change occurred.'''
        if table_name is None:
            table_name = self.table_name
        if table_name is None:
            raise Exception('table_name not specified')
        if first_row:
            first_row = (1,)
        else:
            first_row = tuple()
        sql_query = 'SELECT delay.rowid from {table_name} as orig JOIN {table_name} as delay ON orig.rowid = delay.rowid-1 WHERE '.format(table_name=table_name)
        for column_name in column_name_list:
            sql_query += 'orig.{column_name} IS NOT delay.{column_name} OR '.format(column_name=column_name)
        sql_query = sql_query[:-4]
        try:
            row_ids = zip(*self.conn.execute(sql_query))[0]
        except IndexError as e: #no changes
            return first_row
        if preceding_row:
            preceding_row_ids = tuple(row - 1 for row in row_ids)
            return tuple(sorted(first_row + row_ids + preceding_row_ids))
        return tuple(sorted(first_row + row_ids))
    def optimize(self):
        '''Defragment database file, reducing file size and speeding future queries.
        Also re-runs the query plan optimizer to speed future queries.
        WARNING: May take a long time to complete when operating on a large database.
        WARNING: May re-order rowids.'''
        self.conn.execute("VACUUM;")
        self.conn.execute("ANALYZE;")
    def expand_vector_data(self, csv_filename=None, csv_append=False, csv_encoding='utf-8'):
        '''Expand vector list data (from oscilloscope, network analyzer, etc.) to full row-rank.
        Scalar data will be expanded to vector length. Returns a numpy record array.
        Optionally write output to a comma separated file if the csv_filename argument is specified.'''
        columns = []
        dtypes = []
        data_length = None
        column_names = self.get_column_names()
        for column in column_names:
            columns.append([])
            for i, row in enumerate(self):
                try:
                    if row[column].startswith('[') and row[column].endswith(']'):
                        column_data = [float(x) for x in row[column].strip("[]").split(",")]
                        if data_length is None and len(column_data) > 1:
                            data_length = len(column_data)
                        elif len(column_data) != 1 and len(column_data) != data_length:
                            raise Exception('Inconsistent data length in vector expansion: {} and {}'.format(len(column_data), data_length))
                    else:
                        column_data = [row[column]]
                except AttributeError:
                    column_data = [row[column]]
                columns[-1].append(column_data)
        if data_length is None:
            print "WARNING: No vector data found."
            data_length = 1
        for i, column in enumerate(columns):
            if len(column[0]) == 1:
                dtypes.append((column_names[i], type(column[0][0])))
            else:
                dtypes.append((column_names[i], float))
            for rowcount, rowcoldata in enumerate(column):
                if len(rowcoldata) == 1: #expand scalar data to vector length
                    column[rowcount] = rowcoldata * data_length
        #flatten row data
        data = []
        for rowid in range(rowcount+1):
            rowdata = []
            for columnid in range(i+1):
                rowdata.append(columns[columnid][rowid])
            data.extend(zip(*rowdata))
        #csv output
        if csv_filename is not None:
            csv_txt = ""
            for column in column_names:
                csv_txt += '{},'.format(column)
            csv_txt = '{}\n'.format(csv_txt[:-1])
            for row in data:
                for column in row:
                    csv_txt += '{},'.format(column)
                csv_txt = '{}\n'.format(csv_txt[:-1])
            with open(csv_filename, 'a' if csv_append else 'w') as f:
                f.write(csv_txt.encode(csv_encoding))
            print 'Output written to {}'.format(csv_filename)
        array = numpy.array(data, dtype=dtypes)
        return array.view(numpy.recarray)
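
# Usage sketch (illustrative; table and column names are placeholders):
#
#     with sqlite_data(table_name='bench_data') as db:
#         print db.get_column_names()
#         q = db.query("SELECT rowid, {}, vout FROM bench_data".format(db.time_delta_query(time_div=60)))
#         for row in q:
#             print row['elapsed_minutes'], row['vout']
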
def vector_transform(rec_array, column_vector_functions, column_names=None):
    '''Generic filter function.
    column_vector_functions is a list of functions, one per column; it should have a
    length equal to the number of columns. To leave a column unchanged, set its column
    vector function to None. Each column vector function will be applied to the whole
    column vector. Thus it is appropriate for 1-d filtering and decimation where access
    to values in adjacent columns is not required.
    column_names is a list of names for each column in the returned record array. To
    leave a column name unchanged from the input record array, set the column name to
    None. To leave all column names unchanged from the input record array, set
    column_names to None.
    To smooth data, use something like scipy.signal.filtfilt and scipy.signal.butter
    for the column_vector_functions.
    http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.filtfilt.html
    http://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.signal.butter.html
    '''
    assert len(rec_array.dtype.names) == len(column_vector_functions)
    if column_names is None:
        column_names = [None] * len(column_vector_functions)
    assert len(column_vector_functions) == len(column_names)
    filt_cols = []
    filt_names = []
    for i, column_name in enumerate(rec_array.dtype.names):
        if column_vector_functions[i] is not None:
            filt_cols.append(column_vector_functions[i](rec_array[column_name]))
        else:
            filt_cols.append(rec_array[column_name])
        if column_names[i] is not None:
            filt_names.append(column_names[i])
        else:
            filt_names.append(column_name) #use old name
    return numpy.core.records.fromarrays(filt_cols, names=filt_names)
def decimate(rec_array, downsample_factor, **kwargs):
    '''Reduce row count by a factor of downsample_factor.
    By default an order 8 Chebyshev type I filter is used independently on each column.
    Set kwarg ftype='fir' to instead use a 30 point FIR filter with Hamming window (recommended).
    http://docs.scipy.org/doc/scipy-0.16.1/reference/generated/scipy.signal.decimate.html
    '''
    return vector_transform(rec_array, [lambda col: scipy.signal.decimate(x=col, q=downsample_factor, **kwargs)] * len(rec_array.dtype.names))
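
# Smoothing sketch (illustrative), per the vector_transform docstring above:
# zero-phase low-pass filter every column except the first (assumed x-axis) with
# a 4th-order Butterworth at 0.05 of the Nyquist rate.
#
#     b, a = scipy.signal.butter(4, 0.05)
#     smooth = lambda col: scipy.signal.filtfilt(b, a, col)
#     n_y_cols = len(rec_array.dtype.names) - 1
#     smoothed = vector_transform(rec_array, [None] + [smooth] * n_y_cols)
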
def ramer_douglas_peucker(rec_array, epsilon, verbose=True):
    '''reduce the number of points in a line-segment curve such that the reduced line
    segment count approximates the original curve within epsilon tolerance.
    https://en.wikipedia.org/wiki/Ramer%E2%80%93Douglas%E2%80%93Peucker_algorithm'''
    try:
        from rdp import rdp
    except ImportError:
        print "Install Ramer-Douglas-Peucker package.\nhttps://pypi.python.org/pypi/rdp"
        raise
    start_time = time.time()
    column_type = 'float'
    #column_type = '<f8'
    old_dtype = rec_array.dtype.descr
    new_dtype = numpy.dtype([(column[0], column_type) for column in old_dtype])
    np_array = rec_array.astype(new_dtype).view(column_type).reshape(-1, 2)
    reduced_array = rdp(np_array, epsilon)
    reduced_rec_array = numpy.fromiter(reduced_array, dtype=new_dtype).view(numpy.recarray)
    if verbose:
        stop_time = time.time()
        print "RDP reduced {} data set from {} to {} points ({:3.1f}%) in {} seconds with epsilon={}.".format(old_dtype[1][0], len(rec_array), len(reduced_rec_array), 100.*len(reduced_rec_array)/len(rec_array), int(round((stop_time-start_time))), epsilon)
    return reduced_rec_array
def smooth_spline(rec_array, rms_error, verbose=True, **kwargs):
    '''uses http://scipy.github.io/devdocs/generated/scipy.interpolate.UnivariateSpline.html with movable knots.
    Set rms_error to change the number of knots, bounding the smoothed data rms deviation from the original data points.
    rec_array is modified in place.
    returns the number of knots used to construct the spline'''
    point_count = len(rec_array)
    rss = rms_error * point_count**0.5
    spl = scipy.interpolate.UnivariateSpline(x=rec_array[rec_array.dtype.names[0]], y=rec_array[rec_array.dtype.names[1]], s=rss, **kwargs)
    knot_count = len(spl.get_knots())
    first_x = rec_array[0][0]
    last_x = rec_array[-1][0]
    new_x = numpy.linspace(start=first_x, stop=last_x, num=point_count, endpoint=True, dtype=type(float()))
    for i in range(point_count): #replace old array in-place
        rec_array[i] = (new_x[i], spl(new_x[i]))
    if verbose:
        print "rms_error {} constructed {} knots from {} original data points.".format(rms_error, knot_count, point_count)
    return knot_count
def smooth_filtfile(rec_array):
    '''somebody finish this.
    https://docs.scipy.org/doc/scipy-0.18.1/reference/generated/scipy.signal.filtfilt.html'''
def polyfit(rec_array, degree=1):
    '''returns polynomial fit coefficients list, highest order first
    https://docs.scipy.org/doc/numpy/reference/generated/numpy.polyfit.html
    '''
    return numpy.polyfit(x=rec_array[rec_array.dtype.names[0]], y=rec_array[rec_array.dtype.names[1]], deg=degree)
def _detrend(rec_array, **kwargs):
    '''http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.detrend.html'''
    #leave the first (x-axis) column untouched; detrend every remaining column.
    return vector_transform(rec_array, [None] + [lambda col: scipy.signal.detrend(data=col, **kwargs)] * (len(rec_array.dtype.names)-1))
def detrend_constant(rec_array, **kwargs):
    '''Remove the data mean from all columns except the first one (assumed x-axis).
    http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.detrend.html
    '''
    return _detrend(rec_array, type='constant', **kwargs)
def detrend_linear(rec_array, **kwargs):
    '''Remove a linear trend from all columns except the first one (assumed x-axis).
    http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.detrend.html
    '''
    return _detrend(rec_array, type='linear', **kwargs)
def integral_nonlinearity(rec_array, lsb_size=1):
    '''transform (code, voltage) data into INL.
    optional lsb_size argument scales y-axis data from real units to lsb count.'''
    return scalar_transform(detrend_linear(rec_array), [None, lambda x: x / float(lsb_size)])
def differential_nonlinearity(rec_array, lsb_size=1):
    '''transform (code, voltage) data into DNL.
    optional lsb_size argument scales y-axis data from real units to lsb count.'''
    return scalar_transform(vector_transform(rec_array, [lambda col: col[:-1], lambda col: numpy.diff(col)]), [None, lambda x: x / float(lsb_size)])
def scalar_transform(rec_array, column_scalar_functions, column_names=None):
    '''Transform column data by processing through user-supplied functions.
    column_scalar_functions is a list of functions, one per column; it should have a
    length equal to the number of columns. To leave a column unchanged, set its column
    scalar function to None. The column scalar function will be applied to each point
    in the column individually. Thus it is appropriate for scaling, offsetting,
    changing data type, etc. This function cannot be used for filtering or convolution
    operations that need access to adjacent data points. Instead, use vector_transform().
    column_names is a list of names for each column in the returned record array. To
    leave a column name unchanged from the input record array, set the column name to
    None. To leave all column names unchanged from the input record array, set
    column_names to None.
    '''
    column_vector_functions = []
    for csf in column_scalar_functions:
        if csf is None:
            column_vector_functions.append(None)
        else:
            column_vector_functions.append(lambda column, func=csf: [func(x) for x in column])
    return vector_transform(rec_array, column_vector_functions, column_names)
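
# Scaling sketch (illustrative): convert a two-column (code, volts) record array
# to (code, millivolts) and rename the second column.
#
#     mv = scalar_transform(rec_array, [None, lambda v: v * 1000.0],
#                           column_names=[None, 'vout_mv'])
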
class US_Time_Zone(datetime.tzinfo):
    '''Generic timezone parent class.
    Implements methods required by datetime.tzinfo with US DST rules (second Sunday of
    March and first Sunday of November). Requires subclass to define local timezone parameters:
        self.tz_name
        self.tz_name_dst
        self.gmt_offset
        self.dst_offset
    '''
    def first_sunday_on_or_after(self, dt):
        '''return date of first Sunday on or after dt.'''
        days_to_go = 6 - dt.weekday()
        if days_to_go:
            dt += datetime.timedelta(days_to_go)
        return dt
    def utcoffset(self, dt):
        '''return DST aware offset from GMT/UTC based on calendar date.'''
        return datetime.timedelta(hours=self.gmt_offset) + self.dst(dt)
    def dst(self, dt):
        '''return DST offset from standard local time based on calendar date.'''
        #DST starts the second Sunday in March and ends the first Sunday in November
        self.dston = self.first_sunday_on_or_after(datetime.datetime(dt.year, 3, 8))
        self.dstoff = self.first_sunday_on_or_after(datetime.datetime(dt.year, 11, 1))
        if self.dston <= dt.replace(tzinfo=None) < self.dstoff:
            return datetime.timedelta(hours=self.dst_offset)
        else:
            return datetime.timedelta(0)
    def tzname(self, dt):
        '''return DST aware local time zone name based on calendar date.'''
        if self.dst(dt):
            return self.tz_name_dst
        return self.tz_name
class UTC(US_Time_Zone):
    '''UTC / GMT / Zulu time zone.'''
    def __init__(self):
        self.tz_name = "UTC"
        self.tz_name_dst = "UTC"
        self.gmt_offset = +0
        self.dst_offset = +0

class US_Eastern_Time(US_Time_Zone):
    '''US Eastern time zone. (NYC/BOS)'''
    def __init__(self):
        self.tz_name = "EST"
        self.tz_name_dst = "EDT"
        self.gmt_offset = -5
        self.dst_offset = +1

class US_Pacific_Time(US_Time_Zone):
    '''US Pacific time zone. (LAX/SMF)'''
    def __init__(self):
        self.tz_name = "PST"
        self.tz_name_dst = "PDT"
        self.gmt_offset = -8
        self.dst_offset = +1
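
# Usage sketch (illustrative):
#
#     now_utc = datetime.datetime.now(UTC())
#     print now_utc.astimezone(US_Eastern_Time()).strftime('%Y-%m-%d %H:%M:%S %Z')
#     print logger_time_str(now_utc)   #logger-format UTC string (defined below)
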
def logger_time_str(dt):
    '''return a time string in the same format used by lab_core.logger.
    Requires a timezone-aware datetime object argument to correctly convert to the
    UTC times used by the logger. If the datetime object is naive of timezone, add
    one with dt.replace(tzinfo=lab_utils.US_Eastern_Time()) or dt.replace(tzinfo=lab_utils.UTC()).
    '''
    return dt.astimezone(UTC()).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
class dlog(object):
    def __init__(self, filename="output.txt"):
        '''Open filehandle to filename. If omitted, filename defaults to "output.txt".'''
        self.errcnt = 0
        self.f = open(filename, 'w')
        #note the time.clock function won't work well for linux...
        #this is written for windows
        self.timezero = time.clock()
        #time/date stamp header
        self.log_notime(time.strftime("%a, %d %b %Y %H:%M:%S"))
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.f.close()
        return None
    def log_notime(self, data):
        '''Write data to file with trailing newline'''
        self.f.write(str(data) + "\n")
        print(data)
    def log(self, data):
        '''Write data to file with timestamp and trailing newline'''
        self.log_notime(str(time.clock()-self.timezero) + str(data))
    def create_error(self):
        '''This function doesn't appear to actually do much. self.errcnt is never written to the dlog.'''
        self.errcnt += 1
    def finish(self):
        '''Write final timestamp and close filehandle'''
        self.log_notime("Data log closed at {}. Elapsed time: {}".format(time.strftime("%a, %d %b %Y %H:%M:%S"), time.clock()-self.timezero))
        self.f.close()
[docs]class debug_log(object): '''Log messages into a file and optionally print to screen.''' # This class is used in most of Frank's tests. def __init__(self, log_file_name=__name__+".log", debug=False): self.debug = debug self.f = open(log_file_name, "w") self.fileno = self.f.fileno() # atexit.register(self.__del__) # Tries to close the debug_log file if the program exits for any reason. def __enter__(self): return self def __exit__(self,exc_type, exc_val, exc_tb): self.f.close() return None
[docs] def write(self, msg): '''Add a message to the log file. Also print the message if debug is True''' t_str = time.asctime() m_str = "{} :: {}\n".format(t_str, msg) if self.debug: print m_str, self.f.write(m_str) self.f.flush() # def __del__(self): # self.f.flush() # os.fsync(self.fileno) # self.f.close()
class interpolator(object): def __init__(self): self._points = [] self._points_ysort = [] self.y_slope = 0 def check_monotonicity(self): if len(self._points) > 1: x_pts = [x[0] for x in self._points] y_pts = [y[1] for y in self._points] if len(x_pts) != len(set(x_pts)): raise Exception('duplicated x column value') self.y_slope = cmp(y_pts[1],y_pts[0]) for i in range(1,len(self._points)): if self.y_slope*y_pts[i] <= self.y_slope*y_pts[i-1]: raise Exception('y column values are not monotonically increasing or decreasing relative to x-column values at point ({},{})'.format(self._points[i][0],self._points[i][1])) def sort(self): self._points.sort(key=operator.itemgetter(0)) #increasing values in x self._points_ysort = sorted(self._points, key=operator.itemgetter(1)) #increasing values in y def add_point(self, x_val, y_val): self._points.append([x_val,y_val]) self.sort() self.check_monotonicity() def add_points(self, point_list): '''expects 2d list of the form [[x0,y0],[x1,y1],...[xn,yn]] the points must be strictly monotonic, but not necessarily sorted''' for point in point_list: self.add_point(point[0], point[1]) def find(self, key, sorted_key_list, value_list): '''function operates independent of object internal data expects sorted_key_list to increase strictly monotonically will return linear combination of two values from value list weighted by distance from two enclosing points of key in sorted_key_list ''' points = zip(sorted_key_list, value_list) if len(points) < 2: raise Exception('At least two points are required to define a line.') low_pts = [pt for pt in points if pt[0] <= key] high_pts = [pt for pt in points if pt[0] >= key] if len(low_pts) == 0: #no points below value; extrapolate from first two points #print 'Bottom extrapolation' low_pt = points[0] high_pt = points[1] elif len(high_pts) == 0: #no points above value; extrapolate from last two points #print 'Top extrapolation' low_pt = points[-2] high_pt = points[-1] else: #interpolate between points lower and higher than key argument #print 'Interpolation' low_pt = low_pts[-1] high_pt = high_pts[0] if low_pt == high_pt: #avoid divide by 0 return low_pt[1] else: return low_pt[1] + (key-low_pt[0])*(float(high_pt[1]-low_pt[1])/(high_pt[0]-low_pt[0])) #prevent integer divide def get_x_val(self, y_val): [x_pts,y_pts] = zip(*self._points_ysort) return self.find(y_val, y_pts, x_pts) def get_y_val(self, x_val): [x_pts,y_pts] = zip(*self._points) return self.find(x_val, x_pts, y_pts)
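def _interpolator_example(): '''A minimal usage sketch, defined but never called at import: build a strictly monotonic transfer curve, then look up values in either direction.''' interp = interpolator() interp.add_points([[0.0, 1.0], [10.0, 2.0], [20.0, 4.0]]) y = interp.get_y_val(5.0) #linear interpolation -> 1.5 x = interp.get_x_val(3.0) #reverse lookup -> 15.0 return (x, y)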
[docs]def eng_string( x, fmt = ':.3g', si = True): ''' Returns float/int value <x> formatted in a simplified engineering format - using an exponent that is a multiple of 3. fmt: str.format-style format spec used to format the value before the exponent; note the leading colon, e.g. ':.2f'. si: if true, use SI suffix for exponent, e.g. k instead of e3, n instead of e-9 etc. E.g. with fmt=':.2f' and si=False: 1.23e-08 => 12.30e-9 123 => 123.00 1230.0 => 1.23e3 -1230000.0 => -1.23e6 and with si=True: 1230.0 => 1.23k -1230000.0 => -1.23M ''' assert isinstance(x,numbers.Number) #reject non-numeric input early; a string like '5e3' would not survive the math below anyway if x == 0: return '{{{}}}'.format(fmt).format(x) sign = '' if x < 0: x = -x sign = '-' exp = int( math.floor( math.log10( x))) exp3 = exp - ( exp % 3) x3 = x / ( 10 ** exp3) if si and exp3 >= -24 and exp3 <= 24 and exp3 != 0: exp3_text = u'yzafpnµm kMGTPEZY'[ ( exp3 - (-24)) / 3] elif exp3 == 0: exp3_text = '' else: exp3_text = 'e%s' % exp3 s1 = "{}{{{}}}".format(sign,fmt) s2 = s1.format( x3 ) return u"{}{}".format(s2,exp3_text)
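def _eng_string_example(): '''A minimal usage sketch, defined but never called at import, showing eng_string() with the default ':.3g' format spec.''' print eng_string(1.23e-8) #-> u'12.3n' print eng_string(-1230000.0) #-> u'-1.23M' print eng_string(1230.0, si=False) #-> u'1.23e3'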
class ordered_pair(list): def transform(self, x_transform = None, y_transform = None): '''executes x_transform function on first (x) element of each ordered pair data point executes y_transform function on second (y) element of each ordered pair data point returns None, data changed in place not appropriate for filtering functions that require access to adjacent (in time or space) data point values''' if x_transform is None: x_transform = lambda x: x if y_transform is None: y_transform = lambda y: y for i in range(len(self)): self[i] = [x_transform(self[i][0]), y_transform(self[i][1])] def x_sql_time(self): '''convert sqlite database datetime string in x-axis data to python datetime object''' self.transform(x_transform=lambda t: datetime.datetime.strptime(t, '%Y-%m-%dT%H:%M:%S.%fZ')) def x_sql_elapsed_time(self, seconds=False, minutes=False, hours=False, days=False): '''convert sqlite database datetime string in x-axis data to python timedelta object access properties of days, seconds (0 to 86399 inclusive) and microseconds (0 to 999999 inclusive) or method total_seconds() optionally, instead return numeric total seconds, minutes, hours or days by setting respective argument to True''' self.x_sql_time() start_time = self[0][0] self.transform(x_transform=lambda t: t - start_time) if not (seconds or minutes or hours or days): pass elif seconds and not (minutes or hours or days): self.transform(x_transform=lambda t: t.total_seconds()) elif minutes and not (seconds or hours or days): self.transform(x_transform=lambda t: t.total_seconds() / 60.0) elif hours and not (seconds or minutes or days): self.transform(x_transform=lambda t: t.total_seconds() / 3600.0) elif days and not (seconds or minutes or hours): self.transform(x_transform=lambda t: t.total_seconds() / 86400.0) else: raise Exception('Specify at most one of (seconds, minutes, hours, days)') def xscale(self, x_scale): '''changes list in place with x points multiplied by x_scale and y points unaltered''' self.transform(x_transform = lambda x: x * x_scale) def yscale(self, y_scale): '''changes list in place with x points unaltered and y points multiplied by y_scale''' self.transform(y_transform = lambda y: y * y_scale) def xoffset(self, x_offset): self.transform(x_transform = lambda x: x + x_offset) def yoffset(self, y_offset): self.transform(y_transform = lambda y: y + y_offset) def xyscale(self, x_scale, y_scale): '''changes list in place with x points multiplied by x_scale and y points multiplied by y_scale''' self.transform(x_transform = lambda x: x*x_scale, y_transform = lambda y: y*y_scale) def truncate(self, length=None, offset=0): '''truncate record in place. offset removes points from the beginning. length is either a 0-1 fraction of the original record length or an integer number of points to keep.''' orig_len = len(self) del self[0:offset] #offset or offset+1? if length is None: pass elif length > 0 and length < 1: new_len = int(round(length*orig_len)) if new_len > len(self): raise Exception('Record too short after offset to return {}% of original length.'.format(length*100)) del self[new_len:len(self)] #percentage of original data? #xydata = xydata[:int(round(length*len(xydata)))] #percentage of offset data? 
print 'Truncating record from {} to {} points starting at {}.'.format(orig_len,new_len,offset) elif length <= len(self) and int(length) == length: del self[length:len(self)] print 'Truncating record from {} to {} points starting at {}.'.format(orig_len,length,offset) else: raise Exception('length argument should be 0-1 percentage of original record length or integer desired record length.') def decimate(self, scale): assert scale > 0 assert scale <= 1 old_len = len(self) new_len = int(round(scale*old_len)) print 'Decimating record from {} to {} points.'.format(old_len,new_len) accumulator = 0 incr = 1 - (1.0*new_len/old_len) del_list = [] for i in range(len(self)): accumulator += incr if accumulator >= 1: # print 'dropping point {}:{}'.format(i,self[i]) accumulator -= 1 del_list.append(i) #don't change list length while iterating it while len(del_list): del self[del_list.pop()] def numpy_recarray(self, force_float_dtype=False, data_types=None): '''return NumPy record array containing data. Rows can be accessed by index, ex arr[2]. Columns can be accessed by column name attribute, ex arr.vbat. Use with data filtering, smoothing, compressing, etc. matrix operations provided by SciPy and lab_utils.transform, lab_utils.decimate. Use automatic column names, but force data type to float with force_float_dtype boolean argument. Override automatic column names and data types (first row) by specifying data_types iterable of (column_name,example_contents) for each column matching query order. http://docs.scipy.org/doc/numpy-1.10.1/reference/generated/numpy.recarray.html ''' if force_float_dtype and data_types is None: dtype = numpy.dtype([('x',type(float())), ('y',type(float()))]) elif force_float_dtype and data_types is not None: raise Exception('Specify only one of force_float_dtype, data_types arguments.') elif data_types is None: dtype = numpy.dtype([('x',type(self[0][0])), ('y',type(self[0][1]))]) else: dtype = numpy.dtype([(column_name,type(example_contents)) for column_name,example_contents in data_types]) arr = numpy.array([tuple(row) for row in self], dtype) return arr.view(numpy.recarray) def ramer_douglas_peucker(self, epsilon, verbose=True, force_float_dtype=False, data_types=None): '''reduce number of points in line-segment curve such that reduced line segment count approximates original curve within epsilon tolerance. 
https://en.wikipedia.org/wiki/Ramer%E2%80%93Douglas%E2%80%93Peucker_algorithm''' return ramer_douglas_peucker(self.numpy_recarray(force_float_dtype, data_types), epsilon, verbose) def _smooth(self, axis, window, extrapolation_window): if window is None or window == 1: return window = int(round(window)) if window%2 == 0: print '*** WARNING ***' print "Even window sizes like {} have a missing centroid and will slide the data downward using this smoothing function.".format(int(window)) print "I'm incrementing the window by one to {} to correct for this.".format(int(window) + 1) window += 1 if axis == 'y': x = zip(*self)[0] y = zip(*self)[1] else: x = zip(*self)[1] y = zip(*self)[0] spacing = (x[-1] - x[0]) / float(len(x) - 1) # fit_lower and fit_upper are linear extrapolations off the left and right sides using the extrapolation_window fit_lower = numpy.poly1d(numpy.polyfit(x = x[:extrapolation_window], y = y[:extrapolation_window], deg = 1)) fit_upper = numpy.poly1d(numpy.polyfit(x = x[-extrapolation_window:], y = y[-extrapolation_window:], deg = 1)) xpoints_lo = sorted([x[0] - spacing * (points + 1) for points in range(window)]) xpoints_hi = sorted([x[-1] + spacing * (points + 1) for points in range(window)]) values_lo = [fit_lower(point) for point in xpoints_lo] values_hi = [fit_upper(point) for point in xpoints_hi] data = values_lo + list(y) + values_hi # Extend data end points left/right data = numpy.convolve(data, numpy.ones(window)/float(window), 'same') # to assist running average algorithm data = data.ravel().tolist() # Convert array back to list data[0:window] = [] # Strip off left padding data[len(data)-window:] = [] # Strip off right padding for i in range(len(self)): if axis == 'y': self[i] = [x[i], data[i]] else: self[i] = [data[i], x[i]] def smooth_y(self, window = 5, extrapolation_window = None, iterations = 1): '''Smooths a data set's y axis data for publication. 'window' is the size of the main filtering window. The data is convolved with a block of '1s'. The length of the block determines the aggressiveness of the filtering. A large window size is like having a low frequency pole. The larger it is the more distortion there will be. 'extrapolation_window' is the size of the window used to determine the line for linear extrapolation off the ends of the data set. The data needs to be extended so the convolution doesn't run out of data on the ends. The default extrapolation window is set to the main window but can be reduced to reduce distortion or more properly model the end point derivatives. 'iterations' is the number of iterations the smoothing function runs. Increasing the iterations is like having more poles at the same frequency thereby producing essentially a 'brick wall' filter. ***************** **** WARNING **** ***************** Iterations should be used judiciously as it can lead to large phase shifting which moves the data a great distance on the independent axis. It's a good idea to always plot the original data and the massaged data together to ensure that the massaged data has not been seriously distorted.''' if extrapolation_window is None: extrapolation_window = window for i in range(iterations): self._smooth(axis = 'y', window = window, extrapolation_window = extrapolation_window) def smooth_x(self, window = 5, extrapolation_window = None, iterations = 1): '''Smooths a data set's x axis data for publication. 'window' is the size of the main filtering window. The data is convolved with a block of '1s'. 
The length of the block determines the aggressiveness of the filtering. A large window size is like having a low frequency pole. The larger it is the more distortion there will be. 'extrapolation_window' is the size of the window used to determine the line for linear extrapolation off the ends of the data set. The data needs to be extended so the convolution doesn't run out of data on the ends. The default extrapolation window is set to the main window but can be reduced to reduce distortion or more properly model the end point derivatives. 'iterations' is the number of iterations the smoothing function runs. Increasing the iterations is like having more poles at the same frequency thereby producing essentially a 'brick wall' filter. ***************** **** WARNING **** ***************** Iterations should be used judiciously as it can lead to large phase shifting which moves the data a great distance on the independent axis. It's a good idea to always plot the original data and the massaged data together to ensure that the massaged data has not been seriously distorted.''' if extrapolation_window is None: extrapolation_window = window for i in range(iterations): self._smooth(axis = 'x', window = window, extrapolation_window = extrapolation_window) class oscilloscope_channel(ordered_pair): def __init__(self, time_points, channel_data): '''takes string data, likely from a two-column sql database query of an oscilloscope trace and returns a list of (x,y) ordered pairs of floats appropriate for plotting or further manipulation expects time and channel series data to be surrounded with square brackets and comma separated time_points and channel_data should be of equal length''' list.__init__(self) xvalues = [float(x) for x in time_points.strip("[]").split(",")] yvalues = [float(x) for x in channel_data.strip("[]").split(",")] self.extend(zip(xvalues, yvalues)) self.array = numpy.array(zip(xvalues, yvalues), dtype=[('x', float), ('y', float)]) def to_recarray(self): return self.array.view(numpy.recarray)
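def _oscilloscope_channel_example(): '''A minimal usage sketch, defined but never called at import: parse a tiny, illustrative bracketed scope capture, rescale y in place, then apply light running-average smoothing.''' trace = oscilloscope_channel('[0.0,1.0,2.0,3.0]', '[0.1,0.2,0.4,0.3]') trace.yscale(1000.0) #volts -> millivolts trace.smooth_y(window=3) return trace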
[docs]def egg_timer(timeout, message=None, length=30, display_callback=None): '''Provides a blocking delay with a graphic to indicate progress so far so the computer doesn't look idle optionally, display a message on the line above the timer graphic optionally, specify a display_callback function to insert extra progress information after the timer display. display_callback function should accept a single dictionary argument and return a string.''' light_shade = u"▒" #\u2592 dark_shade = u"█" #\u2588 digits = len(str(int(timeout))) longest_line_len = 0 status = {} status['start_time'] = time.time() status['total_time'] = timeout status['callback_disp_str'] = '' status['elapsed_time'] = 0 status['message'] = message if message is not None: print message while status['elapsed_time'] != status['total_time']: status['present_time'] = time.time() status['elapsed_time'] = min(status['present_time'] - status['start_time'], status['total_time']) status['remaining_time'] = status['total_time'] - status['elapsed_time'] status['percent_complete'] = status['elapsed_time'] / status['total_time'] complete_length = int(status['percent_complete'] * length) status['dark'] = dark_shade * complete_length status['light'] = light_shade * (length-complete_length) if display_callback is not None: status['callback_disp_str'] = display_callback(status) print_str = u"\r║{{dark}}{{light}}║ {{remaining_time:{digits}.0f}}/{{total_time:{digits}.0f}}s remaining. ({{percent_complete:3.1%}}). {{callback_disp_str}}".format(digits=digits).format(**status) #║╠╣ if len(print_str) < longest_line_len: pad = " " * (longest_line_len - len(print_str)) else: pad = "" longest_line_len = len(print_str) print print_str + pad, loop_time = time.time() - status['present_time'] if loop_time < 0.1: time.sleep(0.1 - loop_time) print
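def _egg_timer_example(read_temp=lambda: 25.0): '''A minimal usage sketch, defined but never called at import: block for a 10 second soak with a callback appending an oven temperature to the progress line. read_temp is a stand-in for a real instrument read.''' egg_timer(10, message='Soaking DUT...', display_callback=lambda status: '{:0.1f}C'.format(read_temp()))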
[docs]def expand_tabs(string, *column_widths, **default_column_width): '''like string.expandtabs, but works only on a single line and allows for varying column widths. accepts variable number of positional arguments for each column width. accepts keyword argument "default_column_width" if not all column widths are specified. accepts keyword argument "verbose" to warn if column width is too narrow for contents.''' for key in default_column_width: if key != "default_column_width" and key != "verbose": raise Exception('"default_column_width" and "verbose" are the only allowed keyword arguments.') columns = string.split('\t') out_str = '' for idx, column in enumerate(columns): try: column_width = column_widths[idx] except IndexError: if "default_column_width" in default_column_width: column_width = default_column_width["default_column_width"] else: raise Exception('Specify width of each column or specify keyword argument "default_column_width"') pad = column_width - len(column) if pad < 1: if default_column_width.get("verbose", None): print "Column {} undersize by {}.".format(idx, 1-pad) pad = 1 space = ' ' * pad out_str += column + space return out_str
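def _expand_tabs_example(): '''A minimal usage sketch, defined but never called at import: pad three tab-separated fields to explicit widths of 8 and 10 characters, falling back to a default width of 6 for the unlisted third column.''' return expand_tabs('VIN\t12.0V\tOK', 8, 10, default_column_width=6) #columns padded to 8, 10 and 6 characters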
[docs]def column_formatter(rows_of_columns, padding=3, justification="left", first_line_justification="center"): '''takes data of form: [['ID:', 'REL_NOW_TIME', 'DURATION', 'FMT', 'RAW ', 'COUNT'], [u'00:', '-00:00:35.290 ', ' 00:00:00.431 ', u'41.974488V', 28438, 1], [u'01:', '-00:00:34.859 ', ' 00:00:07.216 ', u'-20.675808V', 51528, 1], [u'02:', '-00:00:27.643 ', ' 00:00:00.586 ', u'-30.318516V', 44995, 1], [u'03:', '-00:00:27.057 ', ' 00:00:17.777 ', u'38.97378V', 26405, 1], [u'04:', '-00:00:09.280 ', ' 00:00:00.897 ', u'21.428568V', 14518, 1], [u'05:', '-00:00:08.383 ', ' 00:00:08.383+', u'-23.781312V', 49424, 1] ] produces formatted output of form: ID: REL_NOW_TIME DURATION FMT RAW COUNT 00: -00:00:35.290 00:00:00.431 41.974488V 28438 1 01: -00:00:34.859 00:00:07.216 -20.675808V 51528 1 02: -00:00:27.643 00:00:00.586 -30.318516V 44995 1 03: -00:00:27.057 00:00:17.777 38.97378V 26405 1 04: -00:00:09.280 00:00:00.897 21.428568V 14518 1 05: -00:00:08.383 00:00:08.383+ -23.781312V 49424 1 padding sets spacing between columns first_line_justification and justification control alignment for the first and subsequent lines respectively. valid arguments are 'left','right' and 'center' ''' if justification == "left": sljmethod = unicode.ljust elif justification == "right": sljmethod = unicode.rjust elif justification == "center": sljmethod = unicode.center else: raise Exception("Valid justification arguments are 'left','right' and 'center'") if first_line_justification == "left": fljmethod = unicode.ljust elif first_line_justification == "right": fljmethod = unicode.rjust elif first_line_justification == "center": fljmethod = unicode.center else: raise Exception("Valid first_line_justification arguments are 'left','right' and 'center'") col_widths = [0] * len(rows_of_columns[0]) for row in rows_of_columns: for idx, col in enumerate(row): col_widths[idx] = max(col_widths[idx], len(unicode(col))) ret_str = u'' for row_idx,row in enumerate(rows_of_columns): jmethod = fljmethod if row_idx == 0 else sljmethod for col_idx,col in enumerate(row): ret_str += jmethod(unicode(col), col_widths[col_idx]) ret_str += " " * padding ret_str += "\n" return ret_str
class ticker(object): def __init__(self,stock_list=None): if stock_list is None: self.stock_list = ['LLTC', 'ADI', 'TXN', 'AAPL', 'GOOG'] else: self.stock_list = stock_list def get_quote(self,symbol): url = 'http://finance.yahoo.com/d/quotes.csv?s=+{}&f=snl1'.format(symbol) try: ticker, desc, price = csv.reader([urllib.urlopen(url).read()]).next() return {'ticker':ticker.strip().replace('"',''), 'desc':desc.strip().replace('"',''), 'price':price.strip().replace('"','')} except Exception as e: print e return {'ticker':None, 'desc':None, 'price':None} def build_tape(self): self.str = '' for stock in self.stock_list: data = self.get_quote(stock) self.str += '{}: {} '.format(data['ticker'],data['price']) return self.str def rotate(self): self.str = self.str[1:] + self.str[0] return self.str def tick(self, display_function=None, character_time=0.15, refresh_time=45): if display_function is None: display_function = lambda msg: self.disp(msg) refresh_cycles = max(int(refresh_time / character_time), 1) while True: self.build_tape() for i in range(refresh_cycles): display_function(self.rotate()) time.sleep(character_time) def disp(self, msg): print '{}\r'.format(msg),
[docs]def modulate(data1, data2): '''data1 and data2 are sequences of (x,y) pairs that may not share the same x values. The shorter data set is interpolated onto the x grid of the longer one (x values must be increasing, per numpy.interp) and the y values are multiplied pointwise.''' independent = [] product = [] if len(data1) > len(data2): for value in data1: xvalue = value[0] data2_value = numpy.interp(xvalue, zip(*data2)[0], zip(*data2)[1]) independent.append(xvalue) product.append(value[1] * data2_value) else: for value in data2: xvalue = value[0] data1_value = numpy.interp(xvalue, zip(*data1)[0], zip(*data1)[1]) independent.append(xvalue) product.append(value[1] * data1_value) return zip(independent, product)
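def _modulate_example(): '''A minimal usage sketch, defined but never called at import: multiply a coarse gain curve into a finer data set; the shorter record is interpolated onto the longer record's x grid.''' fine = [(0.0, 1.0), (0.5, 2.0), (1.0, 3.0), (1.5, 4.0), (2.0, 5.0)] coarse = [(0.0, 1.0), (2.0, 3.0)] return modulate(fine, coarse) #-> [(0.0, 1.0), (0.5, 3.0), (1.0, 6.0), (1.5, 10.0), (2.0, 15.0)]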
[docs]def delete_file(filename, max_tries=20, retry_delay=5): """Tries to delete a file, retrying if the file is locked (e.g. because it is open in Notepad++, SQLite Manager, or another program), failing gracefully if the file doesn't (yet) exist. Gives up after a number of retries and raises RuntimeError. Good for removing stale sqlite DBs and log files from old runs.""" try: os.stat(filename) # See if file already exists. # If not, an exception is thrown and we GOTO the "except OSError:" below. # All code from here to the "except OSError:" # is only executed if the file actually exists. tries = max_tries while tries > 0: try: os.remove(filename) print "Removed prior run file {}".format(filename) break except OSError: print "Unable to remove stale file {} --- RETRYING in {} secs".format(filename, retry_delay) tries = tries - 1 time.sleep(retry_delay) else: print "Giving up!" raise RuntimeError('Unable to delete file {}'.format(filename)) except OSError: print "No prior", filename, "to remove."
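def _delete_file_example(): '''A minimal usage sketch, defined but never called at import: clear a stale results database (illustrative filename) before a new run, retrying briefly in case a DB browser still has it open.''' delete_file('old_results.sqlite', max_tries=3, retry_delay=1)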
if __name__ == "__main__": pass