wactorz_agents/
weather_agent.rs1use anyhow::Result;
19use async_trait::async_trait;
20use std::sync::Arc;
21use tokio::sync::mpsc;
22
23use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
24
/// Name of the environment variable consulted by `WeatherAgent::default_location`
/// when no default location has been configured on the agent itself.
const DEFAULT_LOCATION_ENV: &str = "WEATHER_DEFAULT_LOCATION";
/// Location used when no default is configured and the environment variable
/// cannot be read.
const DEFAULT_LOCATION_FALLBACK: &str = "London";

/// Timeout, in seconds, applied to the HTTP client built in `WeatherAgent::new`.
const HTTP_TIMEOUT_SECS: u64 = 10;
31
/// Actor that answers chat/task messages with current weather conditions
/// fetched over HTTP from wttr.in.
pub struct WeatherAgent {
    /// Actor configuration (id, name, mailbox capacity, heartbeat interval, ...).
    config: ActorConfig,
    /// Current lifecycle state; starts as `Initializing`.
    state: ActorState,
    /// Shared counters updated by the run loop (received / processed / failed / heartbeat).
    metrics: Arc<ActorMetrics>,
    /// Sending half of the mailbox; cloned and handed out by `mailbox()`.
    mailbox_tx: mpsc::Sender<Message>,
    /// Receiving half of the mailbox; taken (left as `None`) by the first call to `run`.
    mailbox_rx: Option<mpsc::Receiver<Message>>,
    /// Optional event publisher; when `None`, replies and lifecycle events are not published.
    publisher: Option<EventPublisher>,
    /// HTTP client used for the weather requests.
    http: reqwest::Client,
    /// Default location set through `with_default_location`; takes priority over
    /// the environment variable and the built-in fallback.
    configured_default_location: Option<String>,
}
44
45impl WeatherAgent {
46 pub fn new(config: ActorConfig) -> Self {
47 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
48 let http = reqwest::Client::builder()
49 .timeout(std::time::Duration::from_secs(HTTP_TIMEOUT_SECS))
50 .user_agent("AgentFlow-WeatherAgent/1.0")
51 .build()
52 .unwrap_or_default();
53 Self {
54 config,
55 state: ActorState::Initializing,
56 metrics: Arc::new(ActorMetrics::new()),
57 mailbox_tx: tx,
58 mailbox_rx: Some(rx),
59 publisher: None,
60 http,
61 configured_default_location: None,
62 }
63 }
64
65 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
66 self.publisher = Some(p);
67 self
68 }
69
70 pub fn with_default_location(mut self, location: impl Into<String>) -> Self {
73 let loc = location.into();
74 if !loc.is_empty() {
75 self.configured_default_location = Some(loc);
76 }
77 self
78 }
79
80 fn now_ms() -> u64 {
81 std::time::SystemTime::now()
82 .duration_since(std::time::UNIX_EPOCH)
83 .unwrap_or_default()
84 .as_millis() as u64
85 }
86
87 fn default_location(&self) -> String {
88 if let Some(loc) = &self.configured_default_location {
89 return loc.clone();
90 }
91 std::env::var(DEFAULT_LOCATION_ENV)
92 .unwrap_or_else(|_| DEFAULT_LOCATION_FALLBACK.to_string())
93 }
94
95 fn reply(&self, content: &str) {
96 if let Some(pub_) = &self.publisher {
97 pub_.publish(
98 wactorz_mqtt::topics::chat(&self.config.id),
99 &serde_json::json!({
100 "from": self.config.name,
101 "to": "user",
102 "content": content,
103 "timestampMs": Self::now_ms(),
104 }),
105 );
106 }
107 }
108
109 async fn fetch_weather(&self, location: &str) -> Result<String> {
110 let url = format!("https://wttr.in/{}?format=j1", urlencoding(location));
112
113 let resp = self.http.get(&url).send().await?;
114 if !resp.status().is_success() {
115 let url2 = format!("https://wttr.in/{}?format=3", urlencoding(location));
117 let r2 = self.http.get(&url2).send().await?;
118 return Ok(r2.text().await?.trim().to_string());
119 }
120
121 let json: serde_json::Value = resp.json().await?;
122
123 let current = json
125 .get("current_condition")
126 .and_then(|a| a.as_array())
127 .and_then(|a| a.first())
128 .cloned()
129 .unwrap_or_default();
130
131 let desc = current
132 .get("weatherDesc")
133 .and_then(|a| a.as_array())
134 .and_then(|a| a.first())
135 .and_then(|v| v.get("value"))
136 .and_then(|v| v.as_str())
137 .unwrap_or("Unknown");
138
139 let temp_c = current
140 .get("temp_C")
141 .and_then(|v| v.as_str())
142 .unwrap_or("?");
143 let temp_f = current
144 .get("temp_F")
145 .and_then(|v| v.as_str())
146 .unwrap_or("?");
147 let feels_c = current
148 .get("FeelsLikeC")
149 .and_then(|v| v.as_str())
150 .unwrap_or("?");
151 let humidity = current
152 .get("humidity")
153 .and_then(|v| v.as_str())
154 .unwrap_or("?");
155 let wind_kmph = current
156 .get("windspeedKmph")
157 .and_then(|v| v.as_str())
158 .unwrap_or("?");
159 let wind_dir = current
160 .get("winddir16Point")
161 .and_then(|v| v.as_str())
162 .unwrap_or("?");
163 let uv = current
164 .get("uvIndex")
165 .and_then(|v| v.as_str())
166 .unwrap_or("?");
167 let visibility = current
168 .get("visibility")
169 .and_then(|v| v.as_str())
170 .unwrap_or("?");
171
172 let area = json
174 .get("nearest_area")
175 .and_then(|a| a.as_array())
176 .and_then(|a| a.first())
177 .and_then(|v| v.get("areaName"))
178 .and_then(|a| a.as_array())
179 .and_then(|a| a.first())
180 .and_then(|v| v.get("value"))
181 .and_then(|v| v.as_str())
182 .unwrap_or(location);
183
184 Ok(format!(
185 "**Weather in {area}**\n\n\
186 🌡 **{temp_c}°C / {temp_f}°F** (feels like {feels_c}°C)\n\
187 ☁ {desc}\n\
188 💧 Humidity: {humidity}%\n\
189 💨 Wind: {wind_kmph} km/h {wind_dir}\n\
190 👁 Visibility: {visibility} km\n\
191 ☀ UV index: {uv}\n\n\
192 *Data: [wttr.in](https://wttr.in/{loc})*",
193 loc = urlencoding(location)
194 ))
195 }
196}
197
/// Encodes `s` for use as the location segment of a wttr.in URL.
///
/// * spaces become `+`;
/// * ASCII letters, ASCII digits and `-`, `_`, `.`, `,` are kept as-is;
/// * every other byte of the UTF-8 representation is emitted as `%XX`
///   (upper-case hex).
///
/// Encoding is done per UTF-8 byte rather than per Unicode scalar value, so
/// non-ASCII input always yields well-formed two-digit escapes
/// (`'☀'` -> `%E2%98%80`, never `%2600`).
fn urlencoding(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(s.len());
    for byte in s.bytes() {
        match byte {
            b' ' => out.push('+'),
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b',' => {
                out.push(char::from(byte));
            }
            _ => {
                out.push('%');
                out.push(char::from(HEX[usize::from(byte >> 4)]));
                out.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
        }
    }
    out
}
208
209#[async_trait]
210impl Actor for WeatherAgent {
211 fn id(&self) -> String {
212 self.config.id.clone()
213 }
214 fn name(&self) -> &str {
215 &self.config.name
216 }
217 fn state(&self) -> ActorState {
218 self.state.clone()
219 }
220 fn metrics(&self) -> Arc<ActorMetrics> {
221 Arc::clone(&self.metrics)
222 }
223 fn mailbox(&self) -> mpsc::Sender<Message> {
224 self.mailbox_tx.clone()
225 }
226 fn is_protected(&self) -> bool {
227 self.config.protected
228 }
229
230 async fn on_start(&mut self) -> Result<()> {
231 self.state = ActorState::Running;
232 if let Some(pub_) = &self.publisher {
233 pub_.publish(
234 wactorz_mqtt::topics::spawn(&self.config.id),
235 &serde_json::json!({
236 "agentId": self.config.id,
237 "agentName": self.config.name,
238 "agentType": "data",
239 "timestampMs": Self::now_ms(),
240 }),
241 );
242 }
243 Ok(())
244 }
245
246 async fn handle_message(&mut self, message: Message) -> Result<()> {
247 use wactorz_core::message::MessageType;
248
249 let content = match &message.payload {
250 MessageType::Text { content } => content.trim().to_string(),
251 MessageType::Task { description, .. } => description.trim().to_string(),
252 _ => return Ok(()),
253 };
254
255 let arg = content
257 .strip_prefix("@weather-agent")
258 .unwrap_or(&content)
259 .trim()
260 .to_string();
261
262 match arg.to_lowercase().as_str() {
263 "" => {
264 let loc = self.default_location();
265 let typing = format!("🌦 Fetching weather for **{loc}**…");
266 self.reply(&typing);
267 match self.fetch_weather(&loc).await {
268 Ok(report) => self.reply(&report),
269 Err(e) => self.reply(&format!("⚠ Could not fetch weather: {e}")),
270 }
271 }
272 "help" => {
273 let default = self.default_location();
274 self.reply(&format!(
275 "**WeatherAgent** — current conditions via wttr.in (no API key needed)\n\n\
276 ```\n\
277 @weather-agent # {default} (default)\n\
278 @weather-agent Tokyo\n\
279 @weather-agent New York\n\
280 @weather-agent 48.8566,2.3522 # coordinates\n\
281 ```\n\
282 Set `WEATHER_DEFAULT_LOCATION` in `.env` to change the default."
283 ));
284 }
285 location => {
286 let typing = format!("🌦 Fetching weather for **{location}**…");
287 self.reply(&typing);
288 match self.fetch_weather(location).await {
289 Ok(report) => self.reply(&report),
290 Err(e) => {
291 self.reply(&format!("⚠ Could not fetch weather for '{location}': {e}"))
292 }
293 }
294 }
295 }
296
297 Ok(())
298 }
299
300 async fn on_heartbeat(&mut self) -> Result<()> {
301 if let Some(pub_) = &self.publisher {
302 pub_.publish(
303 wactorz_mqtt::topics::heartbeat(&self.config.id),
304 &serde_json::json!({
305 "agentId": self.config.id,
306 "agentName": self.config.name,
307 "state": self.state,
308 "timestampMs": Self::now_ms(),
309 }),
310 );
311 }
312 Ok(())
313 }
314
315 async fn run(&mut self) -> Result<()> {
316 self.on_start().await?;
317 let mut rx = self
318 .mailbox_rx
319 .take()
320 .ok_or_else(|| anyhow::anyhow!("WeatherAgent already running"))?;
321 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
322 self.config.heartbeat_interval_secs,
323 ));
324 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
325 loop {
326 tokio::select! {
327 biased;
328 msg = rx.recv() => match msg {
329 None => break,
330 Some(m) => {
331 self.metrics.record_received();
332 if let wactorz_core::message::MessageType::Command {
333 command: wactorz_core::message::ActorCommand::Stop
334 } = &m.payload { break; }
335 match self.handle_message(m).await {
336 Ok(_) => self.metrics.record_processed(),
337 Err(e) => {
338 tracing::error!("[{}] {e}", self.config.name);
339 self.metrics.record_failed();
340 }
341 }
342 }
343 },
344 _ = hb.tick() => {
345 self.metrics.record_heartbeat();
346 if let Err(e) = self.on_heartbeat().await {
347 tracing::error!("[{}] heartbeat: {e}", self.config.name);
348 }
349 }
350 }
351 }
352 self.state = ActorState::Stopped;
353 self.on_stop().await
354 }
355}