wactorz_agents/
weather_agent.rs

1//! Weather information agent.
2//!
3//! [`WeatherAgent`] fetches current weather conditions on demand using the
4//! free [wttr.in](https://wttr.in) service — **no API key required**.
5//!
6//! ## Usage (via IO bar)
7//!
8//! ```text
9//! @weather-agent                  → weather for default location (WEATHER_DEFAULT_LOCATION or "London")
10//! @weather-agent Tokyo            → weather for Tokyo
11//! @weather-agent New York         → weather for New York
12//! @weather-agent help             → show usage
13//! ```
14//!
15//! The agent does **not** poll; it only fetches when it receives a message.
16//! It is stoppable and pausable — consumes no resources when idle.
17
18use anyhow::Result;
19use async_trait::async_trait;
20use std::sync::Arc;
21use tokio::sync::mpsc;
22
23use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
24
25/// Default location used when the user sends `@weather-agent` with no argument.
26const DEFAULT_LOCATION_ENV: &str = "WEATHER_DEFAULT_LOCATION";
27const DEFAULT_LOCATION_FALLBACK: &str = "London";
28
29/// Idle timeout for the reqwest client.
30const HTTP_TIMEOUT_SECS: u64 = 10;
31
32pub struct WeatherAgent {
33    config: ActorConfig,
34    state: ActorState,
35    metrics: Arc<ActorMetrics>,
36    mailbox_tx: mpsc::Sender<Message>,
37    mailbox_rx: Option<mpsc::Receiver<Message>>,
38    publisher: Option<EventPublisher>,
39    http: reqwest::Client,
40    /// Default location used when no argument is given. Falls back to the
41    /// `WEATHER_DEFAULT_LOCATION` env var, then to `"London"`.
42    configured_default_location: Option<String>,
43}
44
45impl WeatherAgent {
46    pub fn new(config: ActorConfig) -> Self {
47        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
48        let http = reqwest::Client::builder()
49            .timeout(std::time::Duration::from_secs(HTTP_TIMEOUT_SECS))
50            .user_agent("AgentFlow-WeatherAgent/1.0")
51            .build()
52            .unwrap_or_default();
53        Self {
54            config,
55            state: ActorState::Initializing,
56            metrics: Arc::new(ActorMetrics::new()),
57            mailbox_tx: tx,
58            mailbox_rx: Some(rx),
59            publisher: None,
60            http,
61            configured_default_location: None,
62        }
63    }
64
65    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
66        self.publisher = Some(p);
67        self
68    }
69
70    /// Override the default weather location instead of relying on the
71    /// `WEATHER_DEFAULT_LOCATION` environment variable.
72    pub fn with_default_location(mut self, location: impl Into<String>) -> Self {
73        let loc = location.into();
74        if !loc.is_empty() {
75            self.configured_default_location = Some(loc);
76        }
77        self
78    }
79
80    fn now_ms() -> u64 {
81        std::time::SystemTime::now()
82            .duration_since(std::time::UNIX_EPOCH)
83            .unwrap_or_default()
84            .as_millis() as u64
85    }
86
87    fn default_location(&self) -> String {
88        if let Some(loc) = &self.configured_default_location {
89            return loc.clone();
90        }
91        std::env::var(DEFAULT_LOCATION_ENV)
92            .unwrap_or_else(|_| DEFAULT_LOCATION_FALLBACK.to_string())
93    }
94
95    fn reply(&self, content: &str) {
96        if let Some(pub_) = &self.publisher {
97            pub_.publish(
98                wactorz_mqtt::topics::chat(&self.config.id),
99                &serde_json::json!({
100                    "from":        self.config.name,
101                    "to":          "user",
102                    "content":     content,
103                    "timestampMs": Self::now_ms(),
104                }),
105            );
106        }
107    }
108
109    async fn fetch_weather(&self, location: &str) -> Result<String> {
110        // wttr.in format=j1 returns JSON with current conditions.
111        let url = format!("https://wttr.in/{}?format=j1", urlencoding(location));
112
113        let resp = self.http.get(&url).send().await?;
114        if !resp.status().is_success() {
115            // Fallback to one-line format on error
116            let url2 = format!("https://wttr.in/{}?format=3", urlencoding(location));
117            let r2 = self.http.get(&url2).send().await?;
118            return Ok(r2.text().await?.trim().to_string());
119        }
120
121        let json: serde_json::Value = resp.json().await?;
122
123        // Parse the JSON response
124        let current = json
125            .get("current_condition")
126            .and_then(|a| a.as_array())
127            .and_then(|a| a.first())
128            .cloned()
129            .unwrap_or_default();
130
131        let desc = current
132            .get("weatherDesc")
133            .and_then(|a| a.as_array())
134            .and_then(|a| a.first())
135            .and_then(|v| v.get("value"))
136            .and_then(|v| v.as_str())
137            .unwrap_or("Unknown");
138
139        let temp_c = current
140            .get("temp_C")
141            .and_then(|v| v.as_str())
142            .unwrap_or("?");
143        let temp_f = current
144            .get("temp_F")
145            .and_then(|v| v.as_str())
146            .unwrap_or("?");
147        let feels_c = current
148            .get("FeelsLikeC")
149            .and_then(|v| v.as_str())
150            .unwrap_or("?");
151        let humidity = current
152            .get("humidity")
153            .and_then(|v| v.as_str())
154            .unwrap_or("?");
155        let wind_kmph = current
156            .get("windspeedKmph")
157            .and_then(|v| v.as_str())
158            .unwrap_or("?");
159        let wind_dir = current
160            .get("winddir16Point")
161            .and_then(|v| v.as_str())
162            .unwrap_or("?");
163        let uv = current
164            .get("uvIndex")
165            .and_then(|v| v.as_str())
166            .unwrap_or("?");
167        let visibility = current
168            .get("visibility")
169            .and_then(|v| v.as_str())
170            .unwrap_or("?");
171
172        // Nearest area name
173        let area = json
174            .get("nearest_area")
175            .and_then(|a| a.as_array())
176            .and_then(|a| a.first())
177            .and_then(|v| v.get("areaName"))
178            .and_then(|a| a.as_array())
179            .and_then(|a| a.first())
180            .and_then(|v| v.get("value"))
181            .and_then(|v| v.as_str())
182            .unwrap_or(location);
183
184        Ok(format!(
185            "**Weather in {area}**\n\n\
186             🌡 **{temp_c}°C / {temp_f}°F** (feels like {feels_c}°C)\n\
187             ☁ {desc}\n\
188             💧 Humidity: {humidity}%\n\
189             💨 Wind: {wind_kmph} km/h {wind_dir}\n\
190             👁 Visibility: {visibility} km\n\
191             ☀ UV index: {uv}\n\n\
192             *Data: [wttr.in](https://wttr.in/{loc})*",
193            loc = urlencoding(location)
194        ))
195    }
196}
197
198/// Minimal URL percent-encoding for location names.
199fn urlencoding(s: &str) -> String {
200    s.chars()
201        .flat_map(|c| match c {
202            ' ' => vec!['+'],
203            c if c.is_alphanumeric() || matches!(c, '-' | '_' | '.' | ',') => vec![c],
204            c => format!("%{:02X}", c as u32).chars().collect(),
205        })
206        .collect()
207}
208
209#[async_trait]
210impl Actor for WeatherAgent {
211    fn id(&self) -> String {
212        self.config.id.clone()
213    }
214    fn name(&self) -> &str {
215        &self.config.name
216    }
217    fn state(&self) -> ActorState {
218        self.state.clone()
219    }
220    fn metrics(&self) -> Arc<ActorMetrics> {
221        Arc::clone(&self.metrics)
222    }
223    fn mailbox(&self) -> mpsc::Sender<Message> {
224        self.mailbox_tx.clone()
225    }
226    fn is_protected(&self) -> bool {
227        self.config.protected
228    }
229
230    async fn on_start(&mut self) -> Result<()> {
231        self.state = ActorState::Running;
232        if let Some(pub_) = &self.publisher {
233            pub_.publish(
234                wactorz_mqtt::topics::spawn(&self.config.id),
235                &serde_json::json!({
236                    "agentId":   self.config.id,
237                    "agentName": self.config.name,
238                    "agentType": "data",
239                    "timestampMs": Self::now_ms(),
240                }),
241            );
242        }
243        Ok(())
244    }
245
246    async fn handle_message(&mut self, message: Message) -> Result<()> {
247        use wactorz_core::message::MessageType;
248
249        let content = match &message.payload {
250            MessageType::Text { content } => content.trim().to_string(),
251            MessageType::Task { description, .. } => description.trim().to_string(),
252            _ => return Ok(()),
253        };
254
255        // Strip @weather-agent prefix if present
256        let arg = content
257            .strip_prefix("@weather-agent")
258            .unwrap_or(&content)
259            .trim()
260            .to_string();
261
262        match arg.to_lowercase().as_str() {
263            "" => {
264                let loc = self.default_location();
265                let typing = format!("🌦 Fetching weather for **{loc}**…");
266                self.reply(&typing);
267                match self.fetch_weather(&loc).await {
268                    Ok(report) => self.reply(&report),
269                    Err(e) => self.reply(&format!("⚠ Could not fetch weather: {e}")),
270                }
271            }
272            "help" => {
273                let default = self.default_location();
274                self.reply(&format!(
275                    "**WeatherAgent** — current conditions via wttr.in (no API key needed)\n\n\
276                     ```\n\
277                     @weather-agent              # {default} (default)\n\
278                     @weather-agent Tokyo\n\
279                     @weather-agent New York\n\
280                     @weather-agent 48.8566,2.3522  # coordinates\n\
281                     ```\n\
282                     Set `WEATHER_DEFAULT_LOCATION` in `.env` to change the default."
283                ));
284            }
285            location => {
286                let typing = format!("🌦 Fetching weather for **{location}**…");
287                self.reply(&typing);
288                match self.fetch_weather(location).await {
289                    Ok(report) => self.reply(&report),
290                    Err(e) => {
291                        self.reply(&format!("⚠ Could not fetch weather for '{location}': {e}"))
292                    }
293                }
294            }
295        }
296
297        Ok(())
298    }
299
300    async fn on_heartbeat(&mut self) -> Result<()> {
301        if let Some(pub_) = &self.publisher {
302            pub_.publish(
303                wactorz_mqtt::topics::heartbeat(&self.config.id),
304                &serde_json::json!({
305                    "agentId":   self.config.id,
306                    "agentName": self.config.name,
307                    "state":     self.state,
308                    "timestampMs": Self::now_ms(),
309                }),
310            );
311        }
312        Ok(())
313    }
314
315    async fn run(&mut self) -> Result<()> {
316        self.on_start().await?;
317        let mut rx = self
318            .mailbox_rx
319            .take()
320            .ok_or_else(|| anyhow::anyhow!("WeatherAgent already running"))?;
321        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
322            self.config.heartbeat_interval_secs,
323        ));
324        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
325        loop {
326            tokio::select! {
327                biased;
328                msg = rx.recv() => match msg {
329                    None => break,
330                    Some(m) => {
331                        self.metrics.record_received();
332                        if let wactorz_core::message::MessageType::Command {
333                            command: wactorz_core::message::ActorCommand::Stop
334                        } = &m.payload { break; }
335                        match self.handle_message(m).await {
336                            Ok(_)  => self.metrics.record_processed(),
337                            Err(e) => {
338                                tracing::error!("[{}] {e}", self.config.name);
339                                self.metrics.record_failed();
340                            }
341                        }
342                    }
343                },
344                _ = hb.tick() => {
345                    self.metrics.record_heartbeat();
346                    if let Err(e) = self.on_heartbeat().await {
347                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
348                    }
349                }
350            }
351        }
352        self.state = ActorState::Stopped;
353        self.on_stop().await
354    }
355}