sdmxabs.fetch

Obtain data from the ABS SDMX API.

  1"""Obtain data from the ABS SDMX API."""
  2
from dataclasses import dataclass
from typing import Unpack
from urllib.parse import urlencode
from xml.etree.ElementTree import Element

import numpy as np
import pandas as pd

from sdmxabs.download_cache import GetFileKwargs
from sdmxabs.flow_metadata import (
    FlowMetaDict,
    build_key,
    code_lists,
    data_dimensions,
    data_flows,
)
from sdmxabs.xml_base import NAME_SPACES, URL_STEM, acquire_xml
 19
# --- constants

# Map ABS frequency names (as found in the FREQ metadata item) to
# pandas PeriodIndex frequency codes.
FREQUENCY_MAPPING = {
    "Annual": "Y",
    "Quarterly": "Q",
    "Monthly": "M",
    "Daily": "D",
}

# XML child elements of a <Series> that carry metadata values.
XML_KEY_SETS = ("SeriesKey", "Attributes")
# SDMX package type indicating a dimension is backed by a codelist.
CODELIST_PACKAGE_TYPE = "codelist"
DECODE_EXCLUSIONS = {"UNIT_MULT"}  # Metadata items that should not be decoded
 31
 32
@dataclass
class MetadataContext:
    """Context object for processing XML metadata."""

    series_count: int  # index of the series within the XML tree
    label_elements: list[str]  # accumulated parts of the series label
    meta_items: dict[str, str]  # decoded metadata: id -> value
    dims: FlowMetaDict  # dimension metadata, including codelist references
    item_count: int  # running count of metadata items seen for this series
 42
 43
 44# --- private functions
 45def _convert_to_period_index(series: pd.Series, frequency: str) -> pd.Series:
 46    """Convert series index to PeriodIndex if frequency is recognized."""
 47    if frequency not in FREQUENCY_MAPPING:
 48        return series
 49    freq_code = FREQUENCY_MAPPING[frequency]
 50    series.index = pd.PeriodIndex(series.index, freq=freq_code)
 51    return series
 52
 53
 54def _extract_observation_data(xml_series: Element) -> dict[str, str]:
 55    """Extract observation data from XML series element."""
 56    series_elements = {}
 57    for item in xml_series.findall("gen:Obs", NAME_SPACES):
 58        index_container = item.find("gen:ObsDimension", NAME_SPACES)
 59        value_container = item.find("gen:ObsValue", NAME_SPACES)
 60
 61        index_obs = index_container.attrib.get("value") if index_container is not None else None
 62        value_obs = value_container.attrib.get("value") if value_container is not None else None
 63
 64        if index_obs is not None and value_obs is not None:
 65            series_elements[index_obs] = value_obs
 66
 67    return series_elements
 68
 69
 70def _get_series_data(xml_series: Element, meta: pd.Series) -> pd.Series:
 71    """Extract observed data from the XML for a given single series."""
 72    series_elements = _extract_observation_data(xml_series)
 73    series: pd.Series = pd.Series(series_elements)
 74
 75    # --- if we can, make the series values numeric
 76    series = series.replace("", np.nan)
 77    try:
 78        series = pd.to_numeric(series)
 79    except ValueError:
 80        # If conversion fails, keep the series as is (it may contain useful non-numeric data)
 81        print(f"Could not convert series {meta.name} to numeric, keeping as is.")
 82
 83    # --- convert to PeriodIndex if frequency is available, and sort the index
 84    frequency = meta.get("FREQ", "")
 85    return _convert_to_period_index(series, frequency).sort_index()
 86
 87
 88def _decode_meta_value(meta_value: str, meta_id: str, dims: FlowMetaDict) -> str:
 89    """Decode a metadata value based on its ID and the relevant ABS codelist."""
 90    # Early return if basic requirements not met
 91    if meta_id not in dims:
 92        return meta_value
 93
 94    dim_config = dims[meta_id]
 95    if "id" not in dim_config or "package" not in dim_config:
 96        return meta_value
 97
 98    # Early return if not a codelist
 99    if not dim_config["id"] or dim_config["package"] != CODELIST_PACKAGE_TYPE:
