Coverage for intelligence_toolkit/query_text_data/answer_builder.py: 80%
109 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
4import re
5from json import loads, dumps
6from collections import defaultdict
8import intelligence_toolkit.AI.utils as utils
9import intelligence_toolkit.query_text_data.answer_schema as answer_schema
10import intelligence_toolkit.query_text_data.prompts as prompts
11from intelligence_toolkit.query_text_data.classes import AnswerObject
12import sklearn.cluster as cluster
def _split_on_multiple_delimiters(string, delimiters):
    """Split ``string`` on every occurrence of any of the given delimiters.

    Args:
        string: Text to split.
        delimiters: Iterable of literal delimiter strings. Each one is
            regex-escaped, so characters such as ``.`` or ``|`` are matched
            literally. Empty delimiters are ignored.

    Returns:
        The list of pieces in their original order. When no usable delimiter
        is supplied the result is ``[string]``.
    """
    # Drop empty delimiters: an empty alternative matches at every position
    # and would split the text between each character.
    escaped = [re.escape(delimiter) for delimiter in delimiters if delimiter]
    if not escaped:
        return [string]
    # Alternation tries the delimiters in the order given at each position.
    return re.split("|".join(escaped), string)
def extract_and_link_chunk_references(text, link=True):
    """Collect chunk ids cited in ``[source: ...]`` spans and optionally link them.

    Each span may list several ids separated by ``,`` or ``;``. Only entries
    made up entirely of digits count as references; anything else is ignored.

    Args:
        text: Text that may contain ``[source: 1, 2]`` style citations.
        link: When true, every span is rewritten so each numeric id becomes a
            markdown link to the ``#source-<id>`` anchor (non-numeric entries
            are dropped from the rewritten span). When false, ``text`` is
            returned unchanged.

    Returns:
        A ``(text, references)`` tuple where ``references`` is the sorted list
        of distinct integer ids found across all spans.
    """
    references = set()

    def _rewrite_span(match):
        # Ids are separated by "," or ";"; surrounding whitespace is ignored.
        candidates = (part.strip() for part in re.split(r"[,;]", match.group(1)))
        numeric_ids = [part for part in candidates if re.fullmatch(r"\d+", part)]
        # Store ints so that "01" and "1" do not yield duplicate references.
        references.update(int(cid) for cid in numeric_ids)
        if not link:
            return match.group(0)
        links = ", ".join(f"[{cid}](#source-{cid})" for cid in numeric_ids)
        return f"[source: {links}]"

    # A single substitution pass rewrites each span exactly once, so text that
    # was already rewritten can never be matched and replaced a second time.
    text = re.sub(r"\[source: ([^\]]+)\]", _rewrite_span, text)
    return text, sorted(references)
def _build_theme_summaries_from_commentary(commentary):
    """Render the commentary's theme structure as JSON theme summaries.

    Reads ``commentary.structure`` and uses its ``"themes"`` (theme title ->
    point ids), ``"points"`` (point id -> point title) and ``"point_sources"``
    (point id -> source ids) entries.

    Args:
        commentary: Object exposing a ``structure`` mapping, or ``None``.

    Returns:
        A list of JSON strings, one per theme that has at least one titled
        point. Each encodes ``theme_title`` and ``theme_points``; every point
        carries ``point_title``, ``point_evidence`` and ``point_commentary``.
        Returns ``[]`` when there is no commentary or no structure.
    """
    # getattr on None also falls back to the default, so this covers both the
    # "no commentary" and the "no structure" case.
    structure = getattr(commentary, "structure", None)
    if not structure:
        return []

    point_ids_by_theme = structure.get("themes") or {}
    title_by_point = structure.get("points") or {}
    sources_by_point = structure.get("point_sources") or {}

    def _render_point(point_id):
        # Points without a title are left out of the summary.
        title = title_by_point.get(point_id)
        if not title:
            return None

        citation = ""
        cited = sources_by_point.get(point_id, [])
        if cited:
            # dict.fromkeys drops repeated sources while keeping first-seen order.
            unique_sources = dict.fromkeys(cited)
            joined = ", ".join(str(source) for source in unique_sources)
            citation = f" [source: {joined}]"

        return {
            "point_title": title,
            "point_evidence": f"**Source evidence**: {title}{citation}",
            "point_commentary": f"**AI commentary**: {title}",
        }

    summaries = []
    for theme_title, point_ids in point_ids_by_theme.items():
        rendered = [_render_point(point_id) for point_id in point_ids]
        theme_points = [point for point in rendered if point is not None]
        if not theme_points:
            continue
        summaries.append(
            dumps({"theme_title": theme_title, "theme_points": theme_points})
        )
    return summaries
