1use anyhow::Result;
13use async_trait::async_trait;
14use reqwest::StatusCode;
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::mpsc;
19use urlencoding::encode;
20
21use wactorz_core::message::{ActorCommand, MessageType};
22use wactorz_core::{
23 Actor, ActorConfig, ActorMetrics, ActorState, ActorSystem, EventPublisher, Message,
24};
25
/// Topic prefix used for state-change events when no output topic is configured.
const DEFAULT_OUTPUT_TOPIC: &str = "ha/state";
/// Named graph that is replaced with the latest entity states on every sync.
const GRAPH_CURRENT: &str = "urn:ha:current";
/// Named graph that observations are appended to.
const GRAPH_HISTORY: &str = "urn:ha:history";
/// Named graph that is replaced with the entity/device catalogue on every sync.
const GRAPH_DEVICES: &str = "urn:ha:devices";
/// Named graph that is replaced with the actor registry snapshot.
const GRAPH_AGENTS: &str = "urn:wactorz:agents";
/// Seconds between two Home Assistant polls.
const POLL_SECS: u64 = 15;

/// Turtle prefix declarations placed at the top of every generated document.
const TTL_PREFIXES: &str = concat!(
    "@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .\n",
    "@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .\n",
    "@prefix sosa: <http://www.w3.org/ns/sosa/> .\n",
    "@prefix syn: <https://synapse.waldiez.io/ns#> .\n\n",
);
37
38pub struct HomeAssistantStateBridgeAgent {
39 config: ActorConfig,
40 state: ActorState,
41 metrics: Arc<ActorMetrics>,
42 mailbox_tx: mpsc::Sender<Message>,
43 mailbox_rx: Option<mpsc::Receiver<Message>>,
44 publisher: Option<EventPublisher>,
45 system: Option<ActorSystem>,
46 http: reqwest::Client,
47 ha_url: String,
48 ha_token: String,
49 output_topic: String,
50 domains: Vec<String>,
51 fuseki_url: String,
52 fuseki_dataset: String,
53 fuseki_user: String,
54 fuseki_password: String,
55 last_states: HashMap<String, String>,
56 events_seen: u64,
57 last_error: String,
58}
59
60impl HomeAssistantStateBridgeAgent {
61 pub fn new(config: ActorConfig) -> Self {
62 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
63 Self {
64 config,
65 state: ActorState::Initializing,
66 metrics: Arc::new(ActorMetrics::new()),
67 mailbox_tx: tx,
68 mailbox_rx: Some(rx),
69 publisher: None,
70 system: None,
71 http: reqwest::Client::new(),
72 ha_url: String::new(),
73 ha_token: String::new(),
74 output_topic: DEFAULT_OUTPUT_TOPIC.to_string(),
75 domains: Vec::new(),
76 fuseki_url: String::new(),
77 fuseki_dataset: String::new(),
78 fuseki_user: String::new(),
79 fuseki_password: String::new(),
80 last_states: HashMap::new(),
81 events_seen: 0,
82 last_error: String::new(),
83 }
84 }
85
86 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
87 self.publisher = Some(p);
88 self
89 }
90
91 pub fn with_system(mut self, system: ActorSystem) -> Self {
92 self.system = Some(system);
93 self
94 }
95
96 pub fn with_ha_config(
97 mut self,
98 url: String,
99 token: String,
100 output_topic: String,
101 domains: Vec<String>,
102 ) -> Self {
103 if !url.is_empty() {
104 self.ha_url = url.trim_end_matches('/').to_string();
105 }
106 if !token.is_empty() {
107 self.ha_token = token;
108 }
109 if !output_topic.is_empty() {
110 self.output_topic = output_topic;
111 }
112 self.domains = domains.into_iter().map(|d| d.to_lowercase()).collect();
113 self
114 }
115
116 pub fn with_fuseki_config(mut self, url: String, dataset: String) -> Self {
117 if !url.is_empty() {
118 self.fuseki_url = url.trim_end_matches('/').to_string();
119 }
120 if !dataset.is_empty() {
121 self.fuseki_dataset = dataset.trim_matches('/').to_string();
122 }
123 self
124 }
125
126 pub fn with_fuseki_auth(mut self, user: String, password: String) -> Self {
127 self.fuseki_user = user;
128 self.fuseki_password = password;
129 self
130 }
131
132 async fn fetch_states(&self) -> Result<Vec<Value>> {
133 let resp = self
134 .http
135 .get(format!("{}/api/states", self.ha_url))
136 .header("Authorization", format!("Bearer {}", self.ha_token))
137 .header("Content-Type", "application/json")
138 .send()
139 .await?;
140 let status = resp.status();
141 if !status.is_success() {
142 anyhow::bail!("Home Assistant states fetch failed: {status}");
143 }
144 Ok(resp.json::<Vec<Value>>().await?)
145 }
146
147 fn domain_allowed(&self, entity_id: &str) -> bool {
148 if self.domains.is_empty() {
149 return true;
150 }
151 let domain = entity_id
152 .split('.')
153 .next()
154 .unwrap_or_default()
155 .to_lowercase();
156 self.domains.iter().any(|d| d == &domain)
157 }
158
159 fn safe(raw: &str) -> String {
160 raw.chars()
161 .map(|c| {
162 if c.is_ascii_alphanumeric() || c == '.' || c == '_' || c == '-' {
163 c
164 } else {
165 '_'
166 }
167 })
168 .collect()
169 }
170
171 fn esc(raw: &str) -> String {
172 raw.replace('\\', "\\\\")
173 .replace('"', "\\\"")
174 .replace('\n', "\\n")
175 .replace('\r', "\\r")
176 .replace('\t', "\\t")
177 }
178
179 fn literal(raw: &str) -> String {
180 format!("\"{}\"", Self::esc(raw))
181 }
182
183 fn entity_iri(entity_id: &str) -> String {
184 format!("<urn:ha:entity:{}>", Self::safe(entity_id))
185 }
186
187 fn obs_iri(entity_id: &str, ts_ms: u64) -> String {
188 format!("<urn:ha:obs:{}_{ts_ms}>", Self::safe(entity_id))
189 }
190
191 fn label_for(state: &Value, entity_id: &str) -> String {
192 state
193 .get("attributes")
194 .and_then(|v| v.get("friendly_name"))
195 .and_then(|v| v.as_str())
196 .unwrap_or(entity_id)
197 .to_string()
198 }
199
200 fn state_value_for(state: &Value) -> String {
201 state
202 .get("state")
203 .and_then(|v| v.as_str())
204 .unwrap_or("")
205 .to_string()
206 }
207
208 fn timestamp_for(state: &Value) -> String {
209 state
210 .get("last_changed")
211 .and_then(|v| v.as_str())
212 .or_else(|| state.get("last_updated").and_then(|v| v.as_str()))
213 .unwrap_or("1970-01-01T00:00:00Z")
214 .to_string()
215 }
216
217 fn domain_type(domain: &str) -> &'static str {
218 match domain {
219 "sensor" | "binary_sensor" | "weather" => "sosa:Sensor",
220 _ => "sosa:Actuator",
221 }
222 }
223
224 fn current_ttl(&self, states: &[Value]) -> String {
225 let mut ttl = String::from(TTL_PREFIXES);
226 for state in states {
227 let Some(entity_id) = state.get("entity_id").and_then(|v| v.as_str()) else {
228 continue;
229 };
230 let domain = entity_id.split('.').next().unwrap_or_default();
231 let label = Self::label_for(state, entity_id);
232 let value = Self::state_value_for(state);
233 let ts = Self::timestamp_for(state);
234 let entity = Self::entity_iri(entity_id);
235 ttl.push_str(&format!(
236 "{entity}\n a {} ;\n rdfs:label {} ;\n syn:entityId {} ;\n syn:domain {} ;\n syn:state {} ;\n syn:lastChanged \"{}\"^^xsd:dateTime",
237 Self::domain_type(domain),
238 Self::literal(&label),
239 Self::literal(entity_id),
240 Self::literal(domain),
241 Self::literal(&value),
242 Self::esc(&ts),
243 ));
244 if let Some(unit) = state
245 .get("attributes")
246 .and_then(|v| v.get("unit_of_measurement"))
247 .and_then(|v| v.as_str())
248 {
249 ttl.push_str(&format!(" ;\n syn:unit {}", Self::literal(unit)));
250 }
251 ttl.push_str(" .\n\n");
252 }
253 ttl
254 }
255
256 fn devices_ttl(&self, states: &[Value]) -> String {
257 let mut ttl = String::from(TTL_PREFIXES);
258 ttl.push_str("<urn:ha:bridge:wactorz>\n rdfs:label \"wactorz HA bridge\" .\n\n");
259 for state in states {
260 let Some(entity_id) = state.get("entity_id").and_then(|v| v.as_str()) else {
261 continue;
262 };
263 let domain = entity_id.split('.').next().unwrap_or_default();
264 let label = Self::label_for(state, entity_id);
265 let entity = Self::entity_iri(entity_id);
266 ttl.push_str(&format!(
267 "{entity}\n a {} ;\n rdfs:label {} ;\n syn:entityId {} ;\n syn:domain {} .\n\n",
268 Self::domain_type(domain),
269 Self::literal(&label),
270 Self::literal(entity_id),
271 Self::literal(domain),
272 ));
273 }
274 ttl
275 }
276
277 fn history_ttl(&self, state: &Value, ts_ms: u64) -> Option<String> {
278 let entity_id = state.get("entity_id").and_then(|v| v.as_str())?;
279 let value = Self::state_value_for(state);
280 let ts = Self::timestamp_for(state);
281 let obs = Self::obs_iri(entity_id, ts_ms);
282 let entity = Self::entity_iri(entity_id);
283 Some(format!(
284 "{TTL_PREFIXES}{obs}\n a sosa:Observation ;\n sosa:madeBySensor {entity} ;\n sosa:hasSimpleResult {} ;\n sosa:resultTime \"{}\"^^xsd:dateTime .\n",
285 Self::literal(&value),
286 Self::esc(&ts),
287 ))
288 }
289
290 async fn agents_ttl(&self) -> String {
291 let mut ttl = String::from(TTL_PREFIXES);
292 let Some(system) = &self.system else {
293 ttl.push_str(
294 "<urn:wactorz:bridge:agent-registry>\n rdfs:label \"wactorz agent registry bridge\" .\n",
295 );
296 return ttl;
297 };
298 let actors = system.registry.list().await;
299 ttl.push_str(
300 "<urn:wactorz:bridge:agent-registry>\n rdfs:label \"wactorz agent registry bridge\" .\n\n",
301 );
302 for actor in actors {
303 let iri = format!("<urn:wactorz:agent:{}>", Self::safe(&actor.name));
304 let state = format!("{}", actor.state);
305 ttl.push_str(&format!(
306 "{iri}\n rdfs:label {} ;\n syn:actorId {} ;\n syn:state {} ;\n syn:protected \"{}\"^^xsd:boolean",
307 Self::literal(&actor.name),
308 Self::literal(&actor.id),
309 Self::literal(&state),
310 if actor.protected { "true" } else { "false" },
311 ));
312 if let Some(supervisor_id) = &actor.supervisor_id {
313 ttl.push_str(&format!(
314 " ;\n syn:supervisorId {}",
315 Self::literal(supervisor_id)
316 ));
317 }
318 ttl.push_str(" .\n\n");
319 }
320 ttl
321 }
322
323 fn gsp_url(&self, graph: &str) -> String {
324 format!(
325 "{}/{}/data?graph={}",
326 self.fuseki_url,
327 self.fuseki_dataset,
328 encode(graph)
329 )
330 }
331
332 async fn replace_graph(&self, graph: &str, ttl: String) -> Result<()> {
333 if self.fuseki_url.is_empty() || self.fuseki_dataset.is_empty() {
334 tracing::warn!(
335 "[ha-state-bridge] skipping replace_graph graph={} because Fuseki is not configured (base='{}' dataset='{}')",
336 graph,
337 self.fuseki_url,
338 self.fuseki_dataset
339 );
340 return Ok(());
341 }
342 let target = self.gsp_url(graph);
343 tracing::info!(
344 "[ha-state-bridge] replace_graph graph={} target={} bytes={}",
345 graph,
346 target,
347 ttl.len()
348 );
349 let mut req = self.http.put(&target).header("Content-Type", "text/turtle");
350 if !self.fuseki_user.is_empty() {
351 req = req.basic_auth(&self.fuseki_user, Some(&self.fuseki_password));
352 }
353 let resp = req.body(ttl).send().await?;
354 let status = resp.status();
355 tracing::info!(
356 "[ha-state-bridge] replace_graph graph={} status={}",
357 graph,
358 status
359 );
360 if !matches!(
361 status,
362 StatusCode::OK | StatusCode::CREATED | StatusCode::NO_CONTENT
363 ) {
364 let body = resp.text().await.unwrap_or_default();
365 anyhow::bail!("Fuseki replace_graph {graph} failed: {status} {body}");
366 }
367 Ok(())
368 }
369
370 async fn append_graph(&self, graph: &str, ttl: String) -> Result<()> {
371 if self.fuseki_url.is_empty() || self.fuseki_dataset.is_empty() {
372 tracing::warn!(
373 "[ha-state-bridge] skipping append_graph graph={} because Fuseki is not configured (base='{}' dataset='{}')",
374 graph,
375 self.fuseki_url,
376 self.fuseki_dataset
377 );
378 return Ok(());
379 }
380 let target = self.gsp_url(graph);
381 tracing::info!(
382 "[ha-state-bridge] append_graph graph={} target={} bytes={}",
383 graph,
384 target,
385 ttl.len()
386 );
387 let mut req = self
388 .http
389 .post(&target)
390 .header("Content-Type", "text/turtle");
391 if !self.fuseki_user.is_empty() {
392 req = req.basic_auth(&self.fuseki_user, Some(&self.fuseki_password));
393 }
394 let resp = req.body(ttl).send().await?;
395 let status = resp.status();
396 tracing::info!(
397 "[ha-state-bridge] append_graph graph={} status={}",
398 graph,
399 status
400 );
401 if !matches!(
402 status,
403 StatusCode::OK | StatusCode::CREATED | StatusCode::NO_CONTENT
404 ) {
405 let body = resp.text().await.unwrap_or_default();
406 anyhow::bail!("Fuseki append_graph {graph} failed: {status} {body}");
407 }
408 Ok(())
409 }
410
411 async fn publish_state_change(&self, state: &Value) {
412 let Some(pub_) = &self.publisher else {
413 return;
414 };
415 let Some(entity_id) = state.get("entity_id").and_then(|v| v.as_str()) else {
416 return;
417 };
418 let domain = entity_id.split('.').next().unwrap_or_default();
419 let topic = format!("{}/{}/{}", self.output_topic, domain, entity_id);
420 pub_.publish(
421 topic,
422 &serde_json::json!({
423 "type": "home_assistant_state_change",
424 "entity_id": entity_id,
425 "domain": domain,
426 "new_state": state,
427 "timestamp": Self::now_secs(),
428 }),
429 );
430 }
431
432 async fn sync_once(&mut self, seed_history: bool) -> Result<()> {
433 let states = self.fetch_states().await?;
434 let filtered: Vec<Value> = states
435 .into_iter()
436 .filter(|state| {
437 state
438 .get("entity_id")
439 .and_then(|v| v.as_str())
440 .map(|id| self.domain_allowed(id))
441 .unwrap_or(false)
442 })
443 .collect();
444 tracing::info!(
445 "[ha-state-bridge] sync_once fetched={} filtered={} seed_history={}",
446 self.last_states.len() + filtered.len(),
447 filtered.len(),
448 seed_history
449 );
450
451 self.replace_graph(GRAPH_CURRENT, self.current_ttl(&filtered))
452 .await?;
453 self.replace_graph(GRAPH_DEVICES, self.devices_ttl(&filtered))
454 .await?;
455
456 for state in &filtered {
457 let Some(entity_id) = state.get("entity_id").and_then(|v| v.as_str()) else {
458 continue;
459 };
460 let snapshot = serde_json::to_string(state).unwrap_or_default();
461 let changed = self
462 .last_states
463 .get(entity_id)
464 .map(|prev| prev != &snapshot)
465 .unwrap_or(true);
466 if seed_history || changed {
467 if let Some(ttl) = self.history_ttl(state, Self::now_ms()) {
468 self.append_graph(GRAPH_HISTORY, ttl).await?;
469 }
470 self.publish_state_change(state).await;
471 self.events_seen += 1;
472 }
473 self.last_states.insert(entity_id.to_string(), snapshot);
474 }
475 Ok(())
476 }
477
478 async fn sync_agents_graph(&self) -> Result<()> {
479 self.replace_graph(GRAPH_AGENTS, self.agents_ttl().await)
480 .await
481 }
482
483 fn status_payload(&self) -> Value {
484 serde_json::json!({
485 "configured": !self.ha_url.is_empty() && !self.ha_token.is_empty(),
486 "events_seen": self.events_seen,
487 "last_error": self.last_error,
488 "output_topic": self.output_topic,
489 "domains": self.domains,
490 "fuseki_url": self.fuseki_url,
491 "fuseki_dataset": self.fuseki_dataset,
492 })
493 }
494
495 fn now_ms() -> u64 {
496 std::time::SystemTime::now()
497 .duration_since(std::time::UNIX_EPOCH)
498 .unwrap_or_default()
499 .as_millis() as u64
500 }
501
502 fn now_secs() -> f64 {
503 std::time::SystemTime::now()
504 .duration_since(std::time::UNIX_EPOCH)
505 .unwrap_or_default()
506 .as_secs_f64()
507 }
508}
509
510#[async_trait]
511impl Actor for HomeAssistantStateBridgeAgent {
512 fn id(&self) -> String {
513 self.config.id.clone()
514 }
515
516 fn name(&self) -> &str {
517 &self.config.name
518 }
519
520 fn state(&self) -> ActorState {
521 self.state.clone()
522 }
523
524 fn metrics(&self) -> Arc<ActorMetrics> {
525 Arc::clone(&self.metrics)
526 }
527
528 fn mailbox(&self) -> mpsc::Sender<Message> {
529 self.mailbox_tx.clone()
530 }
531
532 fn is_protected(&self) -> bool {
533 self.config.protected
534 }
535
536 async fn on_start(&mut self) -> Result<()> {
537 self.state = ActorState::Running;
538 tracing::info!(
539 "[ha-state-bridge] started (ha={}, fuseki={}/{}, output_topic={}, domains={:?})",
540 !self.ha_url.is_empty() && !self.ha_token.is_empty(),
541 self.fuseki_url,
542 self.fuseki_dataset,
543 self.output_topic,
544 self.domains,
545 );
546 if let Some(pub_) = &self.publisher {
547 pub_.publish(
548 wactorz_mqtt::topics::spawn(&self.config.id),
549 &serde_json::json!({
550 "agentId": self.config.id,
551 "agentName": self.config.name,
552 "agentType": "ha_state_bridge",
553 "timestampMs": Self::now_ms(),
554 }),
555 );
556 }
557 match self.sync_agents_graph().await {
558 Ok(()) => tracing::info!("[ha-state-bridge] synced agent graph"),
559 Err(err) => tracing::warn!("[ha-state-bridge] agent graph sync failed: {err}"),
560 }
561 if self.ha_url.is_empty() || self.ha_token.is_empty() {
562 self.last_error = "HA_URL/HA_TOKEN not configured".to_string();
563 tracing::warn!("[ha-state-bridge] {}", self.last_error);
564 return Ok(());
565 }
566 if self.fuseki_url.is_empty() || self.fuseki_dataset.is_empty() {
567 tracing::warn!("[ha-state-bridge] Fuseki not configured; MQTT bridge will still run");
568 }
569 match self.sync_once(true).await {
570 Ok(()) => self.last_error.clear(),
571 Err(err) => {
572 self.last_error = err.to_string();
573 tracing::warn!("[ha-state-bridge] initial sync failed: {err}");
574 }
575 }
576 Ok(())
577 }
578
579 async fn handle_message(&mut self, message: Message) -> Result<()> {
580 match &message.payload {
581 MessageType::Text { content } if content.trim().eq_ignore_ascii_case("status") => {
582 if let Some(pub_) = &self.publisher {
583 pub_.publish(
584 wactorz_mqtt::topics::chat(&self.config.id),
585 &serde_json::json!({
586 "from": self.config.name,
587 "to": message.from.as_deref().unwrap_or("user"),
588 "content": self.status_payload(),
589 "timestampMs": Self::now_ms(),
590 }),
591 );
592 }
593 }
594 MessageType::Command {
595 command: ActorCommand::Status,
596 } => {
597 tracing::info!(
598 "[ha-state-bridge] status requested: {}",
599 self.status_payload()
600 );
601 }
602 _ => {}
603 }
604 Ok(())
605 }
606
607 async fn on_heartbeat(&mut self) -> Result<()> {
608 if let Some(pub_) = &self.publisher {
609 pub_.publish(
610 wactorz_mqtt::topics::heartbeat(&self.config.id),
611 &serde_json::json!({
612 "agentId": self.config.id,
613 "agentName": self.config.name,
614 "state": self.state,
615 "task": format!("ha->mqtt+fuseki events_seen={}", self.events_seen),
616 "timestampMs": Self::now_ms(),
617 }),
618 );
619 }
620 Ok(())
621 }
622
623 async fn run(&mut self) -> Result<()> {
624 self.on_start().await?;
625 let mut rx = self
626 .mailbox_rx
627 .take()
628 .ok_or_else(|| anyhow::anyhow!("HomeAssistantStateBridgeAgent already running"))?;
629 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
630 self.config.heartbeat_interval_secs,
631 ));
632 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
633 let mut poll = tokio::time::interval(std::time::Duration::from_secs(POLL_SECS));
634 poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
635 loop {
636 tokio::select! {
637 biased;
638 msg = rx.recv() => {
639 match msg {
640 None => break,
641 Some(m) => {
642 self.metrics.record_received();
643 if let MessageType::Command { command: ActorCommand::Stop } = &m.payload {
644 break;
645 }
646 match self.handle_message(m).await {
647 Ok(_) => self.metrics.record_processed(),
648 Err(e) => {
649 tracing::error!("[{}] {e}", self.config.name);
650 self.metrics.record_failed();
651 }
652 }
653 }
654 }
655 }
656 _ = poll.tick() => {
657 if let Err(err) = self.sync_agents_graph().await {
658 tracing::warn!("[ha-state-bridge] agent graph sync failed: {err}");
659 }
660 match self.sync_once(false).await {
661 Ok(()) => self.last_error.clear(),
662 Err(err) => {
663 self.last_error = err.to_string();
664 tracing::warn!("[ha-state-bridge] sync failed: {err}");
665 }
666 }
667 }
668 _ = hb.tick() => {
669 self.metrics.record_heartbeat();
670 if let Err(e) = self.on_heartbeat().await {
671 tracing::error!("[{}] heartbeat: {e}", self.config.name);
672 }
673 }
674 }
675 }
676 self.state = ActorState::Stopped;
677 self.on_stop().await
678 }
679}