sdmxabs.fetch
Obtain data from the ABS SDMX API.
"""Obtain data from the ABS SDMX API."""

from dataclasses import dataclass
from typing import Unpack
from urllib.parse import urlencode
from xml.etree.ElementTree import Element

import numpy as np
import pandas as pd

from sdmxabs.download_cache import GetFileKwargs
from sdmxabs.flow_metadata import (
    FlowMetaDict,
    build_key,
    code_lists,
    data_dimensions,
    data_flows,
)
from sdmxabs.xml_base import NAME_SPACES, URL_STEM, acquire_xml

# --- constants
# Map ABS frequency labels to pandas PeriodIndex frequency codes.
FREQUENCY_MAPPING = {
    "Annual": "Y",
    "Quarterly": "Q",
    "Monthly": "M",
    "Daily": "D",
}

XML_KEY_SETS = ("SeriesKey", "Attributes")
CODELIST_PACKAGE_TYPE = "codelist"

# Query-string parameters recognised by the ABS data endpoint.
SUPPORTED_URL_PARAMS = ("startPeriod", "endPeriod", "detail")
VALID_DETAIL_VALUES = {"full", "dataonly", "serieskeysonly", "nodata"}


@dataclass
class MetadataContext:
    """Context object for processing XML metadata."""

    series_count: int  # index of the series within the XML tree
    label_elements: list[str]  # accumulates the components of the series label
    meta_items: dict[str, str]  # accumulates decoded metadata key/value pairs
    dims: FlowMetaDict  # dimensions (and their codelists) for the data flow
    item_count: int  # running count of metadata items processed


# --- private functions
def _convert_to_period_index(series: pd.Series, frequency: str) -> pd.Series:
    """Convert the series index to a PeriodIndex if the frequency is recognized.

    Note: rebinds the index of the passed-in series in place; unrecognized
    frequencies leave the series untouched.
    """
    freq_code = FREQUENCY_MAPPING.get(frequency)
    if freq_code is None:
        return series
    series.index = pd.PeriodIndex(series.index, freq=freq_code)
    return series


def _extract_observation_data(xml_series: Element) -> dict[str, str]:
    """Extract {period: value} observation pairs from an XML series element."""
    series_elements = {}
    for item in xml_series.findall("gen:Obs", NAME_SPACES):
        index_container = item.find("gen:ObsDimension", NAME_SPACES)
        value_container = item.find("gen:ObsValue", NAME_SPACES)

        index_obs = index_container.attrib.get("value") if index_container is not None else None
        value_obs = value_container.attrib.get("value") if value_container is not None else None

        # keep only complete (period, value) observations
        if index_obs is not None and value_obs is not None:
            series_elements[index_obs] = value_obs

    return series_elements


def _get_series_data(xml_series: Element, meta: pd.Series) -> pd.Series:
    """Extract observed data from the XML for a given single series."""
    series_elements = _extract_observation_data(xml_series)
    series: pd.Series = pd.Series(series_elements)

    # --- if we can, make the series values numeric
    series = series.replace("", np.nan)
    try:
        series = pd.to_numeric(series)
    except ValueError:
        # If conversion fails, keep the series as is (it may contain useful non-numeric data)
        print(f"Could not convert series {meta.name} to numeric, keeping as is.")

    # --- convert to PeriodIndex if frequency is available, and sort the index
    frequency = meta.get("FREQ", "")
    return _convert_to_period_index(series, frequency).sort_index()


def _decode_meta_value(meta_value: str, meta_id: str, dims: FlowMetaDict) -> str:
    """Decode a metadata value based on its ID and the relevant ABS codelist.

    Returns the raw `meta_value` unchanged whenever it cannot be decoded.
    """
    # Early return if basic requirements not met
    if meta_id not in dims:
        return meta_value

    dim_config = dims[meta_id]
    if "id" not in dim_config or "package" not in dim_config:
        return meta_value

    # Early return if not a codelist
    if not dim_config["id"] or dim_config["package"] != CODELIST_PACKAGE_TYPE:
        return meta_value

    # Try to decode using codelist
    cl = code_lists(dim_config["id"])
    if meta_value in cl and "name" in cl[meta_value]:
        return cl[meta_value]["name"]

    return meta_value


def _process_xml_attributes(xml_series: Element, key_set: str, context: MetadataContext) -> None:
    """Process XML attributes for a given key set, updating `context` in place."""
    attribs = xml_series.find(f"gen:{key_set}", NAME_SPACES)
    if attribs is None:
        print(f"No {key_set} found in series, skipping.")
        return

    for item in attribs.findall("gen:Value", NAME_SPACES):
        # Extract meta_id, meta_value, and decode it - replace with text if missing
        meta_id = item.attrib.get("id", f"missing meta_id {context.series_count}-{context.item_count}")
        meta_value = item.attrib.get(
            "value", f"missing meta_value {context.series_count}-{context.item_count}"
        )
        context.label_elements.append(meta_value)
        context.meta_items[meta_id] = _decode_meta_value(meta_value, meta_id, context.dims)
        context.item_count += 1


