Coverage for src / mysingle / core / http_client.py: 0%
93 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1"""
2표준 HTTP 클라이언트 (연결 풀링 지원)
3Standard HTTP Client with Connection Pooling
4"""
6from contextlib import asynccontextmanager
7from typing import Dict, Optional
9import httpx
11from .config import settings
12from .logging import get_structured_logger
14logger = get_structured_logger(__name__)
class ServiceHttpClient:
    """Standard async HTTP client backed by a pooled ``httpx.AsyncClient``.

    The instance can be used as an async context manager; leaving the
    ``async with`` block closes the underlying client.
    """

    def __init__(
        self,
        base_url: str,
        timeout: float = 30.0,
        max_connections: int = 100,
        max_keepalive_connections: int = 20,
        headers: Optional[Dict[str, str]] = None,
        service_name: Optional[str] = None,
    ):
        """
        Initialise the HTTP client.

        Args:
            base_url: Base URL passed to the underlying client.
            timeout: Request timeout in seconds.
            max_connections: Maximum number of pooled connections.
            max_keepalive_connections: Maximum number of keep-alive connections.
            headers: Extra default headers; they override the built-in defaults.
            service_name: Service name used in log messages ("unknown" if omitted).
        """
        self.base_url = base_url
        self.service_name = service_name or "unknown"

        # Built-in defaults first, caller-supplied headers layered on top.
        merged_headers = {
            "User-Agent": f"mysingle-quant/{settings.PROJECT_NAME}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        merged_headers.update(headers or {})

        request_timeout = httpx.Timeout(timeout)
        pool_limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive_connections,
        )

        # The underlying pooled client that performs every request.
        self.client = httpx.AsyncClient(
            base_url=base_url,
            timeout=request_timeout,
            headers=merged_headers,
            limits=pool_limits,
            follow_redirects=True,
        )

        logger.debug(
            f"Created HTTP client for {self.service_name}: "
            f"base_url={base_url}, timeout={timeout}s, "
            f"max_conn={max_connections}, keepalive={max_keepalive_connections}"
        )

    async def close(self):
        """Close the underlying client, if one was created."""
        underlying = getattr(self, "client", None)
        if not underlying:
            return
        await underlying.aclose()
        logger.debug(f"Closed HTTP client for {self.service_name}")

    # Async context-manager support
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    # Thin wrappers around the HTTP verbs of the underlying client
    async def get(self, url: str, **kwargs) -> httpx.Response:
        """Send a GET request."""
        response = await self.client.get(url, **kwargs)
        return response

    async def post(self, url: str, **kwargs) -> httpx.Response:
        """Send a POST request."""
        response = await self.client.post(url, **kwargs)
        return response

    async def put(self, url: str, **kwargs) -> httpx.Response:
        """Send a PUT request."""
        response = await self.client.put(url, **kwargs)
        return response

    async def patch(self, url: str, **kwargs) -> httpx.Response:
        """Send a PATCH request."""
        response = await self.client.patch(url, **kwargs)
        return response

    async def delete(self, url: str, **kwargs) -> httpx.Response:
        """Send a DELETE request."""
        response = await self.client.delete(url, **kwargs)
        return response

    async def request(self, method: str, url: str, **kwargs) -> httpx.Response:
        """Send a request with an arbitrary HTTP method."""
        response = await self.client.request(method, url, **kwargs)
        return response
class ServiceHttpClientManager:
    """Registry of shared ``ServiceHttpClient`` instances, one per service name.

    All state lives on the class, so every caller in the process shares the
    same cached clients.
    """

    # service name -> cached client
    _instances: Dict[str, ServiceHttpClient] = {}

    @classmethod
    def get_client(
        cls, service_name: str, base_url: Optional[str] = None, **kwargs
    ) -> ServiceHttpClient:
        """Return the cached client for ``service_name``, creating it if needed.

        Args:
            service_name: Key under which the client is cached.
            base_url: Base URL for a newly created client; derived from the
                service name when omitted. Ignored when a usable cached
                client already exists.
            **kwargs: Extra arguments forwarded to ``ServiceHttpClient`` when a
                new client is created.

        Returns:
            The shared client for the service.
        """
        cached = cls._instances.get(service_name)
        if cached is not None:
            if not getattr(cached.client, "is_closed", False):
                return cached
            # The shared client was closed by a caller (for example by using
            # it in ``async with``). Handing it out again would make every
            # request fail, so drop it and build a fresh one below.
            del cls._instances[service_name]
            logger.info(
                f"Cached HTTP client for service {service_name} was closed; recreating"
            )

        # Derive the base URL from the service name when none was given.
        if not base_url:
            base_url = cls._build_service_url(service_name)

        # Create the client and cache it for later calls.
        client = ServiceHttpClient(
            base_url=base_url, service_name=service_name, **kwargs
        )

        cls._instances[service_name] = client
        logger.info(f"Created new HTTP client for service: {service_name}")

        return client

    @classmethod
    def _build_service_url(cls, service_name: str) -> str:
        """Build the base URL for a service from its name.

        Underscores are converted to hyphens and every "-service" substring is
        removed. With ``settings.USE_API_GATEWAY`` set, the result is a path
        under ``settings.API_GATEWAY_URL``; otherwise a localhost URL with a
        per-service port (8000 for names not in the table).
        """
        normalized_name = service_name.replace("_", "-").replace("-service", "")

        if settings.USE_API_GATEWAY:
            # Route through the API gateway.
            return f"{settings.API_GATEWAY_URL}/{normalized_name}"

        # Direct connection (development environment).
        port_mapping = {
            "iam": 8001,
            "journey": 8002,
            "strategy": 8003,
            "backtest": 8004,
            "optimization": 8005,
            "dashboard": 8006,
            "notification": 8007,
            "market-data": 8008,
            "gen-ai": 8009,
            "ml": 8010,
        }
        port = port_mapping.get(normalized_name, 8000)
        return f"http://localhost:{port}"

    @classmethod
    async def close_all(cls):
        """Close every cached client and empty the registry.

        Errors raised while closing one client are logged and do not prevent
        the remaining clients from being closed.
        """
        # Detach the current entries before the first ``await``. Iterating the
        # live dict across awaits would raise "dictionary changed size during
        # iteration" if another task registered a client meanwhile, and a
        # trailing clear() would drop such a client without closing it.
        pending = list(cls._instances.items())
        cls._instances.clear()

        for service_name, client in pending:
            try:
                await client.close()
                logger.debug(f"Closed HTTP client for {service_name}")
            except Exception as e:
                # Best-effort shutdown: report and continue with the rest.
                logger.error(f"Error closing HTTP client for {service_name}: {e}")

        logger.info("All HTTP clients closed")
