Coverage for src/usaspending/download/job.py: 45%
98 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 17:15 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 17:15 -0700
1# src/usaspending/download/job.py
2"""
3This module defines the DownloadJob class, which encapsulates the lifecycle of a single
4award data download task from the USASpending API. It handles polling for status updates,
5downloading the completed file, and extracting its contents.
6"""
8from __future__ import annotations
9import time
10import os
11from typing import TYPE_CHECKING, Dict, Any, Optional, List
13from ..exceptions import DownloadError, APIError
14from ..logging_config import USASpendingLogger
15from ..models.download import DownloadStatus, DownloadState
17if TYPE_CHECKING:
18 from .manager import DownloadManager
20logger = USASpendingLogger.get_logger(__name__)
class DownloadJob:
    """
    Represents a single award data download task, managing its lifecycle (polling, downloading, extraction).

    This class is designed to be instantiated by the `DownloadManager` after a download
    request has been successfully queued with the USASpending API. It provides methods
    to monitor the status of the download job, wait for its completion, and process
    the downloaded data (downloading the zip file and extracting its contents).

    Attributes:
        file_name (str): The unique identifier for the download job, typically a filename
            provided by the USASpending API.
        destination_dir (str): The local directory where the downloaded zip file will be
            saved and its contents extracted. Defaults to the current working directory
            if not specified.
        request_details (Optional[Dict[str, Any]]): A dictionary containing details of the
            original download request, as returned by the API.
        state (DownloadState): The current state of the download job (e.g., PENDING,
            RUNNING, FINISHED, FAILED). This is a read-only property.
        status_details (Optional[DownloadStatus]): A `DownloadStatus` object containing the
            latest detailed status information retrieved from the API. This is a
            read-only property.
        error_message (Optional[str]): A message describing the error if the job
            transitions to a FAILED state. This is a read-only property.
        result_files (Optional[List[str]]): The file paths returned by the manager's
            unzip step upon successful completion. This is a read-only property.
    """

    DEFAULT_POLL_INTERVAL = 30  # seconds
    DEFAULT_TIMEOUT = 1800  # 30 minutes

    def __init__(
        self,
        manager: DownloadManager,
        file_name: str,
        initial_file_url: Optional[str],
        request_details: Optional[Dict[str, Any]],
        destination_dir: Optional[str] = None,
    ):
        self._manager = manager
        self.file_name = file_name
        self._initial_file_url = initial_file_url  # URL provided at initiation
        self.request_details = request_details
        self.destination_dir = destination_dir or os.getcwd()

        self._status_details: Optional[DownloadStatus] = None
        self._state: DownloadState = DownloadState.PENDING
        self._result_files: Optional[List[str]] = None
        self._error_message: Optional[str] = None

    @property
    def state(self) -> DownloadState:
        """Current state of the job."""
        return self._state

    @property
    def status_details(self) -> Optional[DownloadStatus]:
        """Detailed status information from the API (cached from the last successful poll)."""
        return self._status_details

    @property
    def error_message(self) -> Optional[str]:
        """Reason the job failed (API failure, status-check error, timeout, or processing error); None otherwise."""
        return self._error_message

    @property
    def result_files(self) -> Optional[List[str]]:
        """List of extracted file paths upon successful completion."""
        return self._result_files

    @property
    def is_complete(self) -> bool:
        """True if the job is finished (success or failure)."""
        return self._state in (DownloadState.FINISHED, DownloadState.FAILED)

    def refresh_status(self) -> DownloadState:
        """
        Poll the API for the latest status and update the internal state.

        Once the job is complete (FINISHED or FAILED) no further API call is made and the
        cached state is returned.

        Returns:
            DownloadState: The job state after this poll.

        Notes:
            - An ``APIError`` is treated as transient: it is logged and the previous
              state is returned so the next poll can retry.
            - Any other exception marks the job FAILED and records the error message.
        """
        if self.is_complete:
            return self._state

        logger.debug(f"Checking status for {self.file_name}...")
        try:
            self._status_details = self._manager.check_status(self.file_name)
            self._state = self._status_details.api_status

            if self._state == DownloadState.FAILED:
                self._error_message = self._status_details.message or "API reported failure."

            return self._state

        except APIError as e:
            # Handle transient API errors during polling without failing the job immediately
            logger.warning(f"API Error checking status for {self.file_name}: {e}. Will retry.")
            return self._state  # Return previous state
        except Exception as e:
            logger.error(f"Unexpected error checking status for {self.file_name}: {e}")
            self._error_message = f"Status check failed: {e}"
            self._state = DownloadState.FAILED
            return self._state

    def wait_for_completion(
        self,
        timeout: int = DEFAULT_TIMEOUT,
        poll_interval: int = DEFAULT_POLL_INTERVAL,
        cleanup_zip: bool = False,
    ) -> List[str]:
        """
        Block until the download is complete, then download and unzip the file.

        Args:
            timeout: Maximum number of seconds to wait for the job to finish.
            poll_interval: Seconds to sleep between status checks. The last sleep is
                shortened so the wait does not run past ``timeout``.
            cleanup_zip: If True, delete the downloaded zip after extraction.

        Returns:
            List[str]: The extracted file paths (see ``result_files``).

        Raises:
            DownloadError: If the API reports failure, the timeout elapses (the job is
                then marked FAILED), or the file cannot be downloaded/extracted.
        """
        logger.info(f"Waiting for download job {self.file_name}. Timeout: {timeout}s. Polling every {poll_interval}s.")
        # Monotonic clock: elapsed time is unaffected by wall-clock adjustments (NTP, manual changes).
        start_time = time.monotonic()

        while True:
            current_state = self.refresh_status()

            if current_state == DownloadState.FINISHED:
                logger.info(f"Download job finished. Total wait time: {time.monotonic() - start_time:.2f}s. Proceeding to process.")
                return self._process_download(cleanup_zip)

            if current_state == DownloadState.FAILED:
                raise DownloadError(f"Download job failed: {self.error_message}", file_name=self.file_name, status=DownloadState.FAILED.value)

            elapsed = time.monotonic() - start_time
            if elapsed > timeout:
                self._state = DownloadState.FAILED
                self._error_message = f"Timeout waiting for download job after {timeout} seconds."
                raise DownloadError(self._error_message, file_name=self.file_name, status="timeout")

            logger.info(f"Job status: {current_state.value}. Waiting...")
            # Clamp the sleep to the remaining budget (>= 0 here since elapsed <= timeout)
            # so the final poll happens at the deadline instead of up to poll_interval later.
            time.sleep(min(poll_interval, timeout - elapsed))

    def _resolve_file_url(self) -> Optional[str]:
        """Return the latest polled file URL if present, otherwise the URL given at initiation."""
        latest_url = self._status_details.file_url if self._status_details is not None else None
        return latest_url or self._initial_file_url

    def _local_paths(self) -> tuple[str, str]:
        """Return ``(zip_path, extract_path)`` for this job inside ``destination_dir``."""
        # file_name originates from the remote API; keep only its final component so values
        # such as "../x.zip" or "/abs/x.zip" cannot place files outside destination_dir
        # (os.path.join discards earlier components when given an absolute path).
        local_name = os.path.basename(self.file_name)
        zip_path = os.path.join(self.destination_dir, local_name)
        # Extract into a subdirectory named after the archive, without its extension.
        extract_path = os.path.join(self.destination_dir, os.path.splitext(local_name)[0])
        return zip_path, extract_path

    def _process_download(self, cleanup_zip: bool) -> List[str]:
        """
        Download the finished file, unzip it, and optionally remove the zip.

        Args:
            cleanup_zip: If True, try to delete the zip after extraction; a failure to
                delete is logged as a warning and does not fail the job.

        Returns:
            List[str]: The paths returned by the manager's unzip step.

        Raises:
            DownloadError: If no file URL is available or any step (directory creation,
                download, extraction) fails. The job is marked FAILED first.
        """
        zip_path, extract_path = self._local_paths()

        try:
            # Inside the try so filesystem errors surface as DownloadError, like every other failure here.
            os.makedirs(self.destination_dir, exist_ok=True)

            final_url = self._resolve_file_url()
            if not final_url:
                raise DownloadError("File URL is missing, cannot download.", file_name=self.file_name)

            self._manager.download_file(final_url, zip_path, self.file_name)
            self._result_files = self._manager.unzip_file(zip_path, extract_path)

            if cleanup_zip:
                logger.info(f"Cleaning up zip file: {zip_path}")
                try:
                    os.remove(zip_path)
                except OSError as e:
                    logger.warning(f"Could not remove zip file {zip_path}: {e}")

            return self._result_files

        except DownloadError as e:
            self._error_message = str(e)
            self._state = DownloadState.FAILED
            raise
        except Exception as e:
            self._error_message = f"Error during file download or extraction: {e}"
            self._state = DownloadState.FAILED
            raise DownloadError(self._error_message, file_name=self.file_name) from e

    def __repr__(self) -> str:
        return f"<DownloadJob file_name='{self.file_name}' state='{self.state.value}'>"