"""
askemblaex/reconcile.py
Reconciles multiple OCR extraction methods on each page file into a single
best-quality text using OpenAI.
Reads all extraction methods from each page file (anything under 'extractions'
except 'reconciled' and 'embedding'), sends them to OpenAI, and writes the
result back into extractions.reconciled.
Skip logic:
- If reconciled.text is already populated AND reconciled.model matches
the current OPENAI_MODEL, the page is skipped.
- If the model has changed, the page is re-reconciled.
Environment variables:
OPENAI_KEY OpenAI API key
OPENAI_MODEL Model to use e.g. gpt-4o
"""
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional
from openai import OpenAI
from .env import load_env
from .pages import (
PAGE_METHOD_HINT,
get_page_number,
load_page,
save_or_merge_page,
)
load_env()
log = logging.getLogger("askemblaex.reconcile")
# ─────────────────────────────────────────────
# Config
# ─────────────────────────────────────────────
# Default root of hash-keyed document folders (Windows UNC share).
DEFAULT_DIR = Path(r"\\fileserver.office.arpa\Public\genealogy")
# Keys under 'extractions' that are pipeline outputs, not OCR source methods.
SKIP_EXTRACTION_KEYS = {"reconciled", "embedding"}
# OCR methods whose text wins on conflict; listed first in the prompt and
# named explicitly in SYSTEM_PROMPT below.
AZURE_PREFERRED_METHODS = {"azure_computer_vision", "azure_docint"}
# Sent verbatim as the OpenAI system message — do not edit casually; the
# reconciliation quality depends on this exact wording.
SYSTEM_PROMPT = """\
You are an expert document transcription assistant specialising in historical and genealogical records.
You will be given the same document page transcribed by multiple OCR methods. Each method may have \
errors, omissions, or formatting differences. Your task is to produce a single, best-quality \
transcription by carefully combining all sources.
Rules:
- Prefer text from Azure sources (azure_computer_vision, azure_docint) when sources conflict.
- Produce the most accurate and complete text by merging the best parts of each source.
- Correct obvious OCR errors (e.g. "rn" read as "m", broken words, stray characters) \
using context and cross-source comparison.
- Preserve the original formatting, punctuation, capitalisation, and line breaks as \
faithfully as possible.
- Do not add, infer, or summarise content that does not appear in any source.
- Output only the reconciled transcription text. No commentary, no explanations, \
no markdown formatting.
"""
# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────
def _utc_now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _hash_text(text: str) -> str:
return hashlib.md5(text.encode("utf-8")).hexdigest()
def _get_openai_client() -> OpenAI:
key = os.getenv("OPENAI_KEY")
if not key:
raise ValueError("OPENAI_KEY environment variable is not set.")
return OpenAI(api_key=key)
def _get_model() -> str:
model = os.getenv("OPENAI_MODEL")
if not model:
raise ValueError("OPENAI_MODEL environment variable is not set.")
return model
def _build_user_prompt(methods: Dict[str, str]) -> str:
    """
    Assemble the OpenAI user prompt from every extraction method's text.
    Azure methods come first so the model sees the highest-quality
    sources at the top of the context window; methods are alphabetical
    within each group.
    Args:
        methods: Mapping of method name → extracted text.
    Returns:
        Formatted prompt string ready to pass to the OpenAI chat API.
    """
    # One sort: non-Azure sorts after Azure (False < True), then by name.
    ordered = sorted(methods, key=lambda m: (m not in AZURE_PREFERRED_METHODS, m))
    sections = [f"--- {name} ---\n{methods[name]}" for name in ordered]
    header = (
        "Below are transcriptions of the same document page from multiple OCR methods.\n"
        "Produce a single best-quality reconciled transcription.\n\n"
    )
    return header + "\n\n".join(sections)
def _should_skip(reconciled: dict, model: str) -> bool:
"""Return True if the page already has a valid reconciliation with the current model."""
return bool(
reconciled.get("text")
and reconciled.get("model") == model
)
def _collect_extraction_texts(extractions: dict) -> Dict[str, str]:
    """
    Collect all extraction method texts excluding reconciled and embedding.
    Only methods whose 'text' is a non-empty string (after stripping) are
    included; non-dict entries are ignored.
    """
    candidates = (
        (name, (payload.get("text") or "").strip())
        for name, payload in extractions.items()
        if name not in SKIP_EXTRACTION_KEYS and isinstance(payload, dict)
    )
    return {name: text for name, text in candidates if text}
