Package ClusterShell :: Package Engine :: Module Engine
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Engine.Engine

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009, 2010) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Engine.py 236 2010-02-25 22:12:41Z st-cea $ 
 34   
 35  """ 
 36  Interface of underlying Task's Engine. 
 37   
 38  An Engine implements a loop your thread enters and uses to call event handlers 
 39  in response to incoming events (from workers, timers, etc.). 
 40  """ 
 41   
 42  import errno 
 43  import heapq 
 44  import time 
 45   
 46   
47 -class EngineException(Exception):
48 """ 49 Base engine exception. 50 """
51
52 -class EngineAbortException(EngineException):
53 """ 54 Raised on user abort. 55 """
56 - def __init__(self, kill):
57 self.kill = kill
58
59 -class EngineTimeoutException(EngineException):
60 """ 61 Raised when a timeout is encountered. 62 """
63
64 -class EngineIllegalOperationError(EngineException):
65 """ 66 Error raised when an illegal operation has been performed. 67 """
68
69 -class EngineAlreadyRunningError(EngineIllegalOperationError):
70 """ 71 Error raised when the engine is already running. 72 """
73
74 -class EngineNotSupportedError(EngineException):
75 """ 76 Error raised when the engine mechanism is not supported. 77 """
78 79
80 -class EngineBaseTimer:
81 """ 82 Abstract class for ClusterShell's engine timer. Such a timer 83 requires a relative fire time (delay) in seconds (as float), and 84 supports an optional repeating interval in seconds (as float too). 85 86 See EngineTimer for more information about ClusterShell timers. 87 """ 88
89 - def __init__(self, fire_delay, interval=-1.0, autoclose=False):
90 """ 91 Create a base timer. 92 """ 93 self.fire_delay = fire_delay 94 self.interval = interval 95 self.autoclose = autoclose 96 self._engine = None 97 self._timercase = None
98
99 - def _set_engine(self, engine):
100 """ 101 Bind to engine, called by Engine. 102 """ 103 if self._engine: 104 # A timer can be registered to only one engine at a time. 105 raise EngineIllegalOperationError("Already bound to engine.") 106 107 self._engine = engine
108
109 - def invalidate(self):
110 """ 111 Invalidates a timer object, stopping it from ever firing again. 112 """ 113 if self._engine: 114 self._engine.timerq.invalidate(self) 115 self._engine = None
116
117 - def is_valid(self):
118 """ 119 Returns a boolean value that indicates whether an EngineTimer 120 object is valid and able to fire. 121 """ 122 return self._engine != None
123
124 - def set_nextfire(self, fire_delay, interval=-1):
125 """ 126 Set the next firing delay in seconds for an EngineTimer object. 127 128 The optional paramater `interval' sets the firing interval 129 of the timer. If not specified, the timer fires once and then 130 is automatically invalidated. 131 132 Time values are expressed in second using floating point 133 values. Precision is implementation (and system) dependent. 134 135 It is safe to call this method from the task owning this 136 timer object, in any event handlers, anywhere. 137 138 However, resetting a timer's next firing time may be a 139 relatively expensive operation. It is more efficient to let 140 timers autorepeat or to use this method from the timer's own 141 event handler callback (ie. from its ev_timer). 142 """ 143 if not self.is_valid(): 144 raise EngineIllegalOperationError("Operation on invalid timer.") 145 146 self.fire_delay = fire_delay 147 self.interval = interval 148 self._engine.timerq.reschedule(self)
149
150 - def _fire(self):
151 raise NotImplementedError("Derived classes must implement.")
152 153
154 -class EngineTimer(EngineBaseTimer):
155 """ 156 Concrete class EngineTimer 157 158 An EngineTimer object represents a timer bound to an engine that 159 fires at a preset time in the future. Timers can fire either only 160 once or repeatedly at fixed time intervals. Repeating timers can 161 also have their next firing time manually adjusted. 162 163 A timer is not a real-time mechanism; it fires when the task's 164 underlying engine to which the timer has been added is running and 165 able to check if the timer's firing time has passed. 166 """ 167
168 - def __init__(self, fire_delay, interval, autoclose, handler):
169 EngineBaseTimer.__init__(self, fire_delay, interval, autoclose) 170 self.eh = handler 171 assert self.eh != None, "An event handler is needed for timer."
172
173 - def _fire(self):
174 self.eh._invoke("ev_timer", self)
175
176 -class _EngineTimerQ:
177
178 - class _EngineTimerCase:
179 """ 180 Helper class that allows comparisons of fire times, to be easily used 181 in an heapq. 182 """
183 - def __init__(self, client):
184 self.client = client 185 self.client._timercase = self 186 # arm timer (first time) 187 assert self.client.fire_delay > 0 188 self.fire_date = self.client.fire_delay + time.time()
189
190 - def __cmp__(self, other):
191 return cmp(self.fire_date, other.fire_date)
192
193 - def arm(self, client):
194 assert client != None 195 self.client = client 196 self.client._timercase = self 197 # setup next firing date 198 time_current = time.time() 199 if self.client.fire_delay > 0: 200 self.fire_date = self.client.fire_delay + time_current 201 else: 202 interval = float(self.client.interval) 203 assert interval > 0 204 self.fire_date += interval 205 # If the firing time is delayed so far that it passes one 206 # or more of the scheduled firing times, reschedule the 207 # timer for the next scheduled firing time in the future. 208 while self.fire_date < time_current: 209 self.fire_date += interval
210
211 - def disarm(self):
212 client = self.client 213 client._timercase = None 214 self.client = None 215 return client
216
217 - def armed(self):
218 return self.client != None
219 220
221 - def __init__(self, engine):
222 """ 223 Initializer. 224 """ 225 self._engine = engine 226 self.timers = [] 227 self.armed_count = 0
228
229 - def __len__(self):
230 """ 231 Return the number of active timers. 232 """ 233 return self.armed_count
234
235 - def schedule(self, client):
236 """ 237 Insert and arm a client's timer. 238 """ 239 # arm only if fire is set 240 if client.fire_delay > 0: 241 heapq.heappush(self.timers, _EngineTimerQ._EngineTimerCase(client)) 242 self.armed_count += 1 243 if not client.autoclose: 244 self._engine.evlooprefcnt += 1
245
246 - def reschedule(self, client):
247 """ 248 Re-insert client's timer. 249 """ 250 if client._timercase: 251 self.invalidate(client) 252 self._dequeue_disarmed() 253 self.schedule(client)
254
255 - def invalidate(self, client):
256 """ 257 Invalidate client's timer. Current implementation doesn't really remove 258 the timer, but simply flags it as disarmed. 259 """ 260 if not client._timercase: 261 # if timer is being fire, invalidate its values 262 client.fire_delay = 0 263 client.interval = 0 264 return 265 266 if self.armed_count <= 0: 267 raise ValueError, "Engine client timer not found in timer queue" 268 269 client._timercase.disarm() 270 self.armed_count -= 1 271 if not client.autoclose: 272 self._engine.evlooprefcnt -= 1
273
274 - def _dequeue_disarmed(self):
275 """ 276 Dequeue disarmed timers (sort of garbage collection). 277 """ 278 while len(self.timers) > 0 and not self.timers[0].armed(): 279 heapq.heappop(self.timers)
280
281 - def fire(self):
282 """ 283 Remove the smallest timer from the queue and fire its associated client. 284 Raise IndexError if the queue is empty. 285 """ 286 self._dequeue_disarmed() 287 288 timercase = heapq.heappop(self.timers) 289 client = timercase.disarm() 290 291 client.fire_delay = 0 292 client._fire() 293 294 if client.fire_delay > 0 or client.interval > 0: 295 timercase.arm(client) 296 heapq.heappush(self.timers, timercase) 297 else: 298 self.armed_count -= 1 299 if not client.autoclose: 300 self._engine.evlooprefcnt -= 1
301
302 - def nextfire_delay(self):
303 """ 304 Return next timer fire delay (relative time). 305 """ 306 self._dequeue_disarmed() 307 if len(self.timers) > 0: 308 return max(0., self.timers[0].fire_date - time.time()) 309 310 return -1
311
312 - def expired(self):
313 """ 314 Has a timer expired? 315 """ 316 self._dequeue_disarmed() 317 return len(self.timers) > 0 and \ 318 (self.timers[0].fire_date - time.time()) <= 1e-2
319
320 - def clear(self):
321 """ 322 Stop and clear all timers. 323 """ 324 for timer in self.timers: 325 if timer.armed(): 326 timer.client.invalidate() 327 328 self.timers = [] 329 self.armed_count = 0
330 331
332 -class Engine:
333 """ 334 Interface for ClusterShell engine. Subclasses have to implement a runloop 335 listening for client events. 336 """ 337 338 # Engine client I/O event interest bits 339 E_READ = 0x1 340 E_ERROR = 0x2 341 E_WRITE = 0x4 342 E_ANY = E_READ | E_ERROR | E_WRITE 343 344 identifier = "(none)" 345
346 - def __init__(self, info):
347 """ 348 Initialize base class. 349 """ 350 # take a reference on info dict 351 self.info = info 352 353 # and update engine id 354 self.info['engine'] = self.identifier 355 356 # keep track of all clients 357 self._clients = set() 358 self._ports = set() 359 360 # keep track of the number of registered clients 361 self.reg_clients = 0 362 363 # keep track of registered file descriptors in a dict where keys 364 # are fileno and values are clients 365 self.reg_clifds = {} 366 367 # A boolean that indicates when reg_clifds has changed, or when 368 # some client interest event mask has changed. It is set by the 369 # base class, and reset by each engine implementation. 370 # Engines often deal with I/O events in chunk, and some event 371 # may lead to change to some other "client interest event mask" 372 # or could even register or close other clients. When such 373 # changes are made, this boolean is set to True, allowing the 374 # engine implementation to reconsider their events got by chunk. 375 self.reg_clifds_changed = False 376 377 # timer queue to handle both timers and clients timeout 378 self.timerq = _EngineTimerQ(self) 379 380 # reference count to the event loop (must include registered 381 # clients and timers configured WITHOUT autoclose) 382 self.evlooprefcnt = 0 383 384 # running state 385 self.running = False 386 # runloop-has-exited flag 387 self._exited = False
388
389 - def clients(self):
390 """ 391 Get a copy of clients set. 392 """ 393 return self._clients.copy()
394
395 - def ports(self):
396 """ 397 Get a copy of ports set. 398 """ 399 return self._ports.copy()
400
401 - def _fd2client(self, fd):
402 fdev = None 403 client = self.reg_clifds.get(fd) 404 if client: 405 try: 406 if fd == client.reader_fileno(): 407 fdev = Engine.E_READ 408 elif fd == client.error_fileno(): 409 fdev = Engine.E_ERROR 410 elif fd == client.writer_fileno(): 411 fdev = Engine.E_WRITE 412 except: 413 return (client, Engine.E_ERROR) 414 return (client, fdev)
415
416 - def add(self, client):
417 """ 418 Add a client to engine. Subclasses that override this method 419 should call base class method. 420 """ 421 # bind to engine 422 client._set_engine(self) 423 424 if client.delayable: 425 # add to regular client set 426 self._clients.add(client) 427 else: 428 # add to port set (non-delayable) 429 self._ports.add(client) 430 431 if self.running: 432 # in-fly add if running 433 if not client.delayable: 434 self.register(client) 435 elif self.info["fanout"] > self.reg_clients: 436 self.register(client._start())
437
438 - def remove(self, client, did_timeout=False):
439 """ 440 Remove a client from engine. Subclasses that override this 441 method should call base class method. 442 """ 443 self._debug("REMOVE %s" % client) 444 if client.delayable: 445 self._clients.remove(client) 446 else: 447 self._ports.remove(client) 448 449 if client.registered: 450 self.unregister(client) 451 client._close(force=False, timeout=did_timeout) 452 self.start_all()
453
454 - def clear(self, did_timeout=False, clear_ports=False):
455 """ 456 Remove all clients. Subclasses that override this method should 457 call base class method. 458 """ 459 all_clients = [self._clients] 460 if clear_ports: 461 all_clients.append(self._ports) 462 463 for clients in all_clients: 464 while len(clients) > 0: 465 client = clients.pop() 466 if client.registered: 467 self.unregister(client) 468 client._close(force=True, timeout=did_timeout)
469
470 - def register(self, client):
471 """ 472 Register an engine client. Subclasses that override this method 473 should call base class method. 474 """ 475 assert client in self._clients or client in self._ports 476 assert not client.registered 477 478 efd = client.error_fileno() 479 rfd = client.reader_fileno() 480 wfd = client.writer_fileno() 481 assert rfd != None or wfd != None 482 483 self._debug("REG %s(e%s,r%s,w%s)(autoclose=%s)" % \ 484 (client.__class__.__name__, efd, rfd, wfd, 485 client.autoclose)) 486 487 client._events = 0 488 client.registered = True 489 490 if client.delayable: 491 self.reg_clients += 1 492 493 if client.autoclose: 494 refcnt_inc = 0 495 else: 496 refcnt_inc = 1 497 498 if efd != None: 499 self.reg_clifds[efd] = client 500 self.reg_clifds_changed = True 501 client._events |= Engine.E_ERROR 502 self.evlooprefcnt += refcnt_inc 503 self._register_specific(efd, Engine.E_ERROR) 504 if rfd != None: 505 self.reg_clifds[rfd] = client 506 self.reg_clifds_changed = True 507 client._events |= Engine.E_READ 508 self.evlooprefcnt += refcnt_inc 509 self._register_specific(rfd, Engine.E_READ) 510 if wfd != None: 511 self.reg_clifds[wfd] = client 512 self.reg_clifds_changed = True 513 client._events |= Engine.E_WRITE 514 self.evlooprefcnt += refcnt_inc 515 self._register_specific(wfd, Engine.E_WRITE) 516 517 client._new_events = client._events 518 519 # start timeout timer 520 self.timerq.schedule(client)
521
522 - def unregister_writer(self, client):
523 self._debug("UNREG WRITER r%s,w%s" % (client.reader_fileno(), \ 524 client.writer_fileno())) 525 if client.autoclose: 526 refcnt_inc = 0 527 else: 528 refcnt_inc = 1 529 530 wfd = client.writer_fileno() 531 if wfd != None: 532 self._unregister_specific(wfd, client._events & Engine.E_WRITE) 533 client._events &= ~Engine.E_WRITE 534 del self.reg_clifds[wfd] 535 self.reg_clifds_changed = True 536 self.evlooprefcnt -= refcnt_inc
537
538 - def unregister(self, client):
539 """ 540 Unregister a client. Subclasses that override this method should 541 call base class method. 542 """ 543 # sanity check 544 assert client.registered 545 self._debug("UNREG %s (r%s,e%s,w%s)" % (client.__class__.__name__, 546 client.reader_fileno(), client.error_fileno(), 547 client.writer_fileno())) 548 549 # remove timeout timer 550 self.timerq.invalidate(client) 551 552 if client.autoclose: 553 refcnt_inc = 0 554 else: 555 refcnt_inc = 1 556 557 # clear interest events 558 efd = client.error_fileno() 559 if efd != None: 560 self._unregister_specific(efd, client._events & Engine.E_ERROR) 561 client._events &= ~Engine.E_ERROR 562 del self.reg_clifds[efd] 563 self.reg_clifds_changed = True 564 self.evlooprefcnt -= refcnt_inc 565 566 rfd = client.reader_fileno() 567 if rfd != None: 568 self._unregister_specific(rfd, client._events & Engine.E_READ) 569 client._events &= ~Engine.E_READ 570 del self.reg_clifds[rfd] 571 self.reg_clifds_changed = True 572 self.evlooprefcnt -= refcnt_inc 573 574 wfd = client.writer_fileno() 575 if wfd != None: 576 self._unregister_specific(wfd, client._events & Engine.E_WRITE) 577 client._events &= ~Engine.E_WRITE 578 del self.reg_clifds[wfd] 579 self.reg_clifds_changed = True 580 self.evlooprefcnt -= refcnt_inc 581 582 client._new_events = 0 583 client.registered = False 584 self.reg_clients -= 1
585
586 - def modify(self, client, setmask, clearmask):
587 """ 588 Modify the next loop interest events bitset for a client. 589 """ 590 self._debug("MODEV set:0x%x clear:0x%x %s" % (setmask, clearmask, 591 client)) 592 client._new_events &= ~clearmask 593 client._new_events |= setmask 594 595 if not client._processing: 596 # modifying a non processing client? 597 self.reg_clifds_changed = True 598 # apply new_events now 599 self.set_events(client, client._new_events)
600
601 - def _register_specific(self, fd, event):
602 """Engine-specific register fd for event method.""" 603 raise NotImplementedError("Derived classes must implement.")
604
605 - def _unregister_specific(self, fd, ev_is_set):
606 """Engine-specific unregister fd method.""" 607 raise NotImplementedError("Derived classes must implement.")
608
609 - def _modify_specific(self, fd, event, setvalue):
610 """Engine-specific modify fd for event method.""" 611 raise NotImplementedError("Derived classes must implement.")
612
613 - def set_events(self, client, new_events):
614 """ 615 Set the active interest events bitset for a client. 616 """ 617 assert not client._processing 618 619 self._debug("SETEV new_events:0x%x events:0x%x %s" % (new_events, 620 client._events, client)) 621 622 chgbits = new_events ^ client._events 623 if chgbits == 0: 624 return 625 626 # configure interest events as appropriate 627 efd = client.error_fileno() 628 if efd != None: 629 if chgbits & Engine.E_ERROR: 630 status = new_events & Engine.E_ERROR 631 self._modify_specific(efd, Engine.E_ERROR, status) 632 if status: 633 client._events |= Engine.E_ERROR 634 else: 635 client._events &= ~Engine.E_ERROR 636 637 rfd = client.reader_fileno() 638 if rfd != None: 639 if chgbits & Engine.E_READ: 640 status = new_events & Engine.E_READ 641 self._modify_specific(rfd, Engine.E_READ, status) 642 if status: 643 client._events |= Engine.E_READ 644 else: 645 client._events &= ~Engine.E_READ 646 647 wfd = client.writer_fileno() 648 if wfd != None: 649 if chgbits & Engine.E_WRITE: 650 status = new_events & Engine.E_WRITE 651 self._modify_specific(wfd, Engine.E_WRITE, status) 652 if status: 653 client._events |= Engine.E_WRITE 654 else: 655 client._events &= ~Engine.E_WRITE 656 657 client._new_events = client._events
658
659 - def set_reading(self, client):
660 """ 661 Set client reading state. 662 """ 663 # listen for readable events 664 self.modify(client, Engine.E_READ, 0)
665
666 - def set_reading_error(self, client):
667 """ 668 Set client reading error state. 669 """ 670 # listen for readable events 671 self.modify(client, Engine.E_ERROR, 0)
672
673 - def set_writing(self, client):
674 """ 675 Set client writing state. 676 """ 677 # listen for writable events 678 self.modify(client, Engine.E_WRITE, 0)
679
680 - def add_timer(self, timer):
681 """ 682 Add engine timer. 683 """ 684 timer._set_engine(self) 685 self.timerq.schedule(timer)
686
687 - def remove_timer(self, timer):
688 """ 689 Remove engine timer. 690 """ 691 self.timerq.invalidate(timer)
692
693 - def fire_timers(self):
694 """ 695 Fire expired timers for processing. 696 """ 697 while self.timerq.expired(): 698 self.timerq.fire()
699
700 - def start_ports(self):
701 """ 702 Start and register all port clients. 703 """ 704 # Ports are special, non-delayable engine clients 705 for port in self._ports: 706 if not port.registered: 707 self._debug("START PORT %s" % port) 708 self.register(port)
709
710 - def start_all(self):
711 """ 712 Start and register all other possible clients, in respect of task fanout. 713 """ 714 # Get current fanout value 715 fanout = self.info["fanout"] 716 assert fanout > 0 717 if fanout <= self.reg_clients: 718 return 719 720 # Register regular engine clients within the fanout limit 721 for client in self._clients: 722 if not client.registered: 723 self._debug("START CLIENT %s" % client.__class__.__name__) 724 self.register(client._start()) 725 if fanout <= self.reg_clients: 726 break
727
728 - def run(self, timeout):
729 """ 730 Run engine in calling thread. 731 """ 732 # change to running state 733 if self.running: 734 raise EngineAlreadyRunningError() 735 self.running = True 736 737 # start port clients 738 self.start_ports() 739 740 # peek in ports for early pending messages 741 self.snoop_ports() 742 743 # start all other clients 744 self.start_all() 745 746 # note: try-except-finally not supported before python 2.5 747 try: 748 try: 749 self.runloop(timeout) 750 except Exception, e: 751 # any exceptions invalidate clients 752 self.clear(isinstance(e, EngineTimeoutException)) 753 raise 754 finally: 755 # cleanup 756 self.timerq.clear() 757 self.running = False
758
759 - def snoop_ports(self):
760 """ 761 Peek in ports for possible early pending messages. 762 This method simply tries to read port pipes in non- 763 blocking mode. 764 """ 765 # make a copy so that early messages on installed ports may 766 # lead to new ports 767 ports = self._ports.copy() 768 for port in ports: 769 try: 770 port._handle_read() 771 except (IOError, OSError), (err, strerr): 772 if err == errno.EAGAIN or err == errno.EWOULDBLOCK: 773 # no pending message 774 return 775 # raise any other error 776 raise
777
778 - def runloop(self, timeout):
779 """ 780 Engine specific run loop. Derived classes must implement. 781 """ 782 raise NotImplementedError("Derived classes must implement.")
783
784 - def abort(self, kill):
785 """ 786 Abort runloop. 787 """ 788 if self.running: 789 raise EngineAbortException(kill) 790 791 self.clear()
792
793 - def exited(self):
794 """ 795 Returns True if the engine has exited the runloop once. 796 """ 797 return not self.running and self._exited
798
799 - def _debug(self, s):
800 # library engine debugging hook 801 #print s 802 pass
803