Coverage for src/usaspending/queries/spending_search.py: 70%

200 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 17:15 -0700

1"""Spending search query builder for USASpending spending by category endpoints.""" 

2 

3from __future__ import annotations 

4 

5import datetime 

6from typing import Any, Optional, Union, Literal 

7 

8from usaspending.client import USASpending 

9from usaspending.exceptions import ValidationError 

10from usaspending.models.spending import Spending 

11from usaspending.models.recipient_spending import RecipientSpending 

12from usaspending.models.district_spending import DistrictSpending 

13from usaspending.queries.query_builder import QueryBuilder 

14from usaspending.logging_config import USASpendingLogger 

15from usaspending.queries.filters import ( 

16 AgencyFilter, 

17 AgencyTier, 

18 AgencyType, 

19 AwardAmount, 

20 AwardAmountFilter, 

21 AwardDateType, 

22 KeywordsFilter, 

23 LocationSpec, 

24 LocationFilter, 

25 LocationScope, 

26 LocationScopeFilter, 

27 SimpleListFilter, 

28 TieredCodeFilter, 

29 TimePeriodFilter, 

30 TreasuryAccountComponentsFilter, 

31) 

32 

# Module-level logger obtained through the project's logging configuration.
logger = USASpendingLogger.get_logger(__name__)

# Values accepted for the "spending_level" field of the request payload.
SpendingLevel = Literal["transactions", "awards", "subawards"]
# Categories this builder can target; each one maps to its own endpoint.
SpendingCategory = Literal["recipient", "district"]

37 

38 

