sdmxabs.fetch

Obtain data from the ABS SDMX API.

  1"""Obtain data from the ABS SDMX API."""
  2
  3from dataclasses import dataclass
  4from typing import Unpack
  5from xml.etree.ElementTree import Element
  6
  7import numpy as np
  8import pandas as pd
  9
 10from sdmxabs.download_cache import GetFileKwargs
 11from sdmxabs.flow_metadata import (
 12    FlowMetaDict,
 13    build_key,
 14    code_lists,
 15    data_dimensions,
 16    data_flows,
 17)
 18from sdmxabs.xml_base import NAME_SPACES, URL_STEM, acquire_xml
 19
# --- constants
# Map ABS frequency names (as reported in series metadata) to pandas
# PeriodIndex frequency codes; unknown frequencies are left unconverted.
FREQUENCY_MAPPING = {
    "Annual": "Y",
    "Quarterly": "Q",
    "Monthly": "M",
    "Daily": "D",
}

# XML element names whose child <gen:Value> items carry series metadata.
XML_KEY_SETS = ("SeriesKey", "Attributes")
# SDMX "package" value identifying a dimension backed by a codelist.
CODELIST_PACKAGE_TYPE = "codelist"
 30
 31
@dataclass
class MetadataContext:
    """Context object for processing XML metadata.

    Mutable accumulator shared across calls to _process_xml_attributes():
    label_elements and meta_items are appended to in place, and item_count
    is incremented per metadata value processed.
    """

    # index of the series within the XML tree (used in "missing" placeholders)
    series_count: int
    # components joined with "." to form the series label
    label_elements: list[str]
    # decoded metadata, keyed by metadata id
    meta_items: dict[str, str]
    # flow dimensions, providing codelist ids used for decoding
    dims: FlowMetaDict
    # running count of metadata values seen (used in "missing" placeholders)
    item_count: int
 41
 42
 43# --- private functions
 44def _convert_to_period_index(series: pd.Series, frequency: str) -> pd.Series:
 45    """Convert series index to PeriodIndex if frequency is recognized."""
 46    if frequency not in FREQUENCY_MAPPING:
 47        return series
 48    freq_code = FREQUENCY_MAPPING[frequency]
 49    series.index = pd.PeriodIndex(series.index, freq=freq_code)
 50    return series
 51
 52
 53def _extract_observation_data(xml_series: Element) -> dict[str, str]:
 54    """Extract observation data from XML series element."""
 55    series_elements = {}
 56    for item in xml_series.findall("gen:Obs", NAME_SPACES):
 57        index_container = item.find("gen:ObsDimension", NAME_SPACES)
 58        value_container = item.find("gen:ObsValue", NAME_SPACES)
 59
 60        index_obs = index_container.attrib.get("value") if index_container is not None else None
 61        value_obs = value_container.attrib.get("value") if value_container is not None else None
 62
 63        if index_obs is not None and value_obs is not None:
 64            series_elements[index_obs] = value_obs
 65
 66    return series_elements
 67
 68
 69def _get_series_data(xml_series: Element, meta: pd.Series) -> pd.Series:
 70    """Extract observed data from the XML for a given single series."""
 71    series_elements = _extract_observation_data(xml_series)
 72    series: pd.Series = pd.Series(series_elements)
 73
 74    # --- if we can, make the series values numeric
 75    series = series.replace("", np.nan)
 76    try:
 77        series = pd.to_numeric(series)
 78    except ValueError:
 79        # If conversion fails, keep the series as is (it may contain useful non-numeric data)
 80        print(f"Could not convert series {meta.name} to numeric, keeping as is.")
 81
 82    # --- convert to PeriodIndex if frequency is available, and sort the index
 83    frequency = meta.get("FREQ", "")
 84    return _convert_to_period_index(series, frequency).sort_index()
 85
 86
 87def _decode_meta_value(meta_value: str, meta_id: str, dims: FlowMetaDict) -> str:
 88    """Decode a metadata value based on its ID and the relevant ABS codelist."""
 89    # Early return if basic requirements not met
 90    if meta_id not in dims:
 91        return meta_value
 92
 93    dim_config = dims[meta_id]
 94    if "id" not in dim_config or "package" not in dim_config:
 95        return meta_value
 96
 97    # Early return if not a codelist
 98    if not dim_config["id"] or dim_config["package"] != CODELIST_PACKAGE_TYPE:
 99        return meta_value
