Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-network/jumpstarter_driver_network/adapters/fabric.py: 86%
14 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
1from contextlib import asynccontextmanager
2from functools import partial
3from typing import Any
5from fabric.config import Config
6from fabric.connection import Connection
8from .portforward import handler
9from jumpstarter.client import DriverClient
10from jumpstarter.client.adapters import blocking
11from jumpstarter.common import TemporaryTcpListener
@blocking
@asynccontextmanager
async def FabricAdapter(
    *,
    client: DriverClient,
    method: str = "connect",
    user: str | None = None,
    config: Config | None = None,
    forward_agent: bool | None = None,
    connect_timeout: int | None = None,
    connect_kwargs: dict[str, Any] | None = None,
    inline_ssh_env: bool | None = None,
):
    """Yield a Fabric ``Connection`` aimed at a temporary TCP listener.

    A ``TemporaryTcpListener`` is opened for the lifetime of the context,
    served by ``handler`` bound to ``client`` and ``method``. The address it
    reports is used as the host and port of the yielded ``Connection``.

    Args:
        client: Driver client bound into the port-forward ``handler``.
        method: Name bound into ``handler`` alongside ``client``
            (presumably the driver method that opens the stream -- confirm
            against ``portforward.handler``).
        user: Passed to ``Connection`` as ``user``.
        config: Passed to ``Connection`` as ``config``.
        forward_agent: Passed to ``Connection`` as ``forward_agent``.
        connect_timeout: Passed to ``Connection`` as ``connect_timeout``.
        connect_kwargs: Passed to ``Connection`` as ``connect_kwargs``.
        inline_ssh_env: Passed to ``Connection`` as ``inline_ssh_env``.

    Yields:
        A ``fabric.connection.Connection`` whose host and port are the first
        two elements of the listener's address.

    NOTE(review): the ``Connection`` is not closed here when the context
    exits; callers appear to be responsible for that -- confirm.
    """
    # Bind the client and method once; the listener receives the resulting
    # callable as its only argument.
    forwarder = partial(handler, client, method)

    async with TemporaryTcpListener(forwarder) as addr:
        # Index rather than unpack: only the first two elements are used,
        # so this works whatever the length of the address.
        host = addr[0]
        port = addr[1]

        # Every option is forwarded verbatim, including ``None`` values.
        options = {
            "user": user,
            "port": port,
            "config": config,
            "forward_agent": forward_agent,
            "connect_timeout": connect_timeout,
            "connect_kwargs": connect_kwargs,
            "inline_ssh_env": inline_ssh_env,
        }
        yield Connection(host, **options)