sdmxabs.fetch_multi

Fetch multiple datasets from the SDMX API.

  1"""Fetch multiple datasets from the SDMX API."""
  2
  3from io import StringIO
  4from typing import Unpack
  5
  6import pandas as pd
  7
  8from sdmxabs.download_cache import CacheError, GetFileKwargs, HttpError
  9from sdmxabs.fetch import fetch
 10
 11# --- private function
 12IndexInformation = tuple[type, str | None]  # (Index type, frequency if PeriodIndex)
 13
 14
 15def _validate_index_compatibility(
 16    data: pd.DataFrame, reference_index_info: IndexInformation | None
 17) -> IndexInformation:
 18    """Validate that the index of the current DataFrame is compatible with the reference index."""
 19    # establish the index information for the current DataFrame
 20    if isinstance(data.index, pd.PeriodIndex):
 21        current_index_info: IndexInformation = (type(data.index), data.index.freqstr)
 22    else:
 23        current_index_info = (type(data.index), None)
 24
 25    # if this is the first DataFrame, set the reference index info
 26    if reference_index_info is None:
 27        reference_index_info = current_index_info
 28
 29    # if this is not the first DataFrame, check for index compatibility
 30    elif current_index_info != reference_index_info:
 31        raise ValueError(
 32            f"Index mismatch: cannot mix {reference_index_info} "
 33            f"with {current_index_info}. "
 34            f"All datasets must have the same index type (e.g., all quarterly or all monthly data)."
 35        )
 36
 37    return reference_index_info
 38
 39
 40def _extract(
 41    wanted: pd.DataFrame,
 42    parameters: dict[str, str] | None,
 43    *,
 44    validate: bool = False,
 45    **kwargs: Unpack[GetFileKwargs],
 46) -> tuple[pd.DataFrame, pd.DataFrame]:  # data / metadata
 47    """Extract the data and metadata for each row in the dimensions DataFrame.
 48
 49    Args:
 50        wanted (pd.DataFrame): DataFrame containing the dimensions to fetch.
 51                               DataFrame cells with NAN values will be ignored.
 52                               The DataFrame must have a populated 'flow_id' column.
 53        parameters (dict[str, str] | None): Additional parameters to pass to the fetch function.
 54                                           If None, no additional parameters are used.
 55        validate (bool, optional): If True, validate `wanted` against the flow's
 56            required dimensions when generating the URL key. Defaults to False.
 57        **kwargs: Additional keyword arguments passed to the underlying data fetching function.
 58
 59    Returns:
 60        tuple[pd.DataFrame, pd.DataFrame]: A DataFrame with the fetched data and
 61                                        a DataFrame with the metadata.
 62
 63    Raises:
 64        ValueError: if any input data is not as expected, or if incompatible
 65                   index types are detected (e.g., mixing quarterly and monthly data).
 66
 67    Note: CacheError and HttpError are raised by the fetch function.
 68          These will be caught and reported to standard output.
 69
 70    """
 71    # --- initial setup - empty return results
 72    return_meta = {}
 73    return_data = {}
 74    counter = 0
 75    reference_index_info: IndexInformation | None = None
 76
 77    # --- loop over the rows of the wanted DataFrame
 78    for _index, row in wanted.iterrows():
 79        # --- get the arguments for the fetch (ignoring NaN values)
 80        row_dict: dict[str, str] = row.dropna().to_dict()
 81        flow_id = row_dict.pop("flow_id", "")
 82        if not flow_id:
 83            # --- if there is no flow_id, we will skip this row
 84            print(f"Skipping row with no flow_id: {row_dict}")
 85            continue
 86
 87        # --- fetch the data and meta data for each row of the selection table
 88        try:
 89            data, meta = fetch(
 90                flow_id, selection=row_dict, parameters=parameters, validate=validate, **kwargs
 91            )
 92        except (CacheError, HttpError, ValueError) as e:
 93            # --- if there is an error, we will skip this row
 94            print(f"Error fetching {flow_id} with dimensions {row_dict}: {e}")
 95            continue
 96        if data.empty or meta.empty:
 97            # --- this should not happen, but if it does, we will skip this row
 98            print(f"No data for {flow_id} with dimensions {row_dict}")
 99            continue
