1use anyhow::Result;
18use axum::{
19 Json, Router,
20 body::Bytes,
21 extract::{Path, State},
22 http::{HeaderMap, StatusCode, header},
23 response::{IntoResponse, Response},
24 routing::{delete, get, post},
25};
26use serde::Deserialize;
27use std::net::SocketAddr;
28use tower_http::cors::CorsLayer;
29use tower_http::services::{ServeDir, ServeFile};
30use tower_http::trace::TraceLayer;
31
32use wactorz_core::ActorSystem;
33use wactorz_core::message::{ActorCommand, Message};
34
/// Runtime settings shared by the HTTP handlers.
///
/// `Debug` is implemented by hand (instead of derived) so that `ha_token` and
/// `fuseki_password` are never printed verbatim when the configuration ends up
/// in a log line or panic message.
#[derive(Clone, Default)]
pub struct RuntimeConfig {
    /// URL reported to clients under `ha.url` (presumably Home Assistant — confirm).
    pub ha_url: String,
    /// Access token paired with `ha_url`; treated as a secret.
    pub ha_token: String,
    /// Base URL of the Fuseki server that the proxy endpoints forward to.
    pub fuseki_url: String,
    /// Dataset name reported to clients under `fuseki.dataset`.
    pub fuseki_dataset: String,
    /// Basic-auth user for Fuseki; used when the client sends no `Authorization` header.
    pub fuseki_user: String,
    /// Basic-auth password paired with `fuseki_user`; treated as a secret.
    pub fuseki_password: String,
    /// Location reported to clients under `weather.defaultLocation`.
    pub weather_default_location: String,
    /// MQTT broker host name.
    pub mqtt_host: String,
    /// MQTT broker port reported to clients under `mqtt.port`.
    pub mqtt_port: u16,
    /// Port used to build the `ws://host:port` MQTT URL handed to clients.
    pub mqtt_ws_port: u16,
    /// LLM provider identifier reported to clients.
    pub llm_provider: String,
    /// LLM model identifier reported to clients.
    pub llm_model: String,
}

