Coverage for src / mysingle / clients / base_grpc_client.py: 0%

70 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1""" 

2Base gRPC Client 

3 

4모든 마이크로서비스 gRPC 클라이언트의 베이스 클래스 

5공통 기능: 채널 관리, 메타데이터 주입, 컨텍스트 매니저, 환경별 연결 설정 

6 

7Usage: 

8 ```python 

9 from mysingle.clients import BaseGrpcClient 

10 from app.grpc import my_service_pb2_grpc 

11 

12 class MyServiceGrpcClient(BaseGrpcClient): 

13 def __init__( 

14 self, 

15 user_id: str | None = None, 

16 correlation_id: str | None = None, 

17 **kwargs, 

18 ): 

19 super().__init__( 

20 service_name="my-service", 

21 default_port=50051, 

22 user_id=user_id, 

23 correlation_id=correlation_id, 

24 **kwargs, 

25 ) 

26 self.stub = my_service_pb2_grpc.MyServiceStub(self.channel) 

27 

28 async def get_data(self, request_id: str): 

29 request = my_service_pb2.GetDataRequest(id=request_id) 

30 response = await self.stub.GetData(request, metadata=self.metadata) 

31 return response 

32 

33 # 사용 1: 컨텍스트 매니저 

34 async with MyServiceGrpcClient(user_id="user123") as client: 

35 data = await client.get_data("req_id") 

36 

37 # 사용 2: 수동 연결 관리 

38 client = MyServiceGrpcClient(user_id="user123") 

39 try: 

40 data = await client.get_data("req_id") 

41 finally: 

42 await client.close() 

43 ``` 

44""" 

45 

46from __future__ import annotations 

47 

48import os 

49import uuid 

50from typing import TYPE_CHECKING, Any 

51 

52import grpc 

53 

54from mysingle.constants import ( 

55 GRPC_METADATA_CORRELATION_ID, 

56 GRPC_METADATA_REQUEST_ID, 

57 GRPC_METADATA_USER_ID, 

58) 

59from mysingle.core.logging import get_structured_logger 

60 

61if TYPE_CHECKING: 

62 from fastapi import Request 

63 

64logger = get_structured_logger(__name__) 

65 

66 

67class BaseGrpcClient: 

68 """ 

69 마이크로서비스 gRPC 클라이언트 베이스 클래스 

70 

71 Features: 

72 - 자동 채널 관리 (secure/insecure) 

73 - 메타데이터 자동 주입 (user_id, correlation_id, request_id) 

74 - 비동기 컨텍스트 매니저 지원 

75 - 환경별 연결 설정 (Docker 내부 vs 외부) 

76 - Keepalive 및 연결 옵션 표준화 

77 

78 Metadata Headers: 

79 - user-id: 사용자 식별자 (필수, 서버 인터셉터에서 검증) 

80 - correlation-id: 요청 추적 ID (자동 생성 또는 전파) 

81 - request-id: 개별 요청 ID (항상 자동 생성) 

82 

83 Environment Variables: 

84 - {SERVICE}_GRPC_HOST: 서비스별 호스트 오버라이드 

85 예) INDICATOR_GRPC_HOST=indicator-service 

86 - GRPC_USE_TLS: TLS 사용 여부 (기본값: false) 

87 """ 

88 

89 def __init__( 

90 self, 

91 service_name: str, 

92 default_port: int, 

93 host: str | None = None, 

94 user_id: str | None = None, 

95 correlation_id: str | None = None, 

96 request: "Request | None" = None, 

97 timeout: float = 10.0, 

98 use_tls: bool | None = None, 

99 **kwargs: Any, 

100 ): 

101 """ 

102 Args: 

103 service_name: 서비스 이름 (예: "indicator-service", "market-data-service") 

104 default_port: gRPC 포트 (예: 50052, 50053) 

105 host: 명시적 호스트 (없으면 자동 결정) 

106 user_id: 사용자 ID (request가 주어지면 자동 추출) 

107 correlation_id: 상관관계 ID (없으면 자동 생성) 

108 request: FastAPI Request 객체 (메타데이터 자동 추출) 

109 timeout: 요청 타임아웃 (초) 

110 use_tls: TLS 사용 여부 (None이면 환경변수 기반) 

111 **kwargs: grpc.aio.Channel 추가 옵션 

112 

113 Note: 

114 - request 객체가 제공되면 user_id와 correlation_id를 헤더에서 추출합니다. 

115 - user_id가 없으면 서버 AuthInterceptor에서 UNAUTHENTICATED 오류가 발생합니다. 

116 """ 

117 self.service_name = service_name 

118 self.default_port = default_port 

119 self.timeout = timeout 

120 

121 # User ID 결정: 명시적 user_id > request 헤더 

122 if user_id: 

123 self.user_id: str = user_id 

124 elif request: 

125 self.user_id = self._extract_user_id_from_request(request) 

126 else: 

127 self.user_id = "" 

128 

129 # Correlation ID 결정: 명시적 correlation_id > request 헤더 > 자동 생성 

130 if correlation_id: 

131 self.correlation_id: str = correlation_id 

132 elif request: 

133 extracted_cid = self._extract_correlation_id_from_request(request) 

134 self.correlation_id = extracted_cid if extracted_cid else str(uuid.uuid4()) 

135 else: 

136 self.correlation_id = str(uuid.uuid4()) 

