Source code for pqfilt.core

"""Core ``read()`` function -- the main public API of pqfilt."""

from __future__ import annotations

import logging
from glob import glob
from pathlib import Path
from typing import Any

import pandas as pd
import pyarrow.dataset as ds

from ._operators import apply_filter_operator, validate_operator
from ._parser import (
    AndExpr,
    ExprNode,
    FilterExpr,
    OrExpr,
    parse_expression,
    to_pyarrow_expr,
)

# Public API of this module: only ``read``; the underscore-prefixed helpers are internal.
__all__ = ["read"]

# Module logger: ``read`` reports saved output through it, and the pandas filter
# path uses it to warn about filter columns missing from the DataFrame.
log = logging.getLogger(__name__)


def _tuple_to_filter(item: Any) -> FilterExpr:
    """Validate one ``(col, op, val)`` entry and convert it to a ``FilterExpr``.

    Parameters
    ----------
    item : tuple or list
        A 3-element sequence ``(col, op, val)``.

    Returns
    -------
    FilterExpr
        Leaf node for the filter.

    Raises
    ------
    ValueError
        If *item* is not a 3-element tuple/list. ``validate_operator`` may
        additionally reject the operator.
    """
    # Strings are deliberately rejected: a 3-character string would otherwise
    # unpack into a nonsense (col, op, val) triple.
    if not (isinstance(item, (tuple, list)) and len(item) == 3):
        raise ValueError(
            f"Each filter must be a 3-tuple (col, op, val), got {item!r}"
        )
    c, o, v = item
    validate_operator(o, col=c)
    return FilterExpr(col=c, op=o, val=v)


def _tuples_to_ast(filters: list) -> ExprNode:
    """Convert tuple-style filters to an AST node.

    Supports two layouts:

    * **Flat AND**: ``[("a", ">", 5), ("b", "<", 10)]``
    * **DNF (OR of AND-groups)**: ``[[("a", ">", 5)], [("b", "<", 10)]]``

    The layout is chosen by the type of the first element: a ``list`` selects
    DNF, anything else selects flat AND. Every ``(col, op, val)`` entry is
    validated the same way in both layouts.

    Parameters
    ----------
    filters : list
        List of 3-tuples (flat AND) or list of lists of 3-tuples (DNF).

    Returns
    -------
    ExprNode
        Parsed AST node. Single-element groups collapse to their only child
        instead of being wrapped in a one-child ``AndExpr``/``OrExpr``.

    Raises
    ------
    ValueError
        If ``filters`` is empty, a DNF group is empty or not a sequence, or
        an entry is not a 3-tuple.
    """
    if not filters:
        raise ValueError("Empty filter list")

    if isinstance(filters[0], list):
        # DNF: each sub-list is an AND-group, groups are OR-ed.
        or_children: list[ExprNode] = []
        for group in filters:
            # An empty group would become an AndExpr with no children, whose
            # meaning is undefined downstream -- reject it explicitly.
            if not isinstance(group, (list, tuple)) or not group:
                raise ValueError(
                    "Each DNF group must be a non-empty list of 3-tuples, "
                    f"got {group!r}"
                )
            and_children = [_tuple_to_filter(item) for item in group]
            if len(and_children) == 1:
                or_children.append(and_children[0])
            else:
                or_children.append(AndExpr(children=tuple(and_children)))
        if len(or_children) == 1:
            return or_children[0]
        return OrExpr(children=tuple(or_children))

    # Flat AND
    children = [_tuple_to_filter(item) for item in filters]
    if len(children) == 1:
        return children[0]
    return AndExpr(children=tuple(children))


def _resolve_files(source: str | Path | list[str | Path]) -> list[str]:
    """Resolve *source* to a list of file paths (with glob expansion).

    A single path containing glob metacharacters (``*``, ``?``, ``[``, ``]``)
    is expanded with :func:`glob.glob` and the matches are sorted. If that
    path exists literally, it is used as-is, so files whose names contain
    brackets (e.g. ``data[1].parquet``) are still found. A path without
    metacharacters is returned unchanged, without an existence check. Entries
    of an explicit list are converted to ``str`` and never glob-expanded.

    Parameters
    ----------
    source : str, Path, or list
        Single path, glob pattern, or list of paths.

    Returns
    -------
    list of str
        Resolved file paths.

    Raises
    ------
    FileNotFoundError
        If no files match *source* (or *source* is an empty list).
    """
    if isinstance(source, (str, Path)):
        pattern = str(source)
        has_magic = any(ch in pattern for ch in ("*", "?", "[", "]"))
        # Prefer a literal match: globbing "data[1].parquet" would look for
        # "data1.parquet" and miss the real file.
        if has_magic and not Path(pattern).exists():
            files = sorted(glob(pattern))
        else:
            files = [pattern]
    else:
        files = [str(f) for f in source]

    if not files:
        raise FileNotFoundError(f"No files found matching: {source}")
    return files


