Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter/jumpstarter/common/streams.py: 76%

29 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 15:50 +0200

1from contextlib import asynccontextmanager 

2from typing import Annotated, Literal, Union 

3from uuid import UUID 

4 

5import grpc 

6from jumpstarter_protocol.jumpstarter.v1 import router_pb2_grpc 

7from pydantic import BaseModel, Field, Json 

8 

9from jumpstarter.common.grpc import aio_secure_channel, ssl_channel_credentials 

10from jumpstarter.streams.common import forward_stream 

11from jumpstarter.streams.router import RouterStream 

12 

13 

# Stream request variant tagged kind="resource".
# NOTE: kept as comments rather than a class docstring so the generated
# pydantic schema (which picks up docstrings as descriptions) is unchanged.
class ResourceStreamRequest(BaseModel):
    # Discriminator tag; fixed to the literal "resource".
    kind: Literal["resource"] = "resource"
    # Required identifier (presumably of the target resource -- confirm with callers).
    uuid: UUID
    # Optional string, defaults to None (presumably names a content encoding
    # for the stream payload -- confirm with callers).
    x_jmp_content_encoding: str | None = None

18 

19 

# Stream request variant tagged kind="driver".
# NOTE: kept as comments rather than a class docstring so the generated
# pydantic schema (which picks up docstrings as descriptions) is unchanged.
class DriverStreamRequest(BaseModel):
    # Discriminator tag; fixed to the literal "driver".
    kind: Literal["driver"] = "driver"
    # Required identifier (presumably of the target driver instance -- confirm).
    uuid: UUID
    # Required string (presumably the driver method to stream from/to -- confirm).
    method: str

24 

25 

# Tagged union of the two request models above. The "kind" field is declared
# as the discriminator, so validation selects ResourceStreamRequest or
# DriverStreamRequest from the value of "kind".
StreamRequest = Annotated[
    Union[ResourceStreamRequest, DriverStreamRequest],
    Field(discriminator="kind"),
]

30 

31 

# Wrapper model holding a single StreamRequest.
# NOTE: kept as comments rather than a class docstring so the generated
# pydantic schema is unchanged.
class StreamRequestMetadata(BaseModel):
    # Declared as Json[...]: the incoming value is a JSON-encoded string that
    # pydantic decodes and then validates against the StreamRequest union.
    request: Json[StreamRequest]

34 

35 

@asynccontextmanager
async def connect_router_stream(endpoint, token, stream, tls_config, grpc_options):
    """Open a router stream over gRPC and forward it to ``stream``.

    Async context manager. On entry it:

    1. builds composite channel credentials from the TLS credentials for
       ``endpoint`` / ``tls_config`` and access-token call credentials for
       ``token``;
    2. opens a secure aio channel to ``endpoint`` with ``grpc_options``;
    3. starts the ``RouterService.Stream`` call with empty metadata and wraps
       it in a ``RouterStream``;
    4. enters ``forward_stream(routed, stream)`` (presumably relaying data
       between the two streams -- see ``jumpstarter.streams.common``).

    Nothing is yielded; the caller's ``async with`` body runs while all of the
    above contexts are active, and they are exited in reverse order afterwards.
    """
    # Evaluate in the same order as a single nested call would: TLS first,
    # then the per-call token credentials.
    channel_creds = await ssl_channel_credentials(endpoint, tls_config)
    call_creds = grpc.access_token_call_credentials(token)
    credentials = grpc.composite_channel_credentials(channel_creds, call_creds)

    async with aio_secure_channel(endpoint, credentials, grpc_options) as channel:
        stub = router_pb2_grpc.RouterServiceStub(channel)
        rpc = stub.Stream(metadata=())
        # Multi-item form is equivalent to nesting the two `async with` blocks.
        async with RouterStream(context=rpc) as routed, forward_stream(routed, stream):
            yield