Coverage for src/dataknobs_xization/markdown/md_parser.py: 16%

264 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-18 17:41 -0700

1"""Markdown parser for converting markdown documents into tree structures. 

2 

3This module provides functionality to parse markdown text and build a Tree 

4structure that preserves the document's heading hierarchy and body text. 

5""" 

6 

7from __future__ import annotations 

8 

9import re 

10from dataclasses import dataclass, field 

11from typing import Any, Iterator, TextIO 

12 

13from dataknobs_structures.tree import Tree 

14 

15 

@dataclass
class MarkdownNode:
    """Data container for markdown tree nodes.

    Represents various markdown constructs in the document.

    Attributes:
        text: The text content
        level: Heading level (1-6) or 0 for non-headings
        node_type: Type of node ('heading', 'body', 'code', 'list', 'table',
            'blockquote', 'horizontal_rule')
        line_number: Line number in source document (for debugging)
        metadata: Additional metadata about the node (e.g., language for code
            blocks, list type, etc.)
    """

    text: str
    level: int = 0  # 0 for body text, 1-6 for headings
    node_type: str = "body"
    line_number: int = 0
    metadata: dict[str, Any] = field(default_factory=dict)

    def __str__(self) -> str:
        """Return a short human-readable summary of the node."""
        # Dispatch table keyed on node_type; unknown types fall through to
        # plain text (the 'body' case).
        formatters = {
            "heading": lambda: f"H{self.level}: {self.text}",
            "code": lambda: (
                f"Code({self.metadata.get('language', '')}): {self.text[:50]}..."
            ),
            "list": lambda: (
                f"List({self.metadata.get('list_type', 'unordered')}): "
                f"{len(self.text.splitlines())} items"
            ),
            "table": lambda: f"Table: {len(self.text.splitlines())} rows",
            "blockquote": lambda: f"Quote: {self.text[:50]}...",
            "horizontal_rule": lambda: "---",
        }
        formatter = formatters.get(self.node_type)
        return formatter() if formatter is not None else self.text

    def is_heading(self) -> bool:
        """Return True if this node is a heading."""
        return self.node_type == "heading"

    def is_body(self) -> bool:
        """Return True if this node is body text."""
        return self.node_type == "body"

    def is_code(self) -> bool:
        """Return True if this node is a code block."""
        return self.node_type == "code"

    def is_list(self) -> bool:
        """Return True if this node is a list."""
        return self.node_type == "list"

    def is_table(self) -> bool:
        """Return True if this node is a table."""
        return self.node_type == "table"

    def is_blockquote(self) -> bool:
        """Return True if this node is a blockquote."""
        return self.node_type == "blockquote"

    def is_horizontal_rule(self) -> bool:
        """Return True if this node is a horizontal rule."""
        return self.node_type == "horizontal_rule"

    def is_atomic(self) -> bool:
        """Return True if this node should not be split during chunking."""
        return self.node_type in {"code", "table", "list", "horizontal_rule"}

87 

88 

