Coverage for intelligence_toolkit/query_text_data/api.py: 0%
114 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
3from collections import defaultdict
4from enum import Enum
6import networkx as nx
7import pandas as pd
9import intelligence_toolkit.AI.utils as utils
10import intelligence_toolkit.query_text_data.answer_builder as answer_builder
11import intelligence_toolkit.query_text_data.graph_builder as graph_builder
12import intelligence_toolkit.query_text_data.helper_functions as helper_functions
13import intelligence_toolkit.query_text_data.input_processor as input_processor
14import intelligence_toolkit.query_text_data.prompts as prompts
15import intelligence_toolkit.query_text_data.query_rewriter as query_rewriter
16import intelligence_toolkit.query_text_data.relevance_assessor as relevance_assessor
17import intelligence_toolkit.helpers.document_processor as document_processor
18from intelligence_toolkit.AI.base_embedder import BaseEmbedder
19from intelligence_toolkit.AI.client import OpenAIClient
20from intelligence_toolkit.AI.openai_configuration import OpenAIConfiguration
21from intelligence_toolkit.query_text_data.classes import (
22 AnswerObject,
23 ChunkSearchConfig,
24 ProcessedChunks,
25)
26from intelligence_toolkit.query_text_data.commentary import Commentary
class QueryTextDataStage(Enum):
    """
    Stages of the QueryTextData workflow.

    Each member names the most recent step that has completed; the integer
    values increase from INITIAL (0) to QUESTION_ANSWERED (5).
    """

    # The initial stage of the workflow.
    INITIAL = 0
    # The chunks have been created.
    CHUNKS_CREATED = 1
    # The chunks have been processed.
    CHUNKS_PROCESSED = 2
    # The chunks have been embedded.
    CHUNKS_EMBEDDED = 3
    # The chunks have been mined.
    CHUNKS_MINED = 4
    # The query has been answered.
    QUESTION_ANSWERED = 5
