Package ClusterShell :: Package Engine :: Module EPoll
[hide private]
[frames | no frames]

Source Code for Module ClusterShell.Engine.EPoll

  1  # 
  2  # Copyright CEA/DAM/DIF (2009, 2010) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: EPoll.py 238 2010-02-25 22:30:31Z st-cea $ 
 34   
 35  """ 
 36  A ClusterShell Engine using epoll, an I/O event notification facility. 
 37   
 38  The epoll event distribution interface is available on Linux 2.6, and 
 39  has been included in Python 2.6. 
 40  """ 
 41   
 42  import errno 
 43  import select 
 44  import time 
 45   
 46  from ClusterShell.Engine.Engine import Engine 
 47  from ClusterShell.Engine.Engine import EngineNotSupportedError 
 48  from ClusterShell.Engine.Engine import EngineTimeoutException 
 49  from ClusterShell.Worker.EngineClient import EngineClientEOF 
 50   
 51   
class EngineEPoll(Engine):
    """
    EPoll Engine

    ClusterShell Engine class using the select.epoll mechanism.
    """

    identifier = "epoll"

    def __init__(self, info):
        """
        Initialize Engine.

        Raise EngineNotSupportedError when the select module of the running
        interpreter does not provide epoll().
        """
        Engine.__init__(self, info)
        try:
            # get an epoll object
            self.epolling = select.epoll()
        except AttributeError:
            raise EngineNotSupportedError()

    def _register_specific(self, fd, event):
        """
        Engine-specific fd registering. Called by Engine register.

        Read and error interests (E_READ, E_ERROR) are both watched with
        EPOLLIN; a write interest (E_WRITE) is watched with EPOLLOUT.
        Raise ValueError if `event' is none of these.
        """
        if event & (Engine.E_READ | Engine.E_ERROR):
            eventmask = select.EPOLLIN
        elif event == Engine.E_WRITE:
            eventmask = select.EPOLLOUT
        else:
            # Fail with an explicit error instead of reaching register()
            # with `eventmask' unbound.
            raise ValueError("unsupported event 0x%x for fd %r" % (event, fd))

        self.epolling.register(fd, eventmask)

    def _unregister_specific(self, fd, ev_is_set):
        """
        Engine-specific fd unregistering. Called by Engine unregister.

        `ev_is_set' is not used by the epoll engine: the fd is always
        removed from the epoll object.
        """
        self.epolling.unregister(fd)

    def _modify_specific(self, fd, event, setvalue):
        """
        Engine-specific modifications after a interesting event change for
        a file descriptor. Called automatically by Engine set_events().
        For the epoll engine, it modifies the event mask associated to a file
        descriptor.

        When `setvalue' is false (or `event' is not a known interest), the
        fd stays registered but with an empty event mask.
        """
        self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event,
                                                            setvalue))
        eventmask = 0
        if setvalue:
            if event & (Engine.E_READ | Engine.E_ERROR):
                eventmask = select.EPOLLIN
            elif event == Engine.E_WRITE:
                eventmask = select.EPOLLOUT

        self.epolling.modify(fd, eventmask)

    def runloop(self, timeout):
        """
        Run epoll main loop.

        Loop while self.evlooprefcnt is positive: poll for events, dispatch
        them to the matching clients, then fire expired timers.

        `timeout' is the overall run loop timeout; 0 is converted to -1
        (no timeout). Raise EngineTimeoutException once a positive
        `timeout' has elapsed since the loop was entered. An IOError raised
        by poll() is propagated unless it is an EINTR.
        """
        # Local import: the exception instance is fetched via sys.exc_info()
        # so that the except clause parses on every Python version.
        import sys

        if timeout == 0:
            timeout = -1

        start_time = time.time()

        # run main event loop...
        while self.evlooprefcnt > 0:
            self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" %
                        (self.evlooprefcnt, self.reg_clifds.keys(),
                         len(self.timerq)))

            timeo = self.timerq.nextfire_delay()
            if timeout > 0 and timeo >= timeout:
                # task timeout may invalidate clients timeout
                self.timerq.clear()
                timeo = timeout
            elif timeo == -1:
                timeo = timeout

            self.reg_clifds_changed = False
            try:
                # NOTE(review): when timeo is -1 this passes a negative
                # value to poll(), presumably treated as "block" -- confirm.
                evlist = self.epolling.poll(timeo + 0.001)
            except IOError:
                # might get interrupted by a signal
                if sys.exc_info()[1].errno == errno.EINTR:
                    continue
                # Any other error is fatal: do not go on with an unbound or
                # stale evlist.
                raise

            for fd, event in evlist:

                if self.reg_clifds_changed:
                    self._debug("REG CLIENTS CHANGED - Aborting current evlist")
                    # Oops, reconsider evlist by calling poll() again.
                    break

                # get client instance
                client, fdev = self._fd2client(fd)
                if not client or fdev is None:
                    continue

                # process this client
                client._processing = True

                # check for poll error condition of some sort
                if event & select.EPOLLERR:
                    self._debug("EPOLLERR %s" % client)
                    # NOTE(review): client._processing is left set to True
                    # on this path and file_writer is assumed not to be
                    # None -- confirm both are intended.
                    self.unregister_writer(client)
                    client.file_writer.close()
                    client.file_writer = None
                    continue

                # check for data to read
                if event & select.EPOLLIN:
                    assert fdev & (Engine.E_READ | Engine.E_ERROR)
                    assert client._events & fdev
                    self.modify(client, 0, fdev)
                    try:
                        if fdev & Engine.E_READ:
                            client._handle_read()
                        else:
                            client._handle_error()
                    except EngineClientEOF:
                        self._debug("EngineClientEOF %s" % client)
                        # only EOF on the read fd removes the client
                        if fdev & Engine.E_READ:
                            self.remove(client)
                        continue

                # or check for end of stream (do not handle both at the same
                # time because handle_read() may perform a partial read)
                elif event & select.EPOLLHUP:
                    self._debug("EPOLLHUP fd=%d %s (r%s,w%s)" % (fd,
                        client.__class__.__name__, client.reader_fileno(),
                        client.writer_fileno()))
                    client._processing = False

                    # Remove the client only when its other input fd (error
                    # or read) is no longer of interest; otherwise just
                    # clear the interest for the fd that hung up.
                    if fdev & Engine.E_READ:
                        if client._events & Engine.E_ERROR:
                            self.modify(client, 0, fdev)
                        else:
                            self.remove(client)
                    else:
                        if client._events & Engine.E_READ:
                            self.modify(client, 0, fdev)
                        else:
                            self.remove(client)
                    continue

                # check for writing
                if event & select.EPOLLOUT:
                    self._debug("EPOLLOUT fd=%d %s (r%s,w%s)" % (fd,
                        client.__class__.__name__, client.reader_fileno(),
                        client.writer_fileno()))
                    assert fdev == Engine.E_WRITE
                    assert client._events & fdev
                    self.modify(client, 0, fdev)
                    client._handle_write()

                # post processing
                client._processing = False

                # apply any changes occured during processing
                if client.registered:
                    self.set_events(client, client._new_events)

            # check for task runloop timeout
            if timeout > 0 and time.time() >= start_time + timeout:
                raise EngineTimeoutException()

            # process clients timeout
            self.fire_timers()

        self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" %
                    (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
223