Coverage for src\truenex_memory\ingestion\engine.py: 83%
71 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Ingestion engine: load manifest, parse sources, index into repository."""
3from __future__ import annotations
5from pathlib import Path
6import json
7import tempfile
9from truenex_memory.core.chunker import chunk_text
10from truenex_memory.ingestion.manifest import (
11 PARSE_LATER_SOURCE_TYPES,
12 IngestionRecord,
13 SourceManifest,
14)
15from truenex_memory.ingestion.parsers import get_parser
16from truenex_memory.store.repository import MemoryRepository
def ingest_manifest(
    manifest_path: Path,
    project_root: Path,
    repository: MemoryRepository,
    *,
    dry_run: bool = False,
) -> dict[str, object]:
    """Run ingestion from a source manifest.

    Each manifest source is routed to exactly one outcome: deferred
    (``parse_later``), ``skipped`` because no parser is registered, failed
    (``errors``), or parsed into records that are listed under ``index_now``.

    Args:
        manifest_path: Path to the manifest JSON file.
        project_root: Root directory for resolving relative source paths.
        repository: MemoryRepository for upserting documents.
        dry_run: If True, report only without modifying the database.

    Returns:
        Report dict with keys: index_now, parse_later, skipped, errors.
        A manifest that cannot be loaded (``FileNotFoundError`` or
        ``ValueError``) yields a report holding a single ``errors`` entry.
    """
    report: dict[str, list[dict[str, object]]] = {
        "index_now": [],
        "parse_later": [],
        "skipped": [],
        "errors": [],
    }

    manifest_dir = manifest_path.parent.resolve()
    root = project_root.resolve()

    try:
        manifest = SourceManifest.from_path(manifest_path)
    except (FileNotFoundError, ValueError) as exc:
        report["errors"].append({"source_path": str(manifest_path), "error": str(exc)})
        return report

    for entry in manifest.sources:
        if entry.source_type in PARSE_LATER_SOURCE_TYPES:
            report["parse_later"].append(
                {
                    "source_type": entry.source_type,
                    "source_path": entry.source_path,
                    "source_tool": entry.source_tool,
                    "reason": "parser not yet implemented",
                }
            )
            continue

        parser = get_parser(entry.source_type)
        if parser is None:
            report["skipped"].append(
                {
                    "source_type": entry.source_type,
                    "source_path": entry.source_path,
                    "source_tool": entry.source_tool,
                    "reason": f"no parser registered for {entry.source_type!r}",
                }
            )
            continue

        source_dir = _resolve_source_dir(entry.source_path, manifest_dir, root)

        # A failing parser must not abort the whole run: record the failure
        # against this source and move on to the next manifest entry.
        try:
            records = parser(
                source_dir,
                manifest.project,
                entry.source_tool,
                entry.privacy_scope,
            )
        except Exception as exc:
            report["errors"].append(
                {
                    "source_type": entry.source_type,
                    "source_path": entry.source_path,
                    "source_tool": entry.source_tool,
                    "error": f"{type(exc).__name__}: {exc}",
                }
            )
            continue

        for record in records:
            report_item = _record_report_item(record)
            if not dry_run:
                try:
                    _index_record(record, root, repository)
                except Exception as exc:
                    report["errors"].append(
                        {
                            "source_type": record.source_type,
                            "source_path": record.source_path,
                            "source_tool": record.source_tool,
                            "error": f"{type(exc).__name__}: {exc}",
                        }
                    )
                    continue
            # Listed only once indexing succeeded (or was skipped by dry_run),
            # so "index_now" never holds a record that failed to index and no
            # equality-based list.remove() is needed to undo the entry.
            report["index_now"].append(report_item)

    return report
def _resolve_source_dir(source_path: str, manifest_dir: Path, project_root: Path) -> Path:
    """Turn a manifest ``source_path`` into a concrete filesystem path.

    An absolute path is returned untouched. A relative path is looked up
    next to the manifest first; if nothing exists there, it is resolved
    against the project root instead (whether or not that target exists).
    """
    requested = Path(source_path)
    if requested.is_absolute():
        return requested

    beside_manifest = (manifest_dir / requested).resolve()
    # The manifest-relative location wins only when it actually exists.
    return beside_manifest if beside_manifest.exists() else (project_root / requested).resolve()
def _record_report_item(record: IngestionRecord) -> dict[str, object]:
    """Summarise an ingestion record as a report entry.

    The entry carries the record's identifying fields plus ``chars``, the
    length of its text, rather than the text itself.
    """
    copied_fields = ("project", "source_type", "source_path", "source_tool", "privacy_scope")
    item: dict[str, object] = {name: getattr(record, name) for name in copied_fields}
    item["chars"] = len(record.text)
    item["session_id"] = record.session_id
    return item
def _index_record(record: IngestionRecord, project_root: Path, repository: MemoryRepository) -> None:
    """Index a single ingestion record into the repository.

    Writes the record text (with its metadata preamble) to a temporary file
    so it can flow through the standard ``upsert_document`` path. The
    record's logical ``source_path`` is passed as ``relative_path``; the
    temporary file only serves as the on-disk ``path`` argument.

    Args:
        record: Ingestion record to index.
        project_root: Currently unused; kept so the call signature is stable.
        repository: Repository receiving the document and its chunks.

    Records whose text produces no chunks are skipped without touching the
    repository or the filesystem.
    """
    indexed_text = _record_text(record)
    chunks = chunk_text(indexed_text)
    if not chunks:
        return

    tmp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w",
            suffix=".txt",
            encoding="utf-8",
            delete=False,
        ) as tmp:
            # Capture the path before writing: with delete=False the file
            # outlives the handle, so a failed write (e.g. text that cannot
            # be encoded as UTF-8) must still be cleaned up below.
            tmp_path = Path(tmp.name)
            tmp.write(indexed_text)

        # The handle is closed here, so the repository can reopen the file.
        repository.upsert_document(
            path=tmp_path,
            relative_path=record.source_path,
            chunks=chunks,
        )
    finally:
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                # Best-effort cleanup: a leftover temp file must not mask the
                # outcome of the indexing step itself.
                pass
def _record_text(record: IngestionRecord) -> str:
    """Build the text to index: a one-line metadata preamble, then the body.

    The preamble is the marker ``TRUENEX_INGESTION_METADATA`` followed by a
    JSON object (sorted keys, non-ASCII kept verbatim) holding the record's
    core fields merged with ``record.metadata``. Entries from
    ``record.metadata`` win on key collisions, and entries whose value is
    ``None`` or an empty string are left out. Embedding the fields in the
    text keeps them searchable without a schema change.
    """
    core_fields = {
        "project": record.project,
        "source_type": record.source_type,
        "source_tool": record.source_tool,
        "source_path": record.source_path,
        "session_id": record.session_id,
        "created_at": record.created_at,
        "last_modified": record.last_modified,
        "privacy_scope": record.privacy_scope,
    }
    merged = {**core_fields, **record.metadata}

    populated = {}
    for name, value in merged.items():
        if value in (None, ""):
            continue
        populated[name] = value

    header = json.dumps(populated, ensure_ascii=False, sort_keys=True)
    return f"TRUENEX_INGESTION_METADATA {header}\n\n{record.text}"