sdmxabs.fetch
Obtain data from the ABS SDMX API.
"""Obtain data from the ABS SDMX API."""

from dataclasses import dataclass
from typing import Unpack
from xml.etree.ElementTree import Element

import numpy as np
import pandas as pd

from sdmxabs.download_cache import GetFileKwargs
from sdmxabs.flow_metadata import (
    FlowMetaDict,
    build_key,
    code_lists,
    data_dimensions,
    data_flows,
)
from sdmxabs.xml_base import NAME_SPACES, URL_STEM, acquire_xml

# --- constants
# Map ABS frequency names (as reported in series metadata) to pandas Period codes.
FREQUENCY_MAPPING = {
    "Annual": "Y",
    "Quarterly": "Q",
    "Monthly": "M",
    "Daily": "D",
}

# The two XML containers that hold per-series metadata values.
XML_KEY_SETS = ("SeriesKey", "Attributes")
CODELIST_PACKAGE_TYPE = "codelist"
DECODE_EXCLUSIONS = {"UNIT_MULT"}  # Metadata items that should not be decoded

# Supported URL query parameters, in the order they are appended to the URL.
URL_PARAM_KEYS = ("startPeriod", "endPeriod", "detail")


@dataclass
class MetadataContext:
    """Context object for processing XML metadata."""

    series_count: int  # index of the series within the XML tree
    label_elements: list[str]  # accumulated components of the series label
    meta_items: dict[str, str]  # metadata id -> (possibly decoded) value
    dims: FlowMetaDict  # dimension metadata, including codelist names
    item_count: int  # running count of metadata Value elements processed


# --- private functions
def _convert_to_period_index(series: pd.Series, frequency: str) -> pd.Series:
    """Convert the series index to a PeriodIndex if `frequency` is recognized.

    Note: mutates the index of the passed series in place and returns it.
    Unrecognized frequencies (including "") leave the series unchanged.
    """
    if frequency not in FREQUENCY_MAPPING:
        return series
    series.index = pd.PeriodIndex(series.index, freq=FREQUENCY_MAPPING[frequency])
    return series


def _extract_observation_data(xml_series: Element) -> dict[str, str]:
    """Extract {period: value} observation pairs from an XML series element.

    Observations missing either the period (ObsDimension) or the value
    (ObsValue) attribute are silently skipped.
    """
    series_elements: dict[str, str] = {}
    for item in xml_series.findall("gen:Obs", NAME_SPACES):
        index_container = item.find("gen:ObsDimension", NAME_SPACES)
        value_container = item.find("gen:ObsValue", NAME_SPACES)

        index_obs = index_container.attrib.get("value") if index_container is not None else None
        value_obs = value_container.attrib.get("value") if value_container is not None else None

        if index_obs is not None and value_obs is not None:
            series_elements[index_obs] = value_obs

    return series_elements


def _get_series_data(xml_series: Element, meta: pd.Series) -> pd.Series:
    """Extract observed data from the XML for a given single series.

    Args:
        xml_series (Element): The XML element representing the series.
        meta (pd.Series): Metadata for the series (used for FREQ and the name).

    Returns:
        pd.Series: The observations, numeric where possible, sorted by index.

    """
    series: pd.Series = pd.Series(_extract_observation_data(xml_series))

    # --- if we can, make the series values numeric
    series = series.replace("", np.nan)
    try:
        series = pd.to_numeric(series)
    except ValueError:
        # If conversion fails, keep the series as is (it may contain useful non-numeric data)
        print(f"Could not convert series {meta.name} to numeric, keeping as is.")

    # --- convert to PeriodIndex if frequency is available, and sort the index
    frequency = meta.get("FREQ", "")
    return _convert_to_period_index(series, frequency).sort_index()


def _decode_meta_value(meta_value: str, meta_id: str, dims: FlowMetaDict) -> str:
    """Decode a metadata value based on its ID and the relevant ABS codelist.

    Returns the original `meta_value` unchanged when no codelist decoding
    is possible (unknown dimension, not a codelist, or value not listed).
    """
    # Early return if basic requirements not met
    if meta_id not in dims:
        return meta_value

    dim_config = dims[meta_id]
    if "id" not in dim_config or "package" not in dim_config:
        return meta_value

    # Early return if not a codelist
    if not dim_config["id"] or dim_config["package"] != CODELIST_PACKAGE_TYPE:
        return meta_value

    # Try to decode using codelist
    cl = code_lists(dim_config["id"])
    if meta_value in cl and "name" in cl[meta_value]:
        return cl[meta_value]["name"]

    return meta_value


