Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter/jumpstarter/client/grpc.py: 55%

163 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 15:50 +0200

1from __future__ import annotations 

2 

3from collections import OrderedDict 

4from dataclasses import InitVar, dataclass, field 

5from datetime import datetime, timedelta 

6from types import SimpleNamespace 

7from typing import Any 

8 

9from google.protobuf import duration_pb2, field_mask_pb2, json_format 

10from grpc import ChannelConnectivity 

11from grpc.aio import Channel 

12from jumpstarter_protocol import client_pb2, client_pb2_grpc, jumpstarter_pb2_grpc, kubernetes_pb2, router_pb2_grpc 

13from pydantic import BaseModel, ConfigDict, Field, field_serializer 

14 

15from jumpstarter.common.grpc import translate_grpc_exceptions 

16 

17 

18def parse_identifier(identifier: str, kind: str) -> (str, str): 

19 segments = identifier.split("/") 

20 if len(segments) != 4: 

21 raise ValueError("incorrect number of segments in identifier, expecting 4, got {}".format(len(segments))) 

22 if segments[0] != "namespaces": 

23 raise ValueError("incorrect first segment in identifier, expecting namespaces, got {}".format(segments[0])) 

24 if segments[2] != kind: 

25 raise ValueError("incorrect third segment in identifier, expecting {}, got {}".format(kind, segments[2])) 

26 return segments[1], segments[3] 

27 

28 

29def parse_client_identifier(identifier: str) -> (str, str): 

30 return parse_identifier(identifier, "clients") 

31 

32 

33def parse_exporter_identifier(identifier: str) -> (str, str): 

34 return parse_identifier(identifier, "exporters") 

35 

36 

37def parse_lease_identifier(identifier: str) -> (str, str): 

38 return parse_identifier(identifier, "leases") 

39 

40 

41class Exporter(BaseModel): 

42 namespace: str 

43 name: str 

44 labels: dict[str, str] 

45 

46 @classmethod 

47 def from_protobuf(cls, data: client_pb2.Exporter) -> Exporter: 

48 namespace, name = parse_exporter_identifier(data.name) 

49 return cls(namespace=namespace, name=name, labels=data.labels) 

50 

51 @classmethod 

52 def rich_add_columns(cls, table): 

53 table.add_column("NAME") 

54 table.add_column("LABELS") 

55 

56 def rich_add_rows(self, table): 

57 table.add_row( 

58 self.name, 

59 ",".join(("{}={}".format(i[0], i[1]) for i in self.labels.items())), 

60 ) 

61 

62 def rich_add_names(self, names): 

63 names.append(self.name) 

64 

65 

66class Lease(BaseModel): 

67 namespace: str 

68 name: str 

69 selector: str 

70 duration: timedelta 

71 client: str 

72 exporter: str 

73 conditions: list[kubernetes_pb2.Condition] 

74 effective_begin_time: datetime | None = None 

75 

76 model_config = ConfigDict( 

77 arbitrary_types_allowed=True, 

78 ser_json_timedelta="float", 

79 ) 

80 

81 @field_serializer("conditions") 

82 def serialize_conditions(self, conditions: list[kubernetes_pb2.Condition], _info): 

83 return [json_format.MessageToDict(condition) for condition in conditions] 

84 

85 @classmethod 

86 def from_protobuf(cls, data: client_pb2.Lease) -> Lease: 

87 namespace, name = parse_lease_identifier(data.name) 

88 

89 _, client = parse_client_identifier(data.client) 

90 if data.exporter != "": 

91 _, exporter = parse_exporter_identifier(data.exporter) 

92 else: 

93 exporter = "" 

94 

95 effective_begin_time = None 

96 if data.effective_begin_time: 

97 effective_begin_time = data.effective_begin_time.ToDatetime( 

98 tzinfo=datetime.now().astimezone().tzinfo, 

99 ) 

100 

101 return cls( 

102 namespace=namespace, 

103 name=name, 

104 selector=data.selector, 

105 duration=data.duration.ToTimedelta(), 

106 client=client, 

107 exporter=exporter, 

108 effective_begin_time=effective_begin_time, 

109 conditions=data.conditions, 

110 ) 

111 

112 @classmethod 

113 def rich_add_columns(cls, table): 

114 table.add_column("NAME") 

115 table.add_column("SELECTOR") 

116 table.add_column("DURATION") 

117 table.add_column("CLIENT") 

118 table.add_column("EXPORTER") 

119 

120 def rich_add_rows(self, table): 

121 table.add_row( 

122 self.name, 

123 self.selector, 

124 str(self.duration), 

125 self.client, 

126 self.exporter, 

127 ) 

128 

129 def rich_add_names(self, names): 

130 names.append(self.name) 

131 

132 

133class ExporterList(BaseModel): 

134 exporters: list[Exporter] 

135 next_page_token: str | None = Field(exclude=True) 

136 

137 @classmethod 

138 def from_protobuf(cls, data: client_pb2.ListExportersResponse) -> ExporterList: 

139 return cls( 

140 exporters=list(map(Exporter.from_protobuf, data.exporters)), 

141 next_page_token=data.next_page_token, 

142 ) 

143 

144 @classmethod 

145 def rich_add_columns(cls, table): 

146 Exporter.rich_add_columns(table) 

147 

148 def rich_add_rows(self, table): 

149 for exporter in self.exporters: 

150 exporter.rich_add_rows(table) 

151 

152 def rich_add_names(self, names): 

153 for exporter in self.exporters: 

154 exporter.rich_add_names(names) 

155 

156 

157class LeaseList(BaseModel): 

158 leases: list[Lease] 

159 next_page_token: str | None = Field(exclude=True) 

160 

161 @classmethod 