178# Factory 함수들
def create_service_http_client(
    service_name: str,
    base_url: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    timeout: float = 30.0,
    max_connections: int = 100,
    max_keepalive_connections: int = 20,
) -> ServiceHttpClient:
    """Create a new, uncached HTTP client for a service (one-off use).

    Args:
        service_name: Service name; also sent as the ``X-Service-Name`` header.
        base_url: Base URL; derived from the service name when omitted.
        headers: Extra default headers; they override ``X-Service-Name``
            when the same key is supplied.
        timeout: Request timeout in seconds.
        max_connections: Maximum number of pooled connections.
        max_keepalive_connections: Maximum number of keep-alive connections.

    Returns:
        A freshly constructed ``ServiceHttpClient``.
    """
    target_url = base_url or ServiceHttpClientManager._build_service_url(service_name)

    # Always identify the target service, then layer the caller's headers on top.
    client_headers = {"X-Service-Name": service_name}
    client_headers.update(headers or {})

    return ServiceHttpClient(
        base_url=target_url,
        headers=client_headers,
        service_name=service_name,
        timeout=timeout,
        max_connections=max_connections,
        max_keepalive_connections=max_keepalive_connections,
    )
def get_service_http_client(
    service_name: str, base_url: Optional[str] = None, **kwargs
) -> ServiceHttpClient:
    """Return the shared (cached) HTTP client for a service.

    Delegates to ``ServiceHttpClientManager.get_client`` with the same
    arguments.
    """
    shared_client = ServiceHttpClientManager.get_client(
        service_name, base_url, **kwargs
    )
    return shared_client
@asynccontextmanager
async def http_client_lifespan():
    """Async context manager covering the lifetime of the shared HTTP clients.

    Yields the ``ServiceHttpClientManager`` class and closes every cached
    client when the context exits, whether normally or through an exception.
    """
    manager = ServiceHttpClientManager
    try:
        logger.info("🌐 HTTP client manager initialized")
        yield manager
    finally:
        # Runs on normal exit and on error alike.
        await manager.close_all()
        logger.info("🌐 HTTP client manager shutdown completed")
227# 환경 설정 기반 기본값들
class HttpClientConfig:
    """Default HTTP client settings, resolved once when the class is defined.

    Each value is read from ``settings`` when the attribute exists there and
    otherwise falls back to the literal default shown.
    """

    # Connection defaults
    DEFAULT_TIMEOUT: float = float(
        getattr(settings, "HTTP_CLIENT_TIMEOUT", 30.0)
    )
    DEFAULT_MAX_CONNECTIONS: int = int(
        getattr(settings, "HTTP_CLIENT_MAX_CONNECTIONS", 100)
    )
    DEFAULT_MAX_KEEPALIVE: int = int(
        getattr(settings, "HTTP_CLIENT_MAX_KEEPALIVE", 20)
    )

    # Retry defaults
    DEFAULT_MAX_RETRIES: int = int(
        getattr(settings, "HTTP_CLIENT_MAX_RETRIES", 3)
    )
    DEFAULT_RETRY_DELAY: float = float(
        getattr(settings, "HTTP_CLIENT_RETRY_DELAY", 1.0)
    )
245# 편의 함수들
async def make_service_request(
    service_name: str,
    method: str,
    endpoint: str,
    headers: Optional[Dict[str, str]] = None,
    **kwargs,
) -> httpx.Response:
    """Send a request to a service through its shared HTTP client.

    Args:
        service_name: Service whose shared client is used.
        method: HTTP method name.
        endpoint: URL or path passed to the client.
        headers: Extra headers for this request only; they take precedence
            over the client's default headers.
        **kwargs: Additional arguments forwarded to the client's ``request``.

    Returns:
        The response returned by the client.
    """
    client = get_service_http_client(service_name)

    if headers:
        # Pass only the per-request headers. httpx merges them over the
        # client's defaults case-insensitively. Flattening the defaults into
        # a plain dict first would lower-case their keys, so an override such
        # as "Content-Type" would be sent alongside the default "content-type"
        # instead of replacing it.
        kwargs["headers"] = headers

    return await client.request(method, endpoint, **kwargs)