"""
askemblaex/main.py
Single entrypoint for the askemblaex pipeline.
Examples:
# List available extraction methods
askemblaex --list-methods
# Extract only
askemblaex --source /path/to/pdfs --extract
# Reconcile only
askemblaex --source /path/to/output --reconcile
# Full pipeline to a separate output folder
askemblaex --source /path/to/pdfs --output /path/to/output --extract --reconcile
# Only specific methods
askemblaex --source /path/to/pdfs --extract --methods azure_computer_vision pymupdf
# Skip a method
askemblaex --source /path/to/pdfs --extract --skip-methods pdfplumber
# Force re-extraction (won't touch reconciled data)
askemblaex --source /path/to/pdfs --extract --force-extract
# Force re-reconciliation (won't touch extraction data)
askemblaex --source /path/to/output --reconcile --force-reconcile
# Verbose levels
askemblaex --source /path/to/pdfs --extract -v # INFO to console
askemblaex --source /path/to/pdfs --extract -vv # DEBUG to console
askemblaex --source /path/to/pdfs --extract -vvv # DEBUG + full tracebacks
"""
from __future__ import annotations
import argparse
import os
import logging
import sys
import traceback
from pathlib import Path
from typing import Optional, Set
from .env import load_env
load_env()
# ─────────────────────────────────────────────
# Available extraction methods
# ─────────────────────────────────────────────
# Maps method name → human-readable description (shown by --list-methods).
# Keys are the identifiers accepted by --methods / --skip-methods.
AVAILABLE_METHODS: dict[str, str] = {
"azure_computer_vision": "Azure Computer Vision Read OCR (requires AZURE_VISION_ENDPOINT, AZURE_VISION_KEY)",
"azure_docint": "Azure Document Intelligence layout + KV (requires AZURE_DOCINT_ENDPOINT, AZURE_DOCINT_KEY)",
"pymupdf": "PyMuPDF embedded text layer (no credentials required)",
"pdfplumber": "pdfplumber embedded text layer (no credentials required)",
}
# Default resolution for rendering PDF pages to images.
DEFAULT_DPI = 300
# ANSI escape sequences used for coloured console output.
RED = "\x1b[31m"
GREEN = "\x1b[32m"
YELLOW = "\x1b[33m"
CYAN = "\x1b[36m"
DIM = "\x1b[2m"
BOLD = "\x1b[1m"
RESET = "\x1b[0m"
# Console handler — attached in _setup_logging
_console_handler: Optional[logging.Handler] = None
# Module-level logger for this entrypoint.
log = logging.getLogger("askemblaex.main")
# ─────────────────────────────────────────────
# Logging setup
# ─────────────────────────────────────────────
def _setup_logging(verbosity: int) -> None:
    """
    Configure console logging based on verbosity level.

    0 — WARNING and above only (errors and service failures)
    1 — INFO (-v)
    2 — DEBUG (-vv)
    3 — DEBUG + full tracebacks printed inline (-vvv)
    """
    global _console_handler

    if verbosity == 0:
        level = logging.WARNING
        fmt = "%(levelname)s: %(message)s"
    elif verbosity == 1:
        level = logging.INFO
        fmt = "%(levelname)-8s %(message)s"
    else:
        # -vv and above (and any out-of-range value) get full DEBUG detail.
        level = logging.DEBUG
        fmt = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"

    console = logging.StreamHandler(sys.stdout)
    console.setLevel(level)
    console.setFormatter(logging.Formatter(fmt))
    _console_handler = console

    pkg_logger = logging.getLogger("askemblaex")
    pkg_logger.setLevel(logging.DEBUG)  # capture everything; handler filters
    pkg_logger.addHandler(console)
