Package papyros
[hide private]
[frames] | no frames]

Source Code for Package papyros

  1  '''A small platform independent parallel processing package. 
  2   
  3  This package provides a master-slave parallel processing model in Python. 
  4  Clients can submit jobs to a L{Master} object which is monitored by one or more 
  5  L{Slave} objects that do the real work and return the processed L{Job}s back to 
  6  the master. Two main implementations are currently provided, one L{multithreaded} 
  7  and one L{distributed} in one or more hosts (through U{Pyro <http://pyro.sourceforge.net/>}). 
  8  ''' 
  9   
 10  __all__ = ['Master', 'Slave', 'Job', 'UnprocessedJobError'] 
 11   
import itertools
import logging, logging.config
import os
import Queue
import sys
from cStringIO import StringIO
from traceback import print_exception
 18   
 19  log = logging.getLogger('papyros') 
 20   
def enableLogging():
    '''Configure the C{logging} package from the C{logging.conf} file located
    in the same directory as this module.
    '''
    package_dir = os.path.dirname(__file__)
    config_path = os.path.join(package_dir, 'logging.conf')
    logging.config.fileConfig(config_path)
24
def synchronized(f):
    '''A synchronized method decorator.

    Locking is currently switched off: the decorated function is handed back
    unchanged, so applying this decorator has no effect at run time.

    @param f: The function (method) being decorated.
    @returns: C{f} itself, unwrapped.
    '''
    # Deliberately a pass-through: no lock is created or acquired here.
    # NOTE(review): a lock stored on each instance would not protect state
    # shared at class level, and would end up in the instance __dict__;
    # confirm both are acceptable before turning locking back on.
    return f
class Master(object):
    '''A L{Job} dispatcher object, controlling a set of L{slaves <Slave>}.'''

    def __init__(self, input_size=0, output_size=0):
        '''Initialize this master.

        @param input_size: If a positive integer, it's the maximum number
            of unassigned jobs. Trying to L{add <addJob>} a new L{Job}
            when the queue is full blocks or raises C{Queue.Full} exception.
        @param output_size: If a positive integer, it's the maximum number
            of completed jobs waiting to be fetched. Once this number is
            reached, L{_getProcessedJob} blocks until a completed job is
            fetched.
        '''
        # A job stays in this set from the moment it is assigned until it is
        # popped from the output queue, so finished-but-unfetched jobs still
        # count as pending.
        self._inProgressJobs = set()
        self._unassignedJobs = Queue.Queue(input_size)
        self._processedJobs = Queue.Queue(output_size)

    def addJob(self, job, timeout=None):
        '''Add a L{Job} to the input queue.

        @param job: The job to be queued.
        @param timeout: If the input queue is full and C{timeout is None}, block
            until a slot becomes available. If C{timeout > 0}, block for up to
            C{timeout} seconds and raise C{Queue.Full} exception if the queue is
            still full. If C{timeout <= 0}, do not block and raise C{Queue.Full}
            immediately if the queue is full.
        '''
        block = timeout is None or timeout > 0
        self._unassignedJobs.put(job, block, timeout)
        log.debug('Pushed job %s to the input queue', job)

    def cancelAllJobs(self):
        '''Cancel all unassigned jobs.

        Only jobs still waiting in the input queue are dropped; jobs that have
        already been assigned are not touched.
        '''
        while True:
            try:
                job = self._unassignedJobs.get_nowait()
            except Queue.Empty:
                break
            log.debug('Cancelled job %s', job)

    def numPendingJobs(self):
        '''Return the approximate number of pending (waiting or in progress) jobs.'''
        return self._unassignedJobs.qsize() + len(self._inProgressJobs)

    def popProcessedJob(self, timeout=None):
        '''Pop the next processed L{Job} from the output queue.

        If there are no pending jobs, it returns C{None}. Otherwise:
            - If C{timeout is None}, block until a job has finished and return it.
            - If C{timeout <= 0}, return the first finished job that is
              immediately available without blocking, or C{None} otherwise.
            - If C{timeout > 0}, wait up to C{timeout} seconds for a job to
              finish and return it; return C{None} if no job has finished by
              the deadline.

        @returns: The next processed L{Job} or C{None} if there is no available
            for the given C{timeout}.
        '''
        if not self.numPendingJobs():
            return None
        block = timeout is None or timeout > 0
        try:
            job = self._processedJobs.get(block, timeout)
        except Queue.Empty:
            return None
        log.debug('Popped job %s from the output queue', job)
        assert job is not None
        self._inProgressJobs.remove(job)
        return job

    def processedJobs(self, timeout=None):
        '''Return a list of processed jobs.

        @param timeout: If C{timeout is None} or C{timeout <= 0}, it is
            equivalent to keep calling L{popProcessedJob} until it returns None.
            If C{timeout > 0}, this is the maximum overall time to spend on
            collecting processed jobs.
        @returns: The list of collected jobs, in the order they were popped.
        '''
        if timeout is None or timeout <= 0:
            return list(iter(lambda: self.popProcessedJob(timeout), None))
        from time import time
        end = time() + timeout
        processed = []
        while timeout > 0:
            job = self.popProcessedJob(timeout)
            if job is None:
                break
            processed.append(job)
            # shrink the budget so the overall wait never exceeds the deadline
            timeout = end - time()
        return processed

    def _assignJob(self, poll_time=10):
        '''Pop the next unassigned job and mark it as in progress.

        Waits up to C{poll_time} seconds for a job to show up in the input
        queue. This method should only be called by a L{Slave} of this master.

        @param poll_time: Maximum number of seconds to wait for a job.
        @returns: The assigned L{Job}, or C{None} if the input queue stayed
            empty for C{poll_time} seconds.
        '''
        try:
            job = self._unassignedJobs.get(timeout=poll_time)
        except Queue.Empty:
            return None
        log.debug('Popped job %s from the input queue', job)
        self._inProgressJobs.add(job)
        return job

    def _getProcessedJob(self, job):
        '''Add a processed job in the output queue.

        This method should only be called by a L{Slave} of this master.

        @param job: The job that has just been processed.
        '''
        # blocks here if the output queue is full
        self._processedJobs.put(job)
        log.debug('Pushed job %s to the output queue', job)
