wactorz_agents/
ha_state_bridge.rs

1//! Home Assistant -> MQTT + Fuseki bridge.
2//!
3//! This Rust implementation takes a pragmatic parity step toward the Python
4//! `wactorz.fuseki` bridge:
5//! - polls Home Assistant's REST `/api/states` endpoint
6//! - republishes changed states to MQTT
7//! - maintains three Fuseki named graphs the frontend queries:
8//!   - `urn:ha:current`
9//!   - `urn:ha:devices`
10//!   - `urn:ha:history`
11
12use anyhow::Result;
13use async_trait::async_trait;
14use reqwest::StatusCode;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::mpsc;
19use urlencoding::encode;
20
21use wactorz_core::message::{ActorCommand, MessageType};
22use wactorz_core::{
23    Actor, ActorConfig, ActorMetrics, ActorState, ActorSystem, EventPublisher, Message,
24};
25
26const DEFAULT_OUTPUT_TOPIC: &str = "ha/state";
27const GRAPH_CURRENT: &str = "urn:ha:current";
28const GRAPH_HISTORY: &str = "urn:ha:history";
29const GRAPH_DEVICES: &str = "urn:ha:devices";
30const GRAPH_AGENTS: &str = "urn:wactorz:agents";
31const POLL_SECS: u64 = 15;
32
33const TTL_PREFIXES: &str = "@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .\n\
34@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .\n\
35@prefix sosa: <http://www.w3.org/ns/sosa/> .\n\
36@prefix syn: <https://synapse.waldiez.io/ns#> .\n\n";
37
38pub struct HomeAssistantStateBridgeAgent {
39    config: ActorConfig,
40    state: ActorState,
41    metrics: Arc<ActorMetrics>,
42    mailbox_tx: mpsc::Sender<Message>,
43    mailbox_rx: Option<mpsc::Receiver<Message>>,
44    publisher: Option<EventPublisher>,
45    system: Option<ActorSystem>,
46    http: reqwest::Client,
47    ha_url: String,
48    ha_token: String,
49    output_topic: String,
50    domains: Vec<String>,
51    fuseki_url: String,
52    fuseki_dataset: String,
53    fuseki_user: String,
54    fuseki_password: String,
55    last_states: HashMap<String, String>,
56    events_seen: u64,
57    last_error: String,
58}
59
60impl HomeAssistantStateBridgeAgent {
61    pub fn new(config: ActorConfig) -> Self {
62        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
63        Self {
64            config,
65            state: ActorState::Initializing,
66            metrics: Arc::new(ActorMetrics::new()),
67            mailbox_tx: tx,
68            mailbox_rx: Some(rx),
69            publisher: None,
70            system: None,
71            http: reqwest::Client::new(),
72            ha_url: String::new(),
73            ha_token: String::new(),
74            output_topic: DEFAULT_OUTPUT_TOPIC.to_string(),
75            domains: Vec::new(),
76            fuseki_url: String::new(),
77            fuseki_dataset: String::new(),
78            fuseki_user: String::new(),
79            fuseki_password: String::new(),
80            last_states: HashMap::new(),
81            events_seen: 0,
82            last_error: String::new(),
83        }
84    }
85
86    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
87        self.publisher = Some(p);
88        self
89    }
90
91    pub fn with_system(mut self, system: ActorSystem) -> Self {
92        self.system = Some(system);
93        self
94    }
95
96    pub fn with_ha_config(
97        mut self,
98        url: String,
99        token: String,
100        output_topic: String,
101        domains: Vec<String>,
102    ) -> Self {
103        if !url.is_empty() {
104            self.ha_url = url.trim_end_matches('/').to_string();
105        }
106        if !token.is_empty() {
107            self.ha_token = token;
108        }
109        if !output_topic.is_empty() {
110            self.output_topic = output_topic;
111        }
112        self.domains = domains.into_iter().map(|d| d.to_lowercase()).collect();
113        self
114    }
115
116    pub fn with_fuseki_config(mut self, url: String, dataset: String) -> Self {
117        if !url.is_empty() {
118            self.fuseki_url = url.trim_end_matches('/').to_string();
119        }
120        if !dataset.is_empty() {
121            self.fuseki_dataset = dataset.trim_matches('/').to_string();
122        }
123        self
124    }
125
126    pub fn with_fuseki_auth(mut self, user: String, password: String) -> Self {
127        self.fuseki_user = user;
128        self.fuseki_password = password;
129        self
130    }
131
132    async fn fetch_states(&self) -> Result<Vec<Value>> {
133        let resp = self
134            .http
135            .get(format!("{}/api/states", self.ha_url))
136            .header("Authorization", format!("Bearer {}", self.ha_token))
137            .header("Content-Type", "application/json")
138            .send()
139            .await?;
140        let status = resp.status();
141        if !status.is_success() {
142            anyhow::bail!("Home Assistant states fetch failed: {status}");
143        }
144        Ok(resp.json::<Vec<Value>>().await?)
145    }
146
147    fn domain_allowed(&self, entity_id: &str) -> bool {
148        if self.domains.is_empty() {
149            return true;
150        }
151        let domain = entity_id
152            .split('.')
153            .next()
154            .unwrap_or_default()
155            .to_lowercase();
156        self.domains.iter().any(|d| d == &domain)
157    }
158
159    fn safe(raw: &str) -> String {
160        raw.chars()
161            .map(|c| {
162                if c.is_ascii_alphanumeric() || c == '.' || c == '_' || c == '-' {
163                    c
164                } else {
165                    '_'
166                }
167            })
168            .collect()
169    }
170
171    fn esc(raw: &str) -> String {
172        raw.replace('\\', "\\\\")
173            .replace('"', "\\\"")
174            .replace('\n', "\\n")
175            .replace('\r', "\\r")
176            .replace('\t', "\\t")
177    }
178
179    fn literal(raw: &str) -> String {
180        format!("\"{}\"", Self::esc(raw))
181    }
182
183    fn entity_iri(entity_id: &str) -> String {
184        format!("<urn:ha:entity:{}>", Self::safe(entity_id))
185    }
186
187    fn obs_iri(entity_id: &str, ts_ms: u64) -> String {
188        format!("<urn:ha:obs:{}_{ts_ms}>", Self::safe(entity_id))
189    }
190
191    fn label_for(state: &Value, entity_id: &str) -> String {
192        state
193            .get("attributes")
194            .and_then(|v| v.get("friendly_name"))
195            .and_then(|v| v.as_str())
196            .unwrap_or(entity_id)
197            .to_string()
198    }
199
200    fn state_value_for(state: &Value) -> String {
201        state
202            .get("state")
203            .and_then(|v| v.as_str())
204            .unwrap_or("")
205            .to_string()
206    }
207
208    fn timestamp_for(state: &Value) -> String {
209        state
210            .get("last_changed")
211            .and_then(|v| v.as_str())
212            .or_else(|| state.get("last_updated").and_then(|v| v.as_str()))
213            .unwrap_or("1970-01-01T00:00:00Z")
214            .to_string()
215    }
216
217    fn domain_type(domain: &str) -> &'static str {
218        match domain {
219            "sensor" | "binary_sensor" | "weather" => "sosa:Sensor",
220            _ => "sosa:Actuator",
221        }
222    }
223
224    fn current_ttl(&self, states: &[Value]) -> String {
225        let mut ttl = String::from(TTL_PREFIXES);
226        for state in states {
227            let Some(entity_id) = state.get("entity_id").and_then(|v| v.as_str()) else {
228                continue;
229            };
230            let domain = entity_id.split('.').next().unwrap_or_default();
231            let label = Self::label_for(state, entity_id);
232            let value = Self::state_value_for(state);
233            let ts = Self::timestamp_for(state);
234            let entity = Self::entity_iri(entity_id);
235            ttl.push_str(&format!(
236                "{entity}\n  a {} ;\n  rdfs:label {} ;\n  syn:entityId {} ;\n  syn:domain {} ;\n  syn:state {} ;\n  syn:lastChanged \"{}\"^^xsd:dateTime",
237                Self::domain_type(domain),
238                Self::literal(&label),
239                Self::literal(entity_id),
240                Self::literal(domain),
241                Self::literal(&value),
242                Self::esc(&ts),
243            ));
244            if let Some(unit) = state
245                .get("attributes")
246                .and_then(|v| v.get("unit_of_measurement"))
247                .and_then(|v| v.as_str())
248            {
249                ttl.push_str(&format!(" ;\n  syn:unit {}", Self::literal(unit)));
250            }
251            ttl.push_str(" .\n\n");
252        }
253        ttl
254    }
255
256    fn devices_ttl(&self, states: &[Value]) -> String {
257        let mut ttl = String::from(TTL_PREFIXES);
258        ttl.push_str("<urn:ha:bridge:wactorz>\n  rdfs:label \"wactorz HA bridge\" .\n\n");
259        for state in states {
260            let Some(entity_id) = state.get("entity_id").and_then(|v| v.as_str()) else {
261                continue;
262            };
263            let domain = entity_id.split('.').next().unwrap_or_default();
264            let label = Self::label_for(state, entity_id);
265            let entity = Self::entity_iri(entity_id);
266            ttl.push_str(&format!(
267                "{entity}\n  a {} ;\n  rdfs:label {} ;\n  syn:entityId {} ;\n  syn:domain {} .\n\n",
268                Self::domain_type(domain),
269                Self::literal(&label),
270                Self::literal(entity_id),
271                Self::literal(domain),
272            ));
273        }
274        ttl
275    }
276
277    fn history_ttl(&self, state: &Value, ts_ms: u64) -> Option<String> {
278        let entity_id = state.get("entity_id").and_then(|v| v.as_str())?;
279        let value = Self::state_value_for(state);
280        let ts = Self::timestamp_for(state);
281        let obs = Self::obs_iri(entity_id, ts_ms);
282        let entity = Self::entity_iri(entity_id);
283        Some(format!(
284            "{TTL_PREFIXES}{obs}\n  a sosa:Observation ;\n  sosa:madeBySensor {entity} ;\n  sosa:hasSimpleResult {} ;\n  sosa:resultTime \"{}\"^^xsd:dateTime .\n",
285            Self::literal(&value),
286            Self::esc(&ts),
287        ))
288    }
289
290    async fn agents_ttl(&self) -> String {
291        let mut ttl = String::from(TTL_PREFIXES);
292        let Some(system) = &self.system else {
293            ttl.push_str(
294                "<urn:wactorz:bridge:agent-registry>\n  rdfs:label \"wactorz agent registry bridge\" .\n",
295            );
296            return ttl;
297        };
298        let actors = system.registry.list().await;
299        ttl.push_str(
300            "<urn:wactorz:bridge:agent-registry>\n  rdfs:label \"wactorz agent registry bridge\" .\n\n",
301        );
302        for actor in actors {
303            let iri = format!("<urn:wactorz:agent:{}>", Self::safe(&actor.name));
304            let state = format!("{}", actor.state);
305            ttl.push_str(&format!(
306                "{iri}\n  rdfs:label {} ;\n  syn:actorId {} ;\n  syn:state {} ;\n  syn:protected \"{}\"^^xsd:boolean",
307                Self::literal(&actor.name),
308                Self::literal(&actor.id),
309                Self::literal(&state),
310                if actor.protected { "true" } else { "false" },
311            ));
312            if let Some(supervisor_id) = &actor.supervisor_id {
313                ttl.push_str(&format!(
314                    " ;\n  syn:supervisorId {}",
315                    Self::literal(supervisor_id)
316                ));
317            }
318            ttl.push_str(" .\n\n");
319        }
320        ttl
321    }
322
323    fn gsp_url(&self, graph: &str) -> String {
324        format!(
325            "{}/{}/data?graph={}",
326            self.fuseki_url,
327            self.fuseki_dataset,
328            encode(graph)
329        )
330    }
331
332    async fn replace_graph(&self, graph: &str, ttl: String) -> Result<()> {
333        if self.fuseki_url.is_empty() || self.fuseki_dataset.is_empty() {
334            tracing::warn!(
335                "[ha-state-bridge] skipping replace_graph graph={} because Fuseki is not configured (base='{}' dataset='{}')",
336                graph,
337                self.fuseki_url,
338                self.fuseki_dataset
339            );
340            return Ok(());
341        }
342        let target = self.gsp_url(graph);
343        tracing::info!(
344            "[ha-state-bridge] replace_graph graph={} target={} bytes={}",
345            graph,
346            target,
347            ttl.len()
348        );
349        let mut req = self.http.put(&target).header("Content-Type", "text/turtle");
350        if !self.fuseki_user.is_empty() {
351            req = req.basic_auth(&self.fuseki_user, Some(&self.fuseki_password));
352        }
353        let resp = req.body(ttl).send().await?;
354        let status = resp.status();
355        tracing::info!(
356            "[ha-state-bridge] replace_graph graph={} status={}",
357            graph,
358            status
359        );
360        if !matches!(
361            status,
362            StatusCode::OK | StatusCode::CREATED | StatusCode::NO_CONTENT
363        ) {
364            let body = resp.text().await.unwrap_or_default();
365            anyhow::bail!("Fuseki replace_graph {graph} failed: {status} {body}");
366        }
367        Ok(())
368    }
369
370    async fn append_graph(&self, graph: &str, ttl: String) -> Result<()> {
371        if self.fuseki_url.is_empty() || self.fuseki_dataset.is_empty() {
372            tracing::warn!(
373                "[ha-state-bridge] skipping append_graph graph={} because Fuseki is not configured (base='{}' dataset='{}')",
374                graph,
375                self.fuseki_url,
376                self.fuseki_dataset
377            );
378            return Ok(());
379        }
380        let target = self.gsp_url(graph);
381        tracing::info!(
382            "[ha-state-bridge] append_graph graph={} target={} bytes={}",
383            graph,
384            target,
385            ttl.len()
386        );
387        let mut req = self
388            .http
389            .post(&target)
390            .header("Content-Type", "text/turtle");
391        if !self.fuseki_user.is_empty() {
392            req = req.basic_auth(&self.fuseki_user, Some(&self.fuseki_password));
393        }
394        let resp = req.body(ttl).send().await?;
395        let status = resp.status();
396        tracing::info!(
397            "[ha-state-bridge] append_graph graph={} status={}",
398            graph,
399            status
400        );
401        if !matches!(
402            status,
403            StatusCode::OK | StatusCode::CREATED | StatusCode::NO_CONTENT
404        ) {
405            let body = resp.text().await.unwrap_or_default();
406            anyhow::bail!("Fuseki append_graph {graph} failed: {status} {body}");
407        }
408        Ok(())
409    }
410
411    async fn publish_state_change(&self, state: &Value) {
412        let Some(pub_) = &self.publisher else {
413            return;
414        };
415        let Some(entity_id) = state.get("entity_id").and_then(|v| v.as_str()) else {
416            return;
417        };
418        let domain = entity_id.split('.').next().unwrap_or_default();
419        let topic = format!("{}/{}/{}", self.output_topic, domain, entity_id);
420        pub_.publish(
421            topic,
422            &serde_json::json!({
423                "type": "home_assistant_state_change",
424                "entity_id": entity_id,
425                "domain": domain,
426                "new_state": state,
427                "timestamp": Self::now_secs(),
428            }),
429        );
430    }
431
432    async fn sync_once(&mut self, seed_history: bool) -> Result<()> {
433        let states = self.fetch_states().await?;
434        let filtered: Vec<Value> = states
435            .into_iter()
436            .filter(|state| {
437                state
438                    .get("entity_id")
439                    .and_then(|v| v.as_str())
440                    .map(|id| self.domain_allowed(id))
441                    .unwrap_or(false)
442            })
443            .collect();
444        tracing::info!(
445            "[ha-state-bridge] sync_once fetched={} filtered={} seed_history={}",
446            self.last_states.len() + filtered.len(),
447            filtered.len(),
448            seed_history
449        );
450
451        self.replace_graph(GRAPH_CURRENT, self.current_ttl(&filtered))
452            .await?;
453        self.replace_graph(GRAPH_DEVICES, self.devices_ttl(&filtered))
454            .await?;
455
456        for state in &filtered {
457            let Some(entity_id) = state.get("entity_id").and_then(|v| v.as_str()) else {
458                continue;
459            };
460            let snapshot = serde_json::to_string(state).unwrap_or_default();
461            let changed = self
462                .last_states
463                .get(entity_id)
464                .map(|prev| prev != &snapshot)
465                .unwrap_or(true);
466            if seed_history || changed {
467                if let Some(ttl) = self.history_ttl(state, Self::now_ms()) {
468                    self.append_graph(GRAPH_HISTORY, ttl).await?;
469                }
470                self.publish_state_change(state).await;
471                self.events_seen += 1;
472            }
473            self.last_states.insert(entity_id.to_string(), snapshot);
474        }
475        Ok(())
476    }
477
478    async fn sync_agents_graph(&self) -> Result<()> {
479        self.replace_graph(GRAPH_AGENTS, self.agents_ttl().await)
480            .await
481    }
482
483    fn status_payload(&self) -> Value {
484        serde_json::json!({
485            "configured": !self.ha_url.is_empty() && !self.ha_token.is_empty(),
486            "events_seen": self.events_seen,
487            "last_error": self.last_error,
488            "output_topic": self.output_topic,
489            "domains": self.domains,
490            "fuseki_url": self.fuseki_url,
491            "fuseki_dataset": self.fuseki_dataset,
492        })
493    }
494
495    fn now_ms() -> u64 {
496        std::time::SystemTime::now()
497            .duration_since(std::time::UNIX_EPOCH)
498            .unwrap_or_default()
499            .as_millis() as u64
500    }
501
502    fn now_secs() -> f64 {
503        std::time::SystemTime::now()
504            .duration_since(std::time::UNIX_EPOCH)
505            .unwrap_or_default()
506            .as_secs_f64()
507    }
508}
509
510#[async_trait]
511impl Actor for HomeAssistantStateBridgeAgent {
512    fn id(&self) -> String {
513        self.config.id.clone()
514    }
515
516    fn name(&self) -> &str {
517        &self.config.name
518    }
519
520    fn state(&self) -> ActorState {
521        self.state.clone()
522    }
523
524    fn metrics(&self) -> Arc<ActorMetrics> {
525        Arc::clone(&self.metrics)
526    }
527
528    fn mailbox(&self) -> mpsc::Sender<Message> {
529        self.mailbox_tx.clone()
530    }
531
532    fn is_protected(&self) -> bool {
533        self.config.protected
534    }
535
536    async fn on_start(&mut self) -> Result<()> {
537        self.state = ActorState::Running;
538        tracing::info!(
539            "[ha-state-bridge] started (ha={}, fuseki={}/{}, output_topic={}, domains={:?})",
540            !self.ha_url.is_empty() && !self.ha_token.is_empty(),
541            self.fuseki_url,
542            self.fuseki_dataset,
543            self.output_topic,
544            self.domains,
545        );
546        if let Some(pub_) = &self.publisher {
547            pub_.publish(
548                wactorz_mqtt::topics::spawn(&self.config.id),
549                &serde_json::json!({
550                    "agentId": self.config.id,
551                    "agentName": self.config.name,
552                    "agentType": "ha_state_bridge",
553                    "timestampMs": Self::now_ms(),
554                }),
555            );
556        }
557        match self.sync_agents_graph().await {
558            Ok(()) => tracing::info!("[ha-state-bridge] synced agent graph"),
559            Err(err) => tracing::warn!("[ha-state-bridge] agent graph sync failed: {err}"),
560        }
561        if self.ha_url.is_empty() || self.ha_token.is_empty() {
562            self.last_error = "HA_URL/HA_TOKEN not configured".to_string();
563            tracing::warn!("[ha-state-bridge] {}", self.last_error);
564            return Ok(());
565        }
566        if self.fuseki_url.is_empty() || self.fuseki_dataset.is_empty() {
567            tracing::warn!("[ha-state-bridge] Fuseki not configured; MQTT bridge will still run");
568        }
569        match self.sync_once(true).await {
570            Ok(()) => self.last_error.clear(),
571            Err(err) => {
572                self.last_error = err.to_string();
573                tracing::warn!("[ha-state-bridge] initial sync failed: {err}");
574            }
575        }
576        Ok(())
577    }
578
579    async fn handle_message(&mut self, message: Message) -> Result<()> {
580        match &message.payload {
581            MessageType::Text { content } if content.trim().eq_ignore_ascii_case("status") => {
582                if let Some(pub_) = &self.publisher {
583                    pub_.publish(
584                        wactorz_mqtt::topics::chat(&self.config.id),
585                        &serde_json::json!({
586                            "from": self.config.name,
587                            "to": message.from.as_deref().unwrap_or("user"),
588                            "content": self.status_payload(),
589                            "timestampMs": Self::now_ms(),
590                        }),
591                    );
592                }
593            }
594            MessageType::Command {
595                command: ActorCommand::Status,
596            } => {
597                tracing::info!(
598                    "[ha-state-bridge] status requested: {}",
599                    self.status_payload()
600                );
601            }
602            _ => {}
603        }
604        Ok(())
605    }
606
607    async fn on_heartbeat(&mut self) -> Result<()> {
608        if let Some(pub_) = &self.publisher {
609            pub_.publish(
610                wactorz_mqtt::topics::heartbeat(&self.config.id),
611                &serde_json::json!({
612                    "agentId": self.config.id,
613                    "agentName": self.config.name,
614                    "state": self.state,
615                    "task": format!("ha->mqtt+fuseki events_seen={}", self.events_seen),
616                    "timestampMs": Self::now_ms(),
617                }),
618            );
619        }
620        Ok(())
621    }
622
623    async fn run(&mut self) -> Result<()> {
624        self.on_start().await?;
625        let mut rx = self
626            .mailbox_rx
627            .take()
628            .ok_or_else(|| anyhow::anyhow!("HomeAssistantStateBridgeAgent already running"))?;
629        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
630            self.config.heartbeat_interval_secs,
631        ));
632        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
633        let mut poll = tokio::time::interval(std::time::Duration::from_secs(POLL_SECS));
634        poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
635        loop {
636            tokio::select! {
637                biased;
638                msg = rx.recv() => {
639                    match msg {
640                        None => break,
641                        Some(m) => {
642                            self.metrics.record_received();
643                            if let MessageType::Command { command: ActorCommand::Stop } = &m.payload {
644                                break;
645                            }
646                            match self.handle_message(m).await {
647                                Ok(_) => self.metrics.record_processed(),
648                                Err(e) => {
649                                    tracing::error!("[{}] {e}", self.config.name);
650                                    self.metrics.record_failed();
651                                }
652                            }
653                        }
654                    }
655                }
656                _ = poll.tick() => {
657                    if let Err(err) = self.sync_agents_graph().await {
658                        tracing::warn!("[ha-state-bridge] agent graph sync failed: {err}");
659                    }
660                    match self.sync_once(false).await {
661                        Ok(()) => self.last_error.clear(),
662                        Err(err) => {
663                            self.last_error = err.to_string();
664                            tracing::warn!("[ha-state-bridge] sync failed: {err}");
665                        }
666                    }
667                }
668                _ = hb.tick() => {
669                    self.metrics.record_heartbeat();
670                    if let Err(e) = self.on_heartbeat().await {
671                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
672                    }
673                }
674            }
675        }
676        self.state = ActorState::Stopped;
677        self.on_stop().await
678    }
679}