wactorz_interfaces/
rest.rs

1//! axum HTTP REST API.
2//!
3//! Exposes a thin REST layer over the actor system.
4//!
5//! ## Endpoints
6//!
7//! | Method | Path | Description |
8//! |--------|------|-------------|
9//! | GET | `/health` | Server liveness check |
10//! | GET | `/actors` | List all actors + states |
11//! | GET | `/actors/{id}` | Single actor info |
12//! | POST | `/actors/{id}/message` | Send a message to an actor |
13//! | DELETE | `/actors/{id}` | Stop an actor (if not protected) |
14//! | GET | `/actors/{id}/metrics` | Actor runtime metrics |
15//! | POST | `/chat` | Send a message to MainActor and stream response |
16
17use anyhow::Result;
18use axum::{
19    Json, Router,
20    body::Bytes,
21    extract::{Path, State},
22    http::{HeaderMap, StatusCode, header},
23    response::{IntoResponse, Response},
24    routing::{delete, get, post},
25};
26use serde::Deserialize;
27use std::net::SocketAddr;
28use tower_http::cors::CorsLayer;
29use tower_http::services::{ServeDir, ServeFile};
30use tower_http::trace::TraceLayer;
31
32use wactorz_core::ActorSystem;
33use wactorz_core::message::{ActorCommand, Message};
34
35/// Runtime config exposed via /api/config (mirrors Python's config_handler).
36#[derive(Clone, Debug, Default)]
37pub struct RuntimeConfig {
38    pub ha_url: String,
39    pub ha_token: String,
40    pub fuseki_url: String,
41    pub fuseki_dataset: String,
42    pub fuseki_user: String,
43    pub fuseki_password: String,
44    pub weather_default_location: String,
45    pub mqtt_host: String,
46    pub mqtt_port: u16,
47    pub mqtt_ws_port: u16,
48    pub llm_provider: String,
49    pub llm_model: String,
50}
51
52/// Shared application state injected into axum handlers.
53#[derive(Clone)]
54pub struct AppState {
55    pub system: ActorSystem,
56    pub config: RuntimeConfig,
57    pub http: reqwest::Client,
58}
59
60/// JSON body for POST /actors/{id}/message
61#[derive(Debug, Deserialize)]
62pub struct SendMessageRequest {
63    pub content: String,
64    #[serde(rename = "type", default)]
65    pub message_type: String,
66}
67
68/// JSON body for POST /chat
69#[derive(Debug, Deserialize)]
70pub struct ChatRequest {
71    pub message: String,
72    pub agent_name: Option<String>,
73}
74
75/// The axum HTTP server.
76pub struct RestServer {
77    state: AppState,
78    addr: SocketAddr,
79    /// Path to the built frontend assets directory (e.g. "static/app").
80    static_dir: String,
81    /// Optional WsBridge router merged onto the same port (Python-compatible single-port setup).
82    ws_router: Option<axum::Router>,
83}
84
85impl RestServer {
86    pub fn new(
87        system: ActorSystem,
88        addr: SocketAddr,
89        config: RuntimeConfig,
90        static_dir: String,
91    ) -> Self {
92        Self {
93            state: AppState {
94                system,
95                config,
96                http: reqwest::Client::new(),
97            },
98            addr,
99            static_dir,
100            ws_router: None,
101        }
102    }
103
104    /// Merge a WsBridge router so /ws and /mqtt are served on the same port.
105    pub fn with_ws(mut self, ws_router: axum::Router) -> Self {
106        self.ws_router = Some(ws_router);
107        self
108    }
109
110    /// Build the axum `Router`.
111    pub fn router(&self) -> Router {
112        let index_html = format!("{}/index.html", self.static_dir);
113        let serve_dir = ServeDir::new(&self.static_dir).fallback(ServeFile::new(&index_html));
114
115        let mut r = Router::new()
116            .route("/health", get(health_handler))
117            // Native paths
118            .route("/actors", get(list_actors_handler))
119            .route("/actors/{id}", get(get_actor_handler))
120            .route("/actors/{id}/message", post(send_message_handler))
121            .route("/actors/{id}", delete(stop_actor_handler))
122            .route("/actors/{id}/pause", post(pause_actor_handler))
123            .route("/actors/{id}/resume", post(resume_actor_handler))
124            .route("/actors/{id}/metrics", get(get_metrics_handler))
125            .route("/chat", post(chat_handler))
126            // /api/* aliases — match paths the Python backend and frontend expect
127            .route("/api/config", get(config_handler))
128            .route("/api/actors", get(list_actors_handler))
129            .route("/api/actors/{id}", get(get_actor_handler))
130            .route("/api/actors/{id}/message", post(send_message_handler))
131            .route("/api/actors/{id}", delete(stop_actor_handler))
132            .route("/api/actors/{id}/pause", post(pause_actor_handler))
133            .route("/api/actors/{id}/resume", post(resume_actor_handler))
134            .route("/api/actors/{id}/metrics", get(get_metrics_handler))
135            .route("/api/fuseki/{dataset}/sparql", post(fuseki_sparql_handler))
136            .route("/api/fuseki/{dataset}/update", post(fuseki_update_handler))
137            .with_state(self.state.clone());
138
139        // Merge /ws and /mqtt onto the same port so the frontend can reach
140        // them via window.location.host (Python-compatible single-port layout).
141        if let Some(ws) = &self.ws_router {
142            r = r.merge(ws.clone());
143        }
144
145        r.fallback_service(serve_dir)
146            .layer(CorsLayer::permissive())
147            .layer(TraceLayer::new_for_http())
148    }
149
150    /// Start listening and serving.
151    pub async fn serve(self) -> Result<()> {
152        let router = self.router();
153        let listener = tokio::net::TcpListener::bind(self.addr).await?;
154        tracing::info!("REST API listening on {}", self.addr);
155        axum::serve(listener, router).await?;
156        Ok(())
157    }
158}
159
160// ── Handlers ─────────────────────────────────────────────────────────────────
161
162async fn health_handler() -> impl IntoResponse {
163    Json(serde_json::json!({ "status": "ok" }))
164}
165
166async fn list_actors_handler(State(state): State<AppState>) -> impl IntoResponse {
167    let actors = state.system.registry.list().await;
168    let body: Vec<_> = actors
169        .iter()
170        .map(|e| {
171            serde_json::json!({
172                "id": e.id,
173                "name": e.name,
174                "state": format!("{}", e.state),
175                "protected": e.protected,
176            })
177        })
178        .collect();
179    Json(body)
180}
181
182async fn get_actor_handler(
183    State(state): State<AppState>,
184    Path(id): Path<String>,
185) -> impl IntoResponse {
186    match state.system.registry.get(&id).await {
187        Some(entry) => Json(serde_json::json!({
188            "id": entry.id,
189            "name": entry.name,
190            "state": format!("{}", entry.state),
191            "protected": entry.protected,
192        }))
193        .into_response(),
194        None => (StatusCode::NOT_FOUND, "actor not found").into_response(),
195    }
196}
197
198async fn send_message_handler(
199    State(state): State<AppState>,
200    Path(id): Path<String>,
201    Json(body): Json<SendMessageRequest>,
202) -> axum::response::Response {
203    let msg = Message::text(None, Some(id.clone()), body.content);
204    match state.system.registry.send(&id, msg).await {
205        Ok(_) => (StatusCode::OK, Json(serde_json::json!({"status": "sent"}))).into_response(),
206        Err(e) => (StatusCode::NOT_FOUND, e.to_string()).into_response(),
207    }
208}
209
210async fn stop_actor_handler(
211    State(state): State<AppState>,
212    Path(id): Path<String>,
213) -> axum::response::Response {
214    let entry = match state.system.registry.get(&id).await {
215        Some(e) => e,
216        None => return (StatusCode::NOT_FOUND, "actor not found").into_response(),
217    };
218    if entry.protected {
219        return (StatusCode::FORBIDDEN, "actor is protected").into_response();
220    }
221    let msg = Message::command(id.clone(), ActorCommand::Stop);
222    match state.system.registry.send(&id, msg).await {
223        Ok(_) => (StatusCode::OK, "stopping").into_response(),
224        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
225    }
226}
227
228async fn get_metrics_handler(
229    State(state): State<AppState>,
230    Path(id): Path<String>,
231) -> axum::response::Response {
232    match state.system.registry.get(&id).await {
233        Some(e) => Json(e.metrics.snapshot()).into_response(),
234        None => (StatusCode::NOT_FOUND, "actor not found").into_response(),
235    }
236}
237
238async fn pause_actor_handler(
239    State(state): State<AppState>,
240    Path(id): Path<String>,
241) -> axum::response::Response {
242    let entry = match state.system.registry.get(&id).await {
243        Some(e) => e,
244        None => return (StatusCode::NOT_FOUND, "actor not found").into_response(),
245    };
246    if entry.protected {
247        return (StatusCode::FORBIDDEN, "actor is protected").into_response();
248    }
249    let msg = Message::command(id.clone(), ActorCommand::Pause);
250    match state.system.registry.send(&id, msg).await {
251        Ok(_) => (
252            StatusCode::OK,
253            Json(serde_json::json!({"status": "pausing"})),
254        )
255            .into_response(),
256        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
257    }
258}
259
260async fn resume_actor_handler(
261    State(state): State<AppState>,
262    Path(id): Path<String>,
263) -> axum::response::Response {
264    let entry = match state.system.registry.get(&id).await {
265        Some(e) => e,
266        None => return (StatusCode::NOT_FOUND, "actor not found").into_response(),
267    };
268    if entry.protected {
269        return (StatusCode::FORBIDDEN, "actor is protected").into_response();
270    }
271    let msg = Message::command(id.clone(), ActorCommand::Resume);
272    match state.system.registry.send(&id, msg).await {
273        Ok(_) => (
274            StatusCode::OK,
275            Json(serde_json::json!({"status": "resuming"})),
276        )
277            .into_response(),
278        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
279    }
280}
281
282async fn config_handler(State(state): State<AppState>) -> impl IntoResponse {
283    let c = &state.config;
284    Json(serde_json::json!({
285        "ha": { "url": c.ha_url, "token": c.ha_token },
286        "fuseki": { "url": c.fuseki_url, "dataset": c.fuseki_dataset },
287        "mqtt": {
288            "host": c.mqtt_host,
289            "port": c.mqtt_port,
290            "url": format!("ws://{}:{}", c.mqtt_host, c.mqtt_ws_port),
291        },
292        "llm": { "provider": c.llm_provider, "model": c.llm_model },
293        "weather": { "defaultLocation": c.weather_default_location },
294    }))
295}
296
297async fn fuseki_sparql_handler(
298    State(state): State<AppState>,
299    Path(dataset): Path<String>,
300    headers: HeaderMap,
301    body: Bytes,
302) -> Response {
303    fuseki_proxy_request(state, dataset, "sparql", headers, body).await
304}
305
306async fn fuseki_update_handler(
307    State(state): State<AppState>,
308    Path(dataset): Path<String>,
309    headers: HeaderMap,
310    body: Bytes,
311) -> Response {
312    fuseki_proxy_request(state, dataset, "update", headers, body).await
313}
314
315async fn fuseki_proxy_request(
316    state: AppState,
317    dataset: String,
318    operation: &'static str,
319    headers: HeaderMap,
320    body: Bytes,
321) -> Response {
322    let base = state.config.fuseki_url.trim().trim_end_matches('/');
323    if base.is_empty() {
324        return (
325            StatusCode::SERVICE_UNAVAILABLE,
326            Json(serde_json::json!({
327                "error": "Fuseki is not configured on the Rust server"
328            })),
329        )
330            .into_response();
331    }
332
333    let target = format!("{base}/{dataset}/{operation}");
334    let mut request = state.http.post(&target);
335    tracing::info!(
336        "Fuseki proxy {} dataset={} target={} auth={}",
337        operation,
338        dataset,
339        target,
340        if headers.get(header::AUTHORIZATION).is_some() || !state.config.fuseki_user.is_empty() {
341            "yes"
342        } else {
343            "no"
344        }
345    );
346
347    if let Some(value) = headers.get(header::AUTHORIZATION) {
348        request = request.header(header::AUTHORIZATION, value);
349    } else if !state.config.fuseki_user.is_empty() {
350        request = request.basic_auth(
351            &state.config.fuseki_user,
352            Some(&state.config.fuseki_password),
353        );
354    }
355    if let Some(value) = headers.get(header::ACCEPT) {
356        request = request.header(header::ACCEPT, value);
357    }
358    if let Some(value) = headers.get(header::CONTENT_TYPE) {
359        request = request.header(header::CONTENT_TYPE, value);
360    }
361
362    let upstream = match request.body(body.to_vec()).send().await {
363        Ok(resp) => resp,
364        Err(err) => {
365            tracing::warn!("Fuseki proxy error for {target}: {err}");
366            return (
367                StatusCode::BAD_GATEWAY,
368                Json(serde_json::json!({
369                    "error": format!("Fuseki proxy request failed: {err}")
370                })),
371            )
372                .into_response();
373        }
374    };
375
376    let status =
377        StatusCode::from_u16(upstream.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
378    tracing::info!(
379        "Fuseki proxy {} target={} status={}",
380        operation,
381        target,
382        status
383    );
384    let mut response_headers = HeaderMap::new();
385    if let Some(value) = upstream.headers().get(header::CONTENT_TYPE) {
386        response_headers.insert(header::CONTENT_TYPE, value.clone());
387    }
388
389    match upstream.bytes().await {
390        Ok(bytes) => (status, response_headers, bytes).into_response(),
391        Err(err) => {
392            tracing::warn!("Fuseki proxy body read error for {target}: {err}");
393            (
394                StatusCode::BAD_GATEWAY,
395                Json(serde_json::json!({
396                    "error": format!("Fuseki proxy response read failed: {err}")
397                })),
398            )
399                .into_response()
400        }
401    }
402}
403
404async fn chat_handler(
405    State(state): State<AppState>,
406    Json(body): Json<ChatRequest>,
407) -> axum::response::Response {
408    let target_name = body.agent_name.as_deref().unwrap_or("main-actor");
409    match state.system.registry.get_by_name(target_name).await {
410        None => (
411            StatusCode::NOT_FOUND,
412            format!("agent '{target_name}' not found"),
413        )
414            .into_response(),
415        Some(entry) => {
416            let msg = Message::text(None, Some(entry.id.clone()), body.message);
417            match state.system.registry.send(&entry.id, msg).await {
418                Ok(_) => Json(serde_json::json!({"status": "sent", "agent": target_name}))
419                    .into_response(),
420                Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
421            }
422        }
423    }
424}