137 

138 # 호스트 결정 

139 if host is None: 

140 host = self._determine_host() 

141 

142 self.host = host 

143 self.address = f"{host}:{default_port}" 

144 

145 # TLS 설정 결정 

146 if use_tls is None: 

147 use_tls = os.getenv("GRPC_USE_TLS", "false").lower() == "true" 

148 

149 self.use_tls = use_tls 

150 

151 # 채널 생성 

152 self.channel = self._create_channel(**kwargs) 

153 

154 logger.info( 

155 f"{self.__class__.__name__} initialized", 

156 address=self.address, 

157 tls=self.use_tls, 

158 timeout=timeout, 

159 user_id=self.user_id, 

160 correlation_id=self.correlation_id, 

161 ) 

162 

163 @staticmethod 

164 def _extract_user_id_from_request(request: "Request") -> str: 

165 """ 

166 FastAPI Request 객체에서 User ID 추출 

167 

168 Args: 

169 request: FastAPI Request 객체 

170 

171 Returns: 

172 추출된 User ID (없으면 빈 문자열) 

173 """ 

174 from mysingle.constants import HEADER_USER_ID 

175 

176 user_id = request.headers.get(HEADER_USER_ID, "") 

177 if not user_id: 

178 logger.warning("User ID not found in request headers") 

179 return user_id 

180 

181 @staticmethod 

182 def _extract_correlation_id_from_request(request: "Request") -> str | None: 

183 """ 

184 FastAPI Request 객체에서 Correlation ID 추출 

185 

186 Args: 

187 request: FastAPI Request 객체 

188 

189 Returns: 

190 추출된 Correlation ID (없으면 None) 

191 """ 

192 # HTTP 헤더는 대소문자 무관 

193 return request.headers.get("X-Correlation-Id") or request.headers.get( 

194 "Correlation-Id" 

195 ) 

196 

197 def _determine_host(self) -> str: 

198 """ 

199 환경 기반 호스트 결정 

200 

201 우선순위: 

202 1. 환경변수 {SERVICE}_GRPC_HOST (예: INDICATOR_GRPC_HOST) 

203 2. Docker 환경: service_name (예: indicator-service) 

204 3. 기본값: localhost 

205 

206 Returns: 

207 결정된 호스트 

208 """ 

209 # 환경변수로 서비스명 변환 (indicator-service -> INDICATOR) 

210 env_key = ( 

211 self.service_name.upper().replace("-SERVICE", "").replace("-", "_") 

212 + "_GRPC_HOST" 

213 ) 

214 env_host = os.getenv(env_key) 

215 if env_host: 

216 return env_host 

217 

218 # Docker 환경 감지 

219 if os.path.exists("/.dockerenv") or os.getenv("DOCKER_ENV"): 

220 return self.service_name 

221 

222 # 기본값 

223 return "localhost" 

224 

225 def _create_channel(self, **kwargs: Any) -> grpc.aio.Channel: 

226 """ 

227 gRPC 채널 생성 

228 

229 Args: 

230 **kwargs: 추가 채널 옵션 

231 

232 Returns: 

233 생성된 gRPC 채널 

234 """ 

235 # 기본 keepalive 옵션 

236 default_options = [ 

237 ("grpc.keepalive_time_ms", 30000), # 30초마다 keepalive ping 

238 ("grpc.keepalive_timeout_ms", 10000), # 10초 타임아웃 

239 ("grpc.keepalive_permit_without_calls", True), # 활성 호출 없어도 ping 허용 

240 ("grpc.http2.max_pings_without_data", 0), # ping 제한 없음 

241 ] 

242 

243 # 사용자 정의 옵션과 병합 

244 user_options = kwargs.pop("options", []) 

245 merged_options = default_options + user_options 

246 

247 if self.use_tls: 

248 credentials = grpc.ssl_channel_credentials() 

249 return grpc.aio.secure_channel( 

250 self.address, 

251 credentials, 

252 options=merged_options, 

253 **kwargs, 

254 ) 

255 else: 

256 return grpc.aio.insecure_channel( 

257 self.address, 

258 options=merged_options, 

259 **kwargs, 

260 ) 

261 

262 @property 

263 def metadata(self) -> list[tuple[str, str]]: 

264 """ 

265 gRPC 메타데이터 생성 (모든 요청에 자동 주입) 

266 

267 Returns: 

268 메타데이터 튜플 리스트 

269 

270 Note: 

271 - request_id는 매 호출마다 새로 생성됩니다. 

272 - user_id가 없으면 빈 문자열이지만, 서버에서 UNAUTHENTICATED 오류 발생. 

273 """ 

274 return [ 

275 (GRPC_METADATA_USER_ID, self.user_id), 

276 (GRPC_METADATA_CORRELATION_ID, self.correlation_id), 

277 (GRPC_METADATA_REQUEST_ID, str(uuid.uuid4())), 

278 ] 

279 

280 async def close(self): 

281 """채널 종료""" 

282 if self.channel: 

283 await self.channel.close() 

284 logger.info(f"{self.__class__.__name__} channel closed") 

285 

286 async def __aenter__(self): 

287 """비동기 컨텍스트 매니저 진입""" 

288 return self 

289 

290 async def __aexit__(self, exc_type, exc_val, exc_tb): 

291 """비동기 컨텍스트 매니저 종료""" 

292 await self.close()