Coverage for intelligence_toolkit/query_text_data/api.py: 0%

114 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3from collections import defaultdict 

4from enum import Enum 

5 

6import networkx as nx 

7import pandas as pd 

8 

9import intelligence_toolkit.AI.utils as utils 

10import intelligence_toolkit.query_text_data.answer_builder as answer_builder 

11import intelligence_toolkit.query_text_data.graph_builder as graph_builder 

12import intelligence_toolkit.query_text_data.helper_functions as helper_functions 

13import intelligence_toolkit.query_text_data.input_processor as input_processor 

14import intelligence_toolkit.query_text_data.prompts as prompts 

15import intelligence_toolkit.query_text_data.query_rewriter as query_rewriter 

16import intelligence_toolkit.query_text_data.relevance_assessor as relevance_assessor 

17import intelligence_toolkit.helpers.document_processor as document_processor 

18from intelligence_toolkit.AI.base_embedder import BaseEmbedder 

19from intelligence_toolkit.AI.client import OpenAIClient 

20from intelligence_toolkit.AI.openai_configuration import OpenAIConfiguration 

21from intelligence_toolkit.query_text_data.classes import ( 

22 AnswerObject, 

23 ChunkSearchConfig, 

24 ProcessedChunks, 

25) 

26from intelligence_toolkit.query_text_data.commentary import Commentary 

27 

class QueryTextDataStage(Enum):
    """
    Progress markers for the QueryTextData workflow.

    Each member names a milestone; the integer values increase in the
    order the members are declared.

    Attributes:
        INITIAL: Nothing has been done yet.
        CHUNKS_CREATED: Text chunks have been created.
        CHUNKS_PROCESSED: Text chunks have been processed.
        CHUNKS_EMBEDDED: Text chunks have been embedded.
        CHUNKS_MINED: Text chunks have been mined.
        QUESTION_ANSWERED: The query has been answered.
    """

    # Starting point of the workflow.
    INITIAL = 0
    # Chunk-preparation milestones.
    CHUNKS_CREATED = 1
    CHUNKS_PROCESSED = 2
    CHUNKS_EMBEDDED = 3
    # Query-time milestones.
    CHUNKS_MINED = 4
    QUESTION_ANSWERED = 5

47 

48 

