Package ClusterShell :: Module Task
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Task

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009, 2010) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Task.py 238 2010-02-25 22:30:31Z st-cea $ 
 34   
 35  """ 
 36  ClusterShell Task module. 
 37   
 38  Simple example of use: 
 39   
 40      from ClusterShell.Task import * 
 41   
 42      # get task associated with calling thread 
 43      task = task_self() 
 44   
 45      # add a command to execute on distant nodes 
 46      task.shell("/bin/uname -r", nodes="tiger[1-30,35]") 
 47   
 48      # run task in calling thread 
 49      task.resume() 
 50   
 51      # get results 
 52      for buf, nodelist in task.iter_buffers(): 
 53          print NodeSet.fromlist(nodelist), buf 
 54   
 55  """ 
 56   
 57  from itertools import imap 
 58  from operator import itemgetter 
 59  import sys 
 60  import threading 
 61  import traceback 
 62   
 63  from ClusterShell.Engine.Engine import EngineAbortException 
 64  from ClusterShell.Engine.Engine import EngineTimeoutException 
 65  from ClusterShell.Engine.Engine import EngineAlreadyRunningError 
 66  from ClusterShell.Engine.Engine import EngineTimer 
 67  from ClusterShell.Engine.Factory import PreferredEngine 
 68  from ClusterShell.Worker.EngineClient import EnginePort 
 69  from ClusterShell.Worker.Ssh import WorkerSsh 
 70  from ClusterShell.Worker.Popen import WorkerPopen 
 71   
 72  from ClusterShell.Event import EventHandler 
 73  from ClusterShell.MsgTree import MsgTree 
 74  from ClusterShell.NodeSet import NodeSet 
