Module threadpool
[frames] | no frames]

Source Code for Module threadpool

  1  # -*- coding: UTF-8 -*- 
  2  """Easy to use object-oriented thread pool framework. 
  3   
  4  A thread pool is an object that maintains a pool of worker threads to perform 
  5  time consuming operations in parallel. It assigns jobs to the threads 
  6  by putting them in a work request queue, where they are picked up by the 
  7  next available thread. This then performs the requested operation in the 
  8  background and puts the results in another queue. 
  9   
 10  The thread pool object can then collect the results from all threads from 
 11  this queue as soon as they become available or after all threads have 
 12  finished their work. It's also possible to define callbacks to handle 
 13  each result as it comes in. 
 14   
 15  The basic concept and some code was taken from the book "Python in a Nutshell, 
 16  2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section 
 17  14.5 "Threaded Program Architecture". I wrapped the main program logic in the 
 18  ThreadPool class, added the WorkRequest class and the callback system and 
 19  tweaked the code here and there. Kudos also to Florent Aide for the exception 
 20  handling mechanism. 
 21   
 22  Basic usage:: 
 23   
 24      >>> pool = ThreadPool(poolsize) 
 25      >>> requests = makeRequests(some_callable, list_of_args, callback) 
 26      >>> [pool.putRequest(req) for req in requests] 
 27      >>> pool.wait() 
 28   
 29  See the end of the module code for a brief, annotated usage example. 
 30   
 31  Website : http://chrisarndt.de/projects/threadpool/ 
 32   
 33  """ 
 34  __docformat__ = "restructuredtext en" 
 35   
 36  __all__ = [ 
 37      'makeRequests', 
 38      'NoResultsPending', 
 39      'NoWorkersAvailable', 
 40      'ThreadPool', 
 41      'WorkRequest', 
 42      'WorkerThread' 
 43  ] 
 44   
 45  __author__ = "Christopher Arndt" 
 46  __revision__ = "$Revision: 413 $" 
 47  __date__ = "$Date: 2009-10-06 15:55:39 +0200 (Tue, 06 Oct 2009) $" 
 48  from release import license as __license__, version as __version__ 
 49   
 50   
 51  # standard library modules 
 52  import sys 
 53  import threading 
 54  import Queue 
 55  import traceback 
 56   
 57   
 58  # exceptions 
class NoResultsPending(Exception):
    """All work requests have been processed.

    Raised by ``ThreadPool.poll`` when the pool has no registered work
    requests left to collect results for.

    """
62
class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests.

    Raised by ``ThreadPool.poll`` when it is called in blocking mode while
    requests are still pending but the pool has no worker threads.

    """
# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    This just prints the exception info via ``traceback.print_exception``.

    ``request`` is the work request whose callable raised the exception; it
    is not used by this default handler.

    ``exc_info`` is the ``(type, value, traceback)`` tuple that is unpacked
    into the arguments of ``traceback.print_exception``.

    """
    traceback.print_exception(*exc_info)
# utility functions
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Create several work requests for same callable with different arguments.

    Convenience function for creating several work requests for the same
    callable where each invocation of the callable receives different values
    for its arguments.

    ``args_list`` contains the parameters for each invocation of callable.
    Each item in ``args_list`` should be either a 2-item tuple of the list of
    positional arguments and a dictionary of keyword arguments or a single,
    non-tuple argument.

    See docstring for ``WorkRequest`` for info on ``callback`` and
    ``exc_callback``.

    Returns a list with one ``WorkRequest`` per item of ``args_list``, in the
    same order.

    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            # (positional arguments, keyword arguments) form
            args, kwds = item[0], item[1]
        else:
            # a single, non-tuple argument becomes the only positional one
            args, kwds = [item], None
        requests.append(
            WorkRequest(callable_, args, kwds, callback=callback,
                exc_callback=exc_callback)
        )
    return requests
# classes
class WorkerThread(threading.Thread):
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.

    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
        new worker thread.

        ``poll_timeout`` is the timeout in seconds used when waiting for a
        new request; after each timeout the thread checks whether it has been
        dismissed.

        Any additional keyword arguments are passed on unchanged to
        ``threading.Thread.__init__``.

        """
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(True)
        self._requests_queue = requests_queue
        self._results_queue = results_queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        # the thread starts working as soon as it has been constructed
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.isSet():
                # we are dismissed, break out of loop
                break
            # get next work request. If we don't get a new request from the
            # queue after self._poll_timeout seconds, we jump to the start of
            # the while loop again, to give the thread a chance to exit.
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            else:
                if self._dismissed.isSet():
                    # we are dismissed, put back request in queue and exit
                    # loop
                    self._requests_queue.put(request)
                    break
                try:
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    # Deliberately catch everything: whatever the callable
                    # raises is reported through the results queue, so that
                    # every request that was picked up produces an entry
                    # there.
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job.
        """
        self._dismissed.set()
