# Source code for askemblaex.reconcile

"""
askemblaex/reconcile.py

Reconciles multiple OCR extraction methods on each page file into a single
best-quality text using OpenAI.

Reads all extraction methods from each page file (anything under 'extractions'
except 'reconciled' and 'embedding'), sends them to OpenAI, and writes the
result back into extractions.reconciled.

Skip logic:
  - If reconciled.text is already populated AND reconciled.model matches
    the current OPENAI_MODEL, the page is skipped.
  - If the model has changed, the page is rereconciled.

Environment variables:
  OPENAI_KEY      OpenAI API key
  OPENAI_MODEL    Model to use e.g. gpt-4o
"""

from __future__ import annotations

import argparse
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional

from openai import OpenAI

from .env import load_env
from .pages import (
    PAGE_METHOD_HINT,
    get_page_number,
    load_page,
    save_or_merge_page,
)

load_env()

log = logging.getLogger("askemblaex.reconcile")

# ─────────────────────────────────────────────
# Config
# ─────────────────────────────────────────────

# Default root of hash-keyed document folders (Windows UNC share).
DEFAULT_DIR = Path(r"\\fileserver.office.arpa\Public\genealogy")

# Keys under 'extractions' that are derived outputs, not OCR source methods.
SKIP_EXTRACTION_KEYS = {"reconciled", "embedding"}

# OCR methods whose text is preferred on conflict; listed first in the prompt.
AZURE_PREFERRED_METHODS = {"azure_computer_vision", "azure_docint"}

# System prompt sent with every reconciliation request (runtime string — do not edit casually).
SYSTEM_PROMPT = """\
You are an expert document transcription assistant specialising in historical and genealogical records.

You will be given the same document page transcribed by multiple OCR methods. Each method may have \
errors, omissions, or formatting differences. Your task is to produce a single, best-quality \
transcription by carefully combining all sources.

Rules:
- Prefer text from Azure sources (azure_computer_vision, azure_docint) when sources conflict.
- Produce the most accurate and complete text by merging the best parts of each source.
- Correct obvious OCR errors (e.g. "rn" read as "m", broken words, stray characters) \
  using context and cross-source comparison.
- Preserve the original formatting, punctuation, capitalisation, and line breaks as \
  faithfully as possible.
- Do not add, infer, or summarise content that does not appear in any source.
- Output only the reconciled transcription text. No commentary, no explanations, \
  no markdown formatting.
"""


# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────

def _utc_now() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _hash_text(text: str) -> str:
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def _get_openai_client() -> OpenAI:
    """
    Build an OpenAI client from the ``OPENAI_KEY`` environment variable.

    Raises:
        ValueError: If ``OPENAI_KEY`` is unset or empty.
    """
    api_key = os.getenv("OPENAI_KEY")
    if api_key:
        return OpenAI(api_key=api_key)
    raise ValueError("OPENAI_KEY environment variable is not set.")


def _get_model() -> str:
    model = os.getenv("OPENAI_MODEL")
    if not model:
        raise ValueError("OPENAI_MODEL environment variable is not set.")
    return model


def _build_user_prompt(methods: Dict[str, str]) -> str:
    """
    Format every extraction method's text into one OpenAI user prompt.

    Azure-preferred methods come first so the model sees the highest-quality
    sources at the top of the context window; within each group, methods are
    ordered alphabetically.

    Args:
        methods: Mapping of method name → extracted text.

    Returns:
        Formatted prompt string ready to pass to the OpenAI chat API.
    """
    # Composite sort key: non-Azure sorts after Azure, then alphabetical.
    ordered = sorted(
        methods,
        key=lambda name: (name not in AZURE_PREFERRED_METHODS, name),
    )
    sections = [f"--- {name} ---\n{methods[name]}" for name in ordered]

    header = (
        "Below are transcriptions of the same document page from multiple OCR methods.\n"
        "Produce a single best-quality reconciled transcription.\n\n"
    )
    return header + "\n\n".join(sections)


def _should_skip(reconciled: dict, model: str) -> bool:
    """Return True if the page already has a valid reconciliation with the current model."""
    return bool(
        reconciled.get("text")
        and reconciled.get("model") == model
    )


def _collect_extraction_texts(extractions: dict) -> Dict[str, str]:
    """
    Gather all non-empty OCR texts from *extractions*.

    Entries named in ``SKIP_EXTRACTION_KEYS`` (derived outputs), entries that
    are not dicts, and entries with blank text are all dropped.
    """
    collected: Dict[str, str] = {}
    for name, payload in extractions.items():
        if name in SKIP_EXTRACTION_KEYS or not isinstance(payload, dict):
            continue
        stripped = (payload.get("text") or "").strip()
        if stripped:
            collected[name] = stripped
    return collected


# ─────────────────────────────────────────────
# Core reconciliation
# ─────────────────────────────────────────────

