sdmxabs.fetch
Obtain data from the ABS SDMX API.
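As a quick orientation before the full source below, a minimal call might look like the following sketch. It is illustrative only: the "WPI" flow_id is borrowed from the module's own fetch_test(), the startPeriod value is invented, and fetch is imported via its full module path because any package-level re-export is not shown here.

from sdmxabs.fetch import fetch

# Minimal call: no selection means every series in the flow (which can be slow),
# so the request is narrowed to recent periods with an SDMX parameter.
data, meta = fetch("WPI", parameters={"startPeriod": "2024-Q1"})

print(data.shape)   # one column per returned series, rows indexed by period
print(meta.shape)   # one row of decoded metadata per series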
1"""Obtain data from the ABS SDMX API.""" 2 3from dataclasses import dataclass 4from typing import Unpack 5from xml.etree.ElementTree import Element 6 7import numpy as np 8import pandas as pd 9 10from sdmxabs.download_cache import GetFileKwargs 11from sdmxabs.flow_metadata import ( 12 CODE_LIST_ID, 13 FLOW_NAME, 14 FlowMetaDict, 15 build_key, 16 code_lists, 17 data_flows, 18 structure_from_flow_id, 19) 20from sdmxabs.xml_base import NAME_SPACES, URL_STEM, acquire_xml 21 22# --- constants 23FREQUENCY_MAPPING = { 24 "Annual": "Y", 25 "Quarterly": "Q", 26 "Monthly": "M", 27 "Daily": "D", 28} 29 30XML_KEY_SETS = ("SeriesKey", "Attributes") 31CODELIST_PACKAGE_TYPE = "codelist" 32DECODE_EXCLUSIONS = {"UNIT_MULT"} # Metadata items that should not be decoded 33 34 35@dataclass 36class MetadataContext: 37 """Context object for processing XML metadata.""" 38 39 series_count: int 40 label_elements: list[str] 41 meta_items: dict[str, str] 42 structure: FlowMetaDict 43 item_count: int 44 45 46# --- private functions 47def _convert_to_period_index(series: pd.Series, frequency: str) -> pd.Series: 48 """Convert series index to PeriodIndex if frequency is recognized.""" 49 if frequency not in FREQUENCY_MAPPING: 50 return series 51 freq_code = FREQUENCY_MAPPING[frequency] 52 series.index = pd.PeriodIndex(series.index, freq=freq_code) 53 return series 54 55 56def _extract_observation_data(xml_series: Element) -> dict[str, str]: 57 """Extract observation data from XML series element.""" 58 series_elements = {} 59 for item in xml_series.findall("gen:Obs", NAME_SPACES): 60 index_container = item.find("gen:ObsDimension", NAME_SPACES) 61 value_container = item.find("gen:ObsValue", NAME_SPACES) 62 63 index_obs = index_container.attrib.get("value") if index_container is not None else None 64 value_obs = value_container.attrib.get("value") if value_container is not None else None 65 66 if index_obs is not None and value_obs is not None: 67 series_elements[index_obs] = value_obs 68 69 return series_elements 70 71 72def _get_series_data(xml_series: Element, meta: pd.Series) -> pd.Series: 73 """Extract observed data from the XML for a given single series.""" 74 series_elements = _extract_observation_data(xml_series) 75 series: pd.Series = pd.Series(series_elements) 76 77 # --- if we can, make the series values numeric 78 series = series.replace("", np.nan) 79 try: 80 series = pd.to_numeric(series) 81 except ValueError: 82 # If conversion fails, keep the series as is (it may contain useful non-numeric data) 83 print(f"Could not convert series {meta.name} to numeric, keeping as is.") 84 85 # --- convert to PeriodIndex if frequency is available, and sort the index 86 frequency = meta.get("FREQ", "") 87 return _convert_to_period_index(series, frequency).sort_index() 88 89 90def _decode_meta_value(meta_value: str, meta_id: str, structure: FlowMetaDict) -> str: 91 """Decode a metadata value based on its ID and the relevant ABS codelist.""" 92 # Early return if basic requirements not met 93 if meta_id not in structure: 94 return meta_value 95 96 dim_config = structure[meta_id] 97 if CODE_LIST_ID not in dim_config or "package" not in dim_config: 98 return meta_value 99 100 # Early return if not a codelist 101 if not dim_config[CODE_LIST_ID] or dim_config["package"] != CODELIST_PACKAGE_TYPE: 102 return meta_value 103 104 # Try to decode using codelist 105 cl = code_lists(dim_config[CODE_LIST_ID]) 106 if meta_value in cl and "name" in cl[meta_value]: 107 return cl[meta_value]["name"] 108 109 return meta_value 110 111 112def 
_process_xml_attributes(xml_series: Element, key_set: str, context: MetadataContext) -> None: 113 """Process XML attributes for a given key set.""" 114 attribs = xml_series.find(f"gen:{key_set}", NAME_SPACES) 115 if attribs is None: 116 print(f"No {key_set} found in series, skipping.") 117 return 118 119 for item in attribs.findall("gen:Value", NAME_SPACES): 120 # Extract meta_id, meta_value, and decode it - replace with text if missing 121 meta_id = item.attrib.get("id", f"missing meta_id {context.series_count}-{context.item_count}") 122 meta_value = item.attrib.get( 123 "value", f"missing meta_value {context.series_count}-{context.item_count}" 124 ) 125 context.label_elements.append(meta_value) 126 if meta_id not in DECODE_EXCLUSIONS: 127 context.meta_items[meta_id] = _decode_meta_value(meta_value, meta_id, context.structure) 128 else: 129 context.meta_items[meta_id] = meta_value 130 context.item_count += 1 131 132 133def _get_series_meta_data( 134 flow_id: str, xml_series: Element, series_count: int, structure: FlowMetaDict 135) -> tuple[str, pd.Series]: 136 """Extract and decode metadata from the XML tree for one given series. 137 138 Args: 139 flow_id (str): The ID of the data flow to which the series belongs. 140 xml_series (Element): The XML element representing the series. 141 series_count (int): The index of the series in the XML tree. 142 structure (FlowMetaDict): Dictionary containing the data structure metadata dimensions and 143 their associated codelist names. 144 145 Returns: 146 tuple[str, pd.Series]: A tuple containing the series label and a Series 147 of metadata items for the series. 148 149 """ 150 label_elements = [flow_id] 151 flow_name = data_flows().get(flow_id, {FLOW_NAME: flow_id})[FLOW_NAME] 152 meta_items = {"DATAFLOW": flow_name} 153 154 context = MetadataContext( 155 series_count=series_count, 156 label_elements=label_elements, 157 meta_items=meta_items, 158 structure=structure, 159 item_count=0, 160 ) 161 162 for key_set in XML_KEY_SETS: 163 _process_xml_attributes(xml_series, key_set, context) 164 165 series_label = ".".join(context.label_elements) 166 return series_label, pd.Series(context.meta_items).rename(series_label) 167 168 169def _extract(flow_id: str, tree: Element) -> tuple[pd.DataFrame, pd.DataFrame]: 170 """Extract data from the XML tree.""" 171 # Get the data dimensions for the flow_id, it provides entree to the metadata 172 structure = structure_from_flow_id(flow_id) 173 174 meta = {} 175 data: dict[str, pd.Series] = {} 176 for series_count, xml_series in enumerate(tree.findall(".//gen:Series", NAME_SPACES)): 177 if xml_series is None: 178 print("No Series found in XML tree, skipping.") 179 continue 180 label, meta_series = _get_series_meta_data( 181 flow_id, 182 # python typing is not smart enough to know that 183 # xml_series is an ElementTree 184 xml_series, 185 series_count, 186 structure, 187 ) 188 series = _get_series_data(xml_series, meta_series) 189 if label in data: 190 # sometimes the SDMX API returns two incomplete series with the same metadata (our label) 191 # my guess: the API may be inconsistent sometimes. 
192 series = series.combine_first(data[label]) 193 meta[label] = meta_series 194 series.name = label 195 data[label] = series 196 197 return pd.DataFrame(data), pd.DataFrame(meta).T # data, meta 198 199 200# === public functions === 201def fetch( 202 flow_id: str, 203 selection: dict[str, str] | None = None, 204 parameters: dict[str, str] | None = None, 205 *, 206 validate: bool = False, 207 **kwargs: Unpack[GetFileKwargs], 208) -> tuple[pd.DataFrame, pd.DataFrame]: 209 """Fetch data from the ABS SDMX API. 210 211 Args: 212 flow_id (str): The ID of the data flow from which to retrieve data items. 213 selection (dict[str, str], optional): A dictionary of dimension=value pairs 214 to select the data items. If None, the ABS fetch request will be for all 215 data items, which can be slow. 216 parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply 217 to the data request. Supported parameters include: 218 - 'startPeriod': Start period for data filtering (e.g., '2020-Q1') 219 - 'endPeriod': End period for data filtering (e.g., '2023-Q4') 220 - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata') 221 If None, no parameters are applied. 222 validate (bool, optional): If True, validate against the flow's 223 required dimensions when generating the URL key. Defaults to False. 224 **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml(). 225 226 Returns: a tuple of two DataFrames: 227 - The first DataFrame contains the fetched data. 228 - The second DataFrame contains the metadata. 229 230 Raises: 231 HttpError: If there is an issue with the HTTP request. 232 CacheError: If there is an issue with the cache. 233 ValueError: If no XML root is found in the response. 234 ValueError: If invalid parameter values are provided. 235 236 Notes: 237 If the `dims` argument is not valid you should get a CacheError or HttpError. 238 If the `flow_id` is not valid, you should get a ValueError. 239 240 """ 241 # --- report the parameters used if requested 242 verbose = kwargs.get("verbose", False) 243 if verbose: 244 print(f"fetch(): {flow_id=} {selection=} {parameters=} {validate=} {kwargs=}") 245 246 # --- validate parameters 247 valid_detail_values = {"full", "dataonly", "serieskeysonly", "nodata"} 248 if parameters: 249 detail_value = parameters.get("detail") 250 if detail_value and detail_value not in valid_detail_values: 251 raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {valid_detail_values}") 252 253 # --- prepare to get the XML root from the ABS SDMX API 254 # prefer fresh data every time 255 kwargs["modality"] = kwargs.get("modality", "prefer-url") 256 key = build_key(flow_id, selection, validate=validate) 257 258 # --- build URL with optional parameters 259 url = f"{URL_STEM}/data/{flow_id}/{key}" 260 if parameters: 261 url_params = [] 262 if "startPeriod" in parameters: 263 url_params.append(f"startPeriod={parameters['startPeriod']}") 264 if "endPeriod" in parameters: 265 url_params.append(f"endPeriod={parameters['endPeriod']}") 266 if "detail" in parameters: 267 url_params.append(f"detail={parameters['detail']}") 268 if url_params: 269 url += "?" 
+ "&".join(url_params) 270 271 xml_root = acquire_xml(url, **kwargs) 272 return _extract(flow_id, xml_root) 273 274 275if __name__ == "__main__": 276 277 def fetch_test() -> None: 278 """Test the fetch() function from the ABS SDMX API.""" 279 flow_id = "WPI" 280 dims = { 281 "MEASURE": "3", 282 "INDEX": "OHRPEB", 283 "SECTOR": "7", 284 "INDUSTRY": "TOT", 285 "TSEST": "10", 286 "REGION": "AUS", 287 "FREQ": "Q", 288 } 289 290 # Test with parameters 291 parameters = {"startPeriod": "2020-Q1", "endPeriod": "2023-Q4", "detail": "full"} 292 293 fetched_data, fetched_meta = fetch( 294 flow_id, 295 selection=dims, 296 parameters=parameters, 297 validate=True, 298 modality="prefer-url", 299 ) 300 expected = (16, 1) 301 if fetched_data.shape != expected: 302 print(f"Test FAILED: data shape {fetched_data.shape} is unexpected {expected=}.") 303 else: 304 print(f"Test passed: {fetched_data.shape=}.") 305 expected_tsest = "Original" 306 if ("TSEST" in fetched_meta.columns) and fetched_meta["TSEST"].iloc[0] == expected_tsest: 307 print("Test passed: TSEST has expected value.") 308 else: 309 print( 310 f"Test FAILED: TSEST value {fetched_meta['TSEST'].iloc[0]} is unexpected {expected_tsest=}." 311 ) 312 313 fetch_test()
@dataclass
class MetadataContext:
    """Context object for processing XML metadata."""

    series_count: int
    label_elements: list[str]
    meta_items: dict[str, str]
    structure: FlowMetaDict
    item_count: int
Context object for processing XML metadata.
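Because the dataclass is just mutable state threaded through the private helpers, a small illustrative sketch may make its life cycle clearer. The hand-written gen:Series fragment, the "WPI"/"Wage Price Index" seed values, and the empty structure dict are all invented for this example (an empty structure simply skips codelist decoding); in real use _get_series_meta_data builds the context and _process_xml_attributes fills it.

from xml.etree.ElementTree import fromstring

from sdmxabs.fetch import NAME_SPACES, MetadataContext, _process_xml_attributes

# A toy <gen:Series> element, using whatever URI the module maps the "gen" prefix to.
gen_uri = NAME_SPACES["gen"]
xml_series = fromstring(
    f'<gen:Series xmlns:gen="{gen_uri}">'
    '<gen:SeriesKey><gen:Value id="FREQ" value="Q"/></gen:SeriesKey>'
    '<gen:Attributes><gen:Value id="UNIT_MULT" value="0"/></gen:Attributes>'
    "</gen:Series>"
)

context = MetadataContext(
    series_count=0,                                # position of the series in the XML tree
    label_elements=["WPI"],                        # seeded with the flow_id
    meta_items={"DATAFLOW": "Wage Price Index"},   # normally looked up via data_flows()
    structure={},                                  # normally structure_from_flow_id(); empty skips decoding
    item_count=0,
)

for key_set in ("SeriesKey", "Attributes"):        # i.e. XML_KEY_SETS
    _process_xml_attributes(xml_series, key_set, context)

print(".".join(context.label_elements))   # -> "WPI.Q.0", the series label
print(context.meta_items)                 # -> {'DATAFLOW': ..., 'FREQ': 'Q', 'UNIT_MULT': '0'}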
def fetch(
    flow_id: str,
    selection: dict[str, str] | None = None,
    parameters: dict[str, str] | None = None,
    *,
    validate: bool = False,
    **kwargs: Unpack[GetFileKwargs],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch data from the ABS SDMX API.

    Args:
        flow_id (str): The ID of the data flow from which to retrieve data items.
        selection (dict[str, str], optional): A dictionary of dimension=value pairs
            to select the data items. If None, the ABS fetch request will be for all
            data items, which can be slow.
        parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
            to the data request. Supported parameters include:
            - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
            - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
            - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
            If None, no parameters are applied.
        validate (bool, optional): If True, validate against the flow's
            required dimensions when generating the URL key. Defaults to False.
        **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

    Returns: a tuple of two DataFrames:
        - The first DataFrame contains the fetched data.
        - The second DataFrame contains the metadata.

    Raises:
        HttpError: If there is an issue with the HTTP request.
        CacheError: If there is an issue with the cache.
        ValueError: If no XML root is found in the response.
        ValueError: If invalid parameter values are provided.

    Notes:
        If the `selection` argument is not valid, you should get a CacheError or HttpError.
        If the `flow_id` is not valid, you should get a ValueError.

    """
    # --- report the parameters used if requested
    verbose = kwargs.get("verbose", False)
    if verbose:
        print(f"fetch(): {flow_id=} {selection=} {parameters=} {validate=} {kwargs=}")

    # --- validate parameters
    valid_detail_values = {"full", "dataonly", "serieskeysonly", "nodata"}
    if parameters:
        detail_value = parameters.get("detail")
        if detail_value and detail_value not in valid_detail_values:
            raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {valid_detail_values}")

    # --- prepare to get the XML root from the ABS SDMX API
    # prefer fresh data every time
    kwargs["modality"] = kwargs.get("modality", "prefer-url")
    key = build_key(flow_id, selection, validate=validate)

    # --- build URL with optional parameters
    url = f"{URL_STEM}/data/{flow_id}/{key}"
    if parameters:
        url_params = []
        if "startPeriod" in parameters:
            url_params.append(f"startPeriod={parameters['startPeriod']}")
        if "endPeriod" in parameters:
            url_params.append(f"endPeriod={parameters['endPeriod']}")
        if "detail" in parameters:
            url_params.append(f"detail={parameters['detail']}")
        if url_params:
            url += "?" + "&".join(url_params)

    xml_root = acquire_xml(url, **kwargs)
    return _extract(flow_id, xml_root)
Fetch data from the ABS SDMX API.
Args:
    flow_id (str): The ID of the data flow from which to retrieve data items.
    selection (dict[str, str], optional): A dictionary of dimension=value pairs to select
        the data items. If None, the ABS fetch request will be for all data items, which
        can be slow.
    parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply to the
        data request. Supported parameters include:
        - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
        - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
        - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
        If None, no parameters are applied.
    validate (bool, optional): If True, validate against the flow's required dimensions
        when generating the URL key. Defaults to False.
    **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().
Returns: a tuple of two DataFrames:
    - The first DataFrame contains the fetched data.
    - The second DataFrame contains the metadata.
Raises:
    HttpError: If there is an issue with the HTTP request.
    CacheError: If there is an issue with the cache.
    ValueError: If no XML root is found in the response.
    ValueError: If invalid parameter values are provided.
Notes:
If the selection argument is not valid, you should get a CacheError or HttpError.
If the flow_id is not valid, you should get a ValueError.
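Pulling the arguments together, here is a usage sketch along the lines of the module's own fetch_test(). The flow, dimension codes, and period parameters are copied from that test; the shape and values in the comments are what the test expects rather than guaranteed output.

from sdmxabs.fetch import fetch

selection = {
    "MEASURE": "3", "INDEX": "OHRPEB", "SECTOR": "7", "INDUSTRY": "TOT",
    "TSEST": "10", "REGION": "AUS", "FREQ": "Q",
}  # a single Wage Price Index series
parameters = {"startPeriod": "2020-Q1", "endPeriod": "2023-Q4", "detail": "full"}

data, meta = fetch("WPI", selection=selection, parameters=parameters,
                   validate=True, modality="prefer-url")

print(data.shape)        # fetch_test expects (16, 1): 16 quarters by 1 series
print(type(data.index))  # PeriodIndex, because the decoded FREQ "Quarterly" maps to "Q"
print(meta["TSEST"])     # decoded metadata; fetch_test expects "Original" here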