Coverage for intelligence_toolkit/query_text_data/commentary.py: 0%

95 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1 

2import intelligence_toolkit.AI.utils as utils 

3import intelligence_toolkit.query_text_data.prompts as prompts 

4import intelligence_toolkit.query_text_data.answer_schema as answer_schema 

5from intelligence_toolkit.AI.client import OpenAIClient 

6from json import loads, dumps 

7 

class Commentary:
    """Maintain a theme/point/source outline over text chunks and comment on it.

    Chunks are buffered via :meth:`add_chunks`. When ``update_interval`` is
    positive, every time the buffer reaches that size the chunks are sent to
    the model, whose reply updates ``self.structure``:

    * ``points``: point id -> point title
    * ``point_sources``: point id -> list of source (chunk) ids, in first-seen order
    * ``themes``: theme title -> list of point ids (rebuilt on every update)

    When ``update_interval`` is zero or negative no incremental analysis is
    run and the buffered chunks are simply kept.
    """

    def __init__(self, ai_configuration, query, cid_to_text, update_interval, analysis_callback, commentary_callback):
        """Store the configuration and start with an empty structure.

        Args:
            ai_configuration: Passed unchanged to ``OpenAIClient``.
            query: User query interpolated into both prompts.
            cid_to_text: Mapping from chunk id to a JSON string; the decoded
                object is read for ``title``, ``chunk_id`` and ``text_chunk``.
            update_interval: Number of buffered chunks that triggers an
                analysis update; values <= 0 disable incremental analysis.
            analysis_callback: Optional callback handed to the analysis call;
                its ``on_llm_new_token`` receives the formatted structure.
            commentary_callback: Optional callback handed to the commentary call.
        """
        self.ai_configuration = ai_configuration
        self.query = query
        self.analysis_callback = analysis_callback
        self.commentary_callback = commentary_callback
        self.cid_to_text = cid_to_text
        self.update_interval = update_interval
        self.unprocessed_chunks = {}
        self.structure = {
            "points": {},
            "point_sources": {},
            "themes": {},
        }

    def add_chunks(self, chunks: dict[int, str]) -> None:
        """Buffer ``chunks`` and run an update once the buffer is full."""
        self.unprocessed_chunks.update(chunks)
        if self.update_interval > 0 and len(self.unprocessed_chunks) >= self.update_interval:
            self.update_analysis(self.unprocessed_chunks)
            self.unprocessed_chunks = {}

    def complete_analysis(self) -> None:
        """Flush any chunks still buffered (no-op when analysis is disabled)."""
        if self.update_interval > 0 and self.unprocessed_chunks:
            self.update_analysis(self.unprocessed_chunks)
            self.unprocessed_chunks = {}

    def update_analysis(self, chunks: dict[int, str]) -> None:
        """Ask the model to fold ``chunks`` into the current structure.

        The reply is parsed as JSON and read for ``updates`` (items with
        ``point_id``, ``point_title``, ``source_ids``) and ``themes`` (items
        with ``theme_title``, ``point_ids``). The analysis callback, if any,
        is then notified with the re-rendered structure.
        """
        sources = "\n\n".join(f"{k}:\n\n{v}" for k, v in chunks.items())
        messages = utils.prepare_messages(
            prompts.thematic_update_prompt,
            {"sources": sources, "query": self.query, "structure": dumps(self.structure, indent=2)},
        )
        callbacks = [self.analysis_callback] if self.analysis_callback is not None else []
        updates = OpenAIClient(self.ai_configuration).generate_chat(
            messages,
            stream=False,
            response_format=answer_schema.thematic_update_format,
            callbacks=callbacks,
        )
        update_obj = loads(updates)

        points = self.structure["points"]
        point_sources = self.structure["point_sources"]
        for update in update_obj["updates"]:
            point_id = update["point_id"]
            point_title = update["point_title"]
            # An empty title never overwrites an existing one, but a brand-new
            # point is always registered, even with an empty title.
            if point_title != "" or point_id not in points:
                points[point_id] = point_title
            known_sources = point_sources.setdefault(point_id, [])
            for source_id in update["source_ids"]:
                if source_id not in known_sources:
                    known_sources.append(source_id)

        # Themes are replaced wholesale; built only after the updates were
        # read so a malformed reply does not wipe the previous themes.
        self.structure["themes"] = {
            theme["theme_title"]: theme["point_ids"] for theme in update_obj["themes"]
        }

        for callback in callbacks:
            callback.on_llm_new_token(self.format_structure())

    def format_structure(self) -> str:
        """Render the structure as Markdown with linked sources.

        Returns:
            A bullet list of themes and their points (each point followed by
            ``#source-<id>`` links), then a ``## Sources`` section with one
            collapsible entry per referenced chunk present in ``cid_to_text``.
            Points without recorded sources and unknown chunk ids are skipped.
        """
        points = self.structure["points"]
        point_sources = self.structure["point_sources"]
        parts = []
        referenced_ids = set()

        for theme_title, point_ids in self.structure["themes"].items():
            parts.append(f"- **{theme_title}**\n")
            for point_id in point_ids:
                if point_id not in point_sources:
                    continue
                source_ids = point_sources[point_id]
                source_links = ", ".join(f"[{x}](#source-{x})" for x in source_ids)
                # NOTE(review): Markdown nesting normally needs two leading
                # spaces; the single space is kept exactly as found - confirm.
                parts.append(f" - {points[point_id]} [source: {source_links}]\n")
                referenced_ids.update(source_ids)

        if referenced_ids:
            parts.append("\n## Sources\n\n")
            home_link = "#final-report"  # Link back to the top
            for cid in sorted(referenced_ids):
                if cid not in self.cid_to_text:
                    continue
                chunk = loads(self.cid_to_text[cid])
                parts.append(
                    f'#### Source {cid}\n\n<details>\n\n##### Text chunk: {chunk["title"]} ({chunk["chunk_id"]})\n\n{chunk["text_chunk"]}\n\n'
                )
                parts.append(f"</details>\n\n[Back to top]({home_link})\n\n")

        return "".join(parts)

    def get_clustered_cids(self) -> dict:
        """Group source ids by theme.

        Returns:
            ``{theme_title: [source_id, ...]}`` with ids de-duplicated in
            first-seen order. When incremental analysis is disabled
            (``update_interval <= 0``), a single ``"All relevant chunks"``
            entry listing the buffered chunk ids.
        """
        if self.update_interval <= 0:
            return {"All relevant chunks": list(self.unprocessed_chunks.keys())}

        point_sources = self.structure["point_sources"]
        clustered_cids = {}
        for theme_title, point_ids in self.structure["themes"].items():
            cluster = []
            for point_id in point_ids:
                for source_id in point_sources.get(point_id, ()):
                    if source_id not in cluster:
                        cluster.append(source_id)
            clustered_cids[theme_title] = cluster
        return clustered_cids

    def _select_commentary_chunks(self) -> str:
        """Return the prompt text for up to three known chunks per theme."""
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # prompt does not depend on set iteration order.
        selected_cids = {}
        for cid_list in self.get_clustered_cids().values():
            selected_cids.update(dict.fromkeys(cid_list[:3]))
        # Source ids come from the model's reply; skip any without stored
        # text instead of raising KeyError (format_structure does the same).
        return "\n\n".join(
            f"{cid}:\n\n{self.cid_to_text[cid]}" for cid in selected_cids if cid in self.cid_to_text
        )

    async def generate_commentary(self):
        """Stream a commentary on the current structure and return the model's reply.

        The prompt receives the query, the formatted structure and the text
        of up to three chunks per theme.
        """
        structure = self.format_structure()
        indexed_chunks = self._select_commentary_chunks()
        messages = utils.prepare_messages(
            prompts.commentary_prompt,
            {"query": self.query, "structure": structure, "chunks": indexed_chunks},
        )
        callbacks = [self.commentary_callback] if self.commentary_callback is not None else []
        commentary = await OpenAIClient(self.ai_configuration).generate_chat_async(
            messages,
            stream=True,
            callbacks=callbacks,
        )
        return commentary

129 return commentary