Package ClusterShell :: Package Worker :: Module EngineClient
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Worker.EngineClient

  1  # 
  2  # Copyright CEA/DAM/DIF (2009, 2010) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id$ 
 34   
 35  """ 
 36  EngineClient 
 37   
 38  ClusterShell engine's client interface. 
 39   
 40  An engine client is similar to a process, you can start/stop it, read data from 
 41  it and write data to it. 
 42  """ 
 43   
 44  import fcntl 
 45  import os 
 46  import Queue 
 47  from subprocess import Popen, PIPE, STDOUT 
 48  import thread 
 49   
 50  from ClusterShell.Engine.Engine import EngineBaseTimer 
 51   
 52   
53 -class EngineClientException(Exception):
54 """Generic EngineClient exception."""
55
56 -class EngineClientEOF(EngineClientException):
57 """EOF from client."""
58
59 -class EngineClientError(EngineClientException):
60 """Base EngineClient error exception."""
61
62 -class EngineClientNotSupportedError(EngineClientError):
63 """Operation not supported by EngineClient."""
64 65
66 -class EngineClient(EngineBaseTimer):
67 """ 68 Abstract class EngineClient. 69 """ 70
71 - def __init__(self, worker, stderr, timeout, autoclose):
72 """ 73 Initializer. Should be called from derived classes. 74 """ 75 EngineBaseTimer.__init__(self, timeout, -1, autoclose) 76 77 # engine-friendly variables 78 self._events = 0 # current configured set of 79 # interesting events (read, 80 # write) for client 81 self._new_events = 0 # new set of interesting events 82 self._processing = False # engine is working on us 83 84 # read-only public 85 self.registered = False # registered on engine or not 86 self.delayable = True # subject to fanout limit 87 88 self.worker = worker 89 90 # boolean indicating whether stderr is on a separate fd 91 self._stderr = stderr 92 93 # associated files 94 self.file_error = None 95 self.file_reader = None 96 self.file_writer = None 97 98 # initialize error, read and write buffers 99 self._ebuf = "" 100 self._rbuf = "" 101 self._wbuf = "" 102 self._weof = False # write-ends notification
103
104 - def _fire(self):
105 """ 106 Fire timeout timer. 107 """ 108 if self._engine: 109 self._engine.remove(self, did_timeout=True)
110
111 - def _start(self):
112 """ 113 Starts client and returns client instance as a convenience. 114 Derived classes (except EnginePort) must implement. 115 """ 116 raise NotImplementedError("Derived classes must implement.")
117
118 - def error_fileno(self):
119 """ 120 Return the standard error reader file descriptor as an integer. 121 """ 122 if self.file_error: 123 return self.file_error.fileno() 124 return None
125
126 - def reader_fileno(self):
127 """ 128 Return the reader file descriptor as an integer. 129 """ 130 if self.file_reader: 131 return self.file_reader.fileno() 132 return None
133
134 - def writer_fileno(self):
135 """ 136 Return the writer file descriptor as an integer. 137 """ 138 if self.file_writer: 139 return self.file_writer.fileno() 140 return None
141
142 - def _close(self, force, timeout):
143 """ 144 Close client. Called by the engine after client has been 145 unregistered. This method should handle all termination types 146 (normal, forced or on timeout). 147 Derived classes must implement. 148 """ 149 raise NotImplementedError("Derived classes must implement.")
150
151 - def _set_reading(self):
152 """ 153 Set reading state. 154 """ 155 self._engine.set_reading(self)
156
157 - def _set_reading_error(self):
158 """ 159 Set error reading state. 160 """ 161 self._engine.set_reading_error(self)
162
163 - def _set_writing(self):
164 """ 165 Set writing state. 166 """ 167 self._engine.set_writing(self)
168
169 - def _read(self, size=-1):
170 """ 171 Read data from process. 172 """ 173 result = self.file_reader.read(size) 174 if not len(result): 175 raise EngineClientEOF() 176 self._set_reading() 177 return result
178
179 - def _readerr(self, size=-1):
180 """ 181 Read error data from process. 182 """ 183 result = self.file_error.read(size) 184 if not len(result): 185 raise EngineClientEOF() 186 self._set_reading_error() 187 return result
188
189 - def _handle_read(self):
190 """ 191 Handle a read notification. Called by the engine as the result of an 192 event indicating that a read is available. 193 """ 194 raise NotImplementedError("Derived classes must implement.")
195
196 - def _handle_error(self):
197 """ 198 Handle a stderr read notification. Called by the engine as the result 199 of an event indicating that a read is available on stderr. 200 """ 201 raise NotImplementedError("Derived classes must implement.")
202
203 - def _handle_write(self):
204 """ 205 Handle a write notification. Called by the engine as the result of an 206 event indicating that a write can be performed now. 207 """ 208 if len(self._wbuf) > 0: 209 # write syscall 210 c = os.write(self.file_writer.fileno(), self._wbuf) 211 # dequeue written buffer 212 self._wbuf = self._wbuf[c:] 213 # check for possible ending 214 if self._weof and not self._wbuf: 215 self._close_writer() 216 else: 217 self._set_writing()
218
219 - def _exec_nonblock(self, commandlist, shell=False, env=None):
220 """ 221 Utility method to launch a command with stdin/stdout file 222 descriptors configured in non-blocking mode. 223 """ 224 full_env = None 225 if env: 226 full_env = os.environ.copy() 227 full_env.update(env) 228 229 if self._stderr: 230 stderr_setup = PIPE 231 else: 232 stderr_setup = STDOUT 233 234 # Launch process in non-blocking mode 235 proc = Popen(commandlist, bufsize=0, stdin=PIPE, stdout=PIPE, 236 stderr=stderr_setup, close_fds=False, shell=shell, env=full_env) 237 238 if self._stderr: 239 fcntl.fcntl(proc.stderr, fcntl.F_SETFL, 240 fcntl.fcntl(proc.stderr, fcntl.F_GETFL) | os.O_NDELAY) 241 fcntl.fcntl(proc.stdout, fcntl.F_SETFL, 242 fcntl.fcntl(proc.stdout, fcntl.F_GETFL) | os.O_NDELAY) 243 fcntl.fcntl(proc.stdin, fcntl.F_SETFL, 244 fcntl.fcntl(proc.stdin, fcntl.F_GETFL) | os.O_NDELAY) 245 246 return proc
247
248 - def _readlines(self):
249 """ 250 Utility method to read client lines 251 """ 252 # read a chunk of data, may raise eof 253 readbuf = self._read() 254 assert len(readbuf) > 0, "assertion failed: len(readbuf) > 0" 255 256 # Current version implements line-buffered reads. If needed, we could 257 # easily provide direct, non-buffered, data reads in the future. 258 259 buf = self._rbuf + readbuf 260 lines = buf.splitlines(True) 261 self._rbuf = "" 262 for line in lines: 263 if line.endswith('\n'): 264 if line.endswith('\r\n'): 265 yield line[:-2] # trim CRLF 266 else: 267 # trim LF 268 yield line[:-1] # trim LF 269 else: 270 # keep partial line in buffer 271 self._rbuf = line
272 # breaking here 273
274 - def _readerrlines(self):
275 """ 276 Utility method to read client lines 277 """ 278 # read a chunk of data, may raise eof 279 readerrbuf = self._readerr() 280 assert len(readerrbuf) > 0, "assertion failed: len(readerrbuf) > 0" 281 282 buf = self._ebuf + readerrbuf 283 lines = buf.splitlines(True) 284 self._ebuf = "" 285 for line in lines: 286 if line.endswith('\n'): 287 if line.endswith('\r\n'): 288 yield line[:-2] # trim CRLF 289 else: 290 # trim LF 291 yield line[:-1] # trim LF 292 else: 293 # keep partial line in buffer 294 self._ebuf = line
295 # breaking here 296
297 - def _write(self, buf):
298 """ 299 Add some data to be written to the client. 300 """ 301 fd = self.writer_fileno() 302 if fd: 303 assert not self.file_writer.closed 304 # TODO: write now if ready 305 self._wbuf += buf 306 self._set_writing() 307 else: 308 # bufferize until pipe is ready 309 self._wbuf += buf
310
311 - def _set_write_eof(self):
312 self._weof = True 313 if not self._wbuf: 314 # sendq empty, try to close writer now 315 self._close_writer()
316
317 - def _close_writer(self):
318 if self.file_writer and not self.file_writer.closed: 319 self._engine.unregister_writer(self) 320 self.file_writer.close() 321 self.file_writer = None
322 323
324 -class EnginePort(EngineClient):
325 """ 326 An EnginePort is an abstraction object to deliver messages 327 reliably between tasks. 328 """ 329
330 - class _Msg:
331 """Private class representing a port message. 332 333 A port message may be any Python object. 334 """ 335
336 - def __init__(self, user_msg, sync):
337 self._user_msg = user_msg 338 self._sync_msg = sync 339 self.reply_lock = thread.allocate_lock() 340 self.reply_lock.acquire()
341
342 - def get(self):
343 """ 344 Get and acknowledge message. 345 """ 346 self.reply_lock.release() 347 return self._user_msg
348
349 - def sync(self):
350 """ 351 Wait for message acknowledgment if needed. 352 """ 353 if self._sync_msg: 354 self.reply_lock.acquire()
355
356 - def __init__(self, task, handler=None, autoclose=False):
357 """ 358 Initialize EnginePort object. 359 """ 360 EngineClient.__init__(self, None, False, -1, autoclose) 361 self.task = task 362 self.eh = handler 363 # ports are no subject to fanout 364 self.delayable = False 365 366 # Port messages queue 367 self._msgq = Queue.Queue(self.task.default("port_qlimit")) 368 369 # Request pipe 370 (readfd, writefd) = os.pipe() 371 # Use file objects instead of FD for convenience 372 self.file_reader = os.fdopen(readfd, 'r') 373 self.file_writer = os.fdopen(writefd, 'w') 374 # Set nonblocking flag 375 fcntl.fcntl(readfd, fcntl.F_SETFL, 376 fcntl.fcntl(readfd, fcntl.F_GETFL) | os.O_NDELAY) 377 fcntl.fcntl(writefd, fcntl.F_SETFL, 378 fcntl.fcntl(writefd, fcntl.F_GETFL) | os.O_NDELAY)
379
380 - def _start(self):
381 return self
382
383 - def _close(self, force, timeout):
384 """ 385 """ 386 self.file_reader.close() 387 self.file_writer.close()
388
389 - def _read(self, size=4096):
390 """ 391 Read data from pipe. 392 """ 393 return EngineClient._read(self, size)
394
395 - def _handle_read(self):
396 """ 397 Handle a read notification. Called by the engine as the result of an 398 event indicating that a read is available. 399 """ 400 readbuf = self._read() 401 for c in readbuf: 402 # raise Empty if empty (should never happen) 403 pmsg = self._msgq.get(block=False) 404 self.eh.ev_msg(self, pmsg.get())
405
406 - def msg(self, send_msg, send_once=False):
407 """ 408 Port message send with optional reply. 409 """ 410 pmsg = EnginePort._Msg(send_msg, not send_once) 411 self._msgq.put(pmsg, block=True, timeout=None) 412 413 try: 414 ret = os.write(self.writer_fileno(), "M") 415 except OSError: 416 raise 417 418 pmsg.sync() 419 return ret == 1
420
421 - def msg_send(self, send_msg):
422 """ 423 Port message send-once method (no reply). 424 """ 425 self.msg(send_msg, send_once=True)
426