def _filter_columns(node: ExprNode) -> list[str]:
    """Return the column names referenced by *node*, in first-seen order."""
    if isinstance(node, FilterExpr):
        return [node.col]
    names: list[str] = []
    if isinstance(node, (AndExpr, OrExpr)):
        for child in node.children:
            for name in _filter_columns(child):
                if name not in names:
                    names.append(name)
    return names


def _load_files(
    files: list[str],
    *,
    columns: list[str] | None,
    pa_filter: Any | None = None,
    extra_columns: list[str] | tuple[str, ...] = (),
) -> pd.DataFrame:
    """Scan each file as a Parquet dataset and concatenate the non-empty results.

    *extra_columns* are appended to the *columns* projection for files whose
    schema contains them. This is used to load filter columns the caller did
    not request. If every file yields an empty frame, the first one is
    returned so its column layout is kept; with no frames at all an empty
    ``DataFrame`` is returned.
    """
    frames: list[pd.DataFrame] = []
    empty_template: pd.DataFrame | None = None
    for fpath in files:
        dataset = ds.dataset(fpath, format="parquet")
        projection = columns
        if columns is not None and extra_columns:
            # Only request extra columns that actually exist; absent ones are
            # handled (with a warning) by the pandas filter.
            available = set(dataset.schema.names)
            projection = list(columns) + [
                c for c in extra_columns if c in available
            ]
        df = dataset.to_table(columns=projection, filter=pa_filter).to_pandas()
        if not df.empty:
            frames.append(df)
        elif empty_template is None:
            empty_template = df
    if frames:
        return pd.concat(frames, ignore_index=True)
    if empty_template is not None:
        return empty_template
    return pd.DataFrame()


