Coverage for src/usaspending/client.py: 47%
167 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 17:15 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 17:15 -0700
1"""Main USASpending client."""
3from __future__ import annotations
4import time
5from typing import Optional, Dict, Any, TYPE_CHECKING
6from urllib.parse import urljoin
8import requests
9import cachier
11from .config import config
12from .exceptions import HTTPError, APIError
13from .logging_config import USASpendingLogger, log_api_request, log_api_response
15if TYPE_CHECKING:
16 from .resources.base_resource import BaseResource
17 from .resources.award_resource import AwardResource
18 from .resources.transactions_resource import TransactionsResource
19 from .resources.recipients_resource import RecipientsResource
20 from .resources.spending_resource import SpendingResource
21 from .resources.funding_resource import FundingResource
22 from .resources.download_resource import DownloadResource
23 from .resources.subawards_resource import SubAwardsResource
24 from .resources.agency_resource import AgencyResource
25 from .utils.rate_limit import RateLimiter
26 from .utils.retry import RetryHandler
28logger = USASpendingLogger.get_logger(__name__)
class USASpending:
    """Main client for USASpending API.

    This client provides a centralized interface to the USASpending.gov API
    with automatic retry, rate limiting, and caching capabilities.

    The client owns a ``requests.Session``. Call :meth:`close` when finished,
    or use the client as a context manager so the session is closed on exit.

    Example:
        >>> client = USASpending()
        >>> awards = client.awards.search().for_agency("NASA").limit(10)
        >>> for award in awards:
        ...     print(f"{award.recipient_name}: ${award.amount:,.2f}")
    """

    def __init__(self):
        """Initialize USASpending client."""
        logger.debug(
            f"Initializing USASpending client with base URL: {config.base_url}"
        )

        # Initialize HTTP session
        self._session = self._create_session()

        # Lazy-loaded components (built on first access by the properties below)
        self._rate_limiter: Optional[RateLimiter] = None
        self._retry_handler: Optional[RetryHandler] = None

        # Resource cache, keyed by the resource property name
        self._resources: Dict[str, BaseResource] = {}

        logger.debug("USASpending client initialized successfully")

    def __enter__(self) -> "USASpending":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the client on leaving a ``with`` block; never suppresses errors."""
        self.close()

    def _create_session(self) -> requests.Session:
        """Create configured requests session.

        Returns:
            A new session whose default headers carry the configured
            User-Agent and declare JSON for both Accept and Content-Type.
        """
        session = requests.Session()
        session.headers.update(
            {
                "User-Agent": config.user_agent,
                "Accept": "application/json",
                "Content-Type": "application/json",
            }
        )
        return session

    @property
    def rate_limiter(self) -> RateLimiter:
        """Get rate limiter (lazy-loaded)."""
        if self._rate_limiter is None:
            # Imported here rather than at module level (module-level import
            # is only done under TYPE_CHECKING).
            from .utils.rate_limit import RateLimiter

            self._rate_limiter = RateLimiter(
                config.rate_limit_calls, config.rate_limit_period
            )
        return self._rate_limiter

    @property
    def retry_handler(self) -> RetryHandler:
        """Get retry handler (lazy-loaded)."""
        if self._retry_handler is None:
            from .utils.retry import RetryHandler

            self._retry_handler = RetryHandler()
        return self._retry_handler

    @property
    def awards(self) -> "AwardResource":
        """Access award endpoints."""
        if "awards" not in self._resources:
            from .resources.award_resource import AwardResource

            self._resources["awards"] = AwardResource(self)
        return self._resources["awards"]

    @property
    def downloads(self) -> "DownloadResource":
        """
        Access download operations for detailed award data.

        Allows queuing, monitoring, and retrieval of bulk award files.
        """
        if "downloads" not in self._resources:
            from .resources.download_resource import DownloadResource

            self._resources["downloads"] = DownloadResource(self)
        return self._resources["downloads"]

    @property
    def recipients(self) -> "RecipientsResource":
        """Access recipient endpoints."""
        if "recipients" not in self._resources:
            from .resources.recipients_resource import RecipientsResource

            self._resources["recipients"] = RecipientsResource(self)
        return self._resources["recipients"]

    @property
    def transactions(self) -> "TransactionsResource":
        """Access transaction endpoints."""
        if "transactions" not in self._resources:
            from .resources.transactions_resource import TransactionsResource

            self._resources["transactions"] = TransactionsResource(self)
        return self._resources["transactions"]

    @property
    def spending(self) -> "SpendingResource":
        """Access spending by category endpoints."""
        if "spending" not in self._resources:
            from .resources.spending_resource import SpendingResource

            self._resources["spending"] = SpendingResource(self)
        return self._resources["spending"]

    @property
    def funding(self) -> "FundingResource":
        """Access funding endpoints."""
        if "funding" not in self._resources:
            from .resources.funding_resource import FundingResource

            self._resources["funding"] = FundingResource(self)
        return self._resources["funding"]

    @property
    def subawards(self) -> "SubAwardsResource":
        """Access subaward endpoints."""
        if "subawards" not in self._resources:
            from .resources.subawards_resource import SubAwardsResource

            self._resources["subawards"] = SubAwardsResource(self)
        return self._resources["subawards"]

    @property
    def agencies(self) -> "AgencyResource":
        """Access agency endpoints."""
        if "agencies" not in self._resources:
            from .resources.agency_resource import AgencyResource

            self._resources["agencies"] = AgencyResource(self)
        return self._resources["agencies"]

    @cachier.cachier()
    def _make_request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Cacheable HTTP request wrapper around ``_make_uncached_request``.

        The intent is that a cached result is returned without triggering
        rate limiting or other per-request behaviour; on a miss the call is
        forwarded unchanged to ``_make_uncached_request``.

        NOTE(review): how the cache key treats ``self`` and the dict-valued
        ``params``/``json`` arguments is decided by cachier, not by this
        code - confirm against the installed cachier version.

        Args:
            method: HTTP method (GET, POST, etc.)
            endpoint: API endpoint (without base URL)
            params: Query parameters
            json: JSON body for POST requests
            **kwargs: Additional arguments for requests

        Returns:
            Response data as dictionary
        """
        return self._make_uncached_request(
            method, endpoint, params=params, json=json, **kwargs
        )

    def _make_uncached_request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Make HTTP request with retry and rate limiting.

        Args:
            method: HTTP method (GET, POST, etc.)
            endpoint: API endpoint (without base URL)
            params: Query parameters
            json: JSON body for POST requests
            **kwargs: Additional arguments for requests; these override the
                prepared ``method``/``url``/``params``/``json``/``timeout``
                entries when the same key is given.

        Returns:
            Response data as dictionary

        Raises:
            HTTPError: For HTTP errors
            APIError: For API-reported errors (including any HTTP 400) and
                for response bodies that are not valid JSON
            Exception: Anything raised by the rate limiter or retry handler
                propagates unchanged (the previous docstring listed
                RateLimitError here - confirm against RetryHandler).
        """
        # Apply rate limiting
        self.rate_limiter.wait_if_needed()

        # Build full URL
        url = urljoin(config.base_url, endpoint.lstrip("/"))

        # Log API request
        log_api_request(logger, method, url, params, json)

        # Prepare request
        request_kwargs = {
            "method": method,
            "url": url,
            "params": params,
            "json": json,
            "timeout": config.timeout,
            **kwargs,
        }

        # Track request timing
        start_time = time.time()

        # Stage 1: obtain a response. Failures here have no response object
        # to describe, so they are logged as plain errors and re-raised.
        try:
            response = self.retry_handler.execute(
                self._session.request, **request_kwargs
            )
        except Exception as e:
            logger.error(f"Request failed before response: {e}")
            raise

        duration = time.time() - start_time

        # Stage 2: interpret the response.
        try:
            return self._interpret_response(response, duration)
        except (APIError, HTTPError):
            # Raised deliberately by _interpret_response, which has already
            # logged the failure; logging again here would duplicate it.
            raise
        except Exception as e:
            # Log any unexpected errors
            log_api_response(
                logger,
                getattr(response, "status_code", 0),
                None,
                time.time() - start_time,
                str(e),
            )
            raise

    def _interpret_response(
        self, response: requests.Response, duration: float
    ) -> Dict[str, Any]:
        """Turn a received response into parsed data or a logged exception.

        Every outcome (success or failure) is reported exactly once through
        ``log_api_response`` before returning or raising.

        Args:
            response: The response returned by the retry handler.
            duration: Seconds elapsed between sending and receiving.

        Returns:
            The decoded JSON body.

        Raises:
            APIError: On HTTP 400, on an undecodable JSON body, or when the
                decoded body is a mapping containing an ``"error"`` key.
            HTTPError: When ``raise_for_status`` reports any other HTTP error.
        """

        def log_outcome(error_msg: Optional[Any] = None) -> None:
            # Body size is reported as None when the body is empty.
            size = len(response.content) if response.content else None
            if error_msg is None:
                log_api_response(logger, response.status_code, size, duration)
            else:
                log_api_response(
                    logger, response.status_code, size, duration, error_msg
                )

        # Handle specific 400 Bad Request responses first
        if response.status_code == 400:
            # Only the JSON decode is guarded, so the APIError raised below
            # can never be intercepted by the ValueError handler.
            try:
                data = response.json()
            except ValueError as e:
                # If JSON parsing fails, use generic 400 error
                error_msg = "Bad Request - Invalid JSON response"
                log_outcome(error_msg)
                raise APIError(error_msg, status_code=400) from e

            # Use "detail" property if available, otherwise fall back to
            # generic message. A non-mapping body (e.g. a JSON list) has no
            # such fields, so it gets the generic message as well.
            error_msg = "Bad Request"
            if isinstance(data, dict):
                error_msg = (
                    data.get("detail")
                    or data.get("error")
                    or data.get("message")
                    or "Bad Request"
                )
            log_outcome(error_msg)
            raise APIError(error_msg, status_code=400, response_body=data)

        # Handle other HTTP errors
        try:
            response.raise_for_status()
        except requests.HTTPError as e:
            log_outcome(str(e))
            raise HTTPError(
                f"HTTP {response.status_code}: {e}",
                status_code=response.status_code,
            ) from e

        # Parse JSON response
        try:
            data = response.json()
        except ValueError as e:
            log_outcome(f"Invalid JSON: {e}")
            raise APIError(f"Invalid JSON response: {e}") from e

        # The checks below look up keys, which only makes sense on a mapping.
        is_mapping = isinstance(data, dict)

        # Check for API errors (fallback for other error patterns)
        # Note: Don't treat "message" alone as an error indicator since some
        # endpoints (like download/status) include message as a normal
        # response field
        if is_mapping and "error" in data:
            error_msg = (
                data.get("error") or data.get("message") or "Unknown API error"
            )
            log_outcome(error_msg)
            raise APIError(
                error_msg, status_code=response.status_code, response_body=data
            )

        # Log messages from successful responses (200 status code)
        if response.status_code == 200 and is_mapping and "messages" in data:
            messages = data["messages"]
            if isinstance(messages, list):
                for msg in messages:
                    logger.info(f"API Message: {msg}")
            else:
                logger.info(f"API Message: {messages}")

        # Log successful response
        log_outcome()

        return data

    def _download_binary_file(self, file_url: str, destination_path: str) -> None:
        """Download binary file using client session with streaming support.

        This method is used internally for downloading large binary files
        like the ZIP archives from the download endpoints.

        Args:
            file_url: Relative or absolute URL to download. Values starting
                with ``http://`` or ``https://`` are used as-is; anything
                else is joined onto the configured base URL.
            destination_path: Local path where file will be saved

        Raises:
            DownloadError: If download fails. Any partially written file at
                ``destination_path`` is removed first (best effort).
        """
        import os
        from .exceptions import DownloadError

        # Construct full URL
        if file_url.startswith(("http://", "https://")):
            download_url = file_url
        else:
            download_url = urljoin(config.base_url, file_url.lstrip("/"))

        logger.info(f"Downloading binary file from {download_url}")

        # Use a longer timeout for file downloads
        timeout = 600  # 10 minutes

        def download_operation():
            """Inner function for retry handler."""
            response = self._session.get(
                download_url,
                stream=True,
                timeout=timeout,
            )
            try:
                response.raise_for_status()

                try:
                    with open(destination_path, "wb") as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                except requests.RequestException:
                    # requests' exceptions derive from IOError, so a network
                    # failure while streaming would otherwise be reported
                    # below as a disk-write error. Let it propagate as-is.
                    raise
                except OSError as e:
                    raise DownloadError(
                        f"Error writing file to disk: {e}",
                        file_name=os.path.basename(destination_path),
                    ) from e
            finally:
                # A streamed response keeps its connection checked out until
                # the body is consumed or the response is closed.
                response.close()

        try:
            # Execute with retry handling
            self.retry_handler.execute(download_operation)
            logger.info(f"Successfully downloaded to {destination_path}")

        except Exception as e:
            logger.error(f"Failed to download file: {e}")
            # Clean up partial file if it exists
            if os.path.exists(destination_path):
                try:
                    os.remove(destination_path)
                    logger.debug(f"Cleaned up partial file: {destination_path}")
                except OSError as cleanup_error:
                    # Best effort only: the download failure is what matters.
                    logger.debug(
                        f"Could not remove partial file {destination_path}: "
                        f"{cleanup_error}"
                    )

            # Re-raise as DownloadError if not already
            if not isinstance(e, DownloadError):
                raise DownloadError(
                    f"Failed to download file from {download_url}",
                    file_name=os.path.basename(destination_path),
                ) from e
            raise

    def close(self) -> None:
        """Close client and cleanup resources."""
        if self._session:
            self._session.close()
            logger.debug("USASpending client closed")