class QueryTextData:
    """
    Stateful driver for the query-text-data workflow.

    The instance keeps the intermediate results of each step (chunks,
    processed chunks, embeddings, relevant chunk IDs, answer) as attributes
    and records how far the workflow has progressed in ``self.stage``.

    ``set_ai_config`` and ``set_embedder`` must be called before the steps
    that read ``ai_configuration``, ``embedding_cache`` or ``text_embedder``;
    those attributes are not created by ``__init__``.
    """

    def __init__(self) -> None:
        self.reset_workflow()

    def set_ai_config(
        self, ai_configuration: OpenAIConfiguration, embedding_cache: str
    ) -> None:
        """
        Set the AI configuration and embedding cache for the workflow.

        Args:
            ai_configuration (OpenAIConfiguration): The OpenAI configuration
            embedding_cache (str): The embedding cache
        """
        self.ai_configuration = ai_configuration
        self.embedding_cache = embedding_cache

    def _clear_answer_state(self) -> None:
        """Forget the answer-level results."""
        self.answer_config = None
        self.answer_object = None

    def _clear_query_state(self) -> None:
        """Forget everything derived from the current query."""
        self.query = None
        self.expanded_query = None
        self.chunk_search_config = None
        self.relevant_cids = None
        self.search_summary = None
        self._clear_answer_state()
        self.level_to_label_to_network = None

    def reset_workflow(self) -> None:
        """
        Resets the workflow to its initial state.

        The AI configuration, embedding cache and text embedder are left
        untouched.
        """
        self.stage = QueryTextDataStage.INITIAL
        self.label_to_chunks = None
        self.processed_chunks = None
        self.cid_to_vector = None
        self._clear_query_state()
        # These are produced by later steps; define them here so a reset
        # does not leave results from a previous run behind.
        self.commentary = None
        self.condensed_answer = None

    def set_embedder(self, text_embedder: BaseEmbedder) -> None:
        """
        Set the text embedder for the workflow.

        Args:
            text_embedder (BaseEmbedder): The text embedder
        """
        self.text_embedder = text_embedder

    def process_data_from_files(
        self,
        input_files: list[str],
        chunk_size: int = 1000,
        callbacks: "list | None" = None,
    ) -> dict[str, list[str]]:
        """
        Process data from files.

        Args:
            input_files (list[str]): The list of input files
            chunk_size (int): The chunk size forwarded to
                document_processor.convert_files_to_chunks
            callbacks (list | None): The list of callbacks; None means no callbacks

        Returns:
            dict[str, list[str]]: The label to chunks mapping
        """
        self.label_to_chunks = document_processor.convert_files_to_chunks(
            input_files,
            chunk_size=chunk_size,
            # A fresh list per call avoids sharing one mutable default.
            callbacks=[] if callbacks is None else callbacks,
        )
        self.stage = QueryTextDataStage.CHUNKS_CREATED
        return self.label_to_chunks

    def process_text_chunks(
        self,
        max_cluster_size: int = 25,
        min_edge_weight: int = 2,
        min_node_degree: int = 2,
        callbacks: "list | None" = None,
    ) -> ProcessedChunks:
        """
        Process text chunks by extracting noun-phrase co-occurrences into a concept graph.

        Args:
            max_cluster_size (int): The maximum cluster size
            min_edge_weight (int): The minimum edge weight
            min_node_degree (int): The minimum node degree
            callbacks (list | None): The list of callbacks; None means no callbacks

        Returns:
            ProcessedChunks: The processed chunks
        """
        self.processed_chunks = input_processor.process_chunks(
            self.label_to_chunks,
            max_cluster_size,
            min_edge_weight,
            min_node_degree,
            callbacks=[] if callbacks is None else callbacks,
        )
        self.stage = QueryTextDataStage.CHUNKS_PROCESSED
        return self.processed_chunks

    async def embed_text_chunks(
        self, callbacks: "list | None" = None
    ) -> dict[int, list[float]]:
        """
        Embed text chunks.

        Uses the embedder from set_embedder and the embedding cache from
        set_ai_config.

        Args:
            callbacks (list | None): The list of callbacks; None means no callbacks

        Returns:
            dict[int, list[float]]: The chunk ID to vector mapping
        """
        self.cid_to_vector = await helper_functions.embed_texts(
            self.processed_chunks.cid_to_text,
            self.text_embedder,
            cache_data=self.embedding_cache,
            callbacks=[] if callbacks is None else callbacks,
        )
        self.stage = QueryTextDataStage.CHUNKS_EMBEDDED
        return self.cid_to_vector

    async def anchor_query_to_concepts(
        self, query: str, top_concepts: int = 100
    ) -> str:
        """
        Anchor the query to the top concepts in the graph.

        Args:
            query (str): The query to rewrite
            top_concepts (int): The number of top concepts passed to the query rewriter

        Returns:
            str: The rewritten query returned by query_rewriter.rewrite_query
        """
        return await query_rewriter.rewrite_query(
            self.ai_configuration,
            query,
            self.processed_chunks.period_concept_graphs["ALL"],
            top_concepts,
        )

    async def detect_relevant_text_chunks(
        self,
        query: str,
        expanded_query: str,
        chunk_search_config: ChunkSearchConfig,
        chunk_progress_callback=None,
        chunk_callback=None,
        analysis_callback=None,
        commentary_callback=None,
    ) -> tuple[list[int], str]:
        """
        Detect relevant text chunks.

        Args:
            query (str): The query; stored and handed to the Commentary
            expanded_query (str): The expanded query; this is the query
                passed to the relevance assessor
            chunk_search_config (ChunkSearchConfig): The chunk search configuration
            chunk_progress_callback: The chunk progress callback
            chunk_callback: The chunk callback
            analysis_callback: The analysis callback
            commentary_callback: The commentary callback

        Returns:
            tuple[list[int], str]: The relevant chunk IDs and search summary
        """
        self.query = query
        self.expanded_query = expanded_query
        self.chunk_search_config = chunk_search_config
        # A new Commentary is created for every search.
        self.commentary = Commentary(
            self.ai_configuration,
            self.query,
            self.processed_chunks.cid_to_text,
            self.chunk_search_config.analysis_update_interval,
            analysis_callback,
            commentary_callback,
        )
        (
            self.relevant_cids,
            self.search_summary,
        ) = await relevance_assessor.detect_relevant_chunks(
            ai_configuration=self.ai_configuration,
            query=self.expanded_query,
            processed_chunks=self.processed_chunks,
            cid_to_vector=self.cid_to_vector,
            embedder=self.text_embedder,
            embedding_cache=self.embedding_cache,
            chunk_search_config=self.chunk_search_config,
            chunk_progress_callback=chunk_progress_callback,
            chunk_callback=chunk_callback,
            commentary=self.commentary,
        )
        self.stage = QueryTextDataStage.CHUNKS_MINED
        return self.relevant_cids, self.search_summary

    async def answer_query_with_relevant_chunks(
        self,
    ) -> AnswerObject:
        """
        Answer a query with relevant chunks.

        Uses the query, expanded query and commentary stored by
        detect_relevant_text_chunks.

        Returns:
            AnswerObject: The answer object
        """
        self.answer_object = await answer_builder.answer_query(
            self.ai_configuration,
            self.query,
            self.expanded_query,
            self.processed_chunks,
            self.commentary,
        )
        self.stage = QueryTextDataStage.QUESTION_ANSWERED
        return self.answer_object

    def build_concept_community_graph(self) -> dict[int, dict[str, nx.Graph]]:
        """
        Build the concept community graph.

        Returns:
            dict[int, dict[str, nx.Graph]]: The community level to community label to community network mapping
        """
        self.level_to_label_to_network = graph_builder.build_meta_graph(
            self.processed_chunks.period_concept_graphs["ALL"],
            self.processed_chunks.hierarchical_communities,
        )
        return self.level_to_label_to_network

    def condense_answer(
        self,
        ai_instructions=prompts.user_prompt,
        callbacks: "list | None" = None,
    ) -> str:
        """
        Condense the answer.

        Builds chat messages from the stored query and the extended answer
        of the current answer object, sends them to the chat model and stores
        the result in ``self.condensed_answer``.

        Args:
            ai_instructions: The AI instructions
            callbacks (list | None): The list of callbacks; None means no callbacks

        Returns:
            str: The condensed answer
        """
        variables = {
            "query": self.query,
            "answer": self.answer_object.extended_answer,
        }
        messages = utils.generate_messages(
            ai_instructions,
            prompts.list_prompts["report_prompt"],
            variables,
            prompts.list_prompts["safety_prompt"],
        )
        self.condensed_answer = OpenAIClient(self.ai_configuration).generate_chat(
            messages, callbacks=[] if callbacks is None else callbacks
        )
        return self.condensed_answer

    def prepare_for_new_query(self) -> None:
        """
        Prepare for a new query.

        Clears all query-level and answer-level results and moves the stage
        back to CHUNKS_EMBEDDED; chunks and embeddings are kept.
        """
        self._clear_query_state()
        self.stage = QueryTextDataStage.CHUNKS_EMBEDDED

    def prepare_for_new_answer(self) -> None:
        """
        Prepare for a new answer to the current query.

        Clears only the answer-level results and moves the stage back to
        CHUNKS_MINED; the query and its relevant chunks are kept.
        """
        self._clear_answer_state()
        self.stage = QueryTextDataStage.CHUNKS_MINED

    def get_chunks_as_df(self) -> pd.DataFrame:
        """
        Flatten the label-to-chunks mapping into a DataFrame.

        Returns:
            pd.DataFrame: One row per chunk with the columns ``file_name``
            (the mapping key) and ``text_to_label_str`` (the chunk). The
            frame is empty, but still has both columns, when there are no
            chunks yet.
        """
        # Treat "nothing loaded yet" (None) like an empty mapping.
        label_to_chunks = self.label_to_chunks or {}
        rows = [
            {"file_name": label, "text_to_label_str": chunk}
            for label, chunks in label_to_chunks.items()
            for chunk in chunks
        ]
        # Naming the columns keeps the schema stable even with zero rows.
        return pd.DataFrame(rows, columns=["file_name", "text_to_label_str"])

    def import_chunks_from_str(self, data: str) -> None:
        """
        Load the label-to-chunks mapping from CSV data.

        The CSV must have the columns ``file_name`` and
        ``text_to_label_str``, i.e. the layout produced by get_chunks_as_df.

        Args:
            data: Passed unchanged to ``pd.read_csv``, so it is whatever that
                function accepts as its first argument (a path or a file-like
                object).
        """
        chunks_df = pd.read_csv(data)
        data_imported = defaultdict(list)
        for label, chunk in zip(
            chunks_df["file_name"], chunks_df["text_to_label_str"]
        ):
            data_imported[label].append(chunk)

        # Store a plain dict so later lookups of unknown labels cannot
        # silently create empty entries.
        # NOTE(review): unlike process_data_from_files, the stage is not
        # updated here - confirm this is intentional.
        self.label_to_chunks = dict(data_imported)

    async def generate_analysis_commentary(self) -> None:
        """
        Generate commentary with the Commentary created by
        detect_relevant_text_chunks and return whatever it returns.
        """
        return await self.commentary.generate_commentary()

    def __repr__(self):
        return "QueryTextData()"