def _process_xml_attributes(xml_series: Element, key_set: str, context: MetadataContext) -> None:
    """Process XML attributes for a given key set, updating `context` in place.

    Appends each raw value to context.label_elements and stores the (decoded,
    unless excluded) value in context.meta_items.
    """
    attribs = xml_series.find(f"gen:{key_set}", NAME_SPACES)
    if attribs is None:
        print(f"No {key_set} found in series, skipping.")
        return

    for item in attribs.findall("gen:Value", NAME_SPACES):
        # Extract meta_id, meta_value, and decode it - replace with text if missing
        meta_id = item.attrib.get("id", f"missing meta_id {context.series_count}-{context.item_count}")
        meta_value = item.attrib.get(
            "value", f"missing meta_value {context.series_count}-{context.item_count}"
        )
        context.label_elements.append(meta_value)
        if meta_id not in DECODE_EXCLUSIONS:
            context.meta_items[meta_id] = _decode_meta_value(meta_value, meta_id, context.dims)
        else:
            context.meta_items[meta_id] = meta_value
        context.item_count += 1


def _get_series_meta_data(
    flow_id: str, xml_series: Element, series_count: int, dims: FlowMetaDict
) -> tuple[str, pd.Series]:
    """Extract and decode metadata from the XML tree for one given series.

    Args:
        flow_id (str): The ID of the data flow to which the series belongs.
        xml_series (Element): The XML element representing the series.
        series_count (int): The index of the series in the XML tree.
        dims (FlowMetaDict): Dictionary containing metadata dimensions and
            their associated codelist names.

    Returns:
        tuple[str, pd.Series]: A tuple containing the series label and a Series
            of metadata items for the series.

    """
    label_elements = [flow_id]
    flow_name = data_flows().get(flow_id, {"name": flow_id})["name"]
    meta_items = {"DATAFLOW": flow_name}

    context = MetadataContext(
        series_count=series_count,
        label_elements=label_elements,
        meta_items=meta_items,
        dims=dims,
        item_count=0,
    )

    for key_set in XML_KEY_SETS:
        _process_xml_attributes(xml_series, key_set, context)

    series_label = ".".join(context.label_elements)
    return series_label, pd.Series(context.meta_items).rename(series_label)


def _extract(flow_id: str, tree: Element) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Extract data and metadata from the XML tree.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: (data, meta) — one column of data
            and one row of metadata per series label.

    """
    # Get the data dimensions for the flow_id, it provides entree to the metadata
    dims = data_dimensions(flow_id)

    meta = {}
    data: dict[str, pd.Series] = {}
    # Note: findall() only yields Elements, so no None-check is needed here.
    for series_count, xml_series in enumerate(tree.findall(".//gen:Series", NAME_SPACES)):
        label, meta_series = _get_series_meta_data(flow_id, xml_series, series_count, dims)
        series = _get_series_data(xml_series, meta_series)
        if label in data:
            # sometimes the SDMX API returns two incomplete series with the same metadata (our label)
            # my guess: the API may be inconsistent sometimes.
            series = series.combine_first(data[label])
        meta[label] = meta_series
        series.name = label
        data[label] = series

    return pd.DataFrame(data), pd.DataFrame(meta).T  # data, meta


# === public functions ===
def fetch(
    flow_id: str,
    dims: dict[str, str] | None = None,
    parameters: dict[str, str] | None = None,
    *,
    validate: bool = False,
    **kwargs: Unpack[GetFileKwargs],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch data from the ABS SDMX API.

    Args:
        flow_id (str): The ID of the data flow from which to retrieve data items.
        dims (dict[str, str], optional): A dictionary of dimensions to select the
            data items. If None, the ABS fetch request will be for all data items,
            which can be slow.
        parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
            to the data request. Supported parameters include:
            - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
            - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
            - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
            If None, no parameters are applied.
        validate (bool, optional): If True, validate `dims` against the flow's
            required dimensions when generating the URL key. Defaults to False.
        **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

    Returns: a tuple of two DataFrames:
        - The first DataFrame contains the fetched data.
        - The second DataFrame contains the metadata.

    Raises:
        HttpError: If there is an issue with the HTTP request.
        CacheError: If there is an issue with the cache.
        ValueError: If no XML root is found in the response, or if invalid
            parameter values are provided.

    Notes:
        If the `dims` argument is not valid you should get a CacheError or HttpError.
        If the `flow_id` is not valid, you should get a ValueError.

    """
    # --- report the parameters used if requested
    verbose = kwargs.get("verbose", False)
    if verbose:
        print(f"fetch(): {flow_id=} {dims=} {parameters=} {validate=} {kwargs=}")

    # --- validate parameters
    valid_detail_values = {"full", "dataonly", "serieskeysonly", "nodata"}
    if parameters:
        detail_value = parameters.get("detail")
        if detail_value and detail_value not in valid_detail_values:
            raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {valid_detail_values}")

    # --- prepare to get the XML root from the ABS SDMX API
    # prefer fresh data every time (unless the caller chose otherwise)
    kwargs.setdefault("modality", "prefer-url")
    key = build_key(flow_id, dims, validate=validate)

    # --- build URL with optional query parameters, in a fixed order
    url = f"{URL_STEM}/data/{flow_id}/{key}"
    if parameters:
        url_params = [f"{name}={parameters[name]}" for name in URL_PARAM_KEYS if name in parameters]
        if url_params:
            url += "?" + "&".join(url_params)

    xml_root = acquire_xml(url, **kwargs)
    return _extract(flow_id, xml_root)


