Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-network/jumpstarter_driver_network/adapters/novnc.py: 56%
18 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
1from contextlib import asynccontextmanager
2from urllib.parse import urlencode, urlunparse
4from ..streams import WebsocketServerStream
5from jumpstarter.client import DriverClient
6from jumpstarter.client.adapters import blocking
7from jumpstarter.common import TemporaryTcpListener
8from jumpstarter.streams.common import forward_stream
@blocking
@asynccontextmanager
async def NovncAdapter(*, client: DriverClient, method: str = "connect"):
    """Yield a novnc.com URL aimed at a temporary local TCP listener.

    Each connection handed to the listener's handler is bridged to a stream
    opened with ``client.stream_async(method)`` and wrapped in a
    ``WebsocketServerStream``.

    Args:
        client: Driver client on which the stream is opened.
        method: Name passed to ``client.stream_async``.

    Yields:
        ``https://novnc.com/noVNC/vnc.html`` with ``autoconnect``,
        ``reconnect``, ``host`` and ``port`` query parameters; ``host`` and
        ``port`` are the first two items of the listener address.

    The listener context stays entered for as long as the caller holds the
    yielded URL.
    """
    # NOTE(review): ``@blocking`` presumably exposes this async context
    # manager to synchronous callers -- confirm in jumpstarter.client.adapters.

    async def bridge(connection):
        # One ``async with`` carrying several items behaves exactly like the
        # equivalent nested statements: entered left to right, exited in
        # reverse order.
        async with connection, client.stream_async(method) as upstream:
            async with WebsocketServerStream(stream=upstream) as ws, forward_stream(connection, ws):
                # All work happens inside the context managers; nothing to do
                # here (presumably forward_stream pumps data until exit).
                pass

    async with TemporaryTcpListener(bridge) as address:
        host, port = address[0], address[1]
        query = urlencode({"autoconnect": 1, "reconnect": 1, "host": host, "port": port})
        # urlunparse takes (scheme, netloc, path, params, query, fragment).
        parts = ("https", "novnc.com", "/noVNC/vnc.html", "", query, "")
        yield urlunparse(parts)