Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-network/jumpstarter_driver_network/adapters/portforward.py: 64%
22 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
1from contextlib import asynccontextmanager
2from functools import partial
3from os import PathLike
5from jumpstarter.client import DriverClient
6from jumpstarter.client.adapters import blocking
7from jumpstarter.common import TemporaryTcpListener, TemporaryUnixListener
8from jumpstarter.streams.common import forward_stream
11async def handler(client, method, conn):
12 async with conn:
13 async with client.stream_async(method) as stream:
14 async with forward_stream(conn, stream):
15 pass
18@blocking
19@asynccontextmanager
20async def TcpPortforwardAdapter(
21 *,
22 client: DriverClient,
23 method: str = "connect",
24 local_host: str = "127.0.0.1",
25 local_port: int = 0,
26):
27 async with TemporaryTcpListener(
28 partial(handler, client, method),
29 local_host=local_host,
30 local_port=local_port,
31 ) as addr:
32 yield addr
35@blocking
36@asynccontextmanager
37async def UnixPortforwardAdapter(
38 *,
39 client: DriverClient,
40 method: str = "connect",
41 path: PathLike | None = None,
42):
43 async with TemporaryUnixListener(partial(handler, client, method), path=path) as addr:
44 yield addr