Coverage for /home/fedora/jumpstarter/packages/jumpstarter/jumpstarter/streams/blocking.py: 82%
11 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
1from dataclasses import dataclass
3from anyio.abc import AnyByteStream
4from anyio.from_thread import BlockingPortal
7@dataclass(kw_only=True)
8class BlockingStream:
9 stream: AnyByteStream
10 portal: BlockingPortal
12 def send(self, data: bytes) -> None:
13 return self.portal.call(self.stream.send, data)
15 def receive(self) -> bytes:
16 return self.portal.call(self.stream.receive)