Coverage for src/usaspending/queries/spending_search.py: 70%
200 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 17:15 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 17:15 -0700
1"""Spending search query builder for USASpending spending by category endpoints."""
3from __future__ import annotations
5import datetime
6from typing import Any, Optional, Union, Literal
8from usaspending.client import USASpending
9from usaspending.exceptions import ValidationError
10from usaspending.models.spending import Spending
11from usaspending.models.recipient_spending import RecipientSpending
12from usaspending.models.district_spending import DistrictSpending
13from usaspending.queries.query_builder import QueryBuilder
14from usaspending.logging_config import USASpendingLogger
15from usaspending.queries.filters import (
16 AgencyFilter,
17 AgencyTier,
18 AgencyType,
19 AwardAmount,
20 AwardAmountFilter,
21 AwardDateType,
22 KeywordsFilter,
23 LocationSpec,
24 LocationFilter,
25 LocationScope,
26 LocationScopeFilter,
27 SimpleListFilter,
28 TieredCodeFilter,
29 TimePeriodFilter,
30 TreasuryAccountComponentsFilter,
31)
# Module-level logger obtained from the project's logging helper, keyed by
# this module's name.
logger = USASpendingLogger.get_logger(__name__)

# Values accepted for the "spending_level" field of the request payload.
SpendingLevel = Literal["transactions", "awards", "subawards"]
# Categories this builder supports; each one selects its own API endpoint.
SpendingCategory = Literal["recipient", "district"]
39class SpendingSearch(QueryBuilder["Spending"]):
40 """
41 Builds and executes spending by category search queries, allowing for complex
42 filtering on spending data. This class follows a fluent interface pattern.
44 Supports both recipient and district spending searches with configurable
45 spending levels (transactions, awards, subawards).
46 """
def __init__(self, client: USASpending):
    """
    Set up a SpendingSearch builder bound to ``client``.

    The builder starts with no category selected, the "transactions"
    spending level, and the deprecated subawards flag switched off.

    Args:
        client: The USASpending client instance.
    """
    super().__init__(client)
    # Builder-specific state; each attribute is copied again in _clone().
    self._subawards: bool = False
    self._spending_level: SpendingLevel = "transactions"
    self._category: Optional[SpendingCategory] = None
@property
def _endpoint(self) -> str:
    """
    Return the API path for the currently selected category.

    Raises:
        ValidationError: If no supported category has been selected.
    """
    selected = self._category
    if selected == "recipient":
        return "/search/spending_by_category/recipient/"
    if selected == "district":
        return "/search/spending_by_category/district/"
    # Neither supported category is set, so there is no endpoint to hit.
    raise ValidationError(
        "Category must be set. Use .by_recipient() or .by_district() method."
    )
def _clone(self) -> SpendingSearch:
    """
    Copy this builder, including the SpendingSearch-specific settings.

    The base class produces the copy; the category, spending level and
    subawards flag are then carried over from this instance.
    """
    duplicate = super()._clone()
    for attr in ("_category", "_spending_level", "_subawards"):
        setattr(duplicate, attr, getattr(self, attr))
    return duplicate
def _build_payload(self, page: int) -> dict[str, Any]:
    """
    Assemble the request body for one page of results.

    Args:
        page: The page number to request.

    Returns:
        A dict with the aggregated filters, category, page size, page
        number and spending level. A "subawards" entry is added only when
        the deprecated subawards flag is truthy.

    Raises:
        ValidationError: If no category has been selected.
    """
    if self._category is None:
        raise ValidationError(
            "Category must be set. Use .by_recipient() or .by_district() method."
        )

    body = {
        "filters": self._aggregate_filters(),
        "category": self._category,
        "limit": self._get_effective_page_size(),
        "page": page,
        "spending_level": self._spending_level,
    }

    # The deprecated flag is only sent when it is switched on.
    subawards_flag = self._subawards
    if subawards_flag:
        body["subawards"] = subawards_flag

    return body
def _transform_result(self, result: dict[str, Any]) -> Spending:
    """
    Wrap one raw API result in the model matching the active category.

    The result is copied and tagged with the current category and spending
    level before being handed to the model constructor.

    Args:
        result: A single item from the API response.

    Returns:
        A RecipientSpending, DistrictSpending or generic Spending instance.
    """
    enriched = dict(result)
    enriched["category"] = self._category
    enriched["spending_level"] = self._spending_level

    if self._category == "recipient":
        model_cls = RecipientSpending
    elif self._category == "district":
        model_cls = DistrictSpending
    else:
        # Fallback for any category without a dedicated model.
        model_cls = Spending
    return model_cls(enriched, self._client)
