1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 """
37 Utility program to run commands on a cluster using the ClusterShell library.
38
39 clush is a pdsh-like command which benefits from the ClusterShell library
40 and its Ssh worker. It features an integrated output results gathering
41 system (dshbak-like), can get node groups by running predefined external
42 commands and can redirect lines read on its standard input to the remote
43 commands.
44
45 When no command is specified, clush runs interactively.
46
47 """
48
49 import fcntl
50 import optparse
51 import os
52 import sys
53 import signal
54 import ConfigParser
55
56 from ClusterShell.Event import EventHandler
57 from ClusterShell.NodeSet import NodeSet, NodeSetParseError
58 from ClusterShell.Task import Task, task_self
59 from ClusterShell.Worker.Worker import WorkerSimple
60 from ClusterShell import __version__
61
62 VERB_QUIET = 0
63 VERB_STD = 1
64 VERB_VERB = 2
65 VERB_DEBUG = 3
66
68 """Exception used by the signal handler"""
69
81
83 """Direct output event handler class."""
84
87
89 result = worker.last_read()
90 if type(result) is tuple:
91 (ns, buf) = result
92 else:
93 buf = result
94 if self._label:
95 print "%s: %s" % (ns, buf)
96 else:
97 print "%s" % buf
98
100 t = worker.last_error()
101 if type(t) is tuple:
102 (ns, buf) = t
103 else:
104 buf = t
105 if self._label:
106 print >> sys.stderr, "%s: %s" % (ns, buf)
107 else:
108 print >> sys.stderr,"%s" % buf
109
111 if hasattr(worker, "last_retcode"):
112 ns, rc = worker.last_retcode()
113 else:
114 ns = "local"
115 rc = worker.retcode()
116 if rc > 0:
117 print >> sys.stderr, "clush: %s: exited with exit code %d" % (ns, rc)
118
122
124
125
126
127 worker.task.set_default("USER_running", False)
128 if worker.task.default("USER_handle_SIGUSR1"):
129 os.kill(os.getpid(), signal.SIGUSR1)
130
132 """Gathered output event handler class."""
133
134 - def __init__(self, label, gather_print, runtimer):
135 self._label = label
136 self._gather_print = gather_print
137 self._runtimer = runtimer
138
140 t = worker.last_error()
141 if type(t) is tuple:
142 (ns, buf) = t
143 else:
144 buf = t
145 if self._runtimer:
146 self._runtimer.eh.erase_line()
147 if self._label:
148 print >> sys.stderr, "%s: %s" % (ns, buf)
149 else:
150 print >> sys.stderr,"%s" % buf
151 if self._runtimer:
152
153 self._runtimer.eh.set_dirty()
154
156
157 if self._runtimer:
158 self._runtimer.eh.finalize(worker.task.default("USER_interactive"))
159
160
161 nodesetify = lambda v: (v[0], NodeSet.fromlist(v[1]))
162 for rc, nodelist in worker.iter_retcodes():
163
164 for buf, nodeset in sorted(map(nodesetify,
165 worker.iter_buffers(nodelist)),
166 cmp=bufnodeset_cmp):
167 self._gather_print(nodeset, buf)
168
169
170 for rc, nodelist in worker.iter_retcodes():
171 if rc != 0:
172 ns = NodeSet.fromlist(nodelist)
173 print >> sys.stderr, "clush: %s: exited with exit code %s" % (ns, rc)
174
175
176 if worker.num_timeout() > 0:
177 print >> sys.stderr, "clush: %s: command timeout" % \
178 NodeSet.fromlist(worker.iter_keys_timeout())
179
180
181 worker.task.set_default("USER_running", False)
182 if worker.task.default("USER_handle_SIGUSR1"):
183 os.kill(os.getpid(), signal.SIGUSR1)
184
187 self.task = task
188 self.total = total
189 self.cnt_last = -1
190 self.tslen = len(str(self.total))
191 self.wholelen = 0
192
195
198
200 if self.wholelen:
201 sys.stderr.write(' ' * self.wholelen + '\r')
202
204 cnt = len(self.task._engine.clients())
205 if cnt != self.cnt_last:
206 self.cnt_last = cnt
207
208 towrite = 'clush: %*d/%*d\r' % (self.tslen, self.total - cnt,
209 self.tslen, self.total)
210 self.wholelen = len(towrite)
211 sys.stderr.write(towrite)
212
214
215 fmt = 'clush: %*d/%*d'
216 if cr:
217 fmt += '\n'
218 else:
219 fmt += '\r'
220 sys.stderr.write(fmt % (self.tslen, self.total, self.tslen, self.total))
221
223 """Exception used by the signal handler"""
224
225 - def __init__(self, section, option, msg):
226 Exception.__init__(self)
227 self.section = section
228 self.option = option
229 self.msg = msg
230
232 return "(Config %s.%s): %s" % (self.section, self.option, self.msg)
233
235 """Config class for clush (specialized ConfigParser)"""
236
237 main_defaults = { "fanout" : "64",
238 "connect_timeout" : "30",
239 "command_timeout" : "0",
240 "history_size" : "100",
241 "verbosity" : "%d" % VERB_STD }
242
244 ConfigParser.ConfigParser.__init__(self)
245
246 self.add_section("Main")
247 for key, value in ClushConfig.main_defaults.iteritems():
248 self.set("Main", key, value)
249
250 self.read(['/etc/clustershell/clush.conf', os.path.expanduser('~/.clush.conf')])
251
255
256 - def set_main(self, option, value):
257 self.set("Main", option, str(value))
258
259 - def getint(self, section, option):
260 try:
261 return ConfigParser.ConfigParser.getint(self, section, option)
262 except (ConfigParser.Error, TypeError, ValueError), e:
263 raise ClushConfigError(section, option, e)
264
266 try:
267 return ConfigParser.ConfigParser.getfloat(self, section, option)
268 except (ConfigParser.Error, TypeError, ValueError), e:
269 raise ClushConfigError(section, option, e)
270
272 try:
273 return self.get(section, option)
274 except ConfigParser.Error, e:
275 pass
276
282
284 return self.getint("Main", "fanout")
285
287 return self.getfloat("Main", "connect_timeout")
288
290 return self.getfloat("Main", "command_timeout")
291
294
297
300
302 section = "External"
303 option = "nodes_all"
304 try:
305 return self.get(section, option)
306 except ConfigParser.Error, e:
307 raise ClushConfigError(section, option, e)
308
310 section = "External"
311 option = "nodes_group"
312 try:
313 return self.get(section, option, 0, { "group" : group })
314 except ConfigParser.Error, e:
315 raise ClushConfigError(section, option, e)
316
317
319 """Signal handler used for main thread notification"""
320 if signum == signal.SIGUSR1:
321 raise UpdatePromptException()
322
324 """Turn the history file path"""
325 return os.path.join(os.environ["HOME"], ".clush_history")
326
328 """
329 Configure readline to automatically load and save a history file
330 named .clush_history
331 """
332 import readline
333 readline.parse_and_bind("tab: complete")
334 readline.set_completer_delims("")
335 try:
336 readline.read_history_file(get_history_file())
337 except IOError:
338 pass
339
340
341
343 """Display a dshbak-like header block and content."""
344 sep = "-" * 15
345 sys.stdout.write("%s\n%s\n%s\n%s\n" % (sep, header, sep, content))
346
348 """Display a MsgTree buffer by line with prefixed header."""
349 for line in msg:
350 sys.stdout.write("%s: %s\n" % (header, line))
351
353 """Compare 2 nodesets by their length (we want larger nodeset
354 first) and then by first node."""
355 len_cmp = cmp(len(ns2), len(ns1))
356 if not len_cmp:
357 smaller = NodeSet.fromlist([ns1[0], ns2[0]])[0]
358 if smaller == ns1[0]:
359 return -1
360 else:
361 return 1
362 return len_cmp
363
364
365
367 """Convenience function to compare 2 (buf, nodeset) tuples by their
368 nodeset length (we want larger nodeset first) and then by first
369 node."""
370
371 return nodeset_cmp(bn1[1], bn2[1])
372
373 -def ttyloop(task, nodeset, gather, timeout, label, verbosity, gather_print):
374 """Manage the interactive prompt to run command"""
375 readline_avail = False
376 if task.default("USER_interactive"):
377 assert sys.stdin.isatty()
378 try:
379 import readline
380 readline_setup()
381 readline_avail = True
382 except ImportError:
383 pass
384 if verbosity >= VERB_STD:
385 print "Enter 'quit' to leave this interactive mode"
386
387 rc = 0
388 ns = NodeSet(nodeset)
389 ns_info = True
390 cmd = ""
391 while task.default("USER_running") or cmd.lower() != 'quit':
392 try:
393 if task.default("USER_interactive") and not task.default("USER_running"):
394 if ns_info:
395 print "Working with nodes: %s" % ns
396 ns_info = False
397 prompt = "clush> "
398 else:
399 prompt = ""
400 cmd = raw_input(prompt)
401 except EOFError:
402 print
403 return
404 except UpdatePromptException:
405 if task.default("USER_interactive"):
406 continue
407 return
408 except KeyboardInterrupt, e:
409 signal.signal(signal.SIGUSR1, signal.SIG_IGN)
410 if gather:
411
412
413 task.suspend()
414
415 print_warn = False
416
417
418 nodesetify = lambda v: (v[0], NodeSet.fromlist(v[1]))
419 for buf, nodeset in sorted(map(nodesetify, task.iter_buffers()),
420 cmp=bufnodeset_cmp):
421 if not print_warn:
422 print_warn = True
423 print >> sys.stderr, "Warning: Caught keyboard interrupt!"
424 gather_print(nodeset, buf)
425
426
427 ns_ok = NodeSet()
428 for rc, nodelist in task.iter_retcodes():
429 ns_ok.add(NodeSet.fromlist(nodelist))
430 if rc != 0:
431
432 ns = NodeSet.fromlist(nodelist)
433 print >> sys.stderr, \
434 "clush: %s: exited with exit code %s" % (ns, rc)
435
436 e.uncompleted_nodes = ns - ns_ok
437
438
439 if task.num_timeout() > 0:
440 print >> sys.stderr, "clush: %s: command timeout" % \
441 NodeSet.fromlist(task.iter_keys_timeout())
442 raise e
443
444 if task.default("USER_running"):
445 ns_reg, ns_unreg = NodeSet(), NodeSet()
446 for c in task._engine.clients():
447 if c.registered:
448 ns_reg.add(c.key)
449 else:
450 ns_unreg.add(c.key)
451 if ns_unreg:
452 pending = "\nclush: pending(%d): %s" % (len(ns_unreg), ns_unreg)
453 else:
454 pending = ""
455 print >> sys.stderr, "clush: interrupt (^C to abort task)\n" \
456 "clush: in progress(%d): %s%s" % (len(ns_reg), ns_reg, pending)
457 else:
458 cmdl = cmd.lower()
459 try:
460 ns_info = True
461 if cmdl.startswith('+'):
462 ns.update(cmdl[1:])
463 elif cmdl.startswith('-'):
464 ns.difference_update(cmdl[1:])
465 elif cmdl.startswith('@'):
466 ns = NodeSet(cmdl[1:])
467 elif cmdl == '=':
468 gather = not gather
469 if verbosity >= VERB_STD:
470 if gather:
471 print "Switching to gathered output format"
472 else:
473 print "Switching to standard output format"
474 ns_info = False
475 continue
476 elif not cmdl.startswith('?'):
477 ns_info = False
478 except NodeSetParseError:
479 print >> sys.stderr, "clush: nodeset parse error (ignoring)"
480
481 if ns_info:
482 continue
483
484 if cmdl.startswith('!') and len(cmd.strip()) > 0:
485 run_command(task, cmd[1:], None, gather, timeout, None,
486 verbosity, gather_print)
487 elif cmdl != "quit":
488 if not cmd:
489 continue
490 if readline_avail:
491 readline.write_history_file(get_history_file())
492 run_command(task, cmd, ns, gather, timeout, label, verbosity,
493 gather_print)
494 return rc
495
497 """Create a ClusterShell stdin-reader worker bound to specified
498 worker."""
499 assert not sys.stdin.isatty()
500
501 fcntl.fcntl(sys.stdin, fcntl.F_SETFL, \
502 fcntl.fcntl(sys.stdin, fcntl.F_GETFL) | os.O_NDELAY)
503
504
505 worker_stdin = WorkerSimple(sys.stdin, None, None, None,
506 handler=StdInputHandler(worker), timeout=-1, autoclose=True)
507
508
509 worker.task.schedule(worker_stdin)
510
def run_command(task, cmd, ns, gather, timeout, label, verbosity, gather_print):
    """
    Create and run the specified command line, displaying
    results in a dshbak way when gathering is used.

    :param task: ClusterShell task the command is scheduled on.
    :param cmd: command line to run.
    :param ns: target nodeset. ttyloop() passes None for '!' commands.
    :param gather: use GatherOutputHandler (gathered display) when true,
        DirectOutputHandler otherwise.
    :param timeout: timeout passed to task.shell().
    :param label: labeling flag passed to the output handler.
    :param verbosity: VERB_* level. In gather mode, a progress timer is
        created at VERB_STD and VERB_VERB.
    :param gather_print: display function for gathered buffers.
    """
    task.set_default("USER_running", True)

    if gather:
        runtimer = None
        # The progress counter needs the number of target nodes, which
        # does not exist when no nodeset is given (ns is None).
        if ns is not None and verbosity in (VERB_STD, VERB_VERB):
            # ClusterShell timer displaying the number of completed commands.
            runtimer = task.timer(2.0, RunTimer(task, len(ns)),
                                  interval=1. / 3., autoclose=True)
        handler = GatherOutputHandler(label, gather_print, runtimer)
    else:
        handler = DirectOutputHandler(label)

    worker = task.shell(cmd, nodes=ns, handler=handler, timeout=timeout)

    if task.default("USER_stdin_worker"):
        bind_stdin(worker)

    task.resume()
