Module demo
[hide private]
[frames] | no frames]

Source Code for Module demo

  1  #!/usr/bin/env python 
  2   
  3  import sys 
  4  import random 
  5  from time import sleep 
  6   
  7  import papyros 
  8  import jobs 
  9   
 10  log = jobs.log 
 11   
 12   
13 -def main():
14 # select job type 15 j = raw_input('Select job type ([S]lowSqrt, [P]rimeFactors): ')[0].upper() 16 if j == 'S': 17 job_type = jobs.SlowSqrt 18 elif j == 'P': 19 job_type = jobs.PrimeFactors 20 21 # select number of jobs 22 num_jobs = int(raw_input('Select number of jobs: ')) 23 24 # select verbosity 25 if raw_input('Verbose output ([Y/N]) ? ').upper().startswith('Y'): 26 papyros.enableLogging() 27 28 # select master type 29 m = raw_input('Select master type ([S]inglethreaded, [M]ultithreaded, ' 30 '[D]istributed): ')[0].upper() 31 if m == 'S': 32 from papyros.singlethreaded import SingleThreadedMaster 33 master = SingleThreadedMaster() 34 elif m == 'M': 35 from papyros.multithreaded import MultiThreadedMaster 36 num_slaves = int(raw_input('Select number of slave threads: ')) 37 master = MultiThreadedMaster(num_slaves) 38 elif m == 'D': 39 from papyros.distributed import DistributedMaster, DEFAULT_GROUP 40 group = raw_input('Select distributed group name [default=%s]: ' % DEFAULT_GROUP) 41 if not group.strip(): 42 group = DEFAULT_GROUP 43 raw_input('Make sure you have started a master process and one or more ' 44 'slave processes for group %r and press any key..' % group) 45 master = DistributedMaster(group) 46 else: 47 sys.exit('Invalid master type') 48 49 # start the demo 50 finished_jobs = run(master, job_type, num_jobs) 51 for i,job in enumerate(finished_jobs): 52 try: out = job.result 53 except Exception, ex: out = ex 54 print 'Job %2d: (%s)\t%s' % (i, job, out)
55 56
57 -def run(master, job_type, num_jobs):
58 finished_jobs = [] 59 def job_done(job): 60 finished_jobs.append(job) 61 # job.result will reraise any exception raised while the job was being 62 # processed; otherwise it will return the computed result 63 try: log('job #%s: result=%s' % (job, job.result)) 64 except Exception, ex: 65 log('job #%s: exception raised: %s\n%s' % (job, ex, ex.traceback))
66 67 # create 12 jobs and add them in the queue 68 for i in xrange(num_jobs): 69 master.addJob(job_type(random.randrange(-sys.maxint,sys.maxint))) 70 71 # collect all processed jobs within 3.5 seconds 72 log('** [1] Starting processedJobs with timeout=3.5') 73 try: firstbatch = master.processedJobs(timeout=3.5) 74 except Exception, ex: log(ex) 75 else: 76 log('%d jobs done:' % len(firstbatch)) 77 for job in firstbatch: 78 job_done(job) 79 log('** [1] finished; %d pending jobs' % master.numPendingJobs()) 80 81 # non-blocking iterator over processed jobs 82 log('** [2] Starting non-blocking loop') 83 for i in xrange(4): 84 for job in iter(lambda: master.popProcessedJob(timeout=0), None): 85 job_done(job) 86 if master.numPendingJobs(): 87 log('Do something in the main thread; will check again after a sec') 88 sleep(1) 89 log('** [2] finished; %d pending jobs' % master.numPendingJobs()) 90 91 # toss a coin on whether to cancel the remaining jobs 92 if random.random() > 0.5: 93 master.cancelAllJobs() 94 log('Cancelled all remaining unassigned jobs') 95 # blocking iterator over any remaining pending jobs 96 log('** [3] Starting blocking loop') 97 for job in iter(master.popProcessedJob, None): 98 job_done(job) 99 log('** [3] finished; %d pending jobs' % master.numPendingJobs()) 100 return finished_jobs 101 102 103 if __name__ == '__main__': 104 random.seed() 105 main() 106