Coverage for src \ truenex_memory \ retrieval \ scoring.py: 100%

46 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""BM25 keyword scoring for truenex-memory retrieval.""" 

2 

3from __future__ import annotations 

4 

5import math 

6import re 

7from dataclasses import dataclass 

8 

9 

# Score multipliers keyed by source_type; looked up by source_boost().
SOURCE_TYPE_BOOST: dict[str, float] = {"project_docs": 1.0, "agent_session": 0.75}

# Multiplier used when source_type is None or has no entry in SOURCE_TYPE_BOOST.
DEFAULT_SOURCE_BOOST = 0.85

15 

16 

def tokenize(text: str) -> list[str]:
    """Split *text* into lowercase word tokens.

    The text is lowercased first, then every maximal run of ``\\w``
    characters (Unicode-aware for ``str`` patterns) becomes one token.
    Tokens are returned in order of appearance, duplicates included.
    """
    lowered = text.lower()
    return [match.group() for match in re.finditer(r"\w+", lowered)]

20 

21 

def tokenize_set(text: str) -> set[str]:
    """Return the distinct tokens that ``tokenize`` produces for *text*."""
    tokens = tokenize(text)
    return set(tokens)

25 

26 

@dataclass
class BM25:
    """Okapi BM25 scorer over a fixed corpus.

    Build once per query call with the candidate corpus, then call
    get_scores() to rank all documents against a query.

    Corpus statistics (document count, average document length and
    per-term document frequency) are computed once in ``__post_init__``;
    mutating ``corpus`` afterwards does not refresh them.
    """

    # Tokenized documents; each inner list is one document's tokens.
    corpus: list[list[str]]
    # Term-frequency saturation parameter.
    k1: float = 1.5
    # Document-length normalisation strength (0 disables it).
    b: float = 0.75

    def __post_init__(self) -> None:
        """Precompute corpus-level statistics used by scoring."""
        self._N = len(self.corpus)
        # max(..., 1) keeps an empty corpus from dividing by zero; the
        # average is then 0.0, which score() handles explicitly.
        self._avgdl = (
            sum(len(d) for d in self.corpus) / max(self._N, 1)
        )
        # Number of documents each term appears in (counted once per doc).
        self._df: dict[str, int] = {}
        for doc in self.corpus:
            for term in set(doc):
                self._df[term] = self._df.get(term, 0) + 1

    def _idf(self, term: str) -> float:
        """Return the inverse document frequency of *term*.

        The ``+ 1`` inside the logarithm keeps the value positive for
        every document frequency between 0 and the corpus size.
        """
        df = self._df.get(term, 0)
        return math.log((self._N - df + 0.5) / (df + 0.5) + 1)

    def score(self, query_tokens: list[str], doc_tokens: list[str]) -> float:
        """BM25 score for a single document against the query.

        Args:
            query_tokens: Query terms; a term repeated in the query
                contributes once per occurrence.
            doc_tokens: Tokens of the document to score. It does not
                have to be a member of the corpus.

        Returns:
            The summed per-term BM25 contributions, ``0.0`` when no query
            term occurs in the document.
        """
        dl = len(doc_tokens)
        tf_map: dict[str, int] = {}
        for t in doc_tokens:
            tf_map[t] = tf_map.get(t, 0) + 1

        # The average length is 0 when the corpus is empty or holds only
        # empty documents. Dividing by it used to raise ZeroDivisionError
        # for any matching non-empty document, so in that case treat the
        # document as average-length (normalisation factor of exactly 1).
        if self._avgdl > 0:
            length_norm = 1.0 - self.b + self.b * dl / self._avgdl
        else:
            length_norm = 1.0

        result = 0.0
        for term in query_tokens:
            tf = tf_map.get(term, 0)
            if tf == 0:
                continue
            idf = self._idf(term)
            num = tf * (self.k1 + 1)
            den = tf + self.k1 * length_norm
            result += idf * num / den
        return result

    def get_scores(self, query_tokens: list[str]) -> list[float]:
        """Return BM25 score for every document in the corpus.

        Scores are in corpus order, one per document.
        """
        return [self.score(query_tokens, doc) for doc in self.corpus]

73 

74 

def source_boost(source_type: str | None) -> float:
    """Look up the score multiplier for *source_type*.

    Returns the ``SOURCE_TYPE_BOOST`` entry when one exists; a ``None``
    or unlisted source_type yields ``DEFAULT_SOURCE_BOOST``.
    """
    is_listed = source_type is not None and source_type in SOURCE_TYPE_BOOST
    return SOURCE_TYPE_BOOST[source_type] if is_listed else DEFAULT_SOURCE_BOOST