sdmxabs.fetch_multi

Fetch multiple datasets from the SDMX API.

  1"""Fetch multiple datasets from the SDMX API."""
  2
  3from io import StringIO
  4from typing import Unpack
  5
  6import pandas as pd
  7
  8from sdmxabs.download_cache import CacheError, GetFileKwargs, HttpError
  9from sdmxabs.fetch import fetch
 10
 11# --- private function
 12IndexInformation = tuple[type, str | None]  # (Index type, frequency if PeriodIndex)
 13
 14
 15def _validate_index_compatibility(
 16    data: pd.DataFrame, reference_index_info: IndexInformation | None
 17) -> IndexInformation:
 18    """Validate that the index of the current DataFrame is compatible with the reference index."""
 19    # establish the index information for the current DataFrame
 20    if isinstance(data.index, pd.PeriodIndex):
 21        current_index_info: IndexInformation = (type(data.index), data.index.freqstr)
 22    else:
 23        current_index_info = (type(data.index), None)
 24
 25    # if this is the first DataFrame, set the reference index info
 26    if reference_index_info is None:
 27        reference_index_info = current_index_info
 28
 29    # if this is not the first DataFrame, check for index compatibility
 30    elif current_index_info != reference_index_info:
 31        raise ValueError(
 32            f"Index mismatch: cannot mix {reference_index_info} "
 33            f"with {current_index_info}. "
 34            f"All datasets must have the same index type (e.g., all quarterly or all monthly data)."
 35        )
 36
 37    return reference_index_info
 38
 39
 40def _extract(
 41    wanted: pd.DataFrame,
 42    parameters: dict[str, str] | None,
 43    *,
 44    validate: bool = False,
 45    **kwargs: Unpack[GetFileKwargs],
 46) -> tuple[pd.DataFrame, pd.DataFrame]:  # data / metadata
 47    """Extract the data and metadata for each row in the dimensions DataFrame.
 48
 49    Args:
 50        wanted (pd.DataFrame): DataFrame containing the dimensions to fetch.
 51                               DataFrame cells with NAN values will be ignored.
 52                               The DataFrame must have a populated 'flow_id' column.
 53        parameters (dict[str, str] | None): Additional parameters to pass to the fetch function.
 54                                           If None, no additional parameters are used.
 55        validate (bool, optional): If True, validate `wanted` against the flow's
 56            required dimensions when generating the URL key. Defaults to False.
 57        **kwargs: Additional keyword arguments passed to the underlying data fetching function.
 58
 59    Returns:
 60        tuple[pd.DataFrame, pd.DataFrame]: A DataFrame with the fetched data and
 61                                        a DataFrame with the metadata.
 62
 63    Raises:
 64        ValueError: if any input data is not as expected, or if incompatible
 65                   index types are detected (e.g., mixing quarterly and monthly data).
 66
 67    Note: CacheError and HttpError are raised by the fetch function.
 68          These will be caught and reported to standard output.
 69
 70    """
 71    # --- initial setup - empty return results
 72    return_meta = {}
 73    return_data = {}
 74    counter = 0
 75    reference_index_info: IndexInformation | None = None
 76
 77    # --- loop over the rows of the wanted DataFrame
 78    for _index, row in wanted.iterrows():
 79        # --- get the arguments for the fetch (ignoring NaN values)
 80        row_dict: dict[str, str] = row.dropna().to_dict()
 81        flow_id = row_dict.pop("flow_id", "")
 82        if not flow_id:
 83            # --- if there is no flow_id, we will skip this row
 84            print(f"Skipping row with no flow_id: {row_dict}")
 85            continue
 86
 87        # --- fetch the data and meta data for each row of the selection table
 88        try:
 89            data, meta = fetch(flow_id, dims=row_dict, parameters=parameters, validate=validate, **kwargs)
 90        except (CacheError, HttpError, ValueError) as e:
 91            # --- if there is an error, we will skip this row
 92            print(f"Error fetching {flow_id} with dimensions {row_dict}: {e}")
 93            continue
 94        if data.empty or meta.empty:
 95            # --- this should not happen, but if it does, we will skip this row
 96            print(f"No data for {flow_id} with dimensions {row_dict}")
 97            continue
 98
 99        # --- validate index compatibility - including frequency compatibility for PeriodIndex
