Coverage for intelligence_toolkit/query_text_data/input_processor.py: 0%

114 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3 

4import io 

5from collections import defaultdict 

6from datetime import datetime 

7from enum import Enum 

8from json import dumps, loads 

9 

10import networkx as nx 

11 

12import intelligence_toolkit.query_text_data.graph_builder as graph_builder 

13from intelligence_toolkit.AI.text_splitter import TextSplitter 

14from intelligence_toolkit.query_text_data.classes import ProcessedChunks 

15 

# Time granularities a chunk timestamp can be bucketed into. NONE means no
# period is attached to the chunk at all.
PeriodOption = Enum(
    "Period",
    ["NONE", "DAY", "WEEK", "MONTH", "QUARTER", "YEAR"],
)

17 

18 

def concert_titled_texts_to_chunks(titled_texts):
    """Split each titled text into JSON-serialized chunks.

    Args:
        titled_texts: Mapping whose keys are titles and whose values are the
            texts to split.

    Returns:
        A ``defaultdict(list)`` mapping each title to the list of chunk
        strings produced for its text. Each chunk string is a JSON object
        with the keys ``title``, ``text_chunk`` and ``chunk_id`` (the 1-based
        position of the chunk within its text).
    """
    text_to_chunks = defaultdict(list)
    splitter = TextSplitter()
    # Iterate the (title, text) pairs directly. Wrapping items() in
    # enumerate() would bind the title to a running index and the text to a
    # (title, text) tuple, so the wrong value would reach the splitter and
    # the result would be keyed by integers instead of titles.
    for title, text in titled_texts.items():
        text_to_chunks[title] = [
            dumps(
                {"title": title, "text_chunk": piece, "chunk_id": position},
                indent=2,
                ensure_ascii=False,
            )
            for position, piece in enumerate(splitter.split(text), start=1)
        ]
    return text_to_chunks

29 

def process_json_text(text_json, period: PeriodOption):
    """Split one JSON text record into JSON-serialized chunk strings.

    Args:
        text_json: Dict with a ``text`` entry to split and a ``title`` entry
            copied into every chunk. Optional ``timestamp`` (ISO-format
            string) and ``metadata`` entries are copied through when present.
        period: Granularity used to derive a ``period`` label from the
            timestamp. With ``PeriodOption.NONE`` neither ``timestamp`` nor
            ``period`` is written to the chunks.

    Returns:
        A list of JSON strings, one per chunk, each carrying ``title``, the
        optional ``timestamp``/``period`` and ``metadata`` fields,
        ``chunk_id`` (1-based) and ``text_chunk``.
    """

    def label_for(moment):
        # Name of the period that encloses ``moment``; empty string when the
        # requested granularity is not one of the known options.
        if period == PeriodOption.DAY:
            return moment.strftime("%Y-%m-%d")
        if period == PeriodOption.WEEK:
            return moment.strftime("%Y-%W")
        if period == PeriodOption.MONTH:
            return moment.strftime("%Y-%m")
        if period == PeriodOption.QUARTER:
            return f"{moment.year}-Q{(moment.month - 1) // 3 + 1}"
        if period == PeriodOption.YEAR:
            return str(moment.year)
        return ""

    pieces = TextSplitter().split(text_json["text"])
    serialized = []
    for number, piece in enumerate(pieces, start=1):
        # Key insertion order is preserved by dumps(), so fields are added in
        # the order they should appear in the serialized chunk.
        record = {"title": text_json["title"]}
        if "timestamp" in text_json and period != PeriodOption.NONE:
            stamp = text_json["timestamp"]
            record["timestamp"] = stamp
            record["period"] = label_for(datetime.fromisoformat(stamp))
        if "metadata" in text_json:
            record["metadata"] = text_json["metadata"]
        record["chunk_id"] = number
        record["text_chunk"] = piece
        serialized.append(dumps(record, indent=2, ensure_ascii=False))
    return serialized

64 

65 

def process_json_texts(file_to_text_jsons, period: PeriodOption):
    """Chunk every JSON text record in a file-keyed mapping.

    Args:
        file_to_text_jsons: Mapping of file identifier to the JSON text
            record accepted by ``process_json_text``.
        period: Period granularity forwarded unchanged to
            ``process_json_text`` for every record.

    Returns:
        A dict mapping each file identifier to the list of chunk strings
        returned by ``process_json_text`` for its record.
    """
    return {
        name: process_json_text(record, period)
        for name, record in file_to_text_jsons.items()
    }

71 

72 

def _chunk_period(chunk):
    """Return the ``period`` field of a JSON chunk string, or None.

    Chunks are not required to be JSON: a chunk that cannot be parsed, or
    whose parsed value does not support the lookup, is reported on stdout and
    treated as having no period.
    """
    try:
        chunk_json = loads(chunk)
        if "period" in chunk_json:
            return chunk_json["period"]
    except (ValueError, TypeError) as e:
        # ValueError covers json.JSONDecodeError; TypeError covers non-string
        # chunks and parsed values that are not containers.
        print(e)
    return None


