wactorz_agents/
fuseki_agent.rs

//! Apache Jena Fuseki SPARQL agent.
//!
//! [`FusekiAgent`] executes SPARQL queries and updates against an Apache
//! Jena Fuseki endpoint.  It also supports LLM-assisted query generation.
//!
//! Configuration (env vars):
//! - `FUSEKI_URL`      — Fuseki base URL (default: `http://fuseki:3030`)
//! - `FUSEKI_DATASET`  — Dataset path   (default: `/ds`)
//! - `FUSEKI_USER`     — HTTP basic-auth user (default: empty, which disables authentication)
//! - `FUSEKI_PASSWORD` — HTTP basic-auth password (default: empty)
9
10use anyhow::Result;
11use async_trait::async_trait;
12use std::sync::Arc;
13use tokio::sync::mpsc;
14
15use crate::llm_agent::{LlmAgent, LlmConfig};
16use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
17
18pub struct FusekiAgent {
19    config: ActorConfig,
20    fuseki_url: String,
21    dataset: String,
22    fuseki_user: String,
23    fuseki_password: String,
24    http: reqwest::Client,
25    llm: Option<LlmAgent>,
26    state: ActorState,
27    metrics: Arc<ActorMetrics>,
28    mailbox_tx: mpsc::Sender<Message>,
29    mailbox_rx: Option<mpsc::Receiver<Message>>,
30    publisher: Option<EventPublisher>,
31}
32
33impl FusekiAgent {
34    fn dataset_base_url(&self) -> String {
35        let base = self.fuseki_url.trim_end_matches('/');
36        let dataset = self.dataset.trim();
37        if dataset.is_empty() {
38            base.to_string()
39        } else if dataset.starts_with('/') {
40            format!("{base}{dataset}")
41        } else {
42            format!("{base}/{dataset}")
43        }
44    }
45
46    pub fn new(config: ActorConfig) -> Self {
47        let fuseki_url =
48            std::env::var("FUSEKI_URL").unwrap_or_else(|_| "http://fuseki:3030".into());
49        let dataset = std::env::var("FUSEKI_DATASET").unwrap_or_else(|_| "/ds".into());
50        let fuseki_user = std::env::var("FUSEKI_USER").unwrap_or_default();
51        let fuseki_password = std::env::var("FUSEKI_PASSWORD").unwrap_or_default();
52        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
53        Self {
54            config,
55            fuseki_url,
56            dataset,
57            fuseki_user,
58            fuseki_password,
59            http: reqwest::Client::new(),
60            llm: None,
61            state: ActorState::Initializing,
62            metrics: Arc::new(ActorMetrics::new()),
63            mailbox_tx: tx,
64            mailbox_rx: Some(rx),
65            publisher: None,
66        }
67    }
68
69    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
70        self.publisher = Some(p);
71        self
72    }
73
74    /// Override the Fuseki URL and dataset instead of relying on environment variables.
75    pub fn with_fuseki_config(
76        mut self,
77        url: impl Into<String>,
78        dataset: impl Into<String>,
79    ) -> Self {
80        let url = url.into();
81        let dataset = dataset.into();
82        if !url.is_empty() {
83            self.fuseki_url = url;
84        }
85        if !dataset.is_empty() {
86            self.dataset = dataset;
87        }
88        self
89    }
90
91    pub fn with_fuseki_auth(
92        mut self,
93        user: impl Into<String>,
94        password: impl Into<String>,
95    ) -> Self {
96        self.fuseki_user = user.into();
97        self.fuseki_password = password.into();
98        self
99    }
100
101    pub fn with_llm(mut self, llm_config: LlmConfig) -> Self {
102        let llm_cfg = ActorConfig::new(format!("{}-llm", self.config.name));
103        self.llm = Some(LlmAgent::new(llm_cfg, llm_config));
104        self
105    }
106
107    /// Execute a SPARQL SELECT query; returns JSON results.
108    async fn sparql_query(&self, query: &str) -> Result<serde_json::Value> {
109        let url = format!("{}/query", self.dataset_base_url());
110        let mut req = self
111            .http
112            .post(&url)
113            .header("Content-Type", "application/sparql-query")
114            .header("Accept", "application/sparql-results+json");
115        if !self.fuseki_user.is_empty() {
116            req = req.basic_auth(&self.fuseki_user, Some(&self.fuseki_password));
117        }
118        let resp = req.body(query.to_string()).send().await?;
119        if !resp.status().is_success() {
120            let s = resp.status();
121            let t = resp.text().await.unwrap_or_default();
122            anyhow::bail!("Fuseki {s}: {t}");
123        }
124        Ok(resp.json().await?)
125    }
126
127    /// Execute a SPARQL UPDATE statement.
128    async fn sparql_update(&self, update: &str) -> Result<()> {
129        let url = format!("{}/update", self.dataset_base_url());
130        let mut req = self
131            .http
132            .post(&url)
133            .header("Content-Type", "application/sparql-update");
134        if !self.fuseki_user.is_empty() {
135            req = req.basic_auth(&self.fuseki_user, Some(&self.fuseki_password));
136        }
137        let resp = req.body(update.to_string()).send().await?;
138        if !resp.status().is_success() {
139            let s = resp.status();
140            let t = resp.text().await.unwrap_or_default();
141            anyhow::bail!("Fuseki update {s}: {t}");
142        }
143        Ok(())
144    }
145
146    async fn process(&mut self, text: &str) -> String {
147        // If input looks like SPARQL, run it directly
148        let trimmed = text.trim().to_uppercase();
149        let endpoint = self.dataset_base_url();
150        if trimmed.starts_with("SELECT")
151            || trimmed.starts_with("ASK")
152            || trimmed.starts_with("CONSTRUCT")
153        {
154            match self.sparql_query(text).await {
155                Ok(v) => format!(
156                    "SPARQL results:\n{}",
157                    serde_json::to_string_pretty(&v).unwrap_or_else(|_| v.to_string())
158                ),
159                Err(e) => format!("Fuseki error: {e}"),
160            }
161        } else if trimmed.starts_with("INSERT")
162            || trimmed.starts_with("DELETE")
163            || trimmed.starts_with("WITH")
164        {
165            match self.sparql_update(text).await {
166                Ok(()) => "Update executed successfully.".into(),
167                Err(e) => format!("Fuseki update error: {e}"),
168            }
169        } else if let Some(llm) = &mut self.llm {
170            let prompt = format!(
171                "You are a SPARQL/RDF expert connected to a Fuseki endpoint at {}\n\
172                 The user asked: \"{text}\"\n\
173                 Generate a SPARQL query or respond helpfully. \
174                 If you generate a query, wrap it in ```sparql ... ``` fences.",
175                endpoint
176            );
177            llm.complete(&prompt)
178                .await
179                .unwrap_or_else(|e| format!("LLM error: {e}"))
180        } else {
181            format!(
182                "Provide a SPARQL query (SELECT/ASK/INSERT/DELETE) to execute against {}",
183                endpoint
184            )
185        }
186    }
187
188    fn now_ms() -> u64 {
189        std::time::SystemTime::now()
190            .duration_since(std::time::UNIX_EPOCH)
191            .unwrap_or_default()
192            .as_millis() as u64
193    }
194}
195
196#[async_trait]
197impl Actor for FusekiAgent {
198    fn id(&self) -> String {
199        self.config.id.clone()
200    }
201    fn name(&self) -> &str {
202        &self.config.name
203    }
204    fn state(&self) -> ActorState {
205        self.state.clone()
206    }
207    fn metrics(&self) -> Arc<ActorMetrics> {
208        Arc::clone(&self.metrics)
209    }
210    fn mailbox(&self) -> mpsc::Sender<Message> {
211        self.mailbox_tx.clone()
212    }
213
214    async fn on_start(&mut self) -> Result<()> {
215        self.state = ActorState::Running;
216        tracing::info!(
217            "[{}] Fuseki agent → {}",
218            self.config.name,
219            self.dataset_base_url()
220        );
221        if let Some(pub_) = &self.publisher {
222            pub_.publish(
223                wactorz_mqtt::topics::spawn(&self.config.id),
224                &serde_json::json!({
225                    "agentId":   self.config.id,
226                    "agentName": self.config.name,
227                    "agentType": "fuseki",
228                    "fusekiUrl": self.fuseki_url,
229                    "dataset":   self.dataset,
230                    "timestampMs": Self::now_ms(),
231                }),
232            );
233        }
234        Ok(())
235    }
236
237    async fn handle_message(&mut self, message: Message) -> Result<()> {
238        use wactorz_core::message::MessageType;
239        let text = match &message.payload {
240            MessageType::Text { content } => content.clone(),
241            MessageType::Task { description, .. } => description.clone(),
242            _ => return Ok(()),
243        };
244        let response = self.process(&text).await;
245        if let Some(pub_) = &self.publisher {
246            pub_.publish(
247                wactorz_mqtt::topics::chat(&self.config.id),
248                &serde_json::json!({
249                    "from":      self.config.name,
250                    "to":        message.from.as_deref().unwrap_or("user"),
251                    "content":   response,
252                    "timestampMs": Self::now_ms(),
253                }),
254            );
255        }
256        Ok(())
257    }
258
259    async fn on_heartbeat(&mut self) -> Result<()> {
260        if let Some(pub_) = &self.publisher {
261            pub_.publish(
262                wactorz_mqtt::topics::heartbeat(&self.config.id),
263                &serde_json::json!({
264                    "agentId":   self.config.id,
265                    "agentName": self.config.name,
266                    "state":     self.state,
267                    "fusekiUrl": self.fuseki_url,
268                    "timestampMs": Self::now_ms(),
269                }),
270            );
271        }
272        Ok(())
273    }
274
275    async fn run(&mut self) -> Result<()> {
276        self.on_start().await?;
277        let mut rx = self
278            .mailbox_rx
279            .take()
280            .ok_or_else(|| anyhow::anyhow!("FusekiAgent already running"))?;
281        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
282            self.config.heartbeat_interval_secs,
283        ));
284        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
285        loop {
286            tokio::select! {
287                biased;
288                msg = rx.recv() => {
289                    match msg {
290                        None    => break,
291                        Some(m) => {
292                            self.metrics.record_received();
293                            if let wactorz_core::message::MessageType::Command {
294                                command: wactorz_core::message::ActorCommand::Stop
295                            } = &m.payload { break; }
296                            match self.handle_message(m).await {
297                                Ok(_)  => self.metrics.record_processed(),
298                                Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
299                            }
300                        }
301                    }
302                }
303                _ = hb.tick() => {
304                    self.metrics.record_heartbeat();
305                    if let Err(e) = self.on_heartbeat().await {
306                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
307                    }
308                }
309            }
310        }
311        self.state = ActorState::Stopped;
312        self.on_stop().await
313    }
314}