"""
ClusterShell Task module.

Simple example of use:

    from ClusterShell.Task import *

    # get task associated with calling thread
    task = task_self()

    # add a command to execute on distant nodes
    task.shell("/bin/uname -r", nodes="tiger[1-30,35]")

    # run task in calling thread
    task.resume()

    # get results
    for buf, nodelist in task.iter_buffers():
        print NodeSet.fromlist(nodelist), buf

"""

from itertools import imap
from operator import itemgetter
import sys
import threading
import traceback

from ClusterShell.Engine.Engine import EngineAbortException
from ClusterShell.Engine.Engine import EngineTimeoutException
from ClusterShell.Engine.Engine import EngineAlreadyRunningError
from ClusterShell.Engine.Engine import EngineTimer
from ClusterShell.Engine.Factory import PreferredEngine
from ClusterShell.Worker.EngineClient import EnginePort
from ClusterShell.Worker.Ssh import WorkerSsh
from ClusterShell.Worker.Popen import WorkerPopen

from ClusterShell.Event import EventHandler
from ClusterShell.MsgTree import MsgTree
from ClusterShell.NodeSet import NodeSet

class TaskException(Exception):
    """Base task exception."""

class TaskError(TaskException):
    """Base task error exception."""

class TimeoutError(TaskError):
    """Raised when the task timed out."""

class AlreadyRunningError(TaskError):
    """Raised when trying to resume an already running task."""

class TaskMsgTreeError(TaskError):
    """Raised when trying to access disabled MsgTree."""

class _TaskMsgTree(object):
    """
    Task special MsgTree wrapper class, for easy disabling of MsgTree
    buffering. This class checks if task.default(keyword) is set before
    effective MsgTree attribute lookup, according to the following rules:
    if set, allow all MsgTree methods, else:
        - ignore add() calls
        - disallow MsgTree methods except clear()
    """
    def __init__(self, task, keyword):
        self._task = task
        self._keyword = keyword
        self._tree = MsgTree()

    def __getattr__(self, name):
        # is buffering disabled for this keyword?
        if name != 'clear' and not self._task.default(self._keyword):
            # silently ignore add() calls so workers still run fine
            if name == 'add':
                return lambda *args: None
            # any other MsgTree access is an error
            raise TaskMsgTreeError("%s not set" % self._keyword)
        # buffering enabled: delegate to the wrapped MsgTree
        return getattr(self._tree, name)
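
# Usage sketch: message buffering can be disabled per task through the
# "stdout_msgtree"/"stderr_msgtree" default keys wrapped by _TaskMsgTree:
#
#     task = task_self()
#     task.set_default("stdout_msgtree", False)   # stop buffering stdout
#     task.shell("/bin/uname -r", nodes="tiger[1-30]")
#     task.resume()
#     # task.iter_buffers() would now raise TaskMsgTreeError
#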
def _task_print_debug(task, s):
    """
    Default task debug printing function. Cannot provide 'print'
    directly as it is not a function (will be in Py3k!).
    """
    print s


class Task(object):
    """
    Task to execute. May be bound to a specific thread.

    To create a task in a new thread:
        task = Task()

    To create or get the instance of the task associated with the
    thread object thr (threading.Thread):
        task = Task(thread=thr)

    To add a command to execute locally within the task, use:
        task.shell("/bin/hostname")

    To add a command to execute on a distant node within the task, use:
        task.shell("/bin/hostname", nodes="tiger[1-20]")

    Run task in its associated thread (will block only if the calling
    thread is the associated thread):
        task.resume()
    """

    _std_default = { "stderr"         : False,
                     "stdout_msgtree" : True,
                     "stderr_msgtree" : True,
                     "engine"         : 'auto',
                     "port_qlimit"    : 32 }

    _std_info = { "debug"           : False,
                  "print_debug"     : _task_print_debug,
                  "fanout"          : 64,
                  "connect_timeout" : 10,
                  "command_timeout" : 0 }
    _tasks = {}
    _taskid_max = 0
    _task_lock = threading.Lock()

    class _SyncMsgHandler(EventHandler):
        """Special task control port event handler.
        When a message is received on the port, call appropriate
        task method."""
        def ev_msg(self, port, msg):
            """Message received: call appropriate task method."""
            # decode message: (method, args, kwargs)
            func, (args, kwargs) = msg[0], msg[1:]
            # invoke the task method in the task's own thread
            func(port.task, *args, **kwargs)

    class tasksyncmethod(object):
        """Class encapsulating a function that checks if the calling
        task is running or is the current task, and allowing it to be
        used as a decorator making the wrapped task method thread-safe."""

        def __call__(self, f):
            def taskfunc(*args, **kwargs):
                # pull out the task instance
                task, fargs = args[0], args[1:]
                if task._is_task_self():
                    return f(task, *fargs, **kwargs)
                else:
                    # not the task's thread: safely call the task method
                    # by message through the task's dispatch port
                    task._dispatch_port.msg_send((f, fargs, kwargs))

            # copy the decorated function meta-data (eg. for pydoc)
            taskfunc.__name__ = f.__name__
            taskfunc.__doc__ = f.__doc__
            taskfunc.__dict__ = f.__dict__
            taskfunc.__module__ = f.__module__
            return taskfunc

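    # Illustration of what tasksyncmethod provides: calling a decorated
    # method such as set_info() from a thread other than the task's own
    # does not touch the task directly; the call is packed as
    # (function, args, kwargs), sent over the task's dispatch port and
    # replayed by _SyncMsgHandler.ev_msg() in the task's thread.
    #
    #     task = Task()                    # task bound to its own thread
    #     task.set_info("debug", True)     # queued via _dispatch_port
    #     task.resume()                    # applied once the engine runs
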
    class _SuspendCondition(object):
        """Special class to manage task suspend condition."""
        def __init__(self, lock=threading.RLock(), initial=0):
            self._cond = threading.Condition(lock)
            self.suspend_count = initial

        def atomic_inc(self):
            """Increase suspend count."""
            self._cond.acquire()
            self.suspend_count += 1
            self._cond.release()

        def atomic_dec(self):
            """Decrease suspend count."""
            self._cond.acquire()
            self.suspend_count -= 1
            self._cond.release()

        def wait_check(self, release_lock=None):
            """Wait for condition if needed."""
            self._cond.acquire()
            try:
                if self.suspend_count > 0:
                    if release_lock:
                        release_lock.release()
                    self._cond.wait()
            finally:
                self._cond.release()

        def notify_all(self):
            """Signal all threads waiting for condition."""
            self._cond.acquire()
            try:
                self.suspend_count = min(self.suspend_count, 0)
                self._cond.notifyAll()
            finally:
                self._cond.release()


    def __new__(cls, thread=None):
        """
        For a task bound to a specific thread, this class acts like a
        "thread singleton", so new style class is used and new objects
        are only instantiated if needed.
        """
        if thread:
            if thread not in cls._tasks:
                cls._tasks[thread] = object.__new__(cls)
            return cls._tasks[thread]

        return object.__new__(cls)

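    # "Thread singleton" sketch: constructing a Task for a thread that
    # already owns one returns the existing instance.
    #
    #     t1 = task_self()
    #     t2 = Task(thread=threading.currentThread())
    #     assert t1 is t2
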
    def __init__(self, thread=None):
        """
        Initialize a Task, creating a new thread if needed.
        """
        if not getattr(self, "_engine", None):
            # first time called: set up the task
            self._default_lock = threading.Lock()
            self._default = self.__class__._std_default.copy()
            self._info = self.__class__._std_info.copy()

            # use factory class PreferredEngine that gives the proper
            # engine instance
            self._engine = PreferredEngine(self.default("engine"), self._info)
            self.timeout = 0

            # task synchronization objects
            self._run_lock = threading.Lock()
            self._suspend_lock = threading.RLock()
            # both conditions share the same underlying lock
            self._suspend_cond = Task._SuspendCondition(self._suspend_lock, 1)
            self._join_cond = threading.Condition(self._suspend_lock)
            self._suspended = False
            self._quit = False

            # STDOUT tree
            self._msgtree = _TaskMsgTree(self, "stdout_msgtree")

            # STDERR tree
            self._errtree = _TaskMsgTree(self, "stderr_msgtree")

            # dict of sources to return codes
            self._d_source_rc = {}
            # dict of return codes to sources
            self._d_rc_sources = {}
            # keep max rc
            self._max_rc = 0
            # keep timed out sources
            self._timeout_sources = set()

            # create the task control port used by tasksyncmethod
            self._dispatch_port = EnginePort(self,
                                             handler=Task._SyncMsgHandler(),
                                             autoclose=True)
            self._engine.add(self._dispatch_port)

            # set taskid used as Thread name
            Task._task_lock.acquire()
            Task._taskid_max += 1
            self._taskid = Task._taskid_max
            Task._task_lock.release()

            # create new thread if needed
            if thread:
                self.thread = thread
            else:
                self.thread = thread = \
                    threading.Thread(None,
                                     Task._thread_start,
                                     "Task-%d" % self._taskid,
                                     args=(self,))
                Task._tasks[thread] = self
                thread.start()

    def _is_task_self(self):
        """Private method used by the library to check if the task is
        task_self(), but do not create any task_self() instance."""
        return self.thread == threading.currentThread()

    def _handle_exception(self):
        """Print an exception raised in the task's thread and make the
        thread quit (private method)."""
        print >> sys.stderr, 'Exception in thread %s:' % self.thread
        traceback.print_exc(file=sys.stderr)
        self._quit = True

    def _thread_start(self):
        """Task-managed thread entry point"""
        while not self._quit:
            self._suspend_cond.wait_check()
            if self._quit:
                break
            try:
                self._resume()
            except:
                self._handle_exception()

        self._terminate(kill=True)

    def _run(self, timeout):
        """Run task (always called from its self thread)."""
        if self._run_lock.locked():
            raise AlreadyRunningError("task is already running")

        try:
            self._run_lock.acquire()
            self._engine.run(timeout)
        finally:
            self._run_lock.release()

    def default(self, default_key, def_val=None):
        """
        Return per-task value for key from the "default" dictionary.
        """
        self._default_lock.acquire()
        try:
            return self._default.get(default_key, def_val)
        finally:
            self._default_lock.release()

    def set_default(self, default_key, value):
        """
        Set task value for specified key in the dictionary "default".
        Users may store their own task-specific key, value pairs
        using this method and retrieve them with default().

        Threading considerations:
            Unlike set_info(), whether it is called from the task's
            thread or not, set_default() immediately updates the
            underlying dictionary in a thread-safe manner. This method
            doesn't wake up the engine when called.
        """
        self._default_lock.acquire()
        try:
            self._default[default_key] = value
        finally:
            self._default_lock.release()

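    # Sketch of per-task defaults (user-defined keys are allowed too;
    # "my_app_tag" below is purely hypothetical):
    #
    #     task = task_self()
    #     task.set_default("stderr", True)           # capture stderr separately
    #     task.set_default("my_app_tag", "run-42")   # custom user key
    #     print task.default("stderr")               # True
    #     print task.default("missing", "fallback")  # def_val when key is unset
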
    def info(self, info_key, def_val=None):
        """
        Return per-task information.
        """
        return self._info.get(info_key, def_val)

    @tasksyncmethod()
    def set_info(self, info_key, value):
        """
        Set task value for a specific key information. Key, value
        pairs can be passed to the engine and/or workers.
        Users may store their own task-specific info key, value pairs
        using this method and retrieve them with info().

        Threading considerations:
            Unlike set_default(), the underlying info dictionary is only
            modified from the task's thread. So calling set_info() from
            another thread queues the request for late apply (at run
            time) using the task dispatch port. When received, the
            request wakes up the engine if the task is running, and the
            info dictionary is then updated.
        """
        self._info[info_key] = value

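    # Sketch of tuning task/engine information keys (keys taken from
    # _std_info; values are illustrative):
    #
    #     task = task_self()
    #     task.set_info("debug", True)           # verbose library debugging
    #     task.set_info("fanout", 128)           # max simultaneous connections
    #     task.set_info("connect_timeout", 15)   # seconds
    #     print task.info("fanout")              # 128 (same thread: applied directly)
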
    def shell(self, command, **kwargs):
        """
        Schedule a shell command for local or distant execution.

        Local usage:
            task.shell(command [, key=key] [, handler=handler]
                [, timeout=secs] [, autoclose=enable_autoclose]
                [, stderr=enable_stderr])

        Distant usage:
            task.shell(command, nodes=nodeset [, handler=handler]
                [, timeout=secs] [, autoclose=enable_autoclose]
                [, stderr=enable_stderr])
        """
        handler = kwargs.get("handler", None)
        timeo = kwargs.get("timeout", None)
        ac = kwargs.get("autoclose", False)
        stderr = kwargs.get("stderr", self.default("stderr"))

        if kwargs.get("nodes", None):
            assert kwargs.get("key", None) is None, \
                "'key' argument not supported for distant command"

            # create ssh-based worker
            worker = WorkerSsh(NodeSet(kwargs["nodes"]), command=command,
                               handler=handler, stderr=stderr, timeout=timeo,
                               autoclose=ac)
        else:
            # create (local) popen-based worker
            worker = WorkerPopen(command, key=kwargs.get("key", None),
                                 handler=handler, stderr=stderr,
                                 timeout=timeo, autoclose=ac)

        # schedule worker for execution in this task
        self.schedule(worker)

        return worker

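    # Sketch combining the documented shell() keyword arguments; the
    # OutputHandler class and its ev_read() method are assumptions based
    # on the EventHandler interface, not shown in this module:
    #
    #     class OutputHandler(EventHandler):
    #         def ev_read(self, worker):
    #             pass   # called as worker output becomes available (assumed)
    #
    #     task = task_self()
    #     task.shell("/bin/hostname", key="local")       # local, custom key
    #     task.shell("uptime", nodes="tiger[1-20]",
    #                handler=OutputHandler(), timeout=10, stderr=True)
    #     task.resume()
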
    def copy(self, source, dest, nodes, **kwargs):
        """
        Copy local file to distant nodes.
        """
        assert nodes is not None, "local copy not supported"

        handler = kwargs.get("handler", None)
        timeo = kwargs.get("timeout", None)
        preserve = kwargs.get("preserve", None)

        worker = WorkerSsh(nodes, source=source, dest=dest, handler=handler,
                           timeout=timeo, preserve=preserve)

        self.schedule(worker)
        return worker

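    # Sketch of a file copy to a node set (paths and nodes illustrative):
    #
    #     task = task_self()
    #     task.copy("/etc/motd", "/tmp/motd.copy",
    #               nodes=NodeSet("tiger[1-4]"), preserve=True, timeout=30)
    #     task.resume()
    #     for rc, keys in task.iter_retcodes():
    #         print rc, NodeSet.fromlist(keys)
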
    @tasksyncmethod()
    def _add_port(self, port):
        """Add an EnginePort instance to Engine (private method)."""
        self._engine.add(port)

    @tasksyncmethod()
    def _remove_port(self, port):
        """Remove a port from Engine (private method)."""
        self._engine.remove(port)

    def port(self, handler=None, autoclose=False):
        """
        Create a new task port. A task port is an abstraction object to
        deliver messages reliably between tasks.

        Basic rules:
            A task can send messages to another task port (thread safe).
            A task can receive messages from an acquired port either by
            setting up a notification mechanism or using a polling
            mechanism that may block the task waiting for a message
            sent on the port.
            A port can be acquired by one task only.

        If handler is set to a valid EventHandler object, the port is
        a send-once port, ie. a message sent to this port generates an
        ev_msg event notification issued by the port's task. If handler
        is not set, the task can only receive messages on the port by
        calling port.msg_recv().
        """
        port = EnginePort(self, handler, autoclose)
        self._add_port(port)
        return port

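    # Inter-task messaging sketch; MsgHandler is hypothetical, and
    # msg_send() usage mirrors the internal dispatch port above:
    #
    #     class MsgHandler(EventHandler):
    #         def ev_msg(self, port, msg):
    #             print "received:", msg
    #
    #     worker_task = Task()                    # runs in its own thread
    #     port = worker_task.port(handler=MsgHandler(), autoclose=True)
    #     worker_task.resume()
    #     port.msg_send("hello")                  # thread-safe send
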
    @tasksyncmethod()
    def timer(self, fire, handler, interval=-1.0, autoclose=False):
        """
        Create task's timer.
        """
        assert fire >= 0.0, \
            "timer's relative fire time must be a positive floating number"

        timer = EngineTimer(fire, interval, autoclose, handler)
        self._engine.add_timer(timer)
        return timer

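    # Repeating timer sketch; TickHandler and its ev_timer() event method
    # are assumptions based on the EventHandler interface:
    #
    #     class TickHandler(EventHandler):
    #         def ev_timer(self, timer):
    #             print "tick"
    #
    #     task = task_self()
    #     task.timer(1.0, handler=TickHandler(), interval=5.0)
    #     task.resume()
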
    @tasksyncmethod()
    def schedule(self, worker):
        """
        Schedule a worker for execution. Only useful for manually
        instantiated workers.
        """
        assert self in Task._tasks.values(), "deleted task"

        # bind worker to this task
        worker._set_task(self)

        # add worker clients to engine
        for client in worker._engine_clients():
            self._engine.add(client)

    def _resume_thread(self):
        """Resume called from another thread."""
        self._suspend_cond.notify_all()

    def resume(self, timeout=0):
        """
        Resume task. If task is task_self(), workers are executed in
        the calling thread, so this method will block until workers have
        finished. This is always the case for a single-threaded
        application (eg. one which doesn't create other Task() instances
        than task_self()). Otherwise, the current thread doesn't block.
        In that case, you may then want to call task_wait() to wait for
        completion.
        """
        self.timeout = timeout

        self._suspend_cond.atomic_dec()

        if self._is_task_self():
            self._resume()
        else:
            self._resume_thread()

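    # Sketch of a run with an overall timeout, assuming resume() raises
    # TimeoutError ("Raised when the task timed out") on expiration:
    #
    #     task = task_self()
    #     task.shell("sleep 120", nodes="tiger[1-5]")
    #     try:
    #         task.resume(timeout=30)
    #     except TimeoutError:
    #         print "task did not complete within 30 seconds"
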
    @tasksyncmethod()
    def _suspend_wait(self):
        """Suspend request received (synchronized)."""
        assert task_self() == self
        # atomically set suspend state
        self._suspend_lock.acquire()
        self._suspended = True
        self._suspend_lock.release()

        # wait for special suspend condition, releasing the task's run
        # lock so that suspend() callers can detect the stopped state
        self._suspend_cond.wait_check(self._run_lock)

        # waking up: atomically unset suspend state
        self._suspend_lock.acquire()
        self._suspended = False
        self._suspend_lock.release()

    def suspend(self):
        """
        Suspend task execution. This method may be called from another
        task (thread-safe). The function returns False if the task
        cannot be suspended (eg. it's not running), or returns True if
        the task has been successfully suspended.
        To resume a suspended task, use task.resume().
        """
        # first of all, increase suspend count
        self._suspend_cond.atomic_inc()

        # call synchronized suspend method
        self._suspend_wait()

        # wait for the task's run lock, released in _suspend_wait()
        self._run_lock.acquire()

        # get result: are we really suspended or just stopped?
        result = True
        self._suspend_lock.acquire()
        if not self._suspended:
            # not acknowledging suspend state: task is stopped
            result = False
            self._run_lock.release()
        self._suspend_lock.release()
        return result

    @tasksyncmethod()
    def _abort(self, kill=False):
        """Abort request received (synchronized)."""
        # Note: the exact engine call below is an assumption; the body
        # of this method is not part of this excerpt.
        self._quit = True
        self._engine.abort(kill)

    def abort(self, kill=False):
        """
        Abort a task. Aborting a task removes (and stops when needed)
        all workers. If optional parameter kill is True, the task
        object is unbound from the current thread, so calling
        task_self() creates a new Task object.
        """
        if self._run_lock.acquire(0):
            # task is not running
            self._quit = True
            self._run_lock.release()
            if self._is_task_self():
                self._terminate(kill)
            else:
                # abort on a stopped/suspended task
                self.resume()
        else:
            # task is running: use the synchronized abort request
            self._abort(kill)

    def join(self):
        """
        Suspend execution of the calling thread until the target task
        terminates, unless the target task has already terminated.
        """
        self._join_cond.acquire()
        try:
            if self._suspend_cond.suspend_count > 0:
                if not self._suspended:
                    # task is not running: nothing to join
                    return
            self._join_cond.wait()
        finally:
            self._join_cond.release()

    def running(self):
        """
        Return True if the task is running.
        """
        return self._engine.running

    def _reset(self):
        """
        Reset buffers and retcodes management variables.
        """
        self._msgtree.clear()
        self._errtree.clear()
        self._d_source_rc = {}
        self._d_rc_sources = {}
        self._max_rc = 0
        self._timeout_sources.clear()

    def _msg_add(self, source, msg):
        """
        Add a worker message associated with a source.
        """
        self._msgtree.add(source, msg)

    def _errmsg_add(self, source, msg):
        """
        Add a worker error message associated with a source.
        """
        self._errtree.add(source, msg)

    def _rc_set(self, source, rc, override=True):
        """
        Add a worker return code associated with a source.
        """
        if not override and source in self._d_source_rc:
            return

        # store rc by source
        self._d_source_rc[source] = rc

        # store source by rc
        e = self._d_rc_sources.get(rc)
        if e is None:
            self._d_rc_sources[rc] = set([source])
        else:
            self._d_rc_sources[rc].add(source)

        # update max rc
        if rc > self._max_rc:
            self._max_rc = rc

    def _timeout_add(self, source):
        """
        Add a worker timeout associated with a source.
        """
        self._timeout_sources.add(source)

    def _msg_by_source(self, source):
        """
        Get a message by its source (worker, key).
        """
        s = self._msgtree.get(source)
        if s is None:
            return None
        return str(s)

    def _errmsg_by_source(self, source):
        """
        Get an error message by its source (worker, key).
        """
        s = self._errtree.get(source)
        if s is None:
            return None
        return str(s)

    def _call_tree_matcher(self, tree_match_func, match_keys=None,
                           worker=None):
        """Call identified tree matcher (items, walk) method with options."""
        # build a match predicate from worker and/or matching keys
        if worker and not match_keys:
            match = lambda k: k[0] is worker
        elif worker and match_keys:
            match = lambda k: k[0] is worker and k[1] in match_keys
        elif match_keys:
            match = lambda k: k[1] in match_keys
        else:
            match = None
        # call tree matcher function (items or walk)
        return tree_match_func(match, itemgetter(1))

    def _rc_by_source(self, source):
        """
        Get a return code by its source (worker, key).
        """
        return self._d_source_rc.get(source, 0)

    def _rc_iter_by_key(self, key):
        """
        Return an iterator over return codes for the given key.
        """
        for (w, k), rc in self._d_source_rc.iteritems():
            if k == key:
                yield rc

    def _rc_iter_by_worker(self, worker, match_keys=None):
        """
        Return an iterator over return codes and keys list for a
        specific worker and optional matching keys.
        """
        if match_keys:
            for rc, src in self._d_rc_sources.iteritems():
                keys = [t[1] for t in src if t[0] is worker and \
                        t[1] in match_keys]
                if len(keys) > 0:
                    yield rc, keys
        else:
            for rc, src in self._d_rc_sources.iteritems():
                keys = [t[1] for t in src if t[0] is worker]
                if len(keys) > 0:
                    yield rc, keys

    def _krc_iter_by_worker(self, worker):
        """
        Return an iterator over key, rc for a specific worker.
        """
        for rc, src in self._d_rc_sources.iteritems():
            for w, k in src:
                if w is worker:
                    yield k, rc

    def _num_timeout_by_worker(self, worker):
        """
        Return the number of timed out "keys" for a specific worker.
        """
        cnt = 0
        for (w, k) in self._timeout_sources:
            if w is worker:
                cnt += 1
        return cnt

    def _iter_keys_timeout_by_worker(self, worker):
        """
        Iterate over timed out keys (ie. nodes) for a specific worker.
        """
        for (w, k) in self._timeout_sources:
            if w is worker:
                yield k

    def key_buffer(self, key):
        """
        Get buffer for a specific key. When the key is associated
        to multiple workers, the resulting buffer will contain
        all workers content that may overlap.
        """
        select_key = lambda k: k[1] == key
        return "".join(imap(str, self._msgtree.messages(select_key)))

    node_buffer = key_buffer

    def key_error(self, key):
        """
        Get error buffer for a specific key. When the key is associated
        to multiple workers, the resulting buffer will contain all
        workers content that may overlap.
        """
        select_key = lambda k: k[1] == key
        return "".join(imap(str, self._errtree.messages(select_key)))

    node_error = key_error

    def key_retcode(self, key):
        """
        Return the return code for a specific key. When the key is
        associated to multiple workers, return the max return
        code from these workers.
        """
        return max(self._rc_iter_by_key(key))

    node_retcode = key_retcode

    def max_retcode(self):
        """
        Get max return code encountered during last run.

        How retcodes work:
            If the process exits normally, the return code is its exit
            status. If the process is terminated by a signal, the return
            code is 128 + signal number.
        """
        return self._max_rc

    def iter_buffers(self, match_keys=None):
        """
        Iterate over buffers, returning a tuple (buffer, keys). For
        remote workers (Ssh), keys are lists of nodes. In that case, you
        should use NodeSet.fromlist(keys) to get a NodeSet instance
        (which is more convenient and efficient).

        Optional parameter match_keys adds filtering on these keys.

        Usage example:

            for buffer, nodelist in task.iter_buffers():
                print NodeSet.fromlist(nodelist)
                print buffer
        """
        return self._call_tree_matcher(self._msgtree.walk, match_keys)

    def iter_errors(self, match_keys=None):
        """
        Iterate over error buffers, returning a tuple (buffer, keys).

        See iter_buffers().
        """
        return self._call_tree_matcher(self._errtree.walk, match_keys)

    def iter_retcodes(self, match_keys=None):
        """
        Iterate over return codes, returning a tuple (rc, keys).

        Optional parameter match_keys adds filtering on these keys.

        How retcodes work:
            If the process exits normally, the return code is its exit
            status. If the process is terminated by a signal, the return
            code is 128 + signal number.
        """
        if match_keys:
            # filter keys while iterating over the rc -> sources dict
            for rc, src in self._d_rc_sources.iteritems():
                keys = [t[1] for t in src if t[1] in match_keys]
                yield rc, keys
        else:
            for rc, src in self._d_rc_sources.iteritems():
                yield rc, [t[1] for t in src]

    def num_timeout(self):
        """
        Return the number of timed out "keys" (ie. nodes).
        """
        return len(self._timeout_sources)

    def iter_keys_timeout(self):
        """
        Iterate over timed out keys (ie. nodes).
        """
        for (w, k) in self._timeout_sources:
            yield k

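    # Sketch of inspecting per-command timeouts after a run:
    #
    #     task = task_self()
    #     task.shell("sleep 60", nodes="tiger[1-10]", timeout=5)
    #     task.resume()
    #     if task.num_timeout() > 0:
    #         print "timed out on", NodeSet.fromlist(task.iter_keys_timeout())
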
    @classmethod
    def wait(cls, from_thread):
        """
        Class method that blocks the calling thread until all tasks have
        finished (from a ClusterShell point of view, for instance, when
        their task.resume() returns). It doesn't necessarily mean that
        associated threads have finished.
        """
        Task._task_lock.acquire()
        try:
            tasks = Task._tasks.copy()
        finally:
            Task._task_lock.release()
        for thread, task in tasks.iteritems():
            if thread != from_thread:
                task.join()


def task_self():
    """
    Get the Task instance bound to the current thread. This function,
    provided as a convenience, is available in the top-level
    ClusterShell.Task package namespace.
    """
    return Task(thread=threading.currentThread())

def task_wait():
    """
    Suspend execution of the calling thread until all tasks terminate,
    unless all tasks have already terminated. This function is provided
    as a convenience and is available in the top-level
    ClusterShell.Task package namespace.
    """
    Task.wait(threading.currentThread())

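# Sketch of running two thread-bound tasks concurrently and waiting for
# both to finish (node sets are illustrative):
#
#     task1 = Task()
#     task2 = Task()
#     task1.shell("/bin/uname -r", nodes="tiger[1-10]")
#     task2.shell("/bin/uname -r", nodes="lion[1-10]")
#     task1.resume()
#     task2.resume()
#     task_wait()     # blocks until both tasks have finished
#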
def task_terminate():
    """
    Destroy the Task instance bound to the current thread. A next call
    to task_self() will create a new Task object. This function,
    provided as a convenience, is available in the top-level
    ClusterShell.Task package namespace.
    """
    task_self().abort(kill=True)

def task_cleanup():
    """
    Cleanup routine to destroy all created tasks. This function,
    provided as a convenience, is available in the top-level
    ClusterShell.Task package namespace.
    """
    Task._task_lock.acquire()
    try:
        tasks = Task._tasks.copy()
    finally:
        Task._task_lock.release()
    for task in tasks.itervalues():
        task.abort(kill=True)