Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-network/jumpstarter_driver_network/adapters/dbus.py: 28%
25 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
1from contextlib import contextmanager
2from os import environ, getenv
3from typing import TYPE_CHECKING
5from .portforward import TcpPortforwardAdapter
7if TYPE_CHECKING:
8 from ..client import DbusNetworkClient
11@contextmanager
12def DbusAdapter(*, client: "DbusNetworkClient"):
13 match client.kind:
14 case "system":
15 varname = "DBUS_SYSTEM_BUS_ADDRESS"
16 pass
17 case "session":
18 varname = "DBUS_SESSION_BUS_ADDRESS"
19 pass
20 case _:
21 raise ValueError(f"invalid bus type: {client.kind}")
23 oldenv = getenv(varname)
25 with TcpPortforwardAdapter(client=client) as addr:
26 environ[varname] = f"tcp:host={addr[0]},port={addr[1]}"
28 try:
29 yield
30 finally:
31 if oldenv is None:
32 del environ[varname]
33 else:
34 environ[varname] = oldenv