wactorz_agents/
one_off_actuator.rs1use anyhow::Result;
15use async_trait::async_trait;
16use std::sync::Arc;
17use tokio::sync::mpsc;
18
19use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
20
21pub struct OneOffActuatorAgent {
26 config: ActorConfig,
27 state: ActorState,
28 metrics: Arc<ActorMetrics>,
29 mailbox_tx: mpsc::Sender<Message>,
30 mailbox_rx: Option<mpsc::Receiver<Message>>,
31 publisher: Option<EventPublisher>,
32 ha_url: String,
33 ha_token: String,
34 task_id: String,
35 reply_to_id: String,
36}
37
38impl OneOffActuatorAgent {
39 pub fn new(config: ActorConfig, task_id: String, reply_to_id: String) -> Self {
44 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
45 Self {
46 config,
47 state: ActorState::Initializing,
48 metrics: Arc::new(ActorMetrics::new()),
49 mailbox_tx: tx,
50 mailbox_rx: Some(rx),
51 publisher: None,
52 ha_url: String::new(),
53 ha_token: String::new(),
54 task_id,
55 reply_to_id,
56 }
57 }
58
59 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
61 self.publisher = Some(p);
62 self
63 }
64
65 pub fn with_ha_config(mut self, url: String, token: String) -> Self {
67 self.ha_url = url;
68 self.ha_token = token;
69 self
70 }
71}
72
73#[async_trait]
74impl Actor for OneOffActuatorAgent {
75 fn id(&self) -> String {
76 self.config.id.clone()
77 }
78
79 fn name(&self) -> &str {
80 &self.config.name
81 }
82
83 fn state(&self) -> ActorState {
84 self.state.clone()
85 }
86
87 fn metrics(&self) -> Arc<ActorMetrics> {
88 Arc::clone(&self.metrics)
89 }
90
91 fn mailbox(&self) -> mpsc::Sender<Message> {
92 self.mailbox_tx.clone()
93 }
94
95 fn is_protected(&self) -> bool {
96 self.config.protected
97 }
98
99 async fn on_start(&mut self) -> Result<()> {
100 self.state = ActorState::Running;
101 tracing::info!(
102 "[one-off-actuator] OneOffActuator spawned for task {}",
103 self.task_id
104 );
105 Ok(())
106 }
107
108 async fn handle_message(&mut self, message: Message) -> Result<()> {
109 tracing::debug!(
110 "[one-off-actuator] received message from {:?}: {:?}",
111 message.from,
112 message.payload,
113 );
114 let stub_reply = format!(
117 "[stub] OneOffActuator for task {} received request (HA actuation not yet implemented)",
118 self.task_id
119 );
120 if !self.reply_to_id.is_empty() {
121 tracing::info!(
122 "[one-off-actuator] would reply to {} with: {}",
123 self.reply_to_id,
124 stub_reply
125 );
126 }
127 Ok(())
128 }
129
130 async fn run(&mut self) -> Result<()> {
131 self.on_start().await?;
132 let mut rx = self
133 .mailbox_rx
134 .take()
135 .ok_or_else(|| anyhow::anyhow!("OneOffActuatorAgent already running"))?;
136
137 if let Some(msg) = rx.recv().await {
139 self.metrics.record_received();
140 match self.handle_message(msg).await {
141 Ok(_) => self.metrics.record_processed(),
142 Err(e) => {
143 tracing::error!("[{}] {e}", self.config.name);
144 self.metrics.record_failed();
145 }
146 }
147 }
148
149 self.state = ActorState::Stopped;
150 self.on_stop().await
151 }
152}