Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/console.py: 34%

50 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 15:50 +0200

1import sys 

2import termios 

3import tty 

4from contextlib import contextmanager 

5 

6from anyio import create_task_group 

7from anyio.streams.file import FileReadStream, FileWriteStream 

8 

9from jumpstarter.client import DriverClient 

10 

11 

12class ConsoleExit(Exception): 

13 pass 

14 

15 

16class Console: 

17 def __init__(self, serial_client: DriverClient): 

18 self.serial_client = serial_client 

19 

20 def run(self): 

21 with self.setraw(): 

22 self.serial_client.portal.call(self.__run) 

23 

24 @contextmanager 

25 def setraw(self): 

26 original = termios.tcgetattr(sys.stdin.fileno()) 

27 try: 

28 tty.setraw(sys.stdin.fileno()) 

29 yield 

30 finally: 

31 termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, original) 

32 # Clear screen and move cursor to top-left (like \033c\033[2J\033[H). 

33 print("\033c\033[2J\033[H", end="") 

34 

35 async def __run(self): 

36 async with self.serial_client.stream_async(method="connect") as stream: 

37 try: 

38 async with create_task_group() as tg: 

39 tg.start_soon(self.__serial_to_stdout, stream) 

40 tg.start_soon(self.__stdin_to_serial, stream) 

41 except* ConsoleExit: 

42 pass 

43 

44 async def __serial_to_stdout(self, stream): 

45 stdout = FileWriteStream(sys.stdout.buffer) 

46 while True: 

47 data = await stream.receive() 

48 await stdout.send(data) 

49 sys.stdout.flush() 

50 

51 async def __stdin_to_serial(self, stream): 

52 stdin = FileReadStream(sys.stdin.buffer) 

53 ctrl_b_count = 0 

54 while True: 

55 data = await stdin.receive(max_bytes=1) 

56 if not data: 

57 continue 

58 if data == b"\x02": # Ctrl-B 

59 ctrl_b_count += 1 

60 if ctrl_b_count == 3: 

61 raise ConsoleExit 

62 else: 

63 ctrl_b_count = 0 

64 await stream.send(data)