Coverage for src/dataknobs_xization/ingestion/processor.py: 89%

126 statements  

coverage.py v7.13.0, created at 2025-12-26 16:20 -0700

1"""Directory processor for knowledge base ingestion. 

2 

3This module provides the DirectoryProcessor class for processing 

4documents from a directory into chunks ready for embedding. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import os 

11from dataclasses import dataclass, field 

12from pathlib import Path 

13from typing import Any, Iterator, Literal 

14 

15from dataknobs_xization.ingestion.config import ( 

16 FilePatternConfig, 

17 KnowledgeBaseConfig, 

18) 

19from dataknobs_xization.json import JSONChunk, JSONChunkConfig, JSONChunker 

20from dataknobs_xization.markdown import ( 

21 Chunk, 

22 ChunkQualityConfig, 

23 ChunkQualityFilter, 

24 HeadingInclusion, 

25 chunk_markdown_tree, 

26 parse_markdown, 

27) 

28 

29logger = logging.getLogger(__name__) 

30 

@dataclass
class ProcessedDocument:
    """A processed document ready for embedding and storage.

    Contains chunks from a single source file along with metadata
    about the processing.

    Attributes:
        source_file: Path to the source file
        document_type: Type of document (markdown, json, jsonl)
        chunks: List of processed chunks
        metadata: Document-level metadata
        errors: Any errors encountered during processing
    """

    source_file: str
    document_type: Literal["markdown", "json", "jsonl"]
    chunks: list[dict[str, Any]]
    metadata: dict[str, Any] = field(default_factory=dict)
    errors: list[str] = field(default_factory=list)

    @property
    def chunk_count(self) -> int:
        """Number of chunks in this document."""
        return len(self.chunks)

    @property
    def has_errors(self) -> bool:
        """Whether processing encountered errors."""
        return len(self.errors) > 0
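# Illustration only (not part of the module): ProcessedDocument is a plain dataclass,
# so a hypothetical result for a two-chunk markdown file would look like:
#
#     doc = ProcessedDocument(
#         source_file="docs/guide.md",
#         document_type="markdown",
#         chunks=[{"text": "...", "chunk_index": 0}, {"text": "...", "chunk_index": 1}],
#     )
#     doc.chunk_count  # -> 2
#     doc.has_errors   # -> False (errors defaults to an empty list)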

# File size threshold for streaming (10MB)
STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024

# Config file names to always exclude from processing
CONFIG_FILE_NAMES = {"knowledge_base.json", "knowledge_base.yaml", "knowledge_base.yml"}

class DirectoryProcessor:
    """Process documents from a directory for knowledge base ingestion.

    Handles markdown and JSON files with configurable chunking,
    supporting both in-memory and streaming processing for large files.

    Attributes:
        config: Knowledge base configuration
        root_dir: Root directory for processing
    """

    def __init__(self, config: KnowledgeBaseConfig, root_dir: str | Path):
        """Initialize the directory processor.

        Args:
            config: Knowledge base configuration
            root_dir: Root directory containing documents
        """
        self.config = config
        self.root_dir = Path(root_dir)

    def process(self) -> Iterator[ProcessedDocument]:
        """Process all documents in the directory.

        Yields ProcessedDocument for each file, automatically using
        streaming for large JSON files to avoid memory exhaustion.

        Yields:
            ProcessedDocument for each processed file
        """
        # Collect all files first
        files = self._collect_files()

        for filepath in files:
            rel_path = filepath.relative_to(self.root_dir)

            # Skip config files
            if filepath.name in CONFIG_FILE_NAMES:
                logger.debug(f"Skipping config file: {rel_path}")
                continue

            # Skip excluded files
            if self.config.is_excluded(rel_path):
                logger.debug(f"Skipping excluded file: {rel_path}")
                continue

            # Get pattern config if any
            pattern_config = self.config.get_pattern_config(rel_path)

            # Process based on file type
            suffix = filepath.suffix.lower()
            if suffix == ".md":
                yield from self._process_markdown(filepath, pattern_config)
            elif suffix in (".json", ".jsonl", ".ndjson"):
                yield from self._process_json(filepath, pattern_config)
            elif suffix == ".gz":
                # Check inner extension for compressed files
                inner_suffix = Path(filepath.stem).suffix.lower()
                if inner_suffix in (".json", ".jsonl", ".ndjson"):
                    yield from self._process_json(filepath, pattern_config)
                else:
                    logger.debug(f"Skipping unsupported compressed file: {rel_path}")
            else:
                logger.debug(f"Skipping unsupported file type: {rel_path}")
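    # Worked example (illustration only): for a file named "data/records.jsonl.gz",
    # filepath.suffix is ".gz", so the compressed branch runs; Path(filepath.stem) is
    # Path("records.jsonl"), whose suffix ".jsonl" is in the accepted set, so the file
    # is routed to _process_json() (which will stream it, per the logic further down).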

    def _collect_files(self) -> list[Path]:
        """Collect all files to process from the directory.

        Returns:
            List of file paths
        """
        files = []

        # If patterns are defined, use them to find files
        if self.config.patterns:
            for pattern_config in self.config.patterns:
                if pattern_config.enabled:
                    for filepath in self.root_dir.glob(pattern_config.pattern):
                        if filepath.is_file() and filepath not in files:
                            files.append(filepath)
        else:
            # Default: find all supported files
            for pattern in ["**/*.md", "**/*.json", "**/*.jsonl", "**/*.ndjson",
                            "**/*.json.gz", "**/*.jsonl.gz", "**/*.ndjson.gz"]:
                for filepath in self.root_dir.glob(pattern):
                    if filepath.is_file() and filepath not in files:
                        files.append(filepath)

        return sorted(files)
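    # Sketch (assumes KnowledgeBaseConfig.patterns holds FilePatternConfig entries with
    # at least the `pattern` and `enabled` fields used above; the constructor shown is
    # hypothetical):
    #
    #     config.patterns = [FilePatternConfig(pattern="docs/**/*.md", enabled=True)]
    #
    # would restrict collection to markdown files under docs/; with no patterns set,
    # the default globs pick up *.md, *.json, *.jsonl, *.ndjson and their .gz variants.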

    def _process_markdown(
        self,
        filepath: Path,
        pattern_config: FilePatternConfig | None,
    ) -> Iterator[ProcessedDocument]:
        """Process a markdown file.

        Args:
            filepath: Path to markdown file
            pattern_config: Optional pattern-specific configuration

        Yields:
            ProcessedDocument for the file
        """
        errors: list[str] = []
        chunks: list[dict[str, Any]] = []

        try:
            # Read file
            with open(filepath, encoding="utf-8") as f:
                content = f.read()

            # Get chunking config
            chunking_config = self.config.get_chunking_config(
                filepath.relative_to(self.root_dir)
            )

            # Build quality filter if configured
            quality_filter = None
            if self.config.default_quality_filter:
                quality_filter = ChunkQualityConfig(**self.config.default_quality_filter)

            # Parse and chunk
            tree = parse_markdown(content)
            md_chunks = chunk_markdown_tree(
                tree,
                max_chunk_size=chunking_config.get("max_chunk_size", 500),
                chunk_overlap=chunking_config.get("chunk_overlap", 50),
                heading_inclusion=HeadingInclusion.IN_METADATA,
                combine_under_heading=chunking_config.get("combine_under_heading", True),
                quality_filter=quality_filter,
                generate_embeddings=True,
            )

            # Convert to dictionaries
            for i, chunk in enumerate(md_chunks):
                chunk_dict = {
                    "text": chunk.text,
                    "embedding_text": chunk.metadata.embedding_text or chunk.text,
                    "chunk_index": i,
                    "source_path": "",
                    "metadata": {
                        "heading_path": chunk.metadata.heading_display or chunk.metadata.get_heading_path(),
                        "headings": chunk.metadata.headings,
                        "heading_levels": chunk.metadata.heading_levels,
                        "line_number": chunk.metadata.line_number,
                        "chunk_size": chunk.metadata.chunk_size,
                    },
                }
                chunks.append(chunk_dict)

        except Exception as e:
            errors.append(f"Failed to process markdown: {e}")
            logger.error(f"Error processing {filepath}: {e}")

        # Build metadata
        metadata = self.config.get_metadata(filepath.relative_to(self.root_dir))

        yield ProcessedDocument(
            source_file=str(filepath),
            document_type="markdown",
            chunks=chunks,
            metadata=metadata,
            errors=errors,
        )
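    # For reference, each markdown chunk dict built above has the shape:
    #
    #     {
    #         "text": ...,            # chunk body
    #         "embedding_text": ...,  # falls back to "text" when no embedding text is set
    #         "chunk_index": 0,
    #         "source_path": "",
    #         "metadata": {"heading_path": ..., "headings": ..., "heading_levels": ...,
    #                      "line_number": ..., "chunk_size": ...},
    #     }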

    def _process_json(
        self,
        filepath: Path,
        pattern_config: FilePatternConfig | None,
    ) -> Iterator[ProcessedDocument]:
        """Process a JSON or JSONL file.

        Automatically uses streaming for large files or JSONL format.

        Args:
            filepath: Path to JSON file
            pattern_config: Optional pattern-specific configuration

        Yields:
            ProcessedDocument for the file
        """
        errors: list[str] = []
        chunks: list[dict[str, Any]] = []

        try:
            # Build JSON chunker config
            chunking_config = self.config.get_chunking_config(
                filepath.relative_to(self.root_dir)
            )

            json_config = JSONChunkConfig(
                max_chunk_size=chunking_config.get("max_chunk_size", 1000),
                nested_separator=chunking_config.get("nested_separator", "."),
                array_handling=chunking_config.get("array_handling", "expand"),
                include_field_names=chunking_config.get("include_field_names", True),
                skip_technical_fields=chunking_config.get("skip_technical_fields", True),
            )

            # Apply pattern-specific overrides
            if pattern_config:
                if pattern_config.text_template:
                    json_config.text_template = pattern_config.text_template
                if pattern_config.text_fields:
                    json_config.text_fields = pattern_config.text_fields

            chunker = JSONChunker(json_config)

            # Determine if we should stream
            is_jsonl = self._is_jsonl_file(str(filepath))
            file_size = os.path.getsize(filepath)
            should_stream = is_jsonl or file_size > STREAMING_THRESHOLD_BYTES

            if should_stream:
                # Stream chunks for large files or JSONL
                for json_chunk in chunker.stream_chunks(filepath):
                    chunk_dict = self._json_chunk_to_dict(json_chunk)
                    chunks.append(chunk_dict)
            else:
                # Load and chunk in memory for small files
                import json as json_lib
                with open(filepath, encoding="utf-8") as f:
                    data = json_lib.load(f)

                for json_chunk in chunker.chunk(data, source=str(filepath)):
                    chunk_dict = self._json_chunk_to_dict(json_chunk)
                    chunks.append(chunk_dict)

        except Exception as e:
            errors.append(f"Failed to process JSON: {e}")
            logger.error(f"Error processing {filepath}: {e}")

        # Build metadata
        metadata = self.config.get_metadata(filepath.relative_to(self.root_dir))

        # Determine document type
        doc_type: Literal["json", "jsonl"] = "jsonl" if self._is_jsonl_file(str(filepath)) else "json"

        yield ProcessedDocument(
            source_file=str(filepath),
            document_type=doc_type,
            chunks=chunks,
            metadata=metadata,
            errors=errors,
        )
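    # Illustration of the streaming decision above: "events.jsonl" is streamed regardless
    # of size (JSONL is line-oriented), while "catalog.json" is only streamed once
    # os.path.getsize() exceeds STREAMING_THRESHOLD_BYTES (10 * 1024 * 1024 = 10,485,760
    # bytes); smaller JSON files are loaded with json.load() and chunked in memory.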

    def _json_chunk_to_dict(self, chunk: JSONChunk) -> dict[str, Any]:
        """Convert a JSONChunk to a dictionary.

        Args:
            chunk: JSONChunk instance

        Returns:
            Dictionary representation
        """
        return {
            "text": chunk.text,
            "embedding_text": chunk.embedding_text or chunk.text,
            "chunk_index": chunk.chunk_index,
            "source_path": chunk.source_path,
            "metadata": chunk.metadata,
        }

    def _is_jsonl_file(self, filepath: str) -> bool:
        """Check if a file is JSONL format based on extension.

        Args:
            filepath: Path to check

        Returns:
            True if file is JSONL format
        """
        filepath_lower = filepath.lower()
        return any(
            filepath_lower.endswith(ext)
            for ext in [".jsonl", ".ndjson", ".jsonl.gz", ".ndjson.gz"]
        )
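# Minimal usage sketch (illustration; assumes a KnowledgeBaseConfig instance `kb_config`
# built elsewhere, e.g. via KnowledgeBaseConfig.load() as used in process_directory() below):
#
#     processor = DirectoryProcessor(kb_config, "knowledge_base/")
#     for doc in processor.process():
#         if doc.has_errors:
#             logger.warning("errors in %s: %s", doc.source_file, doc.errors)
#         print(doc.source_file, doc.document_type, doc.chunk_count)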

def process_directory(
    directory: str | Path,
    config: KnowledgeBaseConfig | None = None,
) -> Iterator[ProcessedDocument]:
    """Convenience function to process a directory.

    Args:
        directory: Directory to process
        config: Optional configuration (loads from directory if not provided)

    Yields:
        ProcessedDocument for each file
    """
    directory = Path(directory)

    if config is None:
        config = KnowledgeBaseConfig.load(directory)

    processor = DirectoryProcessor(config, directory)
    yield from processor.process()
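# Convenience-path sketch (illustration only; assumes KnowledgeBaseConfig.load() reads the
# directory's own config file, presumably one of the knowledge_base.* names that
# CONFIG_FILE_NAMES excludes from chunking):
#
#     from dataknobs_xization.ingestion.processor import process_directory
#
#     total_chunks = sum(doc.chunk_count for doc in process_directory("knowledge_base/"))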