if __name__ == "__main__":

    def fetch_test() -> None:
        """Test the fetch() function from the ABS SDMX API."""
        flow_id = "WPI"
        dims = {
            "MEASURE": "3",
            "INDEX": "OHRPEB",
            "SECTOR": "7",
            "INDUSTRY": "TOT",
            "TSEST": "10",
            "REGION": "AUS",
            "FREQ": "Q",
        }

        # Test with parameters
        parameters = {"startPeriod": "2020-Q1", "endPeriod": "2023-Q4", "detail": "full"}

        fetched_data, fetched_meta = fetch(
            flow_id,
            dims=dims,
            parameters=parameters,
            validate=True,
            modality="prefer-url",
        )
        expected = (16, 1)
        if fetched_data.shape != expected:
            print(f"Test FAILED: data shape {fetched_data.shape} is unexpected {expected=}.")
        else:
            print(f"Test passed: {fetched_data.shape=}.")
        expected_tsest = "Original"
        if ("TSEST" in fetched_meta.columns) and fetched_meta["TSEST"].iloc[0] == expected_tsest:
            print("Test passed: TSEST has expected value.")
        else:
            print(
                f"Test FAILED: TSEST value {fetched_meta['TSEST'].iloc[0]} is unexpected {expected_tsest=}."
            )

    fetch_test()
@dataclass
class MetadataContext:
    """Context object for processing XML metadata.

    Bundles the mutable state threaded through per-series metadata
    processing, so helpers can update it in place.
    """

    series_count: int  # index of the series within the XML tree
    label_elements: list[str]  # accumulated components of the series label
    meta_items: dict[str, str]  # metadata id -> (possibly decoded) value
    dims: FlowMetaDict  # dimension metadata, including codelist names
    item_count: int  # running count of metadata Value elements processed
Context object for processing XML metadata.
def fetch(
    flow_id: str,
    dims: dict[str, str] | None = None,
    parameters: dict[str, str] | None = None,
    *,
    validate: bool = False,
    **kwargs: Unpack[GetFileKwargs],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch data from the ABS SDMX API.

    Args:
        flow_id (str): The ID of the data flow from which to retrieve data items.
        dims (dict[str, str], optional): Dimension selections for the data items.
            If None, the request covers all data items, which can be slow.
        parameters (dict[str, str], optional): SDMX request parameters. Supported:
            - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
            - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
            - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
            If None, no parameters are applied.
        validate (bool, optional): If True, validate `dims` against the flow's
            required dimensions when generating the URL key. Defaults to False.
        **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: The fetched data and its metadata.

    Raises:
        HttpError: If there is an issue with the HTTP request.
        CacheError: If there is an issue with the cache.
        ValueError: If no XML root is found in the response.
        ValueError: If invalid parameter values are provided.

    Notes:
        An invalid `dims` argument should produce a CacheError or HttpError;
        an invalid `flow_id` should produce a ValueError.

    """
    # --- echo the call arguments when verbose output was requested
    verbose = kwargs.get("verbose", False)
    if verbose:
        print(f"fetch(): {flow_id=} {dims=} {parameters=} {validate=} {kwargs=}")

    # --- reject unsupported 'detail' values up front
    valid_detail_values = {"full", "dataonly", "serieskeysonly", "nodata"}
    detail_value = (parameters or {}).get("detail")
    if detail_value and detail_value not in valid_detail_values:
        raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {valid_detail_values}")

    # --- prepare to get the XML root from the ABS SDMX API
    # prefer fresh data every time (unless the caller set a modality)
    kwargs.setdefault("modality", "prefer-url")
    key = build_key(flow_id, dims, validate=validate)

    # --- assemble the request URL, appending query parameters in a fixed order
    url = f"{URL_STEM}/data/{flow_id}/{key}"
    url_params = [
        f"{name}={parameters[name]}"
        for name in ("startPeriod", "endPeriod", "detail")
        if parameters and name in parameters
    ]
    if url_params:
        url += "?" + "&".join(url_params)

    return _extract(flow_id, acquire_xml(url, **kwargs))
Fetch data from the ABS SDMX API.
Args:
flow_id (str): The ID of the data flow from which to retrieve data items.
dims (dict[str, str], optional): A dictionary of dimensions to select the
data items. If None, the ABS fetch request will be for all data items,
which can be slow.
parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
to the data request. Supported parameters include:
- 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
- 'endPeriod': End period for data filtering (e.g., '2023-Q4')
- 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
If None, no parameters are applied.
validate (bool, optional): If True, validate dims against the flow's
required dimensions when generating the URL key. Defaults to False.
**kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().
Returns: a tuple of two DataFrames:
- The first DataFrame contains the fetched data.
- The second DataFrame contains the metadata.
Raises:
HttpError: If there is an issue with the HTTP request.
CacheError: If there is an issue with the cache.
ValueError: If no XML root is found in the response, or if invalid parameter values are provided.
Notes:
If the dims argument is not valid you should get a CacheError or HttpError.
If the flow_id is not valid, you should get a ValueError.