class SpendingSearch(QueryBuilder["Spending"]):
    """
    Builds and executes spending-by-category search queries.

    The builder follows a fluent, immutable pattern: every public
    configuration or filter method works on a clone of the current builder
    and returns that clone, leaving the original untouched.

    A category must be chosen with ``by_recipient()`` or ``by_district()``
    before a payload or endpoint can be produced. The spending level
    defaults to ``"transactions"``.
    """

    # Single source for the error raised whenever a category is required
    # but has not been selected yet.
    _CATEGORY_REQUIRED_MESSAGE = (
        "Category must be set. Use .by_recipient() or .by_district() method."
    )

    def __init__(self, client: USASpending):
        """
        Initializes the SpendingSearch query builder.

        Args:
            client: The USASpending client instance.
        """
        super().__init__(client)
        self._category: Optional[SpendingCategory] = None
        self._spending_level: SpendingLevel = "transactions"
        self._subawards: bool = False

    @property
    def _endpoint(self) -> str:
        """
        The API endpoint for the selected category.

        Raises:
            ValidationError: If no category has been selected.
        """
        if self._category == "recipient":
            return "/search/spending_by_category/recipient/"
        if self._category == "district":
            return "/search/spending_by_category/district/"
        raise ValidationError(self._CATEGORY_REQUIRED_MESSAGE)

    def _clone(self) -> SpendingSearch:
        """Creates a copy of the builder, carrying over the spending-specific state."""
        clone = super()._clone()
        clone._category = self._category
        clone._spending_level = self._spending_level
        clone._subawards = self._subawards
        return clone

    def _with_filter_object(self, filter_object: Any) -> SpendingSearch:
        """Returns a clone of this builder with ``filter_object`` appended to its filters."""
        clone = self._clone()
        clone._filter_objects.append(filter_object)
        return clone

    @staticmethod
    def _coerce_iso_date(
        value: Union[datetime.date, str], field_name: str
    ) -> datetime.date:
        """
        Returns ``value`` as a date, parsing it when given as a "YYYY-MM-DD" string.

        Non-string values are returned unchanged.

        Raises:
            ValidationError: If a string value is not in "YYYY-MM-DD" format.
        """
        if not isinstance(value, str):
            return value
        try:
            return datetime.datetime.strptime(value, "%Y-%m-%d").date()
        except ValueError as exc:
            # Chain explicitly so the underlying parse failure stays visible
            # as the cause of the validation error.
            raise ValidationError(
                f"Invalid {field_name} format: '{value}'. Expected 'YYYY-MM-DD'."
            ) from exc

    def _build_payload(self, page: int) -> dict[str, Any]:
        """
        Constructs the final API request payload from the filter objects.

        Args:
            page: The page number to request.

        Returns:
            The request payload with filters, category, limit, page and
            spending level. The deprecated "subawards" key is included only
            when the subawards flag is truthy.

        Raises:
            ValidationError: If no category has been selected.
        """
        if self._category is None:
            raise ValidationError(self._CATEGORY_REQUIRED_MESSAGE)

        payload: dict[str, Any] = {
            "filters": self._aggregate_filters(),
            "category": self._category,
            "limit": self._get_effective_page_size(),
            "page": page,
            "spending_level": self._spending_level,
        }

        # Add deprecated subawards field if needed
        if self._subawards:
            payload["subawards"] = self._subawards

        return payload

    def _transform_result(self, result: dict[str, Any]) -> Spending:
        """
        Transforms a single API result item into the matching Spending model.

        The current category and spending level are merged into the result
        data before the model is constructed. Recipient and district
        categories use their dedicated model classes; anything else falls
        back to the generic ``Spending`` model.
        """
        enriched = {
            **result,
            "category": self._category,
            "spending_level": self._spending_level,
        }

        if self._category == "recipient":
            return RecipientSpending(enriched, self._client)
        if self._category == "district":
            return DistrictSpending(enriched, self._client)
        return Spending(enriched, self._client)

    def count(self) -> int:
        """
        Get the total count of results by iterating through pages.

        Respects pagination constraints like limit() and max_pages() to match
        the behavior of iteration. The spending by category endpoints don't
        have a total count in page_metadata, so we fetch pages and count results.

        Returns:
            The total number of matching spending records, up to any set limits.
        """
        logger.debug(f"{self.__class__.__name__}.count() called")

        # Early return for zero or negative limits
        if self._total_limit is not None and self._total_limit <= 0:
            logger.info(
                f"{self.__class__.__name__}.count() = 0 (limit: {self._total_limit})"
            )
            return 0

        total_count = 0
        page = 1

        while True:
            # Pages are fetched sequentially from 1, so ``page - 1`` pages
            # have been fetched when this check runs.
            if self._max_pages and page > self._max_pages:
                logger.debug(f"Max pages limit ({self._max_pages}) reached")
                break

            response = self._execute_query(page)
            results = response.get("results", [])

            # Count items, but respect total_limit
            items_to_count = len(results)
            if self._total_limit is not None:
                remaining = self._total_limit - total_count
                items_to_count = min(items_to_count, remaining)

            total_count += items_to_count

            # Stop if we've reached our limit
            if self._total_limit is not None and total_count >= self._total_limit:
                logger.debug(f"Total limit of {self._total_limit} items reached")
                break

            # Stop when the API reports no further page or the page was empty
            has_next = response.get("page_metadata", {}).get("hasNext", False)
            if not has_next or not results:
                break

            page += 1

        logger.info(f"{self.__class__.__name__}.count() = {total_count}")
        return total_count

    # ==========================================================================
    # Category Selection Methods
    # ==========================================================================

    def by_recipient(self) -> SpendingSearch:
        """
        Configure search to return spending grouped by recipient.

        Returns:
            A new SpendingSearch instance configured for recipient spending.
        """
        clone = self._clone()
        clone._category = "recipient"
        return clone

    def by_district(self) -> SpendingSearch:
        """
        Configure search to return spending grouped by congressional district.

        Returns:
            A new SpendingSearch instance configured for district spending.
        """
        clone = self._clone()
        clone._category = "district"
        return clone

    # ==========================================================================
    # Spending Level Configuration
    # ==========================================================================

    def spending_level(self, level: SpendingLevel) -> SpendingSearch:
        """
        Set the spending level for data aggregation.

        Args:
            level: The spending level - "transactions", "awards", or "subawards"

        Returns:
            A new SpendingSearch instance with the spending level configured.
        """
        clone = self._clone()
        clone._spending_level = level
        return clone

    def subawards_only(self, enabled: bool = True) -> SpendingSearch:
        """
        Enable subawards search (deprecated parameter).

        Args:
            enabled: Whether to search subawards instead of prime awards

        Returns:
            A new SpendingSearch instance with subawards flag set.
        """
        clone = self._clone()
        clone._subawards = enabled
        return clone

    # ==========================================================================
    # Filter Methods (same as AwardsSearch)
    # ==========================================================================

    def with_keywords(self, *keywords: str) -> SpendingSearch:
        """
        Filter by a list of keywords.

        Args:
            *keywords: One or more keywords to search for.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(KeywordsFilter(values=list(keywords)))

    def in_time_period(
        self,
        start_date: Union[datetime.date, str],
        end_date: Union[datetime.date, str],
        new_awards_only: bool = False,
        date_type: Optional[AwardDateType] = None,
    ) -> SpendingSearch:
        """
        Filter by a specific date range.

        Args:
            start_date: The start date of the period (datetime.date or string in "YYYY-MM-DD" format).
            end_date: The end date of the period (datetime.date or string in "YYYY-MM-DD" format).
            new_awards_only: If True, the filter uses the NEW_AWARDS_ONLY date
                type, replacing any ``date_type`` passed explicitly.
            date_type: The type of date to filter on (e.g., action_date).

        Returns:
            A new SpendingSearch instance with the filter applied.

        Raises:
            ValidationError: If string dates are not in valid "YYYY-MM-DD" format.
        """
        start_date = self._coerce_iso_date(start_date, "start_date")
        end_date = self._coerce_iso_date(end_date, "end_date")

        # If convenience flag is set, use NEW_AWARDS_ONLY date type
        if new_awards_only:
            date_type = AwardDateType.NEW_AWARDS_ONLY

        return self._with_filter_object(
            TimePeriodFilter(
                start_date=start_date, end_date=end_date, date_type=date_type
            )
        )

    def for_fiscal_year(
        self,
        year: int,
        new_awards_only: bool = False,
        date_type: Optional[AwardDateType] = None,
    ) -> SpendingSearch:
        """
        Adds a time period filter for a single US government fiscal year
        (October 1 of the previous calendar year to September 30 of ``year``).

        Args:
            year: The fiscal year to filter by.
            new_awards_only: If True, filters by awards with a start date within the FY
            date_type: The type of date to filter on (e.g., action_date).

        Returns:
            A new SpendingSearch instance with the fiscal year filter applied.
        """
        return self.in_time_period(
            start_date=datetime.date(year - 1, 10, 1),
            end_date=datetime.date(year, 9, 30),
            new_awards_only=new_awards_only,
            date_type=date_type,
        )

    def with_place_of_performance_scope(self, scope: LocationScope) -> SpendingSearch:
        """
        Filter spending by domestic or foreign place of performance.

        Args:
            scope: The scope, either DOMESTIC or FOREIGN.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            LocationScopeFilter(key="place_of_performance_scope", scope=scope)
        )

    def with_place_of_performance_locations(
        self, *locations: LocationSpec
    ) -> SpendingSearch:
        """
        Filter by one or more specific geographic places of performance.

        Args:
            *locations: One or more Location objects.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            LocationFilter(
                key="place_of_performance_locations", locations=list(locations)
            )
        )

    def for_agency(
        self,
        name: str,
        agency_type: AgencyType = AgencyType.AWARDING,
        tier: AgencyTier = AgencyTier.TOPTIER,
    ) -> SpendingSearch:
        """
        Filter by a specific awarding or funding agency.

        Args:
            name: The name of the agency.
            agency_type: The type of agency (AWARDING or FUNDING).
            tier: The agency tier (TOPTIER or SUBTIER).

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            AgencyFilter(agency_type=agency_type, tier=tier, name=name)
        )

    def with_recipient_search_text(self, *search_terms: str) -> SpendingSearch:
        """
        Filter by recipient name, UEI, or DUNS.

        Args:
            *search_terms: Text to search for across recipient identifiers.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            SimpleListFilter(key="recipient_search_text", values=list(search_terms))
        )

    def with_recipient_id(self, recipient_id: str) -> SpendingSearch:
        """
        Filter by specific recipient ID.

        Args:
            recipient_id: Unique identifier for the recipient.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        # NOTE(review): the ID is wrapped in a one-element list here; confirm
        # the endpoint accepts a list (rather than a bare string) for
        # "recipient_id".
        return self._with_filter_object(
            SimpleListFilter(key="recipient_id", values=[recipient_id])
        )

    def with_recipient_scope(self, scope: LocationScope) -> SpendingSearch:
        """
        Filter recipients by domestic or foreign scope.

        Args:
            scope: The scope, either DOMESTIC or FOREIGN.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            LocationScopeFilter(key="recipient_scope", scope=scope)
        )

    def with_recipient_locations(self, *locations: LocationSpec) -> SpendingSearch:
        """
        Filter by one or more specific recipient locations.

        Args:
            *locations: One or more Location objects.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            LocationFilter(key="recipient_locations", locations=list(locations))
        )

    def with_recipient_types(self, *type_names: str) -> SpendingSearch:
        """
        Filter by one or more recipient or business types.

        Args:
            *type_names: The names of the recipient types (e.g., "small_business").

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            SimpleListFilter(key="recipient_type_names", values=list(type_names))
        )

    def with_award_types(self, *award_codes: str) -> SpendingSearch:
        """
        Filter by one or more award type codes.

        Args:
            *award_codes: A sequence of award type codes (e.g., "A", "B", "02").

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            SimpleListFilter(key="award_type_codes", values=list(award_codes))
        )

    def with_award_ids(self, *award_ids: str) -> SpendingSearch:
        """
        Filter by specific award IDs (FAIN, PIID, URI).

        Args:
            *award_ids: The exact award IDs to search for.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            SimpleListFilter(key="award_ids", values=list(award_ids))
        )

    def with_award_amounts(self, *amounts: AwardAmount) -> SpendingSearch:
        """
        Filter by one or more award amount ranges.

        Args:
            *amounts: One or more AwardAmount objects defining the ranges.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(AwardAmountFilter(amounts=list(amounts)))

    def with_cfda_numbers(self, *program_numbers: str) -> SpendingSearch:
        """
        Filter by one or more CFDA program numbers.

        Args:
            *program_numbers: The CFDA numbers to filter by.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            SimpleListFilter(key="program_numbers", values=list(program_numbers))
        )

    def with_naics_codes(
        self,
        require: Optional[list[str]] = None,
        exclude: Optional[list[str]] = None,
    ) -> SpendingSearch:
        """
        Filter by NAICS codes, including or excluding specific codes.

        Args:
            require: A list of NAICS codes to require.
            exclude: A list of NAICS codes to exclude.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        # The API expects a list of lists, but for NAICS, each list contains one element.
        require_list = [[code] for code in require] if require else []
        exclude_list = [[code] for code in exclude] if exclude else []
        return self._with_filter_object(
            TieredCodeFilter(
                key="naics_codes", require=require_list, exclude=exclude_list
            )
        )

    def with_psc_codes(
        self,
        require: Optional[list[list[str]]] = None,
        exclude: Optional[list[list[str]]] = None,
    ) -> SpendingSearch:
        """
        Filter by Product and Service Codes (PSC), including or excluding codes.

        Args:
            require: A list of PSC code paths to require.
            exclude: A list of PSC code paths to exclude.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            TieredCodeFilter(
                key="psc_codes",
                require=require or [],
                exclude=exclude or [],
            )
        )

    def with_contract_pricing_types(self, *type_codes: str) -> SpendingSearch:
        """
        Filter by one or more contract pricing type codes.

        Args:
            *type_codes: The contract pricing type codes.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            SimpleListFilter(key="contract_pricing_type_codes", values=list(type_codes))
        )

    def with_set_aside_types(self, *type_codes: str) -> SpendingSearch:
        """
        Filter by one or more set-aside type codes.

        Args:
            *type_codes: The set-aside type codes.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            SimpleListFilter(key="set_aside_type_codes", values=list(type_codes))
        )

    def with_extent_competed_types(self, *type_codes: str) -> SpendingSearch:
        """
        Filter by one or more extent competed type codes.

        Args:
            *type_codes: The extent competed type codes.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            SimpleListFilter(key="extent_competed_type_codes", values=list(type_codes))
        )

    def with_tas_codes(
        self,
        require: Optional[list[list[str]]] = None,
        exclude: Optional[list[list[str]]] = None,
    ) -> SpendingSearch:
        """
        Filter by Treasury Account Symbols (TAS), including or excluding codes.

        Args:
            require: A list of TAS code paths to require.
            exclude: A list of TAS code paths to exclude.

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            TieredCodeFilter(
                key="tas_codes",
                require=require or [],
                exclude=exclude or [],
            )
        )

    def with_treasury_account_components(
        self, *components: dict[str, str]
    ) -> SpendingSearch:
        """
        Filter by specific components of a Treasury Account.

        Args:
            *components: Dictionaries representing TAS components (aid, main, etc.).

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            TreasuryAccountComponentsFilter(components=list(components))
        )

    def with_def_codes(self, *def_codes: str) -> SpendingSearch:
        """
        Filter by one or more Disaster Emergency Fund (DEF) codes.

        Args:
            *def_codes: The DEF codes (e.g., "L", "M", "N").

        Returns:
            A new SpendingSearch instance with the filter applied.
        """
        return self._with_filter_object(
            SimpleListFilter(key="def_codes", values=list(def_codes))
        )