162 def from_protobuf(cls, data: client_pb2.ListLeasesResponse) -> LeaseList: 

163 return cls( 

164 leases=list(map(Lease.from_protobuf, data.leases)), 

165 next_page_token=data.next_page_token, 

166 ) 

167 

168 @classmethod 

169 def rich_add_columns(cls, table): 

170 Lease.rich_add_columns(table) 

171 

172 def rich_add_rows(self, table): 

173 for lease in self.leases: 

174 lease.rich_add_rows(table) 

175 

176 def rich_add_names(self, names): 

177 for lease in self.leases: 

178 lease.rich_add_names(names) 

179 

180 

181@dataclass(kw_only=True, slots=True) 

182class ClientService: 

183 channel: Channel 

184 namespace: str 

185 stub: client_pb2_grpc.ClientServiceStub = field(init=False) 

186 

187 def __post_init__(self): 

188 self.stub = client_pb2_grpc.ClientServiceStub(channel=self.channel) 

189 

190 async def GetExporter(self, *, name: str): 

191 with translate_grpc_exceptions(): 

192 exporter = await self.stub.GetExporter( 

193 client_pb2.GetExporterRequest( 

194 name="namespaces/{}/exporters/{}".format(self.namespace, name), 

195 ) 

196 ) 

197 return Exporter.from_protobuf(exporter) 

198 

199 async def ListExporters( 

200 self, 

201 *, 

202 page_size: int | None = None, 

203 page_token: str | None = None, 

204 filter: str | None = None, 

205 ): 

206 with translate_grpc_exceptions(): 

207 exporters = await self.stub.ListExporters( 

208 client_pb2.ListExportersRequest( 

209 parent="namespaces/{}".format(self.namespace), 

210 page_size=page_size, 

211 page_token=page_token, 

212 filter=filter, 

213 ) 

214 ) 

215 return ExporterList.from_protobuf(exporters) 

216 

217 async def GetLease(self, *, name: str): 

218 with translate_grpc_exceptions(): 

219 lease = await self.stub.GetLease( 

220 client_pb2.GetLeaseRequest( 

221 name="namespaces/{}/leases/{}".format(self.namespace, name), 

222 ) 

223 ) 

224 return Lease.from_protobuf(lease) 

225 

226 async def ListLeases( 

227 self, 

228 *, 

229 page_size: int | None = None, 

230 page_token: str | None = None, 

231 filter: str | None = None, 

232 ): 

233 with translate_grpc_exceptions(): 

234 leases = await self.stub.ListLeases( 

235 client_pb2.ListLeasesRequest( 

236 parent="namespaces/{}".format(self.namespace), 

237 page_size=page_size, 

238 page_token=page_token, 

239 filter=filter, 

240 ) 

241 ) 

242 return LeaseList.from_protobuf(leases) 

243 

244 async def CreateLease( 

245 self, 

246 *, 

247 selector: str, 

248 duration: timedelta, 

249 ): 

250 duration_pb = duration_pb2.Duration() 

251 duration_pb.FromTimedelta(duration) 

252 

253 with translate_grpc_exceptions(): 

254 lease = await self.stub.CreateLease( 

255 client_pb2.CreateLeaseRequest( 

256 parent="namespaces/{}".format(self.namespace), 

257 lease=client_pb2.Lease( 

258 duration=duration_pb, 

259 selector=selector, 

260 ), 

261 ) 

262 ) 

263 return Lease.from_protobuf(lease) 

264 

265 async def UpdateLease( 

266 self, 

267 *, 

268 name: str, 

269 duration: timedelta, 

270 ): 

271 duration_pb = duration_pb2.Duration() 

272 duration_pb.FromTimedelta(duration) 

273 

274 update_mask = field_mask_pb2.FieldMask() 

275 update_mask.FromJsonString("duration") 

276 

277 with translate_grpc_exceptions(): 

278 lease = await self.stub.UpdateLease( 

279 client_pb2.UpdateLeaseRequest( 

280 lease=client_pb2.Lease( 

281 name="namespaces/{}/leases/{}".format(self.namespace, name), 

282 duration=duration_pb, 

283 ), 

284 update_mask=update_mask, 

285 ) 

286 ) 

287 return Lease.from_protobuf(lease) 

288 

289 async def DeleteLease(self, *, name: str): 

290 with translate_grpc_exceptions(): 

291 await self.stub.DeleteLease( 

292 client_pb2.DeleteLeaseRequest( 

293 name="namespaces/{}/leases/{}".format(self.namespace, name), 

294 ) 

295 ) 

296 

297 

298@dataclass(frozen=True, slots=True) 

299class MultipathExporterStub: 

300 """ 

301 Multipath ExporterServiceStub 

302 

303 Connecting to exporter service using multiple channels. 

304 All channels are tried in sequence, and the first one ready 

305 is used, prioritizing channels in the front. 

306 """ 

307 

308 channels: InitVar[list[Channel]] 

309 

310 __stubs: dict[Channel, Any] = field(init=False, default_factory=OrderedDict) 

311 

312 def __post_init__(self, channels): 

313 for channel in channels: 

314 stub = SimpleNamespace() 

315 jumpstarter_pb2_grpc.ExporterServiceStub.__init__(stub, channel) 

316 router_pb2_grpc.RouterServiceStub.__init__(stub, channel) 

317 self.__stubs[channel] = stub 

318 

319 def __getattr__(self, name): 

320 for channel, stub in self.__stubs.items(): 

321 # find the first channel that's ready 

322 if channel.get_state(try_to_connect=True) == ChannelConnectivity.READY: 

323 return getattr(stub, name) 

324 # or fallback to the last channel (via router) 

325 return getattr(next(reversed(self.__stubs.values())), name)