Coverage for src/usaspending/download/job.py: 45%

98 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 17:15 -0700

1# src/usaspending/download/job.py 

2""" 

3This module defines the DownloadJob class, which encapsulates the lifecycle of a single 

4award data download task from the USASpending API. It handles polling for status updates, 

5downloading the completed file, and extracting its contents. 

6""" 

7 

8from __future__ import annotations 

9import time 

10import os 

11from typing import TYPE_CHECKING, Dict, Any, Optional, List 

12 

13from ..exceptions import DownloadError, APIError 

14from ..logging_config import USASpendingLogger 

15from ..models.download import DownloadStatus, DownloadState 

16 

if TYPE_CHECKING:
    # Imported only while type checking, so DownloadManager is available for
    # annotations without a runtime import (presumably to avoid a circular
    # import between job.py and manager.py — confirm).
    from .manager import DownloadManager

# Module-level logger obtained through the package's logging configuration.
logger = USASpendingLogger.get_logger(__name__)

21 

class DownloadJob:
    """
    Represents a single award data download task, managing its lifecycle (polling, downloading, extraction).

    This class is designed to be instantiated by the `DownloadManager` after a download
    request has been successfully queued with the USASpending API. It provides methods
    to monitor the status of the download job, wait for its completion, and process
    the downloaded data (downloading the zip file and extracting its contents).

    Attributes:
        file_name (str): The identifier for the download job as provided by the
            USASpending API. It is passed to the manager's status check, and its
            final path component is used as the local zip file name.
        destination_dir (str): The local directory where the downloaded zip file is
            saved and its contents extracted. Defaults to the current working
            directory (captured when the job is created) if not specified.
        request_details (Optional[Dict[str, Any]]): Details of the original download
            request, as passed in by the creator of the job.
        state (DownloadState): The current state of the job (e.g. PENDING,
            FINISHED, FAILED). Read-only.
        status_details (Optional[DownloadStatus]): The most recent status object
            returned by the manager's status check. Read-only.
        error_message (Optional[str]): A message describing why the job moved to
            the FAILED state. Read-only.
        result_files (Optional[List[str]]): The file paths returned by the manager's
            ``unzip_file`` after a successful download; ``None`` until then.
            Read-only.
    """

    DEFAULT_POLL_INTERVAL = 30  # seconds
    DEFAULT_TIMEOUT = 1800  # 30 minutes

    def __init__(
        self,
        manager: DownloadManager,
        file_name: str,
        initial_file_url: Optional[str],
        request_details: Optional[Dict[str, Any]],
        destination_dir: Optional[str] = None,
    ):
        """
        Create a job in the PENDING state.

        Args:
            manager: Manager used to check status, download, and unzip the file.
            file_name: Job identifier returned when the download was queued.
            initial_file_url: File URL known at queue time; used when later status
                checks do not provide one.
            request_details: Details of the original download request.
            destination_dir: Directory for the zip and extracted files; defaults
                to the current working directory.
        """
        self._manager = manager
        self.file_name = file_name
        self._initial_file_url = initial_file_url  # URL provided at initiation
        self.request_details = request_details
        self.destination_dir = destination_dir or os.getcwd()

        self._status_details: Optional[DownloadStatus] = None
        self._state: DownloadState = DownloadState.PENDING
        self._result_files: Optional[List[str]] = None
        self._error_message: Optional[str] = None

    @property
    def state(self) -> DownloadState:
        """Current state of the job."""
        return self._state

    @property
    def status_details(self) -> Optional[DownloadStatus]:
        """Detailed status information from the API (cached from the last poll)."""
        return self._status_details

    @property
    def error_message(self) -> Optional[str]:
        """Reason the job failed, or ``None`` if it has not failed."""
        return self._error_message

    @property
    def result_files(self) -> Optional[List[str]]:
        """List of extracted file paths upon successful completion."""
        return self._result_files

    @property
    def is_complete(self) -> bool:
        """True if the job is finished (success or failure)."""
        return self._state in (DownloadState.FINISHED, DownloadState.FAILED)

    def refresh_status(self) -> DownloadState:
        """
        Poll the API for the latest status and update the internal state.

        Once the job is complete (FINISHED or FAILED), no API call is made and
        the cached state is returned.

        Returns:
            The job's state after this poll. On an ``APIError`` the previous state
            is returned unchanged so the caller can simply poll again; any other
            exception marks the job FAILED.
        """
        if self.is_complete:
            return self._state

        logger.debug(f"Checking status for {self.file_name}...")
        try:
            self._status_details = self._manager.check_status(self.file_name)
            self._state = self._status_details.api_status

            if self._state == DownloadState.FAILED:
                self._error_message = self._status_details.message or "API reported failure."

            return self._state

        except APIError as e:
            # Handle transient API errors during polling without failing the job immediately
            logger.warning(f"API Error checking status for {self.file_name}: {e}. Will retry.")
            return self._state  # Return previous state
        except Exception as e:
            logger.error(f"Unexpected error checking status for {self.file_name}: {e}")
            self._error_message = f"Status check failed: {e}"
            self._state = DownloadState.FAILED
            return self._state

    def wait_for_completion(
        self,
        timeout: int = DEFAULT_TIMEOUT,
        poll_interval: int = DEFAULT_POLL_INTERVAL,
        cleanup_zip: bool = False,
    ) -> List[str]:
        """
        Block until the download job completes, then download and unzip the file.

        Args:
            timeout: Maximum seconds to wait for the job to finish. Measured with a
                monotonic clock; the last sleep is shortened so the wait does not
                overrun the deadline by up to a whole polling interval.
            poll_interval: Seconds to sleep between status checks.
            cleanup_zip: If True, delete the downloaded zip after extraction.

        Returns:
            Paths of the extracted files. If this job was already processed
            successfully, the cached ``result_files`` are returned without
            downloading again.

        Raises:
            DownloadError: If the job fails, the wait times out, or the file
                cannot be downloaded or extracted.
        """
        logger.info(f"Waiting for download job {self.file_name}. Timeout: {timeout}s. Polling every {poll_interval}s.")
        # monotonic() is unaffected by wall-clock adjustments (NTP, DST, manual changes).
        start_time = time.monotonic()

        while True:
            current_state = self.refresh_status()

            if current_state == DownloadState.FINISHED:
                if self._result_files is not None:
                    # An earlier call already downloaded and extracted the archive.
                    return self._result_files
                logger.info(f"Download job finished. Total wait time: {time.monotonic() - start_time:.2f}s. Proceeding to process.")
                return self._process_download(cleanup_zip)

            if current_state == DownloadState.FAILED:
                raise DownloadError(f"Download job failed: {self.error_message}", file_name=self.file_name, status=DownloadState.FAILED.value)

            elapsed = time.monotonic() - start_time
            if elapsed > timeout:
                self._state = DownloadState.FAILED
                self._error_message = f"Timeout waiting for download job after {timeout} seconds."
                raise DownloadError(self._error_message, file_name=self.file_name, status="timeout")

            logger.info(f"Job status: {current_state.value}. Waiting...")
            # Cap the sleep at the remaining budget so the final poll happens at the
            # deadline rather than up to poll_interval seconds after it.
            time.sleep(min(poll_interval, timeout - elapsed))

    def _resolve_download_url(self) -> Optional[str]:
        """Return the latest file URL from status details, falling back to the initial URL."""
        latest_url = self._status_details.file_url if self._status_details is not None else None
        # A status object without a URL must not hide the URL known at queue time.
        return latest_url or self._initial_file_url

    @staticmethod
    def _local_paths(destination_dir: str, file_name: str) -> tuple[str, str]:
        """Return ``(zip_path, extract_path)`` for ``file_name`` inside ``destination_dir``."""
        # The name comes from the remote API; keep only its final component so a
        # value containing path separators (e.g. "../x.zip") stays inside destination_dir.
        local_name = os.path.basename(file_name)
        zip_path = os.path.join(destination_dir, local_name)

        # Extract into a subdirectory named after the archive without its extension.
        extract_name = os.path.splitext(local_name)[0]
        if extract_name == local_name:
            # Nothing to strip: don't extract into a directory whose path equals the zip's.
            extract_name = f"{local_name}_extracted"
        return zip_path, os.path.join(destination_dir, extract_name)

    def _process_download(self, cleanup_zip: bool) -> List[str]:
        """
        Download the finished file, unzip it, and optionally delete the zip.

        Any failure, including creating ``destination_dir``, marks the job FAILED
        and is raised as a ``DownloadError``. Failure to delete the zip is only
        logged.
        """
        try:
            zip_path, extract_path = self._local_paths(self.destination_dir, self.file_name)
            os.makedirs(self.destination_dir, exist_ok=True)

            final_url = self._resolve_download_url()
            if not final_url:
                raise DownloadError("File URL is missing, cannot download.", file_name=self.file_name)

            self._manager.download_file(final_url, zip_path, self.file_name)
            self._result_files = self._manager.unzip_file(zip_path, extract_path)

        except DownloadError as e:
            self._error_message = str(e)
            self._state = DownloadState.FAILED
            raise
        except Exception as e:
            self._error_message = f"Error during file download or extraction: {e}"
            self._state = DownloadState.FAILED
            raise DownloadError(self._error_message, file_name=self.file_name) from e

        if cleanup_zip:
            logger.info(f"Cleaning up zip file: {zip_path}")
            try:
                os.remove(zip_path)
            except OSError as e:
                # Best effort: a leftover zip does not invalidate a successful extraction.
                logger.warning(f"Could not remove zip file {zip_path}: {e}")

        return self._result_files

    def __repr__(self) -> str:
        return f"<DownloadJob file_name='{self.file_name}' state='{self.state.value}'>"