# ─────────────────────────────────────────────
# Core reconciliation
# ─────────────────────────────────────────────
def reconcile_page(
    page_data: dict,
    *,
    client: OpenAI,
    model: str,
    force: bool = False,
) -> Optional[str]:
    """
    Reconcile a single page's extraction data into one best-quality text.
    Skips the page if it already has a valid reconciliation with *model*,
    unless *force* is ``True``. If only one extraction source is present,
    returns that text directly without an OpenAI call.
    Args:
        page_data: Full page JSON dict (as loaded from disk).
        client: Configured :class:`openai.OpenAI` client.
        model: OpenAI model name e.g. ``"gpt-4o"``.
        force: If ``True``, bypass the internal skip check and re-run
            even when a matching reconciliation already exists.
    Returns:
        Reconciled text string, or ``None`` if the page was skipped or
        had no extraction text to work with.
    """
    extractions = page_data.get("extractions", {})
    reconciled = extractions.get("reconciled", {})
    # The skip check lives here too (not only in reconcile_page_file) so
    # direct callers get idempotent behaviour; `force` now bypasses it —
    # previously this check silently defeated forced re-runs.
    if not force and _should_skip(reconciled, model):
        return None
    texts = _collect_extraction_texts(extractions)
    if not texts:
        log.warning("No extraction text found for page %s", page_data.get("page_num"))
        return None
    if len(texts) == 1:
        # Only one source — no reconciliation needed, just use it directly
        return next(iter(texts.values()))
    user_prompt = _build_user_prompt(texts)
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.0,  # deterministic transcription, no creativity
    )
    # The API can return None content (e.g. refusals); treat that as
    # "nothing produced" instead of raising AttributeError on .strip().
    content = response.choices[0].message.content
    return content.strip() if content else None
def reconcile_page_file(
    page_file: Path,
    doc_id: str,
    page_num: int,
    parent_folder: Path,
    *,
    client: OpenAI,
    model: str,
    force: bool = False,
    verbosity: int = 0,
) -> bool:
    """
    Load a page JSON file, reconcile it with OpenAI, and write the result back.
    Args:
        page_file: Path to the page JSON file to process.
        doc_id: Document ID (content hash of the source file).
        page_num: Zero-based page number (used for logging).
        parent_folder: Hash-keyed document folder (parent of ``pages/``).
        client: Configured :class:`openai.OpenAI` client.
        model: OpenAI model name e.g. ``"gpt-4o"``.
        force: If ``True``, re-reconcile even if already done with
            the same model.
        verbosity: Console verbosity level (0–3).
    Returns:
        ``True`` if reconciliation ran and results were written, ``False``
        if the page was skipped or failed.
    """
    try:
        page_data = json.loads(page_file.read_text(encoding="utf-8"))
    except Exception as e:
        log.error("Failed to load page file %s: %s", page_file.name, e)
        return False
    extractions = page_data.get("extractions", {})
    reconciled = extractions.get("reconciled", {})
    if _should_skip(reconciled, model):
        if not force:
            log.debug("Skipping %s — already reconciled with model %s", page_file.name, model)
            return False
        # Bug fix: reconcile_page() repeats this skip check internally, so a
        # stale in-memory result would defeat force=True. Drop it here; the
        # 'reconciled' key is excluded from source collection anyway.
        extractions.pop("reconciled", None)
    log.info("Reconciling %s", page_file.name)
    try:
        text = reconcile_page(page_data, client=client, model=model)
    except Exception as e:
        import sys, traceback
        print(f" \x1b[31m[!] OpenAI failed page {page_num}: {e}\x1b[0m", file=sys.stderr)
        if verbosity >= 3:
            traceback.print_exc()
        elif verbosity >= 1:
            print(f" \x1b[2m{type(e).__name__}: {e}\x1b[0m", file=sys.stderr)
        log.error("OpenAI call failed for %s: %s", page_file.name, e)
        return False
    if text is None:
        return False
    now = _utc_now()
    reconciled_data = {
        "reconciled": {
            "text": text,
            "text_hash": _hash_text(text),
            "provider": "openai",
            "method": "reconciled",
            "model": model,
            "extracted_at": now,
            # Record which OCR methods actually contributed text.
            "source_methods": [
                m for m in extractions
                if m not in SKIP_EXTRACTION_KEYS
                and isinstance(extractions[m], dict)
                and (extractions[m].get("text") or "").strip()
            ],
        }
    }
    save_or_merge_page(parent_folder, doc_id, page_num, reconciled_data)
    log.info("Reconciled page %d -> %d chars", page_num, len(text))
    return True
