1
2
3 import sys
4 import random
5 from time import sleep
6
7 import papyros
8 import jobs
9
10 log = jobs.log
11
12
14
15 j = raw_input('Select job type ([S]lowSqrt, [P]rimeFactors): ')[0].upper()
16 if j == 'S':
17 job_type = jobs.SlowSqrt
18 elif j == 'P':
19 job_type = jobs.PrimeFactors
20
21
22 num_jobs = int(raw_input('Select number of jobs: '))
23
24
25 if raw_input('Verbose output ([Y/N]) ? ').upper().startswith('Y'):
26 papyros.enableLogging()
27
28
29 m = raw_input('Select master type ([S]inglethreaded, [M]ultithreaded, '
30 '[D]istributed): ')[0].upper()
31 if m == 'S':
32 from papyros.singlethreaded import SingleThreadedMaster
33 master = SingleThreadedMaster()
34 elif m == 'M':
35 from papyros.multithreaded import MultiThreadedMaster
36 num_slaves = int(raw_input('Select number of slave threads: '))
37 master = MultiThreadedMaster(num_slaves)
38 elif m == 'D':
39 from papyros.distributed import DistributedMaster, DEFAULT_GROUP
40 group = raw_input('Select distributed group name [default=%s]: ' % DEFAULT_GROUP)
41 if not group.strip():
42 group = DEFAULT_GROUP
43 raw_input('Make sure you have started a master process and one or more '
44 'slave processes for group %r and press any key..' % group)
45 master = DistributedMaster(group)
46 else:
47 sys.exit('Invalid master type')
48
49
50 finished_jobs = run(master, job_type, num_jobs)
51 for i,job in enumerate(finished_jobs):
52 try: out = job.result
53 except Exception, ex: out = ex
54 print 'Job %2d: (%s)\t%s' % (i, job, out)
55
56
57 -def run(master, job_type, num_jobs):
58 finished_jobs = []
59 def job_done(job):
60 finished_jobs.append(job)
61
62
63 try: log('job #%s: result=%s' % (job, job.result))
64 except Exception, ex:
65 log('job #%s: exception raised: %s\n%s' % (job, ex, ex.traceback))
66
67
68 for i in xrange(num_jobs):
69 master.addJob(job_type(random.randrange(-sys.maxint,sys.maxint)))
70
71
72 log('** [1] Starting processedJobs with timeout=3.5')
73 try: firstbatch = master.processedJobs(timeout=3.5)
74 except Exception, ex: log(ex)
75 else:
76 log('%d jobs done:' % len(firstbatch))
77 for job in firstbatch:
78 job_done(job)
79 log('** [1] finished; %d pending jobs' % master.numPendingJobs())
80
81
82 log('** [2] Starting non-blocking loop')
83 for i in xrange(4):
84 for job in iter(lambda: master.popProcessedJob(timeout=0), None):
85 job_done(job)
86 if master.numPendingJobs():
87 log('Do something in the main thread; will check again after a sec')
88 sleep(1)
89 log('** [2] finished; %d pending jobs' % master.numPendingJobs())
90
91
92 if random.random() > 0.5:
93 master.cancelAllJobs()
94 log('Cancelled all remaining unassigned jobs')
95
96 log('** [3] Starting blocking loop')
97 for job in iter(master.popProcessedJob, None):
98 job_done(job)
99 log('** [3] finished; %d pending jobs' % master.numPendingJobs())
100 return finished_jobs
101
102
103 if __name__ == '__main__':
104 random.seed()
105 main()
106