wactorz_agents/
ha_actuator.rs1use anyhow::Result;
22use async_trait::async_trait;
23use std::sync::Arc;
24use tokio::sync::mpsc;
25
26use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
27
/// Actor that is intended to drive Home Assistant devices.
///
/// In its current form this is a stub: it runs the standard mailbox/heartbeat
/// loop and answers every message with a fixed "not yet implemented" notice
/// when a publisher is attached.
pub struct HomeAssistantActuatorAgent {
    /// Actor configuration; supplies the id, name, protected flag, mailbox
    /// capacity and heartbeat interval used throughout this type.
    config: ActorConfig,
    /// Lifecycle state: `Initializing` after construction, `Running` once
    /// `on_start` has executed, `Stopped` when the `run` loop exits.
    state: ActorState,
    /// Shared metrics counters; `metrics()` hands out clones of this `Arc`.
    metrics: Arc<ActorMetrics>,
    /// Sending half of the mailbox; `mailbox()` hands out clones of it.
    mailbox_tx: mpsc::Sender<Message>,
    /// Receiving half of the mailbox. `run` takes it out of the `Option`, so it
    /// is `None` once `run` has been entered.
    mailbox_rx: Option<mpsc::Receiver<Message>>,
    /// Optional event publisher, attached via `with_publisher`. When `None`,
    /// `handle_message` publishes nothing.
    publisher: Option<EventPublisher>,
    /// Set through `with_ha_config`; empty by default and not read by any code
    /// in this file. Presumably the Home Assistant base URL; confirm once the
    /// client is implemented.
    ha_url: String,
    /// Set through `with_ha_config`; empty by default and not read by any code
    /// in this file. Presumably a Home Assistant access token; confirm once the
    /// client is implemented.
    ha_token: String,
}
42
43impl HomeAssistantActuatorAgent {
44 pub fn new(config: ActorConfig) -> Self {
48 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
49 Self {
50 config,
51 state: ActorState::Initializing,
52 metrics: Arc::new(ActorMetrics::new()),
53 mailbox_tx: tx,
54 mailbox_rx: Some(rx),
55 publisher: None,
56 ha_url: String::new(),
57 ha_token: String::new(),
58 }
59 }
60
61 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
63 self.publisher = Some(p);
64 self
65 }
66
67 pub fn with_ha_config(mut self, url: String, token: String) -> Self {
69 if !url.is_empty() {
70 self.ha_url = url;
71 }
72 if !token.is_empty() {
73 self.ha_token = token;
74 }
75 self
76 }
77}
78
79#[async_trait]
80impl Actor for HomeAssistantActuatorAgent {
81 fn id(&self) -> String {
82 self.config.id.clone()
83 }
84
85 fn name(&self) -> &str {
86 &self.config.name
87 }
88
89 fn state(&self) -> ActorState {
90 self.state.clone()
91 }
92
93 fn metrics(&self) -> Arc<ActorMetrics> {
94 Arc::clone(&self.metrics)
95 }
96
97 fn mailbox(&self) -> mpsc::Sender<Message> {
98 self.mailbox_tx.clone()
99 }
100
101 fn is_protected(&self) -> bool {
102 self.config.protected
103 }
104
105 async fn on_start(&mut self) -> Result<()> {
106 self.state = ActorState::Running;
107 tracing::info!(
108 "[ha-actuator] HomeAssistantActuatorAgent started (stub — HA WebSocket client implementation pending)"
109 );
110 Ok(())
111 }
112
113 async fn handle_message(&mut self, message: Message) -> Result<()> {
114 tracing::debug!(
115 "[ha-actuator] received message from {:?}: {:?}",
116 message.from,
117 message.payload,
118 );
119 if let Some(pub_) = &self.publisher {
122 pub_.publish(
123 wactorz_mqtt::topics::chat(&self.config.id),
124 &serde_json::json!({
125 "from": self.config.name,
126 "content": "HA actuator not yet implemented",
127 }),
128 );
129 }
130 Ok(())
131 }
132
133 async fn run(&mut self) -> Result<()> {
134 self.on_start().await?;
135 let mut rx = self
136 .mailbox_rx
137 .take()
138 .ok_or_else(|| anyhow::anyhow!("HomeAssistantActuatorAgent already running"))?;
139 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
140 self.config.heartbeat_interval_secs,
141 ));
142 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
143 loop {
144 tokio::select! {
145 biased;
146 msg = rx.recv() => {
147 match msg {
148 None => break,
149 Some(m) => {
150 self.metrics.record_received();
151 if let wactorz_core::message::MessageType::Command {
152 command: wactorz_core::message::ActorCommand::Stop,
153 } = &m.payload
154 {
155 break;
156 }
157 match self.handle_message(m).await {
158 Ok(_) => self.metrics.record_processed(),
159 Err(e) => {
160 tracing::error!("[{}] {e}", self.config.name);
161 self.metrics.record_failed();
162 }
163 }
164 }
165 }
166 }
167 _ = hb.tick() => {
168 self.metrics.record_heartbeat();
169 if let Err(e) = self.on_heartbeat().await {
170 tracing::error!("[{}] heartbeat: {e}", self.config.name);
171 }
172 }
173 }
174 }
175 self.state = ActorState::Stopped;
176 self.on_stop().await
177 }
178}