Coverage for arrakis/__main__.py: 60.1%

158 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-08-13 15:09 -0700

1import argparse 

2import logging 

3import os 

4import sys 

5 

6import numpy 

7from gpstime import GPSTimeParseAction, gpsnow 

8 

9from . import Channel, Client, __version__, constants 

10from .flight import flight 

11 

12logger = logging.getLogger("arrakis") 

13 

14 

15########## 

16 

17 

# Top-level command-line parser.  Options defined here apply to every
# sub-command.
parser = argparse.ArgumentParser()
parser.add_argument("--version", "-v", action="version", version=__version__)
parser.add_argument("--url", "-u", type=str, help="initial server url")
# Sub-commands are attached to this object by add_subparser().
subparsers = parser.add_subparsers()

27 

28 

def add_subparser(cmd, **kwargs):
    """Register the function ``cmd`` as a sub-command of the top-level parser.

    The sub-command is named after the function (``cmd.__name__``).  The
    first line of the function's docstring becomes the short help text and
    the full docstring becomes the description.  A function without a
    docstring (for instance when Python runs with ``-OO``, which strips
    docstrings) is registered without help text instead of raising.

    Any extra keyword arguments (e.g. ``aliases``) are forwarded to
    ``subparsers.add_parser``.

    Returns the newly created sub-parser so the caller can add arguments.
    """
    doc = cmd.__doc__
    # ``__doc__`` may be None or empty; indexing ``splitlines()[0]`` on
    # either would fail, so fall back to "no help text".
    doc_lines = doc.splitlines() if doc else []
    sp = subparsers.add_parser(
        cmd.__name__,
        help=doc_lines[0] if doc_lines else None,
        description=doc,
        **kwargs,
    )
    # main() looks up this default to decide which function to run.
    sp.set_defaults(func=cmd)
    return sp

38 

39 

40########## 

41 

42 

def parse_pattern(pattern):
    """Normalise a channel pattern argument.

    ``pattern`` is returned unchanged unless it is falsy (e.g. ``None`` or
    an empty string) or the literal ``"*"``; in those cases
    ``constants.DEFAULT_MATCH`` is returned instead.
    """
    if pattern and pattern != "*":
        return pattern
    return constants.DEFAULT_MATCH

47 

48 

def print_channel(chan: Channel, *, as_json: bool = False) -> None:
    """Print a single channel to stdout.

    With ``as_json`` set, the channel's ``to_json()`` output is printed;
    otherwise its ``repr`` is printed.
    """
    if as_json:
        print(chan.to_json())
    else:
        print(repr(chan))

52 

53 

def _add_find_count_args(parser):
    """Add the channel-filter arguments shared by ``find`` and ``count``.

    Adds an optional positional ``pattern`` (defaulting to
    ``constants.DEFAULT_MATCH``) plus the ``--data-type``, ``--min-rate``,
    ``--max-rate`` and ``--publisher`` options to ``parser``.

    The rate options accept both the hyphenated spelling, consistent with
    ``--data-type``, and the original underscore spelling (``--min_rate``,
    ``--max_rate``).  The destination attribute names are unchanged
    because the command functions forward the parsed namespace as keyword
    arguments.
    """
    parser.add_argument(
        "pattern",
        type=str,
        nargs="?",
        default=constants.DEFAULT_MATCH,
        help="channel pattern",
    )
    parser.add_argument(
        "--data-type",
        "--dtype",
        metavar="DTYPE",
        type=numpy.dtype,
        # action="append",
        help="data type",
    )
    parser.add_argument(
        "--min-rate",
        "--min_rate",  # original spelling, kept for backward compatibility
        dest="min_rate",
        metavar="INT",
        type=int,
        help="minimum sample rate",
    )
    parser.add_argument(
        "--max-rate",
        "--max_rate",  # original spelling, kept for backward compatibility
        dest="max_rate",
        metavar="INT",
        type=int,
        help="maximum sample rate",
    )
    parser.add_argument(
        "--publisher",
        metavar="ID",
        type=str,
        # action="append",
        help="publisher ID",
    )

89 

90 

91################################################## 

92 

93 

def find(args):
    """find channels matching regexp pattern"""
    # The docstring above is also the sub-command's help text, so it is
    # kept verbatim.
    #
    # vars() exposes the namespace's own attribute dict; entries removed
    # here are the ones that must not be forwarded to Client.find().
    query = vars(args)
    as_json = query.pop("json")
    client = Client(url=query["url"])
    del query["url"]
    for chan in client.find(**query):
        print_channel(chan, as_json=as_json)