164 165
class WorkRequest:
    """A request to execute a callable for putting in the request queue later.

    See the module function ``makeRequests`` for the common case
    where you want to build several ``WorkRequest`` objects for the same
    callable but with different arguments for each call.

    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Create a work request for a callable and attach callbacks.

        A work request consists of a callable to be executed by a
        worker thread, a list of positional arguments and a dictionary
        of keyword arguments.

        A ``callback`` function can be specified, that is called when the
        results of the request are picked up from the result queue. It must
        accept two anonymous arguments, the ``WorkRequest`` object and the
        results of the callable, in that order. If you want to pass additional
        information to the callback, just stick it on the request object.

        You can also give a custom callback for when an exception occurs with
        the ``exc_callback`` keyword parameter. It should also accept two
        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
        details as returned by ``sys.exc_info()``. The default implementation
        of this callback just prints the exception info via
        ``traceback.print_exception``. If you want no exception handler
        callback, just pass in ``None``.

        ``requestID``, if given, must be hashable since it is used by
        ``ThreadPool`` object to store the results of that work request in a
        dictionary. It defaults to the return value of ``id(self)``. Note
        that for a given ``requestID`` the ``requestID`` attribute holds
        ``hash(requestID)``, not the object that was passed in.

        Raises ``TypeError`` if ``requestID`` is not hashable.

        """
        if requestID is None:
            self.requestID = id(self)
        else:
            try:
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        # set to True by the worker thread if the callable raises
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        # falsy values (including None) are replaced by fresh empty containers
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
            (self.requestID, self.args, self.kwds, self.exception)
219
class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.

    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and the
        worker threads will block when the queue is full and they try to put
        new results in it.

        ``poll_timeout`` is passed on to ``createWorkers`` for the initial
        worker threads.

        .. warning::
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not
            pulled regularly and too many jobs are put in the work requests
            queue. To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

        """
        self._requests_queue = Queue.Queue(q_size)
        self._results_queue = Queue.Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        # maps requestID -> request for every request still awaiting a result
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting
        for requests.

        """
        for i in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task.

        At most ``len(self.workers)`` threads are dismissed. If ``do_join``
        is true, the dismissed threads are joined right away; otherwise they
        are remembered in ``self.dismissedWorkers`` so that they can be
        joined later with ``joinAllDismissedWorkers``.

        """
        dismiss_list = []
        for i in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)

        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later.

        ``block`` and ``timeout`` are passed on to the ``put`` method of the
        requests queue. The request is only registered in
        ``self.workRequests`` after it has been put in the queue.

        """
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process any new results in the queue.

        Results are taken from the results queue one by one and handed to
        the callbacks of their request. If ``block`` is false, the method
        returns as soon as the results queue is empty. If ``block`` is true,
        it waits for each next result instead.

        Raises ``NoResultsPending`` when no work requests are registered
        (any more) and ``NoWorkersAvailable`` when ``block`` is true, requests
        are still pending and the pool has no worker threads.

        Exceptions raised by a callback propagate to the caller; the request
        is removed from ``self.workRequests`` nevertheless, because its
        result has already been consumed from the results queue.

        """
        while True:
            # still results pending?
            if not self.workRequests:
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
            except Queue.Empty:
                # only an empty results queue ends polling; the callbacks
                # below are intentionally outside of this try block
                break
            try:
                # has an exception occurred?
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # hand results to callback, if any
                if request.callback and not \
                       (request.exception and request.exc_callback):
                    request.callback(request, result)
            finally:
                # the result is consumed, so the request must not stay
                # registered even if a callback raised
                del self.workRequests[request.requestID]

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while 1:
            try:
                self.poll(True)
            except NoResultsPending:
                break
################
# USAGE EXAMPLE
################

if __name__ == '__main__':
    import random
    import time

    # NOTE: all output below uses the single-argument ``print(...)`` form,
    # which prints the same text whether ``print`` is a statement or a
    # function.

    # the work the threads will have to do (rather trivial in our example)
    def do_something(data):
        time.sleep(random.randint(1,5))
        result = round(random.random() * data, 5)
        # just to show off, we throw an exception once in a while
        if result > 5:
            raise RuntimeError("Something extraordinary happened!")
        return result

    # this will be called each time a result is available
    def print_result(request, result):
        print("**** Result from request #%s: %r" % (request.requestID, result))

    # this will be called when an exception occurs within a thread
    # this example exception handler does little more than the default handler
    def handle_exception(request, exc_info):
        if not isinstance(exc_info, tuple):
            # Something is seriously wrong...
            print(request)
            print(exc_info)
            raise SystemExit
        print("**** Exception occured in request #%s: %s" % \
          (request.requestID, exc_info))

    # assemble the arguments for each job to a list...
    data = [random.randint(1,10) for i in range(20)]
    # ... and build a WorkRequest object for each item in data
    requests = makeRequests(do_something, data, print_result, handle_exception)
    # to use the default exception handler, uncomment next line and comment out
    # the preceding one.
    #requests = makeRequests(do_something, data, print_result)

    # or the other form of args_lists accepted by makeRequests: ((,), {})
    data = [((random.randint(1,10),), {}) for i in range(20)]
    requests.extend(
        makeRequests(do_something, data, print_result, handle_exception)
        # to use the default exception handler, uncomment next line and comment
        # out the preceding one.
        #makeRequests(do_something, data, print_result)
    )

    # we create a pool of 3 worker threads
    print("Creating thread pool with 3 worker threads.")
    main = ThreadPool(3)

    # then we put the work requests in the queue...
    for req in requests:
        main.putRequest(req)
        print("Work request #%s added." % req.requestID)
    # or shorter:
    # [main.putRequest(req) for req in requests]

    # ...and wait for the results to arrive in the result queue
    # by using ThreadPool.wait(). This would block until results for
    # all work requests have arrived:
    # main.wait()

    # instead we can poll for results while doing something else:
    i = 0
    while True:
        try:
            time.sleep(0.5)
            main.poll()
            print("Main thread working... (active worker threads: %i)"
                % (threading.activeCount()-1, ))
            if i == 10:
                print("**** Adding 3 more worker threads...")
                main.createWorkers(3)
            if i == 20:
                print("**** Dismissing 2 worker threads...")
                main.dismissWorkers(2)
            i += 1
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except NoResultsPending:
            print("**** No pending results.")
            break
    if main.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main.joinAllDismissedWorkers()