wactorz_agents/
catalog.rs1use anyhow::Result;
16use async_trait::async_trait;
17use std::sync::Arc;
18use tokio::sync::mpsc;
19
20use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
21
22pub struct CatalogAgent {
28 config: ActorConfig,
29 state: ActorState,
30 metrics: Arc<ActorMetrics>,
31 mailbox_tx: mpsc::Sender<Message>,
32 mailbox_rx: Option<mpsc::Receiver<Message>>,
33 publisher: Option<EventPublisher>,
34}
35
36impl CatalogAgent {
37 pub fn new(config: ActorConfig) -> Self {
42 let protected_config = ActorConfig {
43 protected: true,
44 ..config.clone()
45 };
46 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
47 Self {
48 config: protected_config,
49 state: ActorState::Initializing,
50 metrics: Arc::new(ActorMetrics::new()),
51 mailbox_tx: tx,
52 mailbox_rx: Some(rx),
53 publisher: None,
54 }
55 }
56
57 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
58 self.publisher = Some(p);
59 self
60 }
61}
62
63#[async_trait]
64impl Actor for CatalogAgent {
65 fn id(&self) -> String {
66 self.config.id.clone()
67 }
68
69 fn name(&self) -> &str {
70 &self.config.name
71 }
72
73 fn state(&self) -> ActorState {
74 self.state.clone()
75 }
76
77 fn metrics(&self) -> Arc<ActorMetrics> {
78 Arc::clone(&self.metrics)
79 }
80
81 fn mailbox(&self) -> mpsc::Sender<Message> {
82 self.mailbox_tx.clone()
83 }
84
85 fn is_protected(&self) -> bool {
86 self.config.protected
87 }
88
89 async fn on_start(&mut self) -> Result<()> {
90 self.state = ActorState::Running;
91 tracing::info!("[catalog] CatalogAgent started (stub — full implementation pending)");
92 Ok(())
93 }
94
95 async fn handle_message(&mut self, message: Message) -> Result<()> {
96 tracing::debug!(
97 "[catalog] received message from {:?}: {:?}",
98 message.from,
99 message.payload,
100 );
101 Ok(())
104 }
105
106 async fn run(&mut self) -> Result<()> {
107 self.on_start().await?;
108 let mut rx = self
109 .mailbox_rx
110 .take()
111 .ok_or_else(|| anyhow::anyhow!("CatalogAgent already running"))?;
112 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
113 self.config.heartbeat_interval_secs,
114 ));
115 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
116 loop {
117 tokio::select! {
118 biased;
119 msg = rx.recv() => {
120 match msg {
121 None => break,
122 Some(m) => {
123 self.metrics.record_received();
124 if let wactorz_core::message::MessageType::Command {
125 command: wactorz_core::message::ActorCommand::Stop,
126 } = &m.payload
127 {
128 break;
129 }
130 match self.handle_message(m).await {
131 Ok(_) => self.metrics.record_processed(),
132 Err(e) => {
133 tracing::error!("[{}] {e}", self.config.name);
134 self.metrics.record_failed();
135 }
136 }
137 }
138 }
139 }
140 _ = hb.tick() => {
141 self.metrics.record_heartbeat();
142 if let Err(e) = self.on_heartbeat().await {
143 tracing::error!("[{}] heartbeat: {e}", self.config.name);
144 }
145 }
146 }
147 }
148 self.state = ActorState::Stopped;
149 self.on_stop().await
150 }
151}