'''A small platform-independent parallel processing package.

This package provides a master-slave parallel processing model in Python.
Clients submit jobs to a L{Master} object, which is monitored by one or more
L{Slave} objects that do the actual work and return the processed L{Job}s to
the master. Two main implementations are currently provided: a
L{multithreaded} one and a L{distributed} one that runs on one or more hosts
(through U{Pyro <http://pyro.sourceforge.net/>}).
'''

__all__ = [
    'Master',
    'Slave',
    'Job',
    'UnprocessedJobError',
]
11
12 import os
13 import sys
14 import Queue
15 import logging, logging.config
16 from cStringIO import StringIO
17 from traceback import print_exception
18
# Module-wide logger named 'papyros'; every debug message in this module is
# emitted through it.
log = logging.getLogger('papyros')
20
# Configure logging at import time from the 'logging.conf' file located in
# the same directory as this module.
# NOTE(review): this runs as a side effect of importing the package, may
# override logging setup already done by the host application, and fails if
# logging.conf is missing or malformed -- confirm that is intended.
logging.config.fileConfig(os.path.join(os.path.dirname(__file__),
                          'logging.conf'))
24
26 '''A synchronized method decorator'''
27 return f
28 def wrapper(self, *args, **kwargs):
29 try: lock = self.__lock
30 except AttributeError:
31 lock = self.__dict__.setdefault('__lock', RLock())
32 lock.acquire()
33 try: return f(self, *args, **kwargs)
34 finally: lock.release()
35 return wrapper
36
37
39 'A L{Job} dispatcher object, controlling a set of L{slaves <Slave>}.'
40
41 - def __init__(self, input_size=0, output_size=0):
42 '''Initialize this master.
43
44 @param input_size: If a positive integer, it's the maximum number
45 of unassigned jobs. Trying to L{add <addJob>} a new L{Job}
46 when the queue is full blocks or raises C{Queue.Full} exception.
47 @param output_size: If a positive integer, it's the maximum number
48 of completed jobs waiting to be fetched. No more jobs are assigned
49 to the slaves when this number is reached.
50 '''
51 self._inProgressJobs = set()
52 self._unassignedJobs = Queue.Queue(input_size)
53 self._processedJobs = Queue.Queue(output_size)
54
55 - def addJob(self, job, timeout=None):
56 '''Add a L{Job} to the input queue.
57
58 @param timeout: If the input queue is full and C{timeout is None}, block
59 until a slot becomes available. If C{timeout > 0}, block for up to
60 C{timeout} seconds and raise C{Queue.Full} exception if the queue is
61 still full. If C{timeout <= 0}, do not block and raise C{Queue.Full}
62 immediately if the queue is full.
63 '''
64 self._unassignedJobs.put(job, timeout is None or timeout>0, timeout)
65 log.debug('Pushed job %s to the input queue' % job)
66
68 '''Cancel all unassigned jobs.'''
69 while True:
70 try: job = self._unassignedJobs.get(timeout=0)
71 except Queue.Empty: break
72 log.debug('Cancelled job %s' % job)
73
75 'Return the approximate number of pending (waiting or in progress) jobs.'
76 return self._unassignedJobs.qsize() + len(self._inProgressJobs)
77
79 '''Pop the next processed L{Job} from the output queue.
80
81 If there are no pending jobs, it returns C{None}. Otherwise:
82 - If C{timeout is None}, block until a job has finished and return it.
83 - If C{timeout <= 0}, return the first finished job that is
84 immediately available without blocking, or C{None} otherwise.
85 - If C{timeout > 0}, wait up to C{timeout} seconds for a job to
86 finish and return it; return C{None} if no job has finished by
87 the deadline.
88
89 @returns: The next processed L{Job} or C{None} if there is no available
90 for the given C{timeout}.
91 '''
92 if not self.numPendingJobs():
93 return None
94 block = timeout is None or timeout>0
95 try: job = self._processedJobs.get(block, timeout)
96 except Queue.Empty:
97 return None
98 log.debug('Popped job %s from the output queue' % job)
99 assert job is not None
100 self._inProgressJobs.remove(job)
101 return job
102
104 '''Return a list of processed jobs.
105
106 @param timeout: If C{timeout is None} or C{timeout <= 0}, it is
107 equivalent to keep calling L{popProcessedJob} until it returns None.
108 If C{timeout > 0}, this is the maximum overall time to spend on
109 collecting processed jobs.
110 '''
111 if timeout is None or timeout <= 0:
112 return list(iter(lambda: self.popProcessedJob(timeout), None))
113 from time import time
114 end = time() + timeout
115 processed = []
116 while timeout > 0:
117 job = self.popProcessedJob(timeout)
118 if job is None:
119 break
120 processed.append(job)
121 timeout = end - time()
122 return processed
123
125 '''Pop the next unassigned non-cancelled job.
126
127 If there are no assigned jobs, keep polling every C{poll_time} seconds
128 until one comes in. If the job has been cancelled, it is silently
129 discarded. This method should only be called by a L{Slave} of this master.
130 '''
131 while True:
132
133 try: job = self._unassignedJobs.get(timeout=poll_time)
134 except Queue.Empty:
135 return None
136 log.debug('Popped job %s from the input queue' % job)
137 self._inProgressJobs.add(job)
138 return job
139
141 '''Add a processed job in the output queue.
142
143 This method should only be called by a L{Slave} of this master.
144 '''
145
146 self._processedJobs.put(job)
147 log.debug('Pushed job %s to the output queue' % job)
148
149
151 '''Abstract Slave class.
152
153 A L{Slave} sits in a loop waiting for L{Job}s from its L{Master}, picks the
154 next available job, processes it, sends it back to the L{Master} and all
155 over again.
156 '''
157 - def __init__(self, master, poll_time=1):
158 '''
159 @param master: The L{Master} this slaves is communicating with.
160 @param poll_time: How often should this slave poll its master for a new
161 L{Job}.
162 '''
163 if poll_time <= 0:
164 raise ValueError('Poll time must be positive')
165 self._master = master
166 self._pollTime = poll_time
167
171
173 '''Fetch the next available unassigned job from the master, process it
174 and send it back.
175
176 @returns: The processed job, or None if no job was assigned within
177 C{poll_time} seconds.
178 '''
179 job = self._master._assignJob(self._pollTime)
180 if job is not None:
181 job._process()
182 self._master._getProcessedJob(job)
183 log.debug('Finished job %s' % job)
184 return job
185
186
    '''Abstract base class of a callable to be called later.

    Subclasses implement C{__call__}. Processing the job calls it with the
    stored arguments and records the result or raised exception of the most
    recent run.
    '''
    # Class-wide counter used to give each instance a unique id. It is always
    # accessed as Job.__counter, so all subclasses share a single sequence.
    __counter = 0

    @synchronized
        # Positional and keyword arguments that will be passed to this job's
        # __call__ when the job is processed.
        self.args = args
        self.kwds = kwds
        self.__exception = None
        # NOTE(review): this read-modify-write of the class-wide counter is not
        # atomic. It is only thread-safe if the decorator on this method
        # serializes concurrent construction of *different* Job instances --
        # confirm.
        Job.__counter += 1
        self.__id = Job.__counter
