1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Interface of underlying Task's Engine.
37
38 An Engine implements a loop your thread enters and uses to call event handlers
39 in response to incoming events (from workers, timers, etc.).
40 """
41
42 import errno
43 import heapq
44 import time
45
46
48 """
49 Base engine exception.
50 """
51
53 """
54 Raised on user abort.
55 """
58
60 """
61 Raised when a timeout is encountered.
62 """
63
65 """
66 Error raised when an illegal operation has been performed.
67 """
68
70 """
71 Error raised when the engine is already running.
72 """
73
75 """
76 Error raised when the engine mechanism is not supported.
77 """
78
79
81 """
82 Abstract class for ClusterShell's engine timer. Such a timer
83 requires a relative fire time (delay) in seconds (as float), and
84 supports an optional repeating interval in seconds (as float too).
85
86 See EngineTimer for more information about ClusterShell timers.
87 """
88
89 - def __init__(self, fire_delay, interval=-1.0, autoclose=False):
90 """
91 Create a base timer.
92 """
93 self.fire_delay = fire_delay
94 self.interval = interval
95 self.autoclose = autoclose
96 self._engine = None
97 self._timercase = None
98
100 """
101 Bind to engine, called by Engine.
102 """
103 if self._engine:
104
105 raise EngineIllegalOperationError("Already bound to engine.")
106
107 self._engine = engine
108
110 """
111 Invalidates a timer object, stopping it from ever firing again.
112 """
113 if self._engine:
114 self._engine.timerq.invalidate(self)
115 self._engine = None
116
118 """
119 Returns a boolean value that indicates whether an EngineTimer
120 object is valid and able to fire.
121 """
122 return self._engine != None
123
125 """
126 Set the next firing delay in seconds for an EngineTimer object.
127
128 The optional paramater `interval' sets the firing interval
129 of the timer. If not specified, the timer fires once and then
130 is automatically invalidated.
131
132 Time values are expressed in second using floating point
133 values. Precision is implementation (and system) dependent.
134
135 It is safe to call this method from the task owning this
136 timer object, in any event handlers, anywhere.
137
138 However, resetting a timer's next firing time may be a
139 relatively expensive operation. It is more efficient to let
140 timers autorepeat or to use this method from the timer's own
141 event handler callback (ie. from its ev_timer).
142 """
143 if not self.is_valid():
144 raise EngineIllegalOperationError("Operation on invalid timer.")
145
146 self.fire_delay = fire_delay
147 self.interval = interval
148 self._engine.timerq.reschedule(self)
149
151 raise NotImplementedError("Derived classes must implement.")
152
153
155 """
156 Concrete class EngineTimer
157
158 An EngineTimer object represents a timer bound to an engine that
159 fires at a preset time in the future. Timers can fire either only
160 once or repeatedly at fixed time intervals. Repeating timers can
161 also have their next firing time manually adjusted.
162
163 A timer is not a real-time mechanism; it fires when the task's
164 underlying engine to which the timer has been added is running and
165 able to check if the timer's firing time has passed.
166 """
167
168 - def __init__(self, fire_delay, interval, autoclose, handler):
169 EngineBaseTimer.__init__(self, fire_delay, interval, autoclose)
170 self.eh = handler
171 assert self.eh != None, "An event handler is needed for timer."
172
174 self.eh._invoke("ev_timer", self)
175
177
179 """
180 Helper class that allows comparisons of fire times, to be easily used
181 in an heapq.
182 """
184 self.client = client
185 self.client._timercase = self
186
187 assert self.client.fire_delay > 0
188 self.fire_date = self.client.fire_delay + time.time()
189
191 return cmp(self.fire_date, other.fire_date)
192
193 - def arm(self, client):
194 assert client != None
195 self.client = client
196 self.client._timercase = self
197
198 time_current = time.time()
199 if self.client.fire_delay > 0:
200 self.fire_date = self.client.fire_delay + time_current
201 else:
202 interval = float(self.client.interval)
203 assert interval > 0
204 self.fire_date += interval
205
206
207
208 while self.fire_date < time_current:
209 self.fire_date += interval
210
212 client = self.client
213 client._timercase = None
214 self.client = None
215 return client
216
218 return self.client != None
219
220
222 """
223 Initializer.
224 """
225 self._engine = engine
226 self.timers = []
227 self.armed_count = 0
228
230 """
231 Return the number of active timers.
232 """
233 return self.armed_count
234
236 """
237 Insert and arm a client's timer.
238 """
239
240 if client.fire_delay > 0:
241 heapq.heappush(self.timers, _EngineTimerQ._EngineTimerCase(client))
242 self.armed_count += 1
243 if not client.autoclose:
244 self._engine.evlooprefcnt += 1
245
254
256 """
257 Invalidate client's timer. Current implementation doesn't really remove
258 the timer, but simply flags it as disarmed.
259 """
260 if not client._timercase:
261
262 client.fire_delay = 0
263 client.interval = 0
264 return
265
266 if self.armed_count <= 0:
267 raise ValueError, "Engine client timer not found in timer queue"
268
269 client._timercase.disarm()
270 self.armed_count -= 1
271 if not client.autoclose:
272 self._engine.evlooprefcnt -= 1
273
275 """
276 Dequeue disarmed timers (sort of garbage collection).
277 """
278 while len(self.timers) > 0 and not self.timers[0].armed():
279 heapq.heappop(self.timers)
280
282 """
283 Remove the smallest timer from the queue and fire its associated client.
284 Raise IndexError if the queue is empty.
285 """
286 self._dequeue_disarmed()
287
288 timercase = heapq.heappop(self.timers)
289 client = timercase.disarm()
290
291 client.fire_delay = 0
292 client._fire()
293
294 if client.fire_delay > 0 or client.interval > 0:
295 timercase.arm(client)
296 heapq.heappush(self.timers, timercase)
297 else:
298 self.armed_count -= 1
299 if not client.autoclose:
300 self._engine.evlooprefcnt -= 1
301
303 """
304 Return next timer fire delay (relative time).
305 """
306 self._dequeue_disarmed()
307 if len(self.timers) > 0:
308 return max(0., self.timers[0].fire_date - time.time())
309
310 return -1
311
313 """
314 Has a timer expired?
315 """
316 self._dequeue_disarmed()
317 return len(self.timers) > 0 and \
318 (self.timers[0].fire_date - time.time()) <= 1e-2
319
321 """
322 Stop and clear all timers.
323 """
324 for timer in self.timers:
325 if timer.armed():
326 timer.client.invalidate()
327
328 self.timers = []
329 self.armed_count = 0
330
331
333 """
334 Interface for ClusterShell engine. Subclasses have to implement a runloop
335 listening for client events.
336 """
337
338
339 E_READ = 0x1
340 E_ERROR = 0x2
341 E_WRITE = 0x4
342 E_ANY = E_READ | E_ERROR | E_WRITE
343
344 identifier = "(none)"
345
347 """
348 Initialize base class.
349 """
350
351 self.info = info
352
353
354 self.info['engine'] = self.identifier
355
356
357 self._clients = set()
358 self._ports = set()
359
360
361 self.reg_clients = 0
362
363
364
365 self.reg_clifds = {}
366
367
368
369
370
371
372
373
374
375 self.reg_clifds_changed = False
376
377
378 self.timerq = _EngineTimerQ(self)
379
380
381
382 self.evlooprefcnt = 0
383
384
385 self.running = False
386
387 self._exited = False
388
390 """
391 Get a copy of clients set.
392 """
393 return self._clients.copy()
394
396 """
397 Get a copy of ports set.
398 """
399 return self._ports.copy()
400
415
416 - def add(self, client):
417 """
418 Add a client to engine. Subclasses that override this method
419 should call base class method.
420 """
421
422 client._set_engine(self)
423
424 if client.delayable:
425
426 self._clients.add(client)
427 else:
428
429 self._ports.add(client)
430
431 if self.running:
432
433 if not client.delayable:
434 self.register(client)
435 elif self.info["fanout"] > self.reg_clients:
436 self.register(client._start())
437
438 - def remove(self, client, did_timeout=False):
439 """
440 Remove a client from engine. Subclasses that override this
441 method should call base class method.
442 """
443 self._debug("REMOVE %s" % client)
444 if client.delayable:
445 self._clients.remove(client)
446 else:
447 self._ports.remove(client)
448
449 if client.registered:
450 self.unregister(client)
451 client._close(force=False, timeout=did_timeout)
452 self.start_all()
453
454 - def clear(self, did_timeout=False, clear_ports=False):
455 """
456 Remove all clients. Subclasses that override this method should
457 call base class method.
458 """
459 all_clients = [self._clients]
460 if clear_ports:
461 all_clients.append(self._ports)
462
463 for clients in all_clients:
464 while len(clients) > 0:
465 client = clients.pop()
466 if client.registered:
467 self.unregister(client)
468 client._close(force=True, timeout=did_timeout)
469
471 """
472 Register an engine client. Subclasses that override this method
473 should call base class method.
474 """
475 assert client in self._clients or client in self._ports
476 assert not client.registered
477
478 efd = client.error_fileno()
479 rfd = client.reader_fileno()
480 wfd = client.writer_fileno()
481 assert rfd != None or wfd != None
482
483 self._debug("REG %s(e%s,r%s,w%s)(autoclose=%s)" % \
484 (client.__class__.__name__, efd, rfd, wfd,
485 client.autoclose))
486
487 client._events = 0
488 client.registered = True
489
490 if client.delayable:
491 self.reg_clients += 1
492
493 if client.autoclose:
494 refcnt_inc = 0
495 else:
496 refcnt_inc = 1
497
498 if efd != None:
499 self.reg_clifds[efd] = client
500 self.reg_clifds_changed = True
501 client._events |= Engine.E_ERROR
502 self.evlooprefcnt += refcnt_inc
503 self._register_specific(efd, Engine.E_ERROR)
504 if rfd != None:
505 self.reg_clifds[rfd] = client
506 self.reg_clifds_changed = True
507 client._events |= Engine.E_READ
508 self.evlooprefcnt += refcnt_inc
509 self._register_specific(rfd, Engine.E_READ)
510 if wfd != None:
511 self.reg_clifds[wfd] = client
512 self.reg_clifds_changed = True
513 client._events |= Engine.E_WRITE
514 self.evlooprefcnt += refcnt_inc
515 self._register_specific(wfd, Engine.E_WRITE)
516
517 client._new_events = client._events
518
519
520 self.timerq.schedule(client)
521
537
539 """
540 Unregister a client. Subclasses that override this method should
541 call base class method.
542 """
543
544 assert client.registered
545 self._debug("UNREG %s (r%s,e%s,w%s)" % (client.__class__.__name__,
546 client.reader_fileno(), client.error_fileno(),
547 client.writer_fileno()))
548
549
550 self.timerq.invalidate(client)
551
552 if client.autoclose:
553 refcnt_inc = 0
554 else:
555 refcnt_inc = 1
556
557
558 efd = client.error_fileno()
559 if efd != None:
560 self._unregister_specific(efd, client._events & Engine.E_ERROR)
561 client._events &= ~Engine.E_ERROR
562 del self.reg_clifds[efd]
563 self.reg_clifds_changed = True
564 self.evlooprefcnt -= refcnt_inc
565
566 rfd = client.reader_fileno()
567 if rfd != None:
568 self._unregister_specific(rfd, client._events & Engine.E_READ)
569 client._events &= ~Engine.E_READ
570 del self.reg_clifds[rfd]
571 self.reg_clifds_changed = True
572 self.evlooprefcnt -= refcnt_inc
573
574 wfd = client.writer_fileno()
575 if wfd != None:
576 self._unregister_specific(wfd, client._events & Engine.E_WRITE)
577 client._events &= ~Engine.E_WRITE
578 del self.reg_clifds[wfd]
579 self.reg_clifds_changed = True
580 self.evlooprefcnt -= refcnt_inc
581
582 client._new_events = 0
583 client.registered = False
584 self.reg_clients -= 1
585
586 - def modify(self, client, setmask, clearmask):
587 """
588 Modify the next loop interest events bitset for a client.
589 """
590 self._debug("MODEV set:0x%x clear:0x%x %s" % (setmask, clearmask,
591 client))
592 client._new_events &= ~clearmask
593 client._new_events |= setmask
594
595 if not client._processing:
596
597 self.reg_clifds_changed = True
598
599 self.set_events(client, client._new_events)
600
602 """Engine-specific register fd for event method."""
603 raise NotImplementedError("Derived classes must implement.")
604
606 """Engine-specific unregister fd method."""
607 raise NotImplementedError("Derived classes must implement.")
608
610 """Engine-specific modify fd for event method."""
611 raise NotImplementedError("Derived classes must implement.")
612
658
660 """
661 Set client reading state.
662 """
663
664 self.modify(client, Engine.E_READ, 0)
665
667 """
668 Set client reading error state.
669 """
670
671 self.modify(client, Engine.E_ERROR, 0)
672
679
686
692
694 """
695 Fire expired timers for processing.
696 """
697 while self.timerq.expired():
698 self.timerq.fire()
699
701 """
702 Start and register all port clients.
703 """
704
705 for port in self._ports:
706 if not port.registered:
707 self._debug("START PORT %s" % port)
708 self.register(port)
709
711 """
712 Start and register all other possible clients, in respect of task fanout.
713 """
714
715 fanout = self.info["fanout"]
716 assert fanout > 0
717 if fanout <= self.reg_clients:
718 return
719
720
721 for client in self._clients:
722 if not client.registered:
723 self._debug("START CLIENT %s" % client.__class__.__name__)
724 self.register(client._start())
725 if fanout <= self.reg_clients:
726 break
727
728 - def run(self, timeout):
758
760 """
761 Peek in ports for possible early pending messages.
762 This method simply tries to read port pipes in non-
763 blocking mode.
764 """
765
766
767 ports = self._ports.copy()
768 for port in ports:
769 try:
770 port._handle_read()
771 except (IOError, OSError), (err, strerr):
772 if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
773
774 return
775
776 raise
777
779 """
780 Engine specific run loop. Derived classes must implement.
781 """
782 raise NotImplementedError("Derived classes must implement.")
783
792
794 """
795 Returns True if the engine has exited the runloop once.
796 """
797 return not self.running and self._exited
798
803