Coverage for src / mysingle / grpc / interceptors.py: 0%

77 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1""" 

2gRPC Interceptors 

3 

4서버 및 클라이언트 gRPC 인터셉터 모음 

5- AuthInterceptor: user_id 메타데이터 검증 

6- LoggingInterceptor: gRPC 호출 로깅 

7- MetadataInterceptor: correlation_id 등 공통 메타데이터 주입/검증 

8""" 

9 

10from __future__ import annotations 

11 

12from typing import Any, Callable 

13 

14import grpc 

15 

16from mysingle.constants import ( 

17 GRPC_METADATA_CORRELATION_ID, 

18 GRPC_METADATA_REQUEST_ID, 

19 GRPC_METADATA_USER_ID, 

20) 

21from mysingle.core.logging import get_structured_logger 

22 

23logger = get_structured_logger(__name__) 

24 

25 

class AuthInterceptor(grpc.aio.ServerInterceptor):
    """Server interceptor that rejects calls without user-id metadata.

    A call passes when its invocation metadata contains a non-empty
    ``GRPC_METADATA_USER_ID`` entry; otherwise it is aborted with
    ``UNAUTHENTICATED``. The check can be disabled globally (for example in
    development or test environments) or skipped for individual methods.

    Usage:
        ```python
        from mysingle.grpc import AuthInterceptor

        server = grpc.aio.server(
            interceptors=[AuthInterceptor(require_auth=True)]
        )
        ```
    """

    def __init__(
        self, require_auth: bool = True, exempt_methods: list[str] | None = None
    ):
        """
        Args:
            require_auth: Whether the user-id check is enforced. When False,
                every call is passed through unchecked.
            exempt_methods: Method names that bypass the check
                (e.g. ``["/health/Check"]``).
        """
        self.require_auth = require_auth
        # Stored as a set so the per-call membership test is O(1).
        self.exempt_methods = set(exempt_methods or [])

    async def intercept_service(
        self,
        continuation: Callable,
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler | None:
        """Pass the call on, or swap in a handler that aborts it.

        Args:
            continuation: Awaitable callable that yields the next handler in
                the interceptor chain for the given call details.
            handler_call_details: Method name and invocation metadata of the
                incoming call.

        Returns:
            The handler produced by ``continuation`` when the call is exempt,
            auth is disabled, or a user id is present; a handler that aborts
            with ``UNAUTHENTICATED`` when the user id is missing; ``None`` when
            the user id is missing and ``continuation`` found no handler.
        """
        method = handler_call_details.method

        # Exempt methods bypass the check entirely.
        if method in self.exempt_methods:
            return await continuation(handler_call_details)

        # Auth disabled: let everything through.
        if not self.require_auth:
            logger.debug(f"Auth disabled for method: {method}")
            return await continuation(handler_call_details)

        # Look up the user id in the invocation metadata.
        metadata = dict(handler_call_details.invocation_metadata or [])
        user_id = metadata.get(GRPC_METADATA_USER_ID)

        if user_id:
            logger.debug(
                f"gRPC call authenticated: user_id={user_id}, method={method}"
            )
            return await continuation(handler_call_details)

        logger.warning(
            f"Missing {GRPC_METADATA_USER_ID} in gRPC metadata for {method}"
        )

        # The real handler is still resolved because the abort handler must
        # reuse its (de)serializers and match its streaming arity.
        handler = await continuation(handler_call_details)
        if handler is None:
            # No handler is registered for this method; there is nothing to
            # wrap, so hand the "not found" result back unchanged.
            return None
        return self._build_abort_handler(handler)

    @staticmethod
    def _build_abort_handler(
        handler: grpc.RpcMethodHandler,
    ) -> grpc.RpcMethodHandler:
        """Return a handler of the same arity as *handler* that only aborts."""

        async def auth_abort_wrapper(request_or_iterator, context):
            # Aborts before any request is read or any response is written.
            await context.abort(
                grpc.StatusCode.UNAUTHENTICATED,
                f"Missing {GRPC_METADATA_USER_ID} metadata",
            )

        # Pick the factory matching the wrapped handler's streaming flags so a
        # streaming method is not answered with a unary-unary handler.
        if handler.request_streaming and handler.response_streaming:
            factory = grpc.stream_stream_rpc_method_handler
        elif handler.request_streaming:
            factory = grpc.stream_unary_rpc_method_handler
        elif handler.response_streaming:
            factory = grpc.unary_stream_rpc_method_handler
        else:
            factory = grpc.unary_unary_rpc_method_handler

        return factory(
            auth_abort_wrapper,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

98 

99 

class LoggingInterceptor(grpc.aio.ServerInterceptor):
    """Server interceptor that emits structured logs around each call.

    For every intercepted call it logs:
    - a "gRPC call started" record,
    - a "gRPC call completed" record with the elapsed milliseconds, or
    - a "gRPC call failed" record with the elapsed milliseconds and the
      error text when awaiting the continuation raises.

    Each record carries the method name plus the user id and correlation id
    read from the invocation metadata.

    Usage:
        ```python
        from mysingle.grpc import LoggingInterceptor

        server = grpc.aio.server(
            interceptors=[LoggingInterceptor()]
        )
        ```
    """

    async def intercept_service(
        self,
        continuation: Callable,
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        """Log the call, await the continuation, and return its handler.

        Args:
            continuation: Awaitable callable that yields the next handler in
                the interceptor chain for the given call details.
            handler_call_details: Method name and invocation metadata of the
                incoming call.

        Returns:
            Whatever ``continuation`` returned, unchanged.

        Raises:
            Exception: Any exception raised by ``continuation`` is logged and
                re-raised.
        """
        import time

        method = handler_call_details.method
        metadata = dict(handler_call_details.invocation_metadata or [])

        # Fields shared by every record emitted for this call.
        base_fields = {
            "method": method,
            "user_id": metadata.get(GRPC_METADATA_USER_ID, "unknown"),
            "correlation_id": metadata.get(GRPC_METADATA_CORRELATION_ID, "N/A"),
        }

        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted.
        started = time.perf_counter()
        logger.info("gRPC call started", extra=dict(base_fields))

        # NOTE(review): the timed span is only the await of `continuation`,
        # whose result is returned as the method handler. The handler itself
        # presumably runs after this method returns, so "completed"/"OK"
        # would describe handler resolution rather than the RPC outcome.
        # Confirm whether that is intended.
        try:
            handler = await continuation(handler_call_details)
        except Exception as e:
            elapsed = (time.perf_counter() - started) * 1000  # ms
            logger.error(
                "gRPC call failed",
                extra={
                    **base_fields,
                    "elapsed_ms": round(elapsed, 2),
                    "error": str(e),
                },
                exc_info=True,
            )
            raise

        elapsed = (time.perf_counter() - started) * 1000  # ms
        logger.info(
            "gRPC call completed",
            extra={
                **base_fields,
                "elapsed_ms": round(elapsed, 2),
                "status": "OK",
            },
        )
        return handler

174 

175 

class MetadataInterceptor(grpc.aio.ServerInterceptor):
    """Server interceptor that fills in missing tracing metadata.

    When ``auto_generate`` is enabled and the incoming call has no
    ``GRPC_METADATA_CORRELATION_ID`` and/or ``GRPC_METADATA_REQUEST_ID``
    entry, a fresh UUID4 is generated for each missing key. The generated
    entries are appended to the invocation metadata of the call details
    forwarded to ``continuation``.

    NOTE(review): only the call details handed to ``continuation`` are
    changed; whether the servicer's own context also exposes the generated
    values is not established here. Confirm before relying on it in
    handlers.

    Usage:
        ```python
        from mysingle.grpc import MetadataInterceptor

        server = grpc.aio.server(
            interceptors=[MetadataInterceptor(auto_generate=True)]
        )
        ```
    """

    class _CallDetails(grpc.HandlerCallDetails):
        """Minimal stand-in exposing the two fields read from call details."""

        def __init__(self, method, invocation_metadata):
            self.method = method
            self.invocation_metadata = invocation_metadata

    def __init__(self, auto_generate: bool = True):
        """
        Args:
            auto_generate: Whether missing correlation/request ids are
                generated automatically (True recommended).
        """
        self.auto_generate = auto_generate

    async def intercept_service(
        self,
        continuation: Callable,
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        """Generate missing tracing ids, log them, and continue the chain.

        Args:
            continuation: Awaitable callable that yields the next handler in
                the interceptor chain for the given call details.
            handler_call_details: Method name and invocation metadata of the
                incoming call.

        Returns:
            The handler returned by ``continuation``.
        """
        import uuid

        metadata = dict(handler_call_details.invocation_metadata or [])
        generated = []  # (key, value) pairs created by this interceptor

        if self.auto_generate:
            # Generate a correlation id when the caller did not send one.
            if GRPC_METADATA_CORRELATION_ID not in metadata:
                correlation_id = str(uuid.uuid4())
                metadata[GRPC_METADATA_CORRELATION_ID] = correlation_id
                generated.append((GRPC_METADATA_CORRELATION_ID, correlation_id))
                logger.debug(f"Auto-generated correlation_id: {correlation_id}")

            # Generate a request id when the caller did not send one.
            if GRPC_METADATA_REQUEST_ID not in metadata:
                request_id = str(uuid.uuid4())
                metadata[GRPC_METADATA_REQUEST_ID] = request_id
                generated.append((GRPC_METADATA_REQUEST_ID, request_id))
                logger.debug(f"Auto-generated request_id: {request_id}")

        # Log the effective metadata for this call.
        logger.debug(
            "gRPC metadata",
            extra={
                "method": handler_call_details.method,
                "correlation_id": metadata.get(GRPC_METADATA_CORRELATION_ID),
                "request_id": metadata.get(GRPC_METADATA_REQUEST_ID),
                "user_id": metadata.get(GRPC_METADATA_USER_ID),
            },
        )

        if generated:
            # Previously the generated ids lived only in the local dict and
            # were dropped. Forward them by appending to the original entries
            # (kept as-is so duplicate keys and ordering are preserved).
            original_entries = tuple(handler_call_details.invocation_metadata or ())
            handler_call_details = self._CallDetails(
                method=handler_call_details.method,
                invocation_metadata=original_entries + tuple(generated),
            )

        return await continuation(handler_call_details)

234 

235 

236# Client Interceptors 

237 

238 

class ClientAuthInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
    """Client interceptor that injects user and correlation ids.

    For each intercepted unary-unary call, the configured user id (when
    truthy) and a correlation id are appended to the outgoing metadata.

    Usage:
        ```python
        from mysingle.grpc import ClientAuthInterceptor

        async with grpc.aio.insecure_channel(
            'service:50051',
            interceptors=[ClientAuthInterceptor(user_id="user123")]
        ) as channel:
            stub = MyServiceStub(channel)
            response = await stub.MyMethod(request)
        ```
    """

    def __init__(self, user_id: str | None = None, correlation_id: str | None = None):
        """
        Args:
            user_id: User id to send; nothing is injected when it is falsy.
            correlation_id: Correlation id to send; when falsy, a new UUID4
                is generated for every call.
        """
        self.correlation_id = correlation_id
        self.user_id = user_id

    async def intercept_unary_unary(
        self,
        continuation: Callable,
        client_call_details: grpc.ClientCallDetails,
        request: Any,
    ) -> Any:
        """Append the ids to the call metadata and continue the call.

        Args:
            continuation: Awaitable callable that proceeds with the call
                using the given call details and request.
            client_call_details: Details of the outgoing call.
            request: The request message, forwarded untouched.

        Returns:
            Whatever ``continuation`` returns for the rewritten call details.
        """
        import uuid
        from collections import namedtuple

        # Entries this interceptor adds after the caller-supplied metadata.
        extra_entries = []
        if self.user_id:
            extra_entries.append((GRPC_METADATA_USER_ID, self.user_id))
        extra_entries.append(
            (GRPC_METADATA_CORRELATION_ID, self.correlation_id or str(uuid.uuid4()))
        )
        outgoing_metadata = tuple(client_call_details.metadata or []) + tuple(
            extra_entries
        )

        # Call details are rebuilt as a fresh namedtuple carrying the new metadata.
        details_fields = (
            "method",
            "timeout",
            "metadata",
            "credentials",
            "wait_for_ready",
            "compression",
        )
        details_type = namedtuple("ClientCallDetails", details_fields)

        rewritten = details_type(
            method=client_call_details.method,
            timeout=client_call_details.timeout,
            metadata=outgoing_metadata,
            credentials=client_call_details.credentials,
            # These two attributes may be absent on the incoming details
            # object; fall back to None in that case.
            wait_for_ready=getattr(client_call_details, "wait_for_ready", None),
            compression=getattr(client_call_details, "compression", None),
        )

        return await continuation(rewritten, request)

320 

321 

# Public names exported by this module: the three server-side interceptors
# followed by the client-side one.
__all__ = [
    "AuthInterceptor", "LoggingInterceptor", "MetadataInterceptor",
    "ClientAuthInterceptor",
]