# Source code for askemblaex.pages

"""
askemblaex/pages.py

Shared utilities for reading and writing per-page extraction files.

Page files live at::

    <output_root>/<file_hash>/pages/<file_hash>.page.0035.json

Imported by extract.py (writing), reconcile.py, embed.py, and entities.py
(reading and updating).
"""

from __future__ import annotations

import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple

log = logging.getLogger("askemblaex.pages")

# ─────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────

# Substring marking per-page files ("<doc_id>.page.<NNNN>.json").
# Not referenced in this module; presumably used by the importing modules
# (extract/reconcile/embed/entities) — confirm against callers.
PAGE_METHOD_HINT = ".page."

# Dot-separated key paths expected in a document metadata file
# (traversable with get_nested()). Not referenced in this module;
# presumably validated by a caller — confirm.
REQUIRED_METADATA_KEYS = [
    "_key",
    "source.filename",
    "source.type",
    "source.title",
    "source.created_utc",
    "source.local",
    "source.uris",
    "processing",
    "raw.content",
    "raw.content_type",
    "raw.encoding",
    "raw.checksum",
    "raw.process.ai.openai",
    "extraction.complete",
    "extraction.started_utc",
    "extraction.completed_utc",
    "version.schema",
    "version.content_version",
    "version.embedding_version",
    "version.is_active",
]

# Maps internal extraction method names -> schema method names written to file.
# Not referenced in this module; presumably used by extract.py — confirm.
METHOD_MAP = {
    "azure":        "azure_computer_vision",
    "azure_docint": "azure_docint",
    "pymupdf":      "pymupdf",
    "pdfplumber":   "pdfplumber",
}

# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────

def _utc_now() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def get_page_number(filepath: str | Path) -> Optional[int]:
    """
    Extract the page number from a page filename.

    Expects filenames of the form ``<doc_id>.page.<NNNN>.json``.

    Args:
        filepath: Path or filename string to parse.

    Returns:
        Integer page number, or ``None`` if the filename does not match
        the expected pattern or the numeric part cannot be parsed.

    Examples::

        get_page_number("abc.page.0035.json")   # -> 35
        get_page_number("abc.metadata._.json")  # -> None
    """
    stem = Path(filepath).stem  # ".json" removed -> e.g. "abc123.page.0035"
    marker = ".page."
    if marker not in stem:
        return None
    # Everything after the *last* occurrence of the marker is the page number.
    _, _, tail = stem.rpartition(marker)
    try:
        return int(tail)
    except ValueError:
        return None
def format_page_num(page: int) -> str:
    """
    Format a page number as a zero-padded 4-digit string.

    Args:
        page: Zero-based page number.

    Returns:
        Zero-padded string e.g. ``"0035"``.
    """
    return "%04d" % page
def get_nested(data: dict, dotted_key: str) -> Tuple[object, bool]:
    """
    Traverse a nested dict using a dot-separated key path.

    Args:
        data: The dict to traverse.
        dotted_key: Dot-separated key path e.g. ``"extraction.steps.pymupdf"``.

    Returns:
        A ``(value, found)`` tuple. *found* is ``False`` if any intermediate
        key is missing or the current node is not a dict.
    """
    node: object = data
    for part in dotted_key.split("."):
        # Bail out as soon as traversal hits a non-dict or a missing key.
        if not (isinstance(node, dict) and part in node):
            return None, False
        node = node[part]
    return node, True
# ─────────────────────────────────────────────
# Page schema
# ─────────────────────────────────────────────
def empty_extraction(method_name: str) -> dict:
    """
    Return a blank extraction slot dict for *method_name*.

    Args:
        method_name: Extraction method identifier e.g.
            ``"azure_computer_vision"``.

    Returns:
        Dict with all text/meta fields set to ``None``.
    """
    # fromkeys initialises every field to None, preserving this key order.
    slot = dict.fromkeys(
        (
            "text",
            "text_hash",
            "provider",
            "method",
            "model",
            "extracted_at",
            "source_methods",
        )
    )
    slot["method"] = method_name
    return slot
