Coverage for src/dataknobs_xization/markdown/md_streaming.py: 27%
63 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-18 17:41 -0700
1"""Streaming processor for incremental markdown chunking.
3This module provides functionality to process large markdown documents
4incrementally, managing memory constraints while generating chunks.
5"""
7from __future__ import annotations
9from typing import Iterator, TextIO
11from dataknobs_structures.tree import Tree
13from dataknobs_xization.markdown.md_chunker import Chunk, ChunkFormat, HeadingInclusion, MarkdownChunker
14from dataknobs_xization.markdown.md_parser import MarkdownNode, MarkdownParser
class StreamingMarkdownProcessor:
    """Streaming processor for incremental markdown chunking.

    Parses markdown input into a tree and emits chunks from it. True
    line-by-line processing with tree pruning is the intended design;
    the current implementation parses the whole document in one pass
    before chunking (see ``process_stream``).
    """

    def __init__(
        self,
        max_chunk_size: int = 1000,
        chunk_overlap: int = 100,
        max_line_length: int | None = None,
        heading_inclusion: HeadingInclusion = HeadingInclusion.BOTH,
        chunk_format: ChunkFormat = ChunkFormat.MARKDOWN,
        max_tree_depth: int = 100,
        memory_limit_nodes: int | None = None,
    ):
        """Initialize the streaming processor.

        Args:
            max_chunk_size: Maximum size of chunk text in characters.
            chunk_overlap: Number of characters to overlap between chunks.
            max_line_length: Maximum length for individual lines.
            heading_inclusion: How to include headings in chunks.
            chunk_format: Output format for chunks.
            max_tree_depth: Maximum depth of tree to maintain.
            memory_limit_nodes: Maximum number of nodes to keep in memory
                (None for unlimited).
        """
        # Parser is configured to drop empty lines; line-length handling
        # is delegated to it.
        self.parser = MarkdownParser(
            max_line_length=max_line_length,
            preserve_empty_lines=False,
        )
        # Chunker combines sibling body text under a shared heading.
        self.chunker = MarkdownChunker(
            max_chunk_size=max_chunk_size,
            chunk_overlap=chunk_overlap,
            heading_inclusion=heading_inclusion,
            chunk_format=chunk_format,
            combine_under_heading=True,
        )
        self.max_tree_depth = max_tree_depth
        self.memory_limit_nodes = memory_limit_nodes

    def process_stream(
        self,
        source: str | TextIO | Iterator[str],
    ) -> Iterator[Chunk]:
        """Process markdown from a stream, yielding chunks incrementally.

        Args:
            source: Markdown content as a string, file object, or line
                iterator.

        Yields:
            Chunk objects as they become available.
        """
        # v1 batches the whole document: parse completely, then chunk.
        # True incremental tree building is a planned enhancement.
        document_tree = self.parser.parse(source)
        for chunk in self.chunker.chunk(document_tree):
            yield chunk

    def process_file(self, file_path: str) -> Iterator[Chunk]:
        """Process a markdown file, yielding chunks incrementally.

        Args:
            file_path: Path to the markdown file.

        Yields:
            Chunk objects.
        """
        with open(file_path, encoding='utf-8') as handle:
            yield from self.process_stream(handle)

    def process_string(self, content: str) -> Iterator[Chunk]:
        """Process markdown held in a string, yielding chunks.

        Args:
            content: Markdown content string.

        Yields:
            Chunk objects.
        """
        for chunk in self.process_stream(content):
            yield chunk
