Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter/jumpstarter/streams/common.py: 100%

26 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 17:10 +0200

1import asyncio 

2import logging 

3from contextlib import asynccontextmanager, suppress 

4 

5from anyio import ( 

6 BrokenResourceError, 

7 ClosedResourceError, 

8 create_memory_object_stream, 

9 create_task_group, 

10) 

11from anyio.abc import AnyByteStream 

12from anyio.streams.stapled import StapledObjectStream 

13 

14logger = logging.getLogger(__name__) 

15 

16 

async def copy_stream(dst: AnyByteStream, src: AnyByteStream):
    """Pump every item received from ``src`` into ``dst``, then signal EOF.

    Copying ends quietly if ``BrokenResourceError``, ``ClosedResourceError``
    or ``asyncio.exceptions.InvalidStateError`` is raised along the way.
    Once ``src`` has been drained, EOF is sent to ``dst`` on a best-effort
    basis: ``AttributeError`` (e.g. ``dst`` has no ``send_eof``) and
    ``OSError`` are ignored.

    Args:
        dst: Stream that receives the data.
        src: Stream that is iterated for data.
    """
    stream_gone = (BrokenResourceError, ClosedResourceError, asyncio.exceptions.InvalidStateError)
    eof_failures = (
        AttributeError,
        # https://github.com/jumpstarter-dev/jumpstarter/issues/444
        # sending EOF to UDS on Darwin could result in
        # OSError: [Errno 57] Socket is not connected
        OSError,
    )

    with suppress(*stream_gone):
        async for chunk in src:
            await dst.send(chunk)

        # Reached only when the loop above finished without raising.
        # NOTE(review): the EOF step is kept inside the outer suppress block;
        # confirm this nesting against the original file.
        with suppress(*eof_failures):
            await dst.send_eof()

29 

30 

@asynccontextmanager
async def forward_stream(a, b):
    """Relay data between ``a`` and ``b`` in both directions while the context is open.

    Both streams are entered as async context managers, and one
    ``copy_stream`` task per direction is started in a task group before
    control is handed to the ``async with`` body.

    Args:
        a: First stream; must support ``async with``.
        b: Second stream; must support ``async with``.
    """
    async with a, b, create_task_group() as relay_tasks:
        # copy_stream takes (dst, src): first fill ``a`` from ``b``, then the reverse.
        for dst, src in ((a, b), (b, a)):
            relay_tasks.start_soon(copy_stream, dst, src)
        yield

38 

39 

def create_memory_stream():
    """Create two cross-connected in-memory stream endpoints.

    Two ``bytes``-typed memory object streams are created (each called with
    ``32`` as its buffer-size argument) and stapled crosswise, so each
    endpoint sends on one of them and receives from the other.

    Returns:
        A tuple ``(a, b)`` of ``StapledObjectStream`` endpoints.
    """
    a_send, b_recv_from_a = create_memory_object_stream[bytes](32)
    b_send, a_recv_from_b = create_memory_object_stream[bytes](32)
    return (
        StapledObjectStream(a_send, a_recv_from_b),
        StapledObjectStream(b_send, b_recv_from_a),
    )