Coverage for src \ truenex_memory \ ingestion \ engine.py: 83%

71 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Ingestion engine: load manifest, parse sources, index into repository.""" 

2 

3from __future__ import annotations 

4 

5from pathlib import Path 

6import json 

7import tempfile 

8 

9from truenex_memory.core.chunker import chunk_text 

10from truenex_memory.ingestion.manifest import ( 

11 PARSE_LATER_SOURCE_TYPES, 

12 IngestionRecord, 

13 SourceManifest, 

14) 

15from truenex_memory.ingestion.parsers import get_parser 

16from truenex_memory.store.repository import MemoryRepository 

17 

18 

def ingest_manifest(
    manifest_path: Path,
    project_root: Path,
    repository: MemoryRepository,
    *,
    dry_run: bool = False,
) -> dict[str, object]:
    """Run ingestion from a source manifest.

    Each manifest entry is routed to exactly one outcome:

    * source types listed in ``PARSE_LATER_SOURCE_TYPES`` are reported
      under ``parse_later`` and not parsed;
    * source types for which ``get_parser`` returns ``None`` are reported
      under ``skipped``;
    * otherwise the parser is called and every record it yields is indexed
      (unless ``dry_run``) and reported under ``index_now``.

    Failures never abort the run: a manifest that cannot be loaded, a parser
    that raises, or a record that fails to index is recorded under
    ``errors`` and processing continues with the next item.

    Args:
        manifest_path: Path to the manifest JSON file.
        project_root: Root directory for resolving relative source paths.
        repository: MemoryRepository for upserting documents.
        dry_run: If True, report only without modifying the database.

    Returns:
        Report dict with keys: index_now, parse_later, skipped, errors.
        A record whose indexing failed appears only under ``errors``.
    """
    report: dict[str, list[dict[str, object]]] = {
        "index_now": [],
        "parse_later": [],
        "skipped": [],
        "errors": [],
    }

    manifest_dir = manifest_path.parent.resolve()
    root = project_root.resolve()

    try:
        manifest = SourceManifest.from_path(manifest_path)
    except (FileNotFoundError, ValueError) as exc:
        report["errors"].append({"source_path": str(manifest_path), "error": str(exc)})
        return report

    for entry in manifest.sources:
        if entry.source_type in PARSE_LATER_SOURCE_TYPES:
            report["parse_later"].append(
                {
                    "source_type": entry.source_type,
                    "source_path": entry.source_path,
                    "source_tool": entry.source_tool,
                    "reason": "parser not yet implemented",
                }
            )
            continue

        parser = get_parser(entry.source_type)
        if parser is None:
            report["skipped"].append(
                {
                    "source_type": entry.source_type,
                    "source_path": entry.source_path,
                    "source_tool": entry.source_tool,
                    "reason": f"no parser registered for {entry.source_type!r}",
                }
            )
            continue

        source_dir = _resolve_source_dir(entry.source_path, manifest_dir, root)

        # Parsers are treated as untrusted plug-ins: any failure is reported
        # against the manifest entry and the remaining entries still run.
        try:
            records = parser(
                source_dir,
                manifest.project,
                entry.source_tool,
                entry.privacy_scope,
            )
        except Exception as exc:
            report["errors"].append(
                {
                    "source_type": entry.source_type,
                    "source_path": entry.source_path,
                    "source_tool": entry.source_tool,
                    "error": f"{type(exc).__name__}: {exc}",
                }
            )
            continue

        for record in records:
            report_item = _record_report_item(record)
            if not dry_run:
                try:
                    _index_record(record, root, repository)
                except Exception as exc:
                    report["errors"].append(
                        {
                            "source_type": record.source_type,
                            "source_path": record.source_path,
                            "source_tool": record.source_tool,
                            "error": f"{type(exc).__name__}: {exc}",
                        }
                    )
                    continue
            # Append only once the record is known to be good. Appending
            # first and calling list.remove() on failure would drop the
            # *first* equal dict, which may belong to an earlier record.
            report["index_now"].append(report_item)

    return report

115 

116 

def _resolve_source_dir(source_path: str, manifest_dir: Path, project_root: Path) -> Path:
    """Turn a manifest ``source_path`` string into a concrete ``Path``.

    An absolute path is returned untouched. A relative path is looked up
    next to the manifest first; when nothing exists there, the path is
    resolved against the project root instead (without checking that it
    exists).
    """
    raw = Path(source_path)
    if raw.is_absolute():
        return raw

    beside_manifest = (manifest_dir / raw).resolve()
    # The project-root candidate is only computed when the manifest-relative
    # location is missing on disk.
    return beside_manifest if beside_manifest.exists() else (project_root / raw).resolve()

135 

136 

def _record_report_item(record: IngestionRecord) -> dict[str, object]:
    """Summarise an ingestion record as a report entry.

    The entry copies the record's identifying fields and stores the length
    of its text under ``chars`` rather than the text itself.
    """
    return dict(
        project=record.project,
        source_type=record.source_type,
        source_path=record.source_path,
        source_tool=record.source_tool,
        privacy_scope=record.privacy_scope,
        chars=len(record.text),
        session_id=record.session_id,
    )

147 

148 

def _index_record(record: IngestionRecord, project_root: Path, repository: MemoryRepository) -> None:
    """Index a single ingestion record into the repository.

    Writes text to a temporary file so it can flow through the standard
    upsert_document path. The source_path stored in the DB is the logical
    path from the ingestion record.

    Records whose text produces no chunks are skipped without touching the
    repository. ``project_root`` is accepted for interface compatibility
    and is not used here.

    Raises:
        Whatever ``tmp.write`` or ``repository.upsert_document`` raises;
        the temporary file is removed in either case.
    """
    indexed_text = _record_text(record)
    chunks = chunk_text(indexed_text)
    if not chunks:
        return

    # delete=False so the file can be closed before upsert_document is given
    # its path; cleanup is done by hand below.
    tmp = tempfile.NamedTemporaryFile(
        mode="w",
        suffix=".txt",
        encoding="utf-8",
        delete=False,
    )
    tmp_path = Path(tmp.name)

    try:
        # The write sits inside the try so a failed write (encode error,
        # full disk) does not leave the temp file behind.
        with tmp:
            tmp.write(indexed_text)
        repository.upsert_document(
            path=tmp_path,
            relative_path=record.source_path,
            chunks=chunks,
        )
    finally:
        # Best-effort cleanup: a failure to delete must not mask the real
        # outcome of the upsert.
        try:
            tmp_path.unlink()
        except OSError:
            pass

181 

182 

def _record_text(record: IngestionRecord) -> str:
    """Build the text to index: a one-line metadata header plus the body.

    SQLite currently stores document path and chunk text but not arbitrary
    ingestion metadata, so the project/source/session fields are serialised
    as compact JSON on the first line to keep them searchable without a
    schema change. Entries whose value is ``None`` or an empty string are
    left out; keys from ``record.metadata`` are merged last and therefore
    win over the built-in fields of the same name.
    """
    fields = {
        "project": record.project,
        "source_type": record.source_type,
        "source_tool": record.source_tool,
        "source_path": record.source_path,
        "session_id": record.session_id,
        "created_at": record.created_at,
        "last_modified": record.last_modified,
        "privacy_scope": record.privacy_scope,
        **record.metadata,
    }

    populated = {}
    for name, value in fields.items():
        if value in (None, ""):
            continue
        populated[name] = value

    header = json.dumps(populated, ensure_ascii=False, sort_keys=True)
    return f"TRUENEX_INGESTION_METADATA {header}\n\n{record.text}"

203 return f"TRUENEX_INGESTION_METADATA {preamble}\n\n{record.text}"