Package ClusterShell :: Package Engine :: Module Poll
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Engine.Poll

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009, 2010) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Poll.py 238 2010-02-25 22:30:31Z st-cea $ 
 34   
 35  """ 
 36  A poll() based ClusterShell Engine. 
 37   
 38  The poll() system call is available on Linux and BSD. 
 39  """ 
 40   
 41  import errno 
 42  import select 
 43  import sys 
 44  import time 
 45   
 46  from ClusterShell.Engine.Engine import Engine 
 47  from ClusterShell.Engine.Engine import EngineException 
 48  from ClusterShell.Engine.Engine import EngineNotSupportedError 
 49  from ClusterShell.Engine.Engine import EngineTimeoutException 
 50  from ClusterShell.Worker.EngineClient import EngineClientEOF 
 51   
 52   
53 -class EnginePoll(Engine):
54 """ 55 Poll Engine 56 57 ClusterShell engine using the select.poll mechanism (Linux poll() 58 syscall). 59 """ 60 61 identifier = "poll" 62
63 - def __init__(self, info):
64 """ 65 Initialize Engine. 66 """ 67 Engine.__init__(self, info) 68 try: 69 # get a polling object 70 self.polling = select.poll() 71 except AttributeError: 72 raise EngineNotSupportedError()
73
74 - def _register_specific(self, fd, event):
75 if event & (Engine.E_READ | Engine.E_ERROR): 76 eventmask = select.POLLIN 77 elif event == Engine.E_WRITE: 78 eventmask = select.POLLOUT 79 80 self.polling.register(fd, eventmask)
81
82 - def _unregister_specific(self, fd, ev_is_set):
83 if ev_is_set: 84 self.polling.unregister(fd)
85
86 - def _modify_specific(self, fd, event, setvalue):
87 """ 88 Engine-specific modifications after a interesting event change for 89 a file descriptor. Called automatically by Engine register/unregister and 90 set_events(). For the poll() engine, it reg/unreg or modifies the event mask 91 associated to a file descriptor. 92 """ 93 self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event, 94 setvalue)) 95 if setvalue: 96 eventmask = 0 97 if event & (Engine.E_READ | Engine.E_ERROR): 98 eventmask = select.POLLIN 99 elif event == Engine.E_WRITE: 100 eventmask = select.POLLOUT 101 self.polling.register(fd, eventmask) 102 else: 103 self.polling.unregister(fd)
104
105 - def runloop(self, timeout):
106 """ 107 Poll engine run(): start clients and properly get replies 108 """ 109 if timeout == 0: 110 timeout = -1 111 112 start_time = time.time() 113 114 # run main event loop... 115 while self.evlooprefcnt > 0: 116 self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" \ 117 % (self.evlooprefcnt, self.reg_clifds.keys(), \ 118 len(self.timerq))) 119 try: 120 timeo = self.timerq.nextfire_delay() 121 if timeout > 0 and timeo >= timeout: 122 # task timeout may invalidate clients timeout 123 self.timerq.clear() 124 timeo = timeout 125 elif timeo == -1: 126 timeo = timeout 127 128 self.reg_clifds_changed = False 129 evlist = self.polling.poll(timeo * 1000.0 + 1.0) 130 131 except select.error, (ex_errno, ex_strerror): 132 # might get interrupted by a signal 133 if ex_errno == errno.EINTR: 134 continue 135 elif ex_errno == errno.EINVAL: 136 print >> sys.stderr, \ 137 "EnginePoll: please increase RLIMIT_NOFILE" 138 raise 139 140 for fd, event in evlist: 141 142 if event & select.POLLNVAL: 143 raise EngineException("Caught POLLNVAL on fd %d" % fd) 144 145 if self.reg_clifds_changed: 146 self._debug("REG CLIENTS CHANGED - Aborting current evlist") 147 # Oops, reconsider evlist by calling poll() again. 148 break 149 150 # get client instance 151 client, fdev = self._fd2client(fd) 152 if not client or fdev is None: 153 continue 154 155 # process this client 156 client._processing = True 157 158 # check for poll error condition of some sort 159 if event & select.POLLERR: 160 self._debug("POLLERR %s" % client) 161 self.unregister_writer(client) 162 client.file_writer.close() 163 client.file_writer = None 164 continue 165 166 # check for data to read 167 if event & select.POLLIN: 168 assert fdev & (Engine.E_READ | Engine.E_ERROR) 169 assert client._events & fdev 170 self.modify(client, 0, fdev) 171 try: 172 if fdev & Engine.E_READ: 173 client._handle_read() 174 else: 175 client._handle_error() 176 except EngineClientEOF, e: 177 self._debug("EngineClientEOF %s" % client) 178 if fdev & Engine.E_READ: 179 self.remove(client) 180 continue 181 182 # or check for end of stream (do not handle both at the same 183 # time because handle_read() may perform a partial read) 184 elif event & select.POLLHUP: 185 self._debug("POLLHUP fd=%d %s (r%s,e%s,w%s)" % (fd, 186 client.__class__.__name__, client.reader_fileno(), 187 client.error_fileno(), client.writer_fileno())) 188 client._processing = False 189 190 if fdev & Engine.E_READ: 191 if client._events & Engine.E_ERROR: 192 self.modify(client, 0, fdev) 193 else: 194 self.remove(client) 195 else: 196 if client._events & Engine.E_READ: 197 self.modify(client, 0, fdev) 198 else: 199 self.remove(client) 200 continue 201 202 # check for writing 203 if event & select.POLLOUT: 204 self._debug("POLLOUT fd=%d %s (r%s,e%s,w%s)" % (fd, 205 client.__class__.__name__, client.reader_fileno(), 206 client.error_fileno(), client.writer_fileno())) 207 assert fdev == Engine.E_WRITE 208 assert client._events & fdev 209 self.modify(client, 0, fdev) 210 client._handle_write() 211 212 # post processing 213 client._processing = False 214 215 # apply any changes occured during processing 216 if client.registered: 217 self.set_events(client, client._new_events) 218 219 # check for task runloop timeout 220 if timeout > 0 and time.time() >= start_time + timeout: 221 raise EngineTimeoutException() 222 223 # process clients timeout 224 self.fire_timers() 225 226 self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % \ 227 (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
228