Coverage for src / mysingle / core / http_client.py: 0%

93 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1""" 

2표준 HTTP 클라이언트 (연결 풀링 지원) 

3Standard HTTP Client with Connection Pooling 

4""" 

5 

6from contextlib import asynccontextmanager 

7from typing import Dict, Optional 

8 

9import httpx 

10 

11from .config import settings 

12from .logging import get_structured_logger 

13 

14logger = get_structured_logger(__name__) 

15 

16 

17class ServiceHttpClient: 

18 """표준 HTTP 클라이언트 (연결 풀링 지원)""" 

19 

20 def __init__( 

21 self, 

22 base_url: str, 

23 timeout: float = 30.0, 

24 max_connections: int = 100, 

25 max_keepalive_connections: int = 20, 

26 headers: Optional[Dict[str, str]] = None, 

27 service_name: Optional[str] = None, 

28 ): 

29 """ 

30 HTTP 클라이언트 초기화 

31 

32 Args: 

33 base_url: 기본 URL 

34 timeout: 요청 타임아웃 (초) 

35 max_connections: 최대 연결 수 

36 max_keepalive_connections: 최대 Keep-Alive 연결 수 

37 headers: 기본 헤더 

38 service_name: 서비스 이름 (로깅용) 

39 """ 

40 self.base_url = base_url 

41 self.service_name = service_name or "unknown" 

42 

43 # 기본 헤더 설정 

44 default_headers = { 

45 "User-Agent": f"mysingle-quant/{settings.PROJECT_NAME}", 

46 "Accept": "application/json", 

47 "Content-Type": "application/json", 

48 } 

49 

50 if headers: 

51 default_headers.update(headers) 

52 

53 # HTTP 클라이언트 생성 

54 self.client = httpx.AsyncClient( 

55 base_url=base_url, 

56 timeout=httpx.Timeout(timeout), 

57 headers=default_headers, 

58 limits=httpx.Limits( 

59 max_connections=max_connections, 

60 max_keepalive_connections=max_keepalive_connections, 

61 ), 

62 follow_redirects=True, 

63 ) 

64 

65 logger.debug( 

66 f"Created HTTP client for {self.service_name}: " 

67 f"base_url={base_url}, timeout={timeout}s, " 

68 f"max_conn={max_connections}, keepalive={max_keepalive_connections}" 

69 ) 

70 

71 async def close(self): 

72 """클라이언트 연결 정리""" 

73 if hasattr(self, "client") and self.client: 

74 await self.client.aclose() 

75 logger.debug(f"Closed HTTP client for {self.service_name}") 

76 

77 # 컨텍스트 매니저 지원 

78 async def __aenter__(self): 

79 return self 

80 

81 async def __aexit__(self, exc_type, exc_val, exc_tb): 

82 await self.close() 

83 

84 # HTTP 메서드 래퍼들 

85 async def get(self, url: str, **kwargs) -> httpx.Response: 

86 """GET 요청""" 

87 return await self.client.get(url, **kwargs) 

88 

89 async def post(self, url: str, **kwargs) -> httpx.Response: 

90 """POST 요청""" 

91 return await self.client.post(url, **kwargs) 

92 

93 async def put(self, url: str, **kwargs) -> httpx.Response: 

94 """PUT 요청""" 

95 return await self.client.put(url, **kwargs) 

96 

97 async def patch(self, url: str, **kwargs) -> httpx.Response: 

98 """PATCH 요청""" 

99 return await self.client.patch(url, **kwargs) 

100 

101 async def delete(self, url: str, **kwargs) -> httpx.Response: 

102 """DELETE 요청""" 

103 return await self.client.delete(url, **kwargs) 

104 

105 async def request(self, method: str, url: str, **kwargs) -> httpx.Response: 

106 """일반 요청 메서드""" 

107 return await self.client.request(method, url, **kwargs) 

108 

109 

110class ServiceHttpClientManager: 

111 """HTTP 클라이언트 매니저 (싱글톤 패턴)""" 

112 

113 _instances: Dict[str, ServiceHttpClient] = {} 

114 

115 @classmethod 

116 def get_client( 

117 cls, service_name: str, base_url: Optional[str] = None, **kwargs 

118 ) -> ServiceHttpClient: 

119 """서비스별 HTTP 클라이언트 획득 (재사용)""" 

120 

121 if service_name in cls._instances: 

122 return cls._instances[service_name] 

123 

124 # base_url 자동 구성 

125 if not base_url: 

126 base_url = cls._build_service_url(service_name) 

127 

128 # 클라이언트 생성 및 캐시 

129 client = ServiceHttpClient( 

130 base_url=base_url, service_name=service_name, **kwargs 

131 ) 

132 

133 cls._instances[service_name] = client 

134 logger.info(f"Created new HTTP client for service: {service_name}") 

135 

136 return client 

137 

138 @classmethod 

