Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter/jumpstarter/client/grpc.py: 55%
163 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
1from __future__ import annotations
3from collections import OrderedDict
4from dataclasses import InitVar, dataclass, field
5from datetime import datetime, timedelta
6from types import SimpleNamespace
7from typing import Any
9from google.protobuf import duration_pb2, field_mask_pb2, json_format
10from grpc import ChannelConnectivity
11from grpc.aio import Channel
12from jumpstarter_protocol import client_pb2, client_pb2_grpc, jumpstarter_pb2_grpc, kubernetes_pb2, router_pb2_grpc
13from pydantic import BaseModel, ConfigDict, Field, field_serializer
15from jumpstarter.common.grpc import translate_grpc_exceptions
def parse_identifier(identifier: str, kind: str) -> tuple[str, str]:
    """Split a resource identifier into its namespace and resource name.

    The identifier must have the form ``namespaces/<namespace>/<kind>/<name>``.

    Args:
        identifier: Slash-separated resource identifier.
        kind: Required value of the third segment, e.g. ``"clients"``.

    Returns:
        A ``(namespace, name)`` tuple taken from the second and fourth segments.

    Raises:
        ValueError: If the identifier does not have exactly four segments,
            its first segment is not ``namespaces``, or its third segment
            is not ``kind``.
    """
    segments = identifier.split("/")
    if len(segments) != 4:
        raise ValueError(f"incorrect number of segments in identifier, expecting 4, got {len(segments)}")

    prefix, namespace, actual_kind, name = segments
    if prefix != "namespaces":
        raise ValueError(f"incorrect first segment in identifier, expecting namespaces, got {prefix}")
    if actual_kind != kind:
        raise ValueError(f"incorrect third segment in identifier, expecting {kind}, got {actual_kind}")
    return namespace, name
def parse_client_identifier(identifier: str) -> tuple[str, str]:
    """Parse ``namespaces/<namespace>/clients/<name>`` into ``(namespace, name)``.

    Raises:
        ValueError: If ``identifier`` does not match that form.
    """
    return parse_identifier(identifier, "clients")
def parse_exporter_identifier(identifier: str) -> tuple[str, str]:
    """Parse ``namespaces/<namespace>/exporters/<name>`` into ``(namespace, name)``.

    Raises:
        ValueError: If ``identifier`` does not match that form.
    """
    return parse_identifier(identifier, "exporters")
def parse_lease_identifier(identifier: str) -> tuple[str, str]:
    """Parse ``namespaces/<namespace>/leases/<name>`` into ``(namespace, name)``.

    Raises:
        ValueError: If ``identifier`` does not match that form.
    """
    return parse_identifier(identifier, "leases")
class Exporter(BaseModel):
    # Model of a single exporter: its namespace, short name and labels.
    # Documented with comments (not a class docstring) so the generated
    # pydantic schema is not affected.
    namespace: str
    name: str
    labels: dict[str, str]

    @classmethod
    def from_protobuf(cls, data: client_pb2.Exporter) -> Exporter:
        """Build an ``Exporter`` from a protobuf message.

        ``data.name`` is parsed as an exporter identifier to obtain the
        namespace and short name; ``data.labels`` is passed through.
        """
        exporter_namespace, exporter_name = parse_exporter_identifier(data.name)
        return cls(
            namespace=exporter_namespace,
            name=exporter_name,
            labels=data.labels,
        )

    @classmethod
    def rich_add_columns(cls, table):
        """Add the NAME and LABELS column headers to ``table``."""
        for header in ("NAME", "LABELS"):
            table.add_column(header)

    def rich_add_rows(self, table):
        """Add one row to ``table``: the name and comma-joined ``key=value`` labels."""
        label_pairs = [f"{key}={value}" for key, value in self.labels.items()]
        table.add_row(self.name, ",".join(label_pairs))

    def rich_add_names(self, names):
        """Append this exporter's name to the ``names`` list."""
        names.append(self.name)
