Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/driver.py: 67%

49 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 15:50 +0200

1from contextlib import asynccontextmanager 

2from dataclasses import dataclass, field 

3 

4from anyio import ( 

5 create_memory_object_stream, 

6) 

7from anyio._backends._asyncio import StreamReaderWrapper, StreamWriterWrapper 

8from anyio.abc import ObjectStream 

9from anyio.streams.stapled import StapledObjectStream 

10from serial import serial_for_url 

11from serial_asyncio import open_serial_connection 

12 

13from jumpstarter.driver import Driver, exportstream 

14 

15LOOP = "loop://" 

16 

17 

18@dataclass(kw_only=True) 

19class AsyncSerial(ObjectStream): 

20 reader: StreamReaderWrapper 

21 writer: StreamWriterWrapper 

22 

23 async def send(self, item): 

24 await self.writer.send(item) 

25 

26 async def receive(self): 

27 return await self.reader.receive() 

28 

29 async def send_eof(self): 

30 pass 

31 

32 async def aclose(self): 

33 await self.writer.aclose() 

34 await self.reader.aclose() 

35 

36 

37@dataclass(kw_only=True) 

38class PySerial(Driver): 

39 url: str 

40 baudrate: int = field(default=115200) 

41 check_present: bool = field(default=True) 

42 

43 def __post_init__(self): 

44 if hasattr(super(), "__post_init__"): 

45 super().__post_init__() 

46 if self.check_present and self.url != LOOP: 

47 serial_for_url(self.url, baudrate=self.baudrate) 

48 

49 @classmethod 

50 def client(cls) -> str: 

51 return "jumpstarter_driver_pyserial.client.PySerialClient" 

52 

53 @exportstream 

54 @asynccontextmanager 

55 async def connect(self): 

56 self.logger.info("Connecting to %s, baudrate: %d", self.url, self.baudrate) 

57 if self.url != LOOP: 

58 reader, writer = await open_serial_connection(url=self.url, baudrate=self.baudrate, limit=1) 

59 writer.transport.set_write_buffer_limits(high=4096, low=0) 

60 async with AsyncSerial( 

61 reader=StreamReaderWrapper(reader), 

62 writer=StreamWriterWrapper(writer), 

63 ) as stream: 

64 yield stream 

65 self.logger.info("Disconnected from %s", self.url) 

66 else: 

67 tx, rx = create_memory_object_stream[bytes](32) # ty: ignore[call-non-callable] 

68 async with StapledObjectStream(tx, rx) as stream: 

69 yield stream