148 149
class Slave(object):
    '''Abstract Slave class.

    A L{Slave} repeatedly asks its L{Master} for the next available L{Job},
    processes it and hands it back to the L{Master}.
    '''

    def __init__(self, master, poll_time=1):
        '''
        @param master: The L{Master} this slave is communicating with.
        @param poll_time: How long (in seconds) each request to the master
            for a new L{Job} may wait. Must be positive.
        @raise ValueError: If C{poll_time} is not positive.
        '''
        if poll_time <= 0:
            raise ValueError('Poll time must be positive')
        self._master = master
        self._pollTime = poll_time

    def run(self):
        '''Call L{process} over and over; this method never returns.'''
        while True:
            self.process()

    def process(self):
        '''Ask the master for the next unassigned job, process it and send it
        back.

        @returns: The processed job, or None if no job was assigned within
            C{poll_time} seconds.
        '''
        master = self._master
        job = master._assignJob(self._pollTime)
        if job is None:
            return None
        job._process()
        master._getProcessedJob(job)
        log.debug('Finished job %s' % job)
        return job
185 186
class Job(object):
    '''Abstract base class of a callable to be called later.

    It stores the result or raised exception of the last time it is called.
    Subclasses implement L{__call__}; L{Slave} instances run the job through
    L{_process} and clients read the outcome through L{result}.
    '''
    # Shared generator of job ids (1, 2, 3, ...). Each id is obtained with a
    # single next() call instead of a separate increment and read.
    # NOTE(review): uniqueness across threads relies on CPython running
    # next() on an itertools.count without interleaving - confirm for other
    # interpreters.
    __ids = itertools.count(1)

    def __init__(self, *args, **kwds):
        '''Store the arguments to be passed to L{__call__} when this job is
        processed.

        @param args: Positional arguments for L{__call__}.
        @param kwds: Keyword arguments for L{__call__}.
        '''
        self.args = args
        self.kwds = kwds
        self.__exception = None
        self.__id = next(Job.__ids)

    def __hash__(self):
        return self.__id

    def __eq__(self, other):
        '''Two jobs are equal if they carry the same id; a job never equals an
        object without a job id.
        '''
        try:
            return self.__id == other.__id
        except AttributeError:
            return False

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so keep the two consistent.
        return not self == other

    def __call__(self, *args, **kwds):
        '''Abstract method; to be implemented by subclasses.'''
        raise NotImplementedError('Abstract method')

    @property
    def result(self):
        '''Return the computed result for this processed job.

        If the callable had raised an exception, it is reraised here. The
        formatted original traceback is also available as the string
        C{exc.traceback}.

        If this job has not been processed yet, it raises L{UnprocessedJobError}.
        '''
        if self.__exception is not None:
            raise self.__exception
        try:
            return self.__result
        except AttributeError:
            raise UnprocessedJobError()

    def _process(self):
        '''Execute this job and store the result or raised exception.

        To be called by L{Slave} instances.
        '''
        log.debug('Ready to process job %s', self)
        try:
            self.__result = self(*self.args, **self.kwds)
        except Exception:
            exc_type, ex, tb = sys.exc_info()
            try:
                out = StringIO()
                # omit the current level of the traceback; start from the next
                print_exception(exc_type, ex, tb.tb_next, file=out)
                # keep the formatted traceback with the exception so that it
                # is still available wherever the exception is reraised
                ex.traceback = out.getvalue()
            finally:
                # do not keep the traceback in a local of the frame it refers
                # to; that would create a reference cycle
                del tb
            self.__exception = ex
            log.debug('Failed to process job %s', self)
        else:
            self.__exception = None
            log.debug('Job %s was processed successfully', self)
248 249
class UnprocessedJobError(Exception):
    '''Signals that the result of a L{Job} was requested before the job had
    been processed.
    '''
254