100        reference_index_info = _validate_index_compatibility(data, reference_index_info)
101
102        # --- manage duplicates
103        for col in data.columns:
104            counter += 1
105            save_name = col
106            if save_name in return_data:
107                save_name += f"_{counter:03d}"
108            return_data[save_name] = data[col]
109            return_meta[save_name] = meta.loc[col]
110
111    return pd.DataFrame(return_data), pd.DataFrame(return_meta).T
112
113
114# --- public function
115def fetch_multi(
116    wanted: pd.DataFrame,
117    parameters: dict[str, str] | None = None,
118    *,
119    validate: bool = False,
120    **kwargs: Unpack[GetFileKwargs],
121) -> tuple[pd.DataFrame, pd.DataFrame]:
122    """Fetch multiple SDMX datasets based on a DataFrame of desired datasets.
123
124    Args:
125        wanted: A DataFrame with rows for each desired data set (of one or more series).
126                Each row should contain the necessary identifiers to fetch the dataset.
127                The columns will be 'flow_id', plus the ABS dimensions relevant to the flow.
128                The 'flow_id' column is mandatory, and the rest are optional.
129                Note: the DataFrame index is not used in the fetching process.
130        parameters: A dictionary of additional parameters to pass to the fetch function.
131        validate: If True, the function will validate dimensions and values against
132                  the ABS SDMX API codelists. Defaults to False.
133        **kwargs: Additional keyword arguments passed to the underlying data fetching function.
134
135    Returns:
136        A tuple containing two DataFrames:
137        - The first DataFrame contains the fetched data.
138        - The second DataFrame contains metadata about the fetched datasets.
139
140    Raises:
141        ValueError: If the 'flow_id' column is missing from the `wanted` DataFrame.
142
143    Note:
144        CacheError and HttpError are raised by the fetch function.
145        These will be caught and reported to standard output.
146
147    Note:
148        The function validates that all datasets have compatible index types.
149        A ValueError will be raised if incompatible index types are detected
150        (e.g., mixing quarterly and monthly data).
151
152    """
153    # --- report the parameters used if requested
154    verbose = kwargs.get("verbose", False)
155    if verbose:
156        print(f"fetch_multi(): {wanted=}, {parameters=}, {validate=}, {kwargs=}")
157
158    # --- quick sanity checks
159    if wanted.empty:
160        print("wanted DataFrame is empty, returning empty DataFrames.")
161        return pd.DataFrame(), pd.DataFrame()
162    if "flow_id" not in wanted.columns:
163        raise ValueError("The 'flow_id' column is required in the 'wanted' DataFrame.")
164
165    # --- do the work
166    return _extract(wanted, parameters, validate=validate, **kwargs)
167
168
if __name__ == "__main__":

    def module_test() -> None:
        """Run a simple smoke test of the module against the live API."""
        csv_spec = """
        flow_id, MEASURE, INDEX, TSEST, REGION, DATA_ITEM, SECTOR, FREQ
        CPI,           3, 10001,    10,     50,         -,      -,    Q
        CPI,           3, 999902,   20,     50,         -,      -,    Q
        CPI,           3, 999903,   20,     50,         -,      -,    Q
        ANA_EXP,     DCH,      -,   20,    AUS,       FCE,    PHS,    Q
        ANA_EXP, PCT_DCH,      -,   20,    AUS,       FCE,    PHS,    Q
        """
        # '-' cells stand for unspecified dimensions; all values kept as strings
        requested = pd.read_csv(StringIO(csv_spec), dtype=str, skipinitialspace=True)
        fetched_data, _fetched_meta = fetch_multi(
            requested,
            parameters={"startPeriod": "2020-Q1", "endPeriod": "2020-Q4", "detail": "full"},
            validate=False,
            modality="prefer-url",
        )
        # 4 quarters x 5 requested series
        expected = (4, 5)
        outcome = (
            f"Test passed: {fetched_data.shape=}."
            if fetched_data.shape == expected
            else f"Test FAILED: data shape {fetched_data.shape} is unexpected {expected=}."
        )
        print(outcome)

    module_test()