100        return meta_value
101
102    # Try to decode using codelist
103    cl = code_lists(dim_config["id"])
104    if meta_value in cl and "name" in cl[meta_value]:
105        return cl[meta_value]["name"]
106
107    return meta_value
108
109
def _process_xml_attributes(xml_series: Element, key_set: str, context: MetadataContext) -> None:
    """Harvest metadata values from one key set of an XML series into context."""
    container = xml_series.find(f"gen:{key_set}", NAME_SPACES)
    if container is None:
        print(f"No {key_set} found in series, skipping.")
        return

    for value_element in container.findall("gen:Value", NAME_SPACES):
        # fall back to a positional placeholder when id/value is missing
        position = f"{context.series_count}-{context.item_count}"
        meta_id = value_element.attrib.get("id", f"missing meta_id {position}")
        meta_value = value_element.attrib.get("value", f"missing meta_value {position}")
        context.label_elements.append(meta_value)
        decodable = meta_id not in DECODE_EXCLUSIONS
        context.meta_items[meta_id] = (
            _decode_meta_value(meta_value, meta_id, context.dims) if decodable else meta_value
        )
        context.item_count += 1
129
130
def _get_series_meta_data(
    flow_id: str, xml_series: Element, series_count: int, dims: FlowMetaDict
) -> tuple[str, pd.Series]:
    """Extract and decode metadata from the XML tree for one given series.

    Args:
        flow_id (str): The ID of the data flow to which the series belongs.
        xml_series (Element): The XML element representing the series.
        series_count (int): The index of the series in the XML tree.
        dims (FlowMetaDict): Dictionary containing metadata dimensions and
            their associated codelist names.

    Returns:
        tuple[str, pd.Series]: A tuple containing the series label and a Series
            of metadata items for the series.

    """
    # the flow name (falling back to the flow id) seeds the metadata
    flow_name = data_flows().get(flow_id, {"name": flow_id})["name"]
    context = MetadataContext(
        series_count=series_count,
        label_elements=[flow_id],
        meta_items={"DATAFLOW": flow_name},
        dims=dims,
        item_count=0,
    )

    # both SeriesKey and Attributes contribute metadata items
    for key_set in XML_KEY_SETS:
        _process_xml_attributes(xml_series, key_set, context)

    label = ".".join(context.label_elements)
    return label, pd.Series(context.meta_items).rename(label)
165
166
def _extract(flow_id: str, tree: Element) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Extract the data and metadata DataFrames from the XML tree.

    Args:
        flow_id (str): The ID of the data flow being extracted.
        tree (Element): The XML root returned by the ABS SDMX API.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: the data and the metadata.

    """
    # Get the data dimensions for the flow_id, it provides entree to the metadata
    dims = data_dimensions(flow_id)

    meta: dict[str, pd.Series] = {}
    data: dict[str, pd.Series] = {}
    # NOTE: findall() never yields None, so the original per-item None
    # check was unreachable and has been removed.
    for series_count, xml_series in enumerate(tree.findall(".//gen:Series", NAME_SPACES)):
        label, meta_series = _get_series_meta_data(flow_id, xml_series, series_count, dims)
        series = _get_series_data(xml_series, meta_series)
        if label in data:
            # sometimes the SDMX API returns two incomplete series with the same metadata (our label)
            # my guess: the API may be inconsistent sometimes.
            series = series.combine_first(data[label])
        meta[label] = meta_series
        series.name = label
        data[label] = series

    return pd.DataFrame(data), pd.DataFrame(meta).T  # data, meta
196
197
198# === public functions ===
def fetch(
    flow_id: str,
    dims: dict[str, str] | None = None,
    parameters: dict[str, str] | None = None,
    *,
    validate: bool = False,
    **kwargs: Unpack[GetFileKwargs],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch data from the ABS SDMX API.

    Args:
        flow_id (str): The ID of the data flow from which to retrieve data items.
        dims (dict[str, str], optional): A dictionary of dimensions to select the
            data items. If None, the ABS fetch request will be for all data items,
            which can be slow.
        parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
            to the data request. Supported parameters include:
            - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
            - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
            - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
            If None, no parameters are applied. Unsupported keys are ignored.
        validate (bool, optional): If True, validate `dims` against the flow's
            required dimensions when generating the URL key. Defaults to False.
        **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

    Returns: a tuple of two DataFrames:
        - The first DataFrame contains the fetched data.
        - The second DataFrame contains the metadata.

    Raises:
        HttpError: If there is an issue with the HTTP request.
        CacheError: If there is an issue with the cache.
        ValueError: If no XML root is found in the response.
        ValueError: If invalid parameter values are provided.

    Notes:
        If the `dims` argument is not valid you should get a CacheError or HttpError.
        If the `flow_id` is not valid, you should get a ValueError.

    """
    # --- report the parameters used if requested
    verbose = kwargs.get("verbose", False)
    if verbose:
        print(f"fetch(): {flow_id=} {dims=} {parameters=} {validate=} {kwargs=}")

    # --- validate parameters
    valid_detail_values = {"full", "dataonly", "serieskeysonly", "nodata"}
    if parameters:
        detail_value = parameters.get("detail")
        if detail_value and detail_value not in valid_detail_values:
            raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {valid_detail_values}")

    # --- prepare to get the XML root from the ABS SDMX API
    # prefer fresh data every time
    kwargs["modality"] = kwargs.get("modality", "prefer-url")
    key = build_key(flow_id, dims, validate=validate)

    # --- build URL with optional parameters (only the supported keys are sent)
    url = f"{URL_STEM}/data/{flow_id}/{key}"
    if parameters:
        supported = ("startPeriod", "endPeriod", "detail")
        query = {name: parameters[name] for name in supported if name in parameters}
        if query:
            # urlencode percent-escapes the values, keeping the URL well-formed
            # (the original string concatenation inserted raw values verbatim)
            url += "?" + urlencode(query)

    xml_root = acquire_xml(url, **kwargs)
    return _extract(flow_id, xml_root)
