Coverage for /run/media/veganeco/water/status600.com/pypi/reptilian_climates/modules_series_2/ships/fields/gardens/ships/process/multi_3/__init__.py: 28%
71 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-22 16:08 -0800
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-22 16:08 -0800
3'''
4 calendar:
5 [ ] output recorder
6 [ ] coverage
7 [ ] start in background
8'''
"""
    import ships.process.multi as multiproc
    import time

    multiprocs = multiproc.start (
        processes = [
            {
                "string": 'python3 -m http.server 9000',
                "Popen": {
                    "cwd": None
                }
            },
            {
                "string": 'python3 -m http.server 9001',
                "Popen": {
                    "cwd": None
                }
            }
        ],

        #
        #   True -> wait for "ctrl and c"
        #
        wait = False
    )

    processes = multiprocs.processes

    time.sleep (.5)

    #
    #   stop
    #
    multiprocs.stop ()
"""
47import rich
48import pexpect
50from subprocess import Popen
51import shlex
52import atexit
54import coverage
def _parse_journal_line (line):
    """Describe one line of child output in both UTF8 and hexadecimal form.

    Returns a dict with "UTF8" and "hexadecimal" entries; each entry holds
    "parsed" ("yes" or "no") and "line" (the converted text, or "" when the
    conversion failed).
    """
    try:
        UTF8_line = line.decode ('UTF8')
        UTF8_parsed = "yes"
    except (UnicodeDecodeError, AttributeError):
        # UnicodeDecodeError: the bytes are not valid UTF8.
        # AttributeError: the line is not a bytes object (no ".decode").
        UTF8_line = ""
        UTF8_parsed = "no"

    try:
        hexadecimal_line = line.hex ()
        hexadecimal_parsed = "yes"
    except AttributeError:
        # The line is not a bytes object (no ".hex").
        hexadecimal_line = ""
        hexadecimal_parsed = "no"

    return {
        "UTF8": {
            "parsed": UTF8_parsed,
            "line": UTF8_line
        },
        "hexadecimal": {
            "parsed": hexadecimal_parsed,
            "line": hexadecimal_line
        }
    }


def _drain_to_journal (child, journal):
    """Read lines from the pexpect child until EOF, journaling each one."""
    while not child.eof ():
        journal (_parse_journal_line (child.readline ()))


def start (
    coverage_dir = "",
    processes = None,
    wait = False
):
    """Start the described processes and return a handle for stopping them.

    Parameters:
        coverage_dir: currently unused.
            NOTE(review): presumably meant to choose where the coverage
            report is written; the report directory is still the fixed
            "coverage_report" -- confirm the intent before wiring it in.
        processes: list of process descriptions. Each one is either
            a command line string, started with subprocess.Popen, or
            a dict with:
                "string":  the command line, started with pexpect.spawn
                "Popen":   (optional) keyword arguments for pexpect.spawn
                "journal": (optional) callable that receives one parsed
                           line report (a dict) per line of output
            Descriptions of any other type are skipped.
        wait: when True, wait () on every started process before returning.

    Returns:
        An object with:
            .processes: the list of started process handles
            .stop ():   closes the processes and writes the coverage report

    The stop routine is also registered with atexit.
    """
    if processes is None:
        processes = []

    processes_list = []

    print ('coverage?')

    cov = coverage.Coverage ()
    cov.start ()

    for process in processes:
        if isinstance (process, str):
            # The command line is the description itself.
            processes_list.append (Popen (shlex.split (process)))

        elif isinstance (process, dict):
            process_string = process ["string"]
            args = process.get ("Popen", {})

            #
            #   Without a "journal" the lines are read and discarded.
            #
            journal = process.get ("journal", lambda *args, **kwargs: None)

            child = pexpect.spawn (
                process_string,
                ** args
            )

            #
            #   This reads until the child reaches EOF, therefore the next
            #   description is not started until this child's output ends.
            #
            _drain_to_journal (child, journal)

            processes_list.append (child)

    stopped = False

    def stop ():
        nonlocal stopped

        print ('stopping')

        #
        #   stop is reachable through both atexit and the returned
        #   object; everything after this only needs to occur once.
        #
        if stopped:
            return
        stopped = True

        try:
            for process in processes_list:
                if isinstance (process, Popen):
                    # subprocess.Popen objects do not have a "close".
                    process.terminate ()
                else:
                    process.close ()
        finally:
            #
            #   The coverage report is built once, even if closing
            #   one of the processes raised an exception.
            #
            print ('building coverage')

            cov.stop ()
            cov.save ()
            cov.html_report (directory = "coverage_report")

    '''
        This might only work if this is called:
            process.wait ()
    '''
    atexit.register (stop)

    if wait:
        for process in processes_list:
            #
            #   https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait
            #
            process.wait ()

    class returns:
        def __init__ (this, processes):
            this.processes = processes

        def stop (this):
            print ("stop called")

            stop ()

    this_returns = returns (
        processes = processes_list
    )

    print (this_returns)

    return this_returns