1use anyhow::Result;
14use clap::Parser;
15use dotenvy::dotenv;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use tracing::info;
19
20use wactorz_agents::{
21 CatalogAgent, DynamicAgent, FusekiAgent, HomeAssistantActuatorAgent, HomeAssistantAgent,
22 HomeAssistantStateBridgeAgent, IOAgent, InstallerAgent, LlmConfig, LlmProvider, MainActor,
23 ManualAgent, MonitorAgent, WeatherAgent,
24};
25use wactorz_core::{ActorConfig, ActorSystem, EventPublisher, Supervisor, SupervisorStrategy};
26use wactorz_interfaces::ws::WsEnvelope;
27use wactorz_interfaces::{RestServer, RuntimeConfig, WsBridge};
28use wactorz_mqtt::{MqttClient, MqttConfig};
29
30#[derive(Debug, Parser)]
32#[command(name = "wactorz", version, about)]
33pub struct Args {
34 #[arg(long, default_value = "localhost", env = "MQTT_HOST")]
36 pub mqtt_host: String,
37
38 #[arg(long, default_value_t = 1883, env = "MQTT_PORT")]
40 pub mqtt_port: u16,
41
42 #[arg(long, default_value = "127.0.0.1:8080", env = "API_ADDR")]
44 pub api_addr: SocketAddr,
45
46 #[arg(long, default_value = "127.0.0.1:8081", env = "WS_ADDR")]
48 pub ws_addr: SocketAddr,
49
50 #[arg(long, default_value = "anthropic", env = "LLM_PROVIDER")]
52 pub llm_provider: String,
53
54 #[arg(long, default_value = "claude-sonnet-4-6", env = "LLM_MODEL")]
56 pub llm_model: String,
57
58 #[arg(long, env = "LLM_API_KEY")]
60 pub llm_api_key: Option<String>,
61
62 #[arg(long, env = "NIM_MODEL")]
65 pub nim_model: Option<String>,
66
67 #[arg(long, default_value_t = 9001, env = "MQTT_WS_PORT")]
69 pub mqtt_ws_port: u16,
70
71 #[arg(long, default_value = "", env = "HA_URL")]
73 pub ha_url: String,
74
75 #[arg(long, default_value = "", env = "HA_TOKEN")]
77 pub ha_token: String,
78
79 #[arg(long, default_value = "", env = "FUSEKI_URL")]
81 pub fuseki_url: String,
82
83 #[arg(long, default_value = "", env = "FUSEKI_DATASET")]
85 pub fuseki_dataset: String,
86
87 #[arg(long, default_value = "", env = "FUSEKI_USER")]
89 pub fuseki_user: String,
90
91 #[arg(long, default_value = "", env = "FUSEKI_PASSWORD")]
93 pub fuseki_password: String,
94
95 #[arg(long, default_value = "", env = "WEATHER_DEFAULT_LOCATION")]
97 pub weather_default_location: String,
98
99 #[arg(long, default_value_t = false, env = "NO_CLI")]
101 pub no_cli: bool,
102
103 #[arg(long, default_value = "", env = "DISCORD_TOKEN")]
105 pub discord_token: String,
106
107 #[arg(long, default_value = "", env = "TELEGRAM_TOKEN")]
109 pub telegram_token: String,
110
111 #[arg(long, default_value = "", env = "TELEGRAM_ALLOWED_USER_ID")]
113 pub telegram_allowed_user_id: String,
114
115 #[arg(long, default_value = "", env = "TWILIO_ACCOUNT_SID")]
117 pub twilio_account_sid: String,
118
119 #[arg(long, default_value = "", env = "TWILIO_AUTH_TOKEN")]
121 pub twilio_auth_token: String,
122
123 #[arg(long, default_value = "", env = "TWILIO_WHATSAPP_NUMBER")]
125 pub twilio_whatsapp_number: String,
126
127 #[arg(long, default_value = "", env = "API_KEY")]
129 pub api_key: String,
130
131 #[arg(long, default_value = "static/app", env = "STATIC_DIR")]
133 pub static_dir: String,
134
135 #[arg(long, default_value = "ha/state", env = "HA_STATE_BRIDGE_OUTPUT_TOPIC")]
137 pub ha_state_bridge_topic: String,
138
139 #[arg(long, default_value = "", env = "HA_STATE_BRIDGE_DOMAINS")]
141 pub ha_state_bridge_domains: String,
142}
143
144fn normalize_fuseki_endpoint(url: &str, dataset: &str) -> (String, String) {
145 let mut base = url.trim().trim_end_matches('/').to_string();
146 let mut ds = dataset.trim().trim_matches('/').to_string();
147
148 if base.is_empty() {
149 return (base, ds);
150 }
151
152 let split_idx = base.find("://").map(|idx| idx + 3).unwrap_or(0);
153 let path_start = base[split_idx..].find('/').map(|idx| split_idx + idx);
154
155 if let Some(path_start) = path_start {
156 let host = &base[..path_start];
157 let path = &base[path_start..];
158 let segments: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
159 if let Some(last) = segments.last() {
160 if ds.is_empty() {
161 ds = (*last).to_string();
162 let parent = &segments[..segments.len().saturating_sub(1)];
163 base = if parent.is_empty() {
164 host.to_string()
165 } else {
166 format!("{}/{}", host, parent.join("/"))
167 };
168 } else if *last == ds {
169 let parent = &segments[..segments.len().saturating_sub(1)];
170 base = if parent.is_empty() {
171 host.to_string()
172 } else {
173 format!("{}/{}", host, parent.join("/"))
174 };
175 }
176 }
177 }
178
179 (base, ds)
180}
181
182#[tokio::main]
183async fn main() -> Result<()> {
184 match dotenv() {
185 Ok(path) => info!("Loaded .env from {}", path.display()),
186 Err(err) => info!("No .env loaded ({err})"),
187 }
188 tracing_subscriber::fmt()
190 .with_env_filter(
191 tracing_subscriber::EnvFilter::try_from_default_env()
192 .unwrap_or_else(|_| "wactorz=info,tower_http=debug".into()),
193 )
194 .init();
195
196 let args = Args::parse();
197 info!("Starting AgentFlow server");
198 let (fuseki_url, fuseki_dataset) =
199 normalize_fuseki_endpoint(&args.fuseki_url, &args.fuseki_dataset);
200 if !fuseki_url.is_empty() || !fuseki_dataset.is_empty() {
201 info!(
202 "Fuseki config normalized to base='{}' dataset='{}'",
203 fuseki_url, fuseki_dataset
204 );
205 }
206
207 let (publisher, mut pub_rx) = EventPublisher::channel();
209
210 let system = ActorSystem::with_publisher(publisher.clone());
212
213 let mqtt_config = MqttConfig {
215 host: args.mqtt_host.clone(),
216 port: args.mqtt_port,
217 client_id: "wactorz-server".into(),
218 ..Default::default()
219 };
220
221 let (mqtt_client, mut event_loop) = MqttClient::new(mqtt_config)?;
222 let mqtt_client = Arc::new(mqtt_client);
223
224 let (ws_tx, _) = tokio::sync::broadcast::channel::<WsEnvelope>(100);
226 let ws_tx_for_mqtt = ws_tx.clone();
227
228 let registry_for_route = system.registry.clone();
230 let registry_for_qa = system.registry.clone();
231 let registry_for_wik = system.registry.clone();
233 let registry_for_switch = system.registry.clone();
234
235 tokio::spawn(async move {
237 MqttClient::run_event_loop(&mut event_loop, move |evt| {
238 if let wactorz_mqtt::MqttEvent::Incoming { topic, payload } = evt {
239 tracing::debug!("MQTT in: {topic}");
240 if let Ok(json_val) = serde_json::from_slice::<serde_json::Value>(&payload) {
241 let envelope = WsEnvelope {
243 topic: topic.clone(),
244 payload: json_val.clone(),
245 };
246 let _ = ws_tx_for_mqtt.send(envelope);
247
248 if topic == wactorz_mqtt::topics::SYSTEM_LLM_ERROR {
250 let reg = registry_for_wik.clone();
251 let payload_str = serde_json::to_string(&json_val).unwrap_or_default();
252 tokio::spawn(async move {
253 if let Some(entry) = reg.get_by_name("wik-agent").await {
254 let msg = wactorz_core::Message::text(
255 Some("system".to_string()),
256 Some(entry.id.clone()),
257 payload_str,
258 );
259 let _ = reg.send(&entry.id, msg).await;
260 }
261 });
262 }
263
264 if topic == wactorz_mqtt::topics::SYSTEM_LLM_SWITCH {
266 let reg = registry_for_switch.clone();
267 let switch_payload = json_val.clone();
268 tokio::spawn(async move {
269 if let Some(entry) = reg.get_by_name("main-actor").await {
270 let msg = wactorz_core::Message::new(
271 Some("wik-agent".to_string()),
272 Some(entry.id.clone()),
273 wactorz_core::message::MessageType::Task {
274 task_id: "wik/switch".to_string(),
275 description: "LLM provider switch".to_string(),
276 payload: switch_payload,
277 },
278 );
279 let _ = reg.send(&entry.id, msg).await;
280 }
281 });
282 }
283
284 if topic.ends_with("/chat") {
286 let reg_qa = registry_for_qa.clone();
287 let qa_content = serde_json::to_string(&json_val).unwrap_or_default();
288 tokio::spawn(async move {
289 if let Some(entry) = reg_qa.get_by_name("qa-agent").await {
290 let msg = wactorz_core::Message::text(
291 Some("mqtt-router".to_string()),
292 Some(entry.id.clone()),
293 qa_content,
294 );
295 let _ = reg_qa.send(&entry.id, msg).await;
296 }
297 });
298 }
299
300 if topic.ends_with("/chat") {
302 let from = json_val.get("from").and_then(|v| v.as_str()).unwrap_or("");
303 let content = json_val
304 .get("content")
305 .and_then(|v| v.as_str())
306 .unwrap_or("")
307 .to_string();
308
309 if !content.is_empty() && (from == "user" || from.is_empty()) {
310 if topic == wactorz_mqtt::topics::IO_CHAT {
311 let reg = registry_for_route.clone();
313 tokio::spawn(async move {
314 if let Some(entry) = reg.get_by_name("io-agent").await {
315 let msg = wactorz_core::Message::text(
316 Some("user".to_string()),
317 Some(entry.id.clone()),
318 content,
319 );
320 let _ = reg.send(&entry.id, msg).await;
321 }
322 });
323 } else if let Some(actor_id) = topic
324 .strip_prefix("agents/")
325 .and_then(|s| s.strip_suffix("/chat"))
326 {
327 let reg = registry_for_route.clone();
329 let id = actor_id.to_string();
330 tokio::spawn(async move {
331 let msg = wactorz_core::Message::text(
332 Some("user".to_string()),
333 Some(id.clone()),
334 content,
335 );
336 let _ = reg.send(&id, msg).await;
337 });
338 }
339 }
340 }
341 }
342 }
343 })
344 .await;
345 });
346
347 if let Err(e) = mqtt_client.subscribe("agents/#").await {
349 tracing::warn!("MQTT subscribe failed (broker may not be running): {e}");
350 }
351 if let Err(e) = mqtt_client.subscribe("system/#").await {
352 tracing::warn!("MQTT subscribe failed (broker may not be running): {e}");
353 }
354 if let Err(e) = mqtt_client.subscribe(wactorz_mqtt::topics::IO_CHAT).await {
355 tracing::warn!("MQTT subscribe io/chat failed: {e}");
356 }
357 if let Err(e) = mqtt_client.subscribe("system/llm/#").await {
358 tracing::warn!("MQTT subscribe system/llm/# failed: {e}");
359 }
360 if let Err(e) = mqtt_client.subscribe("nodes/#").await {
361 tracing::warn!("MQTT subscribe nodes/# failed: {e}");
362 }
363
364 let mqtt_for_bridge = Arc::clone(&mqtt_client);
366 tokio::spawn(async move {
367 while let Some((topic, payload)) = pub_rx.recv().await {
368 if let Err(e) = mqtt_for_bridge.publish_raw(&topic, payload).await {
369 tracing::error!("MQTT publish error: {e}");
370 }
371 }
372 });
373
374 let (llm_provider, llm_model) = if let Some(nim_model) = &args.nim_model {
376 (LlmProvider::Nim, nim_model.clone())
377 } else {
378 let p = match args.llm_provider.as_str() {
379 "openai" => LlmProvider::OpenAI,
380 "ollama" => LlmProvider::Ollama,
381 "gemini" => LlmProvider::Gemini,
382 "nim" => LlmProvider::Nim,
383 _ => LlmProvider::Anthropic,
384 };
385 (p, args.llm_model.clone())
386 };
387 let llm_config = LlmConfig {
388 provider: llm_provider,
389 model: llm_model,
390 api_key: args.llm_api_key.clone(),
391 ..Default::default()
392 };
393
394 let mut sup = Supervisor::new(system.clone());
402
403 {
404 let lc = llm_config.clone();
405 let sys = system.clone();
406 let pub_ = publisher.clone();
407 sup.supervise(
408 "main-actor",
409 Arc::new(move || {
410 let c = ActorConfig::new_with_node("main-actor", "alpha").protected();
411 Box::new(MainActor::new(c, lc.clone(), sys.clone()).with_publisher(pub_.clone()))
412 }),
413 SupervisorStrategy::OneForOne,
414 10,
415 60.0,
416 2.0,
417 );
418 }
419 {
420 let sys = system.clone();
421 let pub_ = publisher.clone();
422 sup.supervise(
423 "monitor-agent",
424 Arc::new(move || {
425 let c = ActorConfig::new_with_node("monitor-agent", "bravo").protected();
426 Box::new(MonitorAgent::new(c, sys.clone()).with_publisher(pub_.clone()))
427 }),
428 SupervisorStrategy::OneForOne,
429 10,
430 60.0,
431 1.0,
432 );
433 }
434 {
435 let sys = system.clone();
436 let pub_ = publisher.clone();
437 sup.supervise(
438 "io-agent",
439 Arc::new(move || {
440 let c = ActorConfig::new_with_node("io-agent", "charlie");
441 Box::new(IOAgent::new(c, sys.clone()).with_publisher(pub_.clone()))
442 }),
443 SupervisorStrategy::OneForOne,
444 10,
445 60.0,
446 1.0,
447 );
448 }
449 {
450 let pub_ = publisher.clone();
451 sup.supervise(
452 "installer-agent",
453 Arc::new(move || {
454 let c = ActorConfig::new_with_node("installer-agent", "delta");
455 Box::new(InstallerAgent::new(c).with_publisher(pub_.clone()))
456 }),
457 SupervisorStrategy::OneForOne,
458 5,
459 60.0,
460 2.0,
461 );
462 }
463 {
464 let pub_ = publisher.clone();
465 sup.supervise(
466 "code-agent",
467 Arc::new(move || {
468 let c = ActorConfig::new_with_node("code-agent", "echo");
469 Box::new(DynamicAgent::new(c, "").with_publisher(pub_.clone()))
470 }),
471 SupervisorStrategy::OneForOne,
472 5,
473 60.0,
474 1.0,
475 );
476 }
477 {
478 let lc = llm_config.clone();
479 let pub_ = publisher.clone();
480 sup.supervise(
481 "manual-agent",
482 Arc::new(move || {
483 let c = ActorConfig::new_with_node("manual-agent", "foxtrot");
484 Box::new(ManualAgent::new(c, lc.clone()).with_publisher(pub_.clone()))
485 }),
486 SupervisorStrategy::OneForOne,
487 5,
488 60.0,
489 1.0,
490 );
491 }
492 {
493 let pub_ = publisher.clone();
494 let ha_url = args.ha_url.clone();
495 let ha_token = args.ha_token.clone();
496 sup.supervise(
497 "home-assistant-agent",
498 Arc::new(move || {
499 let c = ActorConfig::new_with_node("home-assistant-agent", "golf");
500 Box::new(
501 HomeAssistantAgent::new(c)
502 .with_ha_config(ha_url.clone(), ha_token.clone())
503 .with_publisher(pub_.clone()),
504 )
505 }),
506 SupervisorStrategy::OneForOne,
507 5,
508 60.0,
509 2.0,
510 );
511 }
512 {
513 let pub_ = publisher.clone();
514 let weather_location = args.weather_default_location.clone();
515 sup.supervise(
516 "weather-agent",
517 Arc::new(move || {
518 let c = ActorConfig::new_with_node("weather-agent", "hotel");
519 Box::new(
520 WeatherAgent::new(c)
521 .with_default_location(weather_location.clone())
522 .with_publisher(pub_.clone()),
523 )
524 }),
525 SupervisorStrategy::OneForOne,
526 5,
527 60.0,
528 1.0,
529 );
530 }
531 {
532 let pub_ = publisher.clone();
533 let fuseki_url = fuseki_url.clone();
534 let fuseki_dataset = fuseki_dataset.clone();
535 let fuseki_user = args.fuseki_user.clone();
536 let fuseki_password = args.fuseki_password.clone();
537 sup.supervise(
538 "fuseki-agent",
539 Arc::new(move || {
540 let c = ActorConfig::new_with_node("fuseki-agent", "india");
541 Box::new(
542 FusekiAgent::new(c)
543 .with_fuseki_config(fuseki_url.clone(), fuseki_dataset.clone())
544 .with_fuseki_auth(fuseki_user.clone(), fuseki_password.clone())
545 .with_publisher(pub_.clone()),
546 )
547 }),
548 SupervisorStrategy::OneForOne,
549 5,
550 60.0,
551 2.0,
552 );
553 }
554
555 {
556 let pub_ = publisher.clone();
557 sup.supervise(
558 "catalog",
559 Arc::new(move || {
560 let c = ActorConfig::new_with_node("catalog", "juliet").protected();
561 Box::new(CatalogAgent::new(c).with_publisher(pub_.clone()))
562 }),
563 SupervisorStrategy::OneForOne,
564 5,
565 60.0,
566 1.0,
567 );
568 }
569
570 {
571 let pub_ = publisher.clone();
572 let ha_url = args.ha_url.clone();
573 let ha_token = args.ha_token.clone();
574 sup.supervise(
575 "ha-actuator",
576 Arc::new(move || {
577 let c = ActorConfig::new_with_node("ha-actuator", "kilo");
578 Box::new(
579 HomeAssistantActuatorAgent::new(c)
580 .with_ha_config(ha_url.clone(), ha_token.clone())
581 .with_publisher(pub_.clone()),
582 )
583 }),
584 SupervisorStrategy::OneForOne,
585 5,
586 60.0,
587 2.0,
588 );
589 }
590
591 {
592 let pub_ = publisher.clone();
593 let sys = system.clone();
594 let ha_url = args.ha_url.clone();
595 let ha_token = args.ha_token.clone();
596 let fuseki_url = fuseki_url.clone();
597 let fuseki_dataset = fuseki_dataset.clone();
598 let fuseki_user = args.fuseki_user.clone();
599 let fuseki_password = args.fuseki_password.clone();
600 let output_topic = args.ha_state_bridge_topic.clone();
601 let domains: Vec<String> = args
602 .ha_state_bridge_domains
603 .split(',')
604 .map(|s| s.trim().to_string())
605 .filter(|s| !s.is_empty())
606 .collect();
607 sup.supervise(
608 "ha-state-bridge",
609 Arc::new(move || {
610 let c = ActorConfig::new_with_node("ha-state-bridge", "lima");
611 Box::new(
612 HomeAssistantStateBridgeAgent::new(c)
613 .with_system(sys.clone())
614 .with_ha_config(
615 ha_url.clone(),
616 ha_token.clone(),
617 output_topic.clone(),
618 domains.clone(),
619 )
620 .with_fuseki_config(fuseki_url.clone(), fuseki_dataset.clone())
621 .with_fuseki_auth(fuseki_user.clone(), fuseki_password.clone())
622 .with_publisher(pub_.clone()),
623 )
624 }),
625 SupervisorStrategy::OneForOne,
626 5,
627 60.0,
628 2.0,
629 );
630 }
631
632 sup.start().await?;
633 info!(
634 "Supervisor started — 12 agents (main, monitor, io, installer, code, manual, home-assistant, weather, fuseki, catalog, ha-actuator, ha-state-bridge)"
635 );
636
637 let rest_addr: SocketAddr = args.api_addr;
639 let system_for_rest = system.clone();
640 let static_dir = args.static_dir.clone();
641 let runtime_cfg = RuntimeConfig {
642 ha_url: args.ha_url.clone(),
643 ha_token: args.ha_token.clone(),
644 fuseki_url: fuseki_url.clone(),
645 fuseki_dataset: fuseki_dataset.clone(),
646 fuseki_user: args.fuseki_user.clone(),
647 fuseki_password: args.fuseki_password.clone(),
648 weather_default_location: args.weather_default_location.clone(),
649 mqtt_host: args.mqtt_host.clone(),
650 mqtt_port: args.mqtt_port,
651 mqtt_ws_port: args.mqtt_ws_port,
652 llm_provider: args.llm_provider.clone(),
653 llm_model: args.llm_model.clone(),
654 };
655 let ws_bridge = WsBridge::new(
659 ws_tx,
660 mqtt_client,
661 system.clone(),
662 args.mqtt_host,
663 args.mqtt_ws_port,
664 );
665 tokio::spawn(async move {
666 let server = RestServer::new(system_for_rest, rest_addr, runtime_cfg, static_dir)
667 .with_ws(ws_bridge.router());
668 if let Err(e) = server.serve().await {
669 tracing::error!("REST+WS error: {e}");
670 }
671 });
672
673 if !args.no_cli {
675 tokio::spawn(wactorz_interfaces::cli::run_cli(system.clone()));
676 }
677
678 tokio::signal::ctrl_c().await?;
680 info!("Received Ctrl-C, shutting down…");
681 sup.stop().await;
682 system.shutdown().await?;
683 info!("Goodbye.");
684 Ok(())
685}