async def answer_query(
    ai_configuration,
    query,
    expanded_query,
    processed_chunks,
    commentary,
):
    """Summarize the clustered chunks theme by theme and assemble the answer.

    Steps, in order:
      1. Group chunk texts by theme using ``commentary.get_clustered_cids()``.
      2. Build theme summaries directly from the commentary structure.
      3. Request one summary per theme in a batch, telling each request which
         themes came before it.
      4. Request an integrated report wrapper over all generated summaries.
      5. Render the markdown report, preferring the commentary-derived
         summaries and falling back to the generated ones when there are none.

    Args:
        ai_configuration: Configuration forwarded to the text generation helpers.
        query: The user's original query.
        expanded_query: Expanded form of the query, used for theme summarization.
        processed_chunks: Object whose ``cid_to_text`` maps chunk id to chunk text.
        commentary: Object providing ``get_clustered_cids()`` (theme -> chunk ids).

    Returns:
        An ``AnswerObject`` holding the report, its references and the chunks.
    """
    print(f"Answering query with clustered ids: {commentary.get_clustered_cids()}")
    partitioned_texts = {
        theme: [f"{cid}: {processed_chunks.cid_to_text[cid]}" for cid in cids]
        for theme, cids in commentary.get_clustered_cids().items()
    }

    summarized_themes_analysis = _build_theme_summaries_from_commentary(commentary)

    # Each request is told which themes precede it in iteration order.
    theme_order = list(partitioned_texts)
    batched_summarization_messages = [
        utils.prepare_messages(
            prompts.theme_summarization_prompt,
            {
                "chunks": texts,
                "theme": theme,
                "previous_themes": theme_order[:position],
                "query": expanded_query,
            },
        )
        for position, (theme, texts) in enumerate(partitioned_texts.items())
    ]

    summarized_themes = await utils.map_generate_text(
        ai_configuration,
        batched_summarization_messages,
        response_format=answer_schema.theme_summarization_format,
    )

    theme_integration_messages = utils.prepare_messages(
        prompts.theme_integration_prompt,
        {"content": summarized_themes, "query": query},
    )

    # NOTE(review): the result is used without ``await``; presumably
    # ``utils.generate_text`` is synchronous -- confirm against its definition.
    report_wrapper = utils.generate_text(
        ai_configuration,
        theme_integration_messages,
        response_format=answer_schema.theme_integration_format,
    )

    report, references, matched_chunks = build_report_markdown(
        query,
        expanded_query,
        summarized_themes_analysis or summarized_themes,
        report_wrapper,
        processed_chunks.cid_to_text,
    )

    # The count of net new sources is always reported as zero here.
    return AnswerObject(
        extended_answer=report,
        references=references,
        referenced_chunks=matched_chunks,
        net_new_sources=0,
    )
