Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter/jumpstarter/streams/common.py: 100%
26 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 17:10 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 17:10 +0200
1import asyncio
2import logging
3from contextlib import asynccontextmanager, suppress
5from anyio import (
6 BrokenResourceError,
7 ClosedResourceError,
8 create_memory_object_stream,
9 create_task_group,
10)
11from anyio.abc import AnyByteStream
12from anyio.streams.stapled import StapledObjectStream
14logger = logging.getLogger(__name__)
async def copy_stream(dst: AnyByteStream, src: AnyByteStream):
    """Forward every item read from ``src`` to ``dst``, then send EOF on ``dst``.

    Args:
        dst: Stream that receives the data via ``send`` and, once ``src`` is
            exhausted, ``send_eof``.
        src: Stream that is iterated asynchronously until it ends.

    ``BrokenResourceError``, ``ClosedResourceError`` and
    ``asyncio.exceptions.InvalidStateError`` raised while copying are
    swallowed and simply end the copy. ``AttributeError`` and ``OSError``
    raised by the EOF step are swallowed as well.
    """
    # NOTE(review): the EOF step is taken to be nested inside the outer
    # suppress block (so it only runs after a clean end of ``src``) - the
    # indentation of the listing this was taken from was flattened; confirm.
    copy_errors = (
        BrokenResourceError,
        ClosedResourceError,
        asyncio.exceptions.InvalidStateError,
    )
    eof_errors = (
        AttributeError,
        # https://github.com/jumpstarter-dev/jumpstarter/issues/444
        # sending EOF to UDS on Darwin could result in
        # OSError: [Errno 57] Socket is not connected
        OSError,
    )

    with suppress(*copy_errors):
        async for chunk in src:
            await dst.send(chunk)

        with suppress(*eof_errors):
            await dst.send_eof()
@asynccontextmanager
async def forward_stream(a, b):
    """Async context manager that relays data between ``a`` and ``b`` in both directions.

    Both streams are entered as async context managers (``a`` first, then
    ``b``). Inside them a task group is opened and two ``copy_stream`` tasks
    are started: one copying from ``b`` into ``a`` and one copying from ``a``
    into ``b``. Control is then handed to the ``async with`` body of the
    caller while the task group is still open.

    Args:
        a: First stream; must support ``async with``.
        b: Second stream; must support ``async with``.
    """
    async with a:
        async with b:
            async with create_task_group() as tg:
                # copy_stream takes (dst, src): the first pair feeds ``a``
                # from ``b``, the second feeds ``b`` from ``a``.
                for dst, src in ((a, b), (b, a)):
                    tg.start_soon(copy_stream, dst, src)
                yield
def create_memory_stream():
    """Create a pair of cross-connected in-memory streams.

    Two memory object streams typed for ``bytes`` are created, each with a
    buffer size of 32, and stapled together crosswise: the send half of one
    is paired with the receive half of the other.

    Returns:
        A tuple ``(a, b)`` of ``StapledObjectStream`` objects, wired so that
        items sent on ``a`` are received on ``b`` and items sent on ``b`` are
        received on ``a``.
    """
    buffer_size = 32

    # One channel per direction.
    a_to_b_send, a_to_b_recv = create_memory_object_stream[bytes](buffer_size)
    b_to_a_send, b_to_a_recv = create_memory_object_stream[bytes](buffer_size)

    # Each endpoint writes into its own outgoing channel and reads from the
    # channel the other endpoint writes into.
    endpoint_a = StapledObjectStream(a_to_b_send, b_to_a_recv)
    endpoint_b = StapledObjectStream(b_to_a_send, a_to_b_recv)
    return endpoint_a, endpoint_b