def process_chunks(
    file_to_chunks, max_cluster_size, min_edge_weight, min_node_degree, callbacks=None
):
    """Index text chunks and build per-period concept graphs from them.

    Every chunk is assigned a sequential integer id starting at 1, in the
    iteration order of ``file_to_chunks``. Chunks are then linked to their
    neighbours within the same file, grouped by period, and passed to
    ``graph_builder`` to accumulate concept node and edge counts.

    Args:
        file_to_chunks: Mapping of file identifier to its list of chunk
            strings. A chunk that is a JSON object with a ``period`` key is
            additionally grouped under that period.
        max_cluster_size: Forwarded to ``graph_builder.prepare_concept_graphs``.
        min_edge_weight: Forwarded to ``graph_builder.prepare_concept_graphs``.
        min_node_degree: Forwarded to ``graph_builder.prepare_concept_graphs``.
        callbacks: Optional iterable of objects exposing
            ``on_batch_change(current, total)``; each is notified once per
            processed chunk. Defaults to no callbacks.

    Returns:
        A ``ProcessedChunks`` holding the id/text lookups, the per-period
        concept graphs (the key ``"ALL"`` spans every chunk), the community
        structures returned by ``graph_builder.prepare_concept_graphs``
        (empty dicts when the ``"ALL"`` graph has no nodes), the
        concept/chunk cross references, the previous/next chunk links and the
        raw node and edge counts per period.
    """
    # Avoid a shared mutable default; None means "no callbacks".
    callbacks = [] if callbacks is None else callbacks

    period_concept_graphs = defaultdict(nx.Graph)
    period_concept_graphs["ALL"] = nx.Graph()
    node_period_counts = defaultdict(lambda: defaultdict(int))
    edge_period_counts = defaultdict(lambda: defaultdict(int))
    previous_chunk = {}
    next_chunk = {}
    concept_to_cids = defaultdict(list)
    cid_to_concepts = defaultdict(list)
    period_to_cids = defaultdict(list)
    file_cids = []
    cid_to_text = {}
    text_to_cid = {}
    file_to_cids = defaultdict(list)

    # Pass 1: assign globally sequential chunk ids.
    chunk_id = 1
    for file, chunks in file_to_chunks.items():
        for chunk in chunks:
            cid_to_text[chunk_id] = chunk
            text_to_cid[chunk] = chunk_id
            file_to_cids[file].append(chunk_id)
            chunk_id += 1

    # Pass 2: link neighbouring chunks within each file. Ids are consecutive
    # inside a file, so the neighbours are cid - 1 and cid + 1.
    for file, cids in file_to_cids.items():
        # Compare against this file's own chunk count; using the chunk list
        # left over from pass 1 would apply the last file's length to every
        # file and create or drop links at file boundaries.
        last_index = len(cids) - 1
        for cx, cid in enumerate(cids):
            file_cids.append((file, cid))
            if cx > 0:
                previous_chunk[cid] = cid - 1
            if cx < last_index:
                next_chunk[cid] = cid + 1

    # Pass 3: group chunks by period and accumulate concept counts.
    total = len(file_cids)
    for cx, (file, cid) in enumerate(file_cids):
        for cb in callbacks:
            cb.on_batch_change(cx + 1, total)
        chunk = cid_to_text[cid]
        period = _chunk_period(chunk)
        periods = ["ALL"]
        period_to_cids["ALL"].append(cid)
        if period is not None:
            periods.append(period)
            period_to_cids[period].append(cid)
        graph_builder.update_concept_graph_edges(
            node_period_counts,
            edge_period_counts,
            periods,
            chunk,
            cid,
            concept_to_cids,
            cid_to_concepts,
        )

    # Materialize the accumulated counts as graph nodes and weighted edges.
    for node, period_counts in node_period_counts.items():
        for period, count in period_counts.items():
            period_concept_graphs[period].add_node(node, count=count)
    for edge, period_counts in edge_period_counts.items():
        for period, count in period_counts.items():
            period_concept_graphs[period].add_edge(edge[0], edge[1], weight=count)

    hierarchical_communities = {}
    community_to_label = {}
    # Community preparation is skipped when no concepts were found.
    if len(period_concept_graphs["ALL"].nodes()) > 0:
        (hierarchical_communities, community_to_label) = (
            graph_builder.prepare_concept_graphs(
                period_concept_graphs,
                max_cluster_size=max_cluster_size,
                min_edge_weight=min_edge_weight,
                min_node_degree=min_node_degree,
            )
        )
    return ProcessedChunks(
        cid_to_text=cid_to_text,
        text_to_cid=text_to_cid,
        period_concept_graphs=period_concept_graphs,
        hierarchical_communities=hierarchical_communities,
        community_to_label=community_to_label,
        concept_to_cids=concept_to_cids,
        cid_to_concepts=cid_to_concepts,
        previous_cid=previous_chunk,
        next_cid=next_chunk,
        period_to_cids=period_to_cids,
        node_period_counts=node_period_counts,
        edge_period_counts=edge_period_counts,
    )

161 )