Coverage for src/usaspending/download/manager.py: 60%

55 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 17:15 -0700

1# src/usaspending/download/manager.py 

2""" 

3This module defines the DownloadManager class, responsible for orchestrating the 

4download process of award data from the USASpending API. It handles queuing 

5download requests, checking their status, downloading the completed files, 

6and extracting their contents. 

7""" 

8 

9from __future__ import annotations 

10import zipfile 

11import os 

12from typing import TYPE_CHECKING, Optional, List 

13 

14from ..exceptions import APIError, DownloadError 

15from ..logging_config import USASpendingLogger 

16from ..models.download import DownloadStatus, AwardType, FileFormat 

17 

18if TYPE_CHECKING: 

19 from ..client import USASpending 

20 from .job import DownloadJob 

21 

22logger = USASpendingLogger.get_logger(__name__) 

23 

class DownloadManager:
    """Handles the core logic for queuing, monitoring, downloading, and processing award data.

    Every network operation is delegated to the client object supplied at
    construction time; this class only builds requests, validates the
    responses it needs, and performs local zip extraction.
    """

    # Path prefix shared by every download-related API endpoint.
    BASE_ENDPOINT = "/download/"

    def __init__(self, client: USASpending):
        """Store the API client used for all requests issued by this manager.

        Args:
            client: Object exposing ``_make_uncached_request`` and
                ``_download_binary_file``.
        """
        self._client = client

    def queue_download(
        self,
        download_type: AwardType,
        award_id: str,
        file_format: FileFormat,
        destination_dir: Optional[str],
    ) -> DownloadJob:
        """Send the initial request to the API to start the download job.

        Args:
            download_type: Award type; interpolated into the endpoint path.
            award_id: Identifier of the award whose data is requested.
            file_format: Requested file format, sent in the request payload.
            destination_dir: Directory handed through to the created job
                (may be ``None``).

        Returns:
            A ``DownloadJob`` bound to this manager and built from the
            ``file_name``, ``file_url`` and ``download_request`` fields of the
            API response.

        Raises:
            DownloadError: If the API call fails, the response is not a
                mapping-like object, or it lacks a ``file_name``.
        """
        endpoint = f"{self.BASE_ENDPOINT}{download_type}/"
        payload = {
            "award_id": award_id,
            "file_format": file_format,
        }

        logger.info(f"Queueing {download_type} download for award {award_id} (Format: {file_format})")

        try:
            response_data = self._client._make_uncached_request("POST", endpoint, json=payload)
        except APIError as e:
            logger.error(f"Failed to queue download for award {award_id}: {e}")
            raise DownloadError(f"Failed to queue download: {e}") from e

        # Import DownloadJob here to avoid circular dependency
        from .job import DownloadJob

        # A None / non-mapping body would otherwise escape as a bare
        # AttributeError from the .get() calls below instead of the
        # DownloadError callers of this module handle.
        if not hasattr(response_data, "get"):
            raise DownloadError(
                "Unexpected API response while queueing download: "
                f"{type(response_data).__name__}"
            )

        file_name = response_data.get("file_name")
        if not file_name:
            raise DownloadError("API response missing required field 'file_name'.")

        return DownloadJob(
            manager=self,
            file_name=file_name,
            initial_file_url=response_data.get("file_url"),
            request_details=response_data.get("download_request"),
            destination_dir=destination_dir,
        )

    def check_status(self, file_name: str) -> DownloadStatus:
        """Check the status of a download job via the API.

        Args:
            file_name: Name of the file the job is producing, sent as the
                ``file_name`` query parameter.

        Returns:
            A ``DownloadStatus`` constructed from the raw response data.
        """
        endpoint = f"{self.BASE_ENDPOINT}status"
        params = {"file_name": file_name}

        # Never cache this endpoint
        response_data = self._client._make_uncached_request("GET", endpoint, params=params)
        return DownloadStatus(response_data)

    def download_file(self, file_url: str, destination_path: str, file_name: str) -> None:
        """Download the zipped file from the provided URL via the client.

        This delegates to the client's ``_download_binary_file`` method which
        handles:
        - Session management with proper headers
        - Retry logic with exponential backoff
        - Streaming for large files
        - Cleanup of partial downloads on failure

        Args:
            file_url: URL of the archive to fetch.
            destination_path: Local path the client writes the archive to.
            file_name: Name used for log context and for tagging errors.

        Raises:
            Exception: Whatever the client raises is logged and re-raised
                unchanged, apart from the ``file_name`` tagging noted below.
        """
        logger.info(f"Initiating download of {file_name}")

        try:
            # Use the client's binary download method for consistency
            self._client._download_binary_file(file_url, destination_path)
        except Exception as e:
            # Log with file_name context
            logger.error(f"Failed to download file {file_name}: {e}")
            # Fill in file_name only on exceptions that carry the attribute
            # but left it empty; other exceptions pass through untouched.
            if hasattr(e, 'file_name') and not e.file_name:
                e.file_name = file_name
            raise

    def unzip_file(self, zip_path: str, extract_dir: str) -> List[str]:
        """Unzip the downloaded file.

        Args:
            zip_path: Path to the zip archive on disk.
            extract_dir: Directory to extract into; created if missing.

        Returns:
            One path per archive member name (as reported by ``namelist()``,
            so directory entries are included), each joined onto
            ``extract_dir``.

        Raises:
            DownloadError: If the file is not a valid zip archive or any
                other error occurs while opening or extracting it.
        """
        logger.info(f"Unzipping {zip_path} to {extract_dir}")
        # exist_ok=True removes the check-then-create race of the previous
        # "if not exists: makedirs" form when two jobs share a new directory.
        os.makedirs(extract_dir, exist_ok=True)

        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                extracted_files = zip_ref.namelist()
                zip_ref.extractall(extract_dir)

            logger.info(f"Successfully extracted {len(extracted_files)} files.")
            return [os.path.join(extract_dir, f) for f in extracted_files]

        except zipfile.BadZipFile as e:
            raise DownloadError(
                f"Downloaded file is not a valid zip archive: {zip_path}",
                file_name=os.path.basename(zip_path),
            ) from e
        except Exception as e:
            raise DownloadError(
                f"An error occurred during unzipping: {e}",
                file_name=os.path.basename(zip_path),
            ) from e

121 except Exception as e: 

122 raise DownloadError(f"An error occurred during unzipping: {e}", file_name=os.path.basename(zip_path)) from e