Coverage for intelligence_toolkit/query_text_data/input_processor.py: 0%
114 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
4import io
5from collections import defaultdict
6from datetime import datetime
7from enum import Enum
8from json import dumps, loads
10import networkx as nx
12import intelligence_toolkit.query_text_data.graph_builder as graph_builder
13from intelligence_toolkit.AI.text_splitter import TextSplitter
14from intelligence_toolkit.query_text_data.classes import ProcessedChunks
# Granularity used to bucket chunk timestamps into periods.
# Built with the functional Enum API; members receive the values 1..6 in the
# order listed. The enum's internal name stays "Period" so reprs are unchanged.
PeriodOption = Enum(
    "Period",
    ["NONE", "DAY", "WEEK", "MONTH", "QUARTER", "YEAR"],
)
def concert_titled_texts_to_chunks(titled_texts):
    """Split each titled text into JSON-encoded chunks.

    Args:
        titled_texts: Object with an ``items()`` method yielding
            ``(title, text)`` pairs.

    Returns:
        A ``defaultdict(list)`` mapping each title to a list of JSON strings,
        one per piece returned by ``TextSplitter().split(text)``. Each JSON
        object holds ``title``, ``text_chunk`` and a 1-based ``chunk_id``.
    """
    text_to_chunks = defaultdict(list)
    splitter = TextSplitter()
    # Iterate the (title, text) pairs directly. Wrapping items() in
    # enumerate() would bind `title` to a running index and `text` to the
    # whole (title, text) tuple, so the splitter would never see the text.
    for title, text in titled_texts.items():
        text_to_chunks[title] = [
            dumps(
                {"title": title, "text_chunk": piece, "chunk_id": position},
                indent=2,
                ensure_ascii=False,
            )
            for position, piece in enumerate(splitter.split(text), start=1)
        ]
    return text_to_chunks
def process_json_text(text_json, period: PeriodOption):
    """Split one JSON text record into JSON-encoded chunks.

    Args:
        text_json: Mapping with ``"text"`` and ``"title"`` keys, and optionally
            ``"timestamp"`` (a string accepted by ``datetime.fromisoformat``)
            and ``"metadata"``.
        period: Granularity used to derive each chunk's ``period`` label. With
            ``PeriodOption.NONE`` neither ``timestamp`` nor ``period`` is
            written to the chunks.

    Returns:
        A list of JSON strings, one per piece returned by
        ``TextSplitter().split``. Keys appear in the order ``title``,
        ``timestamp``, ``period``, ``metadata`` (each only when applicable),
        ``chunk_id`` (1-based) and ``text_chunk``.

    Raises:
        ValueError: Propagated from ``datetime.fromisoformat`` when the
            timestamp cannot be parsed and at least one chunk is produced.
    """
    splitter = TextSplitter()
    text_chunks = splitter.split(text_json["text"])
    if not text_chunks:
        # No chunks means no output; return before touching any other field so
        # a record with empty text is never rejected for its title/timestamp.
        return []

    # Every chunk of a record carries the same header fields, so build them
    # once instead of re-parsing the timestamp for each chunk.
    shared_fields = {"title": text_json["title"]}
    if "timestamp" in text_json and period != PeriodOption.NONE:
        timestamp = text_json["timestamp"]
        shared_fields["timestamp"] = timestamp
        # Round the timestamp to the enclosing period.
        shared_fields["period"] = _period_label(
            datetime.fromisoformat(timestamp), period
        )
    if "metadata" in text_json:
        shared_fields["metadata"] = text_json["metadata"]

    return [
        dumps(
            {**shared_fields, "chunk_id": position, "text_chunk": chunk},
            indent=2,
            ensure_ascii=False,
        )
        for position, chunk in enumerate(text_chunks, start=1)
    ]


def _period_label(datetm, period):
    """Return the label of the period enclosing ``datetm``.

    Formats: DAY ``%Y-%m-%d``, WEEK ``%Y-%W``, MONTH ``%Y-%m``,
    QUARTER ``<year>-Q<1..4>``, YEAR ``<year>``. Any other ``period`` value
    yields an empty string.
    """
    if period == PeriodOption.DAY:
        return datetm.strftime("%Y-%m-%d")
    if period == PeriodOption.WEEK:
        return datetm.strftime("%Y-%W")
    if period == PeriodOption.MONTH:
        return datetm.strftime("%Y-%m")
    if period == PeriodOption.QUARTER:
        # Months 1-3 -> Q1, 4-6 -> Q2, 7-9 -> Q3, 10-12 -> Q4.
        quarter = (datetm.month - 1) // 3 + 1
        return f"{datetm.year}-Q{quarter}"
    if period == PeriodOption.YEAR:
        return str(datetm.year)
    return ""
def process_json_texts(file_to_text_jsons, period: PeriodOption):
    """Run ``process_json_text`` over every file's JSON record.

    Args:
        file_to_text_jsons: Mapping of file key to the JSON record for that
            file.
        period: Period granularity forwarded unchanged to
            ``process_json_text``.

    Returns:
        A dict mapping each file key to the result of
        ``process_json_text`` for its record, in input order.
    """
    return {
        file: process_json_text(record, period)
        for file, record in file_to_text_jsons.items()
    }