def _get_series_meta_data(
    flow_id: str, xml_series: Element, series_count: int, dims: FlowMetaDict
) -> tuple[str, pd.Series]:
    """Extract and decode metadata from the XML tree for one given series.

    Args:
        flow_id (str): The ID of the data flow to which the series belongs.
        xml_series (Element): The XML element representing the series.
        series_count (int): The index of the series in the XML tree.
        dims (FlowMetaDict): Dictionary containing metadata dimensions and
            their associated codelist names.

    Returns:
        tuple[str, pd.Series]: A tuple containing the series label and a Series
        of metadata items for the series.

    """
    label_elements = [flow_id]
    # Two .get() calls so a flow entry without a "name" falls back to the
    # flow_id rather than raising KeyError.
    flow_name = data_flows().get(flow_id, {}).get("name", flow_id)
    meta_items = {"DATAFLOW": flow_name}

    context = MetadataContext(
        series_count=series_count,
        label_elements=label_elements,
        meta_items=meta_items,
        dims=dims,
        item_count=0,
    )

    for key_set in XML_KEY_SETS:
        _process_xml_attributes(xml_series, key_set, context)

    series_label = ".".join(context.label_elements)
    return series_label, pd.Series(context.meta_items).rename(series_label)


def _extract(flow_id: str, tree: Element) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Extract data from the XML tree."""
    # Get the data dimensions for the flow_id, it provides entree to the metadata
    dims = data_dimensions(flow_id)

    meta = {}
    data: dict[str, pd.Series] = {}
    # Note: findall() never yields None, so no per-item None check is needed.
    for series_count, xml_series in enumerate(tree.findall(".//gen:Series", NAME_SPACES)):
        label, meta_series = _get_series_meta_data(flow_id, xml_series, series_count, dims)
        series = _get_series_data(xml_series, meta_series)
        if label in data:
            # sometimes the SDMX API returns two incomplete series with the same metadata (our label)
            # my guess: the API may be inconsistent sometimes.
            series = series.combine_first(data[label])
        meta[label] = meta_series
        series.name = label
        data[label] = series

    return pd.DataFrame(data), pd.DataFrame(meta).T  # data, meta


# === public functions ===
def fetch(
    flow_id: str,
    dims: dict[str, str] | None = None,
    parameters: dict[str, str] | None = None,
    *,
    validate: bool = False,
    **kwargs: Unpack[GetFileKwargs],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch data from the ABS SDMX API.

    Args:
        flow_id (str): The ID of the data flow from which to retrieve data items.
        dims (dict[str, str], optional): A dictionary of dimensions to select the
            data items. If None, the ABS fetch request will be for all data items,
            which can be slow.
        parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
            to the data request. Supported parameters include:
            - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
            - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
            - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
            If None, no parameters are applied.
        validate (bool, optional): If True, validate `dims` against the flow's
            required dimensions when generating the URL key. Defaults to False.
        **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

    Returns: a tuple of two DataFrames:
        - The first DataFrame contains the fetched data.
        - The second DataFrame contains the metadata.

    Raises:
        HttpError: If there is an issue with the HTTP request.
        CacheError: If there is an issue with the cache.
        ValueError: If no XML root is found in the response.
        ValueError: If invalid parameter values are provided.

    Notes:
        If the `dims` argument is not valid you should get a CacheError or HttpError.
        If the `flow_id` is not valid, you should get a ValueError.

    """
    # --- report the parameters used if requested
    verbose = kwargs.get("verbose", False)
    if verbose:
        print(f"fetch(): {flow_id=} {dims=} {parameters=} {validate=} {kwargs=}")

    # --- validate parameters
    if parameters:
        detail_value = parameters.get("detail")
        if detail_value and detail_value not in VALID_DETAIL_VALUES:
            raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {VALID_DETAIL_VALUES}")

    # --- prepare to get the XML root from the ABS SDMX API
    # prefer fresh data every time, unless the caller specifies a modality
    kwargs.setdefault("modality", "prefer-url")
    key = build_key(
        flow_id,
        dims,
        validate=validate,
    )

    # --- build URL with optional, recognised query parameters
    url = f"{URL_STEM}/data/{flow_id}/{key}"
    if parameters:
        # only pass through parameters the ABS endpoint recognises;
        # urlencode() also percent-escapes values correctly.
        query = {k: parameters[k] for k in SUPPORTED_URL_PARAMS if k in parameters}
        if query:
            url = f"{url}?{urlencode(query)}"

    xml_root = acquire_xml(url, **kwargs)
    return _extract(flow_id, xml_root)