100
101    # Try to decode using codelist
102    cl = code_lists(dim_config["id"])
103    if meta_value in cl and "name" in cl[meta_value]:
104        return cl[meta_value]["name"]
105
106    return meta_value
107
108
def _process_xml_attributes(xml_series: Element, key_set: str, context: MetadataContext) -> None:
    """Accumulate label parts and decoded metadata from one key set of a series."""
    container = xml_series.find(f"gen:{key_set}", NAME_SPACES)
    if container is None:
        print(f"No {key_set} found in series, skipping.")
        return

    for value_element in container.findall("gen:Value", NAME_SPACES):
        # Fall back to descriptive placeholder text when id/value are absent.
        placeholder = f"{context.series_count}-{context.item_count}"
        meta_id = value_element.attrib.get("id", f"missing meta_id {placeholder}")
        meta_value = value_element.attrib.get("value", f"missing meta_value {placeholder}")
        context.label_elements.append(meta_value)
        context.meta_items[meta_id] = _decode_meta_value(meta_value, meta_id, context.dims)
        context.item_count += 1
125
126
def _get_series_meta_data(
    flow_id: str, xml_series: Element, series_count: int, dims: FlowMetaDict
) -> tuple[str, pd.Series]:
    """Extract and decode metadata from the XML tree for one given series.

    Args:
        flow_id (str): The ID of the data flow to which the series belongs.
        xml_series (Element): The XML element representing the series.
        series_count (int): The index of the series in the XML tree.
        dims (FlowMetaDict): Dictionary containing metadata dimensions and
            their associated codelist names.

    Returns:
        tuple[str, pd.Series]: A tuple containing the series label and a Series
            of metadata items for the series.

    """
    # Seed the accumulator with the flow id/name, then let each key set add to it.
    flow_name = data_flows().get(flow_id, {"name": flow_id})["name"]
    context = MetadataContext(
        series_count=series_count,
        label_elements=[flow_id],
        meta_items={"DATAFLOW": flow_name},
        dims=dims,
        item_count=0,
    )
    for key_set in XML_KEY_SETS:
        _process_xml_attributes(xml_series, key_set, context)

    label = ".".join(context.label_elements)
    return label, pd.Series(context.meta_items).rename(label)
161
162
def _extract(flow_id: str, tree: Element) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Extract data and metadata from the XML tree.

    Args:
        flow_id (str): The ID of the data flow the XML tree was fetched from.
        tree (Element): Root element of the SDMX XML response.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: the data (one column per series,
            columns labelled by the series metadata) and the metadata
            (one row per series, indexed by the same labels).

    """
    # Get the data dimensions for the flow_id, it provides entree to the metadata
    dims = data_dimensions(flow_id)

    meta: dict[str, pd.Series] = {}
    data: dict[str, pd.Series] = {}
    # Note: findall() never yields None, so no per-element None check is needed.
    for series_count, xml_series in enumerate(tree.findall(".//gen:Series", NAME_SPACES)):
        label, meta_series = _get_series_meta_data(flow_id, xml_series, series_count, dims)
        series = _get_series_data(xml_series, meta_series)
        if label in data:
            # sometimes the SDMX API returns two incomplete series with the same metadata (our label)
            # my guess: the API may be inconsistent sometimes.
            series = series.combine_first(data[label])
        meta[label] = meta_series
        series.name = label
        data[label] = series

    return pd.DataFrame(data), pd.DataFrame(meta).T  # data, meta
192
193
194# === public functions ===
def fetch(
    flow_id: str,
    dims: dict[str, str] | None = None,
    parameters: dict[str, str] | None = None,
    *,
    validate: bool = False,
    **kwargs: Unpack[GetFileKwargs],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch data from the ABS SDMX API.

    Args:
        flow_id (str): The ID of the data flow from which to retrieve data items.
        dims (dict[str, str], optional): A dictionary of dimensions to select the
            data items. If None, the ABS fetch request will be for all data items,
            which can be slow.
        parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
            to the data request. Supported parameters include:
            - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
            - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
            - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
            If None, no parameters are applied. Unsupported keys are ignored.
        validate (bool, optional): If True, validate `dims` against the flow's
            required dimensions when generating the URL key. Defaults to False.
        **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

    Returns: a tuple of two DataFrames:
        - The first DataFrame contains the fetched data.
        - The second DataFrame contains the metadata.

    Raises:
        HttpError: If there is an issue with the HTTP request.
        CacheError: If there is an issue with the cache.
        ValueError: If no XML root is found in the response.
        ValueError: If invalid parameter values are provided.

    Notes:
        If the `dims` argument is not valid you should get a CacheError or HttpError.
        If the `flow_id` is not valid, you should get a ValueError.

    """
    # --- report the parameters used if requested
    verbose = kwargs.get("verbose", False)
    if verbose:
        print(f"fetch(): {flow_id=} {dims=} {parameters=} {validate=} {kwargs=}")

    # --- validate parameters
    valid_detail_values = {"full", "dataonly", "serieskeysonly", "nodata"}
    if parameters:
        detail_value = parameters.get("detail")
        if detail_value and detail_value not in valid_detail_values:
            raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {valid_detail_values}")

    # --- prepare to get the XML root from the ABS SDMX API
    # prefer fresh data every time (caller-supplied modality wins)
    kwargs.setdefault("modality", "prefer-url")
    key = build_key(
        flow_id,
        dims,
        validate=validate,
    )

    # --- build URL with optional query parameters (only supported keys are forwarded)
    url = f"{URL_STEM}/data/{flow_id}/{key}"
    if parameters:
        url_params = [
            f"{name}={parameters[name]}"
            for name in ("startPeriod", "endPeriod", "detail")
            if name in parameters
        ]
        if url_params:
            url += "?" + "&".join(url_params)

    xml_root = acquire_xml(url, **kwargs)
    return _extract(flow_id, xml_root)
