Coverage for src / mysingle / grpc / interceptors.py: 0%
77 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1"""
2gRPC Interceptors
4서버 및 클라이언트 gRPC 인터셉터 모음
5- AuthInterceptor: user_id 메타데이터 검증
6- LoggingInterceptor: gRPC 호출 로깅
7- MetadataInterceptor: correlation_id 등 공통 메타데이터 주입/검증
8"""
10from __future__ import annotations
12from typing import Any, Callable
14import grpc
16from mysingle.constants import (
17 GRPC_METADATA_CORRELATION_ID,
18 GRPC_METADATA_REQUEST_ID,
19 GRPC_METADATA_USER_ID,
20)
21from mysingle.core.logging import get_structured_logger
23logger = get_structured_logger(__name__)
26class AuthInterceptor(grpc.aio.ServerInterceptor):
27 """
28 gRPC 서버 인증 인터셉터
30 user_id 메타데이터를 검증하고, 없으면 UNAUTHENTICATED 에러 반환
31 개발/테스트 환경에서는 선택적으로 비활성화 가능
33 Usage:
34 ```python
35 from mysingle.grpc import AuthInterceptor
37 server = grpc.aio.server(
38 interceptors=[AuthInterceptor(require_auth=True)]
39 )
40 ```
41 """
43 def __init__(
44 self, require_auth: bool = True, exempt_methods: list[str] | None = None
45 ):
46 """
47 Args:
48 require_auth: 인증 필수 여부 (False면 검증 스킵)
49 exempt_methods: 인증 면제 메서드 목록 (예: ["/health/Check"])
50 """
51 self.require_auth = require_auth
52 self.exempt_methods = set(exempt_methods or [])
54 async def intercept_service(
55 self,
56 continuation: Callable,
57 handler_call_details: grpc.HandlerCallDetails,
58 ) -> grpc.RpcMethodHandler:
59 """gRPC 서비스 인터셉트"""
60 method = handler_call_details.method
62 # 면제 메서드는 통과
63 if method in self.exempt_methods:
64 return await continuation(handler_call_details)
66 # 인증 비활성화 시 통과
67 if not self.require_auth:
68 logger.debug(f"Auth disabled for method: {method}")
69 return await continuation(handler_call_details)
71 # 메타데이터에서 user_id 추출
72 metadata = dict(handler_call_details.invocation_metadata or [])
73 user_id = metadata.get(GRPC_METADATA_USER_ID)
75 if not user_id:
76 logger.warning(
77 f"Missing {GRPC_METADATA_USER_ID} in gRPC metadata for {method}"
78 )
79 # UNAUTHENTICATED 에러를 반환하는 핸들러 생성
80 # gRPC Python의 경우 continuation에서 handler를 가져온 후 context에서 abort 처리
81 handler = await continuation(handler_call_details)
83 # Handler wrapper로 인증 에러 주입
84 async def auth_abort_wrapper(request, context):
85 await context.abort(
86 grpc.StatusCode.UNAUTHENTICATED,
87 f"Missing {GRPC_METADATA_USER_ID} metadata",
88 )
90 return grpc.unary_unary_rpc_method_handler(
91 auth_abort_wrapper,
92 request_deserializer=handler.request_deserializer,
93 response_serializer=handler.response_serializer,
94 )
96 logger.debug(f"gRPC call authenticated: user_id={user_id}, method={method}")
97 return await continuation(handler_call_details)
100class LoggingInterceptor(grpc.aio.ServerInterceptor):
101 """
102 gRPC 서버 로깅 인터셉터
104 모든 gRPC 호출을 구조화된 로그로 기록
105 - 요청 시작 시간
106 - 응답 상태 코드
107 - 소요 시간
108 - 에러 메시지 (있는 경우)
110 Usage:
111 ```python
112 from mysingle.grpc import LoggingInterceptor
114 server = grpc.aio.server(
115 interceptors=[LoggingInterceptor()]
116 )
117 ```
118 """
120 async def intercept_service(
121 self,
122 continuation: Callable,
123 handler_call_details: grpc.HandlerCallDetails,
124 ) -> grpc.RpcMethodHandler:
125 """gRPC 서비스 인터셉트 및 로깅"""
126 import time
128 method = handler_call_details.method
129 metadata = dict(handler_call_details.invocation_metadata or [])
131 user_id = metadata.get(GRPC_METADATA_USER_ID, "unknown")
132 correlation_id = metadata.get(GRPC_METADATA_CORRELATION_ID, "N/A")
134 start_time = time.time()
135 logger.info(
136 "gRPC call started",
137 extra={
138 "method": method,
139 "user_id": user_id,
140 "correlation_id": correlation_id,
141 },
142 )
144 try:
145 handler = await continuation(handler_call_details)
146 elapsed = (time.time() - start_time) * 1000 # ms
148 logger.info(
149 "gRPC call completed",
150 extra={
151 "method": method,
152 "user_id": user_id,
153 "correlation_id": correlation_id,
154 "elapsed_ms": round(elapsed, 2),
155 "status": "OK",
156 },
157 )
158 return handler
160 except Exception as e:
161 elapsed = (time.time() - start_time) * 1000 # ms
162 logger.error(
163 "gRPC call failed",
164 extra={
165 "method": method,
166 "user_id": user_id,
167 "correlation_id": correlation_id,
168 "elapsed_ms": round(elapsed, 2),
169 "error": str(e),
170 },
171 exc_info=True,
172 )
173 raise
176class MetadataInterceptor(grpc.aio.ServerInterceptor):
177 """
178 gRPC 서버 메타데이터 검증 인터셉터
180 correlation_id, request_id 등 추적 메타데이터 검증 및 자동 생성
181 누락 시 자동 생성하여 컨텍스트에 추가
183 Usage:
184 ```python
185 from mysingle.grpc import MetadataInterceptor
187 server = grpc.aio.server(
188 interceptors=[MetadataInterceptor(auto_generate=True)]
189 )
190 ```
191 """
193 def __init__(self, auto_generate: bool = True):
194 """
195 Args:
196 auto_generate: correlation_id 자동 생성 여부 (True 권장)
197 """
198 self.auto_generate = auto_generate
200 async def intercept_service(
201 self,
202 continuation: Callable,
203 handler_call_details: grpc.HandlerCallDetails,
204 ) -> grpc.RpcMethodHandler:
205 """메타데이터 검증 및 자동 생성"""
206 import uuid
208 metadata = dict(handler_call_details.invocation_metadata or [])
210 # correlation_id 자동 생성
211 if self.auto_generate and GRPC_METADATA_CORRELATION_ID not in metadata:
212 correlation_id = str(uuid.uuid4())
213 metadata[GRPC_METADATA_CORRELATION_ID] = correlation_id
214 logger.debug(f"Auto-generated correlation_id: {correlation_id}")
216 # request_id 자동 생성
217 if self.auto_generate and GRPC_METADATA_REQUEST_ID not in metadata:
218 request_id = str(uuid.uuid4())
219 metadata[GRPC_METADATA_REQUEST_ID] = request_id
220 logger.debug(f"Auto-generated request_id: {request_id}")
222 # 메타데이터 로깅
223 logger.debug(
224 "gRPC metadata",
225 extra={
226 "method": handler_call_details.method,
227 "correlation_id": metadata.get(GRPC_METADATA_CORRELATION_ID),
228 "request_id": metadata.get(GRPC_METADATA_REQUEST_ID),
229 "user_id": metadata.get(GRPC_METADATA_USER_ID),
230 },
231 )
233 return await continuation(handler_call_details)
236# Client Interceptors
239class ClientAuthInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
240 """
241 gRPC 클라이언트 인증 인터셉터
243 user_id, correlation_id를 자동으로 메타데이터에 주입
245 Usage:
246 ```python
247 from mysingle.grpc import ClientAuthInterceptor
248 from fastapi import Request
250 async with grpc.aio.insecure_channel(
251 'service:50051',
252 interceptors=[ClientAuthInterceptor(user_id="user123")]
253 ) as channel:
254 stub = MyServiceStub(channel)
255 response = await stub.MyMethod(request)
256 ```
257 """
259 def __init__(self, user_id: str | None = None, correlation_id: str | None = None):
260 """
261 Args:
262 user_id: 사용자 ID (필수)
263 correlation_id: 상관관계 ID (선택, 자동 생성됨)
264 """
265 self.user_id = user_id
266 self.correlation_id = correlation_id
268 async def intercept_unary_unary(
269 self,
270 continuation: Callable,
271 client_call_details: grpc.ClientCallDetails,
272 request: Any,
273 ) -> Any:
274 """메타데이터 주입"""
275 import uuid
276 from collections import namedtuple
278 # 기존 메타데이터 복사
279 metadata = list(client_call_details.metadata or [])
281 # user_id 주입
282 if self.user_id:
283 metadata.append((GRPC_METADATA_USER_ID, self.user_id))
285 # correlation_id 주입 (없으면 생성)
286 correlation_id = self.correlation_id or str(uuid.uuid4())
287 metadata.append((GRPC_METADATA_CORRELATION_ID, correlation_id))
289 # namedtuple을 사용하여 새로운 ClientCallDetails 생성
290 _ClientCallDetails = namedtuple(
291 "ClientCallDetails",
292 [
293 "method",
294 "timeout",
295 "metadata",
296 "credentials",
297 "wait_for_ready",
298 "compression",
299 ],
300 )
302 new_details = _ClientCallDetails(
303 method=client_call_details.method,
304 timeout=client_call_details.timeout,
305 metadata=tuple(metadata),
306 credentials=client_call_details.credentials,
307 wait_for_ready=(
308 client_call_details.wait_for_ready
309 if hasattr(client_call_details, "wait_for_ready")
310 else None
311 ),
312 compression=(
313 client_call_details.compression
314 if hasattr(client_call_details, "compression")
315 else None
316 ),
317 )
319 return await continuation(new_details, request)
322__all__ = [
323 "AuthInterceptor",
324 "LoggingInterceptor",
325 "MetadataInterceptor",
326 "ClientAuthInterceptor",
327]