201
204
206 try: return self.__id == other.__id
207 except AttributeError: return False
208
        '''Abstract method; to be implemented by subclasses.

        Called with this job's stored positional and keyword arguments when
        the job is processed; its return value becomes the job's result.
        '''
        raise NotImplementedError('Abstract method')

    @property
215 '''Return the computed result for this processed job.
216
217 If the callable had risen an exception, it is reraised here. The original
218 traceback is also available as C{exc.__traceback__}.
219
220 If this job has not been processed yet, it raises L{UnprocessedJobError}.
221 '''
222 if self.__exception is not None:
223 raise self.__exception
224 try: return self.__result
225 except AttributeError:
226 raise UnprocessedJobError()
227
229 '''Execute this job and store the result or raised exception.
230
231 To be called by L{Slave} instances.
232 '''
233 log.debug('Ready to process job %s' % self)
234 try:
235 self.__result = self(*self.args, **self.kwds)
236 except Exception, ex:
237 type, value, traceback = sys.exc_info()
238 out = StringIO()
239
240 print_exception(type, value, traceback.tb_next, file=out)
241
242 ex.traceback = out.getvalue()
243 self.__exception = ex
244 log.debug('Failed to process job %s' % self)
245 else:
246 self.__exception = None
247 log.debug('Job %s was processed successfully' % self)
248
249
    '''Raised when attempting to get the result of a L{Job} that has not been
    processed yet, i.e. before any run of the job has stored either a result
    or an exception.
    '''
254