class MarkdownParser:
    """Parser for converting markdown text into a tree structure.

    The parser builds a Tree where:
    - Heading nodes are parents to their sub-headings and body text
    - Special constructs (code, lists, tables) are leaf nodes
    - The hierarchy mirrors the markdown heading structure
    """

    # Regex patterns for markdown constructs
    HEADING_PATTERN = re.compile(r'^(#{1,6})\s+(.+)$')
    FENCED_CODE_START = re.compile(r'^```(\w*).*$')
    FENCED_CODE_END = re.compile(r'^```\s*$')
    # Per CommonMark, an indented code block requires 4 spaces (or a tab).
    # Matching a single leading space here would swallow ordinary body text
    # (or loosely indented prose) into spurious code nodes.
    INDENTED_CODE = re.compile(r'^(    |\t)(.*)$')
    HORIZONTAL_RULE = re.compile(r'^(\*{3,}|-{3,}|_{3,})\s*$')
    UNORDERED_LIST = re.compile(r'^(\s*)([-*+])\s+(.+)$')
    ORDERED_LIST = re.compile(r'^(\s*)(\d+\.)\s+(.+)$')
    BLOCKQUOTE = re.compile(r'^>\s*(.*)$')
    TABLE_ROW = re.compile(r'^\|(.+)\|$')
    TABLE_SEPARATOR = re.compile(r'^\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)*\|?\s*$')

    def __init__(
        self,
        max_line_length: int | None = None,
        preserve_empty_lines: bool = False,
    ):
        """Initialize the markdown parser.

        Args:
            max_line_length: Maximum length for body text lines (None for unlimited)
            preserve_empty_lines: Whether to preserve empty lines in output
        """
        self.max_line_length = max_line_length
        self.preserve_empty_lines = preserve_empty_lines
        # 1-based line number of the line currently being parsed (debug aid).
        self._current_line = 0

    def parse(self, source: str | TextIO | Iterator[str]) -> Tree:
        """Parse markdown content into a tree structure.

        Args:
            source: Markdown content as string, file object, or line iterator

        Returns:
            Tree with root node containing the document structure
        """
        # Create root node; level 0 so every heading (level >= 1) nests under it.
        root = Tree(MarkdownNode(text="ROOT", level=0, node_type="root", line_number=0))

        # Materialize the source so multi-line constructs can look ahead.
        lines = list(self._get_line_iterator(source))

        # Track current position in tree
        current_parent = root
        self._current_line = 0

        # Index-based scan: block parsers report how many lines they consumed.
        i = 0
        while i < len(lines):
            self._current_line = i + 1
            line = lines[i]

            # Skip empty lines unless preserving them
            if not line.strip():
                if self.preserve_empty_lines:
                    node_data = MarkdownNode(
                        text="",
                        level=0,
                        node_type="body",
                        line_number=self._current_line,
                    )
                    current_parent.add_child(node_data)
                i += 1
                continue

            # Check for heading
            heading_match = self.HEADING_PATTERN.match(line)
            if heading_match:
                level = len(heading_match.group(1))
                text = heading_match.group(2).strip()

                node_data = MarkdownNode(
                    text=text,
                    level=level,
                    node_type="heading",
                    line_number=self._current_line,
                )

                # Re-anchor: a shallower/equal heading must attach higher up.
                current_parent, _ = self._find_heading_parent(
                    root, current_parent, level
                )

                heading_node = current_parent.add_child(node_data)
                # Subsequent content nests under this heading.
                current_parent = heading_node
                i += 1
                continue

            # Check for horizontal rule (before lists: '---' vs '- item')
            if self.HORIZONTAL_RULE.match(line):
                node_data = MarkdownNode(
                    text=line.strip(),
                    level=0,
                    node_type="horizontal_rule",
                    line_number=self._current_line,
                )
                current_parent.add_child(node_data)
                i += 1
                continue

            # Check for fenced code block
            code_match = self.FENCED_CODE_START.match(line)
            if code_match:
                code_lines, lines_consumed = self._parse_fenced_code_block(lines, i)
                # Empty fenced blocks are skipped (no node) but still consumed.
                if code_lines:
                    language = code_match.group(1) or ""
                    node_data = MarkdownNode(
                        text="\n".join(code_lines),
                        level=0,
                        node_type="code",
                        line_number=self._current_line,
                        metadata={"language": language, "fence_type": "```"},
                    )
                    current_parent.add_child(node_data)
                i += lines_consumed
                continue

            # Check for table: header row must be followed by a separator row
            if self.TABLE_ROW.match(line) and i + 1 < len(lines):
                if self.TABLE_SEPARATOR.match(lines[i + 1]):
                    table_lines, lines_consumed = self._parse_table(lines, i)
                    if table_lines:
                        node_data = MarkdownNode(
                            text="\n".join(table_lines),
                            level=0,
                            node_type="table",
                            line_number=self._current_line,
                            metadata={"rows": len(table_lines)},
                        )
                        current_parent.add_child(node_data)
                    i += lines_consumed
                    continue

            # Check for list
            list_match = self.UNORDERED_LIST.match(line) or self.ORDERED_LIST.match(line)
            if list_match:
                list_lines, lines_consumed, list_type = self._parse_list(lines, i)
                if list_lines:
                    node_data = MarkdownNode(
                        text="\n".join(list_lines),
                        level=0,
                        node_type="list",
                        line_number=self._current_line,
                        metadata={"list_type": list_type, "items": len(list_lines)},
                    )
                    current_parent.add_child(node_data)
                i += lines_consumed
                continue

            # Check for blockquote
            if self.BLOCKQUOTE.match(line):
                quote_lines, lines_consumed = self._parse_blockquote(lines, i)
                if quote_lines:
                    node_data = MarkdownNode(
                        text="\n".join(quote_lines),
                        level=0,
                        node_type="blockquote",
                        line_number=self._current_line,
                    )
                    current_parent.add_child(node_data)
                i += lines_consumed
                continue

            # Check for indented code block (4 spaces or tab)
            if self.INDENTED_CODE.match(line):
                code_lines, lines_consumed = self._parse_indented_code_block(lines, i)
                if code_lines:
                    node_data = MarkdownNode(
                        text="\n".join(code_lines),
                        level=0,
                        node_type="code",
                        line_number=self._current_line,
                        metadata={"language": "", "fence_type": "indent"},
                    )
                    current_parent.add_child(node_data)
                i += lines_consumed
                continue

            # Default: body text
            text = line.rstrip('\n')

            if self.max_line_length and len(text) > self.max_line_length:
                # Over-long lines become several sibling body nodes.
                for chunk in self._split_text_intelligently(text):
                    node_data = MarkdownNode(
                        text=chunk,
                        level=0,
                        node_type="body",
                        line_number=self._current_line,
                    )
                    current_parent.add_child(node_data)
            else:
                node_data = MarkdownNode(
                    text=text,
                    level=0,
                    node_type="body",
                    line_number=self._current_line,
                )
                current_parent.add_child(node_data)

            i += 1

        return root

    def _get_line_iterator(self, source: str | TextIO | Iterator[str]) -> Iterator[str]:
        """Convert various source types to a line iterator.

        Args:
            source: Input source (string, file-like object, or iterator)

        Returns:
            Iterator over lines
        """
        if isinstance(source, str):
            return iter(source.splitlines())
        elif hasattr(source, 'read'):
            # File-like object: iterating yields lines (with trailing newlines;
            # the regexes and rstrip calls downstream tolerate them).
            return iter(source)
        else:
            # Already an iterator
            return source

    def _parse_fenced_code_block(
        self,
        lines: list[str],
        start_idx: int,
    ) -> tuple[list[str], int]:
        """Parse a fenced code block.

        Args:
            lines: All lines in the document
            start_idx: Index of the opening fence

        Returns:
            Tuple of (code_lines, lines_consumed); lines_consumed includes
            both fence lines when the block is closed.
        """
        code_lines = []
        i = start_idx + 1

        while i < len(lines):
            if self.FENCED_CODE_END.match(lines[i]):
                # +1 to also consume the closing fence.
                return code_lines, i - start_idx + 1
            code_lines.append(lines[i].rstrip('\n'))
            i += 1

        # No closing fence found; treat the remainder as code anyway.
        return code_lines, i - start_idx

    def _parse_indented_code_block(
        self,
        lines: list[str],
        start_idx: int,
    ) -> tuple[list[str], int]:
        """Parse an indented code block (4-space or tab indent).

        Args:
            lines: All lines in the document
            start_idx: Index of first indented line

        Returns:
            Tuple of (code_lines, lines_consumed); the indent prefix is
            stripped from each collected line.
        """
        code_lines = []
        i = start_idx

        while i < len(lines):
            # Empty lines within the code block are allowed.
            if not lines[i].strip():
                code_lines.append("")
                i += 1
                continue

            # A non-empty line without the indent ends the block.
            match = self.INDENTED_CODE.match(lines[i])
            if not match:
                break

            code_lines.append(match.group(2))
            i += 1

        # Remove trailing empty lines picked up before the block ended.
        while code_lines and not code_lines[-1]:
            code_lines.pop()

        return code_lines, i - start_idx

    def _parse_table(
        self,
        lines: list[str],
        start_idx: int,
    ) -> tuple[list[str], int]:
        """Parse a markdown table (consecutive pipe-delimited rows).

        Args:
            lines: All lines in the document
            start_idx: Index of table header row

        Returns:
            Tuple of (table_lines, lines_consumed)
        """
        table_lines = []
        i = start_idx

        while i < len(lines):
            if not self.TABLE_ROW.match(lines[i]):
                break
            table_lines.append(lines[i].rstrip('\n'))
            i += 1

        return table_lines, i - start_idx

    def _parse_list(
        self,
        lines: list[str],
        start_idx: int,
    ) -> tuple[list[str], int, str]:
        """Parse a markdown list (ordered or unordered).

        The list type is fixed by the first item; items of the other type
        terminate the list. Indented continuation lines and blank lines
        followed by another item of the same type are kept.

        Args:
            lines: All lines in the document
            start_idx: Index of first list item

        Returns:
            Tuple of (list_lines, lines_consumed, list_type) where list_type
            is 'ordered' or 'unordered'
        """
        list_lines = []
        i = start_idx

        # Determine list type from the first line.
        first_line = lines[start_idx]
        is_ordered = bool(self.ORDERED_LIST.match(first_line))
        list_type = "ordered" if is_ordered else "unordered"
        item_pattern = self.ORDERED_LIST if is_ordered else self.UNORDERED_LIST

        while i < len(lines):
            line = lines[i]

            if item_pattern.match(line):
                # Another item of the same list type.
                list_lines.append(line.rstrip('\n'))
                i += 1
            elif line.strip() and line.startswith((' ', '\t')):
                # Indented continuation of the previous item.
                if list_lines:
                    list_lines.append(line.rstrip('\n'))
                    i += 1
                else:
                    break
            elif not line.strip():
                # A blank line continues the list only if another item of the
                # same type follows immediately.
                if i + 1 < len(lines) and item_pattern.match(lines[i + 1]):
                    list_lines.append("")
                    i += 1
                    continue
                break
            else:
                break

        # Remove trailing empty lines.
        while list_lines and not list_lines[-1]:
            list_lines.pop()

        return list_lines, i - start_idx, list_type

    def _parse_blockquote(
        self,
        lines: list[str],
        start_idx: int,
    ) -> tuple[list[str], int]:
        """Parse a blockquote ('>'-prefixed lines).

        Args:
            lines: All lines in the document
            start_idx: Index of first quote line

        Returns:
            Tuple of (quote_lines, lines_consumed); the '>' marker is
            stripped from each collected line.
        """
        quote_lines = []
        i = start_idx

        while i < len(lines):
            match = self.BLOCKQUOTE.match(lines[i])
            if match:
                quote_lines.append(match.group(1))
                i += 1
            elif not lines[i].strip():
                # A blank line continues the quote only if another '>' line
                # follows immediately.
                if i + 1 < len(lines) and self.BLOCKQUOTE.match(lines[i + 1]):
                    quote_lines.append("")
                    i += 1
                else:
                    break
            else:
                break

        return quote_lines, i - start_idx

    def _find_heading_parent(
        self,
        root: Tree,
        current_parent: Tree,
        new_level: int,
    ) -> tuple[Tree, int]:
        """Find the appropriate parent node for a heading at the given level.

        Args:
            root: Root node of the tree
            current_parent: Current parent node
            new_level: Level of the new heading

        Returns:
            Tuple of (parent_node, parent_level)
        """
        # If the new heading is deeper than the current level, it's a child.
        if new_level > current_parent.data.level:
            return current_parent, current_parent.data.level

        # Otherwise, walk up the ancestry to the nearest shallower heading.
        node = current_parent
        while node is not None and node != root:
            if node.data.is_heading() and node.data.level < new_level:
                return node, node.data.level
            node = node.parent

        # Default to root (e.g. for a top-level heading).
        return root, 0

    def _split_text_intelligently(self, text: str) -> list[str]:
        """Split text at sentence boundaries or other natural break points.

        Args:
            text: Text to split

        Returns:
            List of text chunks, each at most max_line_length characters
            (except single words longer than the limit, which are kept whole)
        """
        if not self.max_line_length or len(text) <= self.max_line_length:
            return [text]

        chunks = []

        # First pass: split at sentence boundaries. The capturing group keeps
        # the punctuation+whitespace separators as their own segments.
        sentences = re.split(r'([.!?]+\s+)', text)
        current_chunk = ""

        for segment in sentences:
            if len(current_chunk) + len(segment) <= self.max_line_length:
                current_chunk += segment
            else:
                if current_chunk:
                    chunks.append(current_chunk)
                current_chunk = segment

        if current_chunk:
            chunks.append(current_chunk)

        # Second pass: any chunk still too long is split at word boundaries.
        final_chunks = []
        for chunk in chunks:
            if len(chunk) <= self.max_line_length:
                final_chunks.append(chunk)
            else:
                words = chunk.split()
                current = ""
                for word in words:
                    # +1 accounts for the joining space.
                    if len(current) + len(word) + 1 <= self.max_line_length:
                        current += (" " if current else "") + word
                    else:
                        if current:
                            final_chunks.append(current)
                        current = word
                if current:
                    final_chunks.append(current)

        return final_chunks if final_chunks else [text]

582 

583 

def parse_markdown(
    source: str | TextIO | Iterator[str],
    max_line_length: int | None = None,
    preserve_empty_lines: bool = False,
) -> Tree:
    """Parse markdown content into a tree structure.

    Convenience function for creating and using a MarkdownParser.

    Args:
        source: Markdown content as string, file object, or line iterator
        max_line_length: Maximum length for body text lines (None for unlimited)
        preserve_empty_lines: Whether to preserve empty lines in output

    Returns:
        Tree with root node containing the document structure
    """
    return MarkdownParser(
        max_line_length=max_line_length,
        preserve_empty_lines=preserve_empty_lines,
    ).parse(source)