Coverage for intelligence_toolkit/query_text_data/answer_builder.py: 80%

109 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3 

4import re 

5from json import loads, dumps 

6from collections import defaultdict 

7 

8import intelligence_toolkit.AI.utils as utils 

9import intelligence_toolkit.query_text_data.answer_schema as answer_schema 

10import intelligence_toolkit.query_text_data.prompts as prompts 

11from intelligence_toolkit.query_text_data.classes import AnswerObject 

12import sklearn.cluster as cluster 

13 

14 

def _split_on_multiple_delimiters(string, delimiters):
    """Split ``string`` wherever any of the given delimiters occurs.

    Every delimiter is escaped before being combined into one alternation
    pattern, so delimiters are matched literally rather than interpreted as
    regular expressions.

    Args:
        string: The text to split.
        delimiters: Iterable of literal delimiter strings.

    Returns:
        The list of pieces produced by ``re.split``.
    """
    escaped_delimiters = [re.escape(delimiter) for delimiter in delimiters]
    alternation = "|".join(escaped_delimiters)
    return re.split(alternation, string)

20 

21 

def extract_and_link_chunk_references(text, link=True):
    """Collect cited chunk ids from ``[source: ...]`` markers and optionally link them.

    A marker looks like ``[source: 1, 2; 3]``: entries are separated by commas
    or semicolons, and only purely numeric entries count as references.

    Args:
        text: Text that may contain ``[source: ...]`` markers.
        link: When true, each marker is rewritten so that every numeric id
            becomes a markdown link to the ``#source-<id>`` anchor; non-numeric
            entries are dropped from the marker, so a marker without any
            numeric id becomes ``[source: ]``. When false, the text is
            returned unchanged.

    Returns:
        A ``(text, references)`` tuple, where ``references`` is the sorted
        list of unique ids (as ``int``) found across all markers.
    """
    references = set()

    def _rewrite_marker(match):
        # Split on "," or ";" and keep only the purely numeric entries.
        entries = (entry.strip() for entry in re.split(r"[,;]", match.group(1)))
        ids = [entry for entry in entries if re.match(r"^\d+$", entry)]
        # Collect as ints so that e.g. "01" and "1" count as a single reference.
        references.update(int(entry) for entry in ids)
        if not link:
            return match.group(0)
        links = ", ".join(f"[{entry}](#source-{entry})" for entry in ids)
        return f"[source: {links}]"

    # A single pass rewrites each marker exactly where it was matched. Doing a
    # whole-text str.replace per marker would rescan the text for every
    # citation and could re-touch markers that were already rewritten.
    text = re.sub(r"\[source: ([^\]]+)\]", _rewrite_marker, text)
    return text, sorted(references)

43 

def _build_theme_summaries_from_commentary(commentary):
    """Render the commentary's theme/point structure as JSON theme summaries.

    Reads ``commentary.structure`` and uses its ``"themes"`` (theme title ->
    point ids), ``"points"`` (point id -> point title) and
    ``"point_sources"`` (point id -> sources) entries.

    Args:
        commentary: Object with an optional ``structure`` mapping, or ``None``.

    Returns:
        A list of JSON strings, one per theme that has at least one titled
        point. Each string encodes ``theme_title`` and ``theme_points``; each
        point carries ``point_title``, ``point_evidence`` and
        ``point_commentary``. An empty list is returned when there is no
        commentary or no structure.
    """
    structure = None if commentary is None else getattr(commentary, "structure", None)
    if not structure:
        return []

    themes = structure.get("themes") or {}
    points = structure.get("points") or {}
    point_sources = structure.get("point_sources") or {}

    summaries = []
    for theme_title, point_ids in themes.items():
        theme_points = []
        for point_id in point_ids:
            point_title = points.get(point_id)
            if not point_title:
                # Points without a title are left out of the summary.
                continue

            sources = point_sources.get(point_id, [])
            if sources:
                # dict.fromkeys drops repeated sources while keeping the
                # order in which they first appeared.
                unique_sources = dict.fromkeys(sources)
                sources_text = ", ".join(map(str, unique_sources))
                evidence_suffix = f" [source: {sources_text}]"
            else:
                evidence_suffix = ""

            theme_points.append(
                {
                    "point_title": point_title,
                    "point_evidence": f"**Source evidence**: {point_title}{evidence_suffix}",
                    "point_commentary": f"**AI commentary**: {point_title}",
                }
            )

        if not theme_points:
            continue
        summaries.append(
            dumps(
                {
                    "theme_title": theme_title,
                    "theme_points": theme_points,
                }
            )
        )

    return summaries