class AdaptiveStreamingProcessor(StreamingMarkdownProcessor):
    """Streaming processor that adapts to memory constraints.

    Monitors the size of the tree while it is being built and, when the
    node count approaches the configured limit, flushes accumulated body
    text as chunks and invokes pruning, bounding memory use on very
    large documents.
    """

    def __init__(
        self,
        max_chunk_size: int = 1000,
        chunk_overlap: int = 100,
        max_line_length: int | None = None,
        heading_inclusion: HeadingInclusion = HeadingInclusion.BOTH,
        chunk_format: ChunkFormat = ChunkFormat.MARKDOWN,
        max_tree_depth: int = 100,
        memory_limit_nodes: int = 10000,
        adaptive_threshold: float = 0.8,
    ):
        """Initialize the adaptive streaming processor.

        Args:
            max_chunk_size: Maximum size of chunk text in characters.
            chunk_overlap: Number of characters to overlap between chunks.
            max_line_length: Maximum length for individual lines.
            heading_inclusion: How to include headings in chunks.
            chunk_format: Output format for chunks.
            max_tree_depth: Maximum depth of tree to maintain.
            memory_limit_nodes: Maximum number of nodes to keep in memory.
            adaptive_threshold: Fraction of memory_limit_nodes at which
                adaptive chunking is triggered (0.0-1.0).
        """
        super().__init__(
            max_chunk_size=max_chunk_size,
            chunk_overlap=chunk_overlap,
            max_line_length=max_line_length,
            heading_inclusion=heading_inclusion,
            chunk_format=chunk_format,
            max_tree_depth=max_tree_depth,
            memory_limit_nodes=memory_limit_nodes,
        )
        self.adaptive_threshold = adaptive_threshold

    def process_stream(self, source: str | TextIO | Iterator[str]) -> Iterator[Chunk]:
        """Process a stream with adaptive memory management.

        Args:
            source: Markdown content source (string, file object, or
                line iterator).

        Yields:
            Chunk objects.
        """
        root = Tree(MarkdownNode(text="ROOT", level=0, node_type="root", line_number=0))
        attach_point = root  # parent under which the next node is added
        unchunked: list[Tree] = []  # body nodes not yet emitted as chunks

        # NOTE(review): relies on parser-private helpers
        # (_get_line_iterator, HEADING_PATTERN, _find_heading_parent) —
        # confirm these remain stable across parser versions.
        line_iter = self.parser._get_line_iterator(source)
        for line_no, raw in enumerate(line_iter, start=1):
            # Blank lines are skipped but still advance the line counter.
            if not raw.strip():
                continue

            match = self.parser.HEADING_PATTERN.match(raw)
            if match is None:
                # Plain body text: attach under the current heading and
                # remember the node for later chunking.
                body_node = attach_point.add_child(
                    MarkdownNode(
                        text=raw.rstrip('\n'),
                        level=0,
                        node_type="body",
                        line_number=line_no,
                    )
                )
                unchunked.append(body_node)
                continue

            # New heading: before inserting it, check whether the tree is
            # nearing the memory budget; if so, flush pending body text
            # and prune to release memory.
            if self.memory_limit_nodes:
                total_nodes = len(root.find_nodes(lambda _: True))
                if total_nodes >= self.memory_limit_nodes * self.adaptive_threshold:
                    if unchunked:
                        yield from self._chunk_nodes(unchunked)
                        unchunked = []
                    self._prune_processed_nodes(root)

            depth = len(match.group(1))  # heading level = count of '#'
            attach_point, _ = self.parser._find_heading_parent(
                root, attach_point, depth
            )
            attach_point = attach_point.add_child(
                MarkdownNode(
                    text=match.group(2).strip(),
                    level=depth,
                    node_type="heading",
                    line_number=line_no,
                )
            )

        # Flush whatever body text remains at end of input.
        if unchunked:
            yield from self._chunk_nodes(unchunked)

    def _chunk_nodes(self, nodes: list[Tree]) -> Iterator[Chunk]:
        """Chunk a list of accumulated body text nodes.

        Args:
            nodes: Body text tree nodes to chunk.

        Yields:
            Chunk objects.
        """
        # Delegates to the chunker's private heading-grouped strategy.
        yield from self.chunker._chunk_by_heading(nodes)

    def _prune_processed_nodes(self, root: Tree) -> None:
        """Prune processed nodes to free memory (currently a no-op).

        Args:
            root: Root of the tree to prune.
        """
        # Placeholder: the tree structure is retained for now; removing
        # fully processed subtrees is a future optimization.
        pass
def stream_markdown_file(
    file_path: str,
    max_chunk_size: int = 1000,
    chunk_overlap: int = 100,
    heading_inclusion: HeadingInclusion = HeadingInclusion.BOTH,
    chunk_format: ChunkFormat = ChunkFormat.MARKDOWN,
) -> Iterator[Chunk]:
    """Stream chunks from a markdown file.

    Convenience wrapper that builds a StreamingMarkdownProcessor with
    the given settings and delegates to its ``process_file`` method.

    Args:
        file_path: Path to the markdown file.
        max_chunk_size: Maximum size of chunk text in characters.
        chunk_overlap: Number of characters to overlap between chunks.
        heading_inclusion: How to include headings in chunks.
        chunk_format: Output format for chunks.

    Yields:
        Chunk objects.
    """
    yield from StreamingMarkdownProcessor(
        max_chunk_size=max_chunk_size,
        chunk_overlap=chunk_overlap,
        heading_inclusion=heading_inclusion,
        chunk_format=chunk_format,
    ).process_file(file_path)
def stream_markdown_string(
    content: str,
    max_chunk_size: int = 1000,
    chunk_overlap: int = 100,
    heading_inclusion: HeadingInclusion = HeadingInclusion.BOTH,
    chunk_format: ChunkFormat = ChunkFormat.MARKDOWN,
) -> Iterator[Chunk]:
    """Stream chunks from a markdown string.

    Convenience wrapper that builds a StreamingMarkdownProcessor with
    the given settings and delegates to its ``process_string`` method.

    Args:
        content: Markdown content string.
        max_chunk_size: Maximum size of chunk text in characters.
        chunk_overlap: Number of characters to overlap between chunks.
        heading_inclusion: How to include headings in chunks.
        chunk_format: Output format for chunks.

    Yields:
        Chunk objects.
    """
    processor = StreamingMarkdownProcessor(
        max_chunk_size=max_chunk_size,
        chunk_overlap=chunk_overlap,
        heading_inclusion=heading_inclusion,
        chunk_format=chunk_format,
    )
    for chunk in processor.process_string(content):
        yield chunk