1use anyhow::Result;
14use clap::Parser;
15use std::net::SocketAddr;
16use std::sync::Arc;
17use tracing::info;
18
19use wactorz_agents::{
20 DynamicAgent, HomeAssistantAgent, IOAgent, InstallerAgent, LlmConfig, LlmProvider, MainActor,
21 ManualAgent, MonitorAgent,
22};
23use wactorz_core::{ActorConfig, ActorSystem, EventPublisher, Supervisor, SupervisorStrategy};
24use wactorz_interfaces::ws::WsEnvelope;
25use wactorz_interfaces::{RestServer, WsBridge};
26use wactorz_mqtt::{MqttClient, MqttConfig};
27
28#[derive(Debug, Parser)]
30#[command(name = "wactorz", version, about)]
31pub struct Args {
32 #[arg(long, default_value = "localhost", env = "MQTT_HOST")]
34 pub mqtt_host: String,
35
36 #[arg(long, default_value_t = 1883, env = "MQTT_PORT")]
38 pub mqtt_port: u16,
39
40 #[arg(long, default_value = "127.0.0.1:8080", env = "API_ADDR")]
42 pub api_addr: SocketAddr,
43
44 #[arg(long, default_value = "127.0.0.1:8081", env = "WS_ADDR")]
46 pub ws_addr: SocketAddr,
47
48 #[arg(long, default_value = "anthropic", env = "LLM_PROVIDER")]
50 pub llm_provider: String,
51
52 #[arg(long, default_value = "claude-sonnet-4-6", env = "LLM_MODEL")]
54 pub llm_model: String,
55
56 #[arg(long, env = "LLM_API_KEY")]
58 pub llm_api_key: Option<String>,
59
60 #[arg(long, env = "NIM_MODEL")]
63 pub nim_model: Option<String>,
64
65 #[arg(long, default_value_t = false, env = "NO_CLI")]
67 pub no_cli: bool,
68}
69
70#[tokio::main]
71async fn main() -> Result<()> {
72 tracing_subscriber::fmt()
74 .with_env_filter(
75 tracing_subscriber::EnvFilter::try_from_default_env()
76 .unwrap_or_else(|_| "wactorz=info,tower_http=debug".into()),
77 )
78 .init();
79
80 let args = Args::parse();
81 info!("Starting AgentFlow server");
82
83 let (publisher, mut pub_rx) = EventPublisher::channel();
85
86 let system = ActorSystem::with_publisher(publisher.clone());
88
89 let mqtt_config = MqttConfig {
91 host: args.mqtt_host.clone(),
92 port: args.mqtt_port,
93 client_id: "wactorz-server".into(),
94 ..Default::default()
95 };
96
97 let (mqtt_client, mut event_loop) = MqttClient::new(mqtt_config)?;
98 let mqtt_client = Arc::new(mqtt_client);
99
100 let (ws_tx, _) = tokio::sync::broadcast::channel::<WsEnvelope>(100);
102 let ws_tx_for_mqtt = ws_tx.clone();
103
104 let registry_for_route = system.registry.clone();
106 let registry_for_qa = system.registry.clone();
107 let registry_for_wik = system.registry.clone();
109 let registry_for_switch = system.registry.clone();
110
111 tokio::spawn(async move {
113 MqttClient::run_event_loop(&mut event_loop, move |evt| {
114 if let wactorz_mqtt::MqttEvent::Incoming { topic, payload } = evt {
115 tracing::debug!("MQTT in: {topic}");
116 if let Ok(json_val) = serde_json::from_slice::<serde_json::Value>(&payload) {
117 let envelope = WsEnvelope {
119 topic: topic.clone(),
120 payload: json_val.clone(),
121 };
122 let _ = ws_tx_for_mqtt.send(envelope);
123
124 if topic == wactorz_mqtt::topics::SYSTEM_LLM_ERROR {
126 let reg = registry_for_wik.clone();
127 let payload_str = serde_json::to_string(&json_val).unwrap_or_default();
128 tokio::spawn(async move {
129 if let Some(entry) = reg.get_by_name("wik-agent").await {
130 let msg = wactorz_core::Message::text(
131 Some("system".to_string()),
132 Some(entry.id.clone()),
133 payload_str,
134 );
135 let _ = reg.send(&entry.id, msg).await;
136 }
137 });
138 }
139
140 if topic == wactorz_mqtt::topics::SYSTEM_LLM_SWITCH {
142 let reg = registry_for_switch.clone();
143 let switch_payload = json_val.clone();
144 tokio::spawn(async move {
145 if let Some(entry) = reg.get_by_name("main-actor").await {
146 let msg = wactorz_core::Message::new(
147 Some("wik-agent".to_string()),
148 Some(entry.id.clone()),
149 wactorz_core::message::MessageType::Task {
150 task_id: "wik/switch".to_string(),
151 description: "LLM provider switch".to_string(),
152 payload: switch_payload,
153 },
154 );
155 let _ = reg.send(&entry.id, msg).await;
156 }
157 });
158 }
159
160 if topic.ends_with("/chat") {
162 let reg_qa = registry_for_qa.clone();
163 let qa_content = serde_json::to_string(&json_val).unwrap_or_default();
164 tokio::spawn(async move {
165 if let Some(entry) = reg_qa.get_by_name("qa-agent").await {
166 let msg = wactorz_core::Message::text(
167 Some("mqtt-router".to_string()),
168 Some(entry.id.clone()),
169 qa_content,
170 );
171 let _ = reg_qa.send(&entry.id, msg).await;
172 }
173 });
174 }
175
176 if topic.ends_with("/chat") {
178 let from = json_val.get("from").and_then(|v| v.as_str()).unwrap_or("");
179 let content = json_val
180 .get("content")
181 .and_then(|v| v.as_str())
182 .unwrap_or("")
183 .to_string();
184
185 if !content.is_empty() && (from == "user" || from.is_empty()) {
186 if topic == wactorz_mqtt::topics::IO_CHAT {
187 let reg = registry_for_route.clone();
189 tokio::spawn(async move {
190 if let Some(entry) = reg.get_by_name("io-agent").await {
191 let msg = wactorz_core::Message::text(
192 Some("user".to_string()),
193 Some(entry.id.clone()),
194 content,
195 );
196 let _ = reg.send(&entry.id, msg).await;
197 }
198 });
199 } else if let Some(actor_id) = topic
200 .strip_prefix("agents/")
201 .and_then(|s| s.strip_suffix("/chat"))
202 {
203 let reg = registry_for_route.clone();
205 let id = actor_id.to_string();
206 tokio::spawn(async move {
207 let msg = wactorz_core::Message::text(
208 Some("user".to_string()),
209 Some(id.clone()),
210 content,
211 );
212 let _ = reg.send(&id, msg).await;
213 });
214 }
215 }
216 }
217 }
218 }
219 })
220 .await;
221 });
222
223 if let Err(e) = mqtt_client.subscribe("agents/#").await {
225 tracing::warn!("MQTT subscribe failed (broker may not be running): {e}");
226 }
227 if let Err(e) = mqtt_client.subscribe("system/#").await {
228 tracing::warn!("MQTT subscribe failed (broker may not be running): {e}");
229 }
230 if let Err(e) = mqtt_client.subscribe(wactorz_mqtt::topics::IO_CHAT).await {
231 tracing::warn!("MQTT subscribe io/chat failed: {e}");
232 }
233 if let Err(e) = mqtt_client.subscribe("system/llm/#").await {
234 tracing::warn!("MQTT subscribe system/llm/# failed: {e}");
235 }
236
237 let mqtt_for_bridge = Arc::clone(&mqtt_client);
239 tokio::spawn(async move {
240 while let Some((topic, payload)) = pub_rx.recv().await {
241 if let Err(e) = mqtt_for_bridge.publish_raw(&topic, payload).await {
242 tracing::error!("MQTT publish error: {e}");
243 }
244 }
245 });
246
247 let (llm_provider, llm_model) = if let Some(nim_model) = &args.nim_model {
249 (LlmProvider::Nim, nim_model.clone())
250 } else {
251 let p = match args.llm_provider.as_str() {
252 "openai" => LlmProvider::OpenAI,
253 "ollama" => LlmProvider::Ollama,
254 "gemini" => LlmProvider::Gemini,
255 "nim" => LlmProvider::Nim,
256 _ => LlmProvider::Anthropic,
257 };
258 (p, args.llm_model.clone())
259 };
260 let llm_config = LlmConfig {
261 provider: llm_provider,
262 model: llm_model,
263 api_key: args.llm_api_key.clone(),
264 ..Default::default()
265 };
266
267 let mut sup = Supervisor::new(system.clone());
274
275 {
276 let lc = llm_config.clone();
277 let sys = system.clone();
278 let pub_ = publisher.clone();
279 sup.supervise(
280 "main-actor",
281 Arc::new(move || {
282 let c = ActorConfig::new_with_node("main-actor", "alpha").protected();
283 Box::new(MainActor::new(c, lc.clone(), sys.clone()).with_publisher(pub_.clone()))
284 }),
285 SupervisorStrategy::OneForOne,
286 10,
287 60.0,
288 2.0,
289 );
290 }
291 {
292 let sys = system.clone();
293 let pub_ = publisher.clone();
294 sup.supervise(
295 "monitor-agent",
296 Arc::new(move || {
297 let c = ActorConfig::new_with_node("monitor-agent", "bravo").protected();
298 Box::new(MonitorAgent::new(c, sys.clone()).with_publisher(pub_.clone()))
299 }),
300 SupervisorStrategy::OneForOne,
301 10,
302 60.0,
303 1.0,
304 );
305 }
306 {
307 let sys = system.clone();
308 let pub_ = publisher.clone();
309 sup.supervise(
310 "io-agent",
311 Arc::new(move || {
312 let c = ActorConfig::new_with_node("io-agent", "charlie");
313 Box::new(IOAgent::new(c, sys.clone()).with_publisher(pub_.clone()))
314 }),
315 SupervisorStrategy::OneForOne,
316 10,
317 60.0,
318 1.0,
319 );
320 }
321 {
322 let pub_ = publisher.clone();
323 sup.supervise(
324 "installer-agent",
325 Arc::new(move || {
326 let c = ActorConfig::new_with_node("installer-agent", "delta");
327 Box::new(InstallerAgent::new(c).with_publisher(pub_.clone()))
328 }),
329 SupervisorStrategy::OneForOne,
330 5,
331 60.0,
332 2.0,
333 );
334 }
335 {
336 let pub_ = publisher.clone();
337 sup.supervise(
338 "code-agent",
339 Arc::new(move || {
340 let c = ActorConfig::new_with_node("code-agent", "echo");
341 Box::new(DynamicAgent::new(c, "").with_publisher(pub_.clone()))
342 }),
343 SupervisorStrategy::OneForOne,
344 5,
345 60.0,
346 1.0,
347 );
348 }
349 {
350 let lc = llm_config.clone();
351 let pub_ = publisher.clone();
352 sup.supervise(
353 "manual-agent",
354 Arc::new(move || {
355 let c = ActorConfig::new_with_node("manual-agent", "foxtrot");
356 Box::new(ManualAgent::new(c, lc.clone()).with_publisher(pub_.clone()))
357 }),
358 SupervisorStrategy::OneForOne,
359 5,
360 60.0,
361 1.0,
362 );
363 }
364 {
365 let pub_ = publisher.clone();
366 sup.supervise(
367 "home-assistant-agent",
368 Arc::new(move || {
369 let c = ActorConfig::new_with_node("home-assistant-agent", "golf");
370 Box::new(HomeAssistantAgent::new(c).with_publisher(pub_.clone()))
371 }),
372 SupervisorStrategy::OneForOne,
373 5,
374 60.0,
375 2.0,
376 );
377 }
378
379 sup.start().await?;
380 info!(
381 "Supervisor started — 7 agents (main, monitor, io, installer, code, manual, home-assistant)"
382 );
383
384 let rest_addr: SocketAddr = args.api_addr;
386 let system_for_rest = system.clone();
387 tokio::spawn(async move {
388 let server = RestServer::new(system_for_rest, rest_addr);
389 if let Err(e) = server.serve().await {
390 tracing::error!("REST error: {e}");
391 }
392 });
393
394 let ws_addr: SocketAddr = args.ws_addr;
396 let ws_bridge = WsBridge::new(ws_tx, mqtt_client, args.mqtt_host, args.mqtt_port);
397 tokio::spawn(async move {
398 let router = ws_bridge.router();
399 match tokio::net::TcpListener::bind(ws_addr).await {
400 Ok(listener) => {
401 tracing::info!("WS bridge listening on {ws_addr}");
402 if let Err(e) = axum::serve(listener, router).await {
403 tracing::error!("WS bridge error: {e}");
404 }
405 }
406 Err(e) => tracing::error!("WS bind error: {e}"),
407 }
408 });
409
410 if !args.no_cli {
412 tokio::spawn(wactorz_interfaces::cli::run_cli(system.clone()));
413 }
414
415 tokio::signal::ctrl_c().await?;
417 info!("Received Ctrl-C, shutting down…");
418 sup.stop().await;
419 system.shutdown().await?;
420 info!("Goodbye.");
421 Ok(())
422}