def process_chunks(
    file_to_chunks, max_cluster_size, min_edge_weight, min_node_degree, callbacks=None
):
    """Index text chunks and build per-period concept graphs from them.

    Args:
        file_to_chunks: Mapping of file key to an iterable of chunk strings.
            Chunks are expected to be JSON text; a chunk that cannot be parsed
            is still indexed, just without a period.
        max_cluster_size: Forwarded to ``graph_builder.prepare_concept_graphs``.
        min_edge_weight: Forwarded to ``graph_builder.prepare_concept_graphs``.
        min_node_degree: Forwarded to ``graph_builder.prepare_concept_graphs``.
        callbacks: Optional iterable of objects exposing
            ``on_batch_change(current, total)``; each is called once per chunk
            before that chunk is processed. Defaults to no callbacks.

    Returns:
        A ``ProcessedChunks`` bundling the chunk-id/text maps, the
        previous/next chunk links, the per-period graphs, counts and chunk
        ids, the concept/chunk maps, and the community structures returned by
        ``graph_builder.prepare_concept_graphs`` (empty dicts when the "ALL"
        graph has no nodes).
    """
    if callbacks is None:
        callbacks = []

    period_concept_graphs = defaultdict(nx.Graph)
    period_concept_graphs["ALL"] = nx.Graph()
    node_period_counts = defaultdict(lambda: defaultdict(int))
    edge_period_counts = defaultdict(lambda: defaultdict(int))
    previous_chunk = {}
    next_chunk = {}
    concept_to_cids = defaultdict(list)
    cid_to_concepts = defaultdict(list)
    period_to_cids = defaultdict(list)
    file_cids = []
    cid_to_text = {}
    text_to_cid = {}
    file_to_cids = defaultdict(list)

    # Assign sequential 1-based ids across all files, in iteration order.
    chunk_id = 1
    for file, chunks in file_to_chunks.items():
        for chunk in chunks:
            cid_to_text[chunk_id] = chunk
            text_to_cid[chunk] = chunk_id
            file_to_cids[file].append(chunk_id)
            chunk_id += 1

    # Link neighbouring chunks within each file. The bound must be the current
    # file's own id list: ids are consecutive inside a file, but the first and
    # last chunk of a file have no neighbour across the file boundary.
    for file, cids in file_to_cids.items():
        last_index = len(cids) - 1
        for cx, cid in enumerate(cids):
            file_cids.append((file, cid))
            if cx > 0:
                previous_chunk[cid] = cid - 1
            if cx < last_index:
                next_chunk[cid] = cid + 1

    total_chunks = len(file_cids)
    for cx, (file, cid) in enumerate(file_cids):
        for cb in callbacks:
            cb.on_batch_change(cx + 1, total_chunks)
        chunk = cid_to_text[cid]
        period = _chunk_period(chunk)
        # Every chunk counts towards the "ALL" bucket; chunks carrying a
        # period additionally count towards that period.
        periods = ["ALL"]
        period_to_cids["ALL"].append(cid)
        if period is not None:
            periods.append(period)
            period_to_cids[period].append(cid)
        # The count and concept maps are handed to graph_builder here and are
        # not written anywhere else in this function.
        graph_builder.update_concept_graph_edges(
            node_period_counts,
            edge_period_counts,
            periods,
            chunk,
            cid,
            concept_to_cids,
            cid_to_concepts,
        )

    # Materialise the accumulated counts as one graph per period.
    for node, period_counts in node_period_counts.items():
        for period, count in period_counts.items():
            period_concept_graphs[period].add_node(node, count=count)
    for edge, period_counts in edge_period_counts.items():
        for period, count in period_counts.items():
            period_concept_graphs[period].add_edge(edge[0], edge[1], weight=count)

    hierarchical_communities = {}
    community_to_label = {}
    if len(period_concept_graphs["ALL"].nodes()) > 0:
        (hierarchical_communities, community_to_label) = (
            graph_builder.prepare_concept_graphs(
                period_concept_graphs,
                max_cluster_size=max_cluster_size,
                min_edge_weight=min_edge_weight,
                min_node_degree=min_node_degree,
            )
        )
    return ProcessedChunks(
        cid_to_text=cid_to_text,
        text_to_cid=text_to_cid,
        period_concept_graphs=period_concept_graphs,
        hierarchical_communities=hierarchical_communities,
        community_to_label=community_to_label,
        concept_to_cids=concept_to_cids,
        cid_to_concepts=cid_to_concepts,
        previous_cid=previous_chunk,
        next_cid=next_chunk,
        period_to_cids=period_to_cids,
        node_period_counts=node_period_counts,
        edge_period_counts=edge_period_counts,
    )


def _chunk_period(chunk):
    """Return the ``"period"`` value stored in a JSON chunk, or ``None``.

    Best effort: any failure while parsing or inspecting the chunk is printed
    and treated as "no period" so one malformed chunk does not abort the run.
    """
    try:
        chunk_json = loads(chunk)
        if "period" in chunk_json:
            return chunk_json["period"]
    except Exception as e:
        print(e)
    return None