def build_page_schema(doc_id: str, page: int) -> dict:
    """
    Build a fresh page JSON schema with empty extraction slots.

    The returned dict contains slots for all known extraction methods
    (``azure_computer_vision``, ``azure_docint``, ``pymupdf``,
    ``pdfplumber``, ``reconciled``, ``embedding``) with all text fields
    set to ``None``.

    Args:
        doc_id: Document ID (content hash of the source file).
        page: Zero-based page number.

    Returns:
        Page schema dict ready to be written to disk.
    """
    timestamp = _utc_now()

    # One blank slot per raw extraction method, in schema order.
    raw_methods = ("azure_computer_vision", "azure_docint", "pymupdf", "pdfplumber")
    extractions = {name: empty_extraction(name) for name in raw_methods}

    # The reconciled slot records which raw methods it is derived from.
    reconciled = empty_extraction("reconciled")
    reconciled["source_methods"] = ["azure_computer_vision", "azure_docint"]
    extractions["reconciled"] = reconciled

    extractions["embedding"] = {
        "values": [],
        "model": None,
        "dim": None,
        "created_at": None,
    }

    return {
        "schema_version": 1.0,
        "doc_id": doc_id,
        "page_num": page,
        "default": "reconciled",
        "created_at": timestamp,
        "updated_at": timestamp,
        "extractions": extractions,
    }
# ─────────────────────────────────────────────
# Page file I/O
# ─────────────────────────────────────────────
def page_file_path(out_dir: Path, doc_id: str, page: int) -> Path:
    """
    Return the canonical path for a page JSON file.

    Args:
        out_dir: Hash-keyed document output folder.
        doc_id: Document ID (content hash).
        page: Zero-based page number.

    Returns:
        Path ``<out_dir>/pages/<doc_id>.page.<NNNN>.json``.
    """
    filename = f"{doc_id}.page.{page:04d}.json"
    return out_dir.joinpath("pages", filename)
def save_or_merge_page(
    parent_folder: Path,
    doc_id: str,
    page: int,
    data: dict,
) -> Path:
    """
    Save or merge page extraction data into the page JSON file under
    ``extractions``.

    If the file does not exist a fresh schema is initialised with
    :func:`build_page_schema` first. If it already exists the new method
    data is merged into the existing ``extractions`` dict — existing
    method entries are updated, new ones are added, and nothing else is
    touched.

    The file is written to::

        <parent_folder>/pages/<doc_id>.page.<NNNN>.json

    Args:
        parent_folder: Hash-keyed document folder (parent of ``pages/``).
        doc_id: Document ID (content hash of the source file).
        page: Zero-based page number.
        data: Dict of ``{method_name: dict_or_str}`` to merge into
            ``extractions``. String values are stored as ``{"text": value}``.

    Returns:
        Path to the written page file.
    """
    # Delegate path construction to page_file_path so the writer can never
    # drift out of sync with the reader (load_page) — previously this
    # function rebuilt the filename by hand.
    file_path = page_file_path(parent_folder, doc_id, page)
    file_path.parent.mkdir(parents=True, exist_ok=True)

    # Load existing or initialise fresh schema. Corrupt or non-object
    # files are deliberately overwritten (best-effort merge, warning logged).
    if file_path.exists():
        try:
            existing = json.loads(file_path.read_text(encoding="utf-8"))
            if not isinstance(existing, dict):
                log.warning("Existing file %s is not a JSON object, overwriting.", file_path.name)
                existing = build_page_schema(doc_id, page)
        except Exception as e:
            log.warning("Could not load %s (%s), overwriting.", file_path.name, e)
            existing = build_page_schema(doc_id, page)
    else:
        existing = build_page_schema(doc_id, page)

    # Merge method data into extractions: unknown methods get a blank slot,
    # dict values update the slot in place, bare strings become the text.
    for method, value in data.items():
        existing["extractions"].setdefault(method, empty_extraction(method))
        if isinstance(value, dict):
            existing["extractions"][method].update(value)
        else:
            existing["extractions"][method]["text"] = value

    existing["updated_at"] = _utc_now()
    file_path.write_text(json.dumps(existing, indent=2, ensure_ascii=False), encoding="utf-8")
    log.debug("Wrote page file: %s", file_path.name)
    return file_path
def load_page(parent_folder: Path, doc_id: str, page: int) -> Optional[dict]:
    """
    Load a page JSON file from
    ``<parent_folder>/pages/<doc_id>.page.<NNNN>.json``.

    Args:
        parent_folder: Hash-keyed document folder (parent of ``pages/``).
        doc_id: Document ID (content hash of the source file).
        page: Zero-based page number.

    Returns:
        Parsed page dict, or ``None`` if the file does not exist or
        cannot be parsed.
    """
    path = page_file_path(parent_folder, doc_id, page)
    if not path.exists():
        return None
    try:
        raw = path.read_text(encoding="utf-8")
        return json.loads(raw)
    except Exception as e:
        # Unreadable or malformed files are reported but never raised —
        # callers treat None as "no page available".
        log.error("Failed to load page file %s: %s", path.name, e)
        return None