Coverage for arrakis/__main__.py: 60.1%
158 statements
« prev ^ index » next — coverage.py v7.6.12, created at 2025-08-13 15:09 -0700
1import argparse
2import logging
3import os
4import sys
6import numpy
7from gpstime import GPSTimeParseAction, gpsnow
9from . import Channel, Client, __version__, constants
10from .flight import flight
# Module-level logger for the arrakis CLI; level and handler are
# configured in main().
logger = logging.getLogger("arrakis")

##########
# Top-level argument parser.  Subcommands register themselves onto
# `subparsers` (see add_subparser); each stores its handler function
# as the `func` default for dispatch in main().
parser = argparse.ArgumentParser()
parser.add_argument("--version", "-v", action="version", version=__version__)
parser.add_argument(
    "--url",
    "-u",
    type=str,
    help="initial server url",
)
subparsers = parser.add_subparsers()
def add_subparser(cmd, **kwargs):
    """Register *cmd* as a subcommand parser.

    The subcommand name is the function's name; the first line of its
    docstring becomes the short help and the full docstring the
    description.  The function itself is stored as the ``func``
    default so main() can dispatch to it.
    """
    doc = cmd.__doc__
    subparser = subparsers.add_parser(
        cmd.__name__,
        help=doc.splitlines()[0],
        description=doc,
        **kwargs,
    )
    subparser.set_defaults(func=cmd)
    return subparser
40##########
def parse_pattern(pattern):
    """Normalize a channel pattern, mapping empty or "*" to the default match."""
    if pattern == "*" or not pattern:
        return constants.DEFAULT_MATCH
    return pattern
def print_channel(chan: Channel, *, as_json: bool = False) -> None:
    """Print one channel to stdout, as JSON or as its repr."""
    if as_json:
        print(chan.to_json())
    else:
        print(repr(chan))
def _add_find_count_args(parser):
    """Attach the filter arguments shared by the find and count subcommands."""
    parser.add_argument(
        "pattern",
        type=str,
        nargs="?",
        default=constants.DEFAULT_MATCH,
        help="channel pattern",
    )
    parser.add_argument(
        "--data-type",
        "--dtype",
        metavar="DTYPE",
        type=numpy.dtype,
        help="data type",
    )
    # min/max sample-rate bounds share the same shape.
    for bound in ("min", "max"):
        parser.add_argument(
            f"--{bound}_rate",
            metavar="INT",
            type=int,
            help=f"{bound}imum sample rate",
        )
    parser.add_argument(
        "--publisher",
        metavar="ID",
        type=str,
        help="publisher ID",
    )
91##################################################
def find(args):
    """find channels matching regexp pattern"""
    # Pull out options that are not client.find() parameters before
    # forwarding the rest of the namespace as keyword arguments.
    emit_json = args.json
    server_url = args.url
    del args.json, args.url
    client = Client(url=server_url)
    for channel in client.find(**vars(args)):
        print_channel(channel, as_json=emit_json)
# `find` subcommand (aliases: search, list)
sparser = add_subparser(find, aliases=["search", "list"])
_add_find_count_args(sparser)
sparser.add_argument(
    "-j",
    "--json",
    action="store_true",
    help="print channel output as JSON",
)
114##########
def count(args):
    """count channels matching pattern"""
    server_url = args.url
    del args.url
    # Remaining namespace entries map directly onto client.count() kwargs.
    client = Client(url=server_url)
    print(client.count(**vars(args)))
# `count` subcommand: shares the filter arguments with `find`.
sparser = add_subparser(count)
_add_find_count_args(sparser)
128##########
def describe(args):
    """describe channels"""
    # vars() returns the namespace's own dict, so popping entries here
    # removes them before the remainder is forwarded as kwargs.
    opts = vars(args)
    emit_json = opts.pop("json")
    client = Client(url=opts.pop("url"))
    # describe() returns a name -> Channel mapping; print the channels.
    for channel in client.describe(**opts).values():
        print_channel(channel, as_json=emit_json)
# `describe` subcommand (alias: show)
sparser = add_subparser(describe, aliases=["show"])
sparser.add_argument("channels", nargs="+", help="list of channels to describe")
sparser.add_argument(
    "-j",
    "--json",
    action="store_true",
    help="print channel output as JSON",
)
151##########
def stream(args):
    """stream data for channels"""
    # Use explicit None checks: the original truthiness tests would
    # silently drop a valid time value that happens to be falsy
    # (e.g. GPS time 0).  Absent --start/--end default to None, which
    # client.stream() interprets as an open-ended live stream.
    start = args.start.gps() if args.start is not None else None
    end = args.end.gps() if args.end is not None else None
    client = Client(url=args.url)
    for buf in client.stream(args.channels, start=start, end=end):
        print(buf)
        if args.latency:
            # Latency = wall-clock GPS "now" minus the buffer timestamp.
            latency = gpsnow() - buf.time
            print(f"latency: {latency} s", file=sys.stderr)