def _console_error(msg: str, exc: Optional[Exception] = None, verbosity: int = 0) -> None:
    """
    Always print errors to console regardless of verbosity level.

    At verbosity 3 also prints the full traceback; at verbosity 1–2 a
    one-line exception summary is printed instead.
    """
    print(f"{RED}[!] {msg}{RESET}", file=sys.stderr)
    if not exc:
        return
    if verbosity >= 3:
        traceback.print_exc()
    elif verbosity >= 1:
        print(f" {DIM}{type(exc).__name__}: {exc}{RESET}", file=sys.stderr)
def _console_warn(msg: str) -> None:
    """Print a yellow warning line, regardless of verbosity."""
    warning_line = f"{YELLOW}[~] {msg}{RESET}"
    print(warning_line)
def _console_info(msg: str, verbosity: int) -> None:
    """Print an informational line when -v (or higher) was given."""
    if verbosity < 1:
        return
    print(f" {msg}")
def _console_debug(msg: str, verbosity: int) -> None:
    """Print a dimmed debug line when -vv (or higher) was given."""
    if verbosity < 2:
        return
    print(f" {DIM}{msg}{RESET}")
# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────
def _resolve_methods(
    methods: Optional[list[str]],
    skip_methods: Optional[list[str]],
) -> Set[str]:
    """
    Resolve the set of extraction methods to run.

    Starts from *methods* (or every known method when none are given),
    validates names, then removes *skip_methods*.  Exits the process
    with status 1 on any unknown name or when nothing remains to run.
    """
    known = set(AVAILABLE_METHODS.keys())
    active = set(methods) if methods else set(known)

    bad = active - known
    if bad:
        print(f"{RED}[!] Unknown methods: {', '.join(sorted(bad))}{RESET}")
        print(f" Run --list-methods to see available options.")
        sys.exit(1)

    if skip_methods:
        bad_skips = set(skip_methods) - known
        if bad_skips:
            print(f"{RED}[!] Unknown skip-methods: {', '.join(sorted(bad_skips))}{RESET}")
            sys.exit(1)
        active -= set(skip_methods)

    if not active:
        print(f"{RED}[!] No extraction methods remaining after applying --methods / --skip-methods.{RESET}")
        sys.exit(1)
    return active
def _print_methods() -> None:
    """Print the catalogue of extraction methods with their descriptions."""
    print("\nAvailable extraction methods:\n")
    for method_name, blurb in AVAILABLE_METHODS.items():
        print(f" {GREEN}{method_name}{RESET}")
        print(f" {DIM}{blurb}{RESET}")
    print()
def _iter_hash_folders(root: Path, recursive: bool):
    """
    Yield candidate document directories under *root*.

    Hidden directories (names starting with ".") are skipped.  With
    *recursive* the whole tree is walked; otherwise only the direct
    children of *root* are considered.
    """
    candidates = root.rglob("*") if recursive else root.iterdir()
    for entry in candidates:
        if entry.is_dir() and not entry.name.startswith("."):
            yield entry
# ─────────────────────────────────────────────
# Extraction runner
# ─────────────────────────────────────────────
# ─────────────────────────────────────────────
# Reconciliation runner
# ─────────────────────────────────────────────
def run_reconciliation(
    output: Path,
    *,
    openai_client,
    openai_model: str,
    force: bool,
    recursive: bool,
    verbosity: int,
) -> int:
    """
    Run OpenAI reconciliation over all document folders in *output*.

    For each folder that contains a ``pages/`` subdirectory, merges all
    per-page OCR extractions into a single best-quality transcription stored
    under ``extractions.reconciled``.

    Args:
        output: Root output directory containing hash-keyed document
            folders.
        openai_client: Configured :class:`openai.OpenAI` client.
        openai_model: OpenAI model name e.g. ``"gpt-4o"``.
        force: If ``True``, re-reconcile pages already reconciled with
            the same model.
        recursive: If ``True``, walk *output* recursively.
        verbosity: Console verbosity level (0–3).

    Returns:
        Number of document folders visited.
    """
    # Imported lazily so the CLI stays importable when optional deps are absent.
    from .reconcile import reconcile_folder

    print(f"\n{BOLD}[#] Reconciliation{RESET} — model: {GREEN}{openai_model}{RESET}")
    print(f" Force : {force}\n")

    total_folders = 0
    total_reconciled = 0
    total_skipped = 0
    for folder in _iter_hash_folders(output, recursive=recursive):
        # Only folders that were actually extracted have a pages/ subdir.
        if not (folder / "pages").is_dir():
            continue
        total_folders += 1
        try:
            r, s = reconcile_folder(
                folder,
                client=openai_client,
                model=openai_model,
                force=force,
                verbosity=verbosity,
            )
            total_reconciled += r
            total_skipped += s
        except Exception as e:
            # One bad folder must not abort the whole run.
            _console_error(f"Reconciliation failed for {folder.name} — {e}", exc=e, verbosity=verbosity)
            log.exception("Reconciliation failed for folder %s", folder)

    print(
        f"\n Folders: {total_folders} | "
        f"{GREEN}{total_reconciled} reconciled{RESET} | "
        f"{DIM}{total_skipped} skipped{RESET}"
    )
    return total_folders