impl std::fmt::Debug for RuntimeConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the "is it set at all?" signal without revealing the value.
        fn mask(secret: &str) -> &'static str {
            if secret.is_empty() { "" } else { "<redacted>" }
        }

        f.debug_struct("RuntimeConfig")
            .field("ha_url", &self.ha_url)
            .field("ha_token", &mask(&self.ha_token))
            .field("fuseki_url", &self.fuseki_url)
            .field("fuseki_dataset", &self.fuseki_dataset)
            .field("fuseki_user", &self.fuseki_user)
            .field("fuseki_password", &mask(&self.fuseki_password))
            .field("weather_default_location", &self.weather_default_location)
            .field("mqtt_host", &self.mqtt_host)
            .field("mqtt_port", &self.mqtt_port)
            .field("mqtt_ws_port", &self.mqtt_ws_port)
            .field("llm_provider", &self.llm_provider)
            .field("llm_model", &self.llm_model)
            .finish()
    }
}
51
52#[derive(Clone)]
54pub struct AppState {
55 pub system: ActorSystem,
56 pub config: RuntimeConfig,
57 pub http: reqwest::Client,
58}
59
60#[derive(Debug, Deserialize)]
62pub struct SendMessageRequest {
63 pub content: String,
64 #[serde(rename = "type", default)]
65 pub message_type: String,
66}
67
68#[derive(Debug, Deserialize)]
70pub struct ChatRequest {
71 pub message: String,
72 pub agent_name: Option<String>,
73}
74
75pub struct RestServer {
77 state: AppState,
78 addr: SocketAddr,
79 static_dir: String,
81 ws_router: Option<axum::Router>,
83}
84
85impl RestServer {
86 pub fn new(
87 system: ActorSystem,
88 addr: SocketAddr,
89 config: RuntimeConfig,
90 static_dir: String,
91 ) -> Self {
92 Self {
93 state: AppState {
94 system,
95 config,
96 http: reqwest::Client::new(),
97 },
98 addr,
99 static_dir,
100 ws_router: None,
101 }
102 }
103
104 pub fn with_ws(mut self, ws_router: axum::Router) -> Self {
106 self.ws_router = Some(ws_router);
107 self
108 }
109
110 pub fn router(&self) -> Router {
112 let index_html = format!("{}/index.html", self.static_dir);
113 let serve_dir = ServeDir::new(&self.static_dir).fallback(ServeFile::new(&index_html));
114
115 let mut r = Router::new()
116 .route("/health", get(health_handler))
117 .route("/actors", get(list_actors_handler))
119 .route("/actors/{id}", get(get_actor_handler))
120 .route("/actors/{id}/message", post(send_message_handler))
121 .route("/actors/{id}", delete(stop_actor_handler))
122 .route("/actors/{id}/pause", post(pause_actor_handler))
123 .route("/actors/{id}/resume", post(resume_actor_handler))
124 .route("/actors/{id}/metrics", get(get_metrics_handler))
125 .route("/chat", post(chat_handler))
126 .route("/api/config", get(config_handler))
128 .route("/api/actors", get(list_actors_handler))
129 .route("/api/actors/{id}", get(get_actor_handler))
130 .route("/api/actors/{id}/message", post(send_message_handler))
131 .route("/api/actors/{id}", delete(stop_actor_handler))
132 .route("/api/actors/{id}/pause", post(pause_actor_handler))
133 .route("/api/actors/{id}/resume", post(resume_actor_handler))
134 .route("/api/actors/{id}/metrics", get(get_metrics_handler))
135 .route("/api/fuseki/{dataset}/sparql", post(fuseki_sparql_handler))
136 .route("/api/fuseki/{dataset}/update", post(fuseki_update_handler))
137 .with_state(self.state.clone());
138
139 if let Some(ws) = &self.ws_router {
142 r = r.merge(ws.clone());
143 }
144
145 r.fallback_service(serve_dir)
146 .layer(CorsLayer::permissive())
147 .layer(TraceLayer::new_for_http())
148 }
149
150 pub async fn serve(self) -> Result<()> {
152 let router = self.router();
153 let listener = tokio::net::TcpListener::bind(self.addr).await?;
154 tracing::info!("REST API listening on {}", self.addr);
155 axum::serve(listener, router).await?;
156 Ok(())
157 }
158}
159
160async fn health_handler() -> impl IntoResponse {
163 Json(serde_json::json!({ "status": "ok" }))
164}
165
166async fn list_actors_handler(State(state): State<AppState>) -> impl IntoResponse {
167 let actors = state.system.registry.list().await;
168 let body: Vec<_> = actors
169 .iter()
170 .map(|e| {
171 serde_json::json!({
172 "id": e.id,
173 "name": e.name,
174 "state": format!("{}", e.state),
175 "protected": e.protected,
176 })
177 })
178 .collect();
179 Json(body)
180}
181
182async fn get_actor_handler(
183 State(state): State<AppState>,
184 Path(id): Path<String>,
185) -> impl IntoResponse {
186 match state.system.registry.get(&id).await {
187 Some(entry) => Json(serde_json::json!({
188 "id": entry.id,
189 "name": entry.name,
190 "state": format!("{}", entry.state),
191 "protected": entry.protected,
192 }))
193 .into_response(),
194 None => (StatusCode::NOT_FOUND, "actor not found").into_response(),
195 }
196}
197
198async fn send_message_handler(
199 State(state): State<AppState>,
200 Path(id): Path<String>,
201 Json(body): Json<SendMessageRequest>,
202) -> axum::response::Response {
203 let msg = Message::text(None, Some(id.clone()), body.content);
204 match state.system.registry.send(&id, msg).await {
205 Ok(_) => (StatusCode::OK, Json(serde_json::json!({"status": "sent"}))).into_response(),
206 Err(e) => (StatusCode::NOT_FOUND, e.to_string()).into_response(),
207 }
208}
209
210async fn stop_actor_handler(
211 State(state): State<AppState>,
212 Path(id): Path<String>,
213) -> axum::response::Response {
214 let entry = match state.system.registry.get(&id).await {
215 Some(e) => e,
216 None => return (StatusCode::NOT_FOUND, "actor not found").into_response(),
217 };
218 if entry.protected {
219 return (StatusCode::FORBIDDEN, "actor is protected").into_response();
220 }
221 let msg = Message::command(id.clone(), ActorCommand::Stop);
222 match state.system.registry.send(&id, msg).await {
223 Ok(_) => (StatusCode::OK, "stopping").into_response(),
224 Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
225 }
226}
227
228async fn get_metrics_handler(
229 State(state): State<AppState>,
230 Path(id): Path<String>,
231) -> axum::response::Response {
232 match state.system.registry.get(&id).await {
233 Some(e) => Json(e.metrics.snapshot()).into_response(),
234 None => (StatusCode::NOT_FOUND, "actor not found").into_response(),
235 }
236}
237
238async fn pause_actor_handler(
239 State(state): State<AppState>,
240 Path(id): Path<String>,
241) -> axum::response::Response {
242 let entry = match state.system.registry.get(&id).await {
243 Some(e) => e,
244 None => return (StatusCode::NOT_FOUND, "actor not found").into_response(),
245 };
246 if entry.protected {
247 return (StatusCode::FORBIDDEN, "actor is protected").into_response();
248 }
249 let msg = Message::command(id.clone(), ActorCommand::Pause);
250 match state.system.registry.send(&id, msg).await {
251 Ok(_) => (
252 StatusCode::OK,
253 Json(serde_json::json!({"status": "pausing"})),
254 )
255 .into_response(),
256 Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
257 }
258}
259
260async fn resume_actor_handler(
261 State(state): State<AppState>,
262 Path(id): Path<String>,
263) -> axum::response::Response {
264 let entry = match state.system.registry.get(&id).await {
265 Some(e) => e,
266 None => return (StatusCode::NOT_FOUND, "actor not found").into_response(),
267 };
268 if entry.protected {
269 return (StatusCode::FORBIDDEN, "actor is protected").into_response();
270 }
271 let msg = Message::command(id.clone(), ActorCommand::Resume);
272 match state.system.registry.send(&id, msg).await {
273 Ok(_) => (
274 StatusCode::OK,
275 Json(serde_json::json!({"status": "resuming"})),
276 )
277 .into_response(),
278 Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
279 }
280}
281
282async fn config_handler(State(state): State<AppState>) -> impl IntoResponse {
283 let c = &state.config;
284 Json(serde_json::json!({
285 "ha": { "url": c.ha_url, "token": c.ha_token },
286 "fuseki": { "url": c.fuseki_url, "dataset": c.fuseki_dataset },
287 "mqtt": {
288 "host": c.mqtt_host,
289 "port": c.mqtt_port,
290 "url": format!("ws://{}:{}", c.mqtt_host, c.mqtt_ws_port),
291 },
292 "llm": { "provider": c.llm_provider, "model": c.llm_model },
293 "weather": { "defaultLocation": c.weather_default_location },
294 }))
295}
296
297async fn fuseki_sparql_handler(
298 State(state): State<AppState>,
299 Path(dataset): Path<String>,
300 headers: HeaderMap,
301 body: Bytes,
302) -> Response {
303 fuseki_proxy_request(state, dataset, "sparql", headers, body).await
304}
305
306async fn fuseki_update_handler(
307 State(state): State<AppState>,
308 Path(dataset): Path<String>,
309 headers: HeaderMap,
310 body: Bytes,
311) -> Response {
312 fuseki_proxy_request(state, dataset, "update", headers, body).await
313}
314
315async fn fuseki_proxy_request(
316 state: AppState,
317 dataset: String,
318 operation: &'static str,
319 headers: HeaderMap,
320 body: Bytes,
321) -> Response {
322 let base = state.config.fuseki_url.trim().trim_end_matches('/');
323 if base.is_empty() {
324 return (
325 StatusCode::SERVICE_UNAVAILABLE,
326 Json(serde_json::json!({
327 "error": "Fuseki is not configured on the Rust server"
328 })),
329 )
330 .into_response();
331 }
332
333 let target = format!("{base}/{dataset}/{operation}");
334 let mut request = state.http.post(&target);
335 tracing::info!(
336 "Fuseki proxy {} dataset={} target={} auth={}",
337 operation,
338 dataset,
339 target,
340 if headers.get(header::AUTHORIZATION).is_some() || !state.config.fuseki_user.is_empty() {
341 "yes"
342 } else {
343 "no"
344 }
345 );
346
347 if let Some(value) = headers.get(header::AUTHORIZATION) {
348 request = request.header(header::AUTHORIZATION, value);
349 } else if !state.config.fuseki_user.is_empty() {
350 request = request.basic_auth(
351 &state.config.fuseki_user,
352 Some(&state.config.fuseki_password),
353 );
354 }
355 if let Some(value) = headers.get(header::ACCEPT) {
356 request = request.header(header::ACCEPT, value);
357 }
358 if let Some(value) = headers.get(header::CONTENT_TYPE) {
359 request = request.header(header::CONTENT_TYPE, value);
360 }
361
362 let upstream = match request.body(body.to_vec()).send().await {
363 Ok(resp) => resp,
364 Err(err) => {
365 tracing::warn!("Fuseki proxy error for {target}: {err}");
366 return (
367 StatusCode::BAD_GATEWAY,
368 Json(serde_json::json!({
369 "error": format!("Fuseki proxy request failed: {err}")
370 })),
371 )
372 .into_response();
373 }
374 };
375
376 let status =
377 StatusCode::from_u16(upstream.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
378 tracing::info!(
379 "Fuseki proxy {} target={} status={}",
380 operation,
381 target,
382 status
383 );
384 let mut response_headers = HeaderMap::new();
385 if let Some(value) = upstream.headers().get(header::CONTENT_TYPE) {
386 response_headers.insert(header::CONTENT_TYPE, value.clone());
387 }
388
389 match upstream.bytes().await {
390 Ok(bytes) => (status, response_headers, bytes).into_response(),
391 Err(err) => {
392 tracing::warn!("Fuseki proxy body read error for {target}: {err}");
393 (
394 StatusCode::BAD_GATEWAY,
395 Json(serde_json::json!({
396 "error": format!("Fuseki proxy response read failed: {err}")
397 })),
398 )
399 .into_response()
400 }
401 }
402}
403
404async fn chat_handler(
405 State(state): State<AppState>,
406 Json(body): Json<ChatRequest>,
407) -> axum::response::Response {
408 let target_name = body.agent_name.as_deref().unwrap_or("main-actor");
409 match state.system.registry.get_by_name(target_name).await {
410 None => (
411 StatusCode::NOT_FOUND,
412 format!("agent '{target_name}' not found"),
413 )
414 .into_response(),
415 Some(entry) => {
416 let msg = Message::text(None, Some(entry.id.clone()), body.message);
417 match state.system.registry.send(&entry.id, msg).await {
418 Ok(_) => Json(serde_json::json!({"status": "sent", "agent": target_name}))
419 .into_response(),
420 Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
421 }
422 }
423 }
424}