wactorz_agents/
llm_agent.rs

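//! LLM provider abstraction.
//!
//! [`LlmAgent`] wraps multiple large-language-model backends behind a single
//! async `complete()` interface.  Supported providers:
//! - **Anthropic** (`claude-*` models, Messages API)
//! - **OpenAI** (`gpt-*` and compatible, Chat Completions API)
//! - **Ollama** (local, OpenAI-compatible endpoint)
//! - **Gemini** (Google Generative Language API, `generateContent`)
//! - **NIM** (NVIDIA NIM, OpenAI-compatible endpoint)
//!
//! The active provider and model are selected via [`LlmConfig`]. When a call fails
//! with an error that looks transient, `complete()` retries with the other providers
//! that are configured through environment variables.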
10
11use anyhow::Result;
12use async_trait::async_trait;
13use serde::{Deserialize, Serialize};
14use std::error::Error as StdError;
15use std::sync::Arc;
16use tokio::sync::mpsc;
17
18use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
19
20/// Supported LLM provider backends.
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
22#[serde(rename_all = "lowercase")]
23pub enum LlmProvider {
24    #[default]
25    Anthropic,
26    OpenAI,
27    Ollama,
28    /// Google Gemini (generativelanguage.googleapis.com).
29    Gemini,
30    /// NVIDIA NIM (integrate.api.nvidia.com) — OpenAI-compatible.
31    /// Free tier: ~1000 API calls/month per model.
32    Nim,
33}
34
35impl std::fmt::Display for LlmProvider {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match self {
38            LlmProvider::Anthropic => write!(f, "anthropic"),
39            LlmProvider::OpenAI => write!(f, "openai"),
40            LlmProvider::Ollama => write!(f, "ollama"),
41            LlmProvider::Gemini => write!(f, "gemini"),
42            LlmProvider::Nim => write!(f, "nim"),
43        }
44    }
45}
46
/// Per-model pricing as `(input, output)` in USD per 1M tokens.
///
/// Rules are tried top to bottom, so a more specific prefix (`gpt-4o-mini`) must come
/// before a more general one (`gpt-4o`). Models that match no rule fall back to
/// `(1.0, 3.0)`. This includes Ollama tags such as `llama3.1:8b`, which do not contain
/// `llama-3.1-8b`.
fn pricing(model: &str) -> (f64, f64) {
    match model {
        m if m.starts_with("claude-sonnet-4-6") => (3.0, 15.0),
        m if m.starts_with("claude-haiku-4-5") => (0.8, 4.0),
        m if m.starts_with("claude-opus-4-6") => (15.0, 75.0),
        m if m.starts_with("gpt-4o-mini") => (0.15, 0.6),
        m if m.starts_with("gpt-4o") => (2.5, 10.0),
        m if m.starts_with("deepseek") => (0.27, 1.10),
        m if m.contains("llama-3.3-70b") => (0.39, 0.39),
        m if m.contains("llama-3.1-8b") => (0.10, 0.10),
        m if m.starts_with("gemini-2.0-flash") => (0.10, 0.40),
        m if m.starts_with("gemini-1.5-pro") => (1.25, 5.0),
        _ => (1.0, 3.0),
    }
}

/// Calculate cost in nano-USD (1e-9 USD) from token counts and model name.
///
/// Prices are quoted in USD per 1M tokens, so one token at `p` USD/1M costs exactly
/// `p * 1000` nano-USD. The code scales once, instead of dividing by 1e6 and then
/// multiplying by 1e9, and rounds to the nearest integer. A float result such as
/// `149.99999999999997` therefore counts as 150 rather than being truncated to 149.
pub fn calc_cost_nano_usd(model: &str, input_tokens: u64, output_tokens: u64) -> u64 {
    /// Nano-USD per token for a price of 1 USD per 1M tokens.
    const NANO_USD_PER_TOKEN_PER_USD_PER_MTOK: f64 = 1_000.0;

    let (in_price, out_price) = pricing(model);
    let cost_nano = (input_tokens as f64 * in_price + output_tokens as f64 * out_price)
        * NANO_USD_PER_TOKEN_PER_USD_PER_MTOK;
    // Float-to-int `as` saturates at u64::MAX; prices and counts here are non-negative.
    cost_nano.round() as u64
}
71
72/// A single turn in a conversation (role + content).
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct ChatMessage {
75    pub role: String,
76    pub content: String,
77}
78
79/// Configuration for the LLM backend.
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct LlmConfig {
82    pub provider: LlmProvider,
83    /// Model name, e.g. `"claude-sonnet-4-6"`, `"gpt-4o"`, `"llama3"`.
84    pub model: String,
85    /// API key (Anthropic / OpenAI). Not needed for Ollama.
86    pub api_key: Option<String>,
87    /// Base URL override (useful for Ollama or proxies).
88    pub base_url: Option<String>,
89    /// Maximum tokens to generate.
90    pub max_tokens: u32,
91    /// Sampling temperature.
92    pub temperature: f32,
93    /// Optional system prompt.
94    pub system_prompt: Option<String>,
95}
96
97impl Default for LlmConfig {
98    fn default() -> Self {
99        Self {
100            provider: LlmProvider::Anthropic,
101            model: "claude-sonnet-4-6".into(),
102            api_key: None,
103            base_url: None,
104            max_tokens: 4096,
105            temperature: 0.7,
106            system_prompt: None,
107        }
108    }
109}
110
111/// An actor that calls an LLM provider and returns completions.
112pub struct LlmAgent {
113    pub(crate) config: ActorConfig,
114    pub(crate) llm_config: LlmConfig,
115    pub(crate) http: reqwest::Client,
116    pub(crate) state: ActorState,
117    pub(crate) metrics: Arc<ActorMetrics>,
118    pub(crate) mailbox_tx: mpsc::Sender<Message>,
119    pub(crate) mailbox_rx: Option<mpsc::Receiver<Message>>,
120    /// Conversation history for multi-turn exchanges.
121    pub(crate) history: Vec<ChatMessage>,
122    pub(crate) publisher: Option<EventPublisher>,
123    /// Consecutive API errors since last success — WIK monitors this via MQTT.
124    pub(crate) consecutive_errors: u32,
125}
126
127impl LlmAgent {
128    fn describe_reqwest_error(err: &reqwest::Error) -> String {
129        let kind = if err.is_timeout() {
130            "timeout"
131        } else if err.is_connect() {
132            "connect"
133        } else if err.is_request() {
134            "request"
135        } else if err.is_body() {
136            "body"
137        } else if err.is_decode() {
138            "decode"
139        } else {
140            "other"
141        };
142        let mut parts = vec![format!("kind={kind}")];
143        if let Some(url) = err.url() {
144            parts.push(format!("url={url}"));
145        }
146        let mut chain = Vec::new();
147        let mut source = err.source();
148        while let Some(src) = source {
149            chain.push(src.to_string());
150            source = src.source();
151        }
152        if !chain.is_empty() {
153            parts.push(format!("causes={}", chain.join(" | ")));
154        }
155        format!("{} ({})", err, parts.join(", "))
156    }
157
158    pub fn new(config: ActorConfig, llm_config: LlmConfig) -> Self {
159        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
160        Self {
161            config,
162            llm_config,
163            http: reqwest::Client::new(),
164            state: ActorState::Initializing,
165            metrics: Arc::new(ActorMetrics::new()),
166            mailbox_tx: tx,
167            mailbox_rx: Some(rx),
168            history: Vec::new(),
169            publisher: None,
170            consecutive_errors: 0,
171        }
172    }
173
174    /// Attach an EventPublisher for MQTT output.
175    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
176        self.publisher = Some(p);
177        self
178    }
179
180    fn provider_api_key(config: &LlmConfig) -> Option<String> {
181        match config.provider {
182            LlmProvider::Anthropic => config
183                .api_key
184                .clone()
185                .or_else(|| std::env::var("ANTHROPIC_API_KEY").ok()),
186            LlmProvider::OpenAI => config
187                .api_key
188                .clone()
189                .or_else(|| std::env::var("OPENAI_API_KEY").ok()),
190            LlmProvider::Gemini => config
191                .api_key
192                .clone()
193                .or_else(|| std::env::var("GEMINI_API_KEY").ok())
194                .or_else(|| std::env::var("GOOGLE_GEMINI_API_KEY").ok())
195                .or_else(|| std::env::var("GOOGLE_API_KEY").ok()),
196            LlmProvider::Nim => config
197                .api_key
198                .clone()
199                .or_else(|| std::env::var("NIM_API_KEY").ok())
200                .or_else(|| std::env::var("OPENAI_API_KEY").ok()),
201            LlmProvider::Ollama => None,
202        }
203    }
204
205    fn provider_base_url(config: &LlmConfig) -> Option<String> {
206        match config.provider {
207            LlmProvider::OpenAI => config
208                .base_url
209                .clone()
210                .or_else(|| std::env::var("OPENAI_BASE_URL").ok()),
211            LlmProvider::Ollama => Some(
212                config
213                    .base_url
214                    .clone()
215                    .or_else(|| std::env::var("OLLAMA_BASE_URL").ok())
216                    .unwrap_or_else(|| "http://127.0.0.1:11434/v1".to_string()),
217            ),
218            LlmProvider::Nim => config
219                .base_url
220                .clone()
221                .or_else(|| std::env::var("NIM_BASE_URL").ok()),
222            _ => config.base_url.clone(),
223        }
224    }
225
226    fn fallback_configs(&self) -> Vec<LlmConfig> {
227        let mut configs = Vec::new();
228
229        let mut add = |provider: LlmProvider, model_env: &str, default_model: &str| {
230            if provider == self.llm_config.provider {
231                return;
232            }
233            let cfg = LlmConfig {
234                provider: provider.clone(),
235                model: std::env::var(model_env).unwrap_or_else(|_| default_model.to_string()),
236                api_key: None,
237                base_url: None,
238                max_tokens: self.llm_config.max_tokens,
239                temperature: self.llm_config.temperature,
240                system_prompt: self.llm_config.system_prompt.clone(),
241            };
242            let has_key = Self::provider_api_key(&cfg).is_some();
243            let has_endpoint = matches!(provider, LlmProvider::Ollama)
244                .then(|| std::env::var("OLLAMA_BASE_URL").is_ok())
245                .unwrap_or(false);
246            if has_key || has_endpoint {
247                configs.push(LlmConfig {
248                    api_key: Self::provider_api_key(&cfg),
249                    base_url: Self::provider_base_url(&cfg),
250                    ..cfg
251                });
252            }
253        };
254
255        add(
256            LlmProvider::Anthropic,
257            "ANTHROPIC_MODEL",
258            "claude-sonnet-4-6",
259        );
260        add(LlmProvider::OpenAI, "OPENAI_MODEL", "gpt-4o-mini");
261        add(LlmProvider::Gemini, "GEMINI_MODEL", "gemini-2.0-flash");
262        add(LlmProvider::Ollama, "OLLAMA_MODEL", "llama3.1:8b");
263        add(LlmProvider::Nim, "NIM_MODEL", "meta/llama-3.1-8b-instruct");
264        configs
265    }
266
267    fn should_try_fallback(err: &anyhow::Error) -> bool {
268        let msg = err.to_string().to_lowercase();
269        msg.contains("not set")
270            || msg.contains("error sending request")
271            || msg.contains("connection")
272            || msg.contains("dns")
273            || msg.contains("timed out")
274            || msg.contains("429")
275            || msg.contains("rate limit")
276            || msg.contains("502")
277            || msg.contains("503")
278            || msg.contains("504")
279    }
280
281    async fn complete_with_config(
282        &self,
283        prompt: &str,
284        config: &LlmConfig,
285    ) -> Result<(String, u64, u64)> {
286        match config.provider {
287            LlmProvider::Anthropic => self.complete_anthropic(prompt, config).await,
288            LlmProvider::OpenAI | LlmProvider::Ollama => {
289                self.complete_openai_compat(prompt, config, None).await
290            }
291            LlmProvider::Nim => {
292                let base = "https://integrate.api.nvidia.com/v1";
293                self.complete_openai_compat(prompt, config, Some(base))
294                    .await
295            }
296            LlmProvider::Gemini => self.complete_gemini(prompt, config).await,
297        }
298    }
299
300    /// Send a prompt to the configured LLM provider and return the completion.
301    /// Also records token usage and cost in the actor metrics.
302    pub async fn complete(&self, prompt: &str) -> Result<String> {
303        let primary = LlmConfig {
304            api_key: Self::provider_api_key(&self.llm_config),
305            base_url: Self::provider_base_url(&self.llm_config),
306            ..self.llm_config.clone()
307        };
308
309        let mut candidates = vec![primary];
310        candidates.extend(self.fallback_configs());
311
312        let mut last_err: Option<anyhow::Error> = None;
313        for (idx, cfg) in candidates.iter().enumerate() {
314            match self.complete_with_config(prompt, cfg).await {
315                Ok((text, input_tok, output_tok)) => {
316                    if idx > 0 {
317                        tracing::warn!(
318                            "[{}] LLM fallback succeeded with provider={} model={}",
319                            self.config.name,
320                            cfg.provider,
321                            cfg.model
322                        );
323                    }
324                    let cost_nano = calc_cost_nano_usd(&cfg.model, input_tok, output_tok);
325                    self.metrics
326                        .record_llm_usage(input_tok, output_tok, cost_nano);
327                    return Ok(text);
328                }
329                Err(err) => {
330                    let retryable = Self::should_try_fallback(&err);
331                    tracing::warn!(
332                        "[{}] LLM attempt failed provider={} model={} retryable={} error={}",
333                        self.config.name,
334                        cfg.provider,
335                        cfg.model,
336                        retryable,
337                        err
338                    );
339                    last_err = Some(err);
340                    if !retryable {
341                        break;
342                    }
343                }
344            }
345        }
346
347        Err(last_err.unwrap_or_else(|| anyhow::anyhow!("no LLM providers available")))
348    }
349
350    fn now_ms() -> u64 {
351        std::time::SystemTime::now()
352            .duration_since(std::time::UNIX_EPOCH)
353            .unwrap_or_default()
354            .as_millis() as u64
355    }
356
357    /// Publish a provider error to `system/llm/error` so WIK can react.
358    fn publish_llm_error(&self, error: &str) {
359        if let Some(pub_) = &self.publisher {
360            pub_.publish(
361                wactorz_mqtt::topics::SYSTEM_LLM_ERROR,
362                &serde_json::json!({
363                    "provider":          self.llm_config.provider.to_string(),
364                    "model":             self.llm_config.model,
365                    "error":             error,
366                    "consecutiveErrors": self.consecutive_errors + 1,
367                    "timestampMs":       Self::now_ms(),
368                }),
369            );
370        }
371    }
372
373    /// Returns `(text, input_tokens, output_tokens)`.
374    async fn complete_anthropic(
375        &self,
376        prompt: &str,
377        config: &LlmConfig,
378    ) -> Result<(String, u64, u64)> {
379        let api_key = config
380            .api_key
381            .as_deref()
382            .ok_or_else(|| anyhow::anyhow!("LLM_API_KEY not set for Anthropic"))?;
383
384        let mut messages = serde_json::json!([]);
385        for m in &self.history {
386            messages
387                .as_array_mut()
388                .unwrap()
389                .push(serde_json::json!({"role": m.role, "content": m.content}));
390        }
391        messages
392            .as_array_mut()
393            .unwrap()
394            .push(serde_json::json!({"role": "user", "content": prompt}));
395
396        let mut body = serde_json::json!({
397            "model": config.model,
398            "max_tokens": config.max_tokens,
399            "messages": messages,
400        });
401        if let Some(sys) = &config.system_prompt {
402            body["system"] = serde_json::Value::String(sys.clone());
403        }
404
405        let resp = self
406            .http
407            .post("https://api.anthropic.com/v1/messages")
408            .header("x-api-key", api_key)
409            .header("anthropic-version", "2023-06-01")
410            .json(&body)
411            .send()
412            .await
413            .map_err(|err| {
414                anyhow::anyhow!(
415                    "Anthropic transport error: {}",
416                    Self::describe_reqwest_error(&err)
417                )
418            })?;
419
420        if !resp.status().is_success() {
421            let s = resp.status();
422            let t = resp.text().await.unwrap_or_default();
423            anyhow::bail!("Anthropic {s}: {t}");
424        }
425        let raw = resp.text().await.unwrap_or_default();
426        let json: serde_json::Value = serde_json::from_str(&raw).map_err(|err| {
427            anyhow::anyhow!(
428                "Anthropic decode error: {err}; body={}",
429                raw.chars().take(400).collect::<String>()
430            )
431        })?;
432        let text = json["content"][0]["text"]
433            .as_str()
434            .ok_or_else(|| anyhow::anyhow!("unexpected Anthropic response: {json}"))?
435            .to_string();
436        let input_tok = json["usage"]["input_tokens"].as_u64().unwrap_or(0);
437        let output_tok = json["usage"]["output_tokens"].as_u64().unwrap_or(0);
438        Ok((text, input_tok, output_tok))
439    }
440
441    /// Returns `(text, input_tokens, output_tokens)`.
442    async fn complete_gemini(
443        &self,
444        prompt: &str,
445        config: &LlmConfig,
446    ) -> Result<(String, u64, u64)> {
447        let api_key = config
448            .api_key
449            .as_deref()
450            .ok_or_else(|| anyhow::anyhow!("LLM_API_KEY not set for Gemini"))?;
451
452        let model = &config.model;
453        let url = format!(
454            "https://generativelanguage.googleapis.com/v1beta/models/{}:generateContent?key={}",
455            model, api_key
456        );
457
458        let mut contents: Vec<serde_json::Value> = self
459            .history
460            .iter()
461            .map(|m| {
462                let role = if m.role == "assistant" {
463                    "model"
464                } else {
465                    "user"
466                };
467                serde_json::json!({ "role": role, "parts": [{ "text": m.content }] })
468            })
469            .collect();
470        contents.push(serde_json::json!({
471            "role": "user",
472            "parts": [{ "text": prompt }]
473        }));
474
475        let mut body = serde_json::json!({ "contents": contents });
476        if let Some(sys) = &config.system_prompt {
477            body["systemInstruction"] = serde_json::json!({ "parts": [{ "text": sys }] });
478        }
479
480        let resp = self
481            .http
482            .post(&url)
483            .json(&body)
484            .send()
485            .await
486            .map_err(|err| {
487                anyhow::anyhow!(
488                    "Gemini transport error: {}",
489                    Self::describe_reqwest_error(&err)
490                )
491            })?;
492        if !resp.status().is_success() {
493            let s = resp.status();
494            let t = resp.text().await.unwrap_or_default();
495            anyhow::bail!("Gemini {s}: {t}");
496        }
497        let raw = resp.text().await.unwrap_or_default();
498        let json: serde_json::Value = serde_json::from_str(&raw).map_err(|err| {
499            anyhow::anyhow!(
500                "Gemini decode error: {err}; body={}",
501                raw.chars().take(400).collect::<String>()
502            )
503        })?;
504        let text = json["candidates"][0]["content"]["parts"][0]["text"]
505            .as_str()
506            .ok_or_else(|| anyhow::anyhow!("unexpected Gemini response: {json}"))?
507            .to_string();
508        let input_tok = json["usageMetadata"]["promptTokenCount"]
509            .as_u64()
510            .unwrap_or(0);
511        let output_tok = json["usageMetadata"]["candidatesTokenCount"]
512            .as_u64()
513            .unwrap_or(0);
514        Ok((text, input_tok, output_tok))
515    }
516
517    /// OpenAI-compatible endpoint (OpenAI, Ollama, NIM).
518    /// `base_url_override` takes precedence over `llm_config.base_url`.
519    /// Returns `(text, input_tokens, output_tokens)`.
520    async fn complete_openai_compat(
521        &self,
522        prompt: &str,
523        config: &LlmConfig,
524        base_url_override: Option<&str>,
525    ) -> Result<(String, u64, u64)> {
526        let base = base_url_override
527            .or(config.base_url.as_deref())
528            .unwrap_or("https://api.openai.com/v1");
529
530        let mut msgs = Vec::new();
531        if let Some(sys) = &config.system_prompt {
532            msgs.push(serde_json::json!({"role": "system", "content": sys}));
533        }
534        for m in &self.history {
535            msgs.push(serde_json::json!({"role": m.role, "content": m.content}));
536        }
537        msgs.push(serde_json::json!({"role": "user", "content": prompt}));
538
539        let body = serde_json::json!({
540            "model":       config.model,
541            "messages":    msgs,
542            "max_tokens":  config.max_tokens,
543            "temperature": config.temperature,
544        });
545
546        let mut req = self
547            .http
548            .post(format!("{base}/chat/completions"))
549            .json(&body);
550        if let Some(key) = &config.api_key {
551            req = req.header("Authorization", format!("Bearer {key}"));
552        }
553        let resp = req.send().await.map_err(|err| {
554            anyhow::anyhow!(
555                "OpenAI-compat transport error: {}",
556                Self::describe_reqwest_error(&err)
557            )
558        })?;
559        if !resp.status().is_success() {
560            let s = resp.status();
561            let t = resp.text().await.unwrap_or_default();
562            anyhow::bail!("OpenAI-compat {s}: {t}");
563        }
564        let raw = resp.text().await.unwrap_or_default();
565        let json: serde_json::Value = serde_json::from_str(&raw).map_err(|err| {
566            anyhow::anyhow!(
567                "OpenAI-compat decode error: {err}; body={}",
568                raw.chars().take(400).collect::<String>()
569            )
570        })?;
571        let text = json["choices"][0]["message"]["content"]
572            .as_str()
573            .ok_or_else(|| anyhow::anyhow!("unexpected response: {json}"))?
574            .to_string();
575        let input_tok = json["usage"]["prompt_tokens"].as_u64().unwrap_or(0);
576        let output_tok = json["usage"]["completion_tokens"].as_u64().unwrap_or(0);
577        Ok((text, input_tok, output_tok))
578    }
579}
580
581#[async_trait]
582impl Actor for LlmAgent {
583    fn id(&self) -> String {
584        self.config.id.clone()
585    }
586    fn name(&self) -> &str {
587        &self.config.name
588    }
589    fn state(&self) -> ActorState {
590        self.state.clone()
591    }
592    fn metrics(&self) -> Arc<ActorMetrics> {
593        Arc::clone(&self.metrics)
594    }
595    fn mailbox(&self) -> mpsc::Sender<Message> {
596        self.mailbox_tx.clone()
597    }
598    fn is_protected(&self) -> bool {
599        self.config.protected
600    }
601
602    async fn handle_message(&mut self, message: Message) -> Result<()> {
603        use wactorz_core::message::MessageType;
604
605        // ── WIK hot-swap: task_id "wik/switch" carries new provider config ──────
606        if let MessageType::Task {
607            task_id, payload, ..
608        } = &message.payload
609            && task_id == "wik/switch"
610        {
611            let provider_str = payload
612                .get("provider")
613                .and_then(|v| v.as_str())
614                .unwrap_or("");
615            let new_provider = match provider_str {
616                "anthropic" => LlmProvider::Anthropic,
617                "openai" => LlmProvider::OpenAI,
618                "gemini" => LlmProvider::Gemini,
619                "ollama" => LlmProvider::Ollama,
620                "nim" => LlmProvider::Nim,
621                other => {
622                    tracing::warn!(
623                        "[{}] wik/switch: unknown provider '{other}'",
624                        self.config.name
625                    );
626                    return Ok(());
627                }
628            };
629            let reason = payload
630                .get("reason")
631                .and_then(|v| v.as_str())
632                .unwrap_or("WIK switch");
633            tracing::info!(
634                "[{}] ⚡ provider switch: {} → {provider_str} ({reason})",
635                self.config.name,
636                self.llm_config.provider,
637            );
638            self.llm_config.provider = new_provider;
639            if let Some(model) = payload.get("model").and_then(|v| v.as_str()) {
640                self.llm_config.model = model.to_string();
641            }
642            if let Some(key) = payload.get("apiKey").and_then(|v| v.as_str()) {
643                self.llm_config.api_key = Some(key.to_string());
644            }
645            if let Some(url) = payload.get("baseUrl").and_then(|v| v.as_str()) {
646                self.llm_config.base_url = Some(url.to_string());
647            }
648            self.consecutive_errors = 0;
649            return Ok(());
650        }
651
652        let prompt = match &message.payload {
653            MessageType::Text { content } => content.clone(),
654            MessageType::Task { description, .. } => description.clone(),
655            _ => return Ok(()),
656        };
657
658        match self.complete(&prompt).await {
659            Ok(reply_text) => {
660                self.consecutive_errors = 0;
661                self.history.push(ChatMessage {
662                    role: "user".into(),
663                    content: prompt,
664                });
665                self.history.push(ChatMessage {
666                    role: "assistant".into(),
667                    content: reply_text.clone(),
668                });
669                if let Some(sender_id) = message.from {
670                    tracing::debug!(
671                        "[{}] generated reply ({} chars)",
672                        self.config.name,
673                        reply_text.len()
674                    );
675                    let reply =
676                        Message::text(Some(self.config.id.clone()), Some(sender_id), reply_text);
677                    let _ = reply;
678                }
679            }
680            Err(e) => {
681                self.consecutive_errors += 1;
682                let err_str = e.to_string();
683                tracing::error!(
684                    "[{}] LLM error (consecutive: {}) — {err_str}",
685                    self.config.name,
686                    self.consecutive_errors
687                );
688                self.publish_llm_error(&err_str);
689                return Err(e);
690            }
691        }
692        Ok(())
693    }
694
695    async fn on_heartbeat(&mut self) -> Result<()> {
696        // use std::sync::atomic::Ordering;
697        if let Some(pub_) = &self.publisher {
698            let snap = self.metrics.snapshot();
699            pub_.publish(
700                wactorz_mqtt::topics::heartbeat(&self.config.id),
701                &serde_json::json!({
702                    "agentId":         self.config.id,
703                    "agentName":       self.config.name,
704                    "state":           self.state,
705                    "provider":        self.llm_config.provider.to_string(),
706                    "model":           self.llm_config.model,
707                    "llmInputTokens":  snap.llm_input_tokens,
708                    "llmOutputTokens": snap.llm_output_tokens,
709                    "llmCostUsd":      snap.llm_cost_usd,
710                    "restartCount":    snap.restart_count,
711                    "sequence":        snap.heartbeats,
712                    "timestampMs":     std::time::SystemTime::now()
713                        .duration_since(std::time::UNIX_EPOCH)
714                        .unwrap_or_default().as_millis() as u64,
715                }),
716            );
717        }
718        Ok(())
719    }
720
721    async fn run(&mut self) -> Result<()> {
722        self.on_start().await?;
723        self.state = ActorState::Running;
724        let mut rx = self
725            .mailbox_rx
726            .take()
727            .ok_or_else(|| anyhow::anyhow!("LlmAgent already running"))?;
728        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
729            self.config.heartbeat_interval_secs,
730        ));
731        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
732        loop {
733            tokio::select! {
734                biased;
735                msg = rx.recv() => {
736                    match msg {
737                        None => break,
738                        Some(m) => {
739                            self.metrics.record_received();
740                            if let wactorz_core::message::MessageType::Command {
741                                command: wactorz_core::message::ActorCommand::Stop
742                            } = &m.payload {
743                                break;
744                            }
745                            match self.handle_message(m).await {
746                                Ok(_) => self.metrics.record_processed(),
747                                Err(e) => {
748                                    tracing::error!("[{}] {e}", self.config.name);
749                                    self.metrics.record_failed();
750                                }
751                            }
752                        }
753                    }
754                }
755                _ = hb.tick() => {
756                    self.metrics.record_heartbeat();
757                    if let Err(e) = self.on_heartbeat().await {
758                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
759                    }
760                }
761            }
762        }
763        self.state = ActorState::Stopped;
764        self.on_stop().await
765    }
766}