535
def run_copy(task, source, dest, ns, timeout, preserve_flag):
    """
    Copy a local file or directory to the nodes of `ns`.

    Marks the task as running, then checks that `source` exists locally.
    If it does not, an error is printed and the process exits with status
    1 through clush_exit(). Otherwise the copy is scheduled with a
    DirectOutputHandler and the task is resumed.
    """
    task.set_default("USER_running", True)

    # Refuse to start when the local source path does not exist.
    source_exists = os.path.exists(source)
    if not source_exists:
        print >> sys.stderr, 'ERROR: file "%s" not found' % source
        clush_exit(1)

    copy_handler = DirectOutputHandler()
    task.copy(source, dest, ns, handler=copy_handler, timeout=timeout,
              preserve=preserve_flag)
    task.resume()
551
553
554 for stream in [sys.stdout, sys.stderr]:
555 stream.flush()
556
557 os._exit(status)
558
def clush_main(args):
    """Main clush script function.

    :param args: full argument vector including the program name, as
        passed from ``sys.argv`` by the ``__main__`` block. Options are
        parsed from ``args[1:]``.

    Parses the command line, applies the options on top of ClushConfig,
    resolves the target nodeset (-w/-x/-a/-g/-X), then runs the command,
    runs the file copy, or enters the interactive loop. Ends the process
    through clush_exit(); option errors exit through parser.error().
    """
    # Default values
    nodeset_base, nodeset_exclude = NodeSet(), NodeSet()

    #
    # Argument management
    #
    usage = "%prog [options] command"

    parser = optparse.OptionParser(usage, version="%%prog %s" % __version__)
    parser.disable_interspersed_args()

    parser.add_option("--nostdin", action="store_true", dest="nostdin",
                      help="don't watch for possible input from stdin")

    # Node selecting options
    optgrp = optparse.OptionGroup(parser, "Selecting target nodes")
    optgrp.add_option("-w", action="append", dest="nodes",
                      help="nodes where to run the command")
    optgrp.add_option("-x", action="append", dest="exclude",
                      help="exclude nodes from the node list")
    optgrp.add_option("-a", "--all", action="store_true", dest="nodes_all",
                      help="run command on all nodes")
    optgrp.add_option("-g", "--group", action="append", dest="group",
                      help="run command on a group of nodes")
    optgrp.add_option("-X", action="append", dest="exgroup",
                      help="exclude nodes from this group")
    parser.add_option_group(optgrp)

    # Output behaviour
    optgrp = optparse.OptionGroup(parser, "Output behaviour")
    optgrp.add_option("-q", "--quiet", action="store_true", dest="quiet",
                      help="be quiet, print essential output only")
    optgrp.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      help="be verbose, print informative messages")
    optgrp.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="output more messages for debugging purpose")
    optgrp.add_option("-L", action="store_true", dest="line_mode", default=False,
                      help="disable header block and order output by nodes")
    optgrp.add_option("-N", action="store_false", dest="label", default=True,
                      help="disable labeling of command line")
    optgrp.add_option("-S", action="store_true", dest="maxrc",
                      help="return the largest of command return codes")
    optgrp.add_option("-b", "--dshbak", action="store_true", dest="gather",
                      default=False, help="display gathered results in a dshbak-like way")
    optgrp.add_option("-B", action="store_true", dest="gatherall",
                      default=False, help="like -b but including standard error")
    parser.add_option_group(optgrp)

    # Copy
    optgrp = optparse.OptionGroup(parser, "File copying")
    optgrp.add_option("-c", "--copy", action="store", dest="source_path",
                      help="copy local file or directory to the nodes")
    optgrp.add_option("--dest", action="store", dest="dest_path",
                      help="destination file or directory on the nodes")
    optgrp.add_option("-p", action="store_true", dest="preserve_flag",
                      help="preserve modification times and modes")
    parser.add_option_group(optgrp)

    # Ssh options
    optgrp = optparse.OptionGroup(parser, "Ssh options")
    optgrp.add_option("-f", "--fanout", action="store", dest="fanout",
                      help="use a specified fanout", type="int")
    optgrp.add_option("-l", "--user", action="store", dest="user",
                      help="execute remote command as user")
    optgrp.add_option("-o", "--options", action="store", dest="options",
                      help="can be used to give ssh options")
    optgrp.add_option("-t", "--connect_timeout", action="store", dest="connect_timeout",
                      help="limit time to connect to a node", type="float")
    optgrp.add_option("-u", "--command_timeout", action="store", dest="command_timeout",
                      help="limit time for command to run on the node", type="float")
    parser.add_option_group(optgrp)

    # Honor the given argument vector instead of implicitly reading sys.argv.
    (options, args) = parser.parse_args(args[1:])

    #
    # Load config file and apply overrides
    #
    config = ClushConfig()

    # Apply command line overrides
    if options.quiet:
        config.set_main("verbosity", VERB_QUIET)
    if options.verbose:
        config.set_main("verbosity", VERB_VERB)
    if options.debug:
        config.set_main("verbosity", VERB_DEBUG)
    if options.fanout:
        config.set_main("fanout", options.fanout)
    if options.user:
        config.set_main("ssh_user", options.user)
    if options.options:
        config.set_main("ssh_options", options.options)
    # Compare with None so an explicit 0 is honored. For example, "-u 0"
    # overrides a command timeout set in a config file; a command_timeout
    # of 0 makes the worker timeout -1 below.
    if options.connect_timeout is not None:
        config.set_main("connect_timeout", options.connect_timeout)
    if options.command_timeout is not None:
        config.set_main("command_timeout", options.command_timeout)

    #
    # Compute the nodeset
    #
    if options.nodes:
        nodeset_base = NodeSet.fromlist(options.nodes)
    if options.exclude:
        nodeset_exclude = NodeSet.fromlist(options.exclude)

    # Resolve node groups by running the configured external commands.
    task = task_self()
    # Use the same debug threshold as the main task below (debug level only).
    task.set_info("debug", config.get_verbosity() >= VERB_DEBUG)
    if options.nodes_all:
        command = config.get_nodes_all_command()
        task.shell(command, key="all")
    if options.group:
        for grp in options.group:
            command = config.get_nodes_group_command(grp)
            task.shell(command, key="group")
    if options.exgroup:
        for grp in options.exgroup:
            command = config.get_nodes_group_command(grp)
            task.shell(command, key="exgroup")

    # Run needed external commands
    task.resume()

    for buf, keys in task.iter_buffers(['all', 'group']):
        for line in buf:
            config.verbose_print(VERB_DEBUG, "Adding nodes from option %s: %s" % (','.join(keys), line))
            nodeset_base.add(line)
    for buf, keys in task.iter_buffers(['exgroup']):
        for line in buf:
            config.verbose_print(VERB_DEBUG, "Excluding nodes from option %s: %s" % (','.join(keys), line))
            nodeset_exclude.add(line)

    # Apply the exclude list (-x, -X)
    nodeset_base.difference_update(nodeset_exclude)
    if len(nodeset_base) < 1:
        parser.error('No node to run on.')

    config.verbose_print(VERB_DEBUG, "Final NodeSet: %s" % nodeset_base)

    #
    # Task management
    #
    # Interactive mode when neither a command nor a copy source was given.
    interactive = not args and not options.source_path
    if options.nostdin and interactive:
        parser.error("illegal option `--nostdin' in interactive mode")

    user_interaction = not options.nostdin and sys.stdin.isatty() and \
                       sys.stdout.isatty()
    config.verbose_print(VERB_DEBUG, "User interaction: %s" % user_interaction)
    if user_interaction:
        # The main thread will block on user input in ttyloop(), so cluster
        # commands go to a separate Task(). Command completion is notified
        # back to the main thread with SIGUSR1 (see signal_handler).
        task = Task()
        signal.signal(signal.SIGUSR1, signal_handler)
        task.set_default("USER_handle_SIGUSR1", True)
    else:
        # Perform everything in the current task.
        task.set_default("USER_handle_SIGUSR1", False)

    task.set_default("USER_stdin_worker", not (sys.stdin.isatty() or
                                               options.nostdin))
    config.verbose_print(VERB_DEBUG, "Create STDIN worker: %s" % \
                         task.default("USER_stdin_worker"))

    task.set_info("debug", config.get_verbosity() >= VERB_DEBUG)
    task.set_info("fanout", config.get_fanout())

    ssh_user = config.get_ssh_user()
    if ssh_user:
        task.set_info("ssh_user", ssh_user)
    ssh_path = config.get_ssh_path()
    if ssh_path:
        task.set_info("ssh_path", ssh_path)
    ssh_options = config.get_ssh_options()
    if ssh_options:
        task.set_info("ssh_options", ssh_options)

    # Set detailed timeouts
    connect_timeout = config.get_connect_timeout()
    task.set_info("connect_timeout", connect_timeout)
    command_timeout = config.get_command_timeout()
    task.set_info("command_timeout", command_timeout)

    gather = options.gatherall or options.gather
    # Keep stderr separate unless -B merges it into the gathered output.
    task.set_default("stderr", not options.gatherall)

    # Buffer stdout in MsgTree only when gathering outputs
    task.set_default("stdout_msgtree", gather)
    # Never buffer stderr in MsgTree
    task.set_default("stderr_msgtree", False)

    # Worker-level timeout when a command timeout is defined (-1: none).
    if command_timeout > 0:
        timeout = command_timeout
    else:
        timeout = -1

    # Custom task status used by the handlers and ttyloop()
    task.set_default("USER_interactive", interactive)
    task.set_default("USER_running", False)

    if options.source_path:
        if not options.dest_path:
            options.dest_path = os.path.dirname(options.source_path)
        op = "copy source=%s dest=%s" % (options.source_path, options.dest_path)
    else:
        op = "command=\"%s\"" % ' '.join(args)

    # The fanout value is read from the config object, not from the task.
    config.verbose_print(VERB_VERB, "clush: nodeset=%s fanout=%d [timeout conn=%.1f " \
                         "cmd=%.1f] %s" % (nodeset_base, config.get_fanout(),
                                           task.info("connect_timeout"),
                                           task.info("command_timeout"), op))

    # Select how gathered buffers are displayed (-L)
    if options.line_mode:
        gather_print = print_lines
    else:
        gather_print = print_buffer

    if not task.default("USER_interactive"):
        if options.source_path:
            run_copy(task, options.source_path, options.dest_path, nodeset_base,
                     0, options.preserve_flag)
        else:
            run_command(task, ' '.join(args), nodeset_base, gather, timeout,
                        options.label, config.get_verbosity(), gather_print)

    if user_interaction:
        ttyloop(task, nodeset_base, gather, timeout, options.label,
                config.get_verbosity(), gather_print)
    elif task.default("USER_interactive"):
        print >> sys.stderr, "ERROR: interactive mode requires a tty"
        clush_exit(1)

    rc = 0
    if options.maxrc:
        # -S: return the largest command return code (255 on any timeout)
        rc = task.max_retcode()
        if task.num_timeout() > 0:
            rc = 255
    clush_exit(rc)
809
810 if __name__ == '__main__':
811 try:
812 clush_main(sys.argv)
813 except ClushConfigError, e:
814 print >> sys.stderr, "ERROR: %s" % e
815 sys.exit(1)
816 except NodeSetParseError, e:
817 print >> sys.stderr, "NodeSet parse error:", e
818 sys.exit(1)
819 except IOError:
820
821 os._exit(1)
822 except KeyboardInterrupt, e:
823 uncomp_nodes = getattr(e, 'uncompleted_nodes', None)
824 if uncomp_nodes:
825 print >> sys.stderr, "Keyboard interrupt (%s did not complete)." \
826 % uncomp_nodes
827 else:
828 print >> sys.stderr, "Keyboard interrupt."
829 clush_exit(128 + signal.SIGINT)
830