Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/console.py: 34%
50 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
1import sys
2import termios
3import tty
4from contextlib import contextmanager
6from anyio import create_task_group
7from anyio.streams.file import FileReadStream, FileWriteStream
9from jumpstarter.client import DriverClient
class ConsoleExit(Exception):
    """Raised inside the console's stdin task to end the interactive session."""
16class Console:
17 def __init__(self, serial_client: DriverClient):
18 self.serial_client = serial_client
20 def run(self):
21 with self.setraw():
22 self.serial_client.portal.call(self.__run)
24 @contextmanager
25 def setraw(self):
26 original = termios.tcgetattr(sys.stdin.fileno())
27 try:
28 tty.setraw(sys.stdin.fileno())
29 yield
30 finally:
31 termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, original)
32 # Clear screen and move cursor to top-left (like \033c\033[2J\033[H).
33 print("\033c\033[2J\033[H", end="")
35 async def __run(self):
36 async with self.serial_client.stream_async(method="connect") as stream:
37 try:
38 async with create_task_group() as tg:
39 tg.start_soon(self.__serial_to_stdout, stream)
40 tg.start_soon(self.__stdin_to_serial, stream)
41 except* ConsoleExit:
42 pass
44 async def __serial_to_stdout(self, stream):
45 stdout = FileWriteStream(sys.stdout.buffer)
46 while True:
47 data = await stream.receive()
48 await stdout.send(data)
49 sys.stdout.flush()
51 async def __stdin_to_serial(self, stream):
52 stdin = FileReadStream(sys.stdin.buffer)
53 ctrl_b_count = 0
54 while True:
55 data = await stdin.receive(max_bytes=1)
56 if not data:
57 continue
58 if data == b"\x02": # Ctrl-B
59 ctrl_b_count += 1
60 if ctrl_b_count == 3:
61 raise ConsoleExit
62 else:
63 ctrl_b_count = 0
64 await stream.send(data)