Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter/jumpstarter/common/streams.py: 76%
29 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
1from contextlib import asynccontextmanager
2from typing import Annotated, Literal, Union
3from uuid import UUID
5import grpc
6from jumpstarter_protocol.jumpstarter.v1 import router_pb2_grpc
7from pydantic import BaseModel, Field, Json
9from jumpstarter.common.grpc import aio_secure_channel, ssl_channel_credentials
10from jumpstarter.streams.common import forward_stream
11from jumpstarter.streams.router import RouterStream
# Stream request variant whose ``kind`` tag is "resource".
class ResourceStreamRequest(BaseModel):
    # Tag value; only "resource" is accepted and it is also the default.
    # StreamRequest uses this field as its union discriminator.
    kind: Literal["resource"] = "resource"
    # Identifier carried by the request.
    # NOTE(review): presumably the UUID of the target resource; confirm with callers.
    uuid: UUID
    # Optional string, None when not supplied.
    # NOTE(review): the name suggests a content-encoding hint for the stream; confirm.
    x_jmp_content_encoding: str | None = None
# Stream request variant whose ``kind`` tag is "driver".
class DriverStreamRequest(BaseModel):
    # Tag value; only "driver" is accepted and it is also the default.
    # StreamRequest uses this field as its union discriminator.
    kind: Literal["driver"] = "driver"
    # Identifier carried by the request.
    # NOTE(review): presumably the UUID of the target driver instance; confirm with callers.
    uuid: UUID
    # Required string.
    # NOTE(review): presumably the name of the driver method to stream to/from; confirm.
    method: str
# Tagged union of the two request models. The ``kind`` field is declared as
# the discriminator, so its value ("resource" or "driver") determines which
# model a payload is validated against.
StreamRequest = Annotated[
    Union[ResourceStreamRequest, DriverStreamRequest],
    Field(discriminator="kind"),
]
# Wrapper model holding a single StreamRequest in JSON-encoded form.
class StreamRequestMetadata(BaseModel):
    # Declared as pydantic ``Json[...]``: the incoming value is a JSON string
    # that is parsed and then validated as a StreamRequest.
    # NOTE(review): presumably populated from call metadata; confirm where this is used.
    request: Json[StreamRequest]
@asynccontextmanager
async def connect_router_stream(endpoint, token, stream, tls_config, grpc_options):
    """Keep a router stream open and attached to ``stream`` while the body runs.

    Async context manager. On entry it builds composite channel credentials
    from ``ssl_channel_credentials(endpoint, tls_config)`` and access-token
    call credentials for ``token``, opens a channel to ``endpoint`` with
    ``aio_secure_channel`` (passing ``grpc_options`` through), starts the
    ``RouterService.Stream`` call with empty metadata, wraps that call in a
    ``RouterStream`` and hands it together with ``stream`` to
    ``forward_stream``. Nothing is yielded to the caller; every context
    manager entered here is exited, innermost first, when the body finishes.

    NOTE(review): ``forward_stream`` presumably relays data between the two
    streams for as long as it is entered; confirm against its definition.
    """
    # Resolve each credential separately, then combine them for the channel.
    tls_creds = await ssl_channel_credentials(endpoint, tls_config)
    token_creds = grpc.access_token_call_credentials(token)
    combined = grpc.composite_channel_credentials(tls_creds, token_creds)

    async with aio_secure_channel(endpoint, combined, grpc_options) as channel:
        # Start the Stream RPC on a stub bound to this channel.
        call = router_pb2_grpc.RouterServiceStub(channel).Stream(metadata=())
        # A multi-item ``async with`` is equivalent to nesting the two managers.
        async with RouterStream(context=call) as routed, forward_stream(routed, stream):
            yield