Coverage for src / dataknobs_xization / markdown / md_parser.py: 34%
264 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 16:15 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 16:15 -0700
1"""Markdown parser for converting markdown documents into tree structures.
3This module provides functionality to parse markdown text and build a Tree
4structure that preserves the document's heading hierarchy and body text.
5"""
7from __future__ import annotations
9import re
10from dataclasses import dataclass, field
11from typing import Any, Iterator, TextIO
13from dataknobs_structures.tree import Tree
16@dataclass
17class MarkdownNode:
18 """Data container for markdown tree nodes.
20 Represents various markdown constructs in the document.
22 Attributes:
23 text: The text content
24 level: Heading level (1-6) or 0 for non-headings
25 node_type: Type of node ('heading', 'body', 'code', 'list', 'table',
26 'blockquote', 'horizontal_rule')
27 line_number: Line number in source document (for debugging)
28 metadata: Additional metadata about the node (e.g., language for code blocks,
29 list type, etc.)
30 """
32 text: str
33 level: int = 0 # 0 for body text, 1-6 for headings
34 node_type: str = "body"
35 line_number: int = 0
36 metadata: dict[str, Any] = field(default_factory=dict)
38 def __str__(self) -> str:
39 """String representation of the node."""
40 if self.node_type == "heading":
41 return f"H{self.level}: {self.text}"
42 elif self.node_type == "code":
43 lang = self.metadata.get("language", "")
44 return f"Code({lang}): {self.text[:50]}..."
45 elif self.node_type == "list":
46 list_type = self.metadata.get("list_type", "unordered")
47 return f"List({list_type}): {len(self.text.splitlines())} items"
48 elif self.node_type == "table":
49 return f"Table: {len(self.text.splitlines())} rows"
50 elif self.node_type == "blockquote":
51 return f"Quote: {self.text[:50]}..."
52 elif self.node_type == "horizontal_rule":
53 return "---"
54 return self.text
56 def is_heading(self) -> bool:
57 """Check if this node represents a heading."""
58 return self.node_type == "heading"
60 def is_body(self) -> bool:
61 """Check if this node represents body text."""
62 return self.node_type == "body"
64 def is_code(self) -> bool:
65 """Check if this node represents a code block."""
66 return self.node_type == "code"
68 def is_list(self) -> bool:
69 """Check if this node represents a list."""
70 return self.node_type == "list"
72 def is_table(self) -> bool:
73 """Check if this node represents a table."""
74 return self.node_type == "table"
76 def is_blockquote(self) -> bool:
77 """Check if this node represents a blockquote."""
78 return self.node_type == "blockquote"
80 def is_horizontal_rule(self) -> bool:
81 """Check if this node represents a horizontal rule."""
82 return self.node_type == "horizontal_rule"
84 def is_atomic(self) -> bool:
85 """Check if this node should not be split during chunking."""
86 return self.node_type in ("code", "table", "list", "horizontal_rule")
89class MarkdownParser:
90 """Parser for converting markdown text into a tree structure.
92 The parser builds a Tree where:
93 - Heading nodes are parents to their sub-headings and body text
94 - Special constructs (code, lists, tables) are leaf nodes
95 - The hierarchy mirrors the markdown heading structure
96 """
98 # Regex patterns for markdown constructs
99 HEADING_PATTERN = re.compile(r'^(#{1,6})\s+(.+)$')
100 FENCED_CODE_START = re.compile(r'^```(\w*).*$')
101 FENCED_CODE_END = re.compile(r'^```\s*$')
102 INDENTED_CODE = re.compile(r'^( |\t)(.*)$')
103 HORIZONTAL_RULE = re.compile(r'^(\*{3,}|-{3,}|_{3,})\s*$')
104 UNORDERED_LIST = re.compile(r'^(\s*)([-*+])\s+(.+)$')
105 ORDERED_LIST = re.compile(r'^(\s*)(\d+\.)\s+(.+)$')
106 BLOCKQUOTE = re.compile(r'^>\s*(.*)$')
107 TABLE_ROW = re.compile(r'^\|(.+)\|$')
108 TABLE_SEPARATOR = re.compile(r'^\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)*\|?\s*$')
110 def __init__(
111 self,
112 max_line_length: int | None = None,
113 preserve_empty_lines: bool = False,
114 ):
115 """Initialize the markdown parser.
117 Args:
118 max_line_length: Maximum length for body text lines (None for unlimited)
119 preserve_empty_lines: Whether to preserve empty lines in output
120 """
121 self.max_line_length = max_line_length
122 self.preserve_empty_lines = preserve_empty_lines
123 self._current_line = 0
125 def parse(self, source: str | TextIO | Iterator[str]) -> Tree:
126 """Parse markdown content into a tree structure.
128 Args:
129 source: Markdown content as string, file object, or line iterator
131 Returns:
132 Tree with root node containing the document structure
133 """
134 # Create root node
135 root = Tree(MarkdownNode(text="ROOT", level=0, node_type="root", line_number=0))
137 # Convert source to list for lookahead capability
138 lines = list(self._get_line_iterator(source))
140 # Track current position in tree
141 current_parent = root
142 self._current_line = 0
144 # State tracking for multi-line constructs
145 i = 0
146 while i < len(lines):
147 self._current_line = i + 1
148 line = lines[i]
150 # Skip empty lines unless preserving them
151 if not line.strip():
152 if self.preserve_empty_lines:
153 node_data = MarkdownNode(
154 text="",
155 level=0,
156 node_type="body",
157 line_number=self._current_line,
158 )
159 current_parent.add_child(node_data)
160 i += 1
161 continue
163 # Check for heading
164 heading_match = self.HEADING_PATTERN.match(line)
165 if heading_match:
166 level = len(heading_match.group(1))
167 text = heading_match.group(2).strip()
169 node_data = MarkdownNode(
170 text=text,
171 level=level,
172 node_type="heading",
173 line_number=self._current_line,
174 )
176 current_parent, _ = self._find_heading_parent(
177 root, current_parent, level
178 )
180 heading_node = current_parent.add_child(node_data)
181 current_parent = heading_node
182 i += 1
183 continue
185 # Check for horizontal rule
186 if self.HORIZONTAL_RULE.match(line):
187 node_data = MarkdownNode(
188 text=line.strip(),
189 level=0,
190 node_type="horizontal_rule",
191 line_number=self._current_line,
192 )
193 current_parent.add_child(node_data)
194 i += 1
195 continue
197 # Check for fenced code block
198 code_match = self.FENCED_CODE_START.match(line)
199 if code_match:
200 code_lines, lines_consumed = self._parse_fenced_code_block(lines, i)
201 if code_lines:
202 language = code_match.group(1) or ""
203 node_data = MarkdownNode(
204 text="\n".join(code_lines),
205 level=0,
206 node_type="code",
207 line_number=self._current_line,
208 metadata={"language": language, "fence_type": "```"},
209 )
210 current_parent.add_child(node_data)
211 i += lines_consumed
212 continue
214 # Check for table
215 if self.TABLE_ROW.match(line) and i + 1 < len(lines):
216 if self.TABLE_SEPARATOR.match(lines[i + 1]):
217 table_lines, lines_consumed = self._parse_table(lines, i)
218 if table_lines:
219 node_data = MarkdownNode(
220 text="\n".join(table_lines),
221 level=0,
222 node_type="table",
223 line_number=self._current_line,
224 metadata={"rows": len(table_lines)},
225 )
226 current_parent.add_child(node_data)
227 i += lines_consumed
228 continue
230 # Check for list
231 list_match = self.UNORDERED_LIST.match(line) or self.ORDERED_LIST.match(line)
232 if list_match:
233 list_lines, lines_consumed, list_type = self._parse_list(lines, i)
234 if list_lines:
235 node_data = MarkdownNode(
236 text="\n".join(list_lines),
237 level=0,
238 node_type="list",
239 line_number=self._current_line,
240 metadata={"list_type": list_type, "items": len(list_lines)},
241 )
242 current_parent.add_child(node_data)
243 i += lines_consumed
244 continue
246 # Check for blockquote
247 if self.BLOCKQUOTE.match(line):
248 quote_lines, lines_consumed = self._parse_blockquote(lines, i)
249 if quote_lines:
250 node_data = MarkdownNode(
251 text="\n".join(quote_lines),
252 level=0,
253 node_type="blockquote",
254 line_number=self._current_line,
255 )
256 current_parent.add_child(node_data)
257 i += lines_consumed
258 continue
260 # Check for indented code block (4 spaces or tab)
261 if self.INDENTED_CODE.match(line):
262 code_lines, lines_consumed = self._parse_indented_code_block(lines, i)
263 if code_lines:
264 node_data = MarkdownNode(
265 text="\n".join(code_lines),
266 level=0,
267 node_type="code",
268 line_number=self._current_line,
269 metadata={"language": "", "fence_type": "indent"},
270 )
271 current_parent.add_child(node_data)
272 i += lines_consumed
273 continue
275 # Default: body text
276 text = line.rstrip('\n')
278 if self.max_line_length and len(text) > self.max_line_length:
279 for chunk in self._split_text_intelligently(text):
280 node_data = MarkdownNode(
281 text=chunk,
282 level=0,
283 node_type="body",
284 line_number=self._current_line,
285 )
286 current_parent.add_child(node_data)
287 else:
288 node_data = MarkdownNode(
289 text=text,
290 level=0,
291 node_type="body",
292 line_number=self._current_line,
293 )
294 current_parent.add_child(node_data)
296 i += 1
298 return root
300 def _get_line_iterator(self, source: str | TextIO | Iterator[str]) -> Iterator[str]:
301 """Convert various source types to line iterator.
303 Args:
304 source: Input source
306 Returns:
307 Iterator over lines
308 """
309 if isinstance(source, str):
310 return iter(source.splitlines())
311 elif hasattr(source, 'read'):
312 # File-like object
313 return iter(source)
314 else:
315 # Already an iterator
316 return source
318 def _parse_fenced_code_block(
319 self,
320 lines: list[str],
321 start_idx: int,
322 ) -> tuple[list[str], int]:
323 """Parse a fenced code block.
325 Args:
326 lines: All lines in the document
327 start_idx: Index of the opening fence
329 Returns:
330 Tuple of (code_lines, lines_consumed)
331 """
332 code_lines = []
333 i = start_idx + 1
335 while i < len(lines):
336 if self.FENCED_CODE_END.match(lines[i]):
337 return code_lines, i - start_idx + 1
338 code_lines.append(lines[i].rstrip('\n'))
339 i += 1
341 # No closing fence found, treat as code anyway
342 return code_lines, i - start_idx
344 def _parse_indented_code_block(
345 self,
346 lines: list[str],
347 start_idx: int,
348 ) -> tuple[list[str], int]:
349 """Parse an indented code block.
351 Args:
352 lines: All lines in the document
353 start_idx: Index of first indented line
355 Returns:
356 Tuple of (code_lines, lines_consumed)
357 """
358 code_lines = []
359 i = start_idx
361 while i < len(lines):
362 # Empty lines within code block are allowed
363 if not lines[i].strip():
364 code_lines.append("")
365 i += 1
366 continue
368 # Check if still indented
369 match = self.INDENTED_CODE.match(lines[i])
370 if not match:
371 break
373 code_lines.append(match.group(2))
374 i += 1
376 # Remove trailing empty lines
377 while code_lines and not code_lines[-1]:
378 code_lines.pop()
380 return code_lines, i - start_idx
382 def _parse_table(
383 self,
384 lines: list[str],
385 start_idx: int,
386 ) -> tuple[list[str], int]:
387 """Parse a markdown table.
389 Args:
390 lines: All lines in the document
391 start_idx: Index of table header row
393 Returns:
394 Tuple of (table_lines, lines_consumed)
395 """
396 table_lines = []
397 i = start_idx
399 while i < len(lines):
400 if not self.TABLE_ROW.match(lines[i]):
401 break
402 table_lines.append(lines[i].rstrip('\n'))
403 i += 1
405 return table_lines, i - start_idx
407 def _parse_list(
408 self,
409 lines: list[str],
410 start_idx: int,
411 ) -> tuple[list[str], int, str]:
412 """Parse a markdown list (ordered or unordered).
414 Args:
415 lines: All lines in the document
416 start_idx: Index of first list item
418 Returns:
419 Tuple of (list_lines, lines_consumed, list_type)
420 """
421 list_lines = []
422 i = start_idx
424 # Determine list type from first line
425 first_line = lines[start_idx]
426 is_ordered = bool(self.ORDERED_LIST.match(first_line))
427 list_type = "ordered" if is_ordered else "unordered"
429 while i < len(lines):
430 line = lines[i]
432 # Check for list item
433 if is_ordered:
434 match = self.ORDERED_LIST.match(line)
435 else:
436 match = self.UNORDERED_LIST.match(line)
438 if match:
439 list_lines.append(line.rstrip('\n'))
440 i += 1
441 elif line.strip() and line.startswith((' ', '\t')):
442 # Continuation line (indented)
443 if list_lines:
444 list_lines.append(line.rstrip('\n'))
445 i += 1
446 else:
447 break
448 elif not line.strip():
449 # Empty line might be part of list or end it
450 # Look ahead to see if more list items follow
451 if i + 1 < len(lines):
452 next_match = (self.ORDERED_LIST.match(lines[i + 1])
453 if is_ordered
454 else self.UNORDERED_LIST.match(lines[i + 1]))
455 if next_match:
456 list_lines.append("")
457 i += 1
458 continue
459 break
460 else:
461 break
463 # Remove trailing empty lines
464 while list_lines and not list_lines[-1]:
465 list_lines.pop()
467 return list_lines, i - start_idx, list_type
469 def _parse_blockquote(
470 self,
471 lines: list[str],
472 start_idx: int,
473 ) -> tuple[list[str], int]:
474 """Parse a blockquote.
476 Args:
477 lines: All lines in the document
478 start_idx: Index of first quote line
480 Returns:
481 Tuple of (quote_lines, lines_consumed)
482 """
483 quote_lines = []
484 i = start_idx
486 while i < len(lines):
487 match = self.BLOCKQUOTE.match(lines[i])
488 if match:
489 quote_lines.append(match.group(1))
490 i += 1
491 elif not lines[i].strip():
492 # Empty line might continue the quote
493 if i + 1 < len(lines) and self.BLOCKQUOTE.match(lines[i + 1]):
494 quote_lines.append("")
495 i += 1
496 else:
497 break
498 else:
499 break
501 return quote_lines, i - start_idx
503 def _find_heading_parent(
504 self,
505 root: Tree,
506 current_parent: Tree,
507 new_level: int,
508 ) -> tuple[Tree, int]:
509 """Find the appropriate parent node for a heading at the given level.
511 Args:
512 root: Root node of the tree
513 current_parent: Current parent node
514 new_level: Level of the new heading
516 Returns:
517 Tuple of (parent_node, parent_level)
518 """
519 # If new heading is deeper than current level, it's a child
520 if new_level > current_parent.data.level:
521 return current_parent, current_parent.data.level
523 # Otherwise, traverse up to find appropriate level
524 node = current_parent
525 while node is not None and node != root:
526 if node.data.is_heading() and node.data.level < new_level:
527 return node, node.data.level
528 node = node.parent
530 # Default to root
531 return root, 0
533 def _split_text_intelligently(self, text: str) -> list[str]:
534 """Split text at sentence boundaries or other natural break points.
536 Args:
537 text: Text to split
539 Returns:
540 List of text chunks
541 """
542 if not self.max_line_length or len(text) <= self.max_line_length:
543 return [text]
545 chunks = []
547 # Try to split at sentence boundaries
548 sentences = re.split(r'([.!?]+\s+)', text)
549 current_chunk = ""
551 for segment in sentences:
552 if len(current_chunk) + len(segment) <= self.max_line_length:
553 current_chunk += segment
554 else:
555 if current_chunk:
556 chunks.append(current_chunk)
557 current_chunk = segment
559 if current_chunk:
560 chunks.append(current_chunk)
562 # If we still have chunks that are too long, split at spaces
563 final_chunks = []
564 for chunk in chunks:
565 if len(chunk) <= self.max_line_length:
566 final_chunks.append(chunk)
567 else:
568 # Split at word boundaries
569 words = chunk.split()
570 current = ""
571 for word in words:
572 if len(current) + len(word) + 1 <= self.max_line_length:
573 current += (" " if current else "") + word
574 else:
575 if current:
576 final_chunks.append(current)
577 current = word
578 if current:
579 final_chunks.append(current)
581 return final_chunks if final_chunks else [text]
584def parse_markdown(
585 source: str | TextIO | Iterator[str],
586 max_line_length: int | None = None,
587 preserve_empty_lines: bool = False,
588) -> Tree:
589 """Parse markdown content into a tree structure.
591 Convenience function for creating and using a MarkdownParser.
593 Args:
594 source: Markdown content as string, file object, or line iterator
595 max_line_length: Maximum length for body text lines (None for unlimited)
596 preserve_empty_lines: Whether to preserve empty lines in output
598 Returns:
599 Tree with root node containing the document structure
600 """
601 parser = MarkdownParser(
602 max_line_length=max_line_length,
603 preserve_empty_lines=preserve_empty_lines,
604 )
605 return parser.parse(source)