# Source code for askemblaex.extract
"""
askemblaex/extract.py
File discovery, per-page text extraction, and processing pipeline.
"""
from __future__ import annotations
import csv
import os
import sys
import time
import traceback
from dataclasses import dataclass, field
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import Dict, List, Optional, Union
import fitz # pymupdf
import pdfplumber
from PIL import Image
from azure.cognitiveservices.vision.computervision import ComputerVisionClient
from azure.cognitiveservices.vision.computervision.models import OperationStatusCodes
from msrest.authentication import CognitiveServicesCredentials
from .config import get_img_exts
from .env import load_env
from .hash import hash_file
from .logging_ import get_folder_logger
from .metadata import build_metadata, load_metadata, write_metadata
from .pages import METHOD_MAP, save_or_merge_page
# Ensure .env is loaded even when extract is used directly or in subprocesses
load_env()

# ANSI escape sequences for console output.
RED = "\x1b[31m"     # error text
YELLOW = "\x1b[33m"  # NOTE(review): not referenced anywhere in this module — confirm before removing
DIM = "\x1b[2m"      # de-emphasized/debug text
RESET = "\x1b[0m"    # reset all attributes
def _utc_now() -> str:
    """Current UTC time as an ISO-8601 ``YYYY-MM-DDTHH:MM:SSZ`` string."""
    now = datetime.now(timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%SZ")
def _console_error(msg: str, exc: Optional[Exception] = None, verbosity: int = 0) -> None:
    """Always print errors to stderr regardless of verbosity.

    At verbosity >= 3 the full traceback is printed; at >= 1 only the
    exception type and message.
    """
    print(f" {RED}[!] {msg}{RESET}", file=sys.stderr)
    if exc is None:
        return
    if verbosity >= 3:
        traceback.print_exc()
    elif verbosity >= 1:
        print(f" {DIM}{type(exc).__name__}: {exc}{RESET}", file=sys.stderr)
def _console_info(msg: str, verbosity: int) -> None:
    """Print an informational line when verbosity is at least 1."""
    if verbosity < 1:
        return
    print(f" {msg}")
def _console_debug(msg: str, verbosity: int) -> None:
    """Print a dimmed debug line when verbosity is at least 2."""
    if verbosity < 2:
        return
    print(f" {DIM}{msg}{RESET}")
# ─────────────────────────────────────────────
# Data types
# ─────────────────────────────────────────────
[docs]
@dataclass
class PageExtraction:
    """Extraction output for one page: text produced by each active method."""
    # Zero-based page index within the source document.
    page_index: int
    # Method name -> extracted text. Keys observed in this module:
    # "azure", "pymupdf", "pdfplumber", "azure_docint".
    methods: Dict[str, str] = field(default_factory=dict)
[docs]
@dataclass(frozen=True, slots=True)
class ExtractResult:
    """Immutable pairing of extracted text with an optional source label."""
    # The extracted text payload.
    text: str
    # Optional origin label supplied by the caller (see extract_text).
    source: Optional[str] = None
# ─────────────────────────────────────────────
# Azure Computer Vision helpers
# ─────────────────────────────────────────────
def get_azure_cv_client() -> ComputerVisionClient:
    """Build an Azure Computer Vision client from environment variables.

    Raises:
        ValueError: If AZURE_VISION_ENDPOINT or AZURE_VISION_KEY is
            missing or blank.
    """
    endpoint = os.getenv("AZURE_VISION_ENDPOINT", "").strip()
    key = os.getenv("AZURE_VISION_KEY", "").strip()
    if endpoint and key:
        return ComputerVisionClient(endpoint, CognitiveServicesCredentials(key))
    raise ValueError("AZURE_VISION_ENDPOINT and AZURE_VISION_KEY must be set.")
def submit_azure_job(image_data: BytesIO, client: ComputerVisionClient) -> str:
    """Submit an image stream to the Azure CV Read API; return the operation id.

    The id is the final path segment of the ``Operation-Location`` header.
    """
    response = client.read_in_stream(image_data, raw=True)
    operation_url = response.headers["Operation-Location"]
    return operation_url.rsplit("/", 1)[-1]
def check_azure_job(operation_id: str, client: ComputerVisionClient) -> tuple[bool, str]:
    """Poll one Azure CV Read operation.

    Returns:
        ``(done, text)`` — ``done`` is False while the job is queued or
        running. Once finished, ``text`` is the newline-joined recognized
        lines on success, or "" for any non-success terminal status.
    """
    result = client.get_read_result(operation_id)
    status = result.status
    if status in (OperationStatusCodes.running, OperationStatusCodes.not_started):
        return False, ""
    if status != OperationStatusCodes.succeeded:
        return True, ""
    recognized: list[str] = []
    for page in result.analyze_result.read_results:
        for line in page.lines:
            recognized.append(line.text)
    return True, "\n".join(recognized)
# ─────────────────────────────────────────────
# PDF image + table extraction
# ─────────────────────────────────────────────
def extract_pdf_images_pymupdf(
    pdf_path: Path,
    images_dir: Path,
    file_hash: str,
    logger,
    *,
    skip_fullpage: bool = True,
    verbosity: int = 0,
) -> None:
    """Extract embedded (non-full-page) images from a PDF into images_dir.

    Args:
        pdf_path: Source PDF file.
        images_dir: Destination directory (assumed to exist).
        file_hash: Hash used as the output-filename prefix.
        logger: Logger instance.
        skip_fullpage: If True, skip images whose placement covers >= 85%
            of the page area (typically full-page scans/backgrounds).
        verbosity: Console verbosity level (0-3).
    """
    doc = fitz.open(str(pdf_path))
    try:
        for page_index in range(len(doc)):
            page = doc[page_index]
            page_area = page.rect.width * page.rect.height
            for img_index, img_info in enumerate(page.get_images(full=True)):
                xref = img_info[0]
                rects = page.get_image_rects(xref)
                # Guard page_area > 0: a degenerate zero-area page rect would
                # otherwise raise ZeroDivisionError in the coverage ratio.
                if skip_fullpage and rects and page_area > 0:
                    max_area = max(r.width * r.height for r in rects)
                    if max_area / page_area >= 0.85:
                        logger.debug("Skipping full-page image page %d xref=%d", page_index, xref)
                        continue
                extracted = doc.extract_image(xref)
                img_bytes = extracted.get("image")
                img_ext = extracted.get("ext", "bin")
                if not img_bytes:
                    continue
                out_name = f"{file_hash}.pymupdf.image.{page_index}.{img_index}.{img_ext}"
                (images_dir / out_name).write_bytes(img_bytes)
                logger.info("Wrote embedded image %s", out_name)
                _console_debug(f"Wrote embedded image {out_name}", verbosity)
    finally:
        doc.close()
def extract_pdf_tables_pdfplumber(
    pdf_path: Path,
    out_dir: Path,
    file_hash: str,
    logger,
    verbosity: int = 0,
) -> None:
    """Extract tables from a PDF as CSV files into out_dir.

    Each detected table is written as
    ``<hash>.pdfplumber.table.<page>.<index>.csv``; failures are logged
    per page / per table and never abort the whole document.
    """

    def write_csv(page_no: int, table_no: int, rows) -> str:
        # One CSV per table; None cells become empty strings.
        csv_name = f"{file_hash}.pdfplumber.table.{page_no}.{table_no}.csv"
        with (out_dir / csv_name).open("w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle)
            for row in rows:
                writer.writerow(["" if cell is None else str(cell) for cell in row])
        return csv_name

    try:
        with pdfplumber.open(str(pdf_path)) as pdf:
            for page_no, page in enumerate(pdf.pages):
                try:
                    found = page.extract_tables() or []
                except Exception as e:
                    _console_error(f"pdfplumber extract_tables failed page {page_no}: {e}",
                                   exc=e, verbosity=verbosity)
                    logger.exception("pdfplumber extract_tables failed on page %d", page_no)
                    continue
                for table_no, rows in enumerate(found):
                    try:
                        csv_name = write_csv(page_no, table_no, rows)
                        logger.info("Wrote table %s", csv_name)
                        _console_debug(f"Wrote table {csv_name}", verbosity)
                    except Exception as e:
                        _console_error(f"Failed writing table page {page_no} table {table_no}: {e}",
                                       exc=e, verbosity=verbosity)
                        logger.exception("Failed writing table page %d table %d", page_no, table_no)
    except Exception as e:
        _console_error(f"pdfplumber failed to open {pdf_path.name}: {e}", exc=e, verbosity=verbosity)
        logger.exception("pdfplumber failed to open PDF: %s", pdf_path.name)
# ─────────────────────────────────────────────
# Per-page text extraction
# ─────────────────────────────────────────────
[docs]
def extract_page_text(
    path: Union[str, Path],
    logger,
    *,
    active_methods: set | None = None,
    images_dir: Path | None = None,
    pdf_dpi: int = 300,
    poll_interval: float = 0.5,
    max_wait_time: float = 300.0,
    verbosity: int = 0,
) -> List[PageExtraction]:
    """
    Extract text from each page of a PDF or image file.

    Saves page renders (PNG) and single-page PDFs to images_dir regardless
    of whether Azure services are available, so images are always preserved.

    Args:
        path: Path to the source PDF or image.
        logger: Logger instance.
        active_methods: Set of method names to run. Defaults to all.
        images_dir: Directory to save page renders and single-page PDFs.
        pdf_dpi: DPI for PDF-to-image rendering.
        poll_interval: Azure CV polling interval in seconds.
        max_wait_time: Maximum Azure CV wait time in seconds.
        verbosity: Console verbosity level (0-3).

    Returns:
        One PageExtraction per page; may be empty when everything failed.
    """
    p = Path(path)
    is_pdf = p.suffix.lower() == ".pdf"
    # Default to every known method when the caller does not restrict the set.
    methods_to_run = set(active_methods) if active_methods is not None else set(METHOD_MAP.values())
    logger.info("Extraction started: %s | methods: %s", p.name, sorted(methods_to_run))
    _console_debug(f"methods: {sorted(methods_to_run)}", verbosity)
    # Init Azure CV client if needed; on failure that method is dropped so
    # the rest of the pipeline still runs.
    azure_cv_client = None
    if "azure_computer_vision" in methods_to_run:
        try:
            azure_cv_client = get_azure_cv_client()
        except Exception as e:
            _console_error(f"Azure CV client init failed — skipping azure_computer_vision: {e}",
                           exc=e, verbosity=verbosity)
            logger.exception("Azure CV client init failed")
            methods_to_run = methods_to_run - {"azure_computer_vision"}
    results: List[PageExtraction] = []
    if is_pdf:
        # Imported lazily so image-only runs don't require these packages.
        from pdf2image import convert_from_path
        import pypdf
        mu_doc = None
        pl_pdf = None
        if "pymupdf" in methods_to_run:
            try:
                mu_doc = fitz.open(str(p))
            except Exception as e:
                _console_error(f"PyMuPDF failed to open {p.name}: {e}", exc=e, verbosity=verbosity)
                logger.exception("pymupdf failed to open: %s", p.name)
        if "pdfplumber" in methods_to_run:
            try:
                pl_pdf = pdfplumber.open(str(p))
            except Exception as e:
                _console_error(f"pdfplumber failed to open {p.name}: {e}", exc=e, verbosity=verbosity)
                logger.exception("pdfplumber failed to open: %s", p.name)
        # ── Render pages to PNG (always — even if Azure CV unavailable) ──
        page_images = []
        try:
            page_images = convert_from_path(str(p), dpi=pdf_dpi)
            logger.info("Rendered %d pages at %d DPI", len(page_images), pdf_dpi)
            _console_info(f"Rendered {len(page_images)} pages at {pdf_dpi} DPI", verbosity)
            if images_dir:
                images_dir.mkdir(parents=True, exist_ok=True)
                for i, img in enumerate(page_images):
                    img_path = images_dir / f"page.{i:04d}.png"
                    # Existing renders are kept — cheap resume on re-runs.
                    if not img_path.exists():
                        img.save(str(img_path), format="PNG")
                        _console_debug(f"Saved render {img_path.name}", verbosity)
                logger.info("Saved %d page renders to images/", len(page_images))
        except Exception as e:
            _console_error(f"PDF page rendering failed for {p.name}: {e}", exc=e, verbosity=verbosity)
            logger.exception("Failed to render PDF pages: %s", p.name)
        # ── Save single-page PDFs for DocInt (always — even if DocInt unavailable) ──
        # NOTE(review): `... or True` makes the method check dead code — the
        # condition is unconditionally true whenever images_dir is set, which
        # matches the "always" intent above; confirm this is deliberate.
        if images_dir and ("azure_docint" in methods_to_run or True):
            try:
                reader = pypdf.PdfReader(str(p))
                for i in range(len(reader.pages)):
                    pdf_page_path = images_dir / f"page.{i:04d}.pdf"
                    if not pdf_page_path.exists():
                        writer = pypdf.PdfWriter()
                        writer.add_page(reader.pages[i])
                        with pdf_page_path.open("wb") as f:
                            writer.write(f)
                        _console_debug(f"Saved single-page PDF {pdf_page_path.name}", verbosity)
                logger.info("Saved %d single-page PDFs to images/", len(reader.pages))
            except Exception as e:
                _console_error(f"Failed saving single-page PDFs for {p.name}: {e}",
                               exc=e, verbosity=verbosity)
                logger.exception("Failed saving single-page PDFs: %s", p.name)
        # ── Phase 1: Submit Azure CV jobs ──
        # NOTE(review): `AzureJob` is not defined or imported anywhere in this
        # module — confirm it is provided elsewhere, otherwise this branch
        # raises NameError whenever Azure CV is active.
        jobs: List[AzureJob] = []
        if azure_cv_client and page_images:
            _console_info(f"Submitting {len(page_images)} pages to Azure CV...", verbosity)
            for i, img in enumerate(page_images):
                try:
                    buf = BytesIO()
                    img.save(buf, format="PNG")
                    buf.seek(0)
                    op_id = submit_azure_job(buf, azure_cv_client)
                    jobs.append(AzureJob(page_index=i, operation_id=op_id, submitted_at=time.time()))
                    _console_debug(f"Submitted page {i} to Azure CV", verbosity)
                except Exception as e:
                    _console_error(f"Azure CV submit failed page {i}: {e}", exc=e, verbosity=verbosity)
                    logger.exception("Azure CV submit failed page %d", i)
                    # Empty operation_id marks a failed submission; it is
                    # excluded from the polling set below.
                    jobs.append(AzureJob(page_index=i, operation_id="", submitted_at=time.time()))
        # ── Phase 2: Poll Azure CV results ──
        pending = [j for j in jobs if j.operation_id]
        completed_azure: Dict[int, str] = {}
        start = time.time()
        while pending:
            # max_wait_time is a global deadline across all pages, not per page.
            if time.time() - start > max_wait_time:
                _console_error(f"Azure CV timeout — {len(pending)} pages still pending", verbosity=verbosity)
                logger.warning("Azure CV timeout. %d jobs still pending.", len(pending))
                break
            for job in pending[:]:  # iterate a copy — `pending` is mutated below
                try:
                    done, text = check_azure_job(job.operation_id, azure_cv_client)
                    if done:
                        completed_azure[job.page_index] = text
                        pending.remove(job)
                        _console_debug(
                            f"Azure CV page {job.page_index} done "
                            f"({len(completed_azure)}/{len(jobs)})", verbosity)
                except Exception as e:
                    _console_error(f"Azure CV poll failed page {job.page_index}: {e}",
                                   exc=e, verbosity=verbosity)
                    logger.exception("Azure CV poll failed page %d", job.page_index)
                    # A poll error counts as completed-with-empty-text.
                    completed_azure[job.page_index] = ""
                    pending.remove(job)
            if pending:
                time.sleep(poll_interval)
        if jobs:
            _console_info(
                f"Azure CV complete — {len(completed_azure)}/{len(jobs)} pages", verbosity)
        # ── Phase 3: Collect per-page results ──
        # Page count falls back: renders → pymupdf doc → pdfplumber pages,
        # whichever succeeded first.
        total_pages = (
            len(page_images) or
            (len(mu_doc) if mu_doc else 0) or
            (len(pl_pdf.pages) if pl_pdf else 0)
        )
        for i in range(total_pages):
            methods: Dict[str, str] = {}
            if "azure_computer_vision" in methods_to_run:
                methods["azure"] = (completed_azure.get(i) or "").strip()
            if "pymupdf" in methods_to_run:
                if mu_doc is not None and i < len(mu_doc):
                    try:
                        methods["pymupdf"] = (mu_doc[i].get_text("text") or "").strip()
                    except Exception as e:
                        _console_error(f"PyMuPDF failed page {i}: {e}", exc=e, verbosity=verbosity)
                        logger.exception("pymupdf failed on page %d", i)
                        methods["pymupdf"] = ""
                else:
                    methods["pymupdf"] = ""
            if "pdfplumber" in methods_to_run:
                if pl_pdf is not None and i < len(pl_pdf.pages):
                    try:
                        methods["pdfplumber"] = (pl_pdf.pages[i].extract_text() or "").strip()
                    except Exception as e:
                        _console_error(f"pdfplumber failed page {i}: {e}", exc=e, verbosity=verbosity)
                        logger.exception("pdfplumber failed on page %d", i)
                        methods["pdfplumber"] = ""
                else:
                    methods["pdfplumber"] = ""
            results.append(PageExtraction(page_index=i, methods=methods))
        if mu_doc:
            try: mu_doc.close()
            except Exception: pass
        if pl_pdf:
            try: pl_pdf.close()
            except Exception: pass
        # ── Phase 4: Azure Document Intelligence ──
        if "azure_docint" in methods_to_run:
            docint_endpoint = os.getenv("AZURE_DOCINT_ENDPOINT", "").strip()
            docint_key = os.getenv("AZURE_DOCINT_KEY", "").strip()
            if docint_endpoint and docint_key:
                _console_info("Running Azure Document Intelligence...", verbosity)
                try:
                    from .azure_hooks import docint_analyze_pdf_layout_kv
                    docint_results = docint_analyze_pdf_layout_kv(
                        endpoint=docint_endpoint,
                        key=docint_key,
                        pdf=p,
                        logger=logger,
                    )
                    # Merge DocInt content into the already-built page results.
                    for i, di_result in docint_results.items():
                        if i < len(results):
                            results[i].methods["azure_docint"] = di_result.content
                    _console_info(
                        f"Azure DocInt complete — {len(docint_results)} pages", verbosity)
                    logger.info("Azure DocInt completed for %d pages", len(docint_results))
                except Exception as e:
                    _console_error(f"Azure DocInt failed: {e}", exc=e, verbosity=verbosity)
                    logger.exception("Azure DocInt extraction failed")
            else:
                logger.warning("azure_docint requested but credentials not set, skipping")
        logger.info("Extraction complete: %d pages", len(results))
        return results
    # ── Single image file ──
    try:
        with Image.open(p) as img:
            if images_dir:
                images_dir.mkdir(parents=True, exist_ok=True)
                img_save_path = images_dir / f"page.0000{p.suffix}"
                if not img_save_path.exists():
                    img.save(str(img_save_path))
            if azure_cv_client:
                buf = BytesIO()
                img.save(buf, format=img.format or "PNG")
                buf.seek(0)
                try:
                    op_id = submit_azure_job(buf, azure_cv_client)
                    _console_debug(f"Submitted image to Azure CV: {op_id}", verbosity)
                    start = time.time()
                    while time.time() - start < max_wait_time:
                        done, text = check_azure_job(op_id, azure_cv_client)
                        if done:
                            results.append(PageExtraction(page_index=0,
                                                          methods={"azure": text.strip()}))
                            _console_info(
                                f"Azure CV extracted {len(text)} chars from image", verbosity)
                            break
                        time.sleep(poll_interval)
                    else:
                        # while-else: loop exhausted without break → timeout.
                        _console_error("Azure CV timeout on image", verbosity=verbosity)
                        results.append(PageExtraction(page_index=0, methods={"azure": ""}))
                except Exception as e:
                    _console_error(f"Azure CV failed on image {p.name}: {e}",
                                   exc=e, verbosity=verbosity)
                    logger.exception("Azure CV failed on image: %s", p.name)
                    results.append(PageExtraction(page_index=0, methods={"azure": ""}))
    except Exception as e:
        _console_error(f"Fatal error opening image {p.name}: {e}", exc=e, verbosity=verbosity)
        logger.exception("Fatal error opening image: %s", p)
    return results
# ─────────────────────────────────────────────
# Discovery
# ─────────────────────────────────────────────
PDF_EXTS = {".pdf"}
# Subfolders created by askemblaex inside output directories — never walk into these
_SKIP_SUBDIRS = {"pages", "images", "logs"}


def _is_hash_folder(path: Path) -> bool:
    """Return True if the directory looks like a hash-keyed output folder."""
    name = path.name.lower()
    # MD5 digest = 32 hex chars, SHA-1 = 40, SHA-256 = 64.
    if len(name) not in (32, 40, 64):
        return False
    return set(name) <= set("0123456789abcdef")
def discover_files(root: Path, *, recursive: bool = False) -> list[Path]:
    """
    Discover all PDF and image files under *root*.

    By default only the top level of *root* is searched. With
    ``recursive=True`` subdirectories are walked too, skipping anything
    that looks like an askemblaex output folder (hash-keyed directories,
    ``pages/``, ``images/``, ``logs/``).

    The recognised image extensions come from the
    ``ASKEMBLAEX_IMG_EXTS`` environment variable (comma-separated).

    Args:
        root: Root directory to search.
        recursive: If True, walk subdirectories (skipping output folders).

    Returns:
        List of Path objects for every matching file found.
    """
    allowed = PDF_EXTS | get_img_exts()
    if not recursive:
        # Top level only.
        return [
            entry for entry in root.iterdir()
            if entry.is_file() and entry.suffix.lower() in allowed
        ]
    found: list[Path] = []
    for dirpath, dirnames, filenames in os.walk(root):
        here = Path(dirpath)
        # Prune in place so os.walk never descends into output folders.
        dirnames[:] = [
            d for d in dirnames
            if d not in _SKIP_SUBDIRS and not _is_hash_folder(here / d)
        ]
        found.extend(
            here / fname
            for fname in filenames
            if Path(fname).suffix.lower() in allowed
        )
    return found
# ─────────────────────────────────────────────
# Processing pipeline
# ─────────────────────────────────────────────
def process_all(source_root: Path, output_root: Path, logger) -> None:
    """
    Extract every PDF and image file found at the top level of *source_root*.

    Args:
        source_root: Root directory containing source files.
        output_root: Root directory for extracted output.
        logger: Logger instance.
    """
    discovered = discover_files(source_root, recursive=False)
    for found in discovered:
        process_one(found, output_root, logger)
[docs]
def process_one(
    src_path: Path,
    output_root: Path,
    logger,
    *,
    active_methods: set[str] | None = None,
    service_status: dict | None = None,
    force: bool = False,
    pdf_dpi: int = 300,
    verbosity: int = 0,
) -> None:
    """
    Extract text from a single source file into the output folder.

    Output lives under ``output_root/<file_hash>/``; metadata is written
    incrementally after each step so a crash leaves a resumable state.

    Args:
        src_path: Path to the source PDF or image.
        output_root: Root output directory.
        logger: Logger instance.
        active_methods: Set of method names to run. Defaults to all.
        service_status: Dict of ServiceStatus from preflight, used to annotate metadata.
        force: If True, reprocess even if extraction.complete=true.
        pdf_dpi: DPI for PDF page rendering.
        verbosity: Console verbosity level (0-3).
    """
    all_methods = set(METHOD_MAP.values())
    methods = active_methods if active_methods is not None else all_methods
    # Output folder is keyed by content hash, so renames/moves don't reprocess.
    file_hash = hash_file(src_path)
    out_dir = output_root / file_hash
    # Skip if already complete and not forcing
    if not force and out_dir.exists():
        existing = load_metadata(out_dir, file_hash)
        if existing and existing.get("extraction", {}).get("complete"):
            logger.info("Already completed, skipping: %s", src_path.name)
            _console_debug(f"Skipping {src_path.name} (already complete)", verbosity)
            return
    out_dir.mkdir(parents=True, exist_ok=True)
    images_dir = out_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)
    folder_logger = get_folder_logger(out_dir, log_name=file_hash)
    folder_logger.info("Starting extraction: %s", src_path.name)
    folder_logger.info("Active methods: %s", sorted(methods))
    # Load existing metadata or build fresh (force discards prior metadata).
    # NOTE(review): the code below assumes metadata["extraction"]["steps"]
    # exists — presumably guaranteed by build_metadata/load_metadata; confirm.
    existing_meta = load_metadata(out_dir, file_hash)
    metadata = existing_meta if (existing_meta and not force) else build_metadata(src_path, file_hash=file_hash)
    metadata["extraction"]["started_utc"] = _utc_now()
    # Annotate service availability from preflight
    if service_status:
        metadata["extraction"]["services"] = {
            name: ("ok" if svc.available else f"unavailable: {svc.reason}")
            for name, svc in service_status.items()
        }
    write_metadata(out_dir, file_hash, metadata)
    if src_path.suffix.lower() == ".pdf":
        # Embedded images → images/
        if force or not metadata["extraction"]["steps"].get("images"):
            extract_pdf_images_pymupdf(
                src_path, images_dir, file_hash, folder_logger,
                verbosity=verbosity)
            metadata["extraction"]["steps"]["images"] = True
            write_metadata(out_dir, file_hash, metadata)
        # Tables → out_dir root
        if force or not metadata["extraction"]["steps"].get("tables"):
            extract_pdf_tables_pdfplumber(
                src_path, out_dir, file_hash, folder_logger,
                verbosity=verbosity)
            metadata["extraction"]["steps"]["tables"] = True
            write_metadata(out_dir, file_hash, metadata)
    # Per-page text extraction (handles both PDFs and single images)
    page_results = extract_page_text(
        src_path,
        logger=folder_logger,
        active_methods=methods,
        images_dir=images_dir,
        pdf_dpi=pdf_dpi,
        verbosity=verbosity,
    )
    # Persist each page's per-method text, merging into any existing page file.
    for page_result in page_results:
        data = {}
        for method, text in page_result.methods.items():
            # Map internal method keys (e.g. "azure") to schema names.
            schema_method = METHOD_MAP.get(method, method)
            if schema_method not in methods:
                continue
            data[schema_method] = {
                "text": text,
                "method": schema_method,
                "extracted_at": _utc_now(),
            }
        if data:
            save_or_merge_page(out_dir, file_hash, page_result.page_index, data)
            _console_debug(
                f"Wrote page {page_result.page_index:04d} "
                f"({', '.join(data.keys())})", verbosity)
    # Stamp completed steps
    # NOTE(review): every requested method is stamped True regardless of
    # per-page success, so "complete" means "attempted", not "succeeded" —
    # confirm this is the intended semantics.
    for method in methods:
        metadata["extraction"]["steps"][method] = True
    docint_ran = any("azure_docint" in r.methods for r in page_results)
    if docint_ran:
        metadata["extraction"]["steps"]["azure_docint"] = True
    # Mark complete if all requested non-reconcile steps are done
    required = {"azure_computer_vision", "pymupdf", "pdfplumber"} & methods
    if all(metadata["extraction"]["steps"].get(m) for m in required):
        metadata["extraction"]["complete"] = True
        metadata["extraction"]["completed_utc"] = _utc_now()
    write_metadata(out_dir, file_hash, metadata)
    folder_logger.info("Extraction completed: %s", src_path.name)
def extract_text(data: str, *, source: Optional[str] = None) -> ExtractResult:
    """Wrap already-extracted text (and an optional source label) in an ExtractResult."""
    result = ExtractResult(text=data, source=source)
    return result