1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
"""
A ClusterShell Engine using epoll, an I/O event notification facility.

The epoll event distribution interface is available on Linux 2.6 and later,
and has been included in Python (select module) since Python 2.6.
"""
41
42 import errno
43 import select
44 import time
45
46 from ClusterShell.Engine.Engine import Engine
47 from ClusterShell.Engine.Engine import EngineNotSupportedError
48 from ClusterShell.Engine.Engine import EngineTimeoutException
49 from ClusterShell.Worker.EngineClient import EngineClientEOF
50
51
class EngineEPoll(Engine):
    """
    EPoll Engine

    ClusterShell Engine class using the select.epoll mechanism.

    NOTE(review): the listing this class was restored from had lost its
    indentation and its ``class``/``def`` header lines. The class name, the
    method names and their signatures were reconstructed from the
    docstrings, the debug tags and the names each body uses -- confirm them
    against the Engine base class before merging.

    NOTE(review): every method below uses ``self.epolling``, but no
    constructor assigning it is visible in this listing -- presumably it is
    a ``select.epoll()`` object created at initialization; confirm.
    """

    identifier = "epoll"

    @staticmethod
    def _epoll_mask(event):
        """
        Translate an Engine event flag into the matching epoll event mask.

        Returns select.EPOLLIN when `event` has E_READ or E_ERROR set,
        select.EPOLLOUT when `event` is exactly E_WRITE, and 0 otherwise.
        """
        if event & (Engine.E_READ | Engine.E_ERROR):
            return select.EPOLLIN
        if event == Engine.E_WRITE:
            return select.EPOLLOUT
        return 0

    def _register_specific(self, fd, event):
        """
        Engine-specific fd registering. Called by Engine register.

        Registers `fd` with the epoll object for the epoll mask matching
        `event`. Raises ValueError when `event` maps to no epoll mask
        (this case previously failed with an UnboundLocalError).
        """
        eventmask = self._epoll_mask(event)
        if not eventmask:
            raise ValueError("unsupported engine event 0x%x for fd %d"
                             % (event, fd))

        self.epolling.register(fd, eventmask)

    def _unregister_specific(self, fd, ev_is_set=None):
        """
        Engine-specific fd unregistering. Called by Engine unregister.

        Removes `fd` from the epoll object unconditionally.

        NOTE(review): only `fd` is used by the body found in the listing.
        `ev_is_set` is accepted and ignored so that the method works
        whether the base class calls it with one or two arguments --
        confirm the expected signature and drop the parameter if unused.
        """
        self.epolling.unregister(fd)

    def _modify_specific(self, fd, event, setvalue):
        """
        Engine-specific modifications after an interesting event change for
        a file descriptor. Called automatically by Engine set_events().
        For the epoll engine, it modifies the event mask associated to a file
        descriptor.

        When `setvalue` is false the mask is set to 0: the fd stays
        registered with the epoll object but with no event selected.
        """
        self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event,
                                                            setvalue))
        eventmask = 0
        if setvalue:
            eventmask = self._epoll_mask(event)

        self.epolling.modify(fd, eventmask)

    def runloop(self, timeout):
        """
        Run epoll main loop.

        Loops while self.evlooprefcnt is positive: waits for events with
        epoll, dispatches each one to the client owning the file
        descriptor, then fires due timers.

        `timeout` is the overall run-loop limit in seconds; 0 is turned
        into -1, which disables the limit checks below.

        Raises EngineTimeoutException once `timeout` (when > 0) has elapsed
        since the loop was entered. An IOError raised by poll() is
        propagated unless its errno is EINTR, in which case the wait is
        simply retried.
        """
        if timeout == 0:
            timeout = -1

        start_time = time.time()

        # run main event loop
        while self.evlooprefcnt > 0:
            self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" %
                        (self.evlooprefcnt, self.reg_clifds.keys(),
                         len(self.timerq)))
            try:
                timeo = self.timerq.nextfire_delay()
                if timeout > 0 and timeo >= timeout:
                    # the run-loop timeout expires no later than the next
                    # timer: drop pending timers and wait for `timeout`
                    self.timerq.clear()
                    timeo = timeout
                elif timeo == -1:
                    # -1 is handled as "no delay available" here: fall back
                    # to the run-loop timeout
                    timeo = timeout

                self.reg_clifds_changed = False
                # NOTE(review): the extra millisecond is presumably there
                # so poll() does not return just before a timer is due --
                # confirm.
                evlist = self.epolling.poll(timeo + 0.001)

            except IOError as ex:
                # retry the wait when poll() failed with EINTR
                if ex.errno == errno.EINTR:
                    continue
                # any other error used to be swallowed, leaving `evlist`
                # stale (or undefined on the first pass); propagate it
                raise

            for fd, event in evlist:

                if self.reg_clifds_changed:
                    self._debug("REG CLIENTS CHANGED - Aborting current evlist")
                    # the rest of evlist is not processed; poll again
                    break

                # get client instance
                client, fdev = self._fd2client(fd)
                if not client or fdev is None:
                    continue

                # mark the client as being processed
                client._processing = True

                # check for poll error condition of some sort
                if event & select.EPOLLERR:
                    self._debug("EPOLLERR %s" % client)
                    self.unregister_writer(client)
                    client.file_writer.close()
                    client.file_writer = None
                    # NOTE(review): client._processing is left set to True
                    # on this path (unlike the EPOLLHUP path) -- confirm
                    # this is intended.
                    continue

                # check for data to read
                if event & select.EPOLLIN:
                    assert fdev & (Engine.E_READ | Engine.E_ERROR)
                    assert client._events & fdev
                    self.modify(client, 0, fdev)
                    try:
                        if fdev & Engine.E_READ:
                            client._handle_read()
                        else:
                            client._handle_error()
                    except EngineClientEOF:
                        self._debug("EngineClientEOF %s" % client)
                        if fdev & Engine.E_READ:
                            self.remove(client)
                        # NOTE(review): client._processing is also left
                        # set to True here -- confirm.
                        continue

                # EPOLLHUP is only examined when EPOLLIN was not reported
                # for this event
                elif event & select.EPOLLHUP:
                    self._debug("EPOLLHUP fd=%d %s (r%s,w%s)" % (fd,
                        client.__class__.__name__, client.reader_fileno(),
                        client.writer_fileno()))
                    client._processing = False

                    # remove the client unless its other input event
                    # (E_ERROR for a read fd, E_READ otherwise) is still set
                    if fdev & Engine.E_READ:
                        if client._events & Engine.E_ERROR:
                            self.modify(client, 0, fdev)
                        else:
                            self.remove(client)
                    else:
                        if client._events & Engine.E_READ:
                            self.modify(client, 0, fdev)
                        else:
                            self.remove(client)
                    continue

                # check for writing
                if event & select.EPOLLOUT:
                    self._debug("EPOLLOUT fd=%d %s (r%s,w%s)" % (fd,
                        client.__class__.__name__, client.reader_fileno(),
                        client.writer_fileno()))
                    assert fdev == Engine.E_WRITE
                    assert client._events & fdev
                    self.modify(client, 0, fdev)
                    client._handle_write()

                # processing of this client is done
                client._processing = False

                # apply client._new_events if the client is still registered
                if client.registered:
                    self.set_events(client, client._new_events)

            # check for run-loop timeout
            if timeout > 0 and time.time() >= start_time + timeout:
                raise EngineTimeoutException()

            # process timers
            self.fire_timers()

        self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" %
                    (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
223