Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/driver.py: 67%
49 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
1from contextlib import asynccontextmanager
2from dataclasses import dataclass, field
4from anyio import (
5 create_memory_object_stream,
6)
7from anyio._backends._asyncio import StreamReaderWrapper, StreamWriterWrapper
8from anyio.abc import ObjectStream
9from anyio.streams.stapled import StapledObjectStream
10from serial import serial_for_url
11from serial_asyncio import open_serial_connection
13from jumpstarter.driver import Driver, exportstream
# URL that selects the in-memory loopback instead of a real serial port.
LOOP = "loop://"
@dataclass(kw_only=True)
class AsyncSerial(ObjectStream):
    """Object stream that pairs a wrapped stream reader with a wrapped writer.

    Sending is delegated to ``writer`` and receiving to ``reader``, so the
    two halves of a connection can be used through one ``ObjectStream``.
    """

    # Receive half of the connection.
    reader: StreamReaderWrapper
    # Send half of the connection.
    writer: StreamWriterWrapper

    async def send(self, item):
        """Send *item* through the writer."""
        await self.writer.send(item)

    async def receive(self):
        """Return the next chunk delivered by the reader."""
        return await self.reader.receive()

    async def send_eof(self):
        """Do nothing: no end-of-file marker is sent on this stream."""

    async def aclose(self):
        """Close the writer first, then the reader.

        The reader is closed even when closing the writer raises (including
        cancellation), so a failure on one half never leaks the other.
        """
        try:
            await self.writer.aclose()
        finally:
            await self.reader.aclose()
@dataclass(kw_only=True)
class PySerial(Driver):
    """Driver that exports a serial port as a bidirectional stream.

    When ``url`` equals ``LOOP`` no serial port is touched; ``connect``
    yields an in-memory loopback stream instead.
    """

    # URL of the serial port, passed to serial_for_url / open_serial_connection.
    url: str
    # Baud rate used both for the presence check and for the connection.
    baudrate: int = field(default=115200)
    # When true, the port is opened once at construction time as a probe.
    check_present: bool = field(default=True)

    def __post_init__(self):
        """Run the parent initializer and optionally probe the serial port.

        The probe is skipped for the loopback URL. Any exception raised by
        ``serial_for_url`` while opening the port propagates to the caller.
        """
        if hasattr(super(), "__post_init__"):
            super().__post_init__()

        if self.check_present and self.url != LOOP:
            # The port is only opened to verify that it can be opened. Close
            # it right away instead of leaving the handle to be released by
            # garbage collection, so it is free when connect() opens it.
            serial_for_url(self.url, baudrate=self.baudrate).close()

    @classmethod
    def client(cls) -> str:
        """Return the import path of the client class for this driver."""
        return "jumpstarter_driver_pyserial.client.PySerialClient"

    @exportstream
    @asynccontextmanager
    async def connect(self):
        """Open the serial connection and yield it as a stream.

        For a real port, the asyncio reader/writer pair is wrapped in an
        ``AsyncSerial`` that is closed when the context exits. For the
        loopback URL, a stapled in-memory object stream is yielded instead.
        """
        self.logger.info("Connecting to %s, baudrate: %d", self.url, self.baudrate)
        if self.url != LOOP:
            # NOTE(review): limit=1 presumably keeps the reader's buffer
            # minimal so data is not held back - confirm against
            # pyserial-asyncio.
            reader, writer = await open_serial_connection(url=self.url, baudrate=self.baudrate, limit=1)
            writer.transport.set_write_buffer_limits(high=4096, low=0)
            async with AsyncSerial(
                reader=StreamReaderWrapper(reader),
                writer=StreamWriterWrapper(writer),
            ) as stream:
                yield stream
            self.logger.info("Disconnected from %s", self.url)
        else:
            # Loopback: both ends of one memory channel (32 buffered items)
            # are stapled together, so sent items are received back.
            tx, rx = create_memory_object_stream[bytes](32)
            async with StapledObjectStream(tx, rx) as stream:
                yield stream