wactorz_agents/
catalog.rs

1//! Pre-built agent recipe library (stub).
2//!
3//! [`CatalogAgent`] is a **protected** system agent that acts as a registry of
4//! ready-made [`crate::dynamic_agent::DynamicAgent`] recipes.  In the Python
5//! backend it spawns catalog agents on demand and injects their manifests into
6//! the main actor's knowledge base.
7//!
8//! # Current status
9//!
10//! This is a **stub pending full implementation**.  It compiles, registers in
11//! the actor system, and logs every message it receives at `DEBUG` level.
12//! The spawn / list / info command handling will be layered on top once the
13//! Rust dynamic-agent pipeline is ready.
14
15use anyhow::Result;
16use async_trait::async_trait;
17use std::sync::Arc;
18use tokio::sync::mpsc;
19
20use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
21
22/// Pre-built agent recipe library.
23///
24/// Holds a catalog of ready-made [`crate::dynamic_agent::DynamicAgent`] recipes
25/// and spawns them on request.  Protected so it cannot be killed by external
26/// commands.
27pub struct CatalogAgent {
28    config: ActorConfig,
29    state: ActorState,
30    metrics: Arc<ActorMetrics>,
31    mailbox_tx: mpsc::Sender<Message>,
32    mailbox_rx: Option<mpsc::Receiver<Message>>,
33    publisher: Option<EventPublisher>,
34}
35
36impl CatalogAgent {
37    /// Create a new [`CatalogAgent`].
38    ///
39    /// `config.protected` is forced to `true` regardless of the value passed
40    /// in, matching the Python implementation.
41    pub fn new(config: ActorConfig) -> Self {
42        let protected_config = ActorConfig {
43            protected: true,
44            ..config.clone()
45        };
46        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
47        Self {
48            config: protected_config,
49            state: ActorState::Initializing,
50            metrics: Arc::new(ActorMetrics::new()),
51            mailbox_tx: tx,
52            mailbox_rx: Some(rx),
53            publisher: None,
54        }
55    }
56
57    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
58        self.publisher = Some(p);
59        self
60    }
61}
62
63#[async_trait]
64impl Actor for CatalogAgent {
65    fn id(&self) -> String {
66        self.config.id.clone()
67    }
68
69    fn name(&self) -> &str {
70        &self.config.name
71    }
72
73    fn state(&self) -> ActorState {
74        self.state.clone()
75    }
76
77    fn metrics(&self) -> Arc<ActorMetrics> {
78        Arc::clone(&self.metrics)
79    }
80
81    fn mailbox(&self) -> mpsc::Sender<Message> {
82        self.mailbox_tx.clone()
83    }
84
85    fn is_protected(&self) -> bool {
86        self.config.protected
87    }
88
89    async fn on_start(&mut self) -> Result<()> {
90        self.state = ActorState::Running;
91        tracing::info!("[catalog] CatalogAgent started (stub — full implementation pending)");
92        Ok(())
93    }
94
95    async fn handle_message(&mut self, message: Message) -> Result<()> {
96        tracing::debug!(
97            "[catalog] received message from {:?}: {:?}",
98            message.from,
99            message.payload,
100        );
101        // TODO: parse spawn / list / info commands and delegate to DynamicAgent
102        // pipeline once the Rust dynamic-agent spawn infrastructure is ready.
103        Ok(())
104    }
105
106    async fn run(&mut self) -> Result<()> {
107        self.on_start().await?;
108        let mut rx = self
109            .mailbox_rx
110            .take()
111            .ok_or_else(|| anyhow::anyhow!("CatalogAgent already running"))?;
112        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
113            self.config.heartbeat_interval_secs,
114        ));
115        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
116        loop {
117            tokio::select! {
118                biased;
119                msg = rx.recv() => {
120                    match msg {
121                        None => break,
122                        Some(m) => {
123                            self.metrics.record_received();
124                            if let wactorz_core::message::MessageType::Command {
125                                command: wactorz_core::message::ActorCommand::Stop,
126                            } = &m.payload
127                            {
128                                break;
129                            }
130                            match self.handle_message(m).await {
131                                Ok(_) => self.metrics.record_processed(),
132                                Err(e) => {
133                                    tracing::error!("[{}] {e}", self.config.name);
134                                    self.metrics.record_failed();
135                                }
136                            }
137                        }
138                    }
139                }
140                _ = hb.tick() => {
141                    self.metrics.record_heartbeat();
142                    if let Err(e) = self.on_heartbeat().await {
143                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
144                    }
145                }
146            }
147        }
148        self.state = ActorState::Stopped;
149        self.on_stop().await
150    }
151}