Package ClusterShell :: Package Worker :: Module Ssh
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Worker.Ssh

  1  # 
  2  # Copyright CEA/DAM/DIF (2008, 2009, 2010) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Ssh.py 242 2010-03-05 13:18:28Z st-cea $ 
 34   
 35  """ 
 36  ClusterShell Ssh/Scp support 
 37   
 38  This module implements OpenSSH engine client and task's worker. 
 39  """ 
 40   
 41  import copy 
 42  import os 
 43  import signal 
 44   
 45  from ClusterShell.NodeSet import NodeSet 
 46  from ClusterShell.Worker.EngineClient import EngineClient 
 47  from ClusterShell.Worker.Worker import DistantWorker, WorkerBadArgumentError 
 48   
 49   
50 -class Ssh(EngineClient):
51 """ 52 Ssh EngineClient. 53 """ 54
55 - def __init__(self, node, command, worker, stderr, timeout, autoclose=False):
56 """ 57 Initialize Ssh EngineClient instance. 58 """ 59 EngineClient.__init__(self, worker, stderr, timeout, autoclose) 60 61 self.key = copy.copy(node) 62 self.command = command 63 self.popen = None
64
65 - def _start(self):
66 """ 67 Start worker, initialize buffers, prepare command. 68 """ 69 task = self.worker.task 70 71 # Build ssh command 72 cmd_l = [ task.info("ssh_path") or "ssh", "-a", "-x" ] 73 74 user = task.info("ssh_user") 75 if user: 76 cmd_l.append("-l %s" % user) 77 78 connect_timeout = task.info("connect_timeout", 0) 79 if connect_timeout > 0: 80 cmd_l.append("-oConnectTimeout=%d" % connect_timeout) 81 82 # Disable passphrase/password querying 83 cmd_l.append("-oBatchMode=yes") 84 85 # Add custom ssh options 86 ssh_options = task.info("ssh_options") 87 if ssh_options: 88 cmd_l.append(ssh_options) 89 90 cmd_l.append("%s" % self.key) 91 cmd_l.append("%s" % self.command) 92 93 if task.info("debug", False): 94 task.info("print_debug")(task, "SSH: %s" % ' '.join(cmd_l)) 95 96 self.popen = self._exec_nonblock(cmd_l) 97 self.file_error = self.popen.stderr 98 self.file_reader = self.popen.stdout 99 self.file_writer = self.popen.stdin 100 101 self.worker._on_start() 102 103 return self
104
105 - def _close(self, force, timeout):
106 """ 107 Close client. Called by engine after the client has been 108 unregistered. This method should handle all termination types 109 (normal, forced or on timeout). 110 """ 111 if not force and self._rbuf: 112 # We still have some read data available in buffer, but no 113 # EOL. Generate a final message before closing. 114 self.worker._on_node_msgline(self.key, self._rbuf) 115 116 rc = -1 117 if force or timeout: 118 prc = self.popen.poll() 119 if prc is None: 120 # process is still running, kill it 121 os.kill(self.popen.pid, signal.SIGKILL) 122 else: 123 prc = self.popen.wait() 124 if prc >= 0: 125 rc = prc 126 127 self.popen.stdin.close() 128 self.popen.stdout.close() 129 130 if rc >= 0: 131 self.worker._on_node_rc(self.key, rc) 132 elif timeout: 133 self.worker._on_node_timeout(self.key) 134 135 self.worker._check_fini()
136
137 - def _handle_read(self):
138 """ 139 Handle a read notification. Called by the engine as the result of an 140 event indicating that a read is available. 141 """ 142 debug = self.worker.task.info("debug", False) 143 if debug: 144 print_debug = self.worker.task.info("print_debug") 145 146 for msg in self._readlines(): 147 if debug: 148 print_debug(self.worker.task, "%s: %s" % (self.key, msg)) 149 # handle full msg line 150 self.worker._on_node_msgline(self.key, msg)
151
152 - def _handle_error(self):
153 """ 154 Handle a read error (stderr) notification. 155 """ 156 debug = self.worker.task.info("debug", False) 157 if debug: 158 print_debug = self.worker.task.info("print_debug") 159 160 for msg in self._readerrlines(): 161 if debug: 162 print_debug(self.worker.task, "%s@STDERR: %s" % (self.key, msg)) 163 # handle full msg line 164 self.worker._on_node_errline(self.key, msg)
165 166
167 -class Scp(Ssh):
168 """ 169 Scp EngineClient. 170 """ 171
172 - def __init__(self, node, source, dest, worker, stderr, timeout, preserve):
173 """ 174 Initialize Scp instance. 175 """ 176 Ssh.__init__(self, node, None, worker, stderr, timeout) 177 self.source = source 178 self.dest = dest 179 self.popen = None 180 self.file_reader = None 181 self.file_writer = None 182 183 # Directory check 184 self.isdir = os.path.isdir(self.source) 185 # Note: file sanity checks can be added to Scp._start() as 186 # soon as Task._start_thread is able to dispatch exceptions on 187 # _start (need trac ticket #21). 188 189 # Preserve modification times and modes? 190 self.preserve = preserve
191
192 - def _start(self):
193 """ 194 Start client, initialize buffers, prepare command. 195 """ 196 task = self.worker.task 197 198 # Build scp command 199 cmd_l = [ task.info("scp_path") or "scp" ] 200 201 if self.isdir: 202 cmd_l.append("-r") 203 204 if self.preserve: 205 cmd_l.append("-p") 206 207 user = task.info("scp_user") or task.info("ssh_user") 208 if user: 209 cmd_l.append("-l %s" % user) 210 211 connect_timeout = task.info("connect_timeout", 0) 212 if connect_timeout > 0: 213 cmd_l.append("-oConnectTimeout=%d" % connect_timeout) 214 215 # Disable passphrase/password querying 216 cmd_l.append("-oBatchMode=yes") 217 218 # Add custom scp options 219 for key in [ "ssh_options", "scp_options" ]: 220 ssh_options = task.info(key) 221 if ssh_options: 222 cmd_l.append(ssh_options) 223 224 cmd_l.append(self.source) 225 226 user = task.info("ssh_user") 227 if user: 228 cmd_l.append("%s@%s:%s" % (user, self.key, self.dest)) 229 else: 230 cmd_l.append("%s:%s" % (self.key, self.dest)) 231 232 if task.info("debug", False): 233 task.info("print_debug")(task, "SCP: %s" % ' '.join(cmd_l)) 234 235 self.popen = self._exec_nonblock(cmd_l) 236 self.file_reader = self.popen.stdout 237 self.file_writer = self.popen.stdin 238 239 return self
240 241
242 -class WorkerSsh(DistantWorker):
243 """ 244 ClusterShell ssh-based worker Class. 245 246 Remote Shell (ssh) usage example: 247 worker = WorkerSsh(nodeset, handler=MyEventHandler(), 248 timeout=30, command="/bin/hostname") 249 Remote Copy (scp) usage example: 250 worker = WorkerSsh(nodeset, handler=MyEventHandler(), 251 timeout=30, source="/etc/my.conf", 252 dest="/etc/my.conf") 253 ... 254 task.schedule(worker) # schedule worker for execution 255 ... 256 task.resume() # run 257 """ 258
259 - def __init__(self, nodes, handler, timeout, **kwargs):
260 """ 261 Initialize Ssh worker instance. 262 """ 263 DistantWorker.__init__(self, handler) 264 265 self.clients = [] 266 self.nodes = NodeSet(nodes) 267 self.command = kwargs.get('command') 268 self.source = kwargs.get('source') 269 self.dest = kwargs.get('dest') 270 autoclose = kwargs.get('autoclose', False) 271 stderr = kwargs.get('stderr', False) 272 self._close_count = 0 273 self._has_timeout = False 274 275 # Prepare underlying engine clients (ssh/scp processes) 276 if self.command is not None: 277 # secure remote shell 278 for node in self.nodes: 279 self.clients.append(Ssh(node, self.command, self, stderr, 280 timeout, autoclose)) 281 elif self.source: 282 # secure copy 283 for node in self.nodes: 284 self.clients.append(Scp(node, self.source, self.dest, 285 self, stderr, timeout, kwargs.get('preserve', False))) 286 else: 287 raise WorkerBadArgumentError()
288
289 - def _engine_clients(self):
290 """ 291 Access underlying engine clients. 292 """ 293 return self.clients
294
295 - def _on_node_rc(self, node, rc):
296 DistantWorker._on_node_rc(self, node, rc) 297 self._close_count += 1
298
299 - def _on_node_timeout(self, node):
300 DistantWorker._on_node_timeout(self, node) 301 self._close_count += 1 302 self._has_timeout = True
303
304 - def _check_fini(self):
305 if self._close_count >= len(self.clients): 306 if self._has_timeout: 307 self._invoke("ev_timeout") 308 self._invoke("ev_close")
309
310 - def write(self, buf):
311 """ 312 Write to worker clients. 313 """ 314 for c in self.clients: 315 c._write(buf)
316
317 - def set_write_eof(self):
318 """ 319 Tell worker to close its writer file descriptor once flushed. Do not 320 perform writes after this call. 321 """ 322 for c in self.clients: 323 c._set_write_eof()
324