Coverage for src/dataknobs_xization/markdown/md_streaming.py: 27% (63 statements)


1"""Streaming processor for incremental markdown chunking. 

2 

3This module provides functionality to process large markdown documents 

4incrementally, managing memory constraints while generating chunks. 

5""" 

6 

7from __future__ import annotations 

8 

9from typing import Iterator, TextIO 

10 

11from dataknobs_structures.tree import Tree 

12 

13from dataknobs_xization.markdown.md_chunker import Chunk, ChunkFormat, HeadingInclusion, MarkdownChunker 

14from dataknobs_xization.markdown.md_parser import MarkdownNode, MarkdownParser 

15 

16 

17class StreamingMarkdownProcessor: 

18 """Streaming processor for incremental markdown chunking. 

19 

20 Processes markdown documents line-by-line, building tree structure 

21 incrementally and yielding chunks as they become available. Manages 

22 memory by pruning processed sections of the tree. 

23 """ 

24 

25 def __init__( 

26 self, 

27 max_chunk_size: int = 1000, 

28 chunk_overlap: int = 100, 

29 max_line_length: int | None = None, 

30 heading_inclusion: HeadingInclusion = HeadingInclusion.BOTH, 

31 chunk_format: ChunkFormat = ChunkFormat.MARKDOWN, 

32 max_tree_depth: int = 100, 

33 memory_limit_nodes: int | None = None, 

34 ): 

35 """Initialize the streaming processor. 

36 

37 Args: 

38 max_chunk_size: Maximum size of chunk text in characters 

39 chunk_overlap: Number of characters to overlap between chunks 

40 max_line_length: Maximum length for individual lines 

41 heading_inclusion: How to include headings in chunks 

42 chunk_format: Output format for chunks 

43 max_tree_depth: Maximum depth of tree to maintain 

44 memory_limit_nodes: Maximum number of nodes to keep in memory 

45 (None for unlimited) 

46 """ 

47 self.parser = MarkdownParser( 

48 max_line_length=max_line_length, 

49 preserve_empty_lines=False, 

50 ) 

51 self.chunker = MarkdownChunker( 

52 max_chunk_size=max_chunk_size, 

53 chunk_overlap=chunk_overlap, 

54 heading_inclusion=heading_inclusion, 

55 chunk_format=chunk_format, 

56 combine_under_heading=True, 

57 ) 

58 self.max_tree_depth = max_tree_depth 

59 self.memory_limit_nodes = memory_limit_nodes 

60 

61 def process_stream( 

62 self, 

63 source: str | TextIO | Iterator[str], 

64 ) -> Iterator[Chunk]: 

65 """Process markdown from a stream, yielding chunks incrementally. 

66 

67 Args: 

68 source: Markdown content as string, file object, or line iterator 

69 

70 Yields: 

71 Chunk objects as they become available 

72 """ 

73 # For simplicity in v1, we'll use a batch processing approach 

74 # that processes complete sections under headings 

75 # 

76 # Future enhancement: true streaming with incremental tree building 

77 

78 tree = self.parser.parse(source) 

79 

80 # Generate chunks 

81 yield from self.chunker.chunk(tree) 

82 

83 def process_file(self, file_path: str) -> Iterator[Chunk]: 

84 """Process a markdown file, yielding chunks incrementally. 

85 

86 Args: 

87 file_path: Path to markdown file 

88 

89 Yields: 

90 Chunk objects 

91 """ 

92 with open(file_path, encoding='utf-8') as f: 

93 yield from self.process_stream(f) 

94 

95 def process_string(self, content: str) -> Iterator[Chunk]: 

96 """Process markdown from a string, yielding chunks. 

97 

98 Args: 

99 content: Markdown content string 

100 

101 Yields: 

102 Chunk objects 

103 """ 

104 yield from self.process_stream(content) 
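
    # A minimal usage sketch (illustrative only; assumes Chunk exposes a
    # ``text`` attribute, which is not shown in this module):
    #
    #     processor = StreamingMarkdownProcessor(max_chunk_size=500)
    #     for chunk in processor.process_string("# Title\n\nSome body text."):
    #         print(chunk.text)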



class AdaptiveStreamingProcessor(StreamingMarkdownProcessor):
    """Streaming processor that adapts to memory constraints.

    This processor monitors tree size and adaptively chunks sections
    when memory limits are approached, preventing memory overflow on
    large documents.
    """

    def __init__(
        self,
        max_chunk_size: int = 1000,
        chunk_overlap: int = 100,
        max_line_length: int | None = None,
        heading_inclusion: HeadingInclusion = HeadingInclusion.BOTH,
        chunk_format: ChunkFormat = ChunkFormat.MARKDOWN,
        max_tree_depth: int = 100,
        memory_limit_nodes: int = 10000,
        adaptive_threshold: float = 0.8,
    ):
        """Initialize the adaptive streaming processor.

        Args:
            max_chunk_size: Maximum size of chunk text in characters
            chunk_overlap: Number of characters to overlap between chunks
            max_line_length: Maximum length for individual lines
            heading_inclusion: How to include headings in chunks
            chunk_format: Output format for chunks
            max_tree_depth: Maximum depth of tree to maintain
            memory_limit_nodes: Maximum number of nodes to keep in memory
            adaptive_threshold: Fraction of memory_limit_nodes at which to
                trigger adaptive chunking (0.0-1.0)
        """
        super().__init__(
            max_chunk_size=max_chunk_size,
            chunk_overlap=chunk_overlap,
            max_line_length=max_line_length,
            heading_inclusion=heading_inclusion,
            chunk_format=chunk_format,
            max_tree_depth=max_tree_depth,
            memory_limit_nodes=memory_limit_nodes,
        )
        self.adaptive_threshold = adaptive_threshold
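        # With the defaults above (memory_limit_nodes=10000,
        # adaptive_threshold=0.8), adaptive chunking triggers once the tree
        # holds at least 10000 * 0.8 = 8000 nodes (see process_stream).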


    def process_stream(self, source: str | TextIO | Iterator[str]) -> Iterator[Chunk]:
        """Process stream with adaptive memory management.

        Args:
            source: Markdown content source

        Yields:
            Chunk objects
        """
        # Build the tree incrementally with memory monitoring
        root = Tree(MarkdownNode(text="ROOT", level=0, node_type="root", line_number=0))
        current_parent = root
        line_number = 0

        lines = self.parser._get_line_iterator(source)

        pending_nodes: list[Tree] = []  # Body nodes waiting to be chunked

        for line in lines:
            line_number += 1

            if not line.strip():
                continue

            # Check whether the line is a heading
            heading_match = self.parser.HEADING_PATTERN.match(line)

            if heading_match:
                # Before adding the new heading, check whether the pending
                # nodes should be chunked to stay under the memory limit
                if self.memory_limit_nodes:
                    node_count = len(root.find_nodes(lambda _: True))
                    if node_count >= self.memory_limit_nodes * self.adaptive_threshold:
                        # Chunk and yield the accumulated body text
                        if pending_nodes:
                            yield from self._chunk_nodes(pending_nodes)
                            pending_nodes = []
                        # Prune processed subtrees to free memory
                        self._prune_processed_nodes(root)

                # Process the heading: group(1) is the run of '#' characters
                # (the heading level), group(2) is the heading text
                level = len(heading_match.group(1))
                text = heading_match.group(2).strip()

                node_data = MarkdownNode(
                    text=text,
                    level=level,
                    node_type="heading",
                    line_number=line_number,
                )

                current_parent, _ = self.parser._find_heading_parent(
                    root, current_parent, level
                )

                heading_node = current_parent.add_child(node_data)
                current_parent = heading_node

            else:
                # Body text
                node_data = MarkdownNode(
                    text=line.rstrip('\n'),
                    level=0,
                    node_type="body",
                    line_number=line_number,
                )
                body_node = current_parent.add_child(node_data)
                pending_nodes.append(body_node)

        # Process any remaining pending nodes
        if pending_nodes:
            yield from self._chunk_nodes(pending_nodes)

    def _chunk_nodes(self, nodes: list[Tree]) -> Iterator[Chunk]:
        """Chunk a list of body text nodes.

        Args:
            nodes: List of body text tree nodes

        Yields:
            Chunk objects
        """
        yield from self.chunker._chunk_by_heading(nodes)

    def _prune_processed_nodes(self, root: Tree) -> None:
        """Prune processed leaf nodes to free memory.

        Args:
            root: Root of tree to prune
        """
        # Find terminal nodes that have been processed.
        # For now, we keep the tree structure intact but could optimize
        # further by removing fully processed subtrees.
        pass
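
    # A hedged sketch of the adaptive flow (illustrative only; ``handle`` is
    # a hypothetical consumer). With memory_limit_nodes=100 and the default
    # adaptive_threshold=0.8, chunks start flushing once the tree holds
    # 100 * 0.8 = 80 nodes:
    #
    #     processor = AdaptiveStreamingProcessor(memory_limit_nodes=100)
    #     with open("large_doc.md", encoding="utf-8") as f:
    #         for chunk in processor.process_stream(f):
    #             handle(chunk)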



def stream_markdown_file(
    file_path: str,
    max_chunk_size: int = 1000,
    chunk_overlap: int = 100,
    heading_inclusion: HeadingInclusion = HeadingInclusion.BOTH,
    chunk_format: ChunkFormat = ChunkFormat.MARKDOWN,
) -> Iterator[Chunk]:
    """Stream chunks from a markdown file.

    Convenience function for processing a file with default settings.

    Args:
        file_path: Path to markdown file
        max_chunk_size: Maximum size of chunk text in characters
        chunk_overlap: Number of characters to overlap between chunks
        heading_inclusion: How to include headings in chunks
        chunk_format: Output format for chunks

    Yields:
        Chunk objects
    """
    processor = StreamingMarkdownProcessor(
        max_chunk_size=max_chunk_size,
        chunk_overlap=chunk_overlap,
        heading_inclusion=heading_inclusion,
        chunk_format=chunk_format,
    )
    yield from processor.process_file(file_path)
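
# For example (illustrative only; "notes.md" is a hypothetical file and
# ``index`` a hypothetical consumer):
#
#     for chunk in stream_markdown_file("notes.md", max_chunk_size=2000):
#         index(chunk)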



def stream_markdown_string(
    content: str,
    max_chunk_size: int = 1000,
    chunk_overlap: int = 100,
    heading_inclusion: HeadingInclusion = HeadingInclusion.BOTH,
    chunk_format: ChunkFormat = ChunkFormat.MARKDOWN,
) -> Iterator[Chunk]:
    """Stream chunks from a markdown string.

    Convenience function for processing a string with default settings.

    Args:
        content: Markdown content string
        max_chunk_size: Maximum size of chunk text in characters
        chunk_overlap: Number of characters to overlap between chunks
        heading_inclusion: How to include headings in chunks
        chunk_format: Output format for chunks

    Yields:
        Chunk objects
    """
    processor = StreamingMarkdownProcessor(
        max_chunk_size=max_chunk_size,
        chunk_overlap=chunk_overlap,
        heading_inclusion=heading_inclusion,
        chunk_format=chunk_format,
    )
    yield from processor.process_string(content)
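
# For example (illustrative only; assumes Chunk provides ``text``):
#
#     doc = "# Intro\n\nHello.\n\n## Details\n\nMore text."
#     for chunk in stream_markdown_string(doc, max_chunk_size=200):
#         print(chunk.text)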