"""Easy to use object-oriented thread pool framework.

A thread pool is an object that maintains a pool of worker threads to perform
time-consuming operations in parallel. It assigns jobs to the threads
by putting them in a work request queue, where they are picked up by the
next available thread. That thread then performs the requested operation in
the background and puts the results in another queue.

The thread pool object can then collect the results of all threads from
this queue as soon as they become available or after all threads have
finished their work. It's also possible to define callbacks to handle
each result as it comes in.

The basic concept and some code were taken from the book "Python in a Nutshell,
2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section
14.5 "Threaded Program Architecture". I wrapped the main program logic in the
ThreadPool class, added the WorkRequest class and the callback system and
tweaked the code here and there. Kudos also to Florent Aide for the exception
handling mechanism.

Basic usage::

    >>> pool = ThreadPool(poolsize)
    >>> requests = makeRequests(some_callable, list_of_args, callback)
    >>> [pool.putRequest(req) for req in requests]
    >>> pool.wait()

See the end of the module code for a brief, annotated usage example.

Website: http://chrisarndt.de/projects/threadpool/

"""
__docformat__ = "restructuredtext en"

# Names exported by ``from <this module> import *``.
__all__ = [
    'makeRequests',
    'NoResultsPending',
    'NoWorkersAvailable',
    'ThreadPool',
    'WorkRequest',
    'WorkerThread'
]

__author__ = "Christopher Arndt"
# Version-control keyword strings (expanded by the VCS on checkout).
__revision__ = "$Revision: 413 $"
__date__ = "$Date: 2009-10-06 15:55:39 +0200 (Tue, 06 Oct 2009) $"
# License text and version number are taken from the ``release`` module.
from release import license as __license__, version as __version__


import sys
import threading
import Queue  # Python 2 name of the standard-library queue module.
import traceback
56
57
58
class NoResultsPending(Exception):
    """All work requests have been processed."""


class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests."""
66
67
68
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    ``request`` is the failed ``WorkRequest`` (unused here) and ``exc_info``
    the ``(type, value, traceback)`` triple from ``sys.exc_info()``; the
    exception is simply printed via ``traceback.print_exception``.

    """
    exc_type, exc_value, exc_tb = exc_info
    traceback.print_exception(exc_type, exc_value, exc_tb)
76
77
78
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Build one ``WorkRequest`` for ``callable_`` per entry of ``args_list``.

    Convenience function for running the same callable several times with
    different arguments. Each entry of ``args_list`` describes one call:

    * a tuple is read as ``(positional_args, keyword_args)`` -- its first
      two items become the request's ``args`` and ``kwds``;
    * any other (non-tuple) value is passed as the single positional
      argument.

    ``callback`` and ``exc_callback`` are attached to every request; see the
    ``WorkRequest`` docstring for their meaning.

    Returns a list of requests in the same order as ``args_list``.

    """
    def _request_for(item):
        # Tuples carry explicit (args, kwds); anything else is one argument.
        if isinstance(item, tuple):
            positional, keywords = item[0], item[1]
        else:
            positional, keywords = [item], None
        return WorkRequest(callable_, positional, keywords, callback=callback,
                           exc_callback=exc_callback)

    return [_request_for(item) for item in args_list]
109
110
111
class WorkerThread(threading.Thread):
    """Background thread connected to the requests/results queues.

    A worker pulls work requests from one queue, runs them, and pushes
    ``(request, result)`` pairs onto another queue until it is dismissed.

    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Configure the thread as a daemon and start it immediately.

        ``requests_queue`` and ``results_queue`` are ``Queue.Queue``
        instances handed over by ``ThreadPool`` when it creates a worker.
        ``poll_timeout`` is how many seconds a ``get`` on the request queue
        may block before the dismissal flag is checked again. Remaining
        keyword arguments go to ``threading.Thread.__init__``.

        """
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(True)
        self._requests_queue = requests_queue
        self._results_queue = results_queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Keep processing the request queue until dismissed."""
        while not self._dismissed.isSet():
            # Bounded wait so the dismissal flag is re-checked regularly.
            try:
                job = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            if self._dismissed.isSet():
                # Dismissed while waiting: return the job for another
                # worker and stop.
                self._requests_queue.put(job)
                break
            try:
                outcome = job.callable(*job.args, **job.kwds)
                self._results_queue.put((job, outcome))
            except:
                # Catch everything so every failure is reported back as a
                # result instead of silently killing the thread.
                job.exception = True
                self._results_queue.put((job, sys.exc_info()))

    def dismiss(self):
        """Ask the thread to exit once its current job is finished."""
        self._dismissed.set()
