Coverage for src/dataknobs_xization/ingestion/processor.py: 89% (126 statements)
coverage.py v7.13.0, created at 2025-12-26 16:20 -0700
1"""Directory processor for knowledge base ingestion.
3This module provides the DirectoryProcessor class for processing
4documents from a directory into chunks ready for embedding.
5"""
7from __future__ import annotations
9import logging
10import os
11from dataclasses import dataclass, field
12from pathlib import Path
13from typing import Any, Iterator, Literal
15from dataknobs_xization.ingestion.config import (
16 FilePatternConfig,
17 KnowledgeBaseConfig,
18)
19from dataknobs_xization.json import JSONChunk, JSONChunkConfig, JSONChunker
20from dataknobs_xization.markdown import (
21 Chunk,
22 ChunkQualityConfig,
23 ChunkQualityFilter,
24 HeadingInclusion,
25 chunk_markdown_tree,
26 parse_markdown,
27)
29logger = logging.getLogger(__name__)
@dataclass
class ProcessedDocument:
    """A processed document ready for embedding and storage.

    Contains chunks from a single source file along with metadata
    about the processing.

    Attributes:
        source_file: Path to the source file
        document_type: Type of document (markdown, json, jsonl)
        chunks: List of processed chunks
        metadata: Document-level metadata
        errors: Any errors encountered during processing
    """

    source_file: str
    document_type: Literal["markdown", "json", "jsonl"]
    chunks: list[dict[str, Any]]
    metadata: dict[str, Any] = field(default_factory=dict)
    errors: list[str] = field(default_factory=list)

    @property
    def chunk_count(self) -> int:
        """Number of chunks in this document."""
        return len(self.chunks)

    @property
    def has_errors(self) -> bool:
        """Whether processing encountered errors."""
        return len(self.errors) > 0


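# Illustrative note (derived from the processor code below): each entry in
# ProcessedDocument.chunks is a plain dict of the form
#
#     {
#         "text": "...",            # chunk body
#         "embedding_text": "...",  # text to embed (falls back to "text")
#         "chunk_index": 0,
#         "source_path": "...",     # empty string for markdown chunks
#         "metadata": {...},        # heading info for markdown, chunk metadata for JSON
#     }
#
# The exact "metadata" keys differ between markdown and JSON sources.
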
# File size threshold for streaming (10MB)
STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024

# Config file names to always exclude from processing
CONFIG_FILE_NAMES = {"knowledge_base.json", "knowledge_base.yaml", "knowledge_base.yml"}


class DirectoryProcessor:
    """Process documents from a directory for knowledge base ingestion.

    Handles markdown and JSON files with configurable chunking,
    supporting both in-memory and streaming processing for large files.

    Attributes:
        config: Knowledge base configuration
        root_dir: Root directory for processing
    """

    def __init__(self, config: KnowledgeBaseConfig, root_dir: str | Path):
        """Initialize the directory processor.

        Args:
            config: Knowledge base configuration
            root_dir: Root directory containing documents
        """
        self.config = config
        self.root_dir = Path(root_dir)

    def process(self) -> Iterator[ProcessedDocument]:
        """Process all documents in the directory.

        Yields ProcessedDocument for each file, automatically using
        streaming for large JSON files to avoid memory exhaustion.

        Yields:
            ProcessedDocument for each processed file
        """
        # Collect all files first
        files = self._collect_files()

        for filepath in files:
            rel_path = filepath.relative_to(self.root_dir)

            # Skip config files
            if filepath.name in CONFIG_FILE_NAMES:
                logger.debug(f"Skipping config file: {rel_path}")
                continue

            # Skip excluded files
            if self.config.is_excluded(rel_path):
                logger.debug(f"Skipping excluded file: {rel_path}")
                continue

            # Get pattern config if any
            pattern_config = self.config.get_pattern_config(rel_path)

            # Process based on file type
            suffix = filepath.suffix.lower()
            if suffix == ".md":
                yield from self._process_markdown(filepath, pattern_config)
            elif suffix in (".json", ".jsonl", ".ndjson"):
                yield from self._process_json(filepath, pattern_config)
            elif suffix == ".gz":
                # Check inner extension for compressed files
                inner_suffix = Path(filepath.stem).suffix.lower()
                if inner_suffix in (".json", ".jsonl", ".ndjson"):
                    yield from self._process_json(filepath, pattern_config)
                else:
                    logger.debug(f"Skipping unsupported compressed file: {rel_path}")
            else:
                logger.debug(f"Skipping unsupported file type: {rel_path}")

    def _collect_files(self) -> list[Path]:
        """Collect all files to process from the directory.

        Returns:
            List of file paths
        """
        files = []

        # If patterns are defined, use them to find files
        if self.config.patterns:
            for pattern_config in self.config.patterns:
                if pattern_config.enabled:
                    for filepath in self.root_dir.glob(pattern_config.pattern):
                        if filepath.is_file() and filepath not in files:
                            files.append(filepath)
        else:
            # Default: find all supported files
            for pattern in ["**/*.md", "**/*.json", "**/*.jsonl", "**/*.ndjson",
                            "**/*.json.gz", "**/*.jsonl.gz", "**/*.ndjson.gz"]:
                for filepath in self.root_dir.glob(pattern):
                    if filepath.is_file() and filepath not in files:
                        files.append(filepath)

        return sorted(files)

    def _process_markdown(
        self,
        filepath: Path,
        pattern_config: FilePatternConfig | None,
    ) -> Iterator[ProcessedDocument]:
        """Process a markdown file.

        Args:
            filepath: Path to markdown file
            pattern_config: Optional pattern-specific configuration

        Yields:
            ProcessedDocument for the file
        """
        errors: list[str] = []
        chunks: list[dict[str, Any]] = []

        try:
            # Read file
            with open(filepath, encoding="utf-8") as f:
                content = f.read()

            # Get chunking config
            chunking_config = self.config.get_chunking_config(
                filepath.relative_to(self.root_dir)
            )

            # Build quality filter if configured
            quality_filter = None
            if self.config.default_quality_filter:
                quality_filter = ChunkQualityConfig(**self.config.default_quality_filter)

            # Parse and chunk
            tree = parse_markdown(content)
            md_chunks = chunk_markdown_tree(
                tree,
                max_chunk_size=chunking_config.get("max_chunk_size", 500),
                chunk_overlap=chunking_config.get("chunk_overlap", 50),
                heading_inclusion=HeadingInclusion.IN_METADATA,
                combine_under_heading=chunking_config.get("combine_under_heading", True),
                quality_filter=quality_filter,
                generate_embeddings=True,
            )

            # Convert to dictionaries
            for i, chunk in enumerate(md_chunks):
                chunk_dict = {
                    "text": chunk.text,
                    "embedding_text": chunk.metadata.embedding_text or chunk.text,
                    "chunk_index": i,
                    "source_path": "",
                    "metadata": {
                        "heading_path": chunk.metadata.heading_display or chunk.metadata.get_heading_path(),
                        "headings": chunk.metadata.headings,
                        "heading_levels": chunk.metadata.heading_levels,
                        "line_number": chunk.metadata.line_number,
                        "chunk_size": chunk.metadata.chunk_size,
                    },
                }
                chunks.append(chunk_dict)

        except Exception as e:
            errors.append(f"Failed to process markdown: {e}")
            logger.error(f"Error processing {filepath}: {e}")

        # Build metadata
        metadata = self.config.get_metadata(filepath.relative_to(self.root_dir))

        yield ProcessedDocument(
            source_file=str(filepath),
            document_type="markdown",
            chunks=chunks,
            metadata=metadata,
            errors=errors,
        )

    def _process_json(
        self,
        filepath: Path,
        pattern_config: FilePatternConfig | None,
    ) -> Iterator[ProcessedDocument]:
        """Process a JSON or JSONL file.

        Automatically uses streaming for large files or JSONL format.

        Args:
            filepath: Path to JSON file
            pattern_config: Optional pattern-specific configuration

        Yields:
            ProcessedDocument for the file
        """
        errors: list[str] = []
        chunks: list[dict[str, Any]] = []

        try:
            # Build JSON chunker config
            chunking_config = self.config.get_chunking_config(
                filepath.relative_to(self.root_dir)
            )

            json_config = JSONChunkConfig(
                max_chunk_size=chunking_config.get("max_chunk_size", 1000),
                nested_separator=chunking_config.get("nested_separator", "."),
                array_handling=chunking_config.get("array_handling", "expand"),
                include_field_names=chunking_config.get("include_field_names", True),
                skip_technical_fields=chunking_config.get("skip_technical_fields", True),
            )

            # Apply pattern-specific overrides
            if pattern_config:
                if pattern_config.text_template:
                    json_config.text_template = pattern_config.text_template
                if pattern_config.text_fields:
                    json_config.text_fields = pattern_config.text_fields

            chunker = JSONChunker(json_config)

            # Determine if we should stream
            is_jsonl = self._is_jsonl_file(str(filepath))
            file_size = os.path.getsize(filepath)
            should_stream = is_jsonl or file_size > STREAMING_THRESHOLD_BYTES

            if should_stream:
                # Stream chunks for large files or JSONL
                for json_chunk in chunker.stream_chunks(filepath):
                    chunk_dict = self._json_chunk_to_dict(json_chunk)
                    chunks.append(chunk_dict)
            else:
                # Load and chunk in memory for small files
                import json as json_lib
                with open(filepath, encoding="utf-8") as f:
                    data = json_lib.load(f)

                for json_chunk in chunker.chunk(data, source=str(filepath)):
                    chunk_dict = self._json_chunk_to_dict(json_chunk)
                    chunks.append(chunk_dict)

        except Exception as e:
            errors.append(f"Failed to process JSON: {e}")
            logger.error(f"Error processing {filepath}: {e}")

        # Build metadata
        metadata = self.config.get_metadata(filepath.relative_to(self.root_dir))

        # Determine document type
        doc_type: Literal["json", "jsonl"] = "jsonl" if self._is_jsonl_file(str(filepath)) else "json"

        yield ProcessedDocument(
            source_file=str(filepath),
            document_type=doc_type,
            chunks=chunks,
            metadata=metadata,
            errors=errors,
        )

    def _json_chunk_to_dict(self, chunk: JSONChunk) -> dict[str, Any]:
        """Convert a JSONChunk to a dictionary.

        Args:
            chunk: JSONChunk instance

        Returns:
            Dictionary representation
        """
        return {
            "text": chunk.text,
            "embedding_text": chunk.embedding_text or chunk.text,
            "chunk_index": chunk.chunk_index,
            "source_path": chunk.source_path,
            "metadata": chunk.metadata,
        }

    def _is_jsonl_file(self, filepath: str) -> bool:
        """Check if a file is JSONL format based on extension.

        Args:
            filepath: Path to check

        Returns:
            True if file is JSONL format
        """
        filepath_lower = filepath.lower()
        return any(
            filepath_lower.endswith(ext)
            for ext in [".jsonl", ".ndjson", ".jsonl.gz", ".ndjson.gz"]
        )


def process_directory(
    directory: str | Path,
    config: KnowledgeBaseConfig | None = None,
) -> Iterator[ProcessedDocument]:
    """Convenience function to process a directory.

    Args:
        directory: Directory to process
        config: Optional configuration (loads from directory if not provided)

    Yields:
        ProcessedDocument for each file
    """
    directory = Path(directory)

    if config is None:
        config = KnowledgeBaseConfig.load(directory)

    processor = DirectoryProcessor(config, directory)
    yield from processor.process()


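# Minimal usage sketch (illustrative, not part of the coverage-measured code):
# walks a knowledge-base directory and reports chunk counts per document.
# Assumes the target directory exists and, when no config is passed,
# that KnowledgeBaseConfig.load() can build one from that directory.
if __name__ == "__main__":
    import sys

    root = sys.argv[1] if len(sys.argv) > 1 else "."
    for doc in process_directory(root):
        status = "ERRORS" if doc.has_errors else "ok"
        print(f"{doc.source_file}: {doc.chunk_count} chunks "
              f"({doc.document_type}, {status})")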