class Lease(BaseModel):
    # Model of a single lease, built from a ``client_pb2.Lease`` message.
    # Documented with comments (not a class docstring) so the generated
    # pydantic schema is not affected.
    namespace: str
    name: str
    selector: str
    duration: timedelta
    client: str
    exporter: str
    conditions: list[kubernetes_pb2.Condition]
    # Stays None when the message carries no effective begin time.
    effective_begin_time: datetime | None = None

    model_config = ConfigDict(
        # Lets the protobuf ``Condition`` type be used as a field type.
        arbitrary_types_allowed=True,
        ser_json_timedelta="float",
    )

    @field_serializer("conditions")
    def serialize_conditions(self, conditions: list[kubernetes_pb2.Condition], _info):
        """Serialize each protobuf condition to a plain dict."""
        return [json_format.MessageToDict(condition) for condition in conditions]

    @classmethod
    def from_protobuf(cls, data: client_pb2.Lease) -> Lease:
        """Build a ``Lease`` from a protobuf message.

        The lease, client and exporter identifiers are parsed into short
        names. An empty ``data.exporter`` is kept as an empty string.
        ``effective_begin_time`` is converted to a datetime in the local
        timezone only when the field is actually set on the message.

        Raises:
            ValueError: If any non-empty identifier is malformed.
        """
        namespace, name = parse_lease_identifier(data.name)
        _, client = parse_client_identifier(data.client)

        exporter = ""
        if data.exporter != "":
            _, exporter = parse_exporter_identifier(data.exporter)

        effective_begin_time = None
        # Use explicit field presence: a sub-message attribute is an object
        # even when the field is unset, so its truthiness is not a reliable
        # presence check and an unset timestamp would convert to the epoch.
        if data.HasField("effective_begin_time"):
            effective_begin_time = data.effective_begin_time.ToDatetime(
                tzinfo=datetime.now().astimezone().tzinfo,
            )

        return cls(
            namespace=namespace,
            name=name,
            selector=data.selector,
            duration=data.duration.ToTimedelta(),
            client=client,
            exporter=exporter,
            effective_begin_time=effective_begin_time,
            conditions=data.conditions,
        )

    @classmethod
    def rich_add_columns(cls, table):
        """Add the lease column headers to ``table``."""
        for header in ("NAME", "SELECTOR", "DURATION", "CLIENT", "EXPORTER"):
            table.add_column(header)

    def rich_add_rows(self, table):
        """Add one row to ``table`` describing this lease."""
        table.add_row(
            self.name,
            self.selector,
            str(self.duration),
            self.client,
            self.exporter,
        )

    def rich_add_names(self, names):
        """Append this lease's name to the ``names`` list."""
        names.append(self.name)
class ExporterList(BaseModel):
    # One page of exporters plus the token for the next page.
    # ``next_page_token`` is excluded from serialized output.
    exporters: list[Exporter]
    next_page_token: str | None = Field(exclude=True)

    @classmethod
    def from_protobuf(cls, data: client_pb2.ListExportersResponse) -> ExporterList:
        """Build an ``ExporterList`` from a list-exporters response message."""
        converted = [Exporter.from_protobuf(item) for item in data.exporters]
        return cls(exporters=converted, next_page_token=data.next_page_token)

    @classmethod
    def rich_add_columns(cls, table):
        """Add the same column headers as a single ``Exporter``."""
        Exporter.rich_add_columns(table)

    def rich_add_rows(self, table):
        """Add one row to ``table`` per exporter, in list order."""
        for item in self.exporters:
            item.rich_add_rows(table)

    def rich_add_names(self, names):
        """Append every exporter's name to ``names``, in list order."""
        for item in self.exporters:
            item.rich_add_names(names)
class LeaseList(BaseModel):
    # One page of leases plus the token for the next page.
    # ``next_page_token`` is excluded from serialized output.
    leases: list[Lease]
    next_page_token: str | None = Field(exclude=True)

    @classmethod
    def from_protobuf(cls, data: client_pb2.ListLeasesResponse) -> LeaseList:
        """Build a ``LeaseList`` from a list-leases response message."""
        converted = [Lease.from_protobuf(item) for item in data.leases]
        return cls(leases=converted, next_page_token=data.next_page_token)

    @classmethod
    def rich_add_columns(cls, table):
        """Add the same column headers as a single ``Lease``."""
        Lease.rich_add_columns(table)

    def rich_add_rows(self, table):
        """Add one row to ``table`` per lease, in list order."""
        for item in self.leases:
            item.rich_add_rows(table)

    def rich_add_names(self, names):
        """Append every lease's name to ``names``, in list order."""
        for item in self.leases:
            item.rich_add_names(names)
