Coverage for src \ truenex_memory \ core \ chunker.py: 93%

46 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""Deterministic text chunking with lightweight Markdown heading tracking.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6import hashlib 

7import re 

8 

9 

# ATX-style Markdown heading: group 1 is the run of one to six "#" markers
# (its length is the heading level), group 2 is the title text with the
# surrounding whitespace excluded.
HEADING_RE = re.compile(
    r"^(#{1,6})\s+(.+?)\s*$"
)

11 

12 

@dataclass(frozen=True)
class TextChunk:
    """Immutable record describing one slice of source text to be indexed."""

    # Zero-based position of the chunk in the order it was produced.
    index: int
    # The chunk text itself.
    content: str
    # Heading titles joined into a single path string, or None when absent.
    heading_path: str | None
    # Hash string identifying ``content``.
    content_hash: str
    # Token count estimate for ``content``.
    token_count: int

22 

23 

def content_hash(content: str) -> str:
    """Compute the hexadecimal SHA-256 digest of ``content`` encoded as UTF-8.

    The same input string always yields the same 64-character lowercase hex
    string, so the value can be stored and compared across runs.
    """
    encoded = content.encode("utf-8")
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()

28 

29 

def estimate_tokens(content: str) -> int:
    """Approximate the token count as the number of whitespace-separated runs.

    No tokenizer is involved: every maximal run of non-whitespace characters
    counts as one token, so the result is deterministic for a given string.
    """
    count = 0
    for _ in re.finditer(r"\S+", content):
        count += 1
    return count

34 

35 

def chunk_text(text: str, *, max_chars: int = 1200) -> list[TextChunk]:
    """Split text into stable chunks without external tokenizers.

    The text is normalised to LF line endings, stripped, and then consumed
    line by line. A new chunk is started when:

    * a line matches ``HEADING_RE`` (the heading line becomes the first line
      of the new chunk), or
    * appending the next line would push the buffered text past
      ``max_chars`` characters, counting one separator per buffered line.

    Each chunk records the heading path in effect when it was emitted: the
    titles of the current heading and of its still-open lower-level
    ancestors joined with ``" > "``, or ``None`` before the first heading.

    Args:
        text: Raw source text; may use CRLF, CR or LF line endings.
        max_chars: Size budget per chunk, measured on the buffered lines
            before the chunk body is stripped. Lines are never split, so a
            single line longer than ``max_chars`` yields an oversized chunk.

    Returns:
        The chunks in document order, indexed from 0. Empty or
        whitespace-only input yields an empty list, and buffers that strip
        down to nothing are dropped rather than emitted.
    """
    normalized = text.replace("\r\n", "\n").replace("\r", "\n").strip()
    if not normalized:
        return []

    chunks: list[TextChunk] = []
    heading_stack: list[tuple[int, str]] = []
    current_lines: list[str] = []
    # Running value of sum(len(item) + 1 for item in current_lines), kept in
    # step with the buffer so the size check does not re-scan it per line.
    current_len = 0
    current_heading: str | None = None

    def flush() -> None:
        """Emit the buffered lines as one chunk (if non-blank) and reset."""
        nonlocal current_lines, current_len
        body = "\n".join(current_lines).strip()
        current_lines = []
        current_len = 0
        if not body:
            return
        chunks.append(
            TextChunk(
                index=len(chunks),
                content=body,
                heading_path=current_heading,
                content_hash=content_hash(body),
                token_count=estimate_tokens(body),
            )
        )

    for line in normalized.split("\n"):
        heading_match = HEADING_RE.match(line)
        if heading_match:
            # Close the previous section first so that it keeps the heading
            # path that was active while its lines were collected.
            flush()
            level = len(heading_match.group(1))
            title = heading_match.group(2).strip()
            # Drop headings at the same or a deeper level; what remains are
            # the ancestors of the heading just encountered.
            heading_stack = [(lvl, txt) for lvl, txt in heading_stack if lvl < level]
            heading_stack.append((level, title))
            current_heading = " > ".join(txt for _, txt in heading_stack)

        # Only flush a non-empty buffer: an over-long line on its own is
        # still accepted, otherwise it could never be placed anywhere.
        if current_lines and current_len + len(line) > max_chars:
            flush()

        current_lines.append(line)
        current_len += len(line) + 1

    flush()
    return chunks