139 def _build_service_url(cls, service_name: str) -> str: 

140 """서비스명으로부터 URL 자동 구성""" 

141 # 서비스명 정규화 (언더스코어 → 하이픈) 

142 normalized_name = service_name.replace("_", "-").replace("-service", "") 

143 

144 if settings.USE_API_GATEWAY: 

145 # API Gateway 경로 

146 return f"{settings.API_GATEWAY_URL}/{normalized_name}" 

147 else: 

148 # 직접 연결 (개발 환경) 

149 port_mapping = { 

150 "iam": 8001, 

151 "journey": 8002, 

152 "strategy": 8003, 

153 "backtest": 8004, 

154 "optimization": 8005, 

155 "dashboard": 8006, 

156 "notification": 8007, 

157 "market-data": 8008, 

158 "gen-ai": 8009, 

159 "ml": 8010, 

160 } 

161 port = port_mapping.get(normalized_name, 8000) 

162 return f"http://localhost:{port}" 

163 

164 @classmethod 

165 async def close_all(cls): 

166 """모든 클라이언트 연결 정리""" 

167 for service_name, client in cls._instances.items(): 

168 try: 

169 await client.close() 

170 logger.debug(f"Closed HTTP client for {service_name}") 

171 except Exception as e: 

172 logger.error(f"Error closing HTTP client for {service_name}: {e}") 

173 

174 cls._instances.clear() 

175 logger.info("All HTTP clients closed") 

176 

177 

178# Factory 함수들 

179def create_service_http_client( 

180 service_name: str, 

181 base_url: Optional[str] = None, 

182 headers: Optional[Dict[str, str]] = None, 

183 timeout: float = 30.0, 

184 max_connections: int = 100, 

185 max_keepalive_connections: int = 20, 

186) -> ServiceHttpClient: 

187 """서비스별 HTTP 클라이언트 생성 (일회성)""" 

188 

189 if not base_url: 

190 base_url = ServiceHttpClientManager._build_service_url(service_name) 

191 

192 # X-Service-Name 헤더 자동 추가 

193 default_headers = {"X-Service-Name": service_name} 

194 if headers: 

195 default_headers.update(headers) 

196 

197 return ServiceHttpClient( 

198 base_url=base_url, 

199 headers=default_headers, 

200 service_name=service_name, 

201 timeout=timeout, 

202 max_connections=max_connections, 

203 max_keepalive_connections=max_keepalive_connections, 

204 ) 

205 

206 

207def get_service_http_client( 

208 service_name: str, base_url: Optional[str] = None, **kwargs 

209) -> ServiceHttpClient: 

210 """서비스별 HTTP 클라이언트 획득 (재사용/싱글톤)""" 

211 return ServiceHttpClientManager.get_client( 

212 service_name=service_name, base_url=base_url, **kwargs 

213 ) 

214 

215 

216@asynccontextmanager 

217async def http_client_lifespan(): 

218 """HTTP 클라이언트 생명주기 관리""" 

219 try: 

220 logger.info("🌐 HTTP client manager initialized") 

221 yield ServiceHttpClientManager 

222 finally: 

223 await ServiceHttpClientManager.close_all() 

224 logger.info("🌐 HTTP client manager shutdown completed") 

225 

226 

227# 환경 설정 기반 기본값들 

228class HttpClientConfig: 

229 """HTTP 클라이언트 설정""" 

230 

231 # 환경 변수로 오버라이드 가능한 기본값들 

232 DEFAULT_TIMEOUT: float = float(getattr(settings, "HTTP_CLIENT_TIMEOUT", 30.0)) 

233 DEFAULT_MAX_CONNECTIONS: int = int( 

234 getattr(settings, "HTTP_CLIENT_MAX_CONNECTIONS", 100) 

235 ) 

236 DEFAULT_MAX_KEEPALIVE: int = int(getattr(settings, "HTTP_CLIENT_MAX_KEEPALIVE", 20)) 

237 

238 # 재시도 설정 

239 DEFAULT_MAX_RETRIES: int = int(getattr(settings, "HTTP_CLIENT_MAX_RETRIES", 3)) 

240 DEFAULT_RETRY_DELAY: float = float( 

241 getattr(settings, "HTTP_CLIENT_RETRY_DELAY", 1.0) 

242 ) 

243 

244 

245# 편의 함수들 

246async def make_service_request( 

247 service_name: str, 

248 method: str, 

249 endpoint: str, 

250 headers: Optional[Dict[str, str]] = None, 

251 **kwargs, 

252) -> httpx.Response: 

253 """서비스 요청 편의 함수""" 

254 client = get_service_http_client(service_name) 

255 

256 # 추가 헤더 병합 

257 if headers: 

258 request_headers = {**client.client.headers, **headers} 

259 kwargs["headers"] = request_headers 

260 

261 return await client.request(method, endpoint, **kwargs)