IndexInformation = tuple[type, str | None]
def fetch_multi( wanted: pandas.core.frame.DataFrame, parameters: dict[str, str] | None = None, *, validate: bool = False, **kwargs: Unpack[sdmxabs.GetFileKwargs]) -> tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]:
116def fetch_multi(
117    wanted: pd.DataFrame,
118    parameters: dict[str, str] | None = None,
119    *,
120    validate: bool = False,
121    **kwargs: Unpack[GetFileKwargs],
122) -> tuple[pd.DataFrame, pd.DataFrame]:
123    """Fetch multiple SDMX datasets based on a DataFrame of desired datasets.
124
125    Args:
126        wanted: A DataFrame with rows for each desired data set (of one or more series).
127                Each row should contain the necessary identifiers to fetch the dataset.
128                The columns will be 'flow_id', plus the ABS dimensions relevant to the flow.
129                The 'flow_id' column is mandatory, and the rest are optional.
130                Note: the DataFrame index is not used in the fetching process.
131        parameters: A dictionary of additional parameters to pass to the fetch function.
132        validate: If True, the function will validate dimensions and values against
133                  the ABS SDMX API codelists. Defaults to False.
134        **kwargs: Additional keyword arguments passed to the underlying data fetching function.
135
136    Returns:
137        A tuple containing two DataFrames:
138        - The first DataFrame contains the fetched data.
139        - The second DataFrame contains metadata about the fetched datasets.
140
141    Raises:
142        ValueError: If the 'flow_id' column is missing from the `wanted` DataFrame.
143
144    Note:
145        CacheError and HttpError are raised by the fetch function.
146        These will be caught and reported to standard output.
147
148    Note:
149        The function validates that all datasets have compatible index types.
150        A ValueError will be raised if incompatible index types are detected
151        (e.g., mixing quarterly and monthly data).
152
153    """
154    # --- report the parameters used if requested
155    verbose = kwargs.get("verbose", False)
156    if verbose:
157        print(f"fetch_multi(): {wanted=}, {parameters=}, {validate=}, {kwargs=}")
158
159    # --- quick sanity checks
160    if wanted.empty:
161        print("wanted DataFrame is empty, returning empty DataFrames.")
162        return pd.DataFrame(), pd.DataFrame()
163    if "flow_id" not in wanted.columns:
164        raise ValueError("The 'flow_id' column is required in the 'wanted' DataFrame.")
165
166    # --- do the work
167    return _extract(wanted, parameters, validate=validate, **kwargs)

Fetch multiple SDMX datasets based on a DataFrame of desired datasets.

Args: wanted: A DataFrame with rows for each desired data set (of one or more series). Each row should contain the necessary identifiers to fetch the dataset. The columns will be 'flow_id', plus the ABS dimensions relevant to the flow. The 'flow_id' column is mandatory, and the rest are optional. Note: the DataFrame index is not used in the fetching process. parameters: A dictionary of additional parameters to pass to the fetch function. validate: If True, the function will validate dimensions and values against the ABS SDMX API codelists. Defaults to False. **kwargs: Additional keyword arguments passed to the underlying data fetching function.

Returns: A tuple containing two DataFrames: - The first DataFrame contains the fetched data. - The second DataFrame contains metadata about the fetched datasets.

Raises: ValueError: If the 'flow_id' column is missing from the wanted DataFrame.

Note: CacheError and HttpError are raised by the fetch function. These will be caught and reported to standard output.

Note: The function validates that all datasets have compatible index types. A ValueError will be raised if incompatible index types are detected (e.g., mixing quarterly and monthly data).