# ─────────────────────────────────────────────
# Embedding runner
# ─────────────────────────────────────────────
def run_embedding(
    output: Path,
    *,
    provider: str,
    force: bool,
    recursive: bool,
    verbosity: int,
) -> int:
    """
    Generate embeddings for all reconciled pages in *output*.

    Reads each page's ``extractions.reconciled.text`` and writes the
    resulting embedding vector into ``extractions.embedding``.

    Args:
        output: Root output directory containing hash-keyed document folders.
        provider: Embedding provider — ``"ollama"`` or ``"openai"``.
        force: If ``True``, re-embed even if already embedded with the
            same model.
        recursive: If ``True``, walk *output* recursively.
        verbosity: Console verbosity level (0–3).

    Returns:
        Number of document folders visited.
    """
    # Imported lazily so the CLI stays importable when optional deps are absent.
    from .embed import embed_folder

    # NOTE(review): may be None when the env var is unset — printed as-is
    # below; presumably embed_folder resolves its own default. TODO confirm.
    model = (
        os.getenv("OLLAMA_EMODEL") if provider == "ollama"
        else os.getenv("OPENAI_EMODEL")
    )
    print(f"\n{BOLD}[#] Embedding{RESET} — provider: {GREEN}{provider}{RESET} model: {GREEN}{model}{RESET}")
    print(f" Force : {force}\n")

    total_folders = 0
    total_embedded = 0
    total_skipped = 0
    for folder in _iter_hash_folders(output, recursive=recursive):
        # Only folders that were actually extracted have a pages/ subdir.
        if not (folder / "pages").is_dir():
            continue
        total_folders += 1
        try:
            e, s = embed_folder(
                folder,
                provider=provider,
                force=force,
                verbosity=verbosity,
            )
            total_embedded += e
            total_skipped += s
        except Exception as exc:
            # One bad folder must not abort the whole run.
            _console_error(
                f"Embedding failed for {folder.name} — {exc}",
                exc=exc, verbosity=verbosity)
            log.exception("Embedding failed for folder %s", folder)

    print(
        f"\n Folders: {total_folders} | "
        f"{GREEN}{total_embedded} embedded{RESET} | "
        f"{DIM}{total_skipped} skipped{RESET}"
    )
    return total_folders
