Coverage for src / mysingle / clients / base_grpc_client.py: 0%
70 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1"""
2Base gRPC Client
4모든 마이크로서비스 gRPC 클라이언트의 베이스 클래스
5공통 기능: 채널 관리, 메타데이터 주입, 컨텍스트 매니저, 환경별 연결 설정
7Usage:
8 ```python
9 from mysingle.clients import BaseGrpcClient
10 from app.grpc import my_service_pb2_grpc
12 class MyServiceGrpcClient(BaseGrpcClient):
13 def __init__(
14 self,
15 user_id: str | None = None,
16 correlation_id: str | None = None,
17 **kwargs,
18 ):
19 super().__init__(
20 service_name="my-service",
21 default_port=50051,
22 user_id=user_id,
23 correlation_id=correlation_id,
24 **kwargs,
25 )
26 self.stub = my_service_pb2_grpc.MyServiceStub(self.channel)
28 async def get_data(self, request_id: str):
29 request = my_service_pb2.GetDataRequest(id=request_id)
30 response = await self.stub.GetData(request, metadata=self.metadata)
31 return response
33 # 사용 1: 컨텍스트 매니저
34 async with MyServiceGrpcClient(user_id="user123") as client:
35 data = await client.get_data("req_id")
37 # 사용 2: 수동 연결 관리
38 client = MyServiceGrpcClient(user_id="user123")
39 try:
40 data = await client.get_data("req_id")
41 finally:
42 await client.close()
43 ```
44"""
46from __future__ import annotations
48import os
49import uuid
50from typing import TYPE_CHECKING, Any
52import grpc
54from mysingle.constants import (
55 GRPC_METADATA_CORRELATION_ID,
56 GRPC_METADATA_REQUEST_ID,
57 GRPC_METADATA_USER_ID,
58)
59from mysingle.core.logging import get_structured_logger
61if TYPE_CHECKING:
62 from fastapi import Request
64logger = get_structured_logger(__name__)
67class BaseGrpcClient:
68 """
69 마이크로서비스 gRPC 클라이언트 베이스 클래스
71 Features:
72 - 자동 채널 관리 (secure/insecure)
73 - 메타데이터 자동 주입 (user_id, correlation_id, request_id)
74 - 비동기 컨텍스트 매니저 지원
75 - 환경별 연결 설정 (Docker 내부 vs 외부)
76 - Keepalive 및 연결 옵션 표준화
78 Metadata Headers:
79 - user-id: 사용자 식별자 (필수, 서버 인터셉터에서 검증)
80 - correlation-id: 요청 추적 ID (자동 생성 또는 전파)
81 - request-id: 개별 요청 ID (항상 자동 생성)
83 Environment Variables:
84 - {SERVICE}_GRPC_HOST: 서비스별 호스트 오버라이드
85 예) INDICATOR_GRPC_HOST=indicator-service
86 - GRPC_USE_TLS: TLS 사용 여부 (기본값: false)
87 """
89 def __init__(
90 self,
91 service_name: str,
92 default_port: int,
93 host: str | None = None,
94 user_id: str | None = None,
95 correlation_id: str | None = None,
96 request: "Request | None" = None,
97 timeout: float = 10.0,
98 use_tls: bool | None = None,
99 **kwargs: Any,
100 ):
101 """
102 Args:
103 service_name: 서비스 이름 (예: "indicator-service", "market-data-service")
104 default_port: gRPC 포트 (예: 50052, 50053)
105 host: 명시적 호스트 (없으면 자동 결정)
106 user_id: 사용자 ID (request가 주어지면 자동 추출)
107 correlation_id: 상관관계 ID (없으면 자동 생성)
108 request: FastAPI Request 객체 (메타데이터 자동 추출)
109 timeout: 요청 타임아웃 (초)
110 use_tls: TLS 사용 여부 (None이면 환경변수 기반)
111 **kwargs: grpc.aio.Channel 추가 옵션
113 Note:
114 - request 객체가 제공되면 user_id와 correlation_id를 헤더에서 추출합니다.
115 - user_id가 없으면 서버 AuthInterceptor에서 UNAUTHENTICATED 오류가 발생합니다.
116 """
117 self.service_name = service_name
118 self.default_port = default_port
119 self.timeout = timeout
121 # User ID 결정: 명시적 user_id > request 헤더
122 if user_id:
123 self.user_id: str = user_id
124 elif request:
125 self.user_id = self._extract_user_id_from_request(request)
126 else:
127 self.user_id = ""
129 # Correlation ID 결정: 명시적 correlation_id > request 헤더 > 자동 생성
130 if correlation_id:
131 self.correlation_id: str = correlation_id
132 elif request:
133 extracted_cid = self._extract_correlation_id_from_request(request)
134 self.correlation_id = extracted_cid if extracted_cid else str(uuid.uuid4())
135 else:
136 self.correlation_id = str(uuid.uuid4())
138 # 호스트 결정
139 if host is None:
140 host = self._determine_host()
142 self.host = host
143 self.address = f"{host}:{default_port}"
145 # TLS 설정 결정
146 if use_tls is None:
147 use_tls = os.getenv("GRPC_USE_TLS", "false").lower() == "true"
149 self.use_tls = use_tls
151 # 채널 생성
152 self.channel = self._create_channel(**kwargs)
154 logger.info(
155 f"{self.__class__.__name__} initialized",
156 address=self.address,
157 tls=self.use_tls,
158 timeout=timeout,
159 user_id=self.user_id,
160 correlation_id=self.correlation_id,
161 )
163 @staticmethod
164 def _extract_user_id_from_request(request: "Request") -> str:
165 """
166 FastAPI Request 객체에서 User ID 추출
168 Args:
169 request: FastAPI Request 객체
171 Returns:
172 추출된 User ID (없으면 빈 문자열)
173 """
174 from mysingle.constants import HEADER_USER_ID
176 user_id = request.headers.get(HEADER_USER_ID, "")
177 if not user_id:
178 logger.warning("User ID not found in request headers")
179 return user_id
181 @staticmethod
182 def _extract_correlation_id_from_request(request: "Request") -> str | None:
183 """
184 FastAPI Request 객체에서 Correlation ID 추출
186 Args:
187 request: FastAPI Request 객체
189 Returns:
190 추출된 Correlation ID (없으면 None)
191 """
192 # HTTP 헤더는 대소문자 무관
193 return request.headers.get("X-Correlation-Id") or request.headers.get(
194 "Correlation-Id"
195 )
197 def _determine_host(self) -> str:
198 """
199 환경 기반 호스트 결정
201 우선순위:
202 1. 환경변수 {SERVICE}_GRPC_HOST (예: INDICATOR_GRPC_HOST)
203 2. Docker 환경: service_name (예: indicator-service)
204 3. 기본값: localhost
206 Returns:
207 결정된 호스트
208 """
209 # 환경변수로 서비스명 변환 (indicator-service -> INDICATOR)
210 env_key = (
211 self.service_name.upper().replace("-SERVICE", "").replace("-", "_")
212 + "_GRPC_HOST"
213 )
214 env_host = os.getenv(env_key)
215 if env_host:
216 return env_host
218 # Docker 환경 감지
219 if os.path.exists("/.dockerenv") or os.getenv("DOCKER_ENV"):
220 return self.service_name
222 # 기본값
223 return "localhost"
225 def _create_channel(self, **kwargs: Any) -> grpc.aio.Channel:
226 """
227 gRPC 채널 생성
229 Args:
230 **kwargs: 추가 채널 옵션
232 Returns:
233 생성된 gRPC 채널
234 """
235 # 기본 keepalive 옵션
236 default_options = [
237 ("grpc.keepalive_time_ms", 30000), # 30초마다 keepalive ping
238 ("grpc.keepalive_timeout_ms", 10000), # 10초 타임아웃
239 ("grpc.keepalive_permit_without_calls", True), # 활성 호출 없어도 ping 허용
240 ("grpc.http2.max_pings_without_data", 0), # ping 제한 없음
241 ]
243 # 사용자 정의 옵션과 병합
244 user_options = kwargs.pop("options", [])
245 merged_options = default_options + user_options
247 if self.use_tls:
248 credentials = grpc.ssl_channel_credentials()
249 return grpc.aio.secure_channel(
250 self.address,
251 credentials,
252 options=merged_options,
253 **kwargs,
254 )
255 else:
256 return grpc.aio.insecure_channel(
257 self.address,
258 options=merged_options,
259 **kwargs,
260 )
262 @property
263 def metadata(self) -> list[tuple[str, str]]:
264 """
265 gRPC 메타데이터 생성 (모든 요청에 자동 주입)
267 Returns:
268 메타데이터 튜플 리스트
270 Note:
271 - request_id는 매 호출마다 새로 생성됩니다.
272 - user_id가 없으면 빈 문자열이지만, 서버에서 UNAUTHENTICATED 오류 발생.
273 """
274 return [
275 (GRPC_METADATA_USER_ID, self.user_id),
276 (GRPC_METADATA_CORRELATION_ID, self.correlation_id),
277 (GRPC_METADATA_REQUEST_ID, str(uuid.uuid4())),
278 ]
280 async def close(self):
281 """채널 종료"""
282 if self.channel:
283 await self.channel.close()
284 logger.info(f"{self.__class__.__name__} channel closed")
286 async def __aenter__(self):
287 """비동기 컨텍스트 매니저 진입"""
288 return self
290 async def __aexit__(self, exc_type, exc_val, exc_tb):
291 """비동기 컨텍스트 매니저 종료"""
292 await self.close()