1use anyhow::Result;
11use async_trait::async_trait;
12use std::sync::Arc;
13use tokio::sync::mpsc;
14
15use crate::llm_agent::{LlmAgent, LlmConfig};
16use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
17
18pub struct FusekiAgent {
19 config: ActorConfig,
20 fuseki_url: String,
21 dataset: String,
22 fuseki_user: String,
23 fuseki_password: String,
24 http: reqwest::Client,
25 llm: Option<LlmAgent>,
26 state: ActorState,
27 metrics: Arc<ActorMetrics>,
28 mailbox_tx: mpsc::Sender<Message>,
29 mailbox_rx: Option<mpsc::Receiver<Message>>,
30 publisher: Option<EventPublisher>,
31}
32
33impl FusekiAgent {
34 fn dataset_base_url(&self) -> String {
35 let base = self.fuseki_url.trim_end_matches('/');
36 let dataset = self.dataset.trim();
37 if dataset.is_empty() {
38 base.to_string()
39 } else if dataset.starts_with('/') {
40 format!("{base}{dataset}")
41 } else {
42 format!("{base}/{dataset}")
43 }
44 }
45
46 pub fn new(config: ActorConfig) -> Self {
47 let fuseki_url =
48 std::env::var("FUSEKI_URL").unwrap_or_else(|_| "http://fuseki:3030".into());
49 let dataset = std::env::var("FUSEKI_DATASET").unwrap_or_else(|_| "/ds".into());
50 let fuseki_user = std::env::var("FUSEKI_USER").unwrap_or_default();
51 let fuseki_password = std::env::var("FUSEKI_PASSWORD").unwrap_or_default();
52 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
53 Self {
54 config,
55 fuseki_url,
56 dataset,
57 fuseki_user,
58 fuseki_password,
59 http: reqwest::Client::new(),
60 llm: None,
61 state: ActorState::Initializing,
62 metrics: Arc::new(ActorMetrics::new()),
63 mailbox_tx: tx,
64 mailbox_rx: Some(rx),
65 publisher: None,
66 }
67 }
68
69 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
70 self.publisher = Some(p);
71 self
72 }
73
74 pub fn with_fuseki_config(
76 mut self,
77 url: impl Into<String>,
78 dataset: impl Into<String>,
79 ) -> Self {
80 let url = url.into();
81 let dataset = dataset.into();
82 if !url.is_empty() {
83 self.fuseki_url = url;
84 }
85 if !dataset.is_empty() {
86 self.dataset = dataset;
87 }
88 self
89 }
90
91 pub fn with_fuseki_auth(
92 mut self,
93 user: impl Into<String>,
94 password: impl Into<String>,
95 ) -> Self {
96 self.fuseki_user = user.into();
97 self.fuseki_password = password.into();
98 self
99 }
100
101 pub fn with_llm(mut self, llm_config: LlmConfig) -> Self {
102 let llm_cfg = ActorConfig::new(format!("{}-llm", self.config.name));
103 self.llm = Some(LlmAgent::new(llm_cfg, llm_config));
104 self
105 }
106
107 async fn sparql_query(&self, query: &str) -> Result<serde_json::Value> {
109 let url = format!("{}/query", self.dataset_base_url());
110 let mut req = self
111 .http
112 .post(&url)
113 .header("Content-Type", "application/sparql-query")
114 .header("Accept", "application/sparql-results+json");
115 if !self.fuseki_user.is_empty() {
116 req = req.basic_auth(&self.fuseki_user, Some(&self.fuseki_password));
117 }
118 let resp = req.body(query.to_string()).send().await?;
119 if !resp.status().is_success() {
120 let s = resp.status();
121 let t = resp.text().await.unwrap_or_default();
122 anyhow::bail!("Fuseki {s}: {t}");
123 }
124 Ok(resp.json().await?)
125 }
126
127 async fn sparql_update(&self, update: &str) -> Result<()> {
129 let url = format!("{}/update", self.dataset_base_url());
130 let mut req = self
131 .http
132 .post(&url)
133 .header("Content-Type", "application/sparql-update");
134 if !self.fuseki_user.is_empty() {
135 req = req.basic_auth(&self.fuseki_user, Some(&self.fuseki_password));
136 }
137 let resp = req.body(update.to_string()).send().await?;
138 if !resp.status().is_success() {
139 let s = resp.status();
140 let t = resp.text().await.unwrap_or_default();
141 anyhow::bail!("Fuseki update {s}: {t}");
142 }
143 Ok(())
144 }
145
146 async fn process(&mut self, text: &str) -> String {
147 let trimmed = text.trim().to_uppercase();
149 let endpoint = self.dataset_base_url();
150 if trimmed.starts_with("SELECT")
151 || trimmed.starts_with("ASK")
152 || trimmed.starts_with("CONSTRUCT")
153 {
154 match self.sparql_query(text).await {
155 Ok(v) => format!(
156 "SPARQL results:\n{}",
157 serde_json::to_string_pretty(&v).unwrap_or_else(|_| v.to_string())
158 ),
159 Err(e) => format!("Fuseki error: {e}"),
160 }
161 } else if trimmed.starts_with("INSERT")
162 || trimmed.starts_with("DELETE")
163 || trimmed.starts_with("WITH")
164 {
165 match self.sparql_update(text).await {
166 Ok(()) => "Update executed successfully.".into(),
167 Err(e) => format!("Fuseki update error: {e}"),
168 }
169 } else if let Some(llm) = &mut self.llm {
170 let prompt = format!(
171 "You are a SPARQL/RDF expert connected to a Fuseki endpoint at {}\n\
172 The user asked: \"{text}\"\n\
173 Generate a SPARQL query or respond helpfully. \
174 If you generate a query, wrap it in ```sparql ... ``` fences.",
175 endpoint
176 );
177 llm.complete(&prompt)
178 .await
179 .unwrap_or_else(|e| format!("LLM error: {e}"))
180 } else {
181 format!(
182 "Provide a SPARQL query (SELECT/ASK/INSERT/DELETE) to execute against {}",
183 endpoint
184 )
185 }
186 }
187
188 fn now_ms() -> u64 {
189 std::time::SystemTime::now()
190 .duration_since(std::time::UNIX_EPOCH)
191 .unwrap_or_default()
192 .as_millis() as u64
193 }
194}
195
196#[async_trait]
197impl Actor for FusekiAgent {
198 fn id(&self) -> String {
199 self.config.id.clone()
200 }
201 fn name(&self) -> &str {
202 &self.config.name
203 }
204 fn state(&self) -> ActorState {
205 self.state.clone()
206 }
207 fn metrics(&self) -> Arc<ActorMetrics> {
208 Arc::clone(&self.metrics)
209 }
210 fn mailbox(&self) -> mpsc::Sender<Message> {
211 self.mailbox_tx.clone()
212 }
213
214 async fn on_start(&mut self) -> Result<()> {
215 self.state = ActorState::Running;
216 tracing::info!(
217 "[{}] Fuseki agent → {}",
218 self.config.name,
219 self.dataset_base_url()
220 );
221 if let Some(pub_) = &self.publisher {
222 pub_.publish(
223 wactorz_mqtt::topics::spawn(&self.config.id),
224 &serde_json::json!({
225 "agentId": self.config.id,
226 "agentName": self.config.name,
227 "agentType": "fuseki",
228 "fusekiUrl": self.fuseki_url,
229 "dataset": self.dataset,
230 "timestampMs": Self::now_ms(),
231 }),
232 );
233 }
234 Ok(())
235 }
236
237 async fn handle_message(&mut self, message: Message) -> Result<()> {
238 use wactorz_core::message::MessageType;
239 let text = match &message.payload {
240 MessageType::Text { content } => content.clone(),
241 MessageType::Task { description, .. } => description.clone(),
242 _ => return Ok(()),
243 };
244 let response = self.process(&text).await;
245 if let Some(pub_) = &self.publisher {
246 pub_.publish(
247 wactorz_mqtt::topics::chat(&self.config.id),
248 &serde_json::json!({
249 "from": self.config.name,
250 "to": message.from.as_deref().unwrap_or("user"),
251 "content": response,
252 "timestampMs": Self::now_ms(),
253 }),
254 );
255 }
256 Ok(())
257 }
258
259 async fn on_heartbeat(&mut self) -> Result<()> {
260 if let Some(pub_) = &self.publisher {
261 pub_.publish(
262 wactorz_mqtt::topics::heartbeat(&self.config.id),
263 &serde_json::json!({
264 "agentId": self.config.id,
265 "agentName": self.config.name,
266 "state": self.state,
267 "fusekiUrl": self.fuseki_url,
268 "timestampMs": Self::now_ms(),
269 }),
270 );
271 }
272 Ok(())
273 }
274
275 async fn run(&mut self) -> Result<()> {
276 self.on_start().await?;
277 let mut rx = self
278 .mailbox_rx
279 .take()
280 .ok_or_else(|| anyhow::anyhow!("FusekiAgent already running"))?;
281 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
282 self.config.heartbeat_interval_secs,
283 ));
284 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
285 loop {
286 tokio::select! {
287 biased;
288 msg = rx.recv() => {
289 match msg {
290 None => break,
291 Some(m) => {
292 self.metrics.record_received();
293 if let wactorz_core::message::MessageType::Command {
294 command: wactorz_core::message::ActorCommand::Stop
295 } = &m.payload { break; }
296 match self.handle_message(m).await {
297 Ok(_) => self.metrics.record_processed(),
298 Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
299 }
300 }
301 }
302 }
303 _ = hb.tick() => {
304 self.metrics.record_heartbeat();
305 if let Err(e) = self.on_heartbeat().await {
306 tracing::error!("[{}] heartbeat: {e}", self.config.name);
307 }
308 }
309 }
310 }
311 self.state = ActorState::Stopped;
312 self.on_stop().await
313 }
314}