wactorz/
main.rs

//! AgentFlow server entry point.
//!
//! Starts the full system:
//! 1. Parses CLI arguments (via [`clap`])
//! 2. Initialises structured logging (via [`tracing_subscriber`])
//! 3. Creates the [`ActorSystem`]
//! 4. Connects to MQTT broker
//! 5. Starts a [`Supervisor`] running [`MainActor`], [`MonitorAgent`] and five
//!    further agents (IO, installer, code, manual, Home Assistant)
//! 6. Starts the REST API + WebSocket bridge
//! 7. Optionally starts the interactive CLI
//! 8. Awaits a Ctrl-C signal then shuts down gracefully
12
13use anyhow::Result;
14use clap::Parser;
15use std::net::SocketAddr;
16use std::sync::Arc;
17use tracing::info;
18
19use wactorz_agents::{
20    DynamicAgent, HomeAssistantAgent, IOAgent, InstallerAgent, LlmConfig, LlmProvider, MainActor,
21    ManualAgent, MonitorAgent,
22};
23use wactorz_core::{ActorConfig, ActorSystem, EventPublisher, Supervisor, SupervisorStrategy};
24use wactorz_interfaces::ws::WsEnvelope;
25use wactorz_interfaces::{RestServer, WsBridge};
26use wactorz_mqtt::{MqttClient, MqttConfig};
27
28/// AgentFlow: async multi-agent orchestration framework
29#[derive(Debug, Parser)]
30#[command(name = "wactorz", version, about)]
31pub struct Args {
32    /// MQTT broker host
33    #[arg(long, default_value = "localhost", env = "MQTT_HOST")]
34    pub mqtt_host: String,
35
36    /// MQTT broker port
37    #[arg(long, default_value_t = 1883, env = "MQTT_PORT")]
38    pub mqtt_port: u16,
39
40    /// REST API listen address
41    #[arg(long, default_value = "127.0.0.1:8080", env = "API_ADDR")]
42    pub api_addr: SocketAddr,
43
44    /// WebSocket bridge listen address
45    #[arg(long, default_value = "127.0.0.1:8081", env = "WS_ADDR")]
46    pub ws_addr: SocketAddr,
47
48    /// LLM provider (anthropic | openai | ollama | nim | gemini)
49    #[arg(long, default_value = "anthropic", env = "LLM_PROVIDER")]
50    pub llm_provider: String,
51
52    /// LLM model name
53    #[arg(long, default_value = "claude-sonnet-4-6", env = "LLM_MODEL")]
54    pub llm_model: String,
55
56    /// LLM API key
57    #[arg(long, env = "LLM_API_KEY")]
58    pub llm_api_key: Option<String>,
59
60    /// NVIDIA NIM model (e.g. meta/llama-3.3-70b-instruct).
61    /// Implies --llm-provider nim when set.
62    #[arg(long, env = "NIM_MODEL")]
63    pub nim_model: Option<String>,
64
65    /// Disable interactive CLI (useful for container deployments)
66    #[arg(long, default_value_t = false, env = "NO_CLI")]
67    pub no_cli: bool,
68}
69
70#[tokio::main]
71async fn main() -> Result<()> {
72    // ── Logging ───────────────────────────────────────────────────────────────
73    tracing_subscriber::fmt()
74        .with_env_filter(
75            tracing_subscriber::EnvFilter::try_from_default_env()
76                .unwrap_or_else(|_| "wactorz=info,tower_http=debug".into()),
77        )
78        .init();
79
80    let args = Args::parse();
81    info!("Starting AgentFlow server");
82
83    // ── Publisher channel ─────────────────────────────────────────────────────
84    let (publisher, mut pub_rx) = EventPublisher::channel();
85
86    // ── Actor system ──────────────────────────────────────────────────────────
87    let system = ActorSystem::with_publisher(publisher.clone());
88
89    // ── MQTT ──────────────────────────────────────────────────────────────────
90    let mqtt_config = MqttConfig {
91        host: args.mqtt_host.clone(),
92        port: args.mqtt_port,
93        client_id: "wactorz-server".into(),
94        ..Default::default()
95    };
96
97    let (mqtt_client, mut event_loop) = MqttClient::new(mqtt_config)?;
98    let mqtt_client = Arc::new(mqtt_client);
99
100    // WebSocket broadcast channel
101    let (ws_tx, _) = tokio::sync::broadcast::channel::<WsEnvelope>(100);
102    let ws_tx_for_mqtt = ws_tx.clone();
103
104    // Registry clone for routing inbound chat messages → actor mailboxes
105    let registry_for_route = system.registry.clone();
106    let registry_for_qa = system.registry.clone();
107    // WIK receives system/llm/error; LlmAgent/MainActor receives system/llm/switch
108    let registry_for_wik = system.registry.clone();
109    let registry_for_switch = system.registry.clone();
110
111    // Start MQTT event loop task
112    tokio::spawn(async move {
113        MqttClient::run_event_loop(&mut event_loop, move |evt| {
114            if let wactorz_mqtt::MqttEvent::Incoming { topic, payload } = evt {
115                tracing::debug!("MQTT in: {topic}");
116                if let Ok(json_val) = serde_json::from_slice::<serde_json::Value>(&payload) {
117                    // Broadcast to WebSocket clients
118                    let envelope = WsEnvelope {
119                        topic: topic.clone(),
120                        payload: json_val.clone(),
121                    };
122                    let _ = ws_tx_for_mqtt.send(envelope);
123
124                    // ── system/llm/error → forward to wik-agent ──────────────
125                    if topic == wactorz_mqtt::topics::SYSTEM_LLM_ERROR {
126                        let reg = registry_for_wik.clone();
127                        let payload_str = serde_json::to_string(&json_val).unwrap_or_default();
128                        tokio::spawn(async move {
129                            if let Some(entry) = reg.get_by_name("wik-agent").await {
130                                let msg = wactorz_core::Message::text(
131                                    Some("system".to_string()),
132                                    Some(entry.id.clone()),
133                                    payload_str,
134                                );
135                                let _ = reg.send(&entry.id, msg).await;
136                            }
137                        });
138                    }
139
140                    // ── system/llm/switch → forward to main-actor as Task ─────
141                    if topic == wactorz_mqtt::topics::SYSTEM_LLM_SWITCH {
142                        let reg = registry_for_switch.clone();
143                        let switch_payload = json_val.clone();
144                        tokio::spawn(async move {
145                            if let Some(entry) = reg.get_by_name("main-actor").await {
146                                let msg = wactorz_core::Message::new(
147                                    Some("wik-agent".to_string()),
148                                    Some(entry.id.clone()),
149                                    wactorz_core::message::MessageType::Task {
150                                        task_id: "wik/switch".to_string(),
151                                        description: "LLM provider switch".to_string(),
152                                        payload: switch_payload,
153                                    },
154                                );
155                                let _ = reg.send(&entry.id, msg).await;
156                            }
157                        });
158                    }
159
160                    // Forward all chat messages to QA agent for passive inspection
161                    if topic.ends_with("/chat") {
162                        let reg_qa = registry_for_qa.clone();
163                        let qa_content = serde_json::to_string(&json_val).unwrap_or_default();
164                        tokio::spawn(async move {
165                            if let Some(entry) = reg_qa.get_by_name("qa-agent").await {
166                                let msg = wactorz_core::Message::text(
167                                    Some("mqtt-router".to_string()),
168                                    Some(entry.id.clone()),
169                                    qa_content,
170                                );
171                                let _ = reg_qa.send(&entry.id, msg).await;
172                            }
173                        });
174                    }
175
176                    // Route agents/{id}/chat from user → actor mailbox
177                    if topic.ends_with("/chat") {
178                        let from = json_val.get("from").and_then(|v| v.as_str()).unwrap_or("");
179                        let content = json_val
180                            .get("content")
181                            .and_then(|v| v.as_str())
182                            .unwrap_or("")
183                            .to_string();
184
185                        if !content.is_empty() && (from == "user" || from.is_empty()) {
186                            if topic == wactorz_mqtt::topics::IO_CHAT {
187                                // io/chat → look up io-agent by name
188                                let reg = registry_for_route.clone();
189                                tokio::spawn(async move {
190                                    if let Some(entry) = reg.get_by_name("io-agent").await {
191                                        let msg = wactorz_core::Message::text(
192                                            Some("user".to_string()),
193                                            Some(entry.id.clone()),
194                                            content,
195                                        );
196                                        let _ = reg.send(&entry.id, msg).await;
197                                    }
198                                });
199                            } else if let Some(actor_id) = topic
200                                .strip_prefix("agents/")
201                                .and_then(|s| s.strip_suffix("/chat"))
202                            {
203                                // agents/{id}/chat → send directly to that actor
204                                let reg = registry_for_route.clone();
205                                let id = actor_id.to_string();
206                                tokio::spawn(async move {
207                                    let msg = wactorz_core::Message::text(
208                                        Some("user".to_string()),
209                                        Some(id.clone()),
210                                        content,
211                                    );
212                                    let _ = reg.send(&id, msg).await;
213                                });
214                            }
215                        }
216                    }
217                }
218            }
219        })
220        .await;
221    });
222
223    // Subscribe to all agent and system topics, plus the IO gateway topic
224    if let Err(e) = mqtt_client.subscribe("agents/#").await {
225        tracing::warn!("MQTT subscribe failed (broker may not be running): {e}");
226    }
227    if let Err(e) = mqtt_client.subscribe("system/#").await {
228        tracing::warn!("MQTT subscribe failed (broker may not be running): {e}");
229    }
230    if let Err(e) = mqtt_client.subscribe(wactorz_mqtt::topics::IO_CHAT).await {
231        tracing::warn!("MQTT subscribe io/chat failed: {e}");
232    }
233    if let Err(e) = mqtt_client.subscribe("system/llm/#").await {
234        tracing::warn!("MQTT subscribe system/llm/# failed: {e}");
235    }
236
237    // Publisher bridge task: drain pub_rx → MQTT
238    let mqtt_for_bridge = Arc::clone(&mqtt_client);
239    tokio::spawn(async move {
240        while let Some((topic, payload)) = pub_rx.recv().await {
241            if let Err(e) = mqtt_for_bridge.publish_raw(&topic, payload).await {
242                tracing::error!("MQTT publish error: {e}");
243            }
244        }
245    });
246
247    // ── LLM config ────────────────────────────────────────────────────────────
248    let (llm_provider, llm_model) = if let Some(nim_model) = &args.nim_model {
249        (LlmProvider::Nim, nim_model.clone())
250    } else {
251        let p = match args.llm_provider.as_str() {
252            "openai" => LlmProvider::OpenAI,
253            "ollama" => LlmProvider::Ollama,
254            "gemini" => LlmProvider::Gemini,
255            "nim" => LlmProvider::Nim,
256            _ => LlmProvider::Anthropic,
257        };
258        (p, args.llm_model.clone())
259    };
260    let llm_config = LlmConfig {
261        provider: llm_provider,
262        model: llm_model,
263        api_key: args.llm_api_key.clone(),
264        ..Default::default()
265    };
266
267    // ── Supervisor + agents ───────────────────────────────────────────────────.
268    // NATO node names:
269    //   alpha=main-actor  bravo=monitor  charlie=io
270    //   delta=installer   echo=code-agent (DynamicAgent)
271    //   foxtrot=manual    golf=home-assistant
272
273    let mut sup = Supervisor::new(system.clone());
274
275    {
276        let lc = llm_config.clone();
277        let sys = system.clone();
278        let pub_ = publisher.clone();
279        sup.supervise(
280            "main-actor",
281            Arc::new(move || {
282                let c = ActorConfig::new_with_node("main-actor", "alpha").protected();
283                Box::new(MainActor::new(c, lc.clone(), sys.clone()).with_publisher(pub_.clone()))
284            }),
285            SupervisorStrategy::OneForOne,
286            10,
287            60.0,
288            2.0,
289        );
290    }
291    {
292        let sys = system.clone();
293        let pub_ = publisher.clone();
294        sup.supervise(
295            "monitor-agent",
296            Arc::new(move || {
297                let c = ActorConfig::new_with_node("monitor-agent", "bravo").protected();
298                Box::new(MonitorAgent::new(c, sys.clone()).with_publisher(pub_.clone()))
299            }),
300            SupervisorStrategy::OneForOne,
301            10,
302            60.0,
303            1.0,
304        );
305    }
306    {
307        let sys = system.clone();
308        let pub_ = publisher.clone();
309        sup.supervise(
310            "io-agent",
311            Arc::new(move || {
312                let c = ActorConfig::new_with_node("io-agent", "charlie");
313                Box::new(IOAgent::new(c, sys.clone()).with_publisher(pub_.clone()))
314            }),
315            SupervisorStrategy::OneForOne,
316            10,
317            60.0,
318            1.0,
319        );
320    }
321    {
322        let pub_ = publisher.clone();
323        sup.supervise(
324            "installer-agent",
325            Arc::new(move || {
326                let c = ActorConfig::new_with_node("installer-agent", "delta");
327                Box::new(InstallerAgent::new(c).with_publisher(pub_.clone()))
328            }),
329            SupervisorStrategy::OneForOne,
330            5,
331            60.0,
332            2.0,
333        );
334    }
335    {
336        let pub_ = publisher.clone();
337        sup.supervise(
338            "code-agent",
339            Arc::new(move || {
340                let c = ActorConfig::new_with_node("code-agent", "echo");
341                Box::new(DynamicAgent::new(c, "").with_publisher(pub_.clone()))
342            }),
343            SupervisorStrategy::OneForOne,
344            5,
345            60.0,
346            1.0,
347        );
348    }
349    {
350        let lc = llm_config.clone();
351        let pub_ = publisher.clone();
352        sup.supervise(
353            "manual-agent",
354            Arc::new(move || {
355                let c = ActorConfig::new_with_node("manual-agent", "foxtrot");
356                Box::new(ManualAgent::new(c, lc.clone()).with_publisher(pub_.clone()))
357            }),
358            SupervisorStrategy::OneForOne,
359            5,
360            60.0,
361            1.0,
362        );
363    }
364    {
365        let pub_ = publisher.clone();
366        sup.supervise(
367            "home-assistant-agent",
368            Arc::new(move || {
369                let c = ActorConfig::new_with_node("home-assistant-agent", "golf");
370                Box::new(HomeAssistantAgent::new(c).with_publisher(pub_.clone()))
371            }),
372            SupervisorStrategy::OneForOne,
373            5,
374            60.0,
375            2.0,
376        );
377    }
378
379    sup.start().await?;
380    info!(
381        "Supervisor started — 7 agents (main, monitor, io, installer, code, manual, home-assistant)"
382    );
383
384    // ── REST server ───────────────────────────────────────────────────────────
385    let rest_addr: SocketAddr = args.api_addr;
386    let system_for_rest = system.clone();
387    tokio::spawn(async move {
388        let server = RestServer::new(system_for_rest, rest_addr);
389        if let Err(e) = server.serve().await {
390            tracing::error!("REST error: {e}");
391        }
392    });
393
394    // ── WebSocket bridge ──────────────────────────────────────────────────────
395    let ws_addr: SocketAddr = args.ws_addr;
396    let ws_bridge = WsBridge::new(ws_tx, mqtt_client, args.mqtt_host, args.mqtt_port);
397    tokio::spawn(async move {
398        let router = ws_bridge.router();
399        match tokio::net::TcpListener::bind(ws_addr).await {
400            Ok(listener) => {
401                tracing::info!("WS bridge listening on {ws_addr}");
402                if let Err(e) = axum::serve(listener, router).await {
403                    tracing::error!("WS bridge error: {e}");
404                }
405            }
406            Err(e) => tracing::error!("WS bind error: {e}"),
407        }
408    });
409
410    // ── CLI (optional) ────────────────────────────────────────────────────────
411    if !args.no_cli {
412        tokio::spawn(wactorz_interfaces::cli::run_cli(system.clone()));
413    }
414
415    // ── Wait for shutdown ─────────────────────────────────────────────────────
416    tokio::signal::ctrl_c().await?;
417    info!("Received Ctrl-C, shutting down…");
418    sup.stop().await;
419    system.shutdown().await?;
420    info!("Goodbye.");
421    Ok(())
422}