Package ClusterShell :: Package Worker :: Module Worker
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Worker.Worker

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009, 2010) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Worker.py 242 2010-03-05 13:18:28Z st-cea $ 
 34   
 35  """ 
 36  ClusterShell worker interface. 
 37   
 38  A worker is a generic object which provides "grouped" work in a specific task. 
 39  """ 
 40   
 41  from ClusterShell.Worker.EngineClient import EngineClient 
 42  from ClusterShell.NodeSet import NodeSet 
 43   
 44   
45 -class WorkerException(Exception):
46 """Generic worker exception."""
47
48 -class WorkerError(WorkerException):
49 """Generic worker error."""
50
51 -class WorkerBadArgumentError(WorkerError):
52 """Bad argument in worker error."""
53
54 -class Worker(object):
55 """ 56 Base class Worker. 57 """ 58
59 - def __init__(self, handler):
60 """ 61 Initializer. Should be called from derived classes. 62 """ 63 self.eh = handler # associated EventHandler 64 self.task = None
65
66 - def _set_task(self, task):
67 """ 68 Bind worker to task. Called by task.schedule() 69 """ 70 if self.task is not None: 71 # one-shot-only schedule supported for now 72 raise WorkerError("worker has already been scheduled") 73 self.task = task
74
75 - def _task_bound_check(self):
76 if not self.task: 77 raise WorkerError("worker is not task bound")
78
79 - def _engine_clients(self):
80 """ 81 Return a list of underlying engine clients. 82 """ 83 raise NotImplementedError("Derived classes must implement.")
84
85 - def _invoke(self, ev_type):
86 """ 87 Invoke user EventHandler method if needed. 88 """ 89 if self.eh: 90 self.eh._invoke(ev_type, self)
91
92 - def last_read(self):
93 """ 94 Get last read message from event handler. 95 """ 96 raise NotImplementedError("Derived classes must implement.")
97
98 - def last_error(self):
99 """ 100 Get last error message from event handler. 101 """ 102 raise NotImplementedError("Derived classes must implement.")
103
104 - def did_timeout(self):
105 """ 106 Return True if this worker aborted due to timeout. 107 """ 108 self._task_bound_check() 109 return self.task._num_timeout_by_worker(self) > 0
110 111
112 -class DistantWorker(Worker):
113 """ 114 Base class DistantWorker, which provides a useful set of setters/getters 115 to use with distant workers like ssh or pdsh. 116 """ 117
118 - def __init__(self, handler):
119 Worker.__init__(self, handler) 120 121 self._last_node = None 122 self._last_msg = None 123 self._last_rc = 0 124 self.started = False
125
126 - def _on_start(self):
127 """ 128 Starting 129 """ 130 if not self.started: 131 self.started = True 132 self._invoke("ev_start")
133
134 - def _on_node_msgline(self, node, msg):
135 """ 136 Message received from node, update last* stuffs. 137 """ 138 self._last_node = node 139 self._last_msg = msg 140 141 self.task._msg_add((self, node), msg) 142 143 self._invoke("ev_read")
144
145 - def _on_node_errline(self, node, msg):
146 """ 147 Error message received from node, update last* stuffs. 148 """ 149 self._last_node = node 150 self._last_errmsg = msg 151 152 self.task._errmsg_add((self, node), msg) 153 154 self._invoke("ev_error")
155
156 - def _on_node_rc(self, node, rc):
157 """ 158 Return code received from a node, update last* stuffs. 159 """ 160 self._last_node = node 161 self._last_rc = rc 162 163 self.task._rc_set((self, node), rc) 164 165 self._invoke("ev_hup")
166
167 - def _on_node_timeout(self, node):
168 """ 169 Update on node timeout. 170 """ 171 # Update _last_node to allow node resolution after ev_timeout. 172 self._last_node = node 173 174 self.task._timeout_add((self, node))
175
176 - def last_node(self):
177 """ 178 Get last node, useful to get the node in an EventHandler 179 callback like ev_timeout(). 180 """ 181 return self._last_node
182
183 - def last_read(self):
184 """ 185 Get last (node, buffer), useful in an EventHandler.ev_read() 186 """ 187 return self._last_node, self._last_msg
188
189 - def last_error(self):
190 """ 191 Get last (node, error_buffer), useful in an EventHandler.ev_error() 192 """ 193 return self._last_node, self._last_errmsg
194
195 - def last_retcode(self):
196 """ 197 Get last (node, rc), useful in an EventHandler.ev_hup() 198 """ 199 return self._last_node, self._last_rc
200
201 - def node_buffer(self, node):
202 """ 203 Get specific node buffer. 204 """ 205 self._task_bound_check() 206 return self.task._msg_by_source((self, node))
207
208 - def node_error_buffer(self, node):
209 """ 210 Get specific node error buffer. 211 """ 212 self._task_bound_check() 213 return self.task._errmsg_by_source((self, node))
214
215 - def node_rc(self, node):
216 """ 217 Get specific node return code. 218 """ 219 self._task_bound_check() 220 return self.task._rc_by_source((self, node))
221
222 - def iter_buffers(self, match_keys=None):
223 """ 224 Returns an iterator over available buffers and associated 225 NodeSet. If the optional parameter match_keys is defined, only 226 keys found in match_keys are returned. 227 """ 228 self._task_bound_check() 229 for msg, keys in self.task._call_tree_matcher( \ 230 self.task._msgtree.walk, match_keys, self): 231 yield msg, NodeSet.fromlist(keys)
232
233 - def iter_errors(self, match_keys=None):
234 """ 235 Returns an iterator over available error buffers and associated 236 NodeSet. If the optional parameter match_keys is defined, only 237 keys found in match_keys are returned. 238 """ 239 self._task_bound_check() 240 for msg, keys in self.task._call_tree_matcher( \ 241 self.task._errtree.walk, match_keys, self): 242 yield msg, NodeSet.fromlist(keys)
243
244 - def iter_node_buffers(self, match_keys=None):
245 """ 246 Returns an iterator over each node and associated buffer. 247 """ 248 self._task_bound_check() 249 return self.task._call_tree_matcher(self.task._msgtree.items, 250 match_keys, self)
251
252 - def iter_node_errors(self, match_keys=None):
253 """ 254 Returns an iterator over each node and associated error buffer. 255 """ 256 self._task_bound_check() 257 return self.task._call_tree_matcher(self.task._errtree.items, 258 match_keys, self)
259
260 - def iter_retcodes(self, match_keys=None):
261 """ 262 Returns an iterator over return codes and associated NodeSet. 263 If the optional parameter match_keys is defined, only keys 264 found in match_keys are returned. 265 """ 266 self._task_bound_check() 267 for rc, keys in self.task._rc_iter_by_worker(self, match_keys): 268 yield rc, NodeSet.fromlist(keys)
269
270 - def iter_node_retcodes(self):
271 """ 272 Returns an iterator over each node and associated return code. 273 """ 274 self._task_bound_check() 275 return self.task._krc_iter_by_worker(self)
276
277 - def num_timeout(self):
278 """ 279 Return the number of timed out "keys" (ie. nodes) for this worker. 280 """ 281 self._task_bound_check() 282 return self.task._num_timeout_by_worker(self)
283
284 - def iter_keys_timeout(self):
285 """ 286 Iterate over timed out keys (ie. nodes) for a specific worker. 287 """ 288 self._task_bound_check() 289 return self.task._iter_keys_timeout_by_worker(self)
290 291
292 -class WorkerSimple(EngineClient, Worker):
293 """ 294 Implements a simple Worker being itself an EngineClient. 295 """ 296
297 - def __init__(self, file_reader, file_writer, file_error, key, handler, 298 stderr=False, timeout=-1, autoclose=False):
299 """ 300 Initialize worker. 301 """ 302 Worker.__init__(self, handler) 303 EngineClient.__init__(self, self, stderr, timeout, autoclose) 304 305 self.last_msg = None 306 if key is None: # allow key=0 307 self.key = self 308 else: 309 self.key = key 310 self.file_reader = file_reader 311 self.file_writer = file_writer 312 self.file_error = file_error
313
314 - def _engine_clients(self):
315 """ 316 Return a list of underlying engine clients. 317 """ 318 return [self]
319
320 - def set_key(self, key):
321 """ 322 Source key for this worker is free for use. Use this method to 323 set the custom source key for this worker. 324 """ 325 self.key = key
326
327 - def _start(self):
328 """ 329 Start worker. 330 """ 331 self._invoke("ev_start") 332 333 return self
334
335 - def error_fileno(self):
336 """ 337 Returns the standard error reader file descriptor as an integer. 338 """ 339 if self.file_error and not self.file_error.closed: 340 return self.file_error.fileno() 341 342 return None
343
344 - def reader_fileno(self):
345 """ 346 Returns the reader file descriptor as an integer. 347 """ 348 if self.file_reader and not self.file_reader.closed: 349 return self.file_reader.fileno() 350 351 return None
352
353 - def writer_fileno(self):
354 """ 355 Returns the writer file descriptor as an integer. 356 """ 357 if self.file_writer and not self.file_writer.closed: 358 return self.file_writer.fileno() 359 360 return None
361
362 - def _read(self, size=4096):
363 """ 364 Read data from process. 365 """ 366 return EngineClient._read(self, size)
367
368 - def _readerr(self, size=4096):
369 """ 370 Read error data from process. 371 """ 372 return EngineClient._readerr(self, size)
373
374 - def _close(self, force, timeout):
375 """ 376 Close worker. Called by engine after worker has been 377 unregistered. This method should handle all termination types 378 (normal, forced or on timeout). 379 """ 380 if not force and self._rbuf: 381 # We still have some read data available in buffer, but no 382 # EOL. Generate a final message before closing. 383 self.worker._on_msgline(self._rbuf) 384 385 if self.file_reader != None: 386 self.file_reader.close() 387 if self.file_writer != None: 388 self.file_writer.close() 389 if self.file_error != None: 390 self.file_error.close() 391 392 if timeout: 393 self._on_timeout() 394 395 self._invoke("ev_close")
396
397 - def _handle_read(self):
398 """ 399 Engine is telling us there is data available for reading. 400 """ 401 debug = self.task.info("debug", False) 402 if debug: 403 print_debug = self.task.info("print_debug") 404 405 for msg in self._readlines(): 406 if debug: 407 print_debug(self.task, "LINE %s" % msg) 408 self._on_msgline(msg)
409
410 - def _handle_error(self):
411 """ 412 Engine is telling us there is error available for reading. 413 """ 414 debug = self.task.info("debug", False) 415 if debug: 416 print_debug = self.task.info("print_debug") 417 418 for msg in self._readerrlines(): 419 if debug: 420 print_debug(self.task, "LINE@STDERR %s" % msg) 421 self._on_errmsgline(msg)
422
423 - def last_read(self):
424 """ 425 Read last msg, useful in an EventHandler. 426 """ 427 return self.last_msg
428
429 - def last_error(self):
430 """ 431 Get last error message from event handler. 432 """ 433 return self.last_errmsg
434
435 - def _on_msgline(self, msg):
436 """ 437 Add a message. 438 """ 439 # add last msg to local buffer 440 self.last_msg = msg 441 442 # update task 443 self.task._msg_add((self, self.key), msg) 444 445 self._invoke("ev_read")
446
447 - def _on_errmsgline(self, msg):
448 """ 449 Add a message. 450 """ 451 # add last msg to local buffer 452 self.last_errmsg = msg 453 454 # update task 455 self.task._errmsg_add((self, self.key), msg) 456 457 self._invoke("ev_error")
458
459 - def _on_timeout(self):
460 """ 461 Update on timeout. 462 """ 463 self.task._timeout_add((self, self.key)) 464 465 # trigger timeout event 466 self._invoke("ev_timeout")
467
468 - def read(self):
469 """ 470 Read worker buffer. 471 """ 472 self._task_bound_check() 473 for key, msg in self.task._call_tree_matcher(self.task._msgtree.items, 474 worker=self): 475 assert key == self.key 476 return str(msg)
477
478 - def error(self):
479 """ 480 Read worker error buffer. 481 """ 482 self._task_bound_check() 483 for key, msg in self.task._call_tree_matcher(self.task._errtree.items, 484 worker=self): 485 assert key == self.key 486 return str(msg)
487
488 - def write(self, buf):
489 """ 490 Write to worker. 491 """ 492 self._write(buf)
493
494 - def set_write_eof(self):
495 """ 496 Tell worker to close its writer file descriptor once flushed. Do not 497 perform writes after this call. 498 """ 499 self._set_write_eof()
500