def count(self) -> int:
    """
    Count matching records by walking the result pages.

    The spending by category endpoints don't have a total count in
    page_metadata, so pages are fetched and their results tallied. The
    tally honours the same constraints as iteration: it never exceeds the
    total limit and stops once the maximum number of pages was fetched.

    Returns:
        The total number of matching spending records, up to any set limits.
    """
    cls_name = self.__class__.__name__
    logger.debug(f"{cls_name}.count() called")

    limit = self._total_limit
    # A zero or negative limit can never yield results; skip the API call.
    if limit is not None and limit <= 0:
        logger.info(f"{cls_name}.count() = 0 (limit: {limit})")
        return 0

    tally = 0
    fetched = 0  # pages completed so far; the next page is fetched + 1

    while True:
        if self._max_pages and fetched >= self._max_pages:
            logger.debug(f"Max pages limit ({self._max_pages}) reached")
            break

        response = self._execute_query(fetched + 1)
        results = response.get("results", [])

        # Only count as many items as the total limit still allows.
        batch = len(results)
        if limit is not None:
            batch = min(batch, limit - tally)
        tally += batch

        if limit is not None and tally >= limit:
            logger.debug(f"Total limit of {limit} items reached")
            break

        # Continue only while the API reports another page and this one
        # actually contained results.
        has_next = response.get("page_metadata", {}).get("hasNext", False)
        if not (has_next and results):
            break

        fetched += 1

    logger.info(f"{cls_name}.count() = {tally}")
    return tally
179 # ==========================================================================
180 # Category Selection Methods
181 # ==========================================================================
def by_recipient(self) -> SpendingSearch:
    """
    Select the recipient category for this search.

    Returns:
        A new SpendingSearch instance configured for recipient spending.
    """
    query = self._clone()
    setattr(query, "_category", "recipient")
    return query
def by_district(self) -> SpendingSearch:
    """
    Select the congressional district category for this search.

    Returns:
        A new SpendingSearch instance configured for district spending.
    """
    query = self._clone()
    setattr(query, "_category", "district")
    return query
205 # ==========================================================================
206 # Spending Level Configuration
207 # ==========================================================================
def spending_level(self, level: SpendingLevel) -> SpendingSearch:
    """
    Choose the level at which spending is aggregated.

    Args:
        level: The spending level - "transactions", "awards", or "subawards"

    Returns:
        A new SpendingSearch instance with the spending level configured.
    """
    query = self._clone()
    setattr(query, "_spending_level", level)
    return query
def subawards_only(self, enabled: bool = True) -> SpendingSearch:
    """
    Toggle the deprecated subawards flag on a copy of this search.

    Args:
        enabled: Whether to search subawards instead of prime awards

    Returns:
        A new SpendingSearch instance with subawards flag set.
    """
    query = self._clone()
    setattr(query, "_subawards", enabled)
    return query
237 # ==========================================================================
238 # Filter Methods (same as AwardsSearch)
239 # ==========================================================================
def with_keywords(self, *keywords: str) -> SpendingSearch:
    """
    Restrict the search to the given keywords.

    Args:
        *keywords: One or more keywords to search for.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    keyword_filter = KeywordsFilter(values=[*keywords])
    query._filter_objects.append(keyword_filter)
    return query
def in_time_period(
    self,
    start_date: Union[datetime.date, str],
    end_date: Union[datetime.date, str],
    new_awards_only: bool = False,
    date_type: Optional[AwardDateType] = None,
) -> SpendingSearch:
    """
    Filter by a specific date range.

    Args:
        start_date: The start date of the period (datetime.date or string in "YYYY-MM-DD" format).
        end_date: The end date of the period (datetime.date or string in "YYYY-MM-DD" format).
        new_awards_only: If True, filters by awards with a start date within the given range.
            Overrides any ``date_type`` passed alongside it.
        date_type: The type of date to filter on (e.g., action_date).

    Returns:
        A new SpendingSearch instance with the filter applied.

    Raises:
        ValidationError: If string dates are not in valid "YYYY-MM-DD" format.
            The underlying ``ValueError`` is attached as the cause.
    """
    # Normalise both bounds with a single code path; non-string values are
    # passed through untouched. start_date is validated before end_date.
    bounds: dict[str, Any] = {}
    for label, value in (("start_date", start_date), ("end_date", end_date)):
        if isinstance(value, str):
            try:
                value = datetime.datetime.strptime(value, "%Y-%m-%d").date()
            except ValueError as err:
                # Chain explicitly so the parse failure is reported as the
                # direct cause of the validation error.
                raise ValidationError(
                    f"Invalid {label} format: '{value}'. Expected 'YYYY-MM-DD'."
                ) from err
        bounds[label] = value

    # If convenience flag is set, use NEW_AWARDS_ONLY date type
    if new_awards_only:
        date_type = AwardDateType.NEW_AWARDS_ONLY

    clone = self._clone()
    clone._filter_objects.append(
        TimePeriodFilter(
            start_date=bounds["start_date"],
            end_date=bounds["end_date"],
            date_type=date_type,
        )
    )
    return clone
def for_fiscal_year(
    self,
    year: int,
    new_awards_only: bool = False,
    date_type: Optional[AwardDateType] = None,
) -> SpendingSearch:
    """
    Limit the search to one US government fiscal year.

    The fiscal year runs from October 1 of the previous calendar year to
    September 30 of ``year``; the range is applied via in_time_period().

    Args:
        year: The fiscal year to filter by.
        new_awards_only: If True, filters by awards with a start date within the FY
        date_type: The type of date to filter on (e.g., action_date).

    Returns:
        A new SpendingSearch instance with the fiscal year filter applied.
    """
    fy_start = datetime.date(year - 1, 10, 1)
    fy_end = datetime.date(year, 9, 30)
    period = {"start_date": fy_start, "end_date": fy_end}
    return self.in_time_period(
        new_awards_only=new_awards_only, date_type=date_type, **period
    )
def with_place_of_performance_scope(self, scope: LocationScope) -> SpendingSearch:
    """
    Restrict spending to a domestic or foreign place of performance.

    Args:
        scope: The scope, either DOMESTIC or FOREIGN.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    scope_filter = LocationScopeFilter(key="place_of_performance_scope", scope=scope)
    query._filter_objects.append(scope_filter)
    return query