# ─────────────────────────────────────────────
# Entity extraction runner
# ─────────────────────────────────────────────
def run_entities(
    output: Path,
    *,
    openai_client,
    openai_model: str,
    embed_provider: Optional[str],
    force: bool,
    recursive: bool,
    verbosity: int,
) -> int:
    """
    Run windowed structured entity extraction over all document folders.

    For each page that has reconciled text, builds a dynamic context window,
    calls OpenAI to extract structured entities (persons, events, claims,
    places), and optionally generates per-person embedding vectors.

    Args:
        output: Root output directory containing hash-keyed document
            folders.
        openai_client: Configured :class:`openai.OpenAI` client.
        openai_model: OpenAI model name (used as a fallback; the entity
            model is resolved from ``OPENAI_ENTITY_MODEL``).
        embed_provider: Embedding provider (``"ollama"``, ``"openai"``) or
            ``None`` to skip per-person embeddings.
        force: If ``True``, re-extract even if already done with the
            same model.
        recursive: If ``True``, walk *output* recursively.
        verbosity: Console verbosity level (0–3).

    Returns:
        Number of document folders visited.
    """
    # Imported lazily so the CLI stays importable when optional deps are absent.
    from .entities import entity_extract_folder, _get_entity_model

    entity_model = _get_entity_model()
    print(f"\n{BOLD}[#] Entity Extraction{RESET} — model: {GREEN}{entity_model}{RESET}")
    if embed_provider:
        print(f" Embeddings : {embed_provider}")
    else:
        print(f" Embeddings : {DIM}disabled (no provider configured){RESET}")
    print(f" Force : {force}\n")

    total_folders = 0
    total_extracted = 0
    total_skipped = 0
    for folder in _iter_hash_folders(output, recursive=recursive):
        # Only folders that were actually extracted have a pages/ subdir.
        if not (folder / "pages").is_dir():
            continue
        total_folders += 1
        try:
            e, s = entity_extract_folder(
                folder,
                client=openai_client,
                model=entity_model,
                embed_provider=embed_provider,
                force=force,
                verbosity=verbosity,
            )
            total_extracted += e
            total_skipped += s
        except Exception as exc:
            # One bad folder must not abort the whole run.
            _console_error(
                f"Entity extraction failed for {folder.name} — {exc}",
                exc=exc, verbosity=verbosity,
            )
            log.exception("Entity extraction failed for folder %s", folder)

    print(
        f"\n Folders: {total_folders} | "
        f"{GREEN}{total_extracted} extracted{RESET} | "
        f"{DIM}{total_skipped} skipped{RESET}"
    )
    return total_folders
