#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Miscellaneous Utilities
=======================
Includes:
- database access tools
- servo delay loop
- string sanitization
- filters and other numeric transforms
- floating point number utilities
- email/SMS notification
'''
import smtplib
from email.mime.text import MIMEText #send email/sms message from script
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import time, datetime
import sqlite3
import re
import urllib
import operator
import math
import os
import atexit
import collections
import sys
import struct
import threading
import numbers
import csv
import Queue
try:
import numpy, scipy, scipy.signal, scipy.interpolate
except ImportError as e:
print "Warning: NumPy,SciPy import failed."
print e
class delay_loop(object):
'''make constant loop delay independent of loop processing time by measuring time at beginning
and end of loop and adding extra delay as necessary'''
def __init__(self, strict=False, begin=True, no_drift=True):
'''Set strict to True to raise an Exception if loop time is longer than the requested delay.
Timer will automatically begin when the object is instantiated if begin=True.
To start the timer only when ready, set begin=False and call the begin() method to start it.
If no_drift=True, the delay loop will manage loop time over-runs by debiting the extra time from the next cycle.
This ensures long-term time stability at the expense of increased jitter.
Windows task switching can add multi-millisecond uncertainty to each delay() call, which can accumulate if not accounted for.
Set no_drift=False to ignore time over-runs when computing the next delay time.
'''
self.strict = strict
self.no_drift = no_drift
self.count = 0
self.begin_time = None
self.delay_time = None #extra sleep time from last delay() call, for margin diagnostics.
self.loop_time = None #last achieved loop time.
if begin:
self.begin()
def __call__(self,seconds):
return self.delay(seconds)
def get_count(self):
'''returns total number of times the delay() method has been called'''
return self.count
def get_total_time(self):
'''returns total number of seconds since the timer began'''
return (datetime.datetime.utcnow()-self.start_time).total_seconds()
def begin(self, offset = 0):
'''make note of begin time for loop measurement. Use offset to adjust the begin time in case of overrun on last cycle.'''
if self.begin_time is None:
self.start_time = datetime.datetime.utcnow()
self.begin_time = datetime.datetime.utcnow() + datetime.timedelta(seconds = offset)
def delay(self, seconds):
'''delay extra time to make loop time constant
returns actual delay time achieved'''
if self.begin_time is None:
raise Exception('Call begin() method before delay().')
self.count += 1
elapsed_time = datetime.datetime.utcnow() - self.begin_time
self.delay_time = (datetime.timedelta(seconds = seconds) - elapsed_time).total_seconds()
if (self.delay_time < 0):
if (self.strict == True):
raise Exception('Loop processing longer than requested delay by {:3.3f} seconds at {}.'.format(-self.delay_time, datetime.datetime.now()))
else:
print 'Warning! Loop processing longer than requested delay by {:3.3f} seconds at {}.'.format(-self.delay_time, datetime.datetime.now())
else:
time.sleep(self.delay_time)
self.loop_time = (datetime.datetime.utcnow() - self.begin_time).total_seconds()
if self.no_drift:
self.begin(offset = seconds - self.loop_time) # restart timer for next loop cycle, use offset to correct for over run.
else:
self.begin(offset = 0) # restart timer for next loop cycle, ignore over run.
return self.delay_time
def time_remaining(self, loop_time):
'''Use this in a while loop to perform another function for duration loop_time. Test the result for 0 or less.'''
if self.begin_time is None:
raise Exception('Call begin() method before time_remaining().')
remaining_time = loop_time - (datetime.datetime.utcnow() - self.begin_time).total_seconds()
if remaining_time <= 0:
self.begin(offset = remaining_time) # restart timer for next loop cycle, use offset to correct for over run.
return remaining_time
def delay_margin(self):
'''Return extra time remaining (i.e. sleep time) from the last call to delay().'''
return self.delay_time
def achieved_loop_time(self):
'''Return previous actual achieved loop time (including any overrun).'''
return self.loop_time
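#A hedged usage sketch of delay_loop: pad each pass of a work loop out to a
#constant 100ms period. The 10ms simulated workload and 0.1s period are
#illustrative assumptions, not requirements of the class.
def _example_delay_loop():
    dly = delay_loop()
    for _ in range(5):
        time.sleep(0.01) #stand-in for real loop processing
        dly.delay(0.1) #sleep away the remainder of the 100ms period
    print "average loop time: {:0.3f}s".format(dly.get_total_time() / dly.get_count())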
class threaded_writer(object):
'''helper to perform some task in parallel with test script at fixed rate'''
class stop_thread(threading.Thread):
'''Thread extended to have stop() method. Threads cannot be restarted after stopping. Make a new one to restart.'''
def __init__(self, stop_event, stopped_event, queue, group=None, target=None, name=None, args=(), kwargs={}):
self.stop_event = stop_event #command to stop thread
self.stopped_event = stopped_event #notification that thread has stopped itself.
self.queue = queue
threading.Thread.__init__(self, group=group, target=target, name=name, args=args, kwargs=kwargs)
def stop(self):
'''stop thread. thread cannot be restarted.'''
self.stop_event.set()
def set_time_interval(self, time_interval):
'''queue a new loop period for the running thread to pick up.'''
self.queue.put(("time_interval", time_interval))
def __init__(self, verbose=False):
self.verbose = verbose
self._threads = [] #check stopped_event whenever inspecting elements of this list to find out which threads have already stopped.
def _check_threads(self):
'''remove terminated threads from internal list'''
for thread in self._threads[:]:
if thread.stopped_event.is_set():
self._threads.remove(thread)
def stop_all(self):
'''stop all threads. threads cannot be restarted.'''
self._check_threads()
for thread in self._threads[:]:
thread.stop()
self._threads.remove(thread)
def connect_channel(self, channel_name, time_interval, sequence=None, start=True, address='localhost', port=5001, authkey='ltc_lab'):
'''
Write each element of sequence in turn to channel_name, waiting time_interval between writes.
If sequence is None, Periodically read and re-write channel as keepalive.
Thread safety provided by remote channel server infrastructure.
First thread must call master.serve() and test script should call master.attach().
'''
from PyICe import lab_core
m = lab_core.master()
m.attach(address, port, authkey)
if sequence is None:
return self.add_function(lambda channel_name=channel_name: m.write(channel_name, m.read(channel_name)), time_interval, start)
else:
class sequencer(object):
def __init__(self):
self.sequence = self.generator(sequence)
def generator(self, sequence):
for i in sequence:
yield i
def __call__(self):
m.write(channel_name, self.sequence.next())
return self.add_function(sequencer(), time_interval, start)
def add_function(self, function, time_interval, start=True):
'''
Periodically execute function.
No thread safety. Use caution with shared interfaces or use separate remote channel clients with each function. See example above.
'''
stop_event = threading.Event()
stopped_event = threading.Event()
queue = Queue.Queue()
thread = self.stop_thread(stop_event, stopped_event, queue, target=lambda: self._task(function, time_interval, stop_event, stopped_event, queue), name=None)
if start:
thread.start()
self._threads.append(thread)
return thread
def _task(self, function, time_interval, stop_event, stopped_event, queue):
'''thread handling loop. processes input Event to request thread termination and sends event back when thread terminates.'''
dly = delay_loop()
params = {}
params['time_interval'] = time_interval
while not stop_event.is_set(): #add ability to pass external message to terminate thread???
try:
attr = queue.get_nowait()
if self.verbose:
print "Writing {} to {}".format(attr[0],attr[1])
params[attr[0]] = attr[1]
except Queue.Empty:
pass
if self.verbose:
print "Executing {} at time {}".format(function, datetime.datetime.utcnow())
try:
function()
except StopIteration as e:
if self.verbose:
print "Thread {} terminating - reached end of sequence at time {}".format(function, datetime.datetime.utcnow())
stopped_event.set()
return
dly.delay(params['time_interval'])
if self.verbose:
print "Thread {} terminating - received stop event at time {}".format(function, datetime.datetime.utcnow())
stopped_event.set()
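#A hedged usage sketch of threaded_writer.add_function(): run a trivial
#heartbeat task once per second in the background while the main script works.
#The heartbeat function and the 3.5s of "work" are illustrative assumptions.
def _example_threaded_writer():
    tw = threaded_writer(verbose=False)
    tw.add_function(lambda: sys.stdout.write('heartbeat\n'), time_interval=1)
    time.sleep(3.5) #main script does real work here
    tw.stop_all() #threads cannot be restarted after stopping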
def floatRange(start,stop=None,step=None):
'''Returns a list of numbers similar to python range() builtin but supports floats.
start is inclusive, stop is exclusive
When called with a single argument, start=0 and the argument becomes stop.'''
return numpy.arange(start, stop, step).tolist()
def floatRangeInc(start,stop=None,step=None):
'''Same as float range, however it is inclusive of the last value'''
return floatRange(start, stop, step) + [stop]
def logRange(start,stop,stepsPerDecade=None, stepsPerOctave=None):
'''log step range function similar to python built-in range()'''
if (stepsPerDecade is not None and stepsPerOctave is None):
stepsize = 10**(1.0/stepsPerDecade) #possible divide by zero!
elif (stepsPerDecade is None and stepsPerOctave is not None):
stepsize = 2**(1.0/stepsPerOctave) #possible divide by zero!
else:
raise Exception('Must call logRange function with exactly one of the (stepsPerDecade, stepsPerOctave) arguments')
point = float(start)
r = []
while (point < stop):
r.append(point)
point *= stepsize
return r
def decadeListRange(decadePoints,decades):
'''log step range function similar to python built-in range()
accepts list input of points in a single decade and repeats
these points over the specified number of decades
'''
r = []
exp = 0
while (decades > 0):
r.extend(map(lambda x: x*10**exp, decadePoints))
decades -= 1
exp += 1
return r
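#A few illustrative calls to the range helpers above; expected results are
#shown in the comments (floatRange inherits numpy.arange's usual floating
#point endpoint caveats for steps that aren't exactly representable):
def _example_ranges():
    print floatRange(0, 1, 0.25) #[0.0, 0.25, 0.5, 0.75]
    print floatRangeInc(0, 1, 0.25) #[0.0, 0.25, 0.5, 0.75, 1]
    print logRange(1, 1000, stepsPerDecade=1) #[1.0, 10.0, 100.0]
    print decadeListRange([1, 2, 5], decades=2) #[1, 2, 5, 10, 20, 50]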
class email(object):
'''sends email to specified destination from dedicated gmail account, or account of your choosing'''
def __init__(self, destination, login='ltlabboston', pw='PythonLab', domain='gmail.com'):
'''destination is the recipient's email address'''
self.destination = destination
self.login = login
self.pw = pw
self.sender = login + '@' + domain
def send(self, body, subject = None, attachments = []):
'''compose MIME message with proper headers and send'''
if len(attachments) == 0:
message = MIMEText(body, _charset="utf-8")
else:
message = MIMEMultipart('mixed')
message.attach(MIMEText(body, _charset="utf-8"))
for attachment in attachments:
filebytes = open(attachment, "rb")
attachment_part = MIMEApplication(filebytes.read())
filebytes.close() #don't leak the filehandle
attachment_part.add_header('content-disposition', 'attachment', filename = os.path.basename(attachment))
message.attach(attachment_part)
if (subject is not None):
message['Subject'] = subject
message['To'] = self.destination
message['From'] = self.sender
server = smtplib.SMTP('smtp.gmail.com:587')
server.ehlo()
server.starttls()
server.ehlo()
server.login(self.login,self.pw)
server.sendmail(self.sender, self.destination, message.as_string())
server.quit()
class sms(email):
'''Extends email class to send sms messages through several carriers' email to sms gateways'''
def __init__(self, mobile_number, carrier, login='ltlabboston', pw='PythonLab', domain='gmail.com'):
'''carrier is 'verizon', 'tmobile', 'att', 'sprint', or 'nextel' '''
sms_email = ''
for digit in str(mobile_number):
if digit.isdigit(): #remove dashes, dots, spaces, and whatever other non-digits came in
sms_email += digit
sms_email = sms_email.lstrip('1') #remove country code
if (len(sms_email) != 10):
raise Exception('mobile_number argument must be a 10-digit phone number with area code')
carrier = carrier.lower()
if (carrier == 'verizon'):
sms_email += '@vtext.com'
elif (carrier == 't-mobile' or carrier == 'tmobile'):
sms_email += '@tmomail.net'
elif (carrier == 'att' or carrier == 'at&t'):
sms_email += '@txt.att.net'
elif (carrier == 'sprint'):
sms_email += '@messaging.sprintpcs.com'
elif (carrier == 'nextel'):
sms_email += '@page.nextel.com'
else:
#look up additional sms email gateways here: http://en.wikipedia.org/wiki/List_of_SMS_gateways
raise Exception('carrier argument must be "verizon", "t-mobile", "att", "sprint", or "nextel" unless you add your carrier to the list')
email.__init__(self, sms_email, login, pw, domain)
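#A hedged usage sketch of the notifiers above. The recipient address, phone
#number, and message are placeholders; calling this really attempts delivery
#through the default gmail account, so treat it as illustrative only.
def _example_notification():
    email('user@example.com').send(body='Test complete.', subject='lab status')
    sms('6175550123', 'verizon').send('Test complete.')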
def str2num(str_in):
'''convert string to int (automatic base selection) or, failing that, to float. Numbers and None pass through unchanged.'''
if isinstance(str_in,int) or isinstance(str_in,float) or str_in is None:
return str_in
try:
return int(str_in,0) #automatically select base
except ValueError:
try:
return float(str_in)
except ValueError as e:
print "string failed to convert both to integer (automatic base selection) and float: {}".format(str)
raise e
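#Illustrative str2num() conversions; int(x, 0) selects the base from the
#string prefix before falling back to float:
def _example_str2num():
    assert str2num('0x10') == 16 #hex prefix
    assert str2num('1e3') == 1000.0 #not an int; falls through to float
    assert str2num(7) == 7 #numbers (and None) pass through unchanged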
def clean_c(str):
'''replace characters illegal in C identifiers with legal mnemonic substrings.'''
str = str.replace("\t","_") #0x09
str = str.replace(" ","_") #0x20
str = str.replace("!","_BANG_") #0x21
str = str.replace('"',"_DQT_") #0x22
str = str.replace("#","_PND_") #0x23
str = str.replace("$","_DOL_") #0x24
str = str.replace("%","_PER_") #0x25
str = str.replace("&","_AND_") #0x26
str = str.replace("'","_SQT_") #0x27
str = str.replace("(","_OPNP_") #0x28
str = str.replace(")","_CLSP_") #0x29
str = str.replace("*","_MUL_") #0x2A
str = str.replace("+","_PLS_") #0x2B
str = str.replace(",","_COMA_") #0x2C
str = str.replace("-","_MNS_") #0x2D
str = str.replace(".","p") #0x2E
str = str.replace("/","_DIV_") #0x2F
str = str.replace(":","_CLN_") #0x3A
str = str.replace(";","_SCLN_") #0x3B
str = str.replace("<","_LSS_THN_") #0x3C
str = str.replace("=","_EQLS_") #0x3D
str = str.replace(">","_GRTR_THN_") #0x3E
str = str.replace("?","_QUES_") #0x3F
str = str.replace("@","_AT_") #0x40
str = str.replace("[","_OPNS_") #0x5B
str = str.replace("\\","_SLSH_") #0x5C
str = str.replace("]","_CLSS_") #0x5D
str = str.replace("^","_CAR_") #0x5E
#0x5F is '_'
str = str.replace("`","_GRAVE_") #0x60
str = str.replace("{","_OPNC_") #0x7B
str = str.replace("|","_OR_") #0x7C
str = str.replace("}","_CLSC_") #0x7D
str = str.replace("~","_TIL_") #0x7E
if str[0].isdigit():
str = "_" + str
for c in str:
#all characters 0x3A-0x40 and 0x5B-0x60 already replaced above.
if ord(c) > 127:
raise Exception('TODO?: escape non-ascii character found in: {}'.format(str))
if ord(c) < 0x30:
raise Exception('Ascii control character code point {} found in: {}'.format(ord(c),str))
if ord(c) > 0x7A:
raise Exception('Ascii non-alphanumeric character code point {} found in: {}'.format(ord(c),str))
return str
def remove_non_ascii(text):
out = ''
for c in text:
if ord(c) > 127:
c = "(REMOVED_NON_ASCII)"
out += c
return out
def remove_html(text):
'''strip HTML tags from text.'''
if text is not None:
text = re.sub('<[^<]+?>', '', text)
return text
def swap_endian(word, elementCount, elementSize=8):
'''reverse endianness of multi-byte word
elementCount is number of bytes, or other atomic memory block if not of elementSize 8 bits
to reverse bit order, set elementCount to the number of bits and set elementSize to 1.'''
assert word < 2**(elementSize*elementCount)
assert word >= 0
reversed = 0x00
mask = 2**elementSize-1
while elementCount > 0:
reversed ^= (word & mask) << (elementSize*(elementCount-1))
word = word >> elementSize
elementCount -= 1
return reversed
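#Illustrative swap_endian() calls covering both byte swapping and, with
#elementSize=1, bit reversal:
def _example_swap_endian():
    assert swap_endian(0x1234, elementCount=2) == 0x3412 #byte swap a 16-bit word
    assert swap_endian(0b1101, elementCount=4, elementSize=1) == 0b1011 #reverse a 4-bit field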
def signedToTwosComplement(signed, bitCount):
'''take python int and convert to two's complement representation using specified number of bits'''
assert signed < 2**(bitCount-1)
assert signed >= -1 * 2**(bitCount-1)
if signed < 0:
signed += 2**bitCount
signed &= 2**bitCount-1
return signed
def twosComplementToSigned(binary, bitCount):
'''take two's complement number with specified number of bits and convert to python int representation'''
assert binary < 2**bitCount
assert binary >= 0
if binary >= 2**(bitCount-1):
binary -= 2**bitCount
return binary
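#Illustrative round trips through the two's complement helpers:
def _example_twos_complement():
    assert signedToTwosComplement(-1, bitCount=8) == 0xFF
    assert twosComplementToSigned(0xFF, bitCount=8) == -1
    assert twosComplementToSigned(signedToTwosComplement(-42, 16), 16) == -42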
def isclose(a, b, rel_tol=1e-9, abs_tol=0.0):
#backported from 3.5
#https://github.com/PythonCHB/close_pep/blob/master/isclose.py
#https://www.python.org/dev/peps/pep-0485/
#https://docs.python.org/3/library/math.html#math.isclose
#alternative tests here: https://github.com/PythonCHB/close_pep/blob/master/is_close.py
"""
returns True if a is close in value to b. False otherwise
:param a: one of the values to be tested
:param b: the other value to be tested
:param rel_tol=1e-9: The relative tolerance -- the amount of error
allowed, relative to the absolute value of the
larger input values.
:param abs_tol=0.0: The minimum absolute tolerance level -- useful
for comparisons to zero.
NOTES:
-inf, inf and NaN behave similarly to the IEEE 754 Standard. That
is, NaN is not close to anything, even itself. inf and -inf are
only close to themselves.
The function can be used with any type that supports comparison,
subtraction and multiplication, including Decimal, Fraction, and
Complex
Complex values are compared based on their absolute value.
See PEP-0485 for a detailed description
"""
if a == b: # short-circuit exact equality
return True
if rel_tol < 0.0 or abs_tol < 0.0:
raise ValueError('error tolerances must be non-negative')
# use cmath so it will work with complex or float
if math.isinf(abs(a)) or math.isinf(abs(b)):
# This includes the case of two infinities of opposite sign, or
# one infinity and one finite number. Two infinities of opposite sign
# would otherwise have an infinite relative tolerance.
return False
diff = abs(b - a)
return (((diff <= abs(rel_tol * b)) and #DJS change from weak to strong symmetry so that argument order doesn't matter
(diff <= abs(rel_tol * a))) or
(diff <= abs_tol))
def unit_least_precision(val, increasing=True):
'''return positive increment/decrement to next representable floating point number above/below val'''
if increasing:
return float_next(val) - val
else:
return val - float_prior(val)
def float_next(val):
'''return the next Python double precision floating point number larger than val.'''
#algorithm copied from Boost: http://www.boost.org/doc/libs/1_45_0/boost/math/special_functions/next.hpp
assert not math.isinf(val)
assert not math.isnan(val)
assert not val >= sys.float_info.max
if val == 0:
return sys.float_info.epsilon*sys.float_info.min #denorm min
frac, expon = math.frexp(val)
if frac == -0.5:
expon -= 1 #reduce exponent when val is a power of two, and negative.
diff = math.ldexp(1, expon - sys.float_info.mant_dig)
if diff == 0:
diff = sys.float_info.epsilon*sys.float_info.min #denorm min
return val + diff
def float_prior(val):
'''return the next Python double precision floating point number smaller than val.'''
#algorithm copied from Boost: http://www.boost.org/doc/libs/1_45_0/boost/math/special_functions/next.hpp
assert not math.isinf(val)
assert not math.isnan(val)
assert not val <= -sys.float_info.max
if val == 0:
return -sys.float_info.epsilon*sys.float_info.min #denorm min
frac, expon = math.frexp(val)
if frac == 0.5:
expon -= 1 #when val is a power of two we must reduce the exponent
diff = math.ldexp(1, expon - sys.float_info.mant_dig)
if diff == 0:
diff = sys.float_info.epsilon*sys.float_info.min #denorm min
return val - diff
def float_distance(x,y):
'''return signed difference between x and y expressed as distance between representable floating point numbers.'''
#Boost library algorithm port: http://www.boost.org/doc/libs/1_45_0/boost/math/special_functions/next.hpp
if x > y:
return -float_distance(y, x)
elif x == y:
return 0
elif x == 0:
return 1 + abs(float_distance(math.copysign(1,y) * sys.float_info.epsilon*sys.float_info.min, y)) #denorm min
elif y == 0:
return 1 + abs(float_distance(math.copysign(1,x) * sys.float_info.epsilon*sys.float_info.min, x)) #denorm min
elif math.copysign(1,x) != math.copysign(1,y):
return 2 + abs(float_distance(math.copysign(1,x) * sys.float_info.epsilon*sys.float_info.min, x)) + abs(float_distance(math.copysign(1,y) * sys.float_info.epsilon*sys.float_info.min, y)) #denorm min
#should have same sign now
elif x < 0:
return float_distance(-y, -x)
assert x >= 0
assert y >= x
# Note that if a is a denorm then the usual formula fails because we actually have fewer than tools::digits<T>() significant bits in the representation:
expon = math.frexp(x)[1]
if expon < sys.float_info.min_exp:
expon = sys.float_info.min_exp
upper = math.ldexp(1, expon)
result = 0
expon = sys.float_info.mant_dig - expon #For floating-point types, this is the number of digits in the mantissa.
#If b is greater than upper, then we *must* split the calculation as the size of the ULP changes with each order of magnitude change:
if(y > upper):
result = float_distance(upper, y)
y = upper
#Use compensated double-double addition to avoid rounding errors in the subtraction:
X = x - y
Z = X - x
Y = (x - (X - Z)) - (y + Z)
if X < 0:
X = -X
Y = -Y
result += math.ldexp(X, expon) + math.ldexp(Y, expon)
#Result must be an integer:
assert(result == math.floor(result))
return int(result)
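#Illustrative calls to the ULP helpers above: adjacent doubles are exactly one
#representable step apart, and the ULP just above 1.0 is 2**-52:
def _example_float_steps():
    assert float_distance(1.0, float_next(1.0)) == 1
    assert unit_least_precision(1.0) == 2.0**-52
    assert float_prior(float_next(0.5)) == 0.5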
def bounded(value, min_value=None, max_value=None, key=None):
'''clamp value to the range [min_value, max_value]; either bound may be None to leave that side unbounded.'''
kwargs = {}
if key is not None:
kwargs['key'] = key
if min_value is not None and max_value is not None:
return max(min(value, max_value, **kwargs), min_value, **kwargs)
elif min_value is None and max_value is not None:
return min(value, max_value, **kwargs)
elif min_value is not None and max_value is None:
return max(value, min_value, **kwargs)
return value
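#Illustrative bounded() clamps; a None bound leaves that side open:
def _example_bounded():
    assert bounded(5, min_value=0, max_value=3) == 3
    assert bounded(-2, min_value=0) == 0
    assert bounded(1.5, max_value=10) == 1.5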
class csv_writer(object):
'''shared functions for higher level interfaces'''
def __init__(self):
self.column_data_t = collections.namedtuple('column_setup',['query_name','display_name','transform','format','query_function'])
self.no_transform = lambda x: x
self.columns = []
self.comments = []
def _format_header(self):
header_txt = u''
for comment in self.comments:
header_txt += comment + '\n'
for column in self.columns:
header_txt += u"{},".format(column.display_name)
return header_txt[:-1] + '\n'
def _format_output(self, data, column_setup_tuple):
'''give just one element of a data row'''
if column_setup_tuple.query_function is not None:
data = column_setup_tuple.query_function()
data = column_setup_tuple.transform(data)
return '{},'.format(column_setup_tuple.format).format(data)
def add_elapsed_seconds(self,display_name='elapsed_seconds', format=''):
'''computes elapsed seconds since first row of table'''
self._add_elapsed_time(display_name=display_name,format=format,transform=self.no_transform)
def add_elapsed_minutes(self,display_name='elapsed_minutes', format=''):
'''computes elapsed minutes since first row of table'''
self._add_elapsed_time(display_name=display_name,format=format,transform=lambda x: x/60.0)
def add_elapsed_hours(self,display_name='elapsed_hours', format=''):
'''computes elapsed hours since first row of table'''
self._add_elapsed_time(display_name=display_name,format=format,transform=lambda x: x/3600.0)
def add_elapsed_days(self,display_name='elapsed_days', format=''):
'''computes elapsed days since first row of table'''
self._add_elapsed_time(display_name=display_name,format=format,transform=lambda x: x/86400.0)
def _add_elapsed_time(self, *args, **kwargs):
raise NotImplementedError('Elapsed time not implemented')
def add_column(self, query_name, display_name=None, format='',transform=None,query_function=None):
'''add single column to output file.
provides more customization options than adding a list of columns
transform is a python function applied to the query results before formatting
format is a format string to alter the column data. Ex: 3.2f.
query_function is a function that returns data directly from Python rather than from an external data source. Ex: time
'''
if display_name is None:
display_name = query_name
format = '{{:{}}}'.format(format)
if transform is None:
transform = self.no_transform
self.columns.append(self.column_data_t(display_name=display_name,query_name=query_name,transform=transform,format=format,query_function=query_function))
def add_columns(self, column_list, format=''):
'''Shortcut method to add multiple data columns at once.
column_list selects additional data columns to output.
format is a format string to alter the column data. Ex: 3.2f.
For more flexibility, add columns individually using add_column() method.'''
for column in column_list:
self.add_column(column, format=format)
class csv_logger(csv_writer):
'''set up columns, then pass results dictionary from logger or channel group to write() method.
Can be used to provide automated test script 'marching waves' with a program such as Live Graph (http://www.live-graph.org/).
'''
def __init__(self, output_file, encoding='utf-8'):
csv_writer.__init__(self)
self.output_file = output_file
self.encoding = encoding
self.header_written = False
self.f = open(self.output_file, 'w', 1) #line buffered
atexit.register(self.__del__)
self._row_id = -1
def __enter__(self):
return self
def __exit__(self,exc_type, exc_val, exc_tb):
print "__exit__ closing CSV filehandle: {}".format(self.output_file)
self.f.close()
return None
def __del__(self):
print "__del__ closing CSV filehandle: {}".format(self.output_file)
self.f.close()
def _row_count(self):
'''private method used by rowid column.'''
self._row_id += 1
return self._row_id
def add_timestamps(self):
'''add rowid and datetime fields to output'''
csv_writer.add_column(self,query_name=None,display_name='rowid', query_function=self._row_count)
csv_writer.add_column(self,query_name=None,display_name='datetime',query_function=datetime.datetime.now,transform=lambda t: datetime.datetime.strftime(t,"%Y-%m-%dT%H:%M:%S.%fZ")) #e.g. 2015-10-20T21:54:17.123456Z
def _add_elapsed_time(self,display_name,format,transform):
csv_writer.add_column(self,query_name=None,display_name=display_name,format=format,query_function=datetime.datetime.now,transform=lambda t:transform((t-self._time_zero).total_seconds()))
def add_column(self, channel):
'''set up output to include channel object as column data source'''
csv_writer.add_column(self, query_name=channel.get_name(), transform=channel.format_display)
def add_columns(self, channel_list):
'''set up output to include channel objects in channel_list as column data sources'''
for channel in channel_list:
self.add_column(channel)
def write(self, channel_data):
'''write selected columns with data supplied in channel_data dictionary'''
if not self.header_written:
self.f.write(self._format_header().encode(self.encoding))
self._time_zero = datetime.datetime.now() #for elapsed time computation
self.header_written = True
row_txt = ''
for column in self.columns:
if column.query_function is not None:
row_txt += self._format_output(None, column)
elif column.query_name in channel_data:
row_txt += self._format_output(channel_data[column.query_name], column)
else:
raise Exception('Data for column: {} not provided to write() method.'.format(column.display_name))
self.f.write((row_txt[:-1] + '\n').encode(self.encoding))
self.f.flush() #just in case line buffering doesn't work
return channel_data
def register_logger_callback(self, logger):
'''register this csv_logger instance with a lab_core.logger instance for automatic data plotting
'''
logger.add_log_callback(self.write)
def unregister_logger_callback(self, logger, close_file=True):
'''clean up in case lab_core.logger will be re-used for a new test.'''
logger.remove_log_callback(self.write)
if close_file:
self.f.close()
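#A hedged usage sketch of csv_logger. Real PyICe channel objects come from
#lab_core; the stand-in class below only mimics the two methods csv_logger
#actually uses (get_name() and format_display), and the filename and channel
#name are illustrative placeholders.
def _example_csv_logger():
    class _fake_channel(object):
        def __init__(self, name):
            self._name = name
        def get_name(self):
            return self._name
        def format_display(self, data):
            return data
    with csv_logger('example_log.csv') as log:
        log.add_timestamps()
        log.add_column(_fake_channel('vout'))
        log.write({'vout': 3.3}) #one row per call, e.g. once per test loop pass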
class sqlite_to_csv(csv_writer):
'''Formats data stored in an SQLite database so that it can be browsed interactively.
Use a program like Live Graph (www.live-graph.org) or KST (kst-plot.kde.org) to visualize data.'''
def __init__(self, table_name, database_file='data_log.sqlite'):
'''table_name is the database table containing selected data columns.
database_file is the sqlite file containing table_name.'''
csv_writer.__init__(self)
self.table_name = table_name
self.conn = sqlite3.connect(database_file)
self.cursor = self.conn.cursor()
def __enter__(self):
return self
def __exit__(self,exc_type, exc_val, exc_tb):
self.conn.close()
return None
def add_timestamps(self):
'''Add rowid and datetime columns to csv output.'''
self.add_column('rowid')
self.add_column('datetime')
def _add_elapsed_time(self,display_name,format,transform):
self.cursor.execute('SELECT strftime("%s",datetime) FROM {} LIMIT 1'.format(self.table_name))
self.add_column(query_name='strftime("%s",datetime) - {}'.format(self.cursor.fetchone()[0]),display_name=display_name,format=format,transform=transform)
def write(self, output_file, append=False, encoding='utf-8'):
'''write previously selected column data to output_file.'''
query_txt = ''
for column in self.columns:
query_txt += u"{},".format(column.query_name)
query_txt = query_txt[:-1]
with open(output_file, 'a' if append else 'w') as f:
f.write(self._format_header().encode(encoding))
for row in self.cursor.execute('SELECT {} FROM {}'.format(query_txt,self.table_name)):
row_txt = ''
for cidx,column in enumerate(row):
row_txt += self._format_output(column, self.columns[cidx])
f.write((row_txt[:-1] + '\n').encode(encoding))
print 'Output written to {}'.format(output_file)
class sqlite_to_easylog(sqlite_to_csv):
'''Wrapper to make the specific format required by the EasyLog Graph software.
Formats data stored in an SQLite database so that it can be browsed interactively.
Use EasyLogGraph (http://www.lascarelectronics.com/data-logger/easylogger-software.php) to visualize data.'''
def __init__(self, chart_name, table_name, y1_axis_units='V', y2_axis_units='A', database_file='data_log.sqlite'):
'''chart_name will appear at top of graph
table_name is the sqlite database table name
y1_axis_units controls the left-side y-axis label
y2_axis_units controls the right-side y-axis label
database_file is the filename of the sqlite database'''
self.y1_axis_units = y1_axis_units
self.y2_axis_units = y2_axis_units
sqlite_to_csv.__init__(self, table_name, database_file)
sqlite_to_csv.add_column(self,query_name='rowid',display_name=chart_name)
sqlite_to_csv.add_column(self,query_name='datetime',display_name='Time') #the position of these fields is important
def add_comment(self, *args, **kwargs):
raise Exception("Comment lines don't seem to be allowed in EasyLog files.")
def add_column(self, query_name, second_y_axis=False, display_name=None, format='', transform=None):
'''query name is the name of the sqlite column
second_y_axis is a boolean. Setting to True places data on the right-side y-axis scale
display_name, if not None, sets csv column header title differently from database column name
format controls appearance of queried data. Ex: "3.2f"'''
if display_name is None:
display_name = query_name
display_name = "{} ({})".format(display_name, self.y2_axis_units if second_y_axis else self.y1_axis_units) #Data goes to first or second y-axis based on parenthesized units in column heading
sqlite_to_csv.add_column(self,query_name=query_name,display_name=display_name,format=format, transform=transform)
def add_columns(self, column_list, second_y_axis=False, format=''):
'''adds a list of sqlite column names at once
all columns will be placed on left-side y-axis scale unless second_y_axis is True
format controls appearance of queried data. Ex: "3.2f"
'''
for column in column_list:
self.add_column(query_name=column, second_y_axis=second_y_axis, format=format)
# def _add_elapsed_time(self, *args, **kwargs):
# raise Exception("Elapsed time not supported yet. Is it really needed?")
def write(self, output_file, append=False):
'''write queried data to output_file after column setup is complete'''
sqlite_to_csv.write(self, output_file=output_file, append=append, encoding='mbcs') #windows ANSI codepage
class sqlite_data(collections.Sequence): #collections.Iterable to disable slicing?
'''Produce iterable object returning a row sequence, where each column within each row is accessible by either column name or position.
table_name can be an expression returning a synthetic non-table relation.
'''
def __init__(self, table_name=None, database_file='data_log.sqlite', timezone=None):
if timezone is None:
self.timezone = UTC()
else:
self.timezone = timezone
sqlite3.register_converter("DATETIME", self.convert_timestring)
self.conn = sqlite3.connect(database_file, detect_types=sqlite3.PARSE_DECLTYPES) #automatically convert datetime column to Python datetime object
self.conn.row_factory = sqlite3.Row #index row data tuple by column name
self.table_name = table_name
self.sql_query = None
self.params = []
if table_name is not None:
self.sql_query = "SELECT * from {}".format(table_name) #remove rowid because it's now an explicitly stored column from the logger
def convert_timestring(self, time_string):
# return datetime.datetime.strptime(time_string,'%Y-%m-%d %H:%M:%S').replace(tzinfo=UTC()).astimezone(self.timezone) #old format
return datetime.datetime.strptime(time_string,'%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=UTC()).astimezone(self.timezone)
def __getitem__(self,key):
'''implement sequence behavior.'''
subs = {}
if isinstance(key, slice):
if key.start is None:
subs['start'] = 0
else:
subs['start'] = key.start
if key.stop is None:
subs['limit'] = -1 #no limit
else:
if subs['start'] >= key.stop:
raise Exception('Reverse iteration not supported.')
subs['limit'] = key.stop - subs['start']
if key.step is not None:
raise Exception('Slice step not supported.')
fetch = sqlite3.Cursor.fetchall
else:
subs['start'] = key
subs['limit'] = 1
fetch = sqlite3.Cursor.fetchone
return fetch(self.conn.execute(self.sql_query + " LIMIT {limit} OFFSET {start};".format(**subs), self.params))
def __iter__(self):
'''implement iterable behavior.'''
return self.conn.execute(self.sql_query, self.params)
def __len__(self):
'''return number of rows returned by sql query.
WARNING: Inefficient.
'''
#this is hard because the iterable doesn't actually know its length
#self.cursor.rowcount doesn't work; returns -1 when database isn't modified.
#not very efficient for big dataset!
return len(self.conn.execute(self.sql_query, self.params).fetchall())
def __enter__(self):
return self
def __exit__(self,exc_type, exc_val, exc_tb):
self.conn.close()
def get_column_names(self):
'''return tuple of column names.
Column names can be used for future queries or used to select column from query row results.
'''
if self.sql_query is None:
raise Exception('table_name not specified')
return self.conn.execute(self.sql_query, self.params).fetchone().keys()
def get_column_types(self):
'''Return dictionary of data types stored in each column.
Note that SQLite does not enforce types within a column, nor does the PyICe logger.
The types of data stored in the first row will be returned, which may not match data stored elsewhere in the relation.
Used by numpy array conversion to define data stride.
'''
cursor = self.conn.execute(self.sql_query, self.params).fetchone()
return collections.OrderedDict([(k,type(cursor[k])) for k in cursor.keys()])
def get_distinct(self, column_name, table_name=None):
'''return one copy of each value (set) in specified column
table_name can be an expression returning a synthetic non-table relation.
'''
if table_name is None:
table_name = self.table_name
if table_name is None:
raise Exception('table_name not specified')
data = self.conn.execute("SELECT DISTINCT {} from {};".format(column_name, table_name)).fetchall()
return tuple(sorted(zip(*data)[0]))
def query(self, sql_query, *params):
'''return iterable with query results.
columns within each row can be accessed by column name or by position
'''
self.sql_query = sql_query
self.params = params
return self.conn.execute(self.sql_query, self.params)
def zip(self):
'''return query data transposed into column_list of row_lists.'''
return zip(*self)
def csv(self, output_file, elapsed_time_columns=False, append=False, encoding='utf-8'):
'''write data to csv output_file.
set output_file to None to just return csv string.'''
output_txt = ""
datetime_col = None
for pos,column in enumerate(self.get_column_names()):
output_txt += '{},'.format(column)
if elapsed_time_columns and column == 'datetime':
output_txt += 'elapsed_time,'
output_txt += 'elapsed_seconds,'
datetime_col = pos
output_txt = '{}\n'.format(output_txt[:-1])
start_time = None
for row in self:
for pos,column in enumerate(row):
output_txt += '{},'.format(column)
if start_time is None and pos == datetime_col:
start_time = column
if elapsed_time_columns and pos == datetime_col:
output_txt += '{},'.format(column - start_time) #elapsed_time
output_txt += '{},'.format((column-start_time).total_seconds()) #elapsed_seconds
output_txt = '{}\n'.format(output_txt[:-1])
if output_file is not None:
with open(output_file, 'a' if append else 'w') as f:
f.write(output_txt.encode(encoding))
print 'Output written to {}'.format(output_file)
return output_txt
def xlsx(self, output_file, elapsed_time_columns=False):
'''write data to excel output_file.'''
import xlsxwriter
workbook = xlsxwriter.Workbook(output_file, {'constant_memory': True,
'strings_to_formulas': False,
'strings_to_urls': False,
'strings_to_numbers': False,
})
formats = { 'default' : workbook.add_format({'font_name' : 'Courier New', 'font_size' : 9}),
'date_time' : workbook.add_format({'num_format': 'yyyy/mm/dd hh:mm:ss.000', 'font_name' : 'Courier New', 'font_size' : 9}),
'delta_time': workbook.add_format({'num_format': 'hh:mm:ss.000', 'font_name' : 'Courier New', 'font_size' : 9}),
'row_shade': workbook.add_format({'bg_color': '#D0FFD0'}),
}
worksheet = workbook.add_worksheet(self.table_name)
column_width_pad = 1.0
filter_button_width = 3
row_idx = 0
col_idx = 0
datetime_col = None
for column in self.get_column_names():
worksheet.write(row_idx, col_idx, column)
worksheet.set_column(col_idx, col_idx, len(column) * column_width_pad + filter_button_width, formats['default'])
if column == 'datetime':
datetime_col = col_idx
if elapsed_time_columns:
worksheet.write(row_idx, col_idx + 1, 'elapsed_time')
worksheet.set_column(col_idx + 1, col_idx + 1, len('elapsed_time') * column_width_pad + filter_button_width, formats['default'])
worksheet.write(row_idx, col_idx + 2, 'elapsed_seconds')
worksheet.set_column(col_idx + 2, col_idx + 2, len('elapsed_seconds') * column_width_pad + filter_button_width, formats['default'])
col_idx += 2
col_idx += 1
column_count = col_idx - 1
worksheet.freeze_panes(1,0) #Keep header line at top
row_idx += 1
col_idx = 0
start_time = None
for row in self:
for column in row:
if col_idx == datetime_col:
worksheet.write_datetime(row_idx, col_idx, column.replace(tzinfo=None), formats['date_time']) #Excel can't deal with timezone-aware datetimes. Output Zulu time.
worksheet.set_column(col_idx, col_idx, max(len(formats['date_time'].num_format) * column_width_pad, worksheet.col_sizes[col_idx]))
else:
worksheet.write(row_idx, col_idx, column)
try:
worksheet.set_column(col_idx, col_idx, max(len(column) * column_width_pad, worksheet.col_sizes[col_idx]))
except TypeError:
pass #numbers don't have a length
if start_time is None and col_idx == datetime_col:
start_time = column
if elapsed_time_columns and col_idx == datetime_col:
worksheet.write_datetime(row_idx, col_idx + 1, column - start_time, formats['delta_time']) #elapsed_time
worksheet.set_column(col_idx + 1, col_idx + 1, max(len(formats['delta_time'].num_format) * column_width_pad, worksheet.col_sizes[col_idx + 1]))
worksheet.write(row_idx, col_idx + 2, (column-start_time).total_seconds()) #elapsed_seconds
col_idx += 2
col_idx += 1
row_idx += 1
col_idx = 0
worksheet.autofilter(0, 0, row_idx - 1, column_count)
worksheet.conditional_format(1, 0, row_idx - 1, column_count, {'type': 'formula',
'criteria': '=MOD(SUBTOTAL(3,$A$1:$A2),2)=0',
'format': formats['row_shade']}
)
icon_path = os.path.join(os.path.dirname(__file__), "tssop.png")
worksheet.insert_image(row_idx + 2, 1, icon_path, {'y_offset': 70})
try:
import io, urllib2
url = 'https://www.python.org/static/community_logos/python-powered-h-140x182.png'
image_data = io.BytesIO(urllib2.urlopen(url).read())
worksheet.insert_image(row_idx + 2, 5, url, {'image_data': image_data,
'x_scale': 1.5,
'y_scale': 1.5,
})
except Exception as e:
print "INFO: Python logo insertion failed."
#print e
worksheet.protect('', {'sort': True,
'autofilter': True,
'format_cells': True,
'format_columns': True,
'format_rows': True,
})
workbook.close()
print 'Output written to {}'.format(output_file)
def to_list(self):
'''return copy of data in list object'''
return [row for row in self]
def numpy_recarray(self, force_float_dtype=False, data_types=None):
'''return NumPy record array containing data.
Rows can be accessed by index, ex arr[2].
Columns can be accessed by column name attribute, ex arr.vbat.
Use with data filtering, smoothing, compressing, etc. matrix operations provided by SciPy and lab_utils.transform, lab_utils.decimate.
Use automatic column names, but force data type to float with the force_float_dtype boolean argument.
Override automatic column names and data types (first row) by specifying a data_types iterable of (column_name,example_contents) for each column matching query order.
http://docs.scipy.org/doc/numpy-1.10.1/reference/generated/numpy.recarray.html
'''
if force_float_dtype and data_types is None:
dtype = numpy.dtype([(key,type(float())) for key in self.get_column_types()])
elif force_float_dtype and data_types is not None:
raise Exception('Specify only one of force_float_dtype, data_types arguments.')
elif data_types is None:
dtype = numpy.dtype([(k,v) for k,v in self.get_column_types().iteritems()])
else:
dtype = numpy.dtype([(column_name,type(example_contents)) for column_name,example_contents in data_types])
arr = numpy.array([tuple(row) for row in self], dtype)
return arr.view(numpy.recarray)
def column_query(self, column_list):
'''return partial query string separating column names with comma characters.'''
query_txt = ''
for column in column_list:
query_txt += '{},'.format(column)
return query_txt[:-1]
def time_delta_query(self, time_div=1, column_name=None):
'''return partial query string which will compute fractional delta seconds from first entry in the table as a column.
Feed back into query to get elapsed time column.
Ex "SELECT rowid, {}, * FROM ...".format(sqlite_data_obj.time_delta_query())
Use time_div to convert from second to your choice of time scales, example: time_div=3600 would be hours.
'''
if column_name is None:
if time_div == 0.001:
column_name = "elapsed_milliseconds"
elif time_div == 1:
column_name = "elapsed_seconds"
elif time_div == 60:
column_name = "elapsed_minutes"
elif time_div == 3600:
column_name = "elapsed_hours"
elif time_div == 86400:
column_name = "elapsed_days"
elif time_div == 31536000:
column_name = "elapsed_years"
else:
column_name = "elapsed_time"
frac_s_str = "strftime('%s',datetime)+strftime('%f',datetime)-strftime('%S',datetime)"
if self.table_name is None:
raise Exception('table_name not specified')
first_time = self.conn.execute("SELECT {} FROM {} ORDER BY rowid ASC;".format(frac_s_str,self.table_name)).fetchone()[0]
return "({}-{})/{} AS {}".format(frac_s_str,first_time,time_div,column_name)
def filter_change(self, column_name_list, table_name=None, first_row=False, preceding_row=False):
'''return tuple of rowid values where any column in column_name_list changed value.
result tuple can be fed into a new query("SELECT ... WHERE rowid in {}".format(sqlite_data_obj.filter_change())).
if table_name is omitted, instance default will be used.
setting preceding_row to True will also return the rowid before the change occurred.
'''
if table_name is None:
table_name = self.table_name
if table_name is None:
raise Exception('table_name not specified')
if first_row:
first_row = (1,)
else:
first_row = tuple()
sql_query = 'SELECT delay.rowid from {table_name} as orig JOIN {table_name} as delay ON orig.rowid = delay.rowid-1 WHERE '.format(table_name=table_name)
for column_name in column_name_list:
sql_query += 'orig.{column_name} IS NOT delay.{column_name} OR '.format(column_name=column_name)
sql_query = sql_query[:-4]
try:
row_ids = zip(*self.conn.execute(sql_query))[0]
except IndexError as e:
#no changes
return first_row
if preceding_row:
preceding_row_ids = tuple(row -1 for row in row_ids)
return tuple(sorted(first_row + row_ids + preceding_row_ids))
return tuple(sorted(first_row + row_ids))
def optimize(self):
'''Defragment database file, reducing file size and speeding future queries.
Also re-runs the query plan optimizer to speed future queries.
WARNING: May take a long time to complete when operating on a large database.
WARNING: May re-order rowids.
'''
self.conn.execute("VACUUM;")
self.conn.execute("ANALYZE;")
def expand_vector_data(self, csv_filename=None, csv_append=False, csv_encoding='utf-8'):
'''Expand vector list data (from oscilloscope, network analyzer, etc) to full row-rank.
Scalar data will be expanded to vector length.
Returns numpy record array.
Optionally write output to a comma separated file if the csv_filename argument is specified.
'''
columns = []
dtypes = []
data_length = None
column_names = self.get_column_names()
for column in column_names:
columns.append([])
for i,row in enumerate(self):
try:
if row[column].startswith('[') and row[column].endswith(']'):
column_data = [float(x) for x in row[column].strip("[]").split(",")]
if data_length is None and len(column_data) > 1:
data_length = len(column_data)
elif len(column_data) != 1 and len(column_data) != data_length:
raise Exception('Inconsistent data length in vector expansion: {} and {}'.format(len(column_data),data_length))
else:
column_data = [row[column]]
except AttributeError:
column_data = [row[column]]
columns[-1].append(column_data)
if data_length is None:
print "WARNING: No vector data found."
data_length = 1
for i,column in enumerate(columns):
if len(column[0]) == 1:
dtypes.append((column_names[i], type(column[0][0])))
else:
dtypes.append((column_names[i], float))
for rowcount,rowcoldata in enumerate(column):
if len(rowcoldata) == 1:
#expand scalar data to vector length
column[rowcount] = rowcoldata * data_length
#flatten row data
data = []
for rowid in range(rowcount+1):
rowdata = []
for columnid in range(i+1):
rowdata.append(columns[columnid][rowid])
data.extend(zip(*rowdata))
#csv output
if csv_filename is not None:
csv_txt = ""
for column in column_names:
csv_txt += '{},'.format(column)
csv_txt = '{}\n'.format(csv_txt[:-1])
for row in data:
for column in row:
csv_txt += '{},'.format(column)
csv_txt = '{}\n'.format(csv_txt[:-1])
with open(csv_filename, 'a' if csv_append else 'w') as f:
f.write(csv_txt.encode(csv_encoding))
print 'Output written to {}'.format(csv_filename)
array = numpy.array(data, dtype=dtypes)
return array.view(numpy.recarray)
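#A hedged usage sketch of sqlite_data. It assumes a 'data_log.sqlite' database
#holding a 'test_data' table with a 'vout' column previously written by the
#PyICe logger; all three names are illustrative placeholders.
def _example_sqlite_data():
    with sqlite_data(table_name='test_data') as data:
        print data.get_column_names()
        for row in data.query('SELECT rowid, * FROM test_data WHERE vout > ?', 3.0):
            print row['rowid'], row['vout']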
def decimate(rec_array, downsample_factor, **kwargs):
'''Reduce row count by factor of downsample_factor.
By default an order 8 Chebyshev type I filter is used independently on each column.
Set kwarg ftype='fir' to instead use a 30 point FIR filter with hamming window (recommended).
http://docs.scipy.org/doc/scipy-0.16.1/reference/generated/scipy.signal.decimate.html
'''
return vector_transform(rec_array, [lambda col: scipy.signal.decimate(x=col,q=downsample_factor, **kwargs)] * len(rec_array.dtype.names))
def ramer_douglas_peucker(rec_array, epsilon, verbose=True):
'''reduce the number of points in a line-segment curve such that the reduced line segment count approximates the original curve within epsilon tolerance.
https://en.wikipedia.org/wiki/Ramer%E2%80%93Douglas%E2%80%93Peucker_algorithm'''
try:
from rdp import rdp
except ImportError:
print "Install Ramer-Douglas-Peucker package.\nhttps://pypi.python.org/pypi/rdp"
raise
start_time = time.time()
column_type = 'float'
#column_type = '<f8'
old_dtype = rec_array.dtype.descr
new_dtype = numpy.dtype([(column[0],column_type) for column in old_dtype])
np_array = rec_array.astype(new_dtype).view(column_type).reshape(-1,2)
reduced_array = rdp(np_array, epsilon)
reduced_rec_array = numpy.fromiter(reduced_array, dtype=new_dtype).view(numpy.recarray)
if verbose:
stop_time = time.time()
print "RDP reduced {} data set from {} to {} points ({:3.1f}%) in {} seconds with epsilon={}.".format(old_dtype[1][0],len(rec_array),len(reduced_rec_array),100.*len(reduced_rec_array)/len(rec_array),int(round((stop_time-start_time))),epsilon)
return reduced_rec_array
def smooth_spline(rec_array, rms_error, verbose=True, **kwargs):
'''uses http://scipy.github.io/devdocs/generated/scipy.interpolate.UnivariateSpline.html with movable knots
set rms_error to change number of knots to bound smoothed data rms deviation from original data points.
rec_array is modified in place
returns number of knots used to construct spline'''
point_count = len(rec_array)
rss = rms_error * point_count**0.5
spl = scipy.interpolate.UnivariateSpline(x=rec_array[rec_array.dtype.names[0]],y=rec_array[rec_array.dtype.names[1]],s=rss, **kwargs)
knot_count = len(spl.get_knots())
first_x = rec_array[0][0]
last_x = rec_array[-1][0]
new_x = numpy.linspace(start=first_x, stop=last_x, num=point_count, endpoint=True, dtype=type(float()))
for i in range(point_count): #replace old array in-place
rec_array[i] = (new_x[i], spl(new_x[i]))
if verbose:
print "rms_error {} constructed {} knots from {} original data points.".format(rms_error, knot_count, point_count)
return knot_count
def smooth_filtfile(rec_array):
'''somebody finish this (a hedged filtfilt sketch follows below).
https://docs.scipy.org/doc/scipy-0.18.1/reference/generated/scipy.signal.filtfilt.html'''
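#A minimal sketch of the zero-phase smoothing the stub above asks for, assuming
#the first column is the x-axis. The 4th-order Butterworth low-pass and its
#Nyquist-relative cutoff wn are illustrative choices, not a fixed design.
def _smooth_filtfilt_sketch(rec_array, wn=0.1, order=4):
    '''forward-backward filter every column except the first (assumed x-axis).
    Returns a filtered copy; the original record array is left unmodified.'''
    b, a = scipy.signal.butter(order, wn) #low-pass design; wn in (0,1), relative to Nyquist
    filtered = rec_array.copy()
    for name in rec_array.dtype.names[1:]:
        filtered[name] = scipy.signal.filtfilt(b, a, rec_array[name])
    return filtered.view(numpy.recarray)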
def polyfit(rec_array, degree=1):
'''returns polynomial fit coefficients list, highest order first
https://docs.scipy.org/doc/numpy/reference/generated/numpy.polyfit.html
'''
return numpy.polyfit(x=rec_array[rec_array.dtype.names[0]], y=rec_array[rec_array.dtype.names[1]], deg=degree)
def _detrend(rec_array, **kwargs):
'''http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.detrend.html'''
return vector_transform(rec_array, [None, lambda col: scipy.signal.detrend(data=col, **kwargs)] * (len(rec_array.dtype.names)-1))
def detrend_constant(rec_array, **kwargs):
'''Remove data mean from all columns except the first one (assumed x-axis)
http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.detrend.html
'''
return _detrend(rec_array, type='constant', **kwargs)
def detrend_linear(rec_array, **kwargs):
'''Remove best-fit line from all columns except the first one (assumed x-axis)
http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.detrend.html
'''
return _detrend(rec_array, type='linear', **kwargs)
def integral_nonlinearity(rec_array, lsb_size=1):
'''transform (code, voltage) data into INL
optional lsb_size argument scales y-axis data from real units to lsb count.'''
return scalar_transform(detrend_linear(rec_array), [None, lambda x: x / float(lsb_size)])
def differential_nonlinearity(rec_array, lsb_size=1):
'''transform (code, voltage) data into DNL.
optional lsb_size argument scales y-axis data from real units to lsb count.'''
return scalar_transform(vector_transform(rec_array, [lambda col: col[:-1], lambda col: numpy.diff(col)]), [None, lambda x: x / float(lsb_size)])
class US_Time_Zone(datetime.tzinfo):
'''Generic Timezone parent class.
Implements methods required by datetime.tzinfo with US DST rules (second Sunday of March and first Sunday of November).
Requires subclass to define local timezone parameters:
self.tz_name
self.tz_name_dst
self.gmt_offset
self.dst_offset
'''
def first_sunday_on_or_after(self,dt):
'''return date of first Sunday on or after dt.'''
days_to_go = 6 - dt.weekday()
dt += datetime.timedelta(days_to_go)
return dt
def utcoffset(self, dt):
'''return DST aware offset from GMT/UTC based on calendar date.'''
return datetime.timedelta(hours=self.gmt_offset) + self.dst(dt)
def dst(self, dt):
'''return DST offset from standard local time based on calendar date.'''
# DST starts second Sunday in March
# ends first Sunday in November
self.dston = self.first_sunday_on_or_after(datetime.datetime(dt.year, 3, 8))
self.dstoff = self.first_sunday_on_or_after(datetime.datetime(dt.year, 11, 1))
if self.dston <= dt.replace(tzinfo=None) < self.dstoff:
return datetime.timedelta(hours=self.dst_offset)
else:
return datetime.timedelta(0)
def tzname(self,dt):
'''return DST aware local time zone name based on calendar date.'''
if self.dst(dt):
return self.tz_name_dst
return self.tz_name
class UTC(US_Time_Zone):
'''UTC / GMT / Zulu time zone.'''
def __init__(self):
self.tz_name = "UTC"
self.tz_name_dst = "UTC"
self.gmt_offset = +0
self.dst_offset = +0
class US_Eastern_Time(US_Time_Zone):
'''US Eastern time zone. (NYC/BOS)'''
def __init__(self):
self.tz_name = "EST"
self.tz_name_dst = "EDT"
self.gmt_offset = -5
self.dst_offset = +1
class US_Pacific_Time(US_Time_Zone):
'''US Pacific time zone. (LAX/SMF)'''
def __init__(self):
self.tz_name = "PST"
self.tz_name_dst = "PDT"
self.gmt_offset = -8
self.dst_offset = +1
def logger_time_str(dt):
'''return time string in the same format as used by lab_core.logger.
Requires a timezone-aware datetime object argument to correctly convert to the UTC times used by the logger.
If the datetime object is naive of timezone, add one with dt.replace(tzinfo=lab_utils.US_Eastern_Time()) or dt.replace(tzinfo=lab_utils.UTC())
'''
return dt.astimezone(UTC()).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
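#An illustrative conversion with logger_time_str(); the local timestamp below
#falls within US DST, so EDT (UTC-4) applies:
def _example_logger_time_str():
    local = datetime.datetime(2015, 10, 20, 17, 54, 17, tzinfo=US_Eastern_Time())
    print logger_time_str(local) #2015-10-20T21:54:17.000000Z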
class dlog(object):
def __init__(self, filename="output.txt"):
'''Open filehandle to filename. If omitted, filename defaults to "output.txt".'''
self.errcnt = 0
self.f = open(filename,'w')
# note the time.clock function won't work well for linux...
# this is written for windows
self.timezero = time.clock()
# time/date stamp header
self.log_notime(time.strftime("%a, %d %b %Y %H:%M:%S"))
def __enter__(self):
return self
def __exit__(self,exc_type, exc_val, exc_tb):
self.f.close()
return None
def log_notime(self,data):
'''Write data to file with trailing newline'''
self.f.write(str(data) + "\n")
print(data)
def log(self,data):
'''Write data to file with timestamp and trailing newline'''
self.log_notime(str(time.clock()-self.timezero) + str(data))
def create_error(self):
'''This function doesn't appear to actually do much. self.errcnt is never written to the dlog.'''
self.errcnt += 1
def finish(self):
'''Write final timestamp and close filehandle'''
self.log_notime("Data log closed at {}. Elapsed time: {}".format(time.strftime("%a, %d %b %Y %H:%M:%S"), time.clock()-self.timezero))
self.f.close()
class debug_log(object):
'''Log messages into a file and optionally print to screen.'''
# This class used in most of Frank's tests.
def __init__(self, log_file_name=__name__+".log", debug=False):
self.debug = debug
self.f = open(log_file_name, "w")
self.fileno = self.f.fileno
# atexit.register(self.__del__) # Tries to close the debug_log file if the program exits for any reason.
def __enter__(self):
return self
def __exit__(self,exc_type, exc_val, exc_tb):
self.f.close()
return None
def write(self, msg):
'''Add a message to the log file. Also print the message if debug is True'''
t_str = time.asctime()
m_str = "{} :: {}\n".format(t_str, msg)
if self.debug:
print m_str,
self.f.write(m_str)
self.f.flush()
# def __del__(self):
# self.f.flush()
# os.fsync(self.fileno)
# self.f.close()
class interpolator(object):
def __init__(self):
self._points = []
self._points_ysort = []
self.y_slope = 0
def check_monotonicity(self):
if len(self._points) > 1:
x_pts = [x[0] for x in self._points]
y_pts = [y[1] for y in self._points]
self.y_slope = cmp(y_pts[1],y_pts[0])
for i in range(1,len(self._points)):
if len(x_pts) != len(set(x_pts)):
raise Exception('duplicated x column value')
if self.y_slope*y_pts[i] <= self.y_slope*y_pts[i-1]:
raise Exception('y column values are not monotonically increasing or decreasing relative to x-column values at point ({},{})'.format(self._points[i][0],self._points[i][1]))
def sort(self):
self._points.sort(key=operator.itemgetter(0)) #increasing values in x
self._points_ysort = sorted(self._points, key=operator.itemgetter(1)) #increasing values in y
def add_point(self, x_val, y_val):
self._points.append([x_val,y_val])
self.sort()
self.check_monotonicity()
def add_points(self, point_list):
'''expects 2d list of the form [[x0,y0],[x1,y1],...[xn,yn]]
the points must be strictly monotonic, but not necessarily sorted'''
for point in point_list:
self.add_point(point[0], point[1])
def find(self, key, sorted_key_list, value_list):
'''function operates independent of object internal data
expects sorted_key_list to increase strictly monotonically
will return linear combination of two values from value list weighted by distance from two enclosing points of key in sorted_key_list
'''
points = zip(sorted_key_list, value_list)
if len(points) < 2:
raise Exception('At least two points are required to define a line.')
low_pts = [pt for pt in points if pt[0] <= key]
high_pts = [pt for pt in points if pt[0] >= key]
if len(low_pts) == 0:
#no points below value; extrapolate from first two points
#print 'Bottom extrapolation'
low_pt = points[0]
high_pt = points[1]
elif len(high_pts) == 0:
#no points above value; extrapolate from last two points
#print 'Top extrapolation'
low_pt = points[-2]
high_pt = points[-1]
else:
#interpolate between points lower and higher than key argument
#print 'Interpolation'
low_pt = low_pts[-1]
high_pt = high_pts[0]
if low_pt == high_pt:
#avoid divide by 0
return low_pt[1]
else:
return low_pt[1] + (key-low_pt[0])*(float(high_pt[1]-low_pt[1])/(high_pt[0]-low_pt[0])) #prevent integer divide
def get_x_val(self, y_val):
[x_pts,y_pts] = zip(*self._points_ysort)
return self.find(y_val, y_pts, x_pts)
def get_y_val(self, x_val):
[x_pts,y_pts] = zip(*self._points)
return self.find(x_val, x_pts, y_pts)
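# A minimal usage sketch for interpolator (made-up calibration points): build a
# strictly monotonic x->y map, then look up in either direction.
#
#   cal = interpolator()
#   cal.add_points([[0.0, 1.25], [1.0, 2.50], [2.0, 3.75]])
#   print cal.get_y_val(0.5)    # 1.875 - interpolated between the first two points
#   print cal.get_x_val(3.125)  # 1.5   - inverse lookup via the y-sorted copy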
def eng_string(x, fmt=':.3g', si=True):
    '''
    Returns float/int value <x> formatted in a simplified engineering format -
    using an exponent that is a multiple of 3.
    fmt: str.format()-style format spec (e.g. ':.2f') applied to the value before the exponent.
    si: if True, use SI suffix for exponent, e.g. k instead of e3, n instead of
    e-9, etc.
    E.g. with fmt=':.2f' and si=False:
    1.23e-08 => 12.30e-9
    123 => 123.00
    1230.0 => 1.23e3
    -1230000.0 => -1.23e6
    and with si=True:
    1230.0 => 1.23k
    -1230000.0 => -1.23M
    '''
    assert isinstance(x, numbers.Number)  # strings like '5e3' are not accepted
    if x == 0:
        return '{{{}}}'.format(fmt).format(x)
    sign = ''
    if x < 0:
        x = -x
        sign = '-'
    exp = int(math.floor(math.log10(x)))
    exp3 = exp - (exp % 3)
    x3 = x / float(10 ** exp3)  # float() prevents integer division when x is an int
    if si and -24 <= exp3 <= 24 and exp3 != 0:
        exp3_text = u'yzafpnµm kMGTPEZY'[(exp3 - (-24)) / 3]
    elif exp3 == 0:
        exp3_text = ''
    else:
        exp3_text = 'e%s' % exp3
    s1 = "{}{{{}}}".format(sign, fmt)
    s2 = s1.format(x3)
    return u"{}{}".format(s2, exp3_text)
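# Hand-checked examples of eng_string (values are illustrative, not from a test run):
#
#   eng_string(1.23e-8, fmt=':.2f')               # u'12.30n'
#   eng_string(-1230000.0, fmt=':.2f', si=False)  # u'-1.23e6'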
class ordered_pair(list):
    '''list of [x, y] data points with in-place scaling, smoothing, truncation and conversion helpers'''
def transform(self, x_transform = None, y_transform = None):
'''executes x_transform function on first (x) element of each ordered pair data point
executes y_transform function on second (y) element of each ordered pair data point
returns None, data changed in place
not appropriate for filtering functions that require access to adjacent (in time or space) data point values'''
if x_transform is None:
x_transform = lambda x: x
if y_transform is None:
y_transform = lambda y: y
for i in range(len(self)):
self[i] = [x_transform(self[i][0]), y_transform(self[i][1])]
def x_sql_time(self):
'''convert sqlite database datetime string in x-axis data to python datetime object'''
self.transform(x_transform=lambda t: datetime.datetime.strptime(t, '%Y-%m-%dT%H:%M:%S.%fZ'))
def x_sql_elapsed_time(self, seconds=False, minutes=False, hours=False, days=False):
'''convert sqlite database datetime string in x-axis data to python timedelta object
access properties of days, seconds (0 to 86399 inclusive) and microseconds (0 to 999999 inclusive) or method total_seconds()
optionally, instead return numeric total seconds, minutes, hours or days by setting respective argument to True'''
self.x_sql_time()
start_time = self[0][0]
self.transform(x_transform=lambda t: t - start_time)
if not (seconds or minutes or hours or days):
pass
elif seconds and not (minutes or hours or days):
self.transform(x_transform=lambda t: t.total_seconds())
elif minutes and not (seconds or hours or days):
self.transform(x_transform=lambda t: t.total_seconds() / 60.0)
elif hours and not (seconds or minutes or days):
self.transform(x_transform=lambda t: t.total_seconds() / 3600.0)
elif days and not (seconds or minutes or hours):
self.transform(x_transform=lambda t: t.total_seconds() / 86400.0)
else:
raise Exception('Specify at most one of (seconds, minutes, hours, days)')
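    # Sketch of x_sql_elapsed_time with hypothetical query results: ISO-8601
    # timestamp strings become numeric elapsed minutes, in place.
    #
    #   xy = ordered_pair([['2016-01-01T00:00:00.000000Z', 1.0],
    #                      ['2016-01-01T00:30:00.000000Z', 2.0]])
    #   xy.x_sql_elapsed_time(minutes=True)  # xy is now [[0.0, 1.0], [30.0, 2.0]]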
def xscale(self, x_scale):
'''changes list in place with x points multiplied by x_scale and y points unaltered'''
self.transform(x_transform = lambda x: x * x_scale)
def yscale(self, y_scale):
'''changes list in place with x points unaltered and y points multiplied by y_scale'''
self.transform(y_transform = lambda y: y * y_scale)
def xoffset(self, x_offset):
self.transform(x_transform = lambda x: x + x_offset)
def yoffset(self, y_offset):
self.transform(y_transform = lambda y: y + y_offset)
def xyscale(self, x_scale, y_scale):
'''changes list in place with x points multiplied by x_scale and y points multiplied by y_scale'''
self.transform(x_transform = lambda x: x*x_scale, y_transform = lambda y: y*y_scale)
    def truncate(self, length=None, offset=0):
        '''changes list in place: drops the first <offset> points, then keeps either a 0-1
        fraction of the original (pre-offset) length or an integer number of points, per <length>'''
        orig_len = len(self)
        del self[0:offset]
        if length is None:
            pass
        elif length > 0 and length < 1:
            new_len = int(round(length * orig_len))  # fraction is of the original, pre-offset length
            if new_len > len(self):
                raise Exception('Record too short after offset to return {}% of original length.'.format(length * 100))
            del self[new_len:len(self)]
            print 'Truncating record from {} to {} points starting at {}.'.format(orig_len, new_len, offset)
        elif length <= len(self) and int(length) == length:
            del self[length:len(self)]
            print 'Truncating record from {} to {} points starting at {}.'.format(orig_len, length, offset)
        else:
            raise Exception('length argument should be a 0-1 fraction of original record length or an integer desired record length.')
def decimate(self, scale):
assert scale > 0
assert scale <= 1
old_len = len(self)
new_len = int(round(scale*old_len))
print 'Decimating record from {} to {} points.'.format(len(self),new_len)
accumulator = 0
decimated_data = []
incr = 1 - (1.0*new_len/old_len)
del_list = []
for i in range(len(self)):
accumulator += incr
if accumulator >= 1:
# print 'dropping point {}:{}'.format(i,self[i])
accumulator -= 1
del_list.append(i) #don't change list length while iterating it
while len(del_list):
del self[del_list.pop()]
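    # Sketch of truncate/decimate on synthetic data: keep the first half of a
    # record, then thin what remains to 50% of its points.
    #
    #   xy = ordered_pair([[i, i * i] for i in range(100)])
    #   xy.truncate(length=0.5)   # keeps points 0-49, prints a confirmation
    #   xy.decimate(scale=0.5)    # drops every other point, leaving 25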
    def numpy_recarray(self, force_float_dtype=False, data_types=None):
        '''return NumPy record array containing data.
        Rows can be accessed by index, ex arr[2].
        Columns can be accessed by column name attribute, ex arr.vbat.
        Use with data filtering, smoothing, compressing, etc matrix operations provided by SciPy and lab_utils.transform, lab_utils.decimate.
        Use automatic column names, but force data type to float with the force_float_dtype boolean argument.
        Override automatic column names and data types (inferred from the first row) by specifying a data_types iterable of (column_name, example_contents) for each column matching query order.
        http://docs.scipy.org/doc/numpy-1.10.1/reference/generated/numpy.recarray.html
        '''
        if force_float_dtype and data_types is None:
            dtype = numpy.dtype([('x', float), ('y', float)])
        elif force_float_dtype and data_types is not None:
            raise Exception('Specify only one of force_float_dtype, data_types arguments.')
        elif data_types is None:
            dtype = numpy.dtype([('x', type(self[0][0])), ('y', type(self[0][1]))])  # second field was previously misnamed 'x'
        else:
            dtype = numpy.dtype([(column_name, type(example_contents)) for column_name, example_contents in data_types])
        arr = numpy.array([tuple(row) for row in self], dtype)
        return arr.view(numpy.recarray)
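    # Sketch of numpy_recarray: view the (x, y) pairs as a record array for
    # column-wise math (made-up data).
    #
    #   xy = ordered_pair([[0.0, 1.0], [1.0, 4.0], [2.0, 9.0]])
    #   rec = xy.numpy_recarray()
    #   print rec.x * 2, rec.y.mean()  # [ 0.  2.  4.] 4.6666...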
    def ramer_douglas_peucker(self, epsilon, verbose=True, force_float_dtype=False, data_types=None):
        '''reduce the number of points in a line-segment curve such that the reduced segment count approximates the original curve within epsilon tolerance.
        https://en.wikipedia.org/wiki/Ramer%E2%80%93Douglas%E2%80%93Peucker_algorithm'''
return ramer_douglas_peucker(self.numpy_recarray(force_float_dtype, data_types), epsilon, verbose)
def _smooth(self, axis, window, extrapolation_window):
if window is None or window == 1:
return
window = int(round(window))
        if window % 2 == 0:
            print '*** WARNING ***'
            print "Even window sizes like {} have no center sample and will shift the data by half a point with this smoothing function.".format(int(window))
            print "Incrementing the window by one to {} to correct for this.".format(int(window) + 1)
            window += 1
if axis == 'y':
x = zip(*self)[0]
y = zip(*self)[1]
else:
x = zip(*self)[1]
y = zip(*self)[0]
spacing = (x[-1] - x[0]) / float(len(x) - 1)
# fit_lower and fit_upper are linear extrapolations off the left and right sides using the extrapolation_window
fit_lower = numpy.poly1d(numpy.polyfit(x = x[:extrapolation_window], y = y[:extrapolation_window], deg = 1))
fit_upper = numpy.poly1d(numpy.polyfit(x = x[-extrapolation_window:], y = y[-extrapolation_window:], deg = 1))
xpoints_lo = sorted([x[0] - spacing * (points + 1) for points in range(window)])
xpoints_hi = sorted([x[-1] + spacing * (points + 1) for points in range(window)])
values_lo = [fit_lower(point) for point in xpoints_lo]
values_hi = [fit_upper(point) for point in xpoints_hi]
data = values_lo + list(y) + values_hi # Extend data end points left/right
data = numpy.convolve(data, numpy.ones(window)/float(window), 'same') # to assist running average algorithm
data = data.ravel().tolist() # Convert array back to list
data[0:window] = [] # Strip off left padding
data[len(data)-window:] = [] # Strip off right padding
for i in range(len(self)):
if axis == 'y':
self[i] = [x[i], data[i]]
else:
self[i] = [data[i], x[i]]
def smooth_y(self, window = 5, extrapolation_window = None, iterations = 1):
'''Smooths a data set's y axis data for publication.
'window' is the size of the main filtering window.
The data is convolved with a block of '1s'.
The length of the block determines the aggressiveness of the filtering.
A large window size is like having a low frequency pole.
The larger it is the more distortion there will be.
'extrapolation_window' is the size of the window used to determine the line for linear extrapolation off the ends of the data set.
The data needs to be extended so the convolution doesn't run out of data on the ends.
The default extrapolation window is set to the main window but can be reduced to reduce distortion or more properly model the end point derivatives.
'iterations' is the number of iterations the smoothing function runs.
Increasing the iterations is like having more poles at the same frequency thereby producing essentially a 'brick wall' filter.
*****************
**** WARNING ****
*****************
Iterations should be used judiciously as it can lead to large phase shifting which moves the data a great distance on the independent axis.
It's a good idea to always plot the original data and the massaged data together to ensure that the massaged data has not been seriously distorted.'''
if extrapolation_window is None:
extrapolation_window = window
for i in range(iterations):
self._smooth(axis = 'y', window = window, extrapolation_window = extrapolation_window)
def smooth_x(self, window = 5, extrapolation_window = None, iterations = 1):
'''Smooths a data set's x axis data for publication.
'window' is the size of the main filtering window.
The data is convolved with a block of '1s'.
The length of the block determines the aggressiveness of the filtering.
A large window size is like having a low frequency pole.
The larger it is the more distortion there will be.
'extrapolation_window' is the size of the window used to determine the line for linear extrapolation off the ends of the data set.
The data needs to be extended so the convolution doesn't run out of data on the ends.
The default extrapolation window is set to the main window but can be reduced to reduce distortion or more properly model the end point derivatives.
'iterations' is the number of iterations the smoothing function runs.
Increasing the iterations is like having more poles at the same frequency thereby producing essentially a 'brick wall' filter.
*****************
**** WARNING ****
*****************
Iterations should be used judiciously as it can lead to large phase shifting which moves the data a great distance on the independent axis.
It's a good idea to always plot the original data and the massaged data together to ensure that the massaged data has not been seriously distorted.'''
if extrapolation_window is None:
extrapolation_window = window
for i in range(iterations):
self._smooth(axis = 'x', window = window, extrapolation_window = extrapolation_window)
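    # Sketch of smooth_y on a synthetic noisy ramp: two passes of a 7-point
    # moving average with the default linear end-point extrapolation.
    #
    #   import random
    #   noisy = ordered_pair([[i, i + random.gauss(0, 0.1)] for i in range(50)])
    #   noisy.smooth_y(window=7, iterations=2)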
class oscilloscope_channel(ordered_pair):
    def __init__(self, time_points, channel_data):
        '''takes string data, likely from a two-column sql database query of an oscilloscope trace
        and returns a list of (x,y) ordered pairs of floats appropriate for plotting or further manipulation
        expects time and channel series data to be surrounded with square braces and comma separated
        time_points and channel_data should be of equal length'''
        list.__init__(self)
        xvalues = [float(x) for x in time_points.strip("[]").split(",")]
        yvalues = [float(x) for x in channel_data.strip("[]").split(",")]
        self.extend(zip(xvalues, yvalues))
self.array = numpy.array(zip(xvalues, yvalues), dtype=[('x', float), ('y', float)])
def to_recarray(self):
return self.array.view(numpy.recarray)
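# Sketch of oscilloscope_channel with hypothetical trace strings, e.g. from a
# two-column SQLite scope-capture query:
#
#   ch1 = oscilloscope_channel("[0.0, 1.0, 2.0]", "[0.1, 0.5, 0.9]")
#   rec = ch1.to_recarray()
#   print rec.x, rec.y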
def egg_timer(timeout, message=None, length=30, display_callback=None):
    '''Provides a blocking delay with a progress graphic so the computer doesn't look idle.
optionally, display a message on the line above the timer graphic
optionally, specify a display_callback function to insert extra progress information after the timer display.
display_callback function should accept a single dictionary argument and return a string.'''
light_shade = u"▒" #\u2592
dark_shade = u"█" #\u2588
digits = len(str(int(timeout)))
longest_line_len = 0
status = {}
status['start_time'] = time.time()
status['total_time'] = timeout
status['callback_disp_str'] = ''
status['elapsed_time'] = 0
status['message'] = message
if message is not None:
print message
while status['elapsed_time'] != status['total_time']:
status['present_time'] = time.time()
status['elapsed_time'] = min(status['present_time'] - status['start_time'], status['total_time'])
status['remaining_time'] = status['total_time'] - status['elapsed_time']
status['percent_complete'] = status['elapsed_time'] / status['total_time']
complete_length = int(status['percent_complete'] * length)
        status['dark'] = dark_shade * complete_length
        status['light'] = light_shade * (length - complete_length)
if display_callback is not None:
status['callback_disp_str'] = display_callback(status)
print_str = u"\r║{{dark}}{{light}}║ {{remaining_time:{digits}.0f}}/{{total_time:{digits}.0f}}s remaining. ({{percent_complete:3.1%}}). {{callback_disp_str}}".format(digits=digits).format(**status) #║╠╣
if len(print_str) < longest_line_len:
pad = " " * (longest_line_len - len(print_str))
else:
pad = ""
longest_line_len = len(print_str)
print print_str + pad,
loop_time = time.time() - status['present_time']
if loop_time < 0.1:
time.sleep(0.1 - loop_time)
print
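# Sketch of egg_timer with a hypothetical display_callback that appends the ETA:
#
#   egg_timer(10, message='Soaking...',
#             display_callback=lambda s: '{:0.0f}s to go'.format(s['remaining_time']))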
def expand_tabs(string, *column_widths, **default_column_width):
'''like string.expandtabs, but works only on a single line and allows for varying column widths.
accepts variable number of positional arguments for each column width.
accepts keyword argument "default_column_width" if not all column widths are specified.
accepts keyword argument "verbose" to warn if column width is too narrow for contents.'''
for key in default_column_width:
if key != "default_column_width" and key != "verbose":
raise Exception('"default_column_width" and "verbose" are the only allowed keyword arguments.')
columns = string.split('\t')
out_str = ''
for idx, column in enumerate(columns):
try:
column_width = column_widths[idx]
except IndexError as e:
if "default_column_width" in default_column_width:
column_width = default_column_width["default_column_width"]
else:
raise Exception('Specify width of each column or specify keyword argument "default_column_width"')
pad = column_width - len(column)
if pad < 1:
if default_column_width.get("verbose", None):
print "Column {} undersize by {}.".format(idx, 1-pad)
pad = 1
space = ' ' * pad
out_str += column + space
return out_str
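# Hand-checked expand_tabs examples (made-up column contents):
#
#   expand_tabs("name\tvalue", 10, 8)                  # 'name      value   '
#   expand_tabs("a\tb\tc", 4, default_column_width=6)  # 'a   b     c     '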
class ticker(object):
    '''scrolling stock ticker tape printed to the console'''
    def __init__(self, stock_list=None):
if stock_list is None:
self.stock_list = ['LLTC', 'ADI', 'TXN', 'AAPL', 'GOOG']
else:
self.stock_list = stock_list
def get_quote(self,symbol):
url = 'http://finance.yahoo.com/d/quotes.csv?s=+{}&f=snl1'.format(symbol)
try:
ticker, desc, price = csv.reader([urllib.urlopen(url).read()]).next()
return {'ticker':ticker.strip().replace('"',''), 'desc':desc.strip().replace('"',''), 'price':price.strip().replace('"','')}
except Exception as e:
print e
return {'ticker':None, 'desc':None, 'price':None}
def build_tape(self):
self.str = ''
for stock in self.stock_list:
data = self.get_quote(stock)
self.str += '{}: {} '.format(data['ticker'],data['price'])
return self.str
def rotate(self):
self.str = self.str[1:] + self.str[0]
return self.str
def tick(self, display_function=None, character_time=0.15, refresh_time=45):
if display_function is None:
display_function = lambda msg: self.disp(msg)
refresh_cycles = max(int(refresh_time / character_time), 1)
while True:
self.build_tape()
for i in range(refresh_cycles):
display_function(self.rotate())
time.sleep(character_time)
def disp(self, msg):
print '{}\r'.format(msg),
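# Sketch only: the Yahoo quotes.csv endpoint used above was retired (circa 2017),
# so get_quote() will now print an error and return None fields.
#
#   t = ticker(['AAPL', 'ADI'])
#   t.tick(character_time=0.1, refresh_time=30)  # scrolls until interrupted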
def modulate(data1, data2):
    '''data1 and data2 are sequences of (x, y) pairs that may not share the same x values. The result is the point-wise product, interpolated onto the denser of the two x grids.'''
independent = []
product = []
if len(data1) > len(data2):
for value in data1:
xvalue = value[0]
data2_value = numpy.interp(xvalue, zip(*data2)[0], zip(*data2)[1])
independent.append(xvalue)
product.append(value[1] * data2_value)
else:
for value in data2:
xvalue = value[0]
data1_value = numpy.interp(xvalue, zip(*data1)[0], zip(*data1)[1])
independent.append(xvalue)
product.append(value[1] * data1_value)
return zip(independent, product)
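# Hand-checked modulate example (made-up traces): the product is evaluated on the
# denser x grid, with the sparser trace linearly interpolated by numpy.interp.
#
#   a = [(0.0, 1.0), (1.0, 2.0), (2.0, 3.0)]
#   b = [(0.0, 10.0), (2.0, 30.0)]
#   modulate(a, b)  # [(0.0, 10.0), (1.0, 40.0), (2.0, 90.0)]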
def delete_file(filename, max_tries=20, retry_delay=5):
"""Tries to delete a file, retrying if the file is locked (e.g. because it is open
in Notepad++, SQLite Manager, or another program), failing gracefully if the file
doesn't (yet) exist. Gives up after a number of retries and raises RuntimeError.
Good for removing stale sqlite DBs and log files from old runs."""
try:
f_stat = os.stat(filename) # See if file already exists.
# If not, an exception is thrown and we GOTO the "except OSError:" below.
# All code from here to the "except OSError:"
# is only executed if the file actually exists.
tries = max_tries
while tries > 0:
try:
os.remove(filename)
print "Removed prior run file {}".format(filename)
break
except OSError:
print "Unable to remove stale file {} --- RETRYING in {} secs".format(filename, retry_delay)
tries = tries - 1
time.sleep(retry_delay)
else:
                print "Giving up!"
                raise RuntimeError('Unable to delete {} after {} tries.'.format(filename, max_tries))
except OSError:
print "No prior", filename, "to remove."
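# Sketch of delete_file clearing a stale results database before a new run
# (hypothetical file name):
#
#   delete_file('test_results.sqlite', max_tries=3, retry_delay=1)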
if __name__ == "__main__":
pass