271
272
if __name__ == "__main__":

    def fetch_test() -> None:
        """Test the fetch() function from the ABS SDMX API."""
        # Wage Price Index flow, fully specified so exactly one series matches
        flow_id = "WPI"
        dims = {
            "MEASURE": "3",
            "INDEX": "OHRPEB",
            "SECTOR": "7",
            "INDUSTRY": "TOT",
            "TSEST": "10",
            "REGION": "AUS",
            "FREQ": "Q",
        }

        # Test with parameters
        # 2020-Q1 to 2023-Q4 inclusive => 16 quarterly observations expected
        parameters = {"startPeriod": "2020-Q1", "endPeriod": "2023-Q4", "detail": "full"}

        fetched_data, fetched_meta = fetch(
            flow_id,
            dims=dims,
            parameters=parameters,
            validate=True,
            modality="prefer-url",
        )
        # 16 periods x 1 series
        expected = (16, 1)
        if fetched_data.shape != expected:
            print(f"Test FAILED: data shape {fetched_data.shape} is unexpected {expected=}.")
        else:
            print(f"Test passed: {fetched_data.shape=}.")
        # TSEST code "10" should have been decoded via its codelist to "Original"
        expected_tsest = "Original"
        if ("TSEST" in fetched_meta.columns) and fetched_meta["TSEST"].iloc[0] == expected_tsest:
            print("Test passed: TSEST has expected value.")
        else:
            print(
                f"Test FAILED: TSEST value {fetched_meta['TSEST'].iloc[0]} is unexpected {expected_tsest=}."
            )

    fetch_test()
FREQUENCY_MAPPING = {'Annual': 'Y', 'Quarterly': 'Q', 'Monthly': 'M', 'Daily': 'D'}
XML_KEY_SETS = ('SeriesKey', 'Attributes')
CODELIST_PACKAGE_TYPE = 'codelist'
DECODE_EXCLUSIONS = {'UNIT_MULT'}
@dataclass
class MetadataContext:
34@dataclass
35class MetadataContext:
36    """Context object for processing XML metadata."""
37
38    series_count: int
39    label_elements: list[str]
40    meta_items: dict[str, str]
41    dims: FlowMetaDict
42    item_count: int

Context object for processing XML metadata.