# ─────────────────────────────────────────────
# Argument parser
# ─────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
    """
    Build and return the CLI argument parser.

    Returns:
        Configured :class:`argparse.ArgumentParser` for the ``askemblaex``
        command.
    """
    parser = argparse.ArgumentParser(
        prog="askemblaex",
        description="Extract and reconcile text from genealogical documents.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Directories
    parser.add_argument("--source", "-s", type=Path, required=False,
                        help="Source directory containing PDFs or images.")
    parser.add_argument("--output", "-o", type=Path, default=None,
                        help="Output directory. Defaults to --source.")

    # Actions
    parser.add_argument("--extract", action="store_true", help="Run extraction.")
    parser.add_argument("--reconcile", action="store_true", help="Run reconciliation.")
    parser.add_argument("--entities", action="store_true", help="Run structured entity extraction.")
    parser.add_argument("--embed", action="store_true", help="Run embedding generation.")
    parser.add_argument("--list-methods", action="store_true", help="List available extraction methods and exit.")

    # Method control
    parser.add_argument("--methods", nargs="+", metavar="METHOD",
                        help="Whitelist specific extraction methods. Default: all.")
    parser.add_argument("--skip-methods", nargs="+", metavar="METHOD",
                        help="Exclude specific extraction methods.")

    # Force flags
    parser.add_argument("--force-extract", action="store_true",
                        help="Re-extract even if extraction.complete=true.")
    parser.add_argument("--force-reconcile", action="store_true",
                        help="Re-reconcile even if model matches.")
    parser.add_argument("--force-entities", action="store_true",
                        help="Re-extract entities even if already done with same model.")
    parser.add_argument("--force-embed", action="store_true",
                        help="Re-embed even if already embedded with same model.")

    # Options
    parser.add_argument("--dpi", type=int, default=DEFAULT_DPI,
                        help=f"DPI for PDF page rendering (default: {DEFAULT_DPI}).")
    parser.add_argument("--recursive", "-r", action="store_true",
                        help="Search source directory recursively.")

    # Verbosity — count -v flags
    parser.add_argument("-v", dest="verbosity", action="count", default=0,
                        help="-v INFO, -vv DEBUG, -vvv DEBUG + full tracebacks.")
    return parser
# ─────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────
def main() -> int:
    """
    Main entrypoint for the ``askemblaex`` CLI.

    Parses arguments, runs preflight checks, then executes whichever pipeline
    stages were requested (``--extract``, ``--reconcile``, ``--entities``,
    ``--embed``) in order.

    Returns:
        Exit code — ``0`` on success, ``2`` on a configuration error.
    """
    parser = build_parser()
    args = parser.parse_args()
    _setup_logging(args.verbosity)

    if args.list_methods:
        _print_methods()
        return 0

    if not (args.extract or args.reconcile or args.entities or args.embed):
        parser.print_help()
        print(f"\n{YELLOW}[~] Nothing to do — pass --extract, --reconcile, --entities, and/or --embed.{RESET}\n")
        return 0

    if not args.source:
        print(f"{RED}[!] --source is required.{RESET}")
        return 2
    if not args.source.exists() or not args.source.is_dir():
        print(f"{RED}[!] Source directory does not exist: {args.source}{RESET}")
        return 2

    output = args.output if args.output else args.source
    output.mkdir(parents=True, exist_ok=True)

    # Fix: render the actual flag spelling (-v / -vv / -vvv). The previous
    # form ('-' * n + 'v') printed "--v" for -vv and "---v" for -vvv.
    flag_str = "-" + "v" * args.verbosity if args.verbosity else "none"
    print(f"\n{'─' * 60}")
    print(f" source : {args.source}")
    print(f" output : {output}")
    print(f" verbosity : {args.verbosity} ({flag_str})")
    print(f"{'─' * 60}")

    # Resolve requested methods (extraction only; other stages ignore them).
    requested_methods = (
        _resolve_methods(args.methods, args.skip_methods)
        if args.extract
        else set()
    )

    # ── Preflight ──
    from .preflight import run_preflight
    preflight = run_preflight(
        requested_methods=requested_methods,
        needs_reconcile=args.reconcile,
        needs_entities=args.entities,
        needs_embed=args.embed,
        verbose=args.verbosity,
    )

    # ── Extraction ──
    if args.extract:
        # NOTE(review): run_extraction is not defined in this chunk of the
        # module — assumed provided alongside the other runners; confirm.
        run_extraction(
            args.source,
            output,
            active_methods=preflight.active_methods,
            service_status=preflight.services,
            force=args.force_extract,
            dpi=args.dpi,
            recursive=args.recursive,
            verbosity=args.verbosity,
        )

    # ── Reconciliation ──
    if args.reconcile:
        if not preflight.openai_available:
            _console_warn("OpenAI is not available — skipping reconciliation.")
        else:
            run_reconciliation(
                output,
                openai_client=preflight.openai_client,
                openai_model=preflight.openai_model,
                force=args.force_reconcile,
                recursive=args.recursive,
                verbosity=args.verbosity,
            )

    # ── Entity Extraction ──
    if args.entities:
        if not preflight.openai_available:
            _console_warn("OpenAI is not available — skipping entity extraction.")
        else:
            run_entities(
                output,
                openai_client=preflight.openai_client,
                openai_model=preflight.openai_model,
                embed_provider=preflight.embed_provider,
                force=args.force_entities,
                recursive=args.recursive,
                verbosity=args.verbosity,
            )

    # ── Embedding ──
    if args.embed:
        if not preflight.embed_provider:
            _console_warn("No embedding provider available — skipping embeddings.")
        else:
            run_embedding(
                output,
                provider=preflight.embed_provider,
                force=args.force_embed,
                recursive=args.recursive,
                verbosity=args.verbosity,
            )

    print(f"\n{GREEN}[✓] Done.{RESET}\n")
    return 0
# Propagate main()'s integer exit code to the shell when run as a script.
if __name__ == "__main__":
raise SystemExit(main())