93 

async def answer_query(
    ai_configuration,
    query,
    expanded_query,
    processed_chunks,
    commentary,
):
    """Build the answer report for a query from themed text chunks.

    The chunk texts are grouped by theme using
    ``commentary.get_clustered_cids()``. One summarization request per theme
    is generated through ``utils.map_generate_text``, the resulting summaries
    are integrated through ``utils.generate_text``, and the final markdown is
    rendered by ``build_report_markdown``. When the commentary structure
    yields theme summaries, those are rendered in the report instead of the
    generated ones.

    Args:
        ai_configuration: Configuration forwarded to the text generation helpers.
        query: The original query; used for the integration prompt and the report.
        expanded_query: The expanded query; used for theme summarization and
            the report.
        processed_chunks: Object exposing ``cid_to_text`` (chunk id -> chunk text).
        commentary: Object providing ``get_clustered_cids()`` (theme -> chunk ids).

    Returns:
        An ``AnswerObject`` holding the report, its references, the chunk
        mapping returned by ``build_report_markdown`` and
        ``net_new_sources`` (always 0 here).
    """
    print(f"Answering query with clustered ids: {commentary.get_clustered_cids()}")
    partitioned_texts = {
        theme: [f"{cid}: {processed_chunks.cid_to_text[cid]}" for cid in cids]
        for theme, cids in commentary.get_clustered_cids().items()
    }
    net_new_sources = 0

    summarized_themes_analysis = _build_theme_summaries_from_commentary(commentary)

    # Each theme's prompt also lists the themes that come before it.
    theme_order = list(partitioned_texts)
    batched_summarization_messages = [
        utils.prepare_messages(
            prompts.theme_summarization_prompt,
            {
                "chunks": texts,
                "theme": theme,
                "previous_themes": theme_order[:position],
                "query": expanded_query,
            },
        )
        for position, (theme, texts) in enumerate(partitioned_texts.items())
    ]

    summarized_themes = await utils.map_generate_text(
        ai_configuration,
        batched_summarization_messages,
        response_format=answer_schema.theme_summarization_format,
    )

    theme_integration_messages = utils.prepare_messages(
        prompts.theme_integration_prompt,
        {"content": summarized_themes, "query": query},
    )
    report_wrapper = utils.generate_text(
        ai_configuration,
        theme_integration_messages,
        response_format=answer_schema.theme_integration_format,
    )

    # Prefer the summaries derived from the commentary when there are any.
    themes_for_report = summarized_themes_analysis or summarized_themes
    report, references, matched_chunks = build_report_markdown(
        query,
        expanded_query,
        themes_for_report,
        report_wrapper,
        processed_chunks.cid_to_text,
    )
    return AnswerObject(
        extended_answer=report,
        references=references,
        referenced_chunks=matched_chunks,
        net_new_sources=net_new_sources,
    )

148 

149 