164
165
class WorkRequest:
    """A request to execute a callable for putting in the request queue later.

    See the module function ``makeRequests`` for the common case
    where you want to build several ``WorkRequest`` objects for the same
    callable but with different arguments for each call.

    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
                 callback=None, exc_callback=_handle_thread_exception):
        """Create a work request for a callable and attach callbacks.

        A work request consists of a callable to be executed by a
        worker thread, a list of positional arguments and a dictionary
        of keyword arguments. ``args`` and ``kwds`` default to an empty
        list and an empty dictionary.

        A ``callback`` function can be specified, that is called when the
        results of the request are picked up from the result queue. It must
        accept two anonymous arguments, the ``WorkRequest`` object and the
        results of the callable, in that order. If you want to pass additional
        information to the callback, just stick it on the request object.

        You can also give a custom callback for when an exception occurs with
        the ``exc_callback`` keyword parameter. It should also accept two
        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
        details as returned by ``sys.exc_info()``. The default implementation
        of this callback just prints the exception info via
        ``traceback.print_exception``. If you want no exception handler
        callback, just pass in ``None``.

        ``requestID``, if given, must be hashable since it is used by the
        ``ThreadPool`` object as the key under which this work request is
        stored in a dictionary. It defaults to the return value of
        ``id(self)``.

        :raises TypeError: if ``requestID`` is given but is not hashable.

        """
        if requestID is None:
            self.requestID = id(self)
        else:
            # Only verify hashability; keep the ID itself. Storing its hash
            # instead would make distinct IDs with equal hashes (e.g. -1 and
            # -2 in CPython) share one key in the pool's request dictionary.
            try:
                hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
            self.requestID = requestID
        # Set to True by the worker thread when the callable raised.
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        """Return a short description of the request for debugging."""
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
            (self.requestID, self.args, self.kwds, self.exception)
219
class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.

    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and the
        worker threads will block when the queue is full and they try to put
        new results in it.

        ``poll_timeout`` is passed to ``createWorkers`` for the initial
        worker threads.

        .. warning::
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not pulled
            regularly and too many jobs are put in the work requests queue.
            To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

        """
        self._requests_queue = Queue.Queue(q_size)
        self._results_queue = Queue.Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        # Outstanding requests, keyed by ``requestID``.
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting
        for requests.

        """
        for _ in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task.

        At most ``len(self.workers)`` threads are dismissed. If ``do_join``
        is true, wait for each dismissed thread to finish right away;
        otherwise keep them in ``dismissedWorkers`` so that
        ``joinAllDismissedWorkers`` can join them later.

        """
        dismiss_list = []
        for _ in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)

        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later.

        ``block`` and ``timeout`` are passed on to ``Queue.put``; with a
        bounded request queue this can raise ``Queue.Full``, in which case
        the request is not registered.

        """
        assert isinstance(request, WorkRequest)
        # Reject requests whose ``exception`` flag is already set, i.e. that
        # have been run and failed before.
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process any new results in the queue.

        Each collected result is passed to the request's ``exc_callback`` if
        the request raised and has one, otherwise to its ``callback`` (if
        set). The request is then removed from ``workRequests`` -- even when
        the callback itself raises, in which case that exception propagates.

        :param block: if true, wait for results instead of returning as soon
            as the results queue is empty.
        :raises NoResultsPending: if no work requests are outstanding.
        :raises NoWorkersAvailable: if ``block`` is true but the pool has no
            (undismissed) worker threads.

        """
        while True:
            # Nothing outstanding: nothing left to wait for.
            if not self.workRequests:
                raise NoResultsPending
            # Without workers a blocking get() could wait indefinitely.
            elif block and not self.workers:
                raise NoWorkersAvailable
            # Keep the try limited to get(): a callback that happens to raise
            # Queue.Empty must not be mistaken for "no more results".
            try:
                request, result = self._results_queue.get(block=block)
            except Queue.Empty:
                break
            try:
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                elif request.callback:
                    request.callback(request, result)
            finally:
                # The result has been consumed, so retire the request even if
                # a callback failed; otherwise wait() would block forever on
                # a result that will never arrive again.
                self.workRequests.pop(request.requestID, None)

    def wait(self):
        """Wait for results, blocking until all have arrived.

        :raises NoWorkersAvailable: if requests are still pending but the
            pool has no worker threads left.

        """
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break
327
328
329
330
331
332
if __name__ == '__main__':
    import random
    import time

    def do_something(number):
        """Demo job: sleep 1-5 seconds, then return a random float.

        Raises RuntimeError whenever the result exceeds 5, to exercise the
        exception callback.
        """
        time.sleep(random.randint(1, 5))
        outcome = round(random.random() * number, 5)
        if outcome > 5:
            raise RuntimeError("Something extraordinary happened!")
        return outcome

    def print_result(request, result):
        """Result callback: report the value returned by a request."""
        print("**** Result from request #%s: %r" % (request.requestID, result))

    def handle_exception(request, exc_info):
        """Exception callback: report the exception info of a failed request.

        A non-tuple ``exc_info`` means something is badly wrong, so the
        request and the value are dumped and the demo exits.
        """
        if isinstance(exc_info, tuple):
            print("**** Exception occured in request #%s: %s" %
                  (request.requestID, exc_info))
            return
        print(request)
        print(exc_info)
        raise SystemExit

    # Twenty requests with a single plain argument each, then twenty more
    # whose arguments are given as an (args, kwds) tuple.
    plain_args = [random.randint(1, 10) for _ in range(20)]
    tuple_args = [((random.randint(1, 10),), {}) for _ in range(20)]
    requests = makeRequests(do_something, plain_args, print_result,
                            handle_exception)
    requests.extend(makeRequests(do_something, tuple_args, print_result,
                                 handle_exception))

    print("Creating thread pool with 3 worker threads.")
    pool = ThreadPool(3)

    for req in requests:
        pool.putRequest(req)
        print("Work request #%s added." % req.requestID)

    # Poll every half second; grow the pool after 10 rounds and shrink it
    # after 20, until all results are in or the user interrupts.
    rounds = 0
    while True:
        try:
            time.sleep(0.5)
            pool.poll()
            # activeCount() includes the main thread, hence the "- 1".
            print("Main thread working... (active worker threads: %i)"
                  % (threading.activeCount() - 1,))
            if rounds == 10:
                print("**** Adding 3 more worker threads...")
                pool.createWorkers(3)
            if rounds == 20:
                print("**** Dismissing 2 worker threads...")
                pool.dismissWorkers(2)
            rounds += 1
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except NoResultsPending:
            print("**** No pending results.")
            break
    if pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        pool.joinAllDismissedWorkers()
418