MetadataContext( series_count: int, label_elements: list[str], meta_items: dict[str, str], dims: dict[str, dict[str, str]], item_count: int)
series_count: int
label_elements: list[str]
meta_items: dict[str, str]
dims: dict[str, dict[str, str]]
item_count: int
def fetch( flow_id: str, dims: dict[str, str] | None = None, parameters: dict[str, str] | None = None, *, validate: bool = False, **kwargs: Unpack[sdmxabs.GetFileKwargs]) -> tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]:
200def fetch(
201    flow_id: str,
202    dims: dict[str, str] | None = None,
203    parameters: dict[str, str] | None = None,
204    *,
205    validate: bool = False,
206    **kwargs: Unpack[GetFileKwargs],
207) -> tuple[pd.DataFrame, pd.DataFrame]:
208    """Fetch data from the ABS SDMX API.
209
210    Args:
211        flow_id (str): The ID of the data flow from which to retrieve data items.
212        dims (dict[str, str], optional): A dictionary of dimensions to select the
213            data items. If None, the ABS fetch request will be for all data items,
214            which can be slow.
215        parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
216            to the data request. Supported parameters include:
217            - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
218            - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
219            - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
220            If None, no parameters are applied.
221        validate (bool, optional): If True, validate `dims` against the flow's
222            required dimensions when generating the URL key. Defaults to False.
223        **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().
224
225    Returns: a tuple of two DataFrames:
226        - The first DataFrame contains the fetched data.
227        - The second DataFrame contains the metadata.
228
229    Raises:
230        HttpError: If there is an issue with the HTTP request.
231        CacheError: If there is an issue with the cache.
232        ValueError: If no XML root is found in the response.
233        ValueError: If invalid parameter values are provided.
234
235    Notes:
236        If the `dims` argument is not valid you should get a CacheError or HttpError.
237        If the `flow_id` is not valid, you should get a ValueError.
238
239    """
240    # --- report the parameters used if requested
241    verbose = kwargs.get("verbose", False)
242    if verbose:
243        print(f"fetch(): {flow_id=} {dims=} {parameters=} {validate=} {kwargs=}")
244
245    # --- validate parameters
246    valid_detail_values = {"full", "dataonly", "serieskeysonly", "nodata"}
247    if parameters:
248        detail_value = parameters.get("detail")
249        if detail_value and detail_value not in valid_detail_values:
250            raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {valid_detail_values}")
251
252    # --- prepare to get the XML root from the ABS SDMX API
253    # prefer fresh data every time
254    kwargs["modality"] = kwargs.get("modality", "prefer-url")
255    key = build_key(flow_id, dims, validate=validate)
256
257    # --- build URL with optional parameters
258    url = f"{URL_STEM}/data/{flow_id}/{key}"
259    if parameters:
260        url_params = []
261        if "startPeriod" in parameters:
262            url_params.append(f"startPeriod={parameters['startPeriod']}")
263        if "endPeriod" in parameters:
264            url_params.append(f"endPeriod={parameters['endPeriod']}")
265        if "detail" in parameters:
266            url_params.append(f"detail={parameters['detail']}")
267        if url_params:
268            url += "?" + "&".join(url_params)
269
270    xml_root = acquire_xml(url, **kwargs)
271    return _extract(flow_id, xml_root)

Fetch data from the ABS SDMX API.

Args: flow_id (str): The ID of the data flow from which to retrieve data items. dims (dict[str, str], optional): A dictionary of dimensions to select the data items. If None, the ABS fetch request will be for all data items, which can be slow. parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply to the data request. Supported parameters include: - 'startPeriod': Start period for data filtering (e.g., '2020-Q1') - 'endPeriod': End period for data filtering (e.g., '2023-Q4') - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata') If None, no parameters are applied. validate (bool, optional): If True, validate dims against the flow's required dimensions when generating the URL key. Defaults to False. **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

Returns: a tuple of two DataFrames: - The first DataFrame contains the fetched data. - The second DataFrame contains the metadata.

Raises: HttpError: If there is an issue with the HTTP request. CacheError: If there is an issue with the cache. ValueError: If no XML root is found in the response. ValueError: If invalid parameter values are provided.

Notes: If the dims argument is not valid you should get a CacheError or HttpError. If the flow_id is not valid, you should get a ValueError.