75 76 77 -class TaskException(Exception):
78 """Base task exception."""
79
80 -class TaskError(TaskException):
81 """Base task error exception."""
82
83 -class TimeoutError(TaskError):
84 """Raised when the task timed out."""
85
86 -class AlreadyRunningError(TaskError):
87 """Raised when trying to resume an already running task."""
88
89 -class TaskMsgTreeError(TaskError):
90 """Raised when trying to access disabled MsgTree."""
91
92 93 -class _TaskMsgTree(object):
94 """ 95 Task special MsgTree wrapper class, for easy disabling of MsgTree 96 buffering. This class checks if task.default(keyword) is set before 97 effective MsgTree attribute lookup, according to following rules: 98 If set, allow all MsgTree methods, else: 99 - ignore add() calls 100 - disallow MsgTree methods except clear() 101 """
102 - def __init__(self, task, keyword):
103 self._task = task 104 self._keyword = keyword 105 self._tree = MsgTree()
106
107 - def __getattr__(self, name):
108 # check if msgtree is enabled, but always allow MsgTree.clear() 109 if name != 'clear' and not self._task.default(self._keyword): 110 # disable MsgTree.add method 111 if name == 'add': 112 return lambda *args: None 113 # all other MsgTree methods are not allowed 114 raise TaskMsgTreeError("%s not set" % self._keyword) 115 # msgtree enabled: lookup tree attribute name 116 return getattr(self._tree, name)
117
118 119 -def _task_print_debug(task, s):
120 """ 121 Default task debug printing function. Cannot provide 'print' 122 directly as it is not a function (will be in Py3k!). 123 """ 124 print s
125
126 127 -class Task(object):
128 """ 129 Task to execute. May be bound to a specific thread. 130 131 To create a task in a new thread: 132 task = Task() 133 134 To create or get the instance of the task associated with the 135 thread object thr (threading.Thread): 136 task = Task(thread=thr) 137 138 Add command to execute locally in task with: 139 task.shell("/bin/hostname") 140 141 Add command to execute in a distant node in task with: 142 task.shell("/bin/hostname", nodes="tiger[1-20]") 143 144 Run task in its associated thread (will block only if the calling 145 thread is the associated thread: 146 task.resume() 147 """ 148 149 _std_default = { "stderr" : False, 150 "stdout_msgtree" : True, 151 "stderr_msgtree" : True, 152 "engine" : 'auto', 153 "port_qlimit" : 32 } 154 155 _std_info = { "debug" : False, 156 "print_debug" : _task_print_debug, 157 "fanout" : 64, 158 "connect_timeout" : 10, 159 "command_timeout" : 0 } 160 _tasks = {} 161 _taskid_max = 0 162 _task_lock = threading.Lock() 163
164 - class _SyncMsgHandler(EventHandler):
165 """Special task control port event handler. 166 When a message is received on the port, call appropriate 167 task method."""
168 - def ev_msg(self, port, msg):
169 """Message received: call appropriate task method.""" 170 # pull out function and its arguments from message 171 func, (args, kwargs) = msg[0], msg[1:] 172 # call task method 173 func(port.task, *args, **kwargs)
174
175 - class tasksyncmethod(object):
176 """Class encapsulating a function that checks if the calling 177 task is running or is the current task, and allowing it to be 178 used as a decorator making the wrapped task method thread-safe.""" 179
180 - def __call__(self, f):
181 def taskfunc(*args, **kwargs): 182 # pull out the class instance 183 task, fargs = args[0], args[1:] 184 # check if the calling task is the current thread task 185 if task._is_task_self(): 186 return f(task, *fargs, **kwargs) 187 else: 188 # no, safely call the task method by message 189 # through the task special dispatch port 190 task._dispatch_port.msg_send((f, fargs, kwargs))
191 192 # modify the decorator meta-data for pydoc 193 # Note: should be later replaced by @wraps (functools) 194 # as of Python 2.5 195 taskfunc.__name__ = f.__name__ 196 taskfunc.__doc__ = f.__doc__ 197 taskfunc.__dict__ = f.__dict__ 198 taskfunc.__module__ = f.__module__ 199 return taskfunc
200
201 - class _SuspendCondition(object):
202 """Special class to manage task suspend condition."""
203 - def __init__(self, lock=threading.RLock(), initial=0):
204 self._cond = threading.Condition(lock) 205 self.suspend_count = initial
206
207 - def atomic_inc(self):
208 """Increase suspend count.""" 209 self._cond.acquire() 210 self.suspend_count += 1 211 self._cond.release()
212
213 - def atomic_dec(self):
214 """Decrease suspend count.""" 215 self._cond.acquire() 216 self.suspend_count -= 1 217 self._cond.release()
218
219 - def wait_check(self, release_lock=None):
220 """Wait for condition if needed.""" 221 self._cond.acquire() 222 try: 223 if self.suspend_count > 0: 224 if release_lock: 225 release_lock.release() 226 self._cond.wait() 227 finally: 228 self._cond.release()
229
230 - def notify_all(self):
231 """Signal all threads waiting for condition.""" 232 self._cond.acquire() 233 try: 234 self.suspend_count = min(self.suspend_count, 0) 235 self._cond.notifyAll() 236 finally: 237 self._cond.release()
238 239
240 - def __new__(cls, thread=None):
241 """ 242 For task bound to a specific thread, this class acts like a 243 "thread singleton", so new style class is used and new object 244 are only instantiated if needed. 245 """ 246 if thread: 247 if thread not in cls._tasks: 248 cls._tasks[thread] = object.__new__(cls) 249 return cls._tasks[thread] 250 251 return object.__new__(cls)
252
253 - def __init__(self, thread=None):
254 """ 255 Initialize a Task, creating a new thread if needed. 256 """ 257 if not getattr(self, "_engine", None): 258 # first time called 259 self._default_lock = threading.Lock() 260 self._default = self.__class__._std_default.copy() 261 self._info = self.__class__._std_info.copy() 262 263 # use factory class PreferredEngine that gives the proper 264 # engine instance 265 self._engine = PreferredEngine(self.default("engine"), self._info) 266 self.timeout = 0 267 268 # task synchronization objects 269 self._run_lock = threading.Lock() # primitive lock 270 self._suspend_lock = threading.RLock() # reentrant lock 271 # both join and suspend conditions share the same underlying lock 272 self._suspend_cond = Task._SuspendCondition(self._suspend_lock, 1) 273 self._join_cond = threading.Condition(self._suspend_lock) 274 self._suspended = False 275 self._quit = False 276 277 # STDIN tree 278 self._msgtree = _TaskMsgTree(self, "stdout_msgtree") 279 280 # STDERR tree 281 self._errtree = _TaskMsgTree(self, "stderr_msgtree") 282 283 # dict of sources to return codes 284 self._d_source_rc = {} 285 # dict of return codes to sources 286 self._d_rc_sources = {} 287 # keep max rc 288 self._max_rc = 0 289 # keep timeout'd sources 290 self._timeout_sources = set() 291 292 # special engine port for task method dispatching 293 self._dispatch_port = EnginePort(self, 294 handler=Task._SyncMsgHandler(), 295 autoclose=True) 296 self._engine.add(self._dispatch_port) 297 298 # set taskid used as Thread name 299 Task._task_lock.acquire() 300 Task._taskid_max += 1 301 self._taskid = Task._taskid_max 302 Task._task_lock.release() 303 304 # create new thread if needed 305 if thread: 306 self.thread = thread 307 else: 308 self.thread = thread = \ 309 threading.Thread(None, 310 Task._thread_start, 311 "Task-%d" % self._taskid, 312 args=(self,)) 313 Task._tasks[thread] = self 314 thread.start()
315
316 - def _is_task_self(self):
317 """Private method used by the library to check if the task is 318 task_self(), but do not create any task_self() instance.""" 319 return self.thread == threading.currentThread()
320
321 - def _handle_exception(self):
322 print >> sys.stderr, 'Exception in thread %s:' % self.thread 323 traceback.print_exc(file=sys.stderr) 324 self._quit = True
325
326 - def _thread_start(self):
327 """Task-managed thread entry point""" 328 while not self._quit: 329 self._suspend_cond.wait_check() 330 if self._quit: 331 break 332 try: 333 self._resume() 334 except: 335 self._handle_exception() 336 337 self._terminate(kill=True)
338
339 - def _run(self, timeout):
340 """Run task (always called from its self thread).""" 341 # check if task is already running 342 if self._run_lock.locked(): 343 raise AlreadyRunningError("task is already running") 344 # use with statement later 345 try: 346 self._run_lock.acquire() 347 self._engine.run(timeout) 348 finally: 349 self._run_lock.release()
350
351 - def default(self, default_key, def_val=None):
352 """ 353 Return per-task value for key from the "default" dictionary. 354 """ 355 self._default_lock.acquire() 356 try: 357 return self._default.get(default_key, def_val) 358 finally: 359 self._default_lock.release()
360
361 - def set_default(self, default_key, value):
362 """ 363 Set task value for specified key in the dictionary "default". 364 Users may store their own task-specific key, value pairs 365 using this method and retrieve them with default(). 366 367 Threading considerations: 368 Unlike set_info(), when called from the task's thread or 369 not, set_default() immediately updates the underlying 370 dictionary in a thread-safe manner. This method doesn't 371 wake up the engine when called. 372 """ 373 self._default_lock.acquire() 374 try: 375 self._default[default_key] = value 376 finally: 377 self._default_lock.release()
378
379 - def info(self, info_key, def_val=None):
380 """ 381 Return per-task information. 382 """ 383 return self._info.get(info_key, def_val)
384 385 @tasksyncmethod()
386 - def set_info(self, info_key, value):
387 """ 388 Set task value for a specific key information. Key, value 389 pairs can be passed to the engine and/or workers. 390 Users may store their own task-specific info key, value pairs 391 using this method and retrieve them with info(). 392 393 Threading considerations: 394 Unlike set_default(), the underlying info dictionary is only 395 modified from the task's thread. So calling set_info() from 396 another thread leads to queueing the request for late apply 397 (at run time) using the task dispatch port. When received, 398 the request wakes up the engine when the task is running and 399 the info dictionary is then updated. 400 """ 401 self._info[info_key] = value
402
403 - def shell(self, command, **kwargs):
404 """ 405 Schedule a shell command for local or distant execution. 406 407 Local usage: 408 task.shell(command [, key=key] [, handler=handler] 409 [, timeout=secs] [, autoclose=enable_autoclose] 410 [, stderr=enable_stderr]) 411 412 Distant usage: 413 task.shell(command, nodes=nodeset [, handler=handler] 414 [, timeout=secs], [, autoclose=enable_autoclose] 415 [, strderr=enable_stderr]) 416 """ 417 418 handler = kwargs.get("handler", None) 419 timeo = kwargs.get("timeout", None) 420 ac = kwargs.get("autoclose", False) 421 stderr = kwargs.get("stderr", self.default("stderr")) 422 423 if kwargs.get("nodes", None): 424 assert kwargs.get("key", None) is None, \ 425 "'key' argument not supported for distant command" 426 427 # create ssh-based worker 428 worker = WorkerSsh(NodeSet(kwargs["nodes"]), command=command, 429 handler=handler, stderr=stderr, timeout=timeo, 430 autoclose=ac) 431 else: 432 # create (local) worker 433 worker = WorkerPopen(command, key=kwargs.get("key", None), 434 handler=handler, stderr=stderr, 435 timeout=timeo, autoclose=ac) 436 437 # schedule worker for execution in this task 438 self.schedule(worker) 439 440 return worker
441
442 - def copy(self, source, dest, nodes, **kwargs):
443 """ 444 Copy local file to distant nodes. 445 """ 446 assert nodes != None, "local copy not supported" 447 448 handler = kwargs.get("handler", None) 449 timeo = kwargs.get("timeout", None) 450 preserve = kwargs.get("preserve", None) 451 452 # create a new copy worker 453 worker = WorkerSsh(nodes, source=source, dest=dest, handler=handler, 454 timeout=timeo, preserve=preserve) 455 456 self.schedule(worker) 457 return worker
458 459 @tasksyncmethod()
460 - def _add_port(self, port):
461 """Add an EnginePort instance to Engine (private method).""" 462 self._engine.add(port)
463 464 @tasksyncmethod()
465 - def _remove_port(self, port):
466 """Remove a port from Engine (private method).""" 467 self._engine.remove(port)
468
469 - def port(self, handler=None, autoclose=False):
470 """ 471 Create a new task port. A task port is an abstraction object to 472 deliver messages reliably between tasks. 473 474 Basic rules: 475 A task can send messages to another task port (thread safe). 476 A task can receive messages from an acquired port either by 477 setting up a notification mechanism or using a polling 478 mechanism that may block the task waiting for a message 479 sent on the port. 480 A port can be acquired by one task only. 481 482 If handler is set to a valid EventHandler object, the port is 483 a send-once port, ie. a message sent to this port generates an 484 ev_msg event notification issued the port's task. If handler 485 is not set, the task can only receive messages on the port by 486 calling port.msg_recv(). 487 """ 488 port = EnginePort(self, handler, autoclose) 489 self._add_port(port) 490 return port
491 492 @tasksyncmethod()
493 - def timer(self, fire, handler, interval=-1.0, autoclose=False):
494 """ 495 Create task's timer. 496 """ 497 assert fire >= 0.0, \ 498 "timer's relative fire time must be a positive floating number" 499 500 timer = EngineTimer(fire, interval, autoclose, handler) 501 self._engine.add_timer(timer) 502 return timer
503 504 @tasksyncmethod()
505 - def schedule(self, worker):
506 """ 507 Schedule a worker for execution. Only useful for manually 508 instantiated workers. 509 """ 510 assert self in Task._tasks.values(), "deleted task" 511 512 # bind worker to task self 513 worker._set_task(self) 514 515 # add worker clients to engine 516 for client in worker._engine_clients(): 517 self._engine.add(client)
518
519 - def _resume_thread(self):
520 """Resume called from another thread.""" 521 self._suspend_cond.notify_all()
522
523 - def _resume(self):
524 assert self.thread == threading.currentThread() 525 try: 526 try: 527 self._reset() 528 self._run(self.timeout) 529 except EngineTimeoutException: 530 raise TimeoutError() 531 except EngineAbortException, e: 532 self._terminate(e.kill) 533 except EngineAlreadyRunningError: 534 raise AlreadyRunningError("task engine is already running") 535 finally: 536 # task becomes joinable 537 self._join_cond.acquire() 538 self._suspend_cond.suspend_count += 1 539 self._join_cond.notifyAll() 540 self._join_cond.release()
541
542 - def resume(self, timeout=0):
543 """ 544 Resume task. If task is task_self(), workers are executed in 545 the calling thread so this method will block until workers have 546 finished. This is always the case for a single-threaded 547 application (eg. which doesn't create other Task() instance 548 than task_self()). Otherwise, the current thread doesn't block. 549 In that case, you may then want to call task_wait() to wait for 550 completion. 551 """ 552 self.timeout = timeout 553 554 self._suspend_cond.atomic_dec() 555 556 if self._is_task_self(): 557 self._resume() 558 else: 559 self._resume_thread()
560 561 @tasksyncmethod()
562 - def _suspend_wait(self):
563 assert task_self() == self 564 # atomically set suspend state 565 self._suspend_lock.acquire() 566 self._suspended = True 567 self._suspend_lock.release() 568 569 # wait for special suspend condition, while releasing l_run 570 self._suspend_cond.wait_check(self._run_lock) 571 572 # waking up, atomically unset suspend state 573 self._suspend_lock.acquire() 574 self._suspended = False 575 self._suspend_lock.release()
576
577 - def suspend(self):
578 """ 579 Suspend task execution. This method may be called from another 580 task (thread-safe). The function returns False if the task 581 cannot be suspended (eg. it's not running), or returns True if 582 the task has been successfully suspended. 583 To resume a suspended task, use task.resume(). 584 """ 585 # first of all, increase suspend count 586 self._suspend_cond.atomic_inc() 587 588 # call synchronized suspend method 589 self._suspend_wait() 590 591 # wait for stopped task 592 self._run_lock.acquire() # run_lock ownership transfer 593 594 # get result: are we really suspended or just stopped? 595 result = True 596 self._suspend_lock.acquire() 597 if not self._suspended: 598 # not acknowledging suspend state, task is stopped 599 result = False 600 self._run_lock.release() 601 self._suspend_lock.release() 602 return result
603 604 @tasksyncmethod()
605 - def _abort(self, kill=False):
606 assert task_self() == self 607 # raise an EngineAbortException when task is running 608 self._engine.abort(kill)
609
610 - def abort(self, kill=False):
611 """ 612 Abort a task. Aborting a task removes (and stops when needed) 613 all workers. If optional parameter kill is True, the task 614 object is unbound from the current thread, so calling 615 task_self() creates a new Task object. 616 """ 617 if self._run_lock.acquire(0): 618 self._quit = True 619 self._run_lock.release() 620 if self._is_task_self(): 621 self._terminate(kill) 622 else: 623 # abort on stopped/suspended task 624 self.resume() 625 else: 626 # self._run_lock is locked, call synchronized method 627 self._abort(kill)
628
629 - def _terminate(self, kill):
630 """ 631 Abort completion subroutine. 632 """ 633 self._engine.clear() 634 self._reset() 635 636 if kill: 637 Task._task_lock.acquire() 638 try: 639 del Task._tasks[threading.currentThread()] 640 finally: 641 Task._task_lock.release()
642
643 - def join(self):
644 """ 645 Suspend execution of the calling thread until the target task 646 terminates, unless the target task has already terminated. 647 """ 648 self._join_cond.acquire() 649 try: 650 if self._suspend_cond.suspend_count > 0: 651 if not self._suspended: 652 # ignore stopped task 653 return 654 self._join_cond.wait() 655 finally: 656 self._join_cond.release()
657
658 - def running(self):
659 """ 660 Return True if the task is running. 661 """ 662 return self._engine.running
663
664 - def _reset(self):
665 """ 666 Reset buffers and retcodes management variables. 667 """ 668 self._msgtree.clear() 669 self._errtree.clear() 670 self._d_source_rc = {} 671 self._d_rc_sources = {} 672 self._max_rc = 0 673 self._timeout_sources.clear()
674
675 - def _msg_add(self, source, msg):
676 """ 677 Add a worker message associated with a source. 678 """ 679 self._msgtree.add(source, msg)
680
681 - def _errmsg_add(self, source, msg):
682 """ 683 Add a worker error message associated with a source. 684 """ 685 self._errtree.add(source, msg)
686
687 - def _rc_set(self, source, rc, override=True):
688 """ 689 Add a worker return code associated with a source. 690 """ 691 if not override and self._d_source_rc.has_key(source): 692 return 693 694 # store rc by source 695 self._d_source_rc[source] = rc 696 697 # store source by rc 698 e = self._d_rc_sources.get(rc) 699 if e is None: 700 self._d_rc_sources[rc] = set([source]) 701 else: 702 self._d_rc_sources[rc].add(source) 703 704 # update max rc 705 if rc > self._max_rc: 706 self._max_rc = rc
707
708 - def _timeout_add(self, source):
709 """ 710 Add a worker timeout associated with a source. 711 """ 712 # store source in timeout set 713 self._timeout_sources.add(source)
714
715 - def _msg_by_source(self, source):
716 """ 717 Get a message by its source (worker, key). 718 """ 719 s = self._msgtree.get(source) 720 if s is None: 721 return None 722 return str(s)
723
724 - def _errmsg_by_source(self, source):
725 """ 726 Get an error message by its source (worker, key). 727 """ 728 s = self._errtree.get(source) 729 if s is None: 730 return None 731 return str(s)
732
733 - def _call_tree_matcher(self, tree_match_func, match_keys=None, worker=None):
734 """Call identified tree matcher (items, walk) method with options.""" 735 # filter by worker and optionally by matching keys 736 if worker and not match_keys: 737 match = lambda k: k[0] is worker 738 elif worker and match_keys: 739 match = lambda k: k[0] is worker and k[1] in match_keys 740 elif match_keys: 741 match = lambda k: k[1] in match_keys 742 else: 743 match = None 744 # Call tree matcher function (items or walk) 745 return tree_match_func(match, itemgetter(1))
746
747 - def _rc_by_source(self, source):
748 """ 749 Get a return code by its source (worker, key). 750 """ 751 return self._d_source_rc.get(source, 0)
752
753 - def _rc_iter_by_key(self, key):
754 """ 755 Return an iterator over return codes for the given key. 756 """ 757 for (w, k), rc in self._d_source_rc.iteritems(): 758 if k == key: 759 yield rc
760
761 - def _rc_iter_by_worker(self, worker, match_keys=None):
762 """ 763 Return an iterator over return codes and keys list for a 764 specific worker and optional matching keys. 765 """ 766 if match_keys: 767 # Use the items iterator for the underlying dict. 768 for rc, src in self._d_rc_sources.iteritems(): 769 keys = [t[1] for t in src if t[0] is worker and \ 770 t[1] in match_keys] 771 if len(keys) > 0: 772 yield rc, keys 773 else: 774 for rc, src in self._d_rc_sources.iteritems(): 775 keys = [t[1] for t in src if t[0] is worker] 776 if len(keys) > 0: 777 yield rc, keys
778
779 - def _krc_iter_by_worker(self, worker):
780 """ 781 Return an iterator over key, rc for a specific worker. 782 """ 783 for rc, src in self._d_rc_sources.iteritems(): 784 for w, k in src: 785 if w is worker: 786 yield k, rc
787
788 - def _num_timeout_by_worker(self, worker):
789 """ 790 Return the number of timed out "keys" for a specific worker. 791 """ 792 cnt = 0 793 for (w, k) in self._timeout_sources: 794 if w is worker: 795 cnt += 1 796 return cnt
797
798 - def _iter_keys_timeout_by_worker(self, worker):
799 """ 800 Iterate over timed out keys (ie. nodes) for a specific worker. 801 """ 802 for (w, k) in self._timeout_sources: 803 if w is worker: 804 yield k
805
806 - def key_buffer(self, key):
807 """ 808 Get buffer for a specific key. When the key is associated 809 to multiple workers, the resulting buffer will contain 810 all workers content that may overlap. 811 """ 812 select_key = lambda k: k[1] == key 813 return "".join(imap(str, self._msgtree.messages(select_key)))
814 815 node_buffer = key_buffer 816
817 - def key_error(self, key):
818 """ 819 Get error buffer for a specific key. When the key is associated 820 to multiple workers, the resulting buffer will contain all 821 workers content that may overlap. 822 """ 823 select_key = lambda k: k[1] == key 824 return "".join(imap(str, self._errtree.messages(select_key)))
825 826 node_error = key_error 827
828 - def key_retcode(self, key):
829 """ 830 Return return code for a specific key. When the key is 831 associated to multiple workers, return the max return 832 code from these workers. 833 """ 834 return max(self._rc_iter_by_key(key))
835 836 node_retcode = key_retcode 837
838 - def max_retcode(self):
839 """ 840 Get max return code encountered during last run. 841 842 How retcodes work: 843 If the process exits normally, the return code is its exit 844 status. If the process is terminated by a signal, the return 845 code is 128 + signal number. 846 """ 847 return self._max_rc
848
849 - def iter_buffers(self, match_keys=None):
850 """ 851 Iterate over buffers, returns a tuple (buffer, keys). For remote 852 workers (Ssh), keys are list of nodes. In that case, you should use 853 NodeSet.fromlist(keys) to get a NodeSet instance (which is more 854 convenient and efficient): 855 856 Optional parameter match_keys add filtering on these keys. 857 858 Usage example: 859 860 for buffer, nodelist in task.iter_buffers(): 861 print NodeSet.fromlist(nodelist) 862 print buffer 863 """ 864 return self._call_tree_matcher(self._msgtree.walk, match_keys)
865
866 - def iter_errors(self, match_keys=None):
867 """ 868 Iterate over error buffers, returns a tuple (buffer, keys). 869 870 See iter_buffers(). 871 """ 872 return self._call_tree_matcher(self._errtree.walk, match_keys)
873
874 - def iter_retcodes(self, match_keys=None):
875 """ 876 Iterate over return codes, returns a tuple (rc, keys). 877 878 Optional parameter match_keys add filtering on these keys. 879 880 How retcodes work: 881 If the process exits normally, the return code is its exit 882 status. If the process is terminated by a signal, the return 883 code is 128 + signal number. 884 """ 885 if match_keys: 886 # Use the items iterator for the underlying dict. 887 for rc, src in self._d_rc_sources.iteritems(): 888 keys = [t[1] for t in src if t[1] in match_keys] 889 yield rc, keys 890 else: 891 for rc, src in self._d_rc_sources.iteritems(): 892 yield rc, [t[1] for t in src]
893
894 - def num_timeout(self):
895 """ 896 Return the number of timed out "keys" (ie. nodes). 897 """ 898 return len(self._timeout_sources)
899
900 - def iter_keys_timeout(self):
901 """ 902 Iterate over timed out keys (ie. nodes). 903 """ 904 for (w, k) in self._timeout_sources: 905 yield k
906 907 @classmethod
908 - def wait(cls, from_thread):
909 """ 910 Class method that blocks calling thread until all tasks have 911 finished (from a ClusterShell point of view, for instance, 912 their task.resume() return). It doesn't necessarly mean that 913 associated threads have finished. 914 """ 915 Task._task_lock.acquire() 916 try: 917 tasks = Task._tasks.copy() 918 finally: 919 Task._task_lock.release() 920 for thread, task in tasks.iteritems(): 921 if thread != from_thread: 922 task.join()
923
924 925 -def task_self():
926 """ 927 Get the Task instance bound to the current thread. This function 928 provided as a convenience is available in the top-level 929 ClusterShell.Task package namespace. 930 """ 931 return Task(thread=threading.currentThread())
932
933 -def task_wait():
934 """ 935 Suspend execution of the calling thread until all tasks terminate, 936 unless all tasks have already terminated. This function is provided 937 as a convenience and is available in the top-level 938 ClusterShell.Task package namespace. 939 """ 940 Task.wait(threading.currentThread())
941
942 -def task_terminate():
943 """ 944 Destroy the Task instance bound to the current thread. A next call 945 to task_self() will create a new Task object. This function provided 946 as a convenience is available in the top-level ClusterShell.Task 947 package namespace. 948 """ 949 task_self().abort(kill=True)
950
951 -def task_cleanup():
952 """ 953 Cleanup routine to destroy all created tasks. This function 954 provided as a convenience is available in the top-level 955 ClusterShell.Task package namespace. 956 """ 957 Task._task_lock.acquire() 958 try: 959 tasks = Task._tasks.copy() 960 finally: 961 Task._task_lock.release() 962 for task in tasks.itervalues(): 963 task.abort(kill=True)
964