Package papyros :: Module distributed
[hide private]
[frames] | no frames]

Source Code for Module papyros.distributed

  1  #!/usr/bin/env python 
  2   
  3  '''A multiprocess implementation of the papyros master-slave API. 
  4   
  5  Both the master and the slaves run as separate processes on one or more machines. 
  6  This module is also executable, spawning the master or slave processes. Run as:: 
  7   
  8      python -m papyros.distributed --help 
  9   
 10  to see the list of available options. 
 11   
 12  @requires: U{Pyro <http://pyro.sourceforge.net/>}. 
 13  ''' 
 14   
 15   
 16  __all__ = ['DistributedMaster', 'DistributedSlave', 'DEFAULT_GROUP'] 
 17   
 18  import os 
 19  import sys 
 20  import logging 
 21  from time import sleep 
 22  from subprocess import Popen 
 23  from optparse import OptionParser, make_option 
 24   
 25  from Pyro.core import ObjBase 
 26  from Pyro.configuration import Config 
 27  from Pyro.errors import NamingError, ConnectionClosedError 
 28   
 29  from papyros import Master, Slave, enableLogging 
 30  from papyros.simplepyro import publish_object, get_proxy, serve_forever 
 31   
 32   
 33  DEFAULT_GROUP = ':Papyros' 
 34  log = logging.getLogger('papyros') 
 35   
 36   
class DistributedMaster(ObjBase, Master):
    '''A papyros master published as a Pyro object under a per-group name.

    Constructing this class first looks up the group's master through
    C{get_proxy}. If the lookup succeeds, its result is returned in place of
    a new instance; otherwise a new master is created and published.
    '''

    def __new__(cls, group_name=DEFAULT_GROUP, *args, **kwds):
        '''Return the already published master of the group, or a new instance.

        @param group_name: Name of the process group the master belongs to.
        @param args: Extra positional arguments, forwarded to the base
            C{__new__} when a new instance has to be allocated.
        @param kwds: Extra keyword arguments, forwarded likewise.
        @return: The result of C{get_proxy} for the master's name, or a newly
            allocated instance if that lookup raises C{NamingError}.
        '''
        try:
            existing = get_proxy(cls.name(group_name))
        except NamingError:
            # Nothing is registered under this name: allocate a real master.
            # NOTE(review): the arguments are forwarded up the MRO; confirm a
            # base class defines a __new__ that accepts them.
            return super(DistributedMaster, cls).__new__(cls, group_name,
                                                         *args, **kwds)
        return existing

    def __init__(self, group_name=DEFAULT_GROUP, *args, **kwds):
        '''Initialise both base classes, then publish this master.

        @param group_name: Name of the process group; determines the name
            under which the object is published.
        @param args: Positional arguments passed on to C{Master.__init__}.
        @param kwds: Keyword arguments passed on to C{Master.__init__}.
        '''
        ObjBase.__init__(self)
        Master.__init__(self, *args, **kwds)
        published_name = self.name(group_name)
        publish_object(self, published_name)

    @classmethod
    def name(cls, group_name):
        '''Return the name under which the master of C{group_name} is published.

        @param group_name: Name of the process group.
        @return: C{group_name} with C{'.Master'} appended.
        '''
        return group_name + '.Master'
53 54
class DistributedSlave(ObjBase, Slave):
    '''A papyros slave that finds its group's master by name and works for it.

    The slave is created without a master. L{run} obtains a proxy for the
    master and obtains a new one whenever the connection is closed.
    '''

    def __init__(self, group_name=DEFAULT_GROUP, poll_time=1):
        '''Initialise the slave without connecting to any master yet.

        @param group_name: Name of the process group whose master to serve.
        @param poll_time: Passed through unchanged to C{Slave.__init__}.
        '''
        ObjBase.__init__(self)
        # None is passed in the master position (presumably stored as
        # self._master, which run() checks); the proxy is resolved in run().
        Slave.__init__(self, None, poll_time)
        self._master_name = DistributedMaster.name(group_name)

    def run(self):
        '''Serve the master forever, reconnecting whenever it goes away.

        This method never returns. While the master's name cannot be
        resolved, the lookup is retried every 3 seconds.
        '''
        while True:
            if self._master is None:
                try:
                    self._master = get_proxy(self._master_name)
                    log.info('Connected to master: %s', self._master.URI)
                except NamingError:
                    # Master not registered (yet): wait, then look it up again.
                    sleep(3)
                    continue
            try:
                self.process()
            except ConnectionClosedError:
                log.warning('Disconnected from master')
                # Drop the dead proxy so the next iteration reconnects.
                self._master = None


if __name__ == '__main__':
    # Command-line entry point: start the group's master, or N slaves.
    parser = OptionParser(
        description="To start a master process, don't specify the --slaves/-s option. "
                    "To start N slave processes on this host, give --slaves=N.",
        option_list=[
            make_option('-s', '--slaves', type='int',
                        help='Number of new slaves to spawn'),
            make_option('-g', '--group', default=DEFAULT_GROUP,
                        help='Name of the Pyro process group'),
            make_option('-c', '--config', metavar='FILE', default='',
                        help='Pyro configuration file'),
            make_option('-l', '--logging', action='store_true',
                        help='Print out logging messages'),
        ])
    options = parser.parse_args()[0]
    if options.logging:
        enableLogging()
    if options.config:
        config = Config()
        config.setup(options.config)
    if not options.slaves:
        # Master mode (no --slaves given, or --slaves=0).
        master = DistributedMaster(options.group)
        try:
            uri = master.URI
        except AttributeError:
            # No URI attribute: this is treated as a newly created master,
            # so announce it and start serving.
            uri = master.getProxy().URI
            print('Starting master of group "%s" at %s' % (options.group, uri))
            serve_forever(disconnect=True)
        else:
            # The object already exposes a URI: a master for this group is
            # running elsewhere and we only obtained a handle to it.
            print('Master of group "%s" is already started and listening at %s!' % (
                options.group, uri))
    else:
        if options.slaves > 1:
            # This process becomes one slave itself; the remaining N-1 are
            # spawned as single-slave children. Use the interpreter that is
            # running this process so the children see the same installation,
            # falling back to 'python' if it cannot be determined.
            cmd = [sys.executable or 'python', '-m', 'papyros.distributed',
                   '--slaves=1', '--group=%s' % options.group]
            if options.config:
                cmd.append('--config=%s' % options.config)
            if options.logging:
                cmd.append('--logging')
            for _ in range(options.slaves - 1):
                Popen(cmd)
        print('Starting slave process (PID=%s)' % os.getpid())
        DistributedSlave(options.group).run()