def build_report_markdown(
    query,
    expanded_query,
    summarized_themes,
    report_wrapper,
    cid_to_text
):
    """Assemble the markdown report and its source appendix.

    Args:
        query: The original query, shown in the "Query" section.
        expanded_query: The expanded query, shown in the "Expanded Query" section.
        summarized_themes: Iterable of JSON strings, each with ``theme_title``
            and ``theme_points`` (points carry ``point_title``,
            ``point_evidence`` and ``point_commentary``).
        report_wrapper: JSON string with ``answer``, ``report_title``,
            ``report_overview`` and ``report_implications``.
        cid_to_text: Mapping from chunk id to a JSON string with ``title``,
            ``chunk_id`` and ``text_chunk``.

    Returns:
        A ``(report, references, matched_chunks)`` tuple: the markdown text,
        the chunk ids cited in the report, and a mapping from
        ``"<title> (<chunk_id>)"`` to the parsed chunk for every entry of
        ``cid_to_text``.
    """
    theme_objs = [loads(raw_theme) for raw_theme in summarized_themes]
    wrapper = loads(report_wrapper)
    parsed_chunks = [loads(raw_chunk) for raw_chunk in cid_to_text.values()]
    matched_chunks = {
        f"{chunk['title']} ({chunk['chunk_id']})": chunk for chunk in parsed_chunks
    }
    home_link = "#final-report"

    # Collect the report body piece by piece and join once at the end.
    sections = [
        f"## Query\n\n*{query}*\n\n",
        f"## Expanded Query\n\n*{expanded_query}*\n\n",
        f'## Answer\n\n{wrapper["answer"]}\n\n',
        f'## Analysis\n\n### {wrapper["report_title"]}\n\n',
        f'{wrapper["report_overview"]}\n\n',
    ]
    for theme in theme_objs:
        sections.append(f'#### Theme: {theme["theme_title"]}\n\n')
        for point in theme["theme_points"]:
            sections.append(
                f'##### {point["point_title"]}\n\n{point["point_evidence"]}\n\n{point["point_commentary"]}\n\n'
            )
    sections.append(f'#### Implications\n\n{wrapper["report_implications"]}\n\n')

    report, references = extract_and_link_chunk_references("".join(sections))
    print(f"Extracted references: {references}")

    # Append one collapsible entry per cited chunk that is actually known.
    source_sections = ["## Sources\n\n"]
    for cid in references:
        if cid not in cid_to_text:
            print(f"No match for {cid}")
            continue
        chunk = loads(cid_to_text[cid])
        source_sections.append(
            f'#### Source {cid}\n\n<details>\n\n##### Text chunk: {chunk["title"]} ({chunk["chunk_id"]})\n\n{chunk["text_chunk"]}\n\n'
        )
        source_sections.append(f"</details>\n\n[Back to top]({home_link})\n\n")
    report += "".join(source_sections)

    return report, references, matched_chunks

184 

def cluster_cids(relevant_cids, cid_to_vector, target_clusters):
    """Group chunk ids into clusters by running k-means on their vectors.

    Only ids that have an entry in ``cid_to_vector`` take part in the
    clustering; the others are ignored.

    Args:
        relevant_cids: Iterable of chunk ids to cluster.
        cid_to_vector: Mapping from chunk id to its vector.
        target_clusters: Desired number of clusters. It is capped at the
            number of ids that have a vector, because k-means cannot produce
            more clusters than it has samples.

    Returns:
        A dict mapping each k-means cluster label to the list of chunk ids
        assigned to it, or an empty dict when no id has a vector.
    """
    cids = [cid for cid in relevant_cids if cid in cid_to_vector]
    if not cids:
        # Nothing to cluster; fitting k-means on an empty list would raise.
        return {}

    vectors = [cid_to_vector[cid] for cid in cids]
    n_clusters = min(target_clusters, len(cids))
    # fit_predict is the documented equivalent of fit() followed by predict().
    cluster_assignments = cluster.KMeans(n_clusters=n_clusters).fit_predict(vectors)

    clustered_cids = {}
    for cid, cluster_assignment in zip(cids, cluster_assignments):
        clustered_cids.setdefault(cluster_assignment, []).append(cid)
    return clustered_cids

205 

206