"""
askemblaex/main.py

Single entrypoint for the askemblaex pipeline.

Examples:
    # List available extraction methods
    askemblaex --list-methods

    # Extract only
    askemblaex --source /path/to/pdfs --extract

    # Reconcile only
    askemblaex --source /path/to/output --reconcile

    # Full pipeline to a separate output folder
    askemblaex --source /path/to/pdfs --output /path/to/output --extract --reconcile

    # Only specific methods
    askemblaex --source /path/to/pdfs --extract --methods azure_computer_vision pymupdf

    # Skip a method
    askemblaex --source /path/to/pdfs --extract --skip-methods pdfplumber

    # Force re-extraction (won't touch reconciled data)
    askemblaex --source /path/to/pdfs --extract --force-extract

    # Force re-reconciliation (won't touch extraction data)
    askemblaex --source /path/to/output --reconcile --force-reconcile

    # Verbose levels
    askemblaex --source /path/to/pdfs --extract -v     # INFO to console
    askemblaex --source /path/to/pdfs --extract -vv    # DEBUG to console
    askemblaex --source /path/to/pdfs --extract -vvv   # DEBUG + full tracebacks
"""

from __future__ import annotations

import argparse
import logging
import os
import sys
import traceback
from pathlib import Path
from typing import Iterator, Optional, Set

from .env import load_env

load_env()

# ─────────────────────────────────────────────
# Available extraction methods
# ─────────────────────────────────────────────

AVAILABLE_METHODS: dict[str, str] = {
    "azure_computer_vision": "Azure Computer Vision Read OCR (requires AZURE_VISION_ENDPOINT, AZURE_VISION_KEY)",
    "azure_docint":          "Azure Document Intelligence layout + KV (requires AZURE_DOCINT_ENDPOINT, AZURE_DOCINT_KEY)",
    "pymupdf":               "PyMuPDF embedded text layer (no credentials required)",
    "pdfplumber":            "pdfplumber embedded text layer (no credentials required)",
}
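
# Example (illustrative): the two Azure-backed methods read their credentials
# from the environment, typically via a .env file picked up by load_env().
# The variable names come from the method descriptions above; the endpoint
# values below are placeholders.
#
#   AZURE_VISION_ENDPOINT=https://<your-resource>.cognitiveservices.azure.com/
#   AZURE_VISION_KEY=<key>
#   AZURE_DOCINT_ENDPOINT=https://<your-resource>.cognitiveservices.azure.com/
#   AZURE_DOCINT_KEY=<key>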

DEFAULT_DPI = 300

RED    = "\x1b[31m"
GREEN  = "\x1b[32m"
YELLOW = "\x1b[33m"
CYAN   = "\x1b[36m"
DIM    = "\x1b[2m"
BOLD   = "\x1b[1m"
RESET  = "\x1b[0m"

# Console handler — attached in _setup_logging
_console_handler: Optional[logging.Handler] = None

log = logging.getLogger("askemblaex.main")


# ─────────────────────────────────────────────
# Logging setup
# ─────────────────────────────────────────────

def _setup_logging(verbosity: int) -> None:
    """
    Configure console logging based on verbosity level.

    0 — WARNING and above only (errors and service failures)
    1 — INFO  (-v)
    2 — DEBUG (-vv)
    3 — DEBUG + full tracebacks printed inline (-vvv)
    """
    global _console_handler

    level = {
        0: logging.WARNING,
        1: logging.INFO,
        2: logging.DEBUG,
        3: logging.DEBUG,
    }.get(verbosity, logging.DEBUG)

    fmt = {
        0: "%(levelname)s: %(message)s",
        1: "%(levelname)-8s %(message)s",
        2: "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        3: "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    }.get(verbosity, "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s")

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(fmt))
    _console_handler = handler

    root = logging.getLogger("askemblaex")
    root.setLevel(logging.DEBUG)  # capture everything; handler filters
    root.addHandler(handler)
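
# Example (illustrative): the "askemblaex" root logger stays at DEBUG so that
# file handlers attached elsewhere capture everything, while the console
# handler filters by verbosity. For instance:
#
#   _setup_logging(2)                        # -vv: DEBUG to console
#   logging.getLogger("askemblaex.x").debug("visible on console at -vv")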


def _console_error(msg: str, exc: Optional[Exception] = None, verbosity: int = 0) -> None:
    """
    Always print errors to console regardless of verbosity level.
    At verbosity 3 also prints the full traceback.
    """
    print(f"{RED}[!] {msg}{RESET}", file=sys.stderr)
    if exc and verbosity >= 3:
        traceback.print_exc()
    elif exc and verbosity >= 1:
        print(f"    {DIM}{type(exc).__name__}: {exc}{RESET}", file=sys.stderr)


def _console_warn(msg: str) -> None:
    print(f"{YELLOW}[~] {msg}{RESET}")


def _console_info(msg: str, verbosity: int) -> None:
    if verbosity >= 1:
        print(f"    {msg}")


def _console_debug(msg: str, verbosity: int) -> None:
    if verbosity >= 2:
        print(f"    {DIM}{msg}{RESET}")


# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────

def _resolve_methods(
    methods: Optional[list[str]],
    skip_methods: Optional[list[str]],
) -> Set[str]:
    active = set(methods) if methods else set(AVAILABLE_METHODS.keys())

    unknown = active - set(AVAILABLE_METHODS.keys())
    if unknown:
        print(f"{RED}[!] Unknown methods: {', '.join(sorted(unknown))}{RESET}")
        print(f"    Run --list-methods to see available options.")
        sys.exit(1)

    if skip_methods:
        unknown_skip = set(skip_methods) - set(AVAILABLE_METHODS.keys())
        if unknown_skip:
            print(f"{RED}[!] Unknown skip-methods: {', '.join(sorted(unknown_skip))}{RESET}")
            sys.exit(1)
        active -= set(skip_methods)

    if not active:
        print(f"{RED}[!] No extraction methods remaining after applying --methods / --skip-methods.{RESET}")
        sys.exit(1)

    return active
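
# Example (illustrative): the whitelist is applied before the skip list, so
#   _resolve_methods(None, ["pdfplumber"])
# returns every method except pdfplumber, while
#   _resolve_methods(["pymupdf", "pdfplumber"], ["pdfplumber"])
# returns {"pymupdf"}. Unknown names in either list exit with status 1.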


def _print_methods() -> None:
    print("\nAvailable extraction methods:\n")
    for name, description in AVAILABLE_METHODS.items():
        print(f"  {GREEN}{name}{RESET}")
        print(f"    {DIM}{description}{RESET}")
    print()


def _iter_hash_folders(root: Path, recursive: bool) -> Iterator[Path]:
    """Yield candidate document folders under *root*, skipping hidden directories."""
    if recursive:
        for p in root.rglob("*"):
            if p.is_dir() and not p.name.startswith("."):
                yield p
    else:
        for p in root.iterdir():
            if p.is_dir() and not p.name.startswith("."):
                yield p

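# Example (illustrative): the runners below expect output laid out roughly as
# follows — the folder names here are placeholders:
#
#   output/
#     logs/               # main log files (created by run_extraction)
#     <document-hash>/    # one hash-keyed folder per source document
#       pages/            # per-page JSON; its presence marks a document folder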

# ─────────────────────────────────────────────
# Extraction runner
# ─────────────────────────────────────────────

def run_extraction(
    source: Path,
    output: Path,
    *,
    active_methods: Set[str],
    service_status: dict,
    force: bool,
    dpi: int,
    recursive: bool,
    verbosity: int,
) -> int:
    """
    Run multi-method text extraction over all discovered source files.

    Discovers PDFs and images under *source*, processes each one with the
    active extraction methods, and writes per-page JSON files under *output*.

    Args:
        source: Root directory containing source PDFs / images.
        output: Root output directory for hash-keyed document folders.
        active_methods: Set of extraction method names to run.
        service_status: Mapping of ``{method: ServiceStatus}`` from preflight,
            used to annotate document metadata.
        force: If ``True``, re-extract even if already marked complete.
        dpi: DPI for PDF page rendering.
        recursive: If ``True``, search *source* recursively.
        verbosity: Console verbosity level (0–3).

    Returns:
        Number of source files successfully processed.
    """
    from .extract import discover_files, process_one
    from .logging_ import setup_main_logger

    setup_main_logger(output / "logs")

    files = discover_files(source, recursive=recursive)
    if not files:
        _console_warn(f"No files found in {source}")
        return 0

    print(f"\n{BOLD}[#] Extraction{RESET} — {len(files)} file(s) found")
    print(f"    Methods : {', '.join(sorted(active_methods))}")
    print(f"    DPI     : {dpi}")
    print(f"    Force   : {force}")

    # Show any unavailable services
    for method in ("azure_computer_vision", "azure_docint"):
        svc = service_status.get(method)
        if svc and not svc.available:
            print(f"    {YELLOW}[~] {method} unavailable — {svc.reason}{RESET}")
    print()

    processed = 0
    failed = 0
    for i, src_path in enumerate(files, 1):
        print(f"  [{i}/{len(files)}] {src_path.name}")
        try:
            process_one(
                src_path,
                output,
                log,
                active_methods=active_methods,
                service_status=service_status,
                force=force,
                pdf_dpi=dpi,
                verbosity=verbosity,
            )
            print(f"      {GREEN}[✓]{RESET} {src_path.name}")
            processed += 1
        except Exception as e:
            failed += 1
            _console_error(f"Failed: {src_path.name} — {e}", exc=e, verbosity=verbosity)
            log.exception("Extraction failed for %s", src_path)

    print(f"\n  {GREEN}{processed} succeeded{RESET}", end="")
    if failed:
        print(f"  {RED}{failed} failed{RESET}", end="")
    print()
    return processed


# ─────────────────────────────────────────────
# Reconciliation runner
# ─────────────────────────────────────────────

def run_reconciliation(
    output: Path,
    *,
    openai_client,
    openai_model: str,
    force: bool,
    recursive: bool,
    verbosity: int,
) -> int:
    """
    Run OpenAI reconciliation over all document folders in *output*.

    For each folder that contains a ``pages/`` subdirectory, merges all
    per-page OCR extractions into a single best-quality transcription
    stored under ``extractions.reconciled``.

    Args:
        output: Root output directory containing hash-keyed document folders.
        openai_client: Configured :class:`openai.OpenAI` client.
        openai_model: OpenAI model name, e.g. ``"gpt-4o"``.
        force: If ``True``, re-reconcile pages already reconciled with the
            same model.
        recursive: If ``True``, walk *output* recursively.
        verbosity: Console verbosity level (0–3).

    Returns:
        Number of document folders visited.
    """
    from .reconcile import reconcile_folder

    print(f"\n{BOLD}[#] Reconciliation{RESET} — model: {GREEN}{openai_model}{RESET}")
    print(f"    Force : {force}\n")

    total_folders = 0
    total_reconciled = 0
    total_skipped = 0

    for folder in _iter_hash_folders(output, recursive=recursive):
        if not (folder / "pages").is_dir():
            continue
        total_folders += 1
        try:
            r, s = reconcile_folder(
                folder,
                client=openai_client,
                model=openai_model,
                force=force,
                verbosity=verbosity,
            )
            total_reconciled += r
            total_skipped += s
        except Exception as e:
            _console_error(f"Reconciliation failed for {folder.name} — {e}", exc=e, verbosity=verbosity)
            log.exception("Reconciliation failed for folder %s", folder)

    print(
        f"\n  Folders: {total_folders} | "
        f"{GREEN}{total_reconciled} reconciled{RESET} | "
        f"{DIM}{total_skipped} skipped{RESET}"
    )
    return total_folders


# ─────────────────────────────────────────────
# Embedding runner
# ─────────────────────────────────────────────

def run_embedding(
    output: Path,
    *,
    provider: str,
    force: bool,
    recursive: bool,
    verbosity: int,
) -> int:
    """
    Generate embeddings for all reconciled pages in *output*.

    Reads each page's ``extractions.reconciled.text`` and writes the
    resulting embedding vector into ``extractions.embedding``.

    Args:
        output: Root output directory containing hash-keyed document folders.
        provider: Embedding provider — ``"ollama"`` or ``"openai"``.
        force: If ``True``, re-embed even if already embedded with the same model.
        recursive: If ``True``, walk *output* recursively.
        verbosity: Console verbosity level (0–3).

    Returns:
        Number of document folders visited.
    """
    from .embed import embed_folder

    model = (
        os.getenv("OLLAMA_EMODEL")
        if provider == "ollama"
        else os.getenv("OPENAI_EMODEL")
    )

    print(f"\n{BOLD}[#] Embedding{RESET} — provider: {GREEN}{provider}{RESET} model: {GREEN}{model}{RESET}")
    print(f"    Force : {force}\n")

    total_folders = 0
    total_embedded = 0
    total_skipped = 0

    for folder in _iter_hash_folders(output, recursive=recursive):
        if not (folder / "pages").is_dir():
            continue
        total_folders += 1
        try:
            e, s = embed_folder(
                folder,
                provider=provider,
                force=force,
                verbosity=verbosity,
            )
            total_embedded += e
            total_skipped += s
        except Exception as exc:
            _console_error(f"Embedding failed for {folder.name} — {exc}", exc=exc, verbosity=verbosity)
            log.exception("Embedding failed for folder %s", folder)

    print(
        f"\n  Folders: {total_folders} | "
        f"{GREEN}{total_embedded} embedded{RESET} | "
        f"{DIM}{total_skipped} skipped{RESET}"
    )
    return total_folders


# ─────────────────────────────────────────────
# Entity extraction runner
# ─────────────────────────────────────────────

def run_entities(
    output: Path,
    *,
    openai_client,
    openai_model: str,
    embed_provider: Optional[str],
    force: bool,
    recursive: bool,
    verbosity: int,
) -> int:
    """
    Run windowed structured entity extraction over all document folders.

    For each page that has reconciled text, builds a dynamic context window,
    calls OpenAI to extract structured entities (persons, events, claims,
    places), and optionally generates per-person embedding vectors.

    Args:
        output: Root output directory containing hash-keyed document folders.
        openai_client: Configured :class:`openai.OpenAI` client.
        openai_model: OpenAI model name (used as a fallback; the entity
            model is resolved from ``OPENAI_ENTITY_MODEL``).
        embed_provider: Embedding provider (``"ollama"``, ``"openai"``) or
            ``None`` to skip per-person embeddings.
        force: If ``True``, re-extract even if already done with the same model.
        recursive: If ``True``, walk *output* recursively.
        verbosity: Console verbosity level (0–3).

    Returns:
        Number of document folders visited.
    """
    from .entities import entity_extract_folder, _get_entity_model

    entity_model = _get_entity_model()

    print(f"\n{BOLD}[#] Entity Extraction{RESET} — model: {GREEN}{entity_model}{RESET}")
    if embed_provider:
        print(f"    Embeddings : {embed_provider}")
    else:
        print(f"    Embeddings : {DIM}disabled (no provider configured){RESET}")
    print(f"    Force      : {force}\n")

    total_folders = 0
    total_extracted = 0
    total_skipped = 0

    for folder in _iter_hash_folders(output, recursive=recursive):
        if not (folder / "pages").is_dir():
            continue
        total_folders += 1
        try:
            e, s = entity_extract_folder(
                folder,
                client=openai_client,
                model=entity_model,
                embed_provider=embed_provider,
                force=force,
                verbosity=verbosity,
            )
            total_extracted += e
            total_skipped += s
        except Exception as exc:
            _console_error(
                f"Entity extraction failed for {folder.name} — {exc}",
                exc=exc,
                verbosity=verbosity,
            )
            log.exception("Entity extraction failed for folder %s", folder)

    print(
        f"\n  Folders: {total_folders} | "
        f"{GREEN}{total_extracted} extracted{RESET} | "
        f"{DIM}{total_skipped} skipped{RESET}"
    )
    return total_folders


# ─────────────────────────────────────────────
# Argument parser
# ─────────────────────────────────────────────

def build_parser() -> argparse.ArgumentParser:
    """
    Build and return the CLI argument parser.

    Returns:
        Configured :class:`argparse.ArgumentParser` for the ``askemblaex`` command.
    """
    parser = argparse.ArgumentParser(
        prog="askemblaex",
        description="Extract and reconcile text from genealogical documents.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Directories
    parser.add_argument("--source", "-s", type=Path, required=False,
                        help="Source directory containing PDFs or images.")
    parser.add_argument("--output", "-o", type=Path, default=None,
                        help="Output directory. Defaults to --source.")

    # Actions
    parser.add_argument("--extract", action="store_true",
                        help="Run extraction.")
    parser.add_argument("--reconcile", action="store_true",
                        help="Run reconciliation.")
    parser.add_argument("--entities", action="store_true",
                        help="Run structured entity extraction.")
    parser.add_argument("--embed", action="store_true",
                        help="Run embedding generation.")
    parser.add_argument("--list-methods", action="store_true",
                        help="List available extraction methods and exit.")

    # Method control
    parser.add_argument("--methods", nargs="+", metavar="METHOD",
                        help="Whitelist specific extraction methods. Default: all.")
    parser.add_argument("--skip-methods", nargs="+", metavar="METHOD",
                        help="Exclude specific extraction methods.")

    # Force flags
    parser.add_argument("--force-extract", action="store_true",
                        help="Re-extract even if extraction.complete=true.")
    parser.add_argument("--force-reconcile", action="store_true",
                        help="Re-reconcile even if model matches.")
    parser.add_argument("--force-entities", action="store_true",
                        help="Re-extract entities even if already done with same model.")
    parser.add_argument("--force-embed", action="store_true",
                        help="Re-embed even if already embedded with same model.")

    # Options
    parser.add_argument("--dpi", type=int, default=DEFAULT_DPI,
                        help=f"DPI for PDF page rendering (default: {DEFAULT_DPI}).")
    parser.add_argument("--recursive", "-r", action="store_true",
                        help="Search source directory recursively.")

    # Verbosity — count -v flags
    parser.add_argument("-v", dest="verbosity", action="count", default=0,
                        help="-v INFO, -vv DEBUG, -vvv DEBUG + full tracebacks.")

    return parser


# ─────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────

def main() -> int:
    """
    Main entrypoint for the ``askemblaex`` CLI.

    Parses arguments, runs preflight checks, then executes whichever
    pipeline stages were requested (``--extract``, ``--reconcile``,
    ``--entities``, ``--embed``) in order.

    Returns:
        Exit code — ``0`` on success, ``2`` on a configuration error.
    """
    parser = build_parser()
    args = parser.parse_args()
    _setup_logging(args.verbosity)

    if args.list_methods:
        _print_methods()
        return 0

    if not (args.extract or args.reconcile or args.entities or args.embed):
        parser.print_help()
        print(f"\n{YELLOW}[~] Nothing to do — pass --extract, --reconcile, --entities, and/or --embed.{RESET}\n")
        return 0

    if not args.source:
        print(f"{RED}[!] --source is required.{RESET}")
        return 2
    if not args.source.exists() or not args.source.is_dir():
        print(f"{RED}[!] Source directory does not exist: {args.source}{RESET}")
        return 2

    output = args.output if args.output else args.source
    output.mkdir(parents=True, exist_ok=True)

    vflag = "-" + "v" * args.verbosity if args.verbosity else "none"
    print(f"\n{'─' * 60}")
    print(f"  source    : {args.source}")
    print(f"  output    : {output}")
    print(f"  verbosity : {args.verbosity} ({vflag})")
    print(f"{'─' * 60}")

    # Resolve requested methods (extraction only)
    requested_methods = (
        _resolve_methods(args.methods, args.skip_methods) if args.extract else set()
    )

    # ── Preflight ──
    from .preflight import run_preflight

    preflight = run_preflight(
        requested_methods=requested_methods,
        needs_reconcile=args.reconcile,
        needs_entities=args.entities,
        needs_embed=args.embed,
        verbose=args.verbosity,
    )

    # ── Extraction ──
    if args.extract:
        run_extraction(
            args.source,
            output,
            active_methods=preflight.active_methods,
            service_status=preflight.services,
            force=args.force_extract,
            dpi=args.dpi,
            recursive=args.recursive,
            verbosity=args.verbosity,
        )

    # ── Reconciliation ──
    if args.reconcile:
        if not preflight.openai_available:
            _console_warn("OpenAI is not available — skipping reconciliation.")
        else:
            run_reconciliation(
                output,
                openai_client=preflight.openai_client,
                openai_model=preflight.openai_model,
                force=args.force_reconcile,
                recursive=args.recursive,
                verbosity=args.verbosity,
            )

    # ── Entity Extraction ──
    if args.entities:
        if not preflight.openai_available:
            _console_warn("OpenAI is not available — skipping entity extraction.")
        else:
            run_entities(
                output,
                openai_client=preflight.openai_client,
                openai_model=preflight.openai_model,
                embed_provider=preflight.embed_provider,
                force=args.force_entities,
                recursive=args.recursive,
                verbosity=args.verbosity,
            )

    # ── Embedding ──
    if args.embed:
        if not preflight.embed_provider:
            _console_warn("No embedding provider available — skipping embeddings.")
        else:
            run_embedding(
                output,
                provider=preflight.embed_provider,
                force=args.force_embed,
                recursive=args.recursive,
                verbosity=args.verbosity,
            )

    print(f"\n{GREEN}[✓] Done.{RESET}\n")
    return 0
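

# Example (illustrative): main() reads sys.argv via argparse, so the pipeline
# can also be driven programmatically by substituting the argument vector:
#
#   import sys
#   sys.argv = ["askemblaex", "--source", "/path/to/pdfs", "--extract", "-v"]
#   raise SystemExit(main())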


if __name__ == "__main__":
    raise SystemExit(main())