"""
askemblaex/pages.py
Shared utilities for reading and writing per-page extraction files.
Page files live at::
<output_root>/<file_hash>/pages/<file_hash>.page.0035.json
Imported by extract.py (writing), reconcile.py, embed.py, and entities.py
(reading and updating).
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple
# Module-level logger; child of the package's "askemblaex" logger hierarchy.
log = logging.getLogger("askemblaex.pages")
# ─────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────
# Substring that marks a per-page file name ("<doc_id>.page.<NNNN>.json").
PAGE_METHOD_HINT = ".page."
# Dotted key paths that a document metadata file is expected to contain.
# NOTE(review): not referenced in this chunk — presumably consumed by a
# validator elsewhere (reconcile.py / extract.py); confirm before changing.
REQUIRED_METADATA_KEYS = [
"_key",
"source.filename",
"source.type",
"source.title",
"source.created_utc",
"source.local",
"source.uris",
"processing",
"raw.content",
"raw.content_type",
"raw.encoding",
"raw.checksum",
"raw.process.ai.openai",
"extraction.complete",
"extraction.started_utc",
"extraction.completed_utc",
"version.schema",
"version.content_version",
"version.embedding_version",
"version.is_active",
]
# Maps internal extraction method names -> schema method names written to file
METHOD_MAP = {
"azure": "azure_computer_vision",
"azure_docint": "azure_docint",
"pymupdf": "pymupdf",
"pdfplumber": "pdfplumber",
}
# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────
def _utc_now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
[docs]
def get_page_number(filepath: str | Path) -> Optional[int]:
"""
Extract the page number from a page filename.
Expects filenames of the form ``<doc_id>.page.<NNNN>.json``.
Args:
filepath: Path or filename string to parse.
Returns:
Integer page number, or ``None`` if the filename does not match
the expected pattern or the numeric part cannot be parsed.
Examples::
get_page_number("abc.page.0035.json") # -> 35
get_page_number("abc.metadata._.json") # -> None
"""
stem = Path(filepath).stem # strips .json -> "abc123.page.0035"
if ".page." in stem:
try:
return int(stem.split(".page.")[-1])
except ValueError:
return None
return None
def format_page_num(page: int) -> str:
    """
    Format a page number as a zero-padded 4-digit string.

    Numbers wider than four digits are returned unpadded and untruncated.

    Args:
        page: Zero-based page number.

    Returns:
        Zero-padded string e.g. ``"0035"``.
    """
    return f"{page:04d}"
def get_nested(data: dict, dotted_key: str) -> Tuple[object, bool]:
    """
    Traverse a nested dict using a dot-separated key path.

    Args:
        data: The dict to traverse.
        dotted_key: Dot-separated key path e.g. ``"extraction.steps.pymupdf"``.

    Returns:
        A ``(value, found)`` tuple. *found* is ``False`` if any intermediate
        key is missing or the current node is not a dict.
    """
    current: object = data
    for part in dotted_key.split("."):
        # Bail out as soon as the path dead-ends: either we are no longer
        # inside a dict, or the next key is absent.
        if not isinstance(current, dict) or part not in current:
            return None, False
        current = current[part]
    return current, True
# ─────────────────────────────────────────────
# Page schema
# ─────────────────────────────────────────────
def build_page_schema(doc_id: str, page: int) -> dict:
    """
    Build a fresh page JSON schema with empty extraction slots.

    The returned dict contains slots for all known extraction methods
    (``azure_computer_vision``, ``azure_docint``, ``pymupdf``,
    ``pdfplumber``, ``reconciled``, ``embedding``) with all text fields
    set to ``None``.

    Args:
        doc_id: Document ID (content hash of the source file).
        page: Zero-based page number.

    Returns:
        Page schema dict ready to be written to disk.
    """
    now = _utc_now()
    return {
        "schema_version": 1.0,
        "doc_id": doc_id,
        "page_num": page,
        # Which extraction slot consumers should read by default.
        "default": "reconciled",
        "created_at": now,
        "updated_at": now,
        "extractions": {
            "azure_computer_vision": empty_extraction("azure_computer_vision"),
            "azure_docint": empty_extraction("azure_docint"),
            "pymupdf": empty_extraction("pymupdf"),
            "pdfplumber": empty_extraction("pdfplumber"),
            "reconciled": {
                **empty_extraction("reconciled"),
                # Methods the reconciler merges by default.
                "source_methods": ["azure_computer_vision", "azure_docint"],
            },
            "embedding": {
                "values": [],
                "model": None,
                "dim": None,
                "created_at": None,
            },
        },
    }
# ─────────────────────────────────────────────
# Page file I/O
# ─────────────────────────────────────────────
def page_file_path(out_dir: Path, doc_id: str, page: int) -> Path:
    """
    Return the canonical path for a page JSON file.

    Args:
        out_dir: Hash-keyed document output folder.
        doc_id: Document ID (content hash).
        page: Zero-based page number.

    Returns:
        Path ``<out_dir>/pages/<doc_id>.page.<NNNN>.json``.
    """
    return out_dir / "pages" / f"{doc_id}.page.{page:04d}.json"
def save_or_merge_page(
    parent_folder: Path,
    doc_id: str,
    page: int,
    data: dict,
) -> Path:
    """
    Save or merge page extraction data into the page JSON file under ``extractions``.

    If the file does not exist a fresh schema is initialised with
    :func:`build_page_schema` first. If it already exists the new method
    data is merged into the existing ``extractions`` dict — existing method
    entries are updated, new ones are added, and nothing else is touched.

    The file is written to::

        <parent_folder>/pages/<doc_id>.page.<NNNN>.json

    Args:
        parent_folder: Hash-keyed document folder (parent of ``pages/``).
        doc_id: Document ID (content hash of the source file).
        page: Zero-based page number.
        data: Dict of ``{method_name: dict_or_str}`` to merge into
            ``extractions``. String values are stored as
            ``{"text": value}``.

    Returns:
        Path to the written page file.
    """
    # Build the path via the shared helper so the naming convention lives
    # in exactly one place.
    file_path = page_file_path(parent_folder, doc_id, page)
    file_path.parent.mkdir(parents=True, exist_ok=True)

    # Load existing or initialise fresh schema. Corrupt/non-object files are
    # deliberately replaced (best-effort) rather than aborting the pipeline.
    if file_path.exists():
        try:
            existing = json.loads(file_path.read_text(encoding="utf-8"))
            if not isinstance(existing, dict):
                log.warning("Existing file %s is not a JSON object, overwriting.", file_path.name)
                existing = build_page_schema(doc_id, page)
        except Exception as e:
            log.warning("Could not load %s (%s), overwriting.", file_path.name, e)
            existing = build_page_schema(doc_id, page)
    else:
        existing = build_page_schema(doc_id, page)

    # Merge method data into extractions.
    for method, value in data.items():
        existing["extractions"].setdefault(method, empty_extraction(method))
        if isinstance(value, dict):
            existing["extractions"][method].update(value)
        else:
            # Bare strings are wrapped as {"text": value}.
            existing["extractions"][method]["text"] = value

    existing["updated_at"] = _utc_now()
    file_path.write_text(json.dumps(existing, indent=2, ensure_ascii=False), encoding="utf-8")
    log.debug("Wrote page file: %s", file_path.name)
    return file_path
def load_page(parent_folder: Path, doc_id: str, page: int) -> Optional[dict]:
    """
    Load a page JSON file from ``<parent_folder>/pages/<doc_id>.page.<NNNN>.json``.

    Args:
        parent_folder: Hash-keyed document folder (parent of ``pages/``).
        doc_id: Document ID (content hash of the source file).
        page: Zero-based page number.

    Returns:
        Parsed page dict, or ``None`` if the file does not exist or cannot
        be parsed.
    """
    file_path = page_file_path(parent_folder, doc_id, page)
    if not file_path.exists():
        return None
    try:
        return json.loads(file_path.read_text(encoding="utf-8"))
    except Exception as e:
        # Best-effort read: log and signal "missing" rather than crashing
        # the caller's batch loop on one corrupt file.
        log.error("Failed to load page file %s: %s", file_path.name, e)
        return None