if __name__ == "__main__":

    def fetch_test() -> None:
        """Test the fetch() function from the ABS SDMX API."""
        flow_id = "WPI"
        dims = {
            "MEASURE": "3",
            "INDEX": "OHRPEB",
            "SECTOR": "7",
            "INDUSTRY": "TOT",
            "TSEST": "10",
            "REGION": "AUS",
            "FREQ": "Q",
        }

        # Test with parameters
        parameters = {"startPeriod": "2020-Q1", "endPeriod": "2023-Q4", "detail": "full"}

        fetched_data, fetched_meta = fetch(
            flow_id,
            dims=dims,
            parameters=parameters,
            validate=True,
            modality="prefer-url",
        )
        expected = (16, 1)
        if fetched_data.shape != expected:
            print(f"Test FAILED: data shape {fetched_data.shape} is unexpected {expected=}.")
        else:
            print(f"Test passed: {fetched_data.shape=}.")
        expected_tsest = "Original"
        if ("TSEST" in fetched_meta.columns) and fetched_meta["TSEST"].iloc[0] == expected_tsest:
            print("Test passed: TSEST has expected value.")
        else:
            print(
                f"Test FAILED: TSEST value {fetched_meta['TSEST'].iloc[0]} is unexpected {expected_tsest=}."
            )

    fetch_test()
@dataclass
class MetadataContext:
    """Mutable state shared while decoding the XML metadata of one series."""

    series_count: int  # position of the series within the XML tree
    label_elements: list[str]  # label fragments, joined later with "."
    meta_items: dict[str, str]  # decoded metadata id -> value pairs
    dims: FlowMetaDict  # dimension/codelist metadata for the data flow
    item_count: int  # how many metadata items have been processed so far
Context object for processing XML metadata.
def fetch(
    flow_id: str,
    dims: dict[str, str] | None = None,
    parameters: dict[str, str] | None = None,
    *,
    validate: bool = False,
    **kwargs: Unpack[GetFileKwargs],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch data from the ABS SDMX API.

    Args:
        flow_id (str): The ID of the data flow from which to retrieve data items.
        dims (dict[str, str], optional): A dictionary of dimensions to select the
            data items. If None, the ABS fetch request will be for all data items,
            which can be slow.
        parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
            to the data request. Supported parameters include:
            - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
            - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
            - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
            If None, no parameters are applied.
        validate (bool, optional): If True, validate `dims` against the flow's
            required dimensions when generating the URL key. Defaults to False.
        **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

    Returns: a tuple of two DataFrames:
        - The first DataFrame contains the fetched data.
        - The second DataFrame contains the metadata.

    Raises:
        HttpError: If there is an issue with the HTTP request.
        CacheError: If there is an issue with the cache.
        ValueError: If no XML root is found in the response.
        ValueError: If invalid parameter values are provided.

    Notes:
        If the `dims` argument is not valid you should get a CacheError or HttpError.
        If the `flow_id` is not valid, you should get a ValueError.

    """
    # --- report the parameters used if requested
    verbose = kwargs.get("verbose", False)
    if verbose:
        print(f"fetch(): {flow_id=} {dims=} {parameters=} {validate=} {kwargs=}")

    # --- validate parameters
    valid_detail_values = {"full", "dataonly", "serieskeysonly", "nodata"}
    if parameters:
        detail_value = parameters.get("detail")
        if detail_value and detail_value not in valid_detail_values:
            raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {valid_detail_values}")

    # --- prepare to get the XML root from the ABS SDMX API
    # prefer fresh data every time, unless the caller specifies a modality
    kwargs.setdefault("modality", "prefer-url")
    key = build_key(
        flow_id,
        dims,
        validate=validate,
    )

    # --- build URL with optional, recognised query parameters
    url = f"{URL_STEM}/data/{flow_id}/{key}"
    if parameters:
        from urllib.parse import urlencode  # local import: only needed on this path

        # only pass through parameters the ABS endpoint recognises;
        # urlencode() also percent-escapes values correctly.
        query = {k: parameters[k] for k in ("startPeriod", "endPeriod", "detail") if k in parameters}
        if query:
            url = f"{url}?{urlencode(query)}"

    xml_root = acquire_xml(url, **kwargs)
    return _extract(flow_id, xml_root)
Fetch data from the ABS SDMX API.
Args:
flow_id (str): The ID of the data flow from which to retrieve data items.
dims (dict[str, str], optional): A dictionary of dimensions to select the
data items. If None, the ABS fetch request will be for all data items,
which can be slow.
parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
to the data request. Supported parameters include:
- 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
- 'endPeriod': End period for data filtering (e.g., '2023-Q4')
- 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
If None, no parameters are applied.
validate (bool, optional): If True, validate dims against the flow's
required dimensions when generating the URL key. Defaults to False.
**kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().
Returns: a tuple of two DataFrames:
- The first DataFrame contains the fetched data.
- The second DataFrame contains the metadata.
Raises:
- HttpError: If there is an issue with the HTTP request.
- CacheError: If there is an issue with the cache.
- ValueError: If no XML root is found in the response.
- ValueError: If invalid parameter values are provided.
Notes:
If the dims argument is not valid you should get a CacheError or HttpError.
If the flow_id is not valid, you should get a ValueError.