Coverage for src/usaspending/client.py: 47%

167 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 17:15 -0700

"""Main USASpending client."""

from __future__ import annotations

import time
from typing import Optional, Dict, Any, TYPE_CHECKING
from urllib.parse import urljoin

import requests
import cachier

from .config import config
from .exceptions import HTTPError, APIError
from .logging_config import USASpendingLogger, log_api_request, log_api_response

# These names are only needed for annotations; importing them under
# TYPE_CHECKING keeps them out of the runtime import graph. The client's
# properties import the same classes lazily, inside the property bodies.
if TYPE_CHECKING:
    from .resources.base_resource import BaseResource
    from .resources.award_resource import AwardResource
    from .resources.transactions_resource import TransactionsResource
    from .resources.recipients_resource import RecipientsResource
    from .resources.spending_resource import SpendingResource
    from .resources.funding_resource import FundingResource
    from .resources.download_resource import DownloadResource
    from .resources.subawards_resource import SubAwardsResource
    from .resources.agency_resource import AgencyResource
    from .utils.rate_limit import RateLimiter
    from .utils.retry import RetryHandler

# Module-level logger shared by everything in this file.
logger = USASpendingLogger.get_logger(__name__)

29 

30 

class USASpending:
    """Main client for USASpending API.

    This client provides a centralized interface to the USASpending.gov API
    with automatic retry, rate limiting, and caching capabilities.

    The client owns a ``requests.Session``. Call :meth:`close` when finished,
    or use the client as a context manager so the session is closed on exit.

    Example:
        >>> client = USASpending()
        >>> awards = client.awards.search().for_agency("NASA").limit(10)
        >>> for award in awards:
        ...     print(f"{award.recipient_name}: ${award.amount:,.2f}")
    """

    # Timeout in seconds for streamed file downloads (10 minutes). Downloads
    # use this instead of ``config.timeout`` because archives can be large.
    _DOWNLOAD_TIMEOUT = 600

    def __init__(self):
        """Initialize USASpending client."""

        logger.debug(
            f"Initializing USASpending client with base URL: {config.base_url}"
        )

        # Initialize HTTP session
        self._session = self._create_session()

        # Lazy-loaded components (created on first property access)
        self._rate_limiter: Optional[RateLimiter] = None
        self._retry_handler: Optional[RetryHandler] = None

        # Resource cache: one resource object per property name
        self._resources: Dict[str, BaseResource] = {}

        logger.debug("USASpending client initialized successfully")

    def __enter__(self) -> "USASpending":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the HTTP session when the ``with`` block exits."""
        self.close()

    def _create_session(self) -> requests.Session:
        """Create a requests session carrying the client's default headers."""
        session = requests.Session()
        session.headers.update(
            {
                "User-Agent": config.user_agent,
                "Accept": "application/json",
                "Content-Type": "application/json",
            }
        )
        return session

    @staticmethod
    def _build_url(path: str) -> str:
        """Resolve ``path`` against ``config.base_url``.

        Leading slashes are stripped from ``path`` so it is joined relative to
        the base URL rather than replacing the base URL's path.
        """
        # NOTE(review): urljoin drops the last path segment of the base when
        # the base does not end in "/"; confirm config.base_url is configured
        # accordingly. Behavior is intentionally left unchanged here.
        return urljoin(config.base_url, path.lstrip("/"))

    @staticmethod
    def _log_response(response, duration, error=None) -> None:
        """Log status, body size, and duration (plus error text, if any)."""
        size = len(response.content) if response.content else None
        if error is None:
            log_api_response(logger, response.status_code, size, duration)
        else:
            log_api_response(logger, response.status_code, size, duration, error)

    @property
    def rate_limiter(self) -> RateLimiter:
        """Get rate limiter (lazy-loaded)."""
        if self._rate_limiter is None:
            from .utils.rate_limit import RateLimiter

            self._rate_limiter = RateLimiter(
                config.rate_limit_calls, config.rate_limit_period
            )
        return self._rate_limiter

    @property
    def retry_handler(self) -> RetryHandler:
        """Get retry handler (lazy-loaded)."""
        if self._retry_handler is None:
            from .utils.retry import RetryHandler

            self._retry_handler = RetryHandler()
        return self._retry_handler

    @property
    def awards(self) -> "AwardResource":
        """Access award endpoints."""
        if "awards" not in self._resources:
            from .resources.award_resource import AwardResource

            self._resources["awards"] = AwardResource(self)
        return self._resources["awards"]

    @property
    def downloads(self) -> "DownloadResource":
        """
        Access download operations for detailed award data.

        Allows queuing, monitoring, and retrieval of bulk award files.
        """
        if "downloads" not in self._resources:
            from .resources.download_resource import DownloadResource

            self._resources["downloads"] = DownloadResource(self)
        return self._resources["downloads"]

    @property
    def recipients(self) -> "RecipientsResource":
        """Access recipient endpoints."""
        if "recipients" not in self._resources:
            from .resources.recipients_resource import RecipientsResource

            self._resources["recipients"] = RecipientsResource(self)
        return self._resources["recipients"]

    @property
    def transactions(self) -> "TransactionsResource":
        """Access transaction endpoints."""
        if "transactions" not in self._resources:
            from .resources.transactions_resource import TransactionsResource

            self._resources["transactions"] = TransactionsResource(self)
        return self._resources["transactions"]

    @property
    def spending(self) -> "SpendingResource":
        """Access spending by category endpoints."""
        if "spending" not in self._resources:
            from .resources.spending_resource import SpendingResource

            self._resources["spending"] = SpendingResource(self)
        return self._resources["spending"]

    @property
    def funding(self) -> "FundingResource":
        """Access funding endpoints."""
        if "funding" not in self._resources:
            from .resources.funding_resource import FundingResource

            self._resources["funding"] = FundingResource(self)
        return self._resources["funding"]

    @property
    def subawards(self) -> "SubAwardsResource":
        """Access subaward endpoints."""
        if "subawards" not in self._resources:
            from .resources.subawards_resource import SubAwardsResource

            self._resources["subawards"] = SubAwardsResource(self)
        return self._resources["subawards"]

    @property
    def agencies(self) -> "AgencyResource":
        """Access agency endpoints."""
        if "agencies" not in self._resources:
            from .resources.agency_resource import AgencyResource

            self._resources["agencies"] = AgencyResource(self)
        return self._resources["agencies"]

    # NOTE(review): how cachier derives the cache key for a bound method
    # (in particular whether ``self`` participates) depends on the installed
    # cachier version and its configuration; confirm before relying on
    # per-instance isolation of cached responses.
    @cachier.cachier()
    def _make_request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """A cacheable HTTP request method that won't trigger rate-limiting or other unnecessary behaviors.

        Delegates to :meth:`_make_uncached_request` with the same arguments;
        the ``cachier`` decorator wraps this call.
        """
        return self._make_uncached_request(
            method, endpoint, params=params, json=json, **kwargs
        )

    def _make_uncached_request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Make HTTP request with retry and rate limiting.

        Args:
            method: HTTP method (GET, POST, etc.)
            endpoint: API endpoint (without base URL)
            params: Query parameters
            json: JSON body for POST requests
            **kwargs: Additional arguments for requests; these override the
                defaults built here (including ``timeout``)

        Returns:
            Response data as dictionary

        Raises:
            HTTPError: For HTTP errors
            APIError: For API-reported errors
            RateLimitError: When rate limited
        """
        # Apply rate limiting
        self.rate_limiter.wait_if_needed()

        # Build full URL
        url = self._build_url(endpoint)

        # Log API request
        log_api_request(logger, method, url, params, json)

        # Prepare request
        request_kwargs = {
            "method": method,
            "url": url,
            "params": params,
            "json": json,
            "timeout": config.timeout,
            **kwargs,
        }

        # Monotonic clock: elapsed time must not be affected by wall-clock
        # adjustments while the request is in flight.
        started = time.monotonic()

        try:
            # Make request with retry
            response = self.retry_handler.execute(
                self._session.request, **request_kwargs
            )
        except Exception as exc:
            logger.error(f"Request failed before response: {exc}")
            raise

        duration = time.monotonic() - started

        try:
            return self._parse_response(response, duration)
        except (HTTPError, APIError):
            # Already logged at the point where they were raised.
            raise
        except Exception as exc:
            # Log any unexpected errors
            log_api_response(
                logger,
                getattr(response, "status_code", 0),
                None,
                time.monotonic() - started,
                str(exc),
            )
            raise

    def _parse_response(self, response, duration) -> Dict[str, Any]:
        """Validate a response, log it, and return its decoded JSON body.

        Args:
            response: The response object returned by the session.
            duration: Seconds elapsed while performing the request.

        Returns:
            The decoded JSON body.

        Raises:
            APIError: For 400 responses, undecodable JSON, or a body that
                contains an ``"error"`` key.
            HTTPError: For any other unsuccessful HTTP status.
        """
        # Handle specific 400 Bad Request responses first
        if response.status_code == 400:
            try:
                body = response.json()
            except ValueError as exc:
                # If JSON parsing fails, use generic 400 error
                error_msg = "Bad Request - Invalid JSON response"
                self._log_response(response, duration, error_msg)
                raise APIError(error_msg, status_code=400) from exc

            # A non-object JSON body (list, string, null) has no named fields,
            # so fall through to the generic message instead of failing.
            fields = body if isinstance(body, dict) else {}
            # Use "detail" property if available, otherwise fall back to generic message
            error_msg = (
                fields.get("detail")
                or fields.get("error")
                or fields.get("message")
                or "Bad Request"
            )
            self._log_response(response, duration, error_msg)
            raise APIError(error_msg, status_code=400, response_body=body)

        # Handle other HTTP errors
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            self._log_response(response, duration, str(exc))
            raise HTTPError(
                f"HTTP {response.status_code}: {exc}",
                status_code=response.status_code,
            ) from exc

        # Parse JSON response
        try:
            data = response.json()
        except ValueError as exc:
            self._log_response(response, duration, f"Invalid JSON: {exc}")
            raise APIError(f"Invalid JSON response: {exc}") from exc

        # Only JSON objects can carry "error"/"messages" keys; the guard keeps
        # string and null bodies from tripping the membership tests.
        if isinstance(data, dict):
            # Check for API errors (fallback for other error patterns)
            # Note: Don't treat "message" alone as an error indicator since some endpoints
            # (like download/status) include message as a normal response field
            if "error" in data:
                error_msg = (
                    data.get("error") or data.get("message") or "Unknown API error"
                )
                self._log_response(response, duration, error_msg)
                raise APIError(
                    error_msg, status_code=response.status_code, response_body=data
                )

            # Log messages from successful responses (200 status code)
            if response.status_code == 200 and "messages" in data:
                messages = data["messages"]
                if isinstance(messages, list):
                    for msg in messages:
                        logger.info(f"API Message: {msg}")
                else:
                    logger.info(f"API Message: {messages}")

        # Log successful response
        self._log_response(response, duration)

        return data

    def _download_binary_file(self, file_url: str, destination_path: str) -> None:
        """Download binary file using client session with streaming support.

        This method is used internally for downloading large binary files
        like the ZIP archives from the download endpoints.

        Args:
            file_url: Relative or absolute URL to download
            destination_path: Local path where file will be saved

        Raises:
            DownloadError: If download fails
        """
        import os
        from .exceptions import DownloadError

        # Construct full URL. Only a real http(s) scheme counts as absolute;
        # a relative path that merely begins with "http" is still joined.
        if file_url.lower().startswith(("http://", "https://")):
            download_url = file_url
        else:
            download_url = self._build_url(file_url)

        logger.info(f"Downloading binary file from {download_url}")

        # Set once this call has opened (and therefore truncated) the
        # destination, so cleanup never deletes a file it did not touch.
        file_opened = False

        def download_operation():
            """Inner function for retry handler."""
            nonlocal file_opened
            # The context manager closes the streamed response and releases
            # its connection even when the body is not fully read.
            with self._session.get(
                download_url,
                stream=True,
                timeout=self._DOWNLOAD_TIMEOUT,
            ) as response:
                response.raise_for_status()

                try:
                    with open(destination_path, "wb") as f:
                        file_opened = True
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                except requests.RequestException:
                    # requests' exceptions derive from IOError; let transport
                    # failures propagate instead of calling them disk errors.
                    raise
                except OSError as e:
                    raise DownloadError(
                        f"Error writing file to disk: {e}",
                        file_name=os.path.basename(destination_path),
                    ) from e

        try:
            # Execute with retry handling
            self.retry_handler.execute(download_operation)
        except Exception as e:
            logger.error(f"Failed to download file: {e}")
            # Clean up partial file if this call created or truncated it
            if file_opened and os.path.exists(destination_path):
                try:
                    os.remove(destination_path)
                    logger.debug(f"Cleaned up partial file: {destination_path}")
                except OSError as cleanup_error:
                    # Best effort: the download failure is the error to report.
                    logger.debug(
                        f"Could not remove partial file {destination_path}: "
                        f"{cleanup_error}"
                    )

            # Re-raise as DownloadError if not already
            if not isinstance(e, DownloadError):
                raise DownloadError(
                    f"Failed to download file from {download_url}",
                    file_name=os.path.basename(destination_path),
                ) from e
            raise
        else:
            logger.info(f"Successfully downloaded to {destination_path}")

    def close(self) -> None:
        """Close client and cleanup resources."""
        if self._session:
            self._session.close()
        logger.debug("USASpending client closed")

424 logger.debug("USASpending client closed")