Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-network/jumpstarter_driver_network/adapters/novnc.py: 56%

18 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 15:50 +0200

1from contextlib import asynccontextmanager 

2from urllib.parse import urlencode, urlunparse 

3 

4from ..streams import WebsocketServerStream 

5from jumpstarter.client import DriverClient 

6from jumpstarter.client.adapters import blocking 

7from jumpstarter.common import TemporaryTcpListener 

8from jumpstarter.streams.common import forward_stream 

9 

10 

@blocking
@asynccontextmanager
async def NovncAdapter(*, client: DriverClient, method: str = "connect"):
    """Yield a noVNC web-client URL that targets a temporary local listener.

    A ``TemporaryTcpListener`` is started with a connection handler. For a
    given connection the handler opens ``client.stream_async(method)``, wraps
    that stream in a ``WebsocketServerStream`` and hands both ends to
    ``forward_stream`` (presumably relaying traffic in both directions until
    one side closes -- confirm against ``forward_stream``).

    Args:
        client: Driver client whose ``stream_async`` is called by the handler.
        method: Name passed to ``client.stream_async``; defaults to
            ``"connect"``.

    Yields:
        A URL of the form
        ``https://novnc.com/noVNC/vnc.html?autoconnect=1&reconnect=1&host=...&port=...``
        where ``host`` and ``port`` are the first two items of the address
        produced by ``TemporaryTcpListener``.
    """

    async def _serve_connection(conn):
        # Contexts are entered left-to-right and unwound in reverse, exactly
        # like the equivalent chain of nested ``async with`` statements.
        async with conn, client.stream_async(method) as driver_stream:
            async with WebsocketServerStream(stream=driver_stream) as websocket:
                async with forward_stream(conn, websocket):
                    pass

    async with TemporaryTcpListener(_serve_connection) as addr:
        # Query parameters consumed by the hosted noVNC page.
        query = urlencode(
            {
                "autoconnect": 1,
                "reconnect": 1,
                "host": addr[0],
                "port": addr[1],
            }
        )
        # urlunparse takes (scheme, netloc, path, params, query, fragment).
        yield urlunparse(("https", "novnc.com", "/noVNC/vnc.html", "", query, ""))