sdmxabs.fetch

Obtain data from the ABS SDMX API.
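
The module's single public entry point is fetch(), which returns a (data, metadata)
pair of DataFrames. A minimal call might look like this sketch (the flow and
dimension codes are illustrative, not authoritative):

    from sdmxabs.fetch import fetch

    # Fetch quarterly series from an ABS flow (illustrative selection).
    data, meta = fetch("WPI", selection={"FREQ": "Q"})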

  1"""Obtain data from the ABS SDMX API."""
  2
  3from dataclasses import dataclass
  4from typing import Unpack
  5from xml.etree.ElementTree import Element
  6
  7import numpy as np
  8import pandas as pd
  9
 10from sdmxabs.download_cache import GetFileKwargs
 11from sdmxabs.flow_metadata import (
 12    CODE_LIST_ID,
 13    FLOW_NAME,
 14    FlowMetaDict,
 15    build_key,
 16    code_lists,
 17    data_flows,
 18    structure_from_flow_id,
 19)
 20from sdmxabs.xml_base import NAME_SPACES, URL_STEM, acquire_xml
 21
 22# --- constants
 23FREQUENCY_MAPPING = {
 24    "Annual": "Y",
 25    "Quarterly": "Q",
 26    "Monthly": "M",
 27    "Daily": "D",
 28}
 29
 30XML_KEY_SETS = ("SeriesKey", "Attributes")
 31CODELIST_PACKAGE_TYPE = "codelist"
 32DECODE_EXCLUSIONS = {"UNIT_MULT"}  # Metadata items that should not be decoded
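# Note: UNIT_MULT is assumed to be more useful as its raw code (a unit
# multiplier) than as a decoded label, so it is never run through a codelist.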


@dataclass
class MetadataContext:
    """Context object for processing XML metadata."""

    series_count: int
    label_elements: list[str]
    meta_items: dict[str, str]
    structure: FlowMetaDict
    item_count: int


# --- private functions
def _convert_to_period_index(series: pd.Series, frequency: str) -> pd.Series:
    """Convert series index to PeriodIndex if frequency is recognized."""
    if frequency not in FREQUENCY_MAPPING:
        return series
    freq_code = FREQUENCY_MAPPING[frequency]
    series.index = pd.PeriodIndex(series.index, freq=freq_code)
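    # e.g. a "Quarterly" series maps to freq "Q", so index labels such as
    # "2020-Q1" become pd.Period values that sort and slice chronologically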
    return series


def _extract_observation_data(xml_series: Element) -> dict[str, str]:
    """Extract observation data from XML series element."""
    series_elements = {}
    for item in xml_series.findall("gen:Obs", NAME_SPACES):
        index_container = item.find("gen:ObsDimension", NAME_SPACES)
        value_container = item.find("gen:ObsValue", NAME_SPACES)

        index_obs = index_container.attrib.get("value") if index_container is not None else None
        value_obs = value_container.attrib.get("value") if value_container is not None else None

        if index_obs is not None and value_obs is not None:
            series_elements[index_obs] = value_obs

    return series_elements


def _get_series_data(xml_series: Element, meta: pd.Series) -> pd.Series:
    """Extract observed data from the XML for a given single series."""
    series_elements = _extract_observation_data(xml_series)
    series: pd.Series = pd.Series(series_elements)

    # --- if we can, make the series values numeric
    series = series.replace("", np.nan)
    try:
        series = pd.to_numeric(series)
    except ValueError:
        # If conversion fails, keep the series as is (it may contain useful non-numeric data)
        print(f"Could not convert series {meta.name} to numeric, keeping as is.")

    # --- convert to PeriodIndex if frequency is available, and sort the index
    frequency = meta.get("FREQ", "")
    return _convert_to_period_index(series, frequency).sort_index()


def _decode_meta_value(meta_value: str, meta_id: str, structure: FlowMetaDict) -> str:
    """Decode a metadata value based on its ID and the relevant ABS codelist."""
    # Early return if basic requirements not met
    if meta_id not in structure:
        return meta_value

    dim_config = structure[meta_id]
    if CODE_LIST_ID not in dim_config or "package" not in dim_config:
        return meta_value

    # Early return if not a codelist
    if not dim_config[CODE_LIST_ID] or dim_config["package"] != CODELIST_PACKAGE_TYPE:
        return meta_value

    # Try to decode using codelist
    cl = code_lists(dim_config[CODE_LIST_ID])
    if meta_value in cl and "name" in cl[meta_value]:
        return cl[meta_value]["name"]

    return meta_value
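# For example, if the "REGION" dimension is backed by a codelist in which the
# code "AUS" carries the name "Australia", _decode_meta_value("AUS", "REGION",
# structure) would return "Australia" (illustrative codelist contents).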


def _process_xml_attributes(xml_series: Element, key_set: str, context: MetadataContext) -> None:
    """Process XML attributes for a given key set."""
    attribs = xml_series.find(f"gen:{key_set}", NAME_SPACES)
    if attribs is None:
        print(f"No {key_set} found in series, skipping.")
        return

    for item in attribs.findall("gen:Value", NAME_SPACES):
        # Extract meta_id and meta_value, then decode; use placeholder text if either is missing
        meta_id = item.attrib.get("id", f"missing meta_id {context.series_count}-{context.item_count}")
        meta_value = item.attrib.get(
            "value", f"missing meta_value {context.series_count}-{context.item_count}"
        )
        context.label_elements.append(meta_value)
        if meta_id not in DECODE_EXCLUSIONS:
            context.meta_items[meta_id] = _decode_meta_value(meta_value, meta_id, context.structure)
        else:
            context.meta_items[meta_id] = meta_value
        context.item_count += 1


def _get_series_meta_data(
    flow_id: str, xml_series: Element, series_count: int, structure: FlowMetaDict
) -> tuple[str, pd.Series]:
    """Extract and decode metadata from the XML tree for one given series.

    Args:
        flow_id (str): The ID of the data flow to which the series belongs.
        xml_series (Element): The XML element representing the series.
        series_count (int): The index of the series in the XML tree.
        structure (FlowMetaDict): Dictionary containing the data structure metadata dimensions and
            their associated codelist names.

    Returns:
        tuple[str, pd.Series]: A tuple containing the series label and a Series
            of metadata items for the series.

    """
    label_elements = [flow_id]
    flow_name = data_flows().get(flow_id, {FLOW_NAME: flow_id})[FLOW_NAME]
    meta_items = {"DATAFLOW": flow_name}

    context = MetadataContext(
        series_count=series_count,
        label_elements=label_elements,
        meta_items=meta_items,
        structure=structure,
        item_count=0,
    )

    for key_set in XML_KEY_SETS:
        _process_xml_attributes(xml_series, key_set, context)

    series_label = ".".join(context.label_elements)
    return series_label, pd.Series(context.meta_items).rename(series_label)


def _extract(flow_id: str, tree: Element) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Extract data from the XML tree."""
    # Get the data dimensions for the flow_id; they provide the entry point to the metadata
    structure = structure_from_flow_id(flow_id)

    meta = {}
    data: dict[str, pd.Series] = {}
    for series_count, xml_series in enumerate(tree.findall(".//gen:Series", NAME_SPACES)):
        label, meta_series = _get_series_meta_data(flow_id, xml_series, series_count, structure)
        series = _get_series_data(xml_series, meta_series)
        if label in data:
            # The SDMX API sometimes returns two incomplete series with identical
            # metadata (and therefore the same label); merge them, preferring the
            # observations from the later series.
            series = series.combine_first(data[label])
        meta[label] = meta_series
        series.name = label
        data[label] = series

    return pd.DataFrame(data), pd.DataFrame(meta).T  # data, meta


# === public functions ===
def fetch(
    flow_id: str,
    selection: dict[str, str] | None = None,
    parameters: dict[str, str] | None = None,
    *,
    validate: bool = False,
    **kwargs: Unpack[GetFileKwargs],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch data from the ABS SDMX API.

    Args:
        flow_id (str): The ID of the data flow from which to retrieve data items.
        selection (dict[str, str], optional): A dictionary of dimension=value pairs
            to select the data items. If None, the ABS fetch request will be for all
            data items, which can be slow.
        parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
            to the data request. Supported parameters include:
            - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
            - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
            - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
            If None, no parameters are applied.
        validate (bool, optional): If True, validate the selection against the flow's
            required dimensions when generating the URL key. Defaults to False.
        **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: A tuple of two DataFrames:
            - The first DataFrame contains the fetched data.
            - The second DataFrame contains the metadata.

    Raises:
        HttpError: If there is an issue with the HTTP request.
        CacheError: If there is an issue with the cache.
        ValueError: If no XML root is found in the response.
        ValueError: If invalid parameter values are provided.

    Notes:
        If the `selection` argument is not valid, you should get a CacheError or HttpError.
        If the `flow_id` is not valid, you should get a ValueError.

    """
    # --- report the parameters used if requested
    verbose = kwargs.get("verbose", False)
    if verbose:
        print(f"fetch(): {flow_id=} {selection=} {parameters=} {validate=} {kwargs=}")

    # --- validate parameters
    valid_detail_values = {"full", "dataonly", "serieskeysonly", "nodata"}
    if parameters:
        detail_value = parameters.get("detail")
        if detail_value and detail_value not in valid_detail_values:
            raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {valid_detail_values}")

    # --- prepare to get the XML root from the ABS SDMX API
    # prefer fresh data every time
    kwargs["modality"] = kwargs.get("modality", "prefer-url")
    key = build_key(flow_id, selection, validate=validate)

    # --- build URL with optional parameters
    url = f"{URL_STEM}/data/{flow_id}/{key}"
    if parameters:
        url_params = []
        if "startPeriod" in parameters:
            url_params.append(f"startPeriod={parameters['startPeriod']}")
        if "endPeriod" in parameters:
            url_params.append(f"endPeriod={parameters['endPeriod']}")
        if "detail" in parameters:
            url_params.append(f"detail={parameters['detail']}")
        if url_params:
            url += "?" + "&".join(url_params)

    xml_root = acquire_xml(url, **kwargs)
    return _extract(flow_id, xml_root)


if __name__ == "__main__":

    def fetch_test() -> None:
        """Test the fetch() function from the ABS SDMX API."""
        flow_id = "WPI"
        dims = {
            "MEASURE": "3",
            "INDEX": "OHRPEB",
            "SECTOR": "7",
            "INDUSTRY": "TOT",
            "TSEST": "10",
            "REGION": "AUS",
            "FREQ": "Q",
        }

        # Test with parameters
        parameters = {"startPeriod": "2020-Q1", "endPeriod": "2023-Q4", "detail": "full"}

        fetched_data, fetched_meta = fetch(
            flow_id,
            selection=dims,
            parameters=parameters,
            validate=True,
            modality="prefer-url",
        )
        expected = (16, 1)
        if fetched_data.shape != expected:
            print(f"Test FAILED: data shape {fetched_data.shape} is unexpected {expected=}.")
        else:
            print(f"Test passed: {fetched_data.shape=}.")
        expected_tsest = "Original"
        if ("TSEST" in fetched_meta.columns) and fetched_meta["TSEST"].iloc[0] == expected_tsest:
            print("Test passed: TSEST has expected value.")
        else:
            # guard against a KeyError when the TSEST column is absent
            found = fetched_meta["TSEST"].iloc[0] if "TSEST" in fetched_meta.columns else "<missing>"
            print(f"Test FAILED: TSEST value {found} is unexpected {expected_tsest=}.")

    fetch_test()
FREQUENCY_MAPPING = {'Annual': 'Y', 'Quarterly': 'Q', 'Monthly': 'M', 'Daily': 'D'}
XML_KEY_SETS = ('SeriesKey', 'Attributes')
CODELIST_PACKAGE_TYPE = 'codelist'
DECODE_EXCLUSIONS = {'UNIT_MULT'}
@dataclass
class MetadataContext:

Context object for processing XML metadata.

MetadataContext(series_count: int, label_elements: list[str], meta_items: dict[str, str], structure: dict[str, dict[str, str]], item_count: int)
series_count: int
label_elements: list[str]
meta_items: dict[str, str]
structure: dict[str, dict[str, str]]
item_count: int
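
A minimal sketch of how the parsing helpers use this context (values are
illustrative): _get_series_meta_data seeds one instance per series, and
_process_xml_attributes then mutates that same instance once per key set.

    context = MetadataContext(
        series_count=0,
        label_elements=["WPI"],  # seeded with the flow_id
        meta_items={"DATAFLOW": "Wage Price Index"},  # illustrative flow name
        structure={},  # normally structure_from_flow_id(flow_id)
        item_count=0,
    )
    for key_set in ("SeriesKey", "Attributes"):
        # each pass appends raw values to label_elements and stores
        # decoded values in meta_items, advancing item_count
        ...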
def fetch(flow_id: str, selection: dict[str, str] | None = None, parameters: dict[str, str] | None = None, *, validate: bool = False, **kwargs: Unpack[sdmxabs.GetFileKwargs]) -> tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]:

Fetch data from the ABS SDMX API.

Args:
    flow_id (str): The ID of the data flow from which to retrieve data items.
    selection (dict[str, str], optional): A dictionary of dimension=value pairs
        to select the data items. If None, the ABS fetch request will be for all
        data items, which can be slow.
    parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
        to the data request. Supported parameters include:
        - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
        - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
        - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
        If None, no parameters are applied.
    validate (bool, optional): If True, validate the selection against the flow's
        required dimensions when generating the URL key. Defaults to False.
    **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

Returns:
    A tuple of two DataFrames:
    - The first DataFrame contains the fetched data.
    - The second DataFrame contains the metadata.

Raises:
    HttpError: If there is an issue with the HTTP request.
    CacheError: If there is an issue with the cache.
    ValueError: If no XML root is found in the response.
    ValueError: If invalid parameter values are provided.

Notes:
    If the `selection` argument is not valid, you should get a CacheError or HttpError.
    If the `flow_id` is not valid, you should get a ValueError.
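
As a usage sketch (the dimension codes mirror the module's own WPI test and are
illustrative rather than authoritative):

    from sdmxabs.fetch import fetch

    data, meta = fetch(
        "WPI",
        selection={"TSEST": "10", "REGION": "AUS", "FREQ": "Q"},
        parameters={"startPeriod": "2020-Q1", "endPeriod": "2023-Q4"},
    )
    # data: one column per series; the index is a pd.PeriodIndex when FREQ is
    # one of Annual/Quarterly/Monthly/Daily.
    # meta: one row per series, holding the decoded dimension and attribute values.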