# `stream` subcommand: live (no times) or bounded streaming of channel data.
sparser = add_subparser(stream)
sparser.add_argument("channels", nargs="+", help="list of channels to stream")
sparser.add_argument(
    "--start",
    action=GPSTimeParseAction,
    help="start time (GPS or arbitrary date/time string)",
)
sparser.add_argument(
    "--end",
    action=GPSTimeParseAction,
    help="end time (GPS or arbitrary date/time string)",
)
sparser.add_argument(
    "--latency",
    action="store_true",
    help="print buffer latency to stderr",
)
190##########
def fetch(args):
    """fetch data for channels"""
    # --start/--end are required, so both are always present; convert
    # the parsed GPS time objects to plain GPS seconds.
    args.start = args.start.gps()
    args.end = args.end.gps()
    opts = vars(args)
    client = Client(url=opts.pop("url"))
    print(client.fetch(**opts))
# `fetch` subcommand: bounded fetch; start and end times are mandatory.
sparser = add_subparser(fetch)
sparser.add_argument("channels", nargs="+", help="list of channels to fetch")
sparser.add_argument(
    "--start",
    required=True,
    action=GPSTimeParseAction,
    help="start time (GPS or arbitrary date/time string)",
)
sparser.add_argument(
    "--end",
    required=True,
    action=GPSTimeParseAction,
    help="end time (GPS or arbitrary date/time string)",
)
219##########
def publish(args):
    """publish values to channels

    Arguments should be channel name + generator function pairs. The
    generator function is used to generate data for the specified
    channel and should be a sympy expression, with the 't' value used
    to indicate time, e.g. "sin(t)". A numeric value for the
    generator function will generate a constant stream.

    """
    # Deferred imports: sympy and the publisher machinery are only
    # needed by this subcommand.
    import sched
    import time
    from math import log2

    from sympy import lambdify, parse_expr
    from sympy.abc import t

    from . import Publisher, SeriesBlock, Time

    if not args.list:
        # Positional args must come in (channel name, expression) pairs.
        if len(args.channel_args) % 2 != 0:
            parser.error("arguments must be channel name + generator function pairs")
        chan_funcs = {}
        for name, value in zip(args.channel_args[::2], args.channel_args[1::2]):
            expr = parse_expr(value)
            # lambdify compiles the sympy expression into a numpy-aware
            # callable of one variable, t.
            chan_funcs[name] = lambdify(t, expr, "numpy")

        # Only integer powers of two are accepted as publication rates.
        if args.rate < 1 or log2(args.rate) % 1 != 0:
            parser.error("rate must be power of two.")

    publisher = Publisher(args.publisher_id, args.url)
    publisher.register()

    if args.list:
        # --list: print the channels registered to this publisher and exit.
        for _name, channel in publisher.channels.items():
            print_channel(channel)
        return

    def _gen_data(publisher, tick):
        # Build and publish one SeriesBlock of generated data for the
        # publication interval starting at `tick`.
        metadata = {}
        series = {}
        for name, func in chan_funcs.items():
            try:
                channel = publisher.channels[name]
            except KeyError:
                msg = f"unknown channel for publisher: {name}"
                raise ValueError(msg) from None
            # NOTE(review): this evaluates the generator at integer sample
            # offsets added to `tick` (no division by sample_rate), so the
            # "t" axis is not in seconds — confirm this is intentional.
            time_array = numpy.arange(int(channel.sample_rate / args.rate)) + tick
            # broadcast_to handles constant expressions, which lambdify
            # returns as scalars rather than arrays.
            data = numpy.array(
                numpy.broadcast_to(func(time_array), time_array.shape),
                dtype=channel.data_type,
            )
            series[name] = data
            metadata[name] = channel
        sblock = SeriesBlock(
            tick * Time.SECONDS,
            series,
            metadata,
        )
        logger.info("publish: %s", sblock)
        publisher.publish(sblock)

    # Schedule block generation on absolute wall-clock ticks spaced
    # 1/rate seconds apart; s.run() blocks until each tick fires.
    s = sched.scheduler(time.time, time.sleep)
    tick = int(time.time())
    with publisher:
        while True:
            tick += 1 / args.rate
            s.enterabs(tick, 0, _gen_data, (publisher, tick))
            s.run()
# `publish` subcommand arguments.
sparser = add_subparser(publish)
sparser.add_argument(
    "--publisher-id",
    type=str,
    required=True,
    help="publisher ID (required)",
)
sparser.add_argument(
    "--rate",
    type=int,
    default=16,
    help="publication rate, in Hz. Must be a power of two.",
)
# --list and the channel/function pairs are mutually exclusive:
# either list the publisher's channels or supply data generators.
lgroup = sparser.add_mutually_exclusive_group()
lgroup.add_argument(
    "--list",
    action="store_true",
    help="list publisher channels and exit",
)
lgroup.add_argument(
    "channel_args",
    nargs="*",
    metavar="CHANNEL FUNC",
    default=[],
    help="channel name + generator function pairs",
)
321##################################################
def main():
    """Configure logging, parse the command line, and run the subcommand."""
    # Log level comes from the environment, defaulting to DEBUG.
    logger.setLevel(os.getenv("LOG_LEVEL", "DEBUG").upper())
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
    logger.addHandler(stream_handler)

    args = parser.parse_args()

    # No subcommand selected: show usage and exit cleanly.
    if "func" not in args:
        parser.print_help()
        return

    command = args.func
    del args.func
    logger.debug(args)

    try:
        command(args)
    except flight.FlightError as e:
        raise SystemExit(f"request error: {e}") from e
if __name__ == "__main__":
    import signal

    # Restore default SIGINT handling so Ctrl-C terminates the process
    # immediately instead of raising KeyboardInterrupt with a traceback.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    main()