100
101        # --- validate index compatibility - including frequency compatibility for PeriodIndex
102        reference_index_info = _validate_index_compatibility(data, reference_index_info)
103
104        # --- manage duplicates
105        for col in data.columns:
106            counter += 1
107            save_name = col
108            if save_name in return_data:
109                save_name += f"_{counter:03d}"
110            return_data[save_name] = data[col]
111            return_meta[save_name] = meta.loc[col]
112
113    return pd.DataFrame(return_data), pd.DataFrame(return_meta).T
114
115
116# --- public function
def fetch_multi(
    wanted: pd.DataFrame,
    parameters: dict[str, str] | None = None,
    *,
    validate: bool = False,
    **kwargs: Unpack[GetFileKwargs],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch several SDMX datasets, one per row of the `wanted` DataFrame.

    Args:
        wanted: One row per desired data set (of one or more series), holding
                the identifiers needed to fetch it. A mandatory 'flow_id'
                column is expected; the remaining (optional) columns are the
                ABS dimensions relevant to each flow. The DataFrame index
                plays no part in the fetching process.
        parameters: Extra parameters passed on to the fetch function.
        validate: When True, dimensions and values are checked against the
                  ABS SDMX API codelists. Defaults to False.
        **kwargs: Passed through to the underlying data fetching function.

    Returns:
        A tuple of two DataFrames:
        - the fetched data, and
        - metadata about the fetched datasets.

    Raises:
        ValueError: If the 'flow_id' column is missing from the `wanted` DataFrame.

    Note:
        CacheError and HttpError raised by the fetch function are caught
        and reported to standard output.

    Note:
        All datasets must share a compatible index type; a ValueError is
        raised otherwise (e.g., mixing quarterly and monthly data).

    """
    # --- echo the call arguments when verbose output is requested
    if kwargs.get("verbose", False):
        print(f"fetch_multi(): {wanted=}, {parameters=}, {validate=}, {kwargs=}")

    # --- quick sanity checks before doing any work
    if wanted.empty:
        print("wanted DataFrame is empty, returning empty DataFrames.")
        return pd.DataFrame(), pd.DataFrame()
    if "flow_id" not in wanted.columns:
        raise ValueError("The 'flow_id' column is required in the 'wanted' DataFrame.")

    # --- delegate the row-by-row fetching
    return _extract(wanted, parameters, validate=validate, **kwargs)
169
170
if __name__ == "__main__":

    def module_test() -> None:
        """Run a simple test of the module."""
        # selection table: three CPI series and two national-accounts series
        spec = """
        flow_id, MEASURE, INDEX, TSEST, REGION, DATA_ITEM, SECTOR, FREQ
        CPI,           3, 10001,    10,     50,         -,      -,    Q
        CPI,           3, 999902,   20,     50,         -,      -,    Q
        CPI,           3, 999903,   20,     50,         -,      -,    Q
        ANA_EXP,     DCH,      -,   20,    AUS,       FCE,    PHS,    Q
        ANA_EXP, PCT_DCH,      -,   20,    AUS,       FCE,    PHS,    Q
        """
        requests = pd.read_csv(StringIO(spec), dtype=str, skipinitialspace=True)
        query_params = {"startPeriod": "2020-Q1", "endPeriod": "2020-Q4", "detail": "full"}
        fetched_data, _fetched_meta = fetch_multi(
            requests,
            parameters=query_params,
            validate=False,
            modality="prefer-url",
        )
        # four quarters of 2020 across five requested series
        expected = (4, 5)
        message = (
            f"Test passed: {fetched_data.shape=}."
            if fetched_data.shape == expected
            else f"Test FAILED: data shape {fetched_data.shape} is unexpected {expected=}."
        )
        print(message)

    module_test()
IndexInformation = tuple[type, str | None]
def fetch_multi( wanted: pandas.core.frame.DataFrame, parameters: dict[str, str] | None = None, *, validate: bool = False, **kwargs: Unpack[sdmxabs.GetFileKwargs]) -> tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]:
118def fetch_multi(
119    wanted: pd.DataFrame,
120    parameters: dict[str, str] | None = None,
121    *,
122    validate: bool = False,
123    **kwargs: Unpack[GetFileKwargs],
124) -> tuple[pd.DataFrame, pd.DataFrame]:
125    """Fetch multiple SDMX datasets based on a DataFrame of desired datasets.
126
127    Args:
128        wanted: A DataFrame with rows for each desired data set (of one or more series).
129                Each row should contain the necessary identifiers to fetch the dataset.
130                The columns will be 'flow_id', plus the ABS dimensions relevant to the flow.
131                The 'flow_id' column is mandatory, and the rest are optional.
132                Note: the DataFrame index is not used in the fetching process.
133        parameters: A dictionary of additional parameters to pass to the fetch function.
134        validate: If True, the function will validate dimensions and values against
135                  the ABS SDMX API codelists. Defaults to False.
136        **kwargs: Additional keyword arguments passed to the underlying data fetching function.
137
138    Returns:
139        A tuple containing two DataFrames:
140        - The first DataFrame contains the fetched data.
141        - The second DataFrame contains metadata about the fetched datasets.
142
143    Raises:
144        ValueError: If the 'flow_id' column is missing from the `wanted` DataFrame.
145
146    Note:
147        CacheError and HttpError are raised by the fetch function.
148        These will be caught and reported to standard output.
149
150    Note:
151        The function validates that all datasets have compatible index types.
152        A ValueError will be raised if incompatible index types are detected
153        (e.g., mixing quarterly and monthly data).
154
155    """
156    # --- report the parameters used if requested
157    verbose = kwargs.get("verbose", False)
158    if verbose:
159        print(f"fetch_multi(): {wanted=}, {parameters=}, {validate=}, {kwargs=}")
160
161    # --- quick sanity checks
162    if wanted.empty:
163        print("wanted DataFrame is empty, returning empty DataFrames.")
164        return pd.DataFrame(), pd.DataFrame()
165    if "flow_id" not in wanted.columns:
166        raise ValueError("The 'flow_id' column is required in the 'wanted' DataFrame.")
167
168    # --- do the work
169    return _extract(wanted, parameters, validate=validate, **kwargs)

Fetch multiple SDMX datasets based on a DataFrame of desired datasets.

Args:
    wanted: A DataFrame with rows for each desired data set (of one or more series). Each row should contain the necessary identifiers to fetch the dataset. The columns will be 'flow_id', plus the ABS dimensions relevant to the flow. The 'flow_id' column is mandatory, and the rest are optional. Note: the DataFrame index is not used in the fetching process.
    parameters: A dictionary of additional parameters to pass to the fetch function.
    validate: If True, the function will validate dimensions and values against the ABS SDMX API codelists. Defaults to False.
    **kwargs: Additional keyword arguments passed to the underlying data fetching function.

Returns:
    A tuple containing two DataFrames:
    - The first DataFrame contains the fetched data.
    - The second DataFrame contains metadata about the fetched datasets.

Raises: ValueError: If the 'flow_id' column is missing from the wanted DataFrame.

Note: CacheError and HttpError are raised by the fetch function. These will be caught and reported to standard output.

Note: The function validates that all datasets have compatible index types. A ValueError will be raised if incompatible index types are detected (e.g., mixing quarterly and monthly data).