# ─────────────────────────────────────────────
# Folder + root runners
# ─────────────────────────────────────────────
def reconcile_folder(
    folder: Path,
    *,
    client: OpenAI,
    model: str,
    force: bool = False,
    verbosity: int = 0,
) -> tuple[int, int]:
    """
    Reconcile all page files in a hash-keyed document folder.
    For each page file found under ``<folder>/pages/``, calls OpenAI to
    merge all OCR extraction methods into a single best-quality
    transcription and writes the result back into
    ``extractions.reconciled``.
    A page is skipped if ``reconciled.text`` is already populated *and*
    ``reconciled.model`` matches *model*, unless *force* is ``True``.
    Args:
        folder: Hash-keyed document folder (must contain a ``pages/``
            subdirectory).
        client: Configured :class:`openai.OpenAI` client instance.
        model: OpenAI model name e.g. ``"gpt-4o"``.
        force: If ``True``, re-reconcile even if the page was already
            reconciled with the same model.
        verbosity: Console verbosity level (0–3); 1 prints per-page
            successes, 2 also prints skips.
    Returns:
        A ``(reconciled_count, skipped_count)`` tuple.
    """
    pages_dir = folder / "pages"
    if not pages_dir.is_dir():
        log.debug("No pages folder in %s", folder.name)
        return 0, 0
    doc_id = folder.stem
    reconciled_count = 0
    skipped_count = 0
    page_files = sorted(pages_dir.glob("*.json"))
    if not page_files:
        log.debug("No page files in %s", pages_dir)
        return 0, 0
    log.info("Reconciling folder %s — %d pages", folder.name, len(page_files))
    if verbosity >= 1:
        print(f"\n[>] {folder.name} — {len(page_files)} pages")
    for page_file in page_files:
        page_num = get_page_number(page_file)
        if page_num is None:
            log.warning("Could not parse page number from %s, skipping", page_file.name)
            continue
        ran = reconcile_page_file(
            page_file,
            doc_id=doc_id,
            page_num=page_num,
            parent_folder=folder,
            client=client,
            model=model,
            force=force,
            verbosity=verbosity,
        )
        if ran:
            reconciled_count += 1
            if verbosity >= 1:
                print(f"\x1b[32m [✓] page.{page_num:04d}\x1b[0m")
        else:
            skipped_count += 1
            if verbosity >= 2:
                print(f"\x1b[2m [~] page.{page_num:04d} skipped\x1b[0m")
    # Stamp reconciled step in metadata if any pages were reconciled
    if reconciled_count > 0:
        from .metadata import load_metadata, write_metadata
        metadata = load_metadata(folder, doc_id)
        if metadata:
            # setdefault guards against metadata files that predate the
            # 'extraction'/'steps' structure (plain indexing raised KeyError).
            steps = metadata.setdefault("extraction", {}).setdefault("steps", {})
            steps["reconciled"] = True
            write_metadata(folder, doc_id, metadata)
            log.info("Stamped reconciled=true in metadata for %s", folder.name)
    return reconciled_count, skipped_count
def iter_hash_folders(root: Path, recursive: bool) -> Iterable[Path]:
    """
    Yield document folders that contain a ``pages/`` subdirectory.
    Args:
        root: Root directory to search.
        recursive: If ``True``, walks the tree recursively. If ``False``,
            only scans immediate subdirectories of *root*.
    Yields:
        :class:`pathlib.Path` objects for each qualifying folder.
    """
    # Bug fix: the non-recursive branch previously yielded *every*
    # subdirectory, contradicting the contract above and inflating the
    # caller's folder count. Apply the same pages/ filter in both modes.
    candidates = root.rglob("*") if recursive else root.iterdir()
    for p in candidates:
        if p.is_dir() and (p / "pages").is_dir():
            yield p
# ─────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────
def main() -> int:
    """
    CLI entry point: reconcile every qualifying document folder under --dir.
    Returns:
        Process exit code — ``0`` on success, ``2`` on configuration or
        environment errors (missing directory, missing env vars).
    """
    parser = argparse.ArgumentParser(
        description="Reconcile OCR extractions into a single best-quality text using OpenAI."
    )
    parser.add_argument("-d", "--dir", type=Path, default=DEFAULT_DIR,
                        help=f"Root directory of hash folders (default: {DEFAULT_DIR})")
    parser.add_argument("--recursive", action="store_true",
                        help="Search subfolders recursively.")
    # Previously reconcile_folder's force/verbosity options were unreachable
    # from the CLI; expose --force and map -v onto per-page console output.
    parser.add_argument("--force", action="store_true",
                        help="Re-reconcile pages even if already done with the current model.")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Verbose logging.")
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    )
    if not args.dir.exists() or not args.dir.is_dir():
        log.error("Directory does not exist: %s", args.dir)
        return 2
    try:
        client = _get_openai_client()
        model = _get_model()
    except ValueError as e:
        log.error("%s", e)
        return 2
    print(f"[#] Reconciling with model: {model}")
    log.info("Reconciliation started. model=%s root=%s", model, args.dir)
    total_folders = 0
    total_reconciled = 0
    total_skipped = 0
    for folder in iter_hash_folders(args.dir, recursive=args.recursive):
        total_folders += 1
        r, s = reconcile_folder(
            folder,
            client=client,
            model=model,
            force=args.force,
            verbosity=1 if args.verbose else 0,
        )
        total_reconciled += r
        total_skipped += s
    print(
        f"\nDone — {total_folders} folder(s) | "
        f"\x1b[32m{total_reconciled} reconciled\x1b[0m | "
        f"\x1b[2m{total_skipped} skipped\x1b[0m"
    )
    log.info(
        "Done. folders=%d reconciled=%d skipped=%d",
        total_folders, total_reconciled, total_skipped,
    )
    return 0
if __name__ == "__main__":
raise SystemExit(main())