Source code for askemblaex.entities
"""
askemblaex/entities.py
Structured entity extraction for reconciled page text.
Uses a dynamic context window around each page and calls OpenAI to extract
structured entities (persons, events, claims, places) from genealogical
documents.
For each page, two outputs are written (both under pages/):
1. Entity data merged into the existing page JSON under extractions.entities
2. One person file per person mention:
<doc_id>.person.<page:04d>.<idx:04d>.json
Environment variables:
OPENAI_KEY            OpenAI API key.
OPENAI_ENTITY_MODEL   Model for entity extraction; falls back to OPENAI_MODEL, then gpt-4o.
OPENAI_EMODEL         OpenAI embedding model (used when embed_provider='openai').
OLLAMA_EMODEL         Ollama embedding model (used when embed_provider='ollama').
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from openai import OpenAI
from .env import load_env
from .pages import get_page_number, save_or_merge_page
from .window import build_dynamic_extraction_window
load_env()
log = logging.getLogger("askemblaex.entities")
RED = "\x1b[31m"
GREEN = "\x1b[32m"
YELLOW = "\x1b[33m"
DIM = "\x1b[2m"
RESET = "\x1b[0m"
# ─────────────────────────────────────────────
# Config
# ─────────────────────────────────────────────
DEFAULT_ENTITY_MODEL = "gpt-4o"
ENTITY_SYSTEM_PROMPT = """\
You are an expert genealogical records analyst. Extract all structured entities \
from the historical document text provided.
The input contains one or more pages. The TARGET page is marked \
"=== PAGE N (TARGET) ===". Context pages (if present) provide surrounding \
information to help resolve names and relationships; extract entities from the \
TARGET page only.
Output ONLY a valid JSON object with this exact schema — no commentary, no \
markdown code fences:
{
"persons": [
{
"name": "Full name as written in the document",
"alt_names": ["maiden name", "alias", "spelling variant"],
"summary": "One sentence describing who this person is in this record.",
"roles": ["head of household", "witness", "deceased", ...],
"birth": {"date_text": "12 Jan 1842", "place": "County Cork, Ireland"},
"death": {"date_text": "1910", "place": "Melbourne, Victoria"},
"residences": ["Place Name", ...],
"relationships": {
"parents": ["Father Name", "Mother Name"],
"spouse": ["Spouse Name"],
"children": ["Child Name", ...],
"siblings": ["Sibling Name", ...]
},
"attributes": ["occupation or trade", "religion", "nationality", ...],
"events": ["Short description of a life event", ...],
"evidence_phrases": ["short direct quote from source text", ...]
}
],
"events": [
{
"type": "marriage",
"date_text": "1865",
"place": "Dublin",
"people": ["Person A", "Person B"],
"details": ["detail 1", ...]
}
],
"claims": [
{
"claim_type": "parentage",
"people": ["Person A", "Person B"],
"supporting_facts": ["supporting fact", ...],
"conflicts": ["conflicting evidence", ...]
}
],
"places": ["Place Name", ...],
"notes": ["Any noteworthy observation about the document itself", ...]
}
Rules:
- Extract only what is explicitly stated or strongly implied in the TARGET page.
- Use the exact spelling of names as they appear in the document.
- Keep evidence_phrases as short direct quotes, under 20 words each.
- Use null for unknown date/place fields; use [] for empty lists.
- Output ONLY the JSON object.
"""
# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────
def _utc_now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _get_entity_model() -> str:
return (
os.getenv("OPENAI_ENTITY_MODEL", "").strip()
or os.getenv("OPENAI_MODEL", "").strip()
or DEFAULT_ENTITY_MODEL
)
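# Illustrative fallback order (comment only, not executed): with OPENAI_ENTITY_MODEL
# unset and OPENAI_MODEL set to a hypothetical "gpt-4o-mini", _get_entity_model()
# returns "gpt-4o-mini"; with neither variable set it returns DEFAULT_ENTITY_MODEL.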
def _norm_key(s: str) -> str:
s = s.strip().lower()
s = re.sub(r"[^a-z0-9]+", "_", s)
s = re.sub(r"_+", "_", s).strip("_")
return s
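# Illustrative example (comment only): _norm_key flattens a person key into a
# lowercase, underscore-only identifier usable as a document _key. The name and
# doc id below are hypothetical.
#
#     _norm_key("person:Mary O'Brien:doc123:p3:n0")
#     # -> "person_mary_o_brien_doc123_p3_n0"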
def _embedding_model_name(provider: str) -> str:
if provider == "ollama":
return os.getenv("OLLAMA_EMODEL", "")
return os.getenv("OPENAI_EMODEL", "")
def _console_error(msg: str, exc: Optional[Exception] = None, verbosity: int = 0) -> None:
print(f" {RED}[!] {msg}{RESET}", file=sys.stderr)
if exc and verbosity >= 3:
import traceback
traceback.print_exc()
elif exc and verbosity >= 1:
print(f" {DIM}{type(exc).__name__}: {exc}{RESET}", file=sys.stderr)
def _console_warn(msg: str) -> None:
print(f" {YELLOW}[~] {msg}{RESET}")
def _console_info(msg: str, verbosity: int) -> None:
if verbosity >= 1:
print(f" {msg}")
def _console_debug(msg: str, verbosity: int) -> None:
if verbosity >= 2:
print(f" {DIM}{msg}{RESET}")
# ─────────────────────────────────────────────
# OpenAI entity extraction
# ─────────────────────────────────────────────
def _strip_json_fences(text: str) -> str:
"""Remove markdown code fences that the model may accidentally include."""
text = text.strip()
if text.startswith("```"):
text = re.sub(r"^```[a-z]*\n?", "", text)
text = re.sub(r"\n?```$", "", text)
return text.strip()
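# Illustrative example (comment only) for _strip_json_fences:
#
#     _strip_json_fences('```json\n{"persons": []}\n```')
#     # -> '{"persons": []}'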
def call_entity_extraction(
window_text: str,
*,
client: OpenAI,
model: str,
) -> Optional[Dict[str, Any]]:
"""
Call OpenAI to extract structured entities from *window_text*.
Returns the parsed JSON dict, or None if the response cannot be parsed as JSON.
"""
response = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": ENTITY_SYSTEM_PROMPT},
{"role": "user", "content": window_text},
],
temperature=0.0,
response_format={"type": "json_object"},
)
raw = response.choices[0].message.content or ""
raw = _strip_json_fences(raw)
try:
return json.loads(raw)
except json.JSONDecodeError as e:
log.error("Entity extraction: JSON parse failed — %s\nRaw: %.200s", e, raw)
return None
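# Illustrative usage sketch (comment only): calling the extractor on a hand-built
# window. Assumes OPENAI_KEY is set and loaded by load_env(); the page text shown
# is hypothetical.
#
#     client = OpenAI(api_key=os.getenv("OPENAI_KEY"))
#     window_text = "=== PAGE 3 (TARGET) ===\nJohn Smith, born 12 Jan 1842, Cork."
#     structured = call_entity_extraction(
#         window_text, client=client, model=_get_entity_model()
#     )
#     if structured is not None:
#         print(len(structured.get("persons") or []), "person(s) extracted")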
# ─────────────────────────────────────────────
# Person embedding text builder
# ─────────────────────────────────────────────
def build_person_embeddings(extraction_json: str) -> List[str]:
"""
Build embedding-ready strings (one per person) from structured extraction JSON.
Expected top-level keys:
persons: list of person objects
events: list of global event objects (optional)
claims: list of global claim objects (optional)
Returns:
List of formatted strings, one per person, suitable for embedding.
"""
data: Dict[str, Any] = json.loads(extraction_json)
persons: List[Dict[str, Any]] = data.get("persons", []) or []
global_events: List[Dict[str, Any]] = data.get("events", []) or []
global_claims: List[Dict[str, Any]] = data.get("claims", []) or []
def _clean(s: Optional[str]) -> Optional[str]:
if s is None:
return None
s = re.sub(r"\s+", " ", str(s)).strip()
s = "".join(ch for ch in s if ch >= " " and ch != "\u007f")
return s or None
def _clean_list(xs: Any) -> List[str]:
if not xs:
return []
out: List[str] = []
for x in (xs if isinstance(xs, list) else [xs]):
cx = _clean(x)
if cx:
out.append(cx)
seen: set[str] = set()
deduped: List[str] = []
for x in out:
if x not in seen:
seen.add(x)
deduped.append(x)
return deduped
def _safe_get(d: Dict[str, Any], key: str, default: Any) -> Any:
v = d.get(key, default)
return default if v is None else v
def _find_years(texts: List[str]) -> Tuple[Optional[int], Optional[int]]:
years: List[int] = []
for t in texts:
for m in re.findall(r"\b(1[6-9]\d{2}|20\d{2}|21\d{2})\b", t):
try:
years.append(int(m))
except ValueError:
pass
if not years:
return None, None
return min(years), max(years)
def _globals_for_person(
person_name: str,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
evs = [e for e in global_events if person_name in (e.get("people") or [])]
cls = [c for c in global_claims if person_name in (c.get("people") or [])]
return evs, cls
embeddings: List[str] = []
for p in persons:
name = _clean(p.get("name"))
summary = _clean(p.get("summary", ""))
if not name:
continue
alt_names = _clean_list(p.get("alt_names"))
roles = _clean_list(p.get("roles"))
attributes = _clean_list(p.get("attributes"))
residences = _clean_list(p.get("residences"))
person_events = _clean_list(p.get("events"))
evidence_phrases = _clean_list(p.get("evidence_phrases"))
birth = p.get("birth") or {}
death = p.get("death") or {}
birth_date = _clean(birth.get("date_text"))
birth_place = _clean(birth.get("place"))
death_date = _clean(death.get("date_text"))
death_place = _clean(death.get("place"))
rel = p.get("relationships") or {}
parents = _clean_list(rel.get("parents"))
spouses = _clean_list(rel.get("spouse"))
children = _clean_list(rel.get("children"))
siblings = _clean_list(rel.get("siblings"))
evs_for_person, claims_for_person = _globals_for_person(name)
lines: List[str] = []
lines.append("TYPE: Person")
lines.append(f"NAME: {name}")
lines.append(f"ALT_NAMES: {', '.join(alt_names) if alt_names else 'none'}")
if summary:
lines.append(f"SUMMARY: {summary}")
vitals_parts: List[str] = []
if birth_date or birth_place:
vitals_parts.append(
"born " + " ".join(x for x in [birth_date, birth_place] if x)
)
if death_date or death_place:
vitals_parts.append(
"died " + " ".join(x for x in [death_date, death_place] if x)
)
lines.append(f"VITALS: {'; '.join(vitals_parts) if vitals_parts else 'unknown'}")
if roles:
lines.append(f"ROLES: {', '.join(roles)}")
if attributes:
lines.append(f"ATTRIBUTES: {', '.join(attributes)}")
if residences:
lines.append(f"RESIDENCES: {', '.join(residences)}")
rel_lines: List[str] = []
if spouses:
rel_lines.append(f"- spouse: {', '.join(spouses)}")
if parents:
rel_lines.append(f"- parents: {', '.join(parents)}")
if children:
rel_lines.append(f"- children: {', '.join(children)}")
if siblings:
rel_lines.append(f"- siblings: {', '.join(siblings)}")
if rel_lines:
lines.append("RELATIONSHIPS:")
lines.extend(rel_lines)
event_lines: List[str] = []
for e in person_events:
event_lines.append(f"- {e}")
for e in evs_for_person:
etype = _clean(e.get("type")) or "event"
dtext = _clean(e.get("date_text"))
pl = _clean(e.get("place"))
people = _clean_list(e.get("people"))
details = _clean_list(e.get("details"))
parts = [etype]
if dtext:
parts.append(dtext)
if pl:
parts.append(pl)
if people:
others = [x for x in people if x != name]
if others:
parts.append("with " + ", ".join(others))
line = " \u2014 ".join(parts)
if details:
line += " | " + "; ".join(details[:2])
event_lines.append(f"- {line}")
if event_lines:
lines.append("EVENTS:")
seen: set[str] = set()
for ln in event_lines:
if ln not in seen:
seen.add(ln)
lines.append(ln)
claim_lines: List[str] = []
for c in claims_for_person:
ctype = _clean(c.get("claim_type")) or "claim"
people = _clean_list(c.get("people"))
support = _clean_list(c.get("supporting_facts"))
conflicts = _clean_list(c.get("conflicts"))
others = [x for x in people if x != name]
base = f"- {ctype}"
if others:
base += f" involving {', '.join(others)}"
if support:
base += f": {support[0]}"
if conflicts:
base += f" (conflict: {conflicts[0]})"
claim_lines.append(base)
if claim_lines:
lines.append("CLAIMS:")
lines.extend(claim_lines)
if evidence_phrases:
lines.append("EVIDENCE_ANCHORS:")
for ph in evidence_phrases[:3]:
lines.append(f"- {ph}")
year_start, year_end = _find_years(
[birth_date or "", death_date or ""] + residences + person_events
)
if year_start or year_end:
if year_start == year_end:
lines.append(f"YEAR_HINT: {year_start}")
else:
lines.append(f"YEAR_HINT: {year_start}\u2013{year_end}")
embeddings.append("\n".join(lines).strip())
return embeddings
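# Illustrative example (comment only): the kind of embedding text produced for a
# minimal extraction. The name, dates, and places are hypothetical.
#
#     extraction = {
#         "persons": [{
#             "name": "Mary O'Brien",
#             "summary": "Bride in an 1865 marriage record.",
#             "birth": {"date_text": "1842", "place": "County Cork"},
#             "events": ["Married in Dublin in 1865"],
#         }],
#         "events": [{"type": "marriage", "date_text": "1865",
#                     "place": "Dublin", "people": ["Mary O'Brien"]}],
#         "claims": [],
#     }
#     texts = build_person_embeddings(json.dumps(extraction))
#     # texts[0] begins with "TYPE: Person" / "NAME: Mary O'Brien", lists the
#     # marriage under EVENTS, and ends with a YEAR_HINT line spanning 1842 to 1865.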
# ─────────────────────────────────────────────
# Page-level entity extraction
# ─────────────────────────────────────────────
def entity_extract_page_file(
page_file: Path,
parent_folder: Path,
page_num: int,
all_page_texts: Dict[int, str],
*,
client: OpenAI,
model: str,
embed_provider: Optional[str] = None,
force: bool = False,
verbosity: int = 0,
) -> bool:
"""
Extract structured entities from one page and write results to pages/.
Reads the reconciled text from *page_file*, builds a context window,
calls OpenAI, stores entity data back into the page file under
``extractions.entities``, and writes one person JSON file per person
mention into the same ``pages/`` directory.
Args:
page_file: Path to the existing page JSON file.
parent_folder: Hash-keyed document folder (parent of ``pages/``).
page_num: Zero-based page number.
all_page_texts: Dict of page_number → reconciled text for windowing.
client: Configured OpenAI client.
model: OpenAI model name for entity extraction.
embed_provider: Embedding provider (``'ollama'``, ``'openai'``, or
``None`` to skip embeddings).
force: Re-extract even if already processed with the same model.
verbosity: Console verbosity level (0–3).
Returns:
True if extraction ran, False if skipped.
"""
try:
page_data = json.loads(page_file.read_text(encoding="utf-8"))
except Exception as e:
_console_error(f"Failed to load {page_file.name}: {e}", exc=e, verbosity=verbosity)
return False
doc_id = page_data.get("doc_id", parent_folder.name)
extractions = page_data.get("extractions", {})
existing = extractions.get("entities", {})
# Skip if already extracted with the same model.
if not force and existing.get("raw") and existing.get("model") == model:
_console_debug(
f"page.{page_num:04d} already has entities (model={model}), skipping",
verbosity,
)
return False
# Need reconciled text to proceed.
reconciled_text = (extractions.get("reconciled", {}).get("text") or "").strip()
if not reconciled_text:
_console_warn(f"page.{page_num:04d} — no reconciled text, skipping")
log.warning("No reconciled text for page %d in %s", page_num, doc_id)
return False
# Build context window.
window = build_dynamic_extraction_window(pages=all_page_texts, anchor_page=page_num)
_console_debug(
f"page.{page_num:04d} window: {window.char_count} chars "
f"pages={window.pages_included}",
verbosity,
)
# Call OpenAI.
_console_debug(f"page.{page_num:04d} calling entity extraction (model={model})...", verbosity)
try:
structured = call_entity_extraction(window.text, client=client, model=model)
except Exception as e:
_console_error(
f"Entity extraction failed page {page_num}: {e}", exc=e, verbosity=verbosity
)
log.error("Entity extraction failed page %d: %s", page_num, e)
return False
if structured is None:
_console_error(f"page.{page_num:04d} entity extraction returned no data", verbosity=verbosity)
return False
persons: List[Dict[str, Any]] = structured.get("persons") or []
now = _utc_now()
# Build person embedding texts.
try:
person_texts = build_person_embeddings(json.dumps(structured))
except Exception as e:
_console_error(
f"build_person_embeddings failed page {page_num}: {e}", exc=e, verbosity=verbosity
)
log.error("build_person_embeddings failed page %d: %s", page_num, e)
person_texts = []
# Write one person file per person into pages/.
pages_dir = parent_folder / "pages"
person_file_names: List[str] = []
for i, person_text in enumerate(person_texts):
person_name = (
persons[i].get("name") if i < len(persons) else "unknown"
) or "unknown"
base_key = f"person:{person_name}:{doc_id}:p{page_num}:n{i}"
if len(base_key) > 240:
base_key = "person:" + hashlib.sha1(base_key.encode("utf-8")).hexdigest()
embedding_values: list[float] = []
embedding_model: Optional[str] = None
if embed_provider:
try:
from .embed import generate_embedding
embedding_values = generate_embedding(person_text, embed_provider)
embedding_model = _embedding_model_name(embed_provider)
_console_debug(
f" person {i} embedded (dim={len(embedding_values)})", verbosity
)
except Exception as e:
_console_error(
f"Embedding failed person {i} page {page_num}: {e}",
exc=e, verbosity=verbosity,
)
log.error("Embedding failed person %d page %d: %s", i, page_num, e)
person_data = {
"_key": _norm_key(base_key),
"type": {"entity": "person", "source": "mention"},
"process_version": 2,
"schema_version": 4,
"method": "windowed_default",
"datetime": now,
"person_name": person_name,
"source_document": doc_id,
"page_num": page_num,
"person_num": i,
"page_text": all_page_texts.get(page_num, ""),
"window_text": window.text,
"embedding_text": person_text,
"embedding_model": embedding_model,
"embedding": embedding_values,
"structure": structured,
}
person_file = pages_dir / f"{doc_id}.person.{page_num:04d}.{i:04d}.json"
person_file.write_text(
json.dumps(person_data, indent=2, ensure_ascii=False), encoding="utf-8"
)
person_file_names.append(person_file.name)
_console_debug(f" wrote {person_file.name}", verbosity)
log.info(
"Wrote %d person file(s) for page %d in %s", len(person_file_names), page_num, doc_id
)
# Merge entity results into the page file.
save_or_merge_page(parent_folder, doc_id, page_num, {
"entities": {
"raw": structured,
"model": model,
"extracted_at": now,
"person_count": len(persons),
"window_pages": window.pages_included,
"person_files": person_file_names,
}
})
return True
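# Illustrative single-page invocation (comment only): the folder path, page file
# name, and page text below are hypothetical; in practice entity_extract_folder
# drives this call.
#
#     folder = Path("data/docs/ab12cd34")
#     page_file = folder / "pages" / "ab12cd34.page.0003.json"
#     all_texts = {3: "reconciled text of page 3 ..."}
#     client = OpenAI(api_key=os.getenv("OPENAI_KEY"))
#     ran = entity_extract_page_file(
#         page_file, folder, 3, all_texts,
#         client=client, model=_get_entity_model(),
#         embed_provider=None,   # skip embeddings in this sketch
#         verbosity=1,
#     )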
# ─────────────────────────────────────────────
# Folder-level entity extraction
# ─────────────────────────────────────────────
def entity_extract_folder(
folder: Path,
*,
client: OpenAI,
model: str,
embed_provider: Optional[str] = None,
force: bool = False,
verbosity: int = 0,
) -> tuple[int, int]:
"""
Extract structured entities for all reconciled pages in a document folder.
Scans ``<folder>/pages/`` for page files (``*.page.*.json``), loads their
reconciled text for windowing, then calls
:func:`entity_extract_page_file` for each page that needs processing.
Args:
folder: Hash-keyed document folder containing ``pages/``.
client: Configured OpenAI client.
model: OpenAI model name for entity extraction.
embed_provider: Embedding provider or ``None`` to skip embeddings.
force: Re-extract even if already processed with the same model.
verbosity: Console verbosity level (0–3).
Returns:
A ``(extracted_count, skipped_count)`` tuple.
"""
pages_dir = folder / "pages"
if not pages_dir.is_dir():
log.debug("No pages/ folder in %s", folder.name)
return 0, 0
doc_id = folder.name
# Only page files, not person files or other outputs.
all_files = sorted(pages_dir.glob("*.json"))
page_files = [f for f in all_files if ".page." in f.stem]
if not page_files:
log.debug("No page files in %s", pages_dir)
return 0, 0
log.info(
"Entity extraction for %s — %d pages model=%s", folder.name, len(page_files), model
)
if verbosity >= 1:
print(f"\n[>] {folder.name} — {len(page_files)} pages")
# First pass: collect reconciled text from every page for windowing.
all_page_texts: Dict[int, str] = {}
for pf in page_files:
page_num = get_page_number(pf)
if page_num is None:
continue
try:
data = json.loads(pf.read_text(encoding="utf-8"))
text = (
data.get("extractions", {})
.get("reconciled", {})
.get("text") or ""
).strip()
if text:
all_page_texts[page_num] = text
except Exception as e:
log.warning("Could not read reconciled text from %s: %s", pf.name, e)
if not all_page_texts:
_console_warn(f"{folder.name} — no reconciled text found, skipping")
log.warning("No reconciled text found in %s", folder.name)
return 0, 0
# Second pass: extract entities for each page.
extracted_count = 0
skipped_count = 0
for pf in page_files:
page_num = get_page_number(pf)
if page_num is None:
log.warning("Could not parse page number from %s, skipping", pf.name)
continue
ran = entity_extract_page_file(
pf,
folder,
page_num,
all_page_texts,
client=client,
model=model,
embed_provider=embed_provider,
force=force,
verbosity=verbosity,
)
if ran:
extracted_count += 1
if verbosity >= 1:
print(f" {GREEN}[✓]{RESET} page.{page_num:04d}")
else:
skipped_count += 1
if verbosity >= 2:
print(f" {DIM}[~] page.{page_num:04d} skipped{RESET}")
# Stamp metadata.
if extracted_count > 0:
from .metadata import load_metadata, write_metadata
metadata = load_metadata(folder, doc_id)
if metadata:
metadata["extraction"]["steps"]["entities"] = True
write_metadata(folder, doc_id, metadata)
log.info("Stamped entities=true in metadata for %s", folder.name)
return extracted_count, skipped_count
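# Illustrative folder-level run (comment only): assumes OPENAI_KEY is set, the
# hash-keyed folder path is hypothetical, and pages/ already holds reconciled text.
#
#     client = OpenAI(api_key=os.getenv("OPENAI_KEY"))
#     extracted, skipped = entity_extract_folder(
#         Path("data/docs/ab12cd34"),
#         client=client,
#         model=_get_entity_model(),
#         embed_provider="openai",
#         verbosity=1,
#     )
#     print(f"extracted={extracted} skipped={skipped}")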