Coverage for intelligence_toolkit/query_text_data/commentary.py: 0%
95 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
2import intelligence_toolkit.AI.utils as utils
3import intelligence_toolkit.query_text_data.prompts as prompts
4import intelligence_toolkit.query_text_data.answer_schema as answer_schema
5from intelligence_toolkit.AI.client import OpenAIClient
6from json import loads, dumps
class Commentary:
    """Maintain a thematic outline of text chunks and generate commentary on it.

    Chunks are buffered by ``add_chunks``. Whenever the buffer reaches
    ``update_interval`` entries (and the interval is positive) the buffered
    chunks are sent to the chat model, and the parsed reply is merged into
    ``self.structure``:

    * ``points``: point id -> point title
    * ``point_sources``: point id -> list of source (chunk) ids, no duplicates
    * ``themes``: theme title -> list of point ids (rebuilt on every update)
    """

    def __init__(self, ai_configuration, query, cid_to_text, update_interval, analysis_callback, commentary_callback):
        """Store the configuration and start with an empty outline.

        Args:
            ai_configuration: Passed unchanged to ``OpenAIClient``.
            query: Query text inserted into both prompts.
            cid_to_text: Mapping of chunk id to chunk text. ``format_structure``
                parses each value as JSON and reads its ``title``,
                ``chunk_id`` and ``text_chunk`` keys.
            update_interval: Number of buffered chunks that triggers an
                analysis update; a value that is not positive disables updates.
            analysis_callback: Optional callback for analysis updates; its
                ``on_llm_new_token`` receives the formatted outline.
            commentary_callback: Optional callback passed to the streaming
                commentary request.
        """
        self.ai_configuration = ai_configuration
        self.query = query
        self.analysis_callback = analysis_callback
        self.commentary_callback = commentary_callback
        self.cid_to_text = cid_to_text
        self.update_interval = update_interval
        self.unprocessed_chunks = {}
        self.structure = {
            "points": {},
            "point_sources": {},
            "themes": {},
        }

    def add_chunks(self, chunks: dict[int, str]) -> None:
        """Buffer ``chunks`` and run an update once enough have accumulated."""
        self.unprocessed_chunks.update(chunks)
        if self.update_interval > 0 and len(self.unprocessed_chunks) >= self.update_interval:
            self.update_analysis(self.unprocessed_chunks)
            self.unprocessed_chunks = {}

    def complete_analysis(self) -> None:
        """Flush any chunks still buffered (only when updates are enabled)."""
        if self.update_interval > 0 and len(self.unprocessed_chunks) > 0:
            self.update_analysis(self.unprocessed_chunks)
            self.unprocessed_chunks = {}

    def update_analysis(self, chunks: dict[int, str]) -> None:
        """Ask the chat model to fold ``chunks`` into the current outline.

        The reply is parsed as JSON and merged into ``self.structure``; the
        analysis callback (if any) is then sent the re-formatted outline.
        """
        sources = "\n\n".join(f"{cid}:\n\n{text}" for cid, text in chunks.items())
        messages = utils.prepare_messages(
            prompts.thematic_update_prompt,
            {"sources": sources, "query": self.query, "structure": dumps(self.structure, indent=2)},
        )
        callbacks = [self.analysis_callback] if self.analysis_callback is not None else []
        updates = OpenAIClient(self.ai_configuration).generate_chat(
            messages,
            stream=False,
            response_format=answer_schema.thematic_update_format,
            callbacks=callbacks,
        )
        self._apply_updates(loads(updates))
        for callback in callbacks:
            callback.on_llm_new_token(self.format_structure())

    def _apply_updates(self, update_obj: dict) -> None:
        """Merge one parsed model reply into ``self.structure``.

        ``update_obj["updates"]`` entries supply ``point_id``, ``point_title``
        and ``source_ids``; ``update_obj["themes"]`` entries supply
        ``theme_title`` and ``point_ids``.
        """
        # Themes are replaced wholesale on every update, never merged.
        self.structure["themes"] = {}
        points = self.structure["points"]
        point_sources = self.structure["point_sources"]
        for update in update_obj["updates"]:
            point_id = update["point_id"]
            point_title = update["point_title"]
            # An empty title keeps the existing title of a known point.
            if point_title != "" or point_id not in points:
                points[point_id] = point_title
            known_sources = point_sources.setdefault(point_id, [])
            for source_id in update["source_ids"]:
                if source_id not in known_sources:
                    known_sources.append(source_id)
        for theme in update_obj["themes"]:
            self.structure["themes"][theme["theme_title"]] = theme["point_ids"]

    def format_structure(self) -> str:
        """Render the outline as Markdown with linked sources.

        Returns:
            One bullet per theme followed by its points and their source
            links, then a "Sources" section holding the text of every
            referenced chunk found in ``cid_to_text``. Empty string when
            there are no themes.
        """
        points = self.structure["points"]
        point_sources = self.structure["point_sources"]
        parts = []
        all_source_ids = set()

        # Build the themes and points with clickable links.
        for theme_title, point_ids in self.structure["themes"].items():
            parts.append(f"- **{theme_title}**\n")
            for point_id in point_ids:
                # Points a theme mentions but that have no sources are skipped.
                if point_id not in point_sources:
                    continue
                source_ids = point_sources[point_id]
                source_links = ", ".join(f"[{sid}](#source-{sid})" for sid in source_ids)
                # NOTE(review): one leading space, as in the original string;
                # nested Markdown lists usually need two or more - confirm.
                parts.append(f" - {points[point_id]} [source: {source_links}]\n")
                all_source_ids.update(source_ids)

        # Append the text of each referenced chunk.
        if all_source_ids:
            parts.append("\n## Sources\n\n")
            home_link = "#final-report"  # Link back to the top
            for cid in sorted(all_source_ids):
                if cid not in self.cid_to_text:
                    continue
                chunk = loads(self.cid_to_text[cid])
                parts.append(
                    f'#### Source {cid}\n\n<details>\n\n##### Text chunk: {chunk["title"]} ({chunk["chunk_id"]})\n\n{chunk["text_chunk"]}\n\n'
                )
                parts.append(f"</details>\n\n[Back to top]({home_link})\n\n")

        return "".join(parts)

    def get_clustered_cids(self) -> dict[str, list]:
        """Group source ids by theme.

        Returns:
            Theme title -> de-duplicated source ids in first-seen order.
            When updates are disabled, a single "All relevant chunks" entry
            listing the ids of the buffered chunks.
        """
        if self.update_interval > 0:
            point_sources = self.structure["point_sources"]
            clustered_cids = {}
            for theme_title, point_ids in self.structure["themes"].items():
                # A dict serves as an insertion-ordered set of source ids.
                cluster = {}
                for point_id in point_ids:
                    cluster.update(dict.fromkeys(point_sources.get(point_id, ())))
                clustered_cids[theme_title] = list(cluster)
            return clustered_cids
        else:
            return {"All relevant chunks": list(self.unprocessed_chunks.keys())}

    def _commentary_chunks(self) -> str:
        """Return the indexed text of up to three chunks per theme.

        Ids without an entry in ``cid_to_text`` are skipped rather than
        raising ``KeyError``, matching the guard in ``format_structure``.
        """
        selected_cids = {}
        for cid_list in self.get_clustered_cids().values():
            selected_cids.update(dict.fromkeys(cid_list[:3]))
        return "\n\n".join(
            f"{cid}:\n\n{self.cid_to_text[cid]}" for cid in selected_cids if cid in self.cid_to_text
        )

    async def generate_commentary(self):
        """Request streamed commentary on the outline from the chat model.

        Returns:
            Whatever ``OpenAIClient.generate_chat_async`` returns.
        """
        messages = utils.prepare_messages(
            prompts.commentary_prompt,
            {"query": self.query, "structure": self.format_structure(), "chunks": self._commentary_chunks()},
        )
        callbacks = [self.commentary_callback] if self.commentary_callback is not None else []
        commentary = await OpenAIClient(self.ai_configuration).generate_chat_async(
            messages,
            stream=True,
            callbacks=callbacks,
        )
        return commentary