def with_place_of_performance_locations(
    self, *locations: LocationSpec
) -> SpendingSearch:
    """
    Restrict spending to specific geographic places of performance.

    Args:
        *locations: One or more Location objects.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    location_filter = LocationFilter(
        key="place_of_performance_locations", locations=[*locations]
    )
    query._filter_objects.append(location_filter)
    return query
def for_agency(
    self,
    name: str,
    agency_type: AgencyType = AgencyType.AWARDING,
    tier: AgencyTier = AgencyTier.TOPTIER,
) -> SpendingSearch:
    """
    Restrict the search to one awarding or funding agency.

    Args:
        name: The name of the agency.
        agency_type: The type of agency (AWARDING or FUNDING).
        tier: The agency tier (TOPTIER or SUBTIER).

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    agency_filter = AgencyFilter(agency_type=agency_type, tier=tier, name=name)
    query._filter_objects.append(agency_filter)
    return query
def with_recipient_search_text(self, *search_terms: str) -> SpendingSearch:
    """
    Search recipients by name, UEI, or DUNS.

    Args:
        *search_terms: Text to search for across recipient identifiers.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    text_filter = SimpleListFilter(
        key="recipient_search_text", values=[*search_terms]
    )
    query._filter_objects.append(text_filter)
    return query
def with_recipient_id(self, recipient_id: str) -> SpendingSearch:
    """
    Restrict the search to a single recipient ID.

    Args:
        recipient_id: Unique identifier for the recipient.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    # The filter takes a list, so the single ID is wrapped in one.
    id_filter = SimpleListFilter(key="recipient_id", values=[recipient_id])
    query._filter_objects.append(id_filter)
    return query
def with_recipient_scope(self, scope: LocationScope) -> SpendingSearch:
    """
    Restrict recipients to a domestic or foreign scope.

    Args:
        scope: The scope, either DOMESTIC or FOREIGN.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    scope_filter = LocationScopeFilter(key="recipient_scope", scope=scope)
    query._filter_objects.append(scope_filter)
    return query
def with_recipient_locations(self, *locations: LocationSpec) -> SpendingSearch:
    """
    Restrict the search to specific recipient locations.

    Args:
        *locations: One or more Location objects.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    location_filter = LocationFilter(
        key="recipient_locations", locations=[*locations]
    )
    query._filter_objects.append(location_filter)
    return query
def with_recipient_types(self, *type_names: str) -> SpendingSearch:
    """
    Restrict the search to the given recipient or business types.

    Args:
        *type_names: The names of the recipient types (e.g., "small_business").

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    type_filter = SimpleListFilter(
        key="recipient_type_names", values=[*type_names]
    )
    query._filter_objects.append(type_filter)
    return query
def with_award_types(self, *award_codes: str) -> SpendingSearch:
    """
    Restrict the search to the given award type codes.

    Args:
        *award_codes: A sequence of award type codes (e.g., "A", "B", "02").

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    code_filter = SimpleListFilter(key="award_type_codes", values=[*award_codes])
    query._filter_objects.append(code_filter)
    return query