def read(
    source: str | Path | list[str | Path],
    *,
    filters: str | list | ExprNode | None = None,
    columns: list[str] | None = None,
    per_file: bool = True,
    output: str | Path | None = None,
    overwrite: bool = False,
) -> pd.DataFrame:
    """Read Parquet file(s) with predicate-pushdown filtering.

    Wraps ``pyarrow.dataset`` to apply row-group-level predicate pushdown,
    avoiding unnecessary I/O and memory usage.

    Parameters
    ----------
    source : str, Path, or list
        File path, glob pattern (e.g., ``"data/*.parquet"``), or explicit
        list of paths.
    filters : str, list, ExprNode, or None, optional
        Filter specification. Accepts several formats:

        **Expression string** -- parsed via the built-in mini-language::

            "vmag < 20"
            "(a < 30 & b > 50) | c == 1"
            "desig in 1,2,3"

        **List of 3-tuples** (flat AND)::

            [("a", ">", 5), ("b", "<", 10)]

        **List of lists** (DNF -- OR of AND-groups)::

            [[("a", ">", 5)], [("b", "<", 10)]]

        **Pre-parsed AST node** (``FilterExpr``, ``AndExpr``, ``OrExpr``).
    columns : list of str, optional
        Columns to load (projection pushdown). ``None`` loads all columns.
    per_file : bool, optional
        If ``True`` (default), push the filter into each file's scan and
        concatenate the results. Better memory efficiency for many large
        files. If ``False``, load and concatenate first, then apply
        pandas-level filtering (useful when the filter cannot be pushed
        down). In that mode, filter columns not listed in *columns* are
        loaded temporarily and dropped again after filtering.
    output : str or Path, optional
        Save the result to this path. A ``.csv`` suffix (case-insensitive)
        writes CSV; any other suffix writes Parquet.
    overwrite : bool, optional
        Allow overwriting *output* if it already exists. The check is done
        before any data is read.

    Returns
    -------
    pandas.DataFrame
        Filtered (and optionally column-selected) DataFrame. If no rows
        match, the columns of the first (empty) file result are kept.

    Raises
    ------
    FileNotFoundError
        No files matched *source*.
    FileExistsError
        *output* exists and *overwrite* is ``False``.
    ValueError
        Invalid filter syntax.
    TypeError
        *filters* is not a supported type.

    Examples
    --------
    Simple filter::

        df = pqfilt.read("data.parquet", filters="vmag < 20")

    AND + OR expression::

        df = pqfilt.read("data.parquet", filters="(a < 30 & b > 50) | c == 1")

    Tuple syntax::

        df = pqfilt.read("data.parquet", filters=[("a", ">", 5), ("b", "<", 10)])
    """
    files = _resolve_files(source)

    # -- normalise filters to an AST and a pyarrow Expression (or None) --
    ast: ExprNode | None = None
    if filters is not None:
        if isinstance(filters, str):
            ast = parse_expression(filters)
        elif isinstance(filters, list):
            ast = _tuples_to_ast(filters)
        elif isinstance(filters, (FilterExpr, AndExpr, OrExpr)):
            ast = filters
        else:
            raise TypeError(
                f"filters must be str, list, or ExprNode, got {type(filters).__name__}"
            )
    pa_filter: Any | None = to_pyarrow_expr(ast) if ast is not None else None

    # -- fail fast on the output destination, before any expensive reading --
    out = Path(output) if output is not None else None
    if out is not None and out.exists() and not overwrite:
        raise FileExistsError(
            f"Output file '{output}' already exists. Use overwrite=True."
        )

    # -- read --
    if per_file:
        result = _load_files(files, columns=columns, pa_filter=pa_filter)
    else:
        # The pandas filter needs its columns present even when the caller's
        # projection omits them; otherwise the filter is silently skipped.
        extra: list[str] = []
        if ast is not None and columns is not None:
            extra = [c for c in _filter_columns(ast) if c not in columns]
        result = _load_files(files, columns=columns, extra_columns=extra)
        if ast is not None and not result.empty:
            result = _apply_pandas_filter(result, ast)
        if extra:
            result = result.drop(columns=extra, errors="ignore")

    # -- save --
    if out is not None:
        if out.suffix.lower() == ".csv":
            result.to_csv(out, index=False)
        else:
            result.to_parquet(out, index=False)
        log.info("Saved %d rows to %s", len(result), out)

    return result
# -----------------------------------------------------------------
# Pandas-level filtering (for per_file=False path)
# -----------------------------------------------------------------


def _apply_pandas_filter(df: pd.DataFrame, node: ExprNode) -> pd.DataFrame:
    """Apply a parsed filter AST to a pandas DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data.
    node : ExprNode
        Parsed filter AST.

    Returns
    -------
    pandas.DataFrame
        Filtered rows with reset index.
    """
    mask = _eval_node(df, node)
    return df[mask].reset_index(drop=True)


def _eval_node(df: pd.DataFrame, node: ExprNode) -> pd.Series:
    """Recursively evaluate an AST node to a boolean Series.

    Leaves whose column is missing from *df* are removed from the expression
    (with a warning). They are not treated as ``True``, which would make an
    enclosing ``OrExpr`` accept every row. If every leaf is removed, the
    result is an all-``True`` mask, i.e. no filtering.

    Raises
    ------
    TypeError
        If the AST contains an unknown node type.
    """
    mask = _eval_node_or_skip(df, node)
    if mask is None:
        return pd.Series(True, index=df.index)
    return mask


def _eval_node_or_skip(df: pd.DataFrame, node: ExprNode) -> pd.Series | None:
    """Evaluate *node*, returning ``None`` when it reduces to a skipped filter."""
    if isinstance(node, FilterExpr):
        if node.col not in df.columns:
            log.warning("Column %r not found in DataFrame; skipping filter.", node.col)
            return None
        return apply_filter_operator(node.op, df[node.col], node.val)
    if isinstance(node, (AndExpr, OrExpr)):
        # Evaluate every child (so unknown node types still raise), then
        # combine only those that were not skipped.
        child_masks = [_eval_node_or_skip(df, child) for child in node.children]
        kept = [m for m in child_masks if m is not None]
        if not kept:
            return None
        is_and = isinstance(node, AndExpr)
        # Seed with the identity element: True for AND, False for OR.
        mask = pd.Series(is_and, index=df.index)
        for child_mask in kept:
            mask = (mask & child_mask) if is_and else (mask | child_mask)
        return mask
    raise TypeError(f"Unknown node type: {type(node)}")