def reconcile_page(
    page_data: dict,
    *,
    client: OpenAI,
    model: str,
) -> Optional[str]:
    """
    Reconcile a single page's extraction data into one best-quality text.

    Skips the page if it already has a valid reconciliation with *model*.
    If only one extraction source is present, returns that text directly
    without an OpenAI call.

    Args:
        page_data: Full page JSON dict (as loaded from disk).
        client: Configured :class:`openai.OpenAI` client.
        model: OpenAI model name e.g. ``"gpt-4o"``.

    Returns:
        Reconciled text string, or ``None`` if the page was skipped or had
        no extraction text to work with.
    """
    extractions = page_data.get("extractions", {})
    reconciled = extractions.get("reconciled", {})

    if _should_skip(reconciled, model):
        return None

    texts = _collect_extraction_texts(extractions)
    if not texts:
        log.warning("No extraction text found for page %s", page_data.get("page_num"))
        return None

    if len(texts) == 1:
        # Only one source — no reconciliation needed, just use it directly
        return next(iter(texts.values()))

    user_prompt = _build_user_prompt(texts)

    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.0,
    )

    # message.content is Optional in the OpenAI SDK (e.g. refusals / filtered
    # responses) — guard before stripping to avoid AttributeError on None.
    content = response.choices[0].message.content
    return content.strip() if content else None
def reconcile_page_file(
    page_file: Path,
    doc_id: str,
    page_num: int,
    parent_folder: Path,
    *,
    client: OpenAI,
    model: str,
    force: bool = False,
    verbosity: int = 0,
) -> bool:
    """
    Load a page JSON file, reconcile it with OpenAI, and write the result back.

    Args:
        page_file: Path to the page JSON file to process.
        doc_id: Document ID (content hash of the source file).
        page_num: Zero-based page number (used for logging).
        parent_folder: Hash-keyed document folder (parent of ``pages/``).
        client: Configured :class:`openai.OpenAI` client.
        model: OpenAI model name e.g. ``"gpt-4o"``.
        force: If ``True``, re-reconcile even if already done with the same model.
        verbosity: Console verbosity level (0–3).

    Returns:
        ``True`` if reconciliation ran and results were written,
        ``False`` if the page was skipped.
    """
    try:
        page_data = json.loads(page_file.read_text(encoding="utf-8"))
    except Exception as e:
        log.error("Failed to load page file %s: %s", page_file.name, e)
        return False

    extractions = page_data.get("extractions", {})
    reconciled = extractions.get("reconciled", {})

    if not force and _should_skip(reconciled, model):
        log.debug("Skipping %s — already reconciled with model %s", page_file.name, model)
        return False

    log.info("Reconciling %s", page_file.name)

    try:
        text = reconcile_page(page_data, client=client, model=model)
    except Exception as e:
        # Local imports: only needed on the (rare) failure path.
        import sys
        import traceback

        print(f"  \x1b[31m[!] OpenAI failed page {page_num}: {e}\x1b[0m", file=sys.stderr)
        if verbosity >= 3:
            traceback.print_exc()
        elif verbosity >= 1:
            print(f"  \x1b[2m{type(e).__name__}: {e}\x1b[0m", file=sys.stderr)
        log.error("OpenAI call failed for %s: %s", page_file.name, e)
        return False

    if text is None:
        # Skipped (already reconciled) or no extraction text available.
        return False

    reconciled_data = {
        "reconciled": {
            "text": text,
            "text_hash": _hash_text(text),
            "provider": "openai",
            "method": "reconciled",
            "model": model,
            "extracted_at": _utc_now(),
            # Record which extraction methods fed the reconciliation; reuses
            # the exact same filtering as _collect_extraction_texts instead of
            # duplicating it inline.
            "source_methods": list(_collect_extraction_texts(extractions)),
        }
    }

    save_or_merge_page(parent_folder, doc_id, page_num, reconciled_data)
    log.info("Reconciled page %d -> %d chars", page_num, len(text))
    return True