271
272
if __name__ == "__main__":

    def fetch_test() -> None:
        """Test the fetch() function from the ABS SDMX API."""
        flow_id = "WPI"
        dims = {
            "MEASURE": "3",
            "INDEX": "OHRPEB",
            "SECTOR": "7",
            "INDUSTRY": "TOT",
            "TSEST": "10",
            "REGION": "AUS",
            "FREQ": "Q",
        }

        # Test with parameters
        parameters = {"startPeriod": "2020-Q1", "endPeriod": "2023-Q4", "detail": "full"}

        fetched_data, fetched_meta = fetch(
            flow_id,
            dims=dims,
            parameters=parameters,
            validate=True,
            modality="prefer-url",
        )
        expected = (16, 1)
        if fetched_data.shape != expected:
            print(f"Test FAILED: data shape {fetched_data.shape} is unexpected {expected=}.")
        else:
            print(f"Test passed: {fetched_data.shape=}.")
        expected_tsest = "Original"
        if ("TSEST" in fetched_meta.columns) and fetched_meta["TSEST"].iloc[0] == expected_tsest:
            print("Test passed: TSEST has expected value.")
        else:
            # BUG FIX: the failure branch previously indexed fetched_meta['TSEST']
            # unconditionally, raising KeyError when the column was missing --
            # one of the two conditions that route execution here.
            observed = (
                fetched_meta["TSEST"].iloc[0] if "TSEST" in fetched_meta.columns else "<missing column>"
            )
            print(f"Test FAILED: TSEST value {observed} is unexpected {expected_tsest=}.")

    fetch_test()
FREQUENCY_MAPPING = {'Annual': 'Y', 'Quarterly': 'Q', 'Monthly': 'M', 'Daily': 'D'}
XML_KEY_SETS = ('SeriesKey', 'Attributes')
CODELIST_PACKAGE_TYPE = 'codelist'
@dataclass
class MetadataContext:
33@dataclass
34class MetadataContext:
35    """Context object for processing XML metadata."""
36
37    series_count: int
38    label_elements: list[str]
39    meta_items: dict[str, str]
40    dims: FlowMetaDict
41    item_count: int

Context object for processing XML metadata.