def with_award_ids(self, *award_ids: str) -> SpendingSearch:
    """
    Restrict the search to specific award IDs (FAIN, PIID, URI).

    Args:
        *award_ids: The exact award IDs to search for.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    id_filter = SimpleListFilter(key="award_ids", values=[*award_ids])
    query._filter_objects.append(id_filter)
    return query
def with_award_amounts(self, *amounts: AwardAmount) -> SpendingSearch:
    """
    Restrict the search to the given award amount ranges.

    Args:
        *amounts: One or more AwardAmount objects defining the ranges.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    amount_filter = AwardAmountFilter(amounts=[*amounts])
    query._filter_objects.append(amount_filter)
    return query
def with_cfda_numbers(self, *program_numbers: str) -> SpendingSearch:
    """
    Restrict the search to the given CFDA program numbers.

    Args:
        *program_numbers: The CFDA numbers to filter by.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    cfda_filter = SimpleListFilter(
        key="program_numbers", values=[*program_numbers]
    )
    query._filter_objects.append(cfda_filter)
    return query
def with_naics_codes(
    self,
    require: Optional[list[str]] = None,
    exclude: Optional[list[str]] = None,
) -> SpendingSearch:
    """
    Include or exclude records by NAICS code.

    Args:
        require: A list of NAICS codes to require.
        exclude: A list of NAICS codes to exclude.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()

    # The API expects a list of lists; for NAICS every inner list holds
    # exactly one code. A missing or empty argument becomes an empty list.
    required_paths = []
    if require:
        required_paths = [[code] for code in require]
    excluded_paths = []
    if exclude:
        excluded_paths = [[code] for code in exclude]

    naics_filter = TieredCodeFilter(
        key="naics_codes", require=required_paths, exclude=excluded_paths
    )
    query._filter_objects.append(naics_filter)
    return query
def with_psc_codes(
    self,
    require: Optional[list[list[str]]] = None,
    exclude: Optional[list[list[str]]] = None,
) -> SpendingSearch:
    """
    Include or exclude records by Product and Service Code (PSC).

    Args:
        require: A list of PSC code paths to require.
        exclude: A list of PSC code paths to exclude.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    # Missing or empty arguments are sent as empty lists.
    required_paths = require if require else []
    excluded_paths = exclude if exclude else []
    psc_filter = TieredCodeFilter(
        key="psc_codes", require=required_paths, exclude=excluded_paths
    )
    query._filter_objects.append(psc_filter)
    return query
def with_contract_pricing_types(self, *type_codes: str) -> SpendingSearch:
    """
    Restrict the search to the given contract pricing type codes.

    Args:
        *type_codes: The contract pricing type codes.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    pricing_filter = SimpleListFilter(
        key="contract_pricing_type_codes", values=[*type_codes]
    )
    query._filter_objects.append(pricing_filter)
    return query
def with_set_aside_types(self, *type_codes: str) -> SpendingSearch:
    """
    Restrict the search to the given set-aside type codes.

    Args:
        *type_codes: The set-aside type codes.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    set_aside_filter = SimpleListFilter(
        key="set_aside_type_codes", values=[*type_codes]
    )
    query._filter_objects.append(set_aside_filter)
    return query
def with_extent_competed_types(self, *type_codes: str) -> SpendingSearch:
    """
    Restrict the search to the given extent competed type codes.

    Args:
        *type_codes: The extent competed type codes.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    competed_filter = SimpleListFilter(
        key="extent_competed_type_codes", values=[*type_codes]
    )
    query._filter_objects.append(competed_filter)
    return query
def with_tas_codes(
    self,
    require: Optional[list[list[str]]] = None,
    exclude: Optional[list[list[str]]] = None,
) -> SpendingSearch:
    """
    Include or exclude records by Treasury Account Symbol (TAS).

    Args:
        require: A list of TAS code paths to require.
        exclude: A list of TAS code paths to exclude.

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    # Missing or empty arguments are sent as empty lists.
    required_paths = require if require else []
    excluded_paths = exclude if exclude else []
    tas_filter = TieredCodeFilter(
        key="tas_codes", require=required_paths, exclude=excluded_paths
    )
    query._filter_objects.append(tas_filter)
    return query
def with_treasury_account_components(
    self, *components: dict[str, str]
) -> SpendingSearch:
    """
    Restrict the search by individual Treasury Account components.

    Args:
        *components: Dictionaries representing TAS components (aid, main, etc.).

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    components_filter = TreasuryAccountComponentsFilter(components=[*components])
    query._filter_objects.append(components_filter)
    return query
def with_def_codes(self, *def_codes: str) -> SpendingSearch:
    """
    Restrict the search to the given Disaster Emergency Fund (DEF) codes.

    Args:
        *def_codes: The DEF codes (e.g., "L", "M", "N").

    Returns:
        A new SpendingSearch instance with the filter applied.
    """
    query = self._clone()
    def_filter = SimpleListFilter(key="def_codes", values=[*def_codes])
    query._filter_objects.append(def_filter)
    return query