# ─────────────────────────────────────────────
# Folder + root runners
# ─────────────────────────────────────────────
def reconcile_folder(
    folder: Path,
    *,
    client: OpenAI,
    model: str,
    force: bool = False,
    verbosity: int = 0,
) -> tuple[int, int]:
    """
    Reconcile all page files in a hash-keyed document folder.

    For each page file found under ``<folder>/pages/``, calls OpenAI to merge
    all OCR extraction methods into a single best-quality transcription and
    writes the result back into ``extractions.reconciled``.

    A page is skipped if ``reconciled.text`` is already populated *and*
    ``reconciled.model`` matches *model*, unless *force* is ``True``.

    Args:
        folder: Hash-keyed document folder (must contain a ``pages/``
            subdirectory).
        client: Configured :class:`openai.OpenAI` client instance.
        model: OpenAI model name e.g. ``"gpt-4o"``.
        force: If ``True``, re-reconcile even if the page was already
            reconciled with the same model.
        verbosity: Console verbosity level (0–3).

    Returns:
        A ``(reconciled_count, skipped_count)`` tuple.
    """
    pages_dir = folder / "pages"
    if not pages_dir.is_dir():
        log.debug("No pages folder in %s", folder.name)
        return 0, 0

    doc_id = folder.stem
    reconciled_count = 0
    skipped_count = 0

    page_files = sorted(pages_dir.glob("*.json"))
    if not page_files:
        log.debug("No page files in %s", pages_dir)
        return 0, 0

    log.info("Reconciling folder %s — %d pages", folder.name, len(page_files))
    if verbosity >= 1:
        print(f"\n[>] {folder.name} — {len(page_files)} pages")

    for page_file in page_files:
        page_num = get_page_number(page_file)
        if page_num is None:
            log.warning("Could not parse page number from %s, skipping", page_file.name)
            continue

        ran = reconcile_page_file(
            page_file,
            doc_id=doc_id,
            page_num=page_num,
            parent_folder=folder,
            client=client,
            model=model,
            force=force,
            verbosity=verbosity,
        )
        if ran:
            reconciled_count += 1
            if verbosity >= 1:
                print(f"\x1b[32m  [✓] page.{page_num:04d}\x1b[0m")
        else:
            skipped_count += 1
            if verbosity >= 2:
                print(f"\x1b[2m  [~] page.{page_num:04d} skipped\x1b[0m")

    # Stamp reconciled step in metadata if any pages were reconciled
    if reconciled_count > 0:
        # Local import avoids a potential circular dependency at module load.
        from .metadata import load_metadata, write_metadata

        metadata = load_metadata(folder, doc_id)
        if metadata:
            # setdefault guards against partially-initialised metadata files
            # that lack the "extraction"/"steps" containers.
            metadata.setdefault("extraction", {}).setdefault("steps", {})["reconciled"] = True
            write_metadata(folder, doc_id, metadata)
            log.info("Stamped reconciled=true in metadata for %s", folder.name)

    return reconciled_count, skipped_count
def iter_hash_folders(root: Path, recursive: bool) -> Iterable[Path]:
    """
    Yield document folders that contain a ``pages/`` subdirectory.

    Args:
        root: Root directory to search.
        recursive: If ``True``, walks the tree recursively. If ``False``,
            only considers immediate subdirectories of *root*.

    Yields:
        :class:`pathlib.Path` objects for each qualifying folder.
    """
    candidates = root.rglob("*") if recursive else root.iterdir()
    for p in candidates:
        # Both modes apply the same filter. The non-recursive branch used to
        # yield *every* subdirectory, contradicting the docstring and the
        # recursive branch; downstream code then counted folders with no
        # pages/ in its totals.
        if p.is_dir() and (p / "pages").is_dir():
            yield p
# ─────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────
def main() -> int:
    """
    CLI entry point: reconcile every qualifying document folder under ``--dir``.

    Returns:
        Process exit code — ``0`` on success, ``2`` on configuration errors
        (missing directory, missing OPENAI_KEY/OPENAI_MODEL).
    """
    parser = argparse.ArgumentParser(
        description="Reconcile OCR extractions into a single best-quality text using OpenAI."
    )
    parser.add_argument("-d", "--dir", type=Path, default=DEFAULT_DIR,
                        help=f"Root directory of hash folders (default: {DEFAULT_DIR})")
    parser.add_argument("--recursive", action="store_true",
                        help="Search subfolders recursively.")
    # New: the reconcile functions always supported force, but the CLI had no
    # way to request it.
    parser.add_argument("--force", action="store_true",
                        help="Re-reconcile pages even if already done with the current model.")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Verbose logging.")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    )

    if not args.dir.exists() or not args.dir.is_dir():
        log.error("Directory does not exist: %s", args.dir)
        return 2

    try:
        client = _get_openai_client()
        model = _get_model()
    except ValueError as e:
        log.error("%s", e)
        return 2

    print(f"[#] Reconciling with model: {model}")
    log.info("Reconciliation started. model=%s root=%s", model, args.dir)

    total_folders = 0
    total_reconciled = 0
    total_skipped = 0

    for folder in iter_hash_folders(args.dir, recursive=args.recursive):
        total_folders += 1
        r, s = reconcile_folder(folder, client=client, model=model, force=args.force)
        total_reconciled += r
        total_skipped += s

    print(
        f"\nDone — {total_folders} folder(s) | "
        f"\x1b[32m{total_reconciled} reconciled\x1b[0m | "
        f"\x1b[2m{total_skipped} skipped\x1b[0m"
    )
    log.info(
        "Done. folders=%d reconciled=%d skipped=%d",
        total_folders, total_reconciled, total_skipped,
    )
    return 0
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__": raise SystemExit(main())