def build_report_markdown(
    query,
    expanded_query,
    summarized_themes,
    report_wrapper,
    cid_to_text
):
    """Render the final markdown report and collect its source references.

    Args:
        query: The user's original query.
        expanded_query: Expanded form of the query.
        summarized_themes: Iterable of JSON strings, each with ``theme_title``
            and ``theme_points`` (points carry ``point_title``,
            ``point_evidence`` and ``point_commentary``).
        report_wrapper: JSON string with ``answer``, ``report_title``,
            ``report_overview`` and ``report_implications``.
        cid_to_text: Mapping of chunk id to a JSON string with ``title``,
            ``chunk_id`` and ``text_chunk``.

    Returns:
        A ``(report, references, matched_chunks)`` tuple: the markdown text,
        the sorted chunk ids cited in the report, and every parsed chunk from
        ``cid_to_text`` keyed by ``"<title> (<chunk_id>)"``.
    """
    theme_objs = [loads(raw) for raw in summarized_themes]
    wrapper = loads(report_wrapper)
    parsed_chunks = [loads(raw) for raw in cid_to_text.values()]
    matched_chunks = {}
    for parsed in parsed_chunks:
        matched_chunks[f"{parsed['title']} ({parsed['chunk_id']})"] = parsed
    back_to_top = "#final-report"

    # Collect the report body piece by piece and join once at the end.
    sections = [
        f"## Query\n\n*{query}*\n\n",
        f"## Expanded Query\n\n*{expanded_query}*\n\n",
        f"## Answer\n\n{wrapper['answer']}\n\n",
        f"## Analysis\n\n### {wrapper['report_title']}\n\n",
        f"{wrapper['report_overview']}\n\n",
    ]
    for theme in theme_objs:
        sections.append(f"#### Theme: {theme['theme_title']}\n\n")
        for point in theme["theme_points"]:
            sections.append(f"##### {point['point_title']}\n\n")
            sections.append(f"{point['point_evidence']}\n\n")
            sections.append(f"{point['point_commentary']}\n\n")
    sections.append(f"#### Implications\n\n{wrapper['report_implications']}\n\n")

    # Citations are linked before the sources section is appended, so only the
    # report body is scanned for references.
    report, references = extract_and_link_chunk_references("".join(sections))
    print(f"Extracted references: {references}")

    source_sections = ["## Sources\n\n"]
    for cid in references:
        # NOTE(review): references are ints, so this lookup presumably relies
        # on cid_to_text being keyed by int chunk ids -- confirm with callers.
        if cid not in cid_to_text:
            print(f"No match for {cid}")
            continue
        chunk = loads(cid_to_text[cid])
        source_sections.append(
            f"#### Source {cid}\n\n<details>\n\n"
            f"##### Text chunk: {chunk['title']} ({chunk['chunk_id']})\n\n"
            f"{chunk['text_chunk']}\n\n"
        )
        source_sections.append(f"</details>\n\n[Back to top]({back_to_top})\n\n")
    report += "".join(source_sections)

    return report, references, matched_chunks
def cluster_cids(relevant_cids, cid_to_vector, target_clusters):
    """Group chunk ids into clusters by running k-means on their vectors.

    Args:
        relevant_cids: Chunk ids to cluster. Ids that have no entry in
            ``cid_to_vector`` are skipped.
        cid_to_vector: Mapping of chunk id to its vector.
        target_clusters: Desired number of clusters. It is capped at the number
            of ids that actually have a vector, because k-means cannot produce
            more clusters than it has samples.

    Returns:
        A dict mapping each cluster label assigned by k-means to the list of
        chunk ids in that cluster, in input order. Returns ``{}`` when none of
        the ids has a vector.
    """
    # Only ids with a known vector can take part in clustering.
    cids = [cid for cid in relevant_cids if cid in cid_to_vector]
    if not cids:
        # Nothing to fit; KMeans would raise on an empty sample list.
        return {}
    vectors = [cid_to_vector[cid] for cid in cids]

    # Asking for more clusters than samples makes KMeans raise, so clamp.
    n_clusters = min(target_clusters, len(cids))
    kmeans = cluster.KMeans(n_clusters=n_clusters)
    kmeans.fit(vectors)
    cluster_assignments = kmeans.predict(vectors)

    clustered_cids = {}
    for cid, cluster_assignment in zip(cids, cluster_assignments):
        clustered_cids.setdefault(cluster_assignment, []).append(cid)
    return clustered_cids