1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 EngineClient
37
38 ClusterShell engine's client interface.
39
40 An engine client is similar to a process, you can start/stop it, read data from
41 it and write data to it.
42 """
43
44 import fcntl
45 import os
46 import Queue
47 from subprocess import Popen, PIPE, STDOUT
48 import thread
49
50 from ClusterShell.Engine.Engine import EngineBaseTimer
51
52
54 """Generic EngineClient exception."""
55
57 """EOF from client."""
58
60 """Base EngineClient error exception."""
61
63 """Operation not supported by EngineClient."""
64
65
67 """
68 Abstract class EngineClient.
69 """
70
def __init__(self, worker, stderr, timeout, autoclose):
    """
    Set up the state shared by every engine client.

    Derived classes are expected to call this initializer.

    worker    -- object stored as ``self.worker``
    stderr    -- flag stored as ``self._stderr``; when true, the child's
                 standard error is read through its own pipe instead of
                 being merged into standard output
    timeout   -- forwarded to EngineBaseTimer
    autoclose -- forwarded to EngineBaseTimer
    """
    # NOTE(review): the literal -1 is the second EngineBaseTimer argument;
    # presumably "no repeat interval" -- confirm against EngineBaseTimer.
    EngineBaseTimer.__init__(self, timeout, -1, autoclose)

    # Event bookkeeping counters/flag, all starting cleared.
    # NOTE(review): presumably maintained by the engine -- confirm.
    self._events = self._new_events = 0
    self._processing = False

    # Registration state and "delayable" flag (subclasses may override
    # the latter after calling this initializer).
    self.registered = False
    self.delayable = True

    self.worker = worker
    self._stderr = stderr

    # File objects for stderr / stdout / stdin; None until a derived
    # class assigns them.
    self.file_error = self.file_reader = self.file_writer = None

    # Leftover partial-line buffers for stderr and stdout reads, and the
    # pending outgoing data buffer.
    self._ebuf = self._rbuf = self._wbuf = ""
    # Set once the writer must be closed as soon as _wbuf is drained.
    self._weof = False
103
105 """
106 Fire timeout timer.
107 """
108 if self._engine:
109 self._engine.remove(self, did_timeout=True)
110
112 """
113 Starts client and returns client instance as a convenience.
114 Derived classes (except EnginePort) must implement.
115 """
116 raise NotImplementedError("Derived classes must implement.")
117
119 """
120 Return the standard error reader file descriptor as an integer.
121 """
122 if self.file_error:
123 return self.file_error.fileno()
124 return None
125
127 """
128 Return the reader file descriptor as an integer.
129 """
130 if self.file_reader:
131 return self.file_reader.fileno()
132 return None
133
135 """
136 Return the writer file descriptor as an integer.
137 """
138 if self.file_writer:
139 return self.file_writer.fileno()
140 return None
141
def _close(self, force, timeout):
    """
    Abstract termination hook.

    The engine calls this once the client has been unregistered. An
    implementation has to cope with every kind of termination: normal
    completion, forced close (``force``) and expiry (``timeout``).

    Always raises NotImplementedError here; derived classes provide the
    real behaviour.
    """
    raise NotImplementedError("Derived classes must implement.")
150
152 """
153 Set reading state.
154 """
155 self._engine.set_reading(self)
156
158 """
159 Set error reading state.
160 """
161 self._engine.set_reading_error(self)
162
164 """
165 Set writing state.
166 """
167 self._engine.set_writing(self)
168
def _read(self, size=-1):
    """
    Read up to ``size`` units of data from the client's reader file object.

    An empty result is treated as end-of-file and reported by raising
    EngineClientEOF. Otherwise the client is put back into reading state
    via ``_set_reading()`` before the data is returned.
    """
    data = self.file_reader.read(size)
    if len(data) == 0:
        # Nothing came back: the other end is gone.
        raise EngineClientEOF()
    self._set_reading()
    return data
178
180 """
181 Read error data from process.
182 """
183 result = self.file_error.read(size)
184 if not len(result):
185 raise EngineClientEOF()
186 self._set_reading_error()
187 return result
188
190 """
191 Handle a read notification. Called by the engine as the result of an
192 event indicating that a read is available.
193 """
194 raise NotImplementedError("Derived classes must implement.")
195
197 """
198 Handle a stderr read notification. Called by the engine as the result
199 of an event indicating that a read is available on stderr.
200 """
201 raise NotImplementedError("Derived classes must implement.")
202
204 """
205 Handle a write notification. Called by the engine as the result of an
206 event indicating that a write can be performed now.
207 """
208 if len(self._wbuf) > 0:
209
210 c = os.write(self.file_writer.fileno(), self._wbuf)
211
212 self._wbuf = self._wbuf[c:]
213
214 if self._weof and not self._wbuf:
215 self._close_writer()
216 else:
217 self._set_writing()
218
220 """
221 Utility method to launch a command with stdin/stdout file
222 descriptors configured in non-blocking mode.
223 """
224 full_env = None
225 if env:
226 full_env = os.environ.copy()
227 full_env.update(env)
228
229 if self._stderr:
230 stderr_setup = PIPE
231 else:
232 stderr_setup = STDOUT
233
234
235 proc = Popen(commandlist, bufsize=0, stdin=PIPE, stdout=PIPE,
236 stderr=stderr_setup, close_fds=False, shell=shell, env=full_env)
237
238 if self._stderr:
239 fcntl.fcntl(proc.stderr, fcntl.F_SETFL,
240 fcntl.fcntl(proc.stderr, fcntl.F_GETFL) | os.O_NDELAY)
241 fcntl.fcntl(proc.stdout, fcntl.F_SETFL,
242 fcntl.fcntl(proc.stdout, fcntl.F_GETFL) | os.O_NDELAY)
243 fcntl.fcntl(proc.stdin, fcntl.F_SETFL,
244 fcntl.fcntl(proc.stdin, fcntl.F_GETFL) | os.O_NDELAY)
245
246 return proc
247
249 """
250 Utility method to read client lines
251 """
252
253 readbuf = self._read()
254 assert len(readbuf) > 0, "assertion failed: len(readbuf) > 0"
255
256
257
258
259 buf = self._rbuf + readbuf
260 lines = buf.splitlines(True)
261 self._rbuf = ""
262 for line in lines:
263 if line.endswith('\n'):
264 if line.endswith('\r\n'):
265 yield line[:-2]
266 else:
267
268 yield line[:-1]
269 else:
270
271 self._rbuf = line
272
273
275 """
276 Utility method to read client lines
277 """
278
279 readerrbuf = self._readerr()
280 assert len(readerrbuf) > 0, "assertion failed: len(readerrbuf) > 0"
281
282 buf = self._ebuf + readerrbuf
283 lines = buf.splitlines(True)
284 self._ebuf = ""
285 for line in lines:
286 if line.endswith('\n'):
287 if line.endswith('\r\n'):
288 yield line[:-2]
289 else:
290
291 yield line[:-1]
292 else:
293
294 self._ebuf = line
295
296
298 """
299 Add some data to be written to the client.
300 """
301 fd = self.writer_fileno()
302 if fd:
303 assert not self.file_writer.closed
304
305 self._wbuf += buf
306 self._set_writing()
307 else:
308
309 self._wbuf += buf
310
312 self._weof = True
313 if not self._wbuf:
314
315 self._close_writer()
316
318 if self.file_writer and not self.file_writer.closed:
319 self._engine.unregister_writer(self)
320 self.file_writer.close()
321 self.file_writer = None
322
323
325 """
326 An EnginePort is an abstraction object to deliver messages
327 reliably between tasks.
328 """
329
331 """Private class representing a port message.
332
333 A port message may be any Python object.
334 """
335
337 self._user_msg = user_msg
338 self._sync_msg = sync
339 self.reply_lock = thread.allocate_lock()
340 self.reply_lock.acquire()
341
343 """
344 Get and acknowledge message.
345 """
346 self.reply_lock.release()
347 return self._user_msg
348
350 """
351 Wait for message acknowledgment if needed.
352 """
353 if self._sync_msg:
354 self.reply_lock.acquire()
355
def __init__(self, task, handler=None, autoclose=False):
    """
    Build an EnginePort bound to ``task``.

    task      -- owning task; also queried for the "port_qlimit" default
    handler   -- event handler stored as ``self.eh`` (may be None)
    autoclose -- forwarded to the EngineClient initializer
    """
    # A port has no worker, no separate stderr and no timeout (-1).
    EngineClient.__init__(self, None, False, -1, autoclose)
    self.task = task
    self.eh = handler

    # Override the EngineClient default: ports are not delayable.
    self.delayable = False

    # Message queue, bounded by the task's "port_qlimit" setting.
    qlimit = task.default("port_qlimit")
    self._msgq = Queue.Queue(qlimit)

    # Pipe used to signal the reading side that messages are queued.
    pipe_fds = os.pipe()
    self.file_reader = os.fdopen(pipe_fds[0], 'r')
    self.file_writer = os.fdopen(pipe_fds[1], 'w')

    # Switch both pipe ends to non-blocking mode (read end first).
    for fd in pipe_fds:
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NDELAY)
379
382
def _close(self, force, timeout):
    """
    Close both ends of the port's pipe.

    ``force`` and ``timeout`` are part of the common ``_close`` signature
    and are not used here: the reader and writer file objects are closed
    in every case.

    Any exception raised while closing the read end still propagates to
    the caller, but only after the write end has been closed too.
    """
    try:
        self.file_reader.close()
    finally:
        # Always release the write end, even if closing the read end
        # failed, so its file descriptor is not leaked.
        self.file_writer.close()
388
389 - def _read(self, size=4096):
394
396 """
397 Handle a read notification. Called by the engine as the result of an
398 event indicating that a read is available.
399 """
400 readbuf = self._read()
401 for c in readbuf:
402
403 pmsg = self._msgq.get(block=False)
404 self.eh.ev_msg(self, pmsg.get())
405
def msg(self, send_msg, send_once=False):
    """
    Send ``send_msg`` through the port, optionally waiting for a reply.

    The message is wrapped and queued (blocking without timeout if the
    queue is full), then a single byte is written to the port's pipe to
    notify the reading side. Unless ``send_once`` is true, the call then
    blocks until the message has been acknowledged.

    Returns True when exactly one notification byte was written.
    An OSError raised by the pipe write propagates to the caller.
    """
    wait_for_ack = not send_once
    pmsg = EnginePort._Msg(send_msg, wait_for_ack)
    self._msgq.put(pmsg, block=True, timeout=None)

    # One byte on the pipe per queued message.
    written = os.write(self.writer_fileno(), "M")

    pmsg.sync()
    return written == 1
420
422 """
423 Port message send-once method (no reply).
424 """
425 self.msg(send_msg, send_once=True)
426