@dataclass(kw_only=True, slots=True)
class ClientService:
    """Async wrapper around ``ClientServiceStub`` bound to one namespace.

    Every method builds resource names under ``namespaces/<namespace>``,
    performs the RPC inside ``translate_grpc_exceptions()``, and converts
    the response into the matching model from this module.
    """

    channel: Channel
    namespace: str
    stub: client_pb2_grpc.ClientServiceStub = field(init=False)

    def __post_init__(self):
        # The stub is derived from the channel, so it is not an init argument.
        self.stub = client_pb2_grpc.ClientServiceStub(channel=self.channel)

    async def GetExporter(self, *, name: str):
        """Fetch the exporter called ``name`` and return it as an ``Exporter``."""
        with translate_grpc_exceptions():
            request = client_pb2.GetExporterRequest(
                name=f"namespaces/{self.namespace}/exporters/{name}",
            )
            response = await self.stub.GetExporter(request)
        return Exporter.from_protobuf(response)

    async def ListExporters(
        self,
        *,
        page_size: int | None = None,
        page_token: str | None = None,
        filter: str | None = None,
    ):
        """List exporters in the namespace and return an ``ExporterList``.

        ``page_size``, ``page_token`` and ``filter`` are forwarded to the
        request unchanged.
        """
        with translate_grpc_exceptions():
            request = client_pb2.ListExportersRequest(
                parent=f"namespaces/{self.namespace}",
                page_size=page_size,
                page_token=page_token,
                filter=filter,
            )
            response = await self.stub.ListExporters(request)
        return ExporterList.from_protobuf(response)

    async def GetLease(self, *, name: str):
        """Fetch the lease called ``name`` and return it as a ``Lease``."""
        with translate_grpc_exceptions():
            request = client_pb2.GetLeaseRequest(
                name=f"namespaces/{self.namespace}/leases/{name}",
            )
            response = await self.stub.GetLease(request)
        return Lease.from_protobuf(response)

    async def ListLeases(
        self,
        *,
        page_size: int | None = None,
        page_token: str | None = None,
        filter: str | None = None,
    ):
        """List leases in the namespace and return a ``LeaseList``.

        ``page_size``, ``page_token`` and ``filter`` are forwarded to the
        request unchanged.
        """
        with translate_grpc_exceptions():
            request = client_pb2.ListLeasesRequest(
                parent=f"namespaces/{self.namespace}",
                page_size=page_size,
                page_token=page_token,
                filter=filter,
            )
            response = await self.stub.ListLeases(request)
        return LeaseList.from_protobuf(response)

    async def CreateLease(
        self,
        *,
        selector: str,
        duration: timedelta,
    ):
        """Create a lease with the given selector and duration; return it as a ``Lease``."""
        requested_duration = duration_pb2.Duration()
        requested_duration.FromTimedelta(duration)

        with translate_grpc_exceptions():
            new_lease = client_pb2.Lease(
                duration=requested_duration,
                selector=selector,
            )
            request = client_pb2.CreateLeaseRequest(
                parent=f"namespaces/{self.namespace}",
                lease=new_lease,
            )
            response = await self.stub.CreateLease(request)
        return Lease.from_protobuf(response)

    async def UpdateLease(
        self,
        *,
        name: str,
        duration: timedelta,
    ):
        """Set a new duration on the lease called ``name``; return the updated ``Lease``."""
        requested_duration = duration_pb2.Duration()
        requested_duration.FromTimedelta(duration)

        # The field mask names "duration" as the only field to update.
        mask = field_mask_pb2.FieldMask()
        mask.FromJsonString("duration")

        with translate_grpc_exceptions():
            changed_lease = client_pb2.Lease(
                name=f"namespaces/{self.namespace}/leases/{name}",
                duration=requested_duration,
            )
            request = client_pb2.UpdateLeaseRequest(
                lease=changed_lease,
                update_mask=mask,
            )
            response = await self.stub.UpdateLease(request)
        return Lease.from_protobuf(response)

    async def DeleteLease(self, *, name: str):
        """Delete the lease called ``name``; returns nothing."""
        with translate_grpc_exceptions():
            request = client_pb2.DeleteLeaseRequest(
                name=f"namespaces/{self.namespace}/leases/{name}",
            )
            await self.stub.DeleteLease(request)
298@dataclass(frozen=True, slots=True)
299class MultipathExporterStub:
300 """
301 Multipath ExporterServiceStub
303 Connecting to exporter service using multiple channels.
304 All channels are tried in sequence, and the first one ready
305 is used, prioritizing channels in the front.
306 """
308 channels: InitVar[list[Channel]]
310 __stubs: dict[Channel, Any] = field(init=False, default_factory=OrderedDict)
312 def __post_init__(self, channels):
313 for channel in channels:
314 stub = SimpleNamespace()
315 jumpstarter_pb2_grpc.ExporterServiceStub.__init__(stub, channel)
316 router_pb2_grpc.RouterServiceStub.__init__(stub, channel)
317 self.__stubs[channel] = stub
319 def __getattr__(self, name):
320 for channel, stub in self.__stubs.items():
321 # find the first channel that's ready
322 if channel.get_state(try_to_connect=True) == ChannelConnectivity.READY:
323 return getattr(stub, name)
324 # or fallback to the last channel (via router)
325 return getattr(next(reversed(self.__stubs.values())), name)