MetadataContext( series_count: int, label_elements: list[str], meta_items: dict[str, str], dims: dict[str, dict[str, str]], item_count: int)
series_count: int
label_elements: list[str]
meta_items: dict[str, str]
dims: dict[str, dict[str, str]]
item_count: int
def fetch( flow_id: str, dims: dict[str, str] | None = None, parameters: dict[str, str] | None = None, *, validate: bool = False, **kwargs: Unpack[sdmxabs.GetFileKwargs]) -> tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]:
196def fetch(
197    flow_id: str,
198    dims: dict[str, str] | None = None,
199    parameters: dict[str, str] | None = None,
200    *,
201    validate: bool = False,
202    **kwargs: Unpack[GetFileKwargs],
203) -> tuple[pd.DataFrame, pd.DataFrame]:
204    """Fetch data from the ABS SDMX API.
205
206    Args:
207        flow_id (str): The ID of the data flow from which to retrieve data items.
208        dims (dict[str, str], optional): A dictionary of dimensions to select the
209            data items. If None, the ABS fetch request will be for all data items,
210            which can be slow.
211        parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply
212            to the data request. Supported parameters include:
213            - 'startPeriod': Start period for data filtering (e.g., '2020-Q1')
214            - 'endPeriod': End period for data filtering (e.g., '2023-Q4')
215            - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata')
216            If None, no parameters are applied.
217        validate (bool, optional): If True, validate `dims` against the flow's
218            required dimensions when generating the URL key. Defaults to False.
219        **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().
220
221    Returns: a tuple of two DataFrames:
222        - The first DataFrame contains the fetched data.
223        - The second DataFrame contains the metadata.
224
225    Raises:
226        HttpError: If there is an issue with the HTTP request.
227        CacheError: If there is an issue with the cache.
228        ValueError: If no XML root is found in the response.
229        ValueError: If invalid parameter values are provided.
230
231    Notes:
232        If the `dims` argument is not valid you should get a CacheError or HttpError.
233        If the `flow_id` is not valid, you should get a ValueError.
234
235    """
236    # --- report the parameters used if requested
237    verbose = kwargs.get("verbose", False)
238    if verbose:
239        print(f"fetch(): {flow_id=} {dims=} {parameters=} {validate=} {kwargs=}")
240
241    # --- validate parameters
242    valid_detail_values = {"full", "dataonly", "serieskeysonly", "nodata"}
243    if parameters:
244        detail_value = parameters.get("detail")
245        if detail_value and detail_value not in valid_detail_values:
246            raise ValueError(f"Invalid detail value '{detail_value}'. Must be one of: {valid_detail_values}")
247
248    # --- prepare to get the XML root from the ABS SDMX API
249    # prefer fresh data every time
250    kwargs["modality"] = kwargs.get("modality", "prefer-url")
251    key = build_key(
252        flow_id,
253        dims,
254        validate=validate,
255    )
256
257    # --- build URL with optional parameters
258    url = f"{URL_STEM}/data/{flow_id}/{key}"
259    if parameters:
260        url_params = []
261        if "startPeriod" in parameters:
262            url_params.append(f"startPeriod={parameters['startPeriod']}")
263        if "endPeriod" in parameters:
264            url_params.append(f"endPeriod={parameters['endPeriod']}")
265        if "detail" in parameters:
266            url_params.append(f"detail={parameters['detail']}")
267        if url_params:
268            url += "?" + "&".join(url_params)
269
270    xml_root = acquire_xml(url, **kwargs)
271    return _extract(flow_id, xml_root)

Fetch data from the ABS SDMX API.

Args: flow_id (str): The ID of the data flow from which to retrieve data items. dims (dict[str, str], optional): A dictionary of dimensions to select the data items. If None, the ABS fetch request will be for all data items, which can be slow. parameters (dict[str, str], optional): A dictionary of SDMX parameters to apply to the data request. Supported parameters include: - 'startPeriod': Start period for data filtering (e.g., '2020-Q1') - 'endPeriod': End period for data filtering (e.g., '2023-Q4') - 'detail': Level of detail ('full', 'dataonly', 'serieskeysonly', 'nodata') If None, no parameters are applied. validate (bool, optional): If True, validate dims against the flow's required dimensions when generating the URL key. Defaults to False. **kwargs (GetFileKwargs): Additional keyword arguments passed to acquire_xml().

Returns: a tuple of two DataFrames: - The first DataFrame contains the fetched data. - The second DataFrame contains the metadata.

Raises: HttpError: If there is an issue with the HTTP request. CacheError: If there is an issue with the cache. ValueError: If no XML root is found in the response. ValueError: If invalid parameter values are provided.

Notes: If the dims argument is not valid you should get a CacheError or HttpError. If the flow_id is not valid, you should get a ValueError.