class QueryTextData:
    """
    Stateful driver for the Query Text Data workflow.

    Each step stores its result on the instance and advances ``self.stage``
    so that later steps (and callers) can pick the results up again.
    Configuration is supplied through ``set_ai_config`` and ``set_embedder``
    and is kept when the workflow is reset.
    """

    def __init__(self) -> None:
        # Configuration is initialised here rather than in reset_workflow()
        # so that it survives a reset; None means "not configured yet".
        self.ai_configuration = None
        self.embedding_cache = None
        self.text_embedder = None
        self.reset_workflow()

    def set_ai_config(
        self, ai_configuration: OpenAIConfiguration, embedding_cache: str
    ) -> None:
        """
        Set the AI configuration and embedding cache for the workflow.

        Args:
            ai_configuration (OpenAIConfiguration): The OpenAI configuration
            embedding_cache (str): The embedding cache, forwarded to the
                embedding and relevance-assessment helpers
        """
        self.ai_configuration = ai_configuration
        self.embedding_cache = embedding_cache

    def reset_workflow(self) -> None:
        """
        Resets the workflow to its initial state.

        All per-run results are cleared and the stage returns to INITIAL.
        The AI configuration, embedding cache and embedder are left as they are.
        """
        self.stage = QueryTextDataStage.INITIAL
        self.label_to_chunks = None
        self.processed_chunks = None
        self.cid_to_vector = None
        self.query = None
        self.expanded_query = None
        self.chunk_search_config = None
        self.relevant_cids = None
        self.search_summary = None
        self.answer_config = None
        self.answer_object = None
        self.level_to_label_to_network = None
        # These two are only assigned by later steps; clearing them here keeps
        # a fresh (or reset) instance from raising AttributeError or exposing
        # results that belong to a previous run.
        self.commentary = None
        self.condensed_answer = None

    def set_embedder(self, text_embedder: BaseEmbedder) -> None:
        """
        Set the text embedder for the workflow.

        Args:
            text_embedder (BaseEmbedder): The text embedder
        """
        self.text_embedder = text_embedder

    def process_data_from_files(
        self,
        input_files: list[str],
        chunk_size: int = 1000,
        callbacks: "list | None" = None,
    ) -> dict[str, list[str]]:
        """
        Process data from files.

        Args:
            input_files (list[str]): The list of input files
            chunk_size (int): The chunk size passed to the document processor
            callbacks (list | None): The list of callbacks; None means no callbacks

        Returns:
            dict[str, list[str]]: The label to chunks mapping
        """
        self.label_to_chunks = document_processor.convert_files_to_chunks(
            input_files,
            chunk_size=chunk_size,
            # A fresh list per call avoids sharing one mutable default.
            callbacks=[] if callbacks is None else callbacks,
        )
        self.stage = QueryTextDataStage.CHUNKS_CREATED
        return self.label_to_chunks

    def process_text_chunks(
        self,
        max_cluster_size: int = 25,
        min_edge_weight: int = 2,
        min_node_degree: int = 2,
        callbacks: "list | None" = None,
    ) -> ProcessedChunks:
        """
        Process text chunks by extracting noun-phrase cooccurrences into a concept graph.

        Args:
            max_cluster_size (int): The maximum cluster size
            min_edge_weight (int): The minimum edge weight
            min_node_degree (int): The minimum node degree
            callbacks (list | None): The list of callbacks; None means no callbacks

        Returns:
            ProcessedChunks: The processed chunks
        """
        self.processed_chunks = input_processor.process_chunks(
            self.label_to_chunks,
            max_cluster_size,
            min_edge_weight,
            min_node_degree,
            callbacks=[] if callbacks is None else callbacks,
        )
        self.stage = QueryTextDataStage.CHUNKS_PROCESSED
        return self.processed_chunks

    async def embed_text_chunks(
        self, callbacks: "list | None" = None
    ) -> dict[int, list[float]]:
        """
        Embed text chunks.

        Args:
            callbacks (list | None): The list of callbacks; None means no callbacks

        Returns:
            dict[int, list[float]]: The chunk ID to vector mapping
        """
        self.cid_to_vector = await helper_functions.embed_texts(
            self.processed_chunks.cid_to_text,
            self.text_embedder,
            cache_data=self.embedding_cache,
            callbacks=[] if callbacks is None else callbacks,
        )
        self.stage = QueryTextDataStage.CHUNKS_EMBEDDED
        return self.cid_to_vector

    async def anchor_query_to_concepts(
        self, query: str, top_concepts: int = 100
    ) -> str:
        """
        Anchor the query to the top concepts in the graph.

        Args:
            query (str): The query
            top_concepts (int): The number of top concepts passed to the query rewriter

        Returns:
            str: The rewritten (anchored) query
        """
        return await query_rewriter.rewrite_query(
            self.ai_configuration,
            query,
            self.processed_chunks.period_concept_graphs["ALL"],
            top_concepts,
        )

    async def detect_relevant_text_chunks(
        self,
        query: str,
        expanded_query: str,
        chunk_search_config: ChunkSearchConfig,
        chunk_progress_callback=None,
        chunk_callback=None,
        analysis_callback=None,
        commentary_callback=None,
    ) -> tuple[list[int], str]:
        """
        Detect relevant text chunks.

        Args:
            query (str): The query; used to build the commentary
            expanded_query (str): The expanded query; used for the relevance search
            chunk_search_config (ChunkSearchConfig): The chunk search configuration
            chunk_progress_callback: The chunk progress callback
            chunk_callback: The chunk callback
            analysis_callback: The analysis callback
            commentary_callback: The commentary callback

        Returns:
            tuple[list[int], str]: The relevant chunk IDs and search summary
        """
        self.query = query
        self.expanded_query = expanded_query
        self.chunk_search_config = chunk_search_config
        # A new Commentary is built for every search and handed to the
        # relevance assessor below; it is also reused when answering.
        self.commentary = Commentary(
            self.ai_configuration,
            self.query,
            self.processed_chunks.cid_to_text,
            self.chunk_search_config.analysis_update_interval,
            analysis_callback,
            commentary_callback,
        )
        (
            self.relevant_cids,
            self.search_summary,
        ) = await relevance_assessor.detect_relevant_chunks(
            ai_configuration=self.ai_configuration,
            query=self.expanded_query,
            processed_chunks=self.processed_chunks,
            cid_to_vector=self.cid_to_vector,
            embedder=self.text_embedder,
            embedding_cache=self.embedding_cache,
            chunk_search_config=self.chunk_search_config,
            chunk_progress_callback=chunk_progress_callback,
            chunk_callback=chunk_callback,
            commentary=self.commentary,
        )
        self.stage = QueryTextDataStage.CHUNKS_MINED
        return self.relevant_cids, self.search_summary

    async def answer_query_with_relevant_chunks(
        self,
    ) -> AnswerObject:
        """
        Answer a query with relevant chunks.

        Uses the query, expanded query and commentary stored by
        ``detect_relevant_text_chunks``.

        Returns:
            AnswerObject: The answer object
        """
        self.answer_object = await answer_builder.answer_query(
            self.ai_configuration,
            self.query,
            self.expanded_query,
            self.processed_chunks,
            self.commentary,
        )
        self.stage = QueryTextDataStage.QUESTION_ANSWERED
        return self.answer_object

    def build_concept_community_graph(self) -> dict[int, dict[str, nx.Graph]]:
        """
        Build the concept community graph.

        Returns:
            dict[int, dict[str, nx.Graph]]: The community level to community label to community network mapping
        """
        self.level_to_label_to_network = graph_builder.build_meta_graph(
            self.processed_chunks.period_concept_graphs["ALL"],
            self.processed_chunks.hierarchical_communities,
        )
        return self.level_to_label_to_network

    def condense_answer(
        self, ai_instructions=prompts.user_prompt, callbacks: "list | None" = None
    ) -> str:
        """
        Condense the answer.

        Args:
            ai_instructions: The AI instructions
            callbacks (list | None): The list of callbacks; None means no callbacks

        Returns:
            str: The condensed answer, also stored in ``self.condensed_answer``
        """
        variables = {
            "query": self.query,
            "answer": self.answer_object.extended_answer,
        }
        messages = utils.generate_messages(
            ai_instructions,
            prompts.list_prompts["report_prompt"],
            variables,
            prompts.list_prompts["safety_prompt"],
        )
        self.condensed_answer = OpenAIClient(self.ai_configuration).generate_chat(
            messages, callbacks=[] if callbacks is None else callbacks
        )
        return self.condensed_answer

    def prepare_for_new_query(self) -> None:
        """
        Prepare for a new query.

        Clears the query, search and answer state and moves the stage back
        to CHUNKS_EMBEDDED.
        """
        self.query = None
        self.expanded_query = None
        self.chunk_search_config = None
        self.relevant_cids = None
        self.search_summary = None
        self.answer_config = None
        self.answer_object = None
        self.level_to_label_to_network = None
        self.stage = QueryTextDataStage.CHUNKS_EMBEDDED

    def prepare_for_new_answer(self) -> None:
        """
        Prepare for a new answer to the current query.

        Clears only the answer state and moves the stage back to
        CHUNKS_MINED; the query and search results are kept.
        """
        self.answer_config = None
        self.answer_object = None
        self.stage = QueryTextDataStage.CHUNKS_MINED

    def get_chunks_as_df(self) -> pd.DataFrame:
        """
        Flatten the label to chunks mapping into a DataFrame.

        Returns:
            pd.DataFrame: One row per chunk with the columns ``file_name``
                and ``text_to_label_str``. The frame is empty (but still has
                both columns) when no chunks are loaded.
        """
        # Treat "nothing loaded yet" (None) like an empty mapping.
        label_to_chunks = self.label_to_chunks or {}
        rows = [
            {"file_name": label, "text_to_label_str": chunk}
            for label, chunks in label_to_chunks.items()
            for chunk in chunks
        ]
        # Explicit columns keep the schema stable even when there are no rows.
        return pd.DataFrame(rows, columns=["file_name", "text_to_label_str"])

    def import_chunks_from_str(self, data: str) -> None:
        """
        Import chunks from CSV data with the columns written by ``get_chunks_as_df``.

        Args:
            data: Passed unchanged to ``pandas.read_csv``, so it must be
                something ``read_csv`` accepts (a path or a file-like object)

        Raises:
            KeyError: If the ``file_name`` or ``text_to_label_str`` column is missing
        """
        chunks_df = pd.read_csv(data)
        data_imported = defaultdict(list)
        # Group chunk strings by file name, keeping the row order of the CSV.
        for file_name, chunk in zip(
            chunks_df["file_name"], chunks_df["text_to_label_str"]
        ):
            data_imported[file_name].append(chunk)

        self.label_to_chunks = data_imported
        # Same state as after process_data_from_files(): chunks exist but
        # have not been processed yet.
        self.stage = QueryTextDataStage.CHUNKS_CREATED

    async def generate_analysis_commentary(self) -> None:
        """
        Generate commentary from the current Commentary object.

        The result of ``Commentary.generate_commentary()`` is passed through
        to the caller. Requires ``detect_relevant_text_chunks`` to have run.
        """
        return await self.commentary.generate_commentary()

    def __repr__(self) -> str:
        return f"{type(self).__name__}()"