102 

103 

# "find" sub-command, also reachable as "search" and "list".
sparser = add_subparser(find, aliases=["search", "list"])
_add_find_count_args(sparser)
sparser.add_argument(
    "-j",
    "--json",
    action="store_true",
    help="print channel output as JSON",
)

112 

113 

114########## 

115 

116 

def count(args):
    """count channels matching pattern"""
    # The docstring above is also the sub-command's help text, so it is
    # kept verbatim.
    #
    # Everything left in the namespace once the URL is removed is
    # forwarded to Client.count() as keyword arguments.
    query = vars(args)
    client = Client(url=query["url"])
    del query["url"]
    total = client.count(**query)
    print(total)

122 

123 

# "count" sub-command: takes the same filter arguments as "find".
sparser = add_subparser(count)
_add_find_count_args(sparser)

126 

127 

128########## 

129 

130 

def describe(args):
    """describe channels"""
    # The docstring above is also the sub-command's help text, so it is
    # kept verbatim.
    #
    # Strip the options handled here; the remainder of the namespace is
    # forwarded to Client.describe() as keyword arguments.
    request = vars(args)
    as_json = request.pop("json")
    client = Client(url=request["url"])
    del request["url"]
    described = client.describe(**request)
    for channel in described.values():
        print_channel(channel, as_json=as_json)

139 

140 

# "describe" sub-command, also reachable as "show".
sparser = add_subparser(describe, aliases=["show"])
sparser.add_argument("channels", nargs="+", help="list of channels to describe")
sparser.add_argument(
    "-j",
    "--json",
    action="store_true",
    help="print channel output as JSON",
)

149 

150 

151########## 

152 

153 

def stream(args):
    """stream data for channels"""
    # The docstring above is also the sub-command's help text, so it is
    # kept verbatim.
    #
    # --start/--end are optional; an omitted bound is passed on as None.
    start = args.start.gps() if args.start else None
    end = args.end.gps() if args.end else None
    show_latency = args.latency

    client = Client(url=args.url)
    for buf in client.stream(args.channels, start=start, end=end):
        print(buf)
        if not show_latency:
            continue
        # Difference between the current GPS time and the buffer's time,
        # written to stderr so it does not mix with the data on stdout.
        latency = gpsnow() - buf.time
        print(f"latency: {latency} s", file=sys.stderr)

169 

170 

# "stream" sub-command: both time bounds are optional.
sparser = add_subparser(stream)
sparser.add_argument("channels", nargs="+", help="list of channels to stream")
sparser.add_argument(
    "--start",
    action=GPSTimeParseAction,
    help="start time (GPS or arbitrary date/time string)",
)
sparser.add_argument(
    "--end",
    action=GPSTimeParseAction,
    help="end time (GPS or arbitrary date/time string)",
)
sparser.add_argument(
    "--latency",
    action="store_true",
    help="print buffer latency to stderr",
)

188 

189 

190########## 

191 

192 

def fetch(args):
    """fetch data for channels"""
    # The docstring above is also the sub-command's help text, so it is
    # kept verbatim.
    #
    # Replace the parsed time objects with their GPS values in place; the
    # namespace (minus the URL) is then forwarded to Client.fetch().
    request = vars(args)
    request["start"] = args.start.gps()
    request["end"] = args.end.gps()
    client = Client(url=request["url"])
    del request["url"]
    print(client.fetch(**request))

201 

202 

# "fetch" sub-command: unlike "stream", both time bounds are required.
sparser = add_subparser(fetch)
sparser.add_argument("channels", nargs="+", help="list of channels to fetch")
sparser.add_argument(
    "--start",
    required=True,
    action=GPSTimeParseAction,
    help="start time (GPS or arbitrary date/time string)",
)
sparser.add_argument(
    "--end",
    required=True,
    action=GPSTimeParseAction,
    help="end time (GPS or arbitrary date/time string)",
)

217 

218 

219########## 

220 

221 

def publish(args):
    """publish values to channels

    Arguments should be channel name + generator function pairs. The
    generator function is used to generate data for the specified
    channel and should be a sympy expression, with the 't' value used
    to indicate time, e.g. "sin(t)". A numeric value for the
    generator function will generate a constant stream.

    """
    # NOTE: the docstring above is shown verbatim as the sub-command's
    # help/description, so implementation notes are kept in comments.
    #
    # Heavy / optional dependencies are imported lazily so the other
    # sub-commands do not need them.
    import sched
    import time
    from math import log2

    from sympy import lambdify, parse_expr
    from sympy.abc import t

    from . import Publisher, SeriesBlock, Time

    if not args.list:
        if len(args.channel_args) % 2 != 0:
            parser.error("arguments must be channel name + generator function pairs")
        # channel name -> numpy-vectorised function of time
        chan_funcs = {}
        for name, value in zip(args.channel_args[::2], args.channel_args[1::2]):
            expr = parse_expr(value)
            chan_funcs[name] = lambdify(t, expr, "numpy")

    if args.rate < 1 or log2(args.rate) % 1 != 0:
        parser.error("rate must be power of two.")

    publisher = Publisher(args.publisher_id, args.url)
    publisher.register()

    if args.list:
        # --list only prints the publisher's channels; nothing is published.
        for channel in publisher.channels.values():
            print_channel(channel)
        return

    def _gen_data(publisher, tick):
        """Build and publish one block of data starting at time ``tick``."""
        metadata = {}
        series = {}
        for name, func in chan_funcs.items():
            try:
                channel = publisher.channels[name]
            except KeyError:
                msg = f"unknown channel for publisher: {name}"
                raise ValueError(msg) from None
            # One block covers 1/rate seconds of samples.
            nsamples = int(channel.sample_rate / args.rate)
            # Sample times in seconds: consecutive samples are
            # 1/sample_rate apart, so 't' in the generator expression is
            # a real time axis rather than a sample index offset by tick.
            time_array = tick + numpy.arange(nsamples) / channel.sample_rate
            # broadcast_to expands a scalar result (constant generator)
            # to the full block length.
            data = numpy.array(
                numpy.broadcast_to(func(time_array), time_array.shape),
                dtype=channel.data_type,
            )
            series[name] = data
            metadata[name] = channel
        sblock = SeriesBlock(
            tick * Time.SECONDS,
            series,
            metadata,
        )
        logger.info("publish: %s", sblock)
        publisher.publish(sblock)

    # Publish one block every 1/rate seconds, each scheduled at its own
    # absolute start time, until interrupted.
    s = sched.scheduler(time.time, time.sleep)
    tick = int(time.time())
    with publisher:
        while True:
            tick += 1 / args.rate
            s.enterabs(tick, 0, _gen_data, (publisher, tick))
            s.run()

291 

292 

# "publish" sub-command.
sparser = add_subparser(publish)
sparser.add_argument(
    "--publisher-id",
    type=str,
    required=True,
    help="publisher ID (required)",
)
sparser.add_argument(
    "--rate",
    type=int,
    default=16,
    help="publication rate, in Hz. Must be a power of two.",
)
# Either list the publisher's channels, or give channel/function pairs to
# publish -- not both.
lgroup = sparser.add_mutually_exclusive_group()
lgroup.add_argument(
    "--list",
    action="store_true",
    help="list publisher channels and exit",
)
lgroup.add_argument(
    "channel_args",
    nargs="*",
    metavar="CHANNEL FUNC",
    default=[],
    help="channel name + generator function pairs",
)

319 

320 

321################################################## 

322 

323 

def main():
    """Command-line entry point.

    Configures the ``arrakis`` logger (level taken from the ``LOG_LEVEL``
    environment variable, ``DEBUG`` if unset), parses the command line and
    runs the selected sub-command.  With no sub-command the help text is
    printed.  A ``flight.FlightError`` raised by the sub-command is turned
    into ``SystemExit`` with a "request error" message.
    """
    level = os.getenv("LOG_LEVEL", "DEBUG").upper()
    logger.setLevel(level)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
    logger.addHandler(handler)

    args = parser.parse_args()

    # "func" is only set when a sub-command was selected; it is removed
    # from the namespace because sub-commands forward the remaining
    # attributes as keyword arguments.
    func = vars(args).pop("func", None)
    if func is None:
        parser.print_help()
        return
    logger.debug(args)

    try:
        func(args)
    except flight.FlightError as e:
        raise SystemExit(f"request error: {e}") from e

345 

346 

if __name__ == "__main__":
    import signal

    # Restore the default SIGINT disposition before running, so Ctrl-C is
    # not delivered to Python as KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    main()