wactorz_agents/
ha_actuator.rs

1//! Home Assistant actuator agent stub.
2//!
3//! [`HomeAssistantActuatorAgent`] is a reactive MQTT-triggered actuator that
4//! subscribes to one or more MQTT detection topics, optionally evaluates Home
5//! Assistant entity conditions, and calls HA services via a persistent
6//! WebSocket connection.
7//!
8//! In the Python backend one instance is created per external automation:
9//!
10//! ```text
11//! DynamicAgent (sensor) → MQTT topic → HomeAssistantActuatorAgent → HA service call
12//! ```
13//!
14//! # Current status
15//!
16//! **Stub pending HA WebSocket client implementation.**  The agent compiles,
17//! registers in the actor system, and logs every message it receives at `DEBUG`
18//! level.  Full MQTT subscription, condition evaluation, and HA service calls
19//! will be layered on top once the Rust HA WebSocket client is available.
20
21use anyhow::Result;
22use async_trait::async_trait;
23use std::sync::Arc;
24use tokio::sync::mpsc;
25
26use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
27
28/// Reactive actuator that subscribes to MQTT topics and calls HA services.
29///
30/// Not protected — can be stopped by external commands, matching the Python
31/// implementation where each actuator is tied to one automation rule.
32pub struct HomeAssistantActuatorAgent {
33    config: ActorConfig,
34    state: ActorState,
35    metrics: Arc<ActorMetrics>,
36    mailbox_tx: mpsc::Sender<Message>,
37    mailbox_rx: Option<mpsc::Receiver<Message>>,
38    publisher: Option<EventPublisher>,
39    ha_url: String,
40    ha_token: String,
41}
42
43impl HomeAssistantActuatorAgent {
44    /// Create a new [`HomeAssistantActuatorAgent`].
45    ///
46    /// `config.protected` is left as provided (defaults to `false`).
47    pub fn new(config: ActorConfig) -> Self {
48        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
49        Self {
50            config,
51            state: ActorState::Initializing,
52            metrics: Arc::new(ActorMetrics::new()),
53            mailbox_tx: tx,
54            mailbox_rx: Some(rx),
55            publisher: None,
56            ha_url: String::new(),
57            ha_token: String::new(),
58        }
59    }
60
61    /// Attach an [`EventPublisher`] for MQTT output.
62    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
63        self.publisher = Some(p);
64        self
65    }
66
67    /// Override the HA URL and token.
68    pub fn with_ha_config(mut self, url: String, token: String) -> Self {
69        if !url.is_empty() {
70            self.ha_url = url;
71        }
72        if !token.is_empty() {
73            self.ha_token = token;
74        }
75        self
76    }
77}
78
79#[async_trait]
80impl Actor for HomeAssistantActuatorAgent {
81    fn id(&self) -> String {
82        self.config.id.clone()
83    }
84
85    fn name(&self) -> &str {
86        &self.config.name
87    }
88
89    fn state(&self) -> ActorState {
90        self.state.clone()
91    }
92
93    fn metrics(&self) -> Arc<ActorMetrics> {
94        Arc::clone(&self.metrics)
95    }
96
97    fn mailbox(&self) -> mpsc::Sender<Message> {
98        self.mailbox_tx.clone()
99    }
100
101    fn is_protected(&self) -> bool {
102        self.config.protected
103    }
104
105    async fn on_start(&mut self) -> Result<()> {
106        self.state = ActorState::Running;
107        tracing::info!(
108            "[ha-actuator] HomeAssistantActuatorAgent started (stub — HA WebSocket client implementation pending)"
109        );
110        Ok(())
111    }
112
113    async fn handle_message(&mut self, message: Message) -> Result<()> {
114        tracing::debug!(
115            "[ha-actuator] received message from {:?}: {:?}",
116            message.from,
117            message.payload,
118        );
119        // TODO: parse detection payloads, evaluate HA conditions, and call HA
120        // services once the Rust HA WebSocket client is ready.
121        if let Some(pub_) = &self.publisher {
122            pub_.publish(
123                wactorz_mqtt::topics::chat(&self.config.id),
124                &serde_json::json!({
125                    "from":    self.config.name,
126                    "content": "HA actuator not yet implemented",
127                }),
128            );
129        }
130        Ok(())
131    }
132
133    async fn run(&mut self) -> Result<()> {
134        self.on_start().await?;
135        let mut rx = self
136            .mailbox_rx
137            .take()
138            .ok_or_else(|| anyhow::anyhow!("HomeAssistantActuatorAgent already running"))?;
139        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
140            self.config.heartbeat_interval_secs,
141        ));
142        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
143        loop {
144            tokio::select! {
145                biased;
146                msg = rx.recv() => {
147                    match msg {
148                        None => break,
149                        Some(m) => {
150                            self.metrics.record_received();
151                            if let wactorz_core::message::MessageType::Command {
152                                command: wactorz_core::message::ActorCommand::Stop,
153                            } = &m.payload
154                            {
155                                break;
156                            }
157                            match self.handle_message(m).await {
158                                Ok(_) => self.metrics.record_processed(),
159                                Err(e) => {
160                                    tracing::error!("[{}] {e}", self.config.name);
161                                    self.metrics.record_failed();
162                                }
163                            }
164                        }
165                    }
166                }
167                _ = hb.tick() => {
168                    self.metrics.record_heartbeat();
169                    if let Err(e) = self.on_heartbeat().await {
170                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
171                    }
172                }
173            }
174        }
175        self.state = ActorState::Stopped;
176        self.on_stop().await
177    }
178}