1use anyhow::Result;
12use async_trait::async_trait;
13use serde::{Deserialize, Serialize};
14use std::error::Error as StdError;
15use std::sync::Arc;
16use tokio::sync::mpsc;
17
18use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
19
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
22#[serde(rename_all = "lowercase")]
23pub enum LlmProvider {
24 #[default]
25 Anthropic,
26 OpenAI,
27 Ollama,
28 Gemini,
30 Nim,
33}
34
35impl std::fmt::Display for LlmProvider {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 match self {
38 LlmProvider::Anthropic => write!(f, "anthropic"),
39 LlmProvider::OpenAI => write!(f, "openai"),
40 LlmProvider::Ollama => write!(f, "ollama"),
41 LlmProvider::Gemini => write!(f, "gemini"),
42 LlmProvider::Nim => write!(f, "nim"),
43 }
44 }
45}
46
47fn pricing(model: &str) -> (f64, f64) {
49 match model {
50 m if m.starts_with("claude-sonnet-4-6") => (3.0, 15.0),
51 m if m.starts_with("claude-haiku-4-5") => (0.8, 4.0),
52 m if m.starts_with("claude-opus-4-6") => (15.0, 75.0),
53 m if m.starts_with("gpt-4o-mini") => (0.15, 0.6),
54 m if m.starts_with("gpt-4o") => (2.5, 10.0),
55 m if m.starts_with("deepseek") => (0.27, 1.10),
56 m if m.contains("llama-3.3-70b") => (0.39, 0.39),
57 m if m.contains("llama-3.1-8b") => (0.10, 0.10),
58 m if m.starts_with("gemini-2.0-flash") => (0.10, 0.40),
59 m if m.starts_with("gemini-1.5-pro") => (1.25, 5.0),
60 _ => (1.0, 3.0),
61 }
62}
63
64pub fn calc_cost_nano_usd(model: &str, input_tokens: u64, output_tokens: u64) -> u64 {
66 let (in_price, out_price) = pricing(model);
67 let cost_usd =
68 (input_tokens as f64 * in_price + output_tokens as f64 * out_price) / 1_000_000.0;
69 (cost_usd * 1_000_000_000.0) as u64
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct ChatMessage {
75 pub role: String,
76 pub content: String,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct LlmConfig {
82 pub provider: LlmProvider,
83 pub model: String,
85 pub api_key: Option<String>,
87 pub base_url: Option<String>,
89 pub max_tokens: u32,
91 pub temperature: f32,
93 pub system_prompt: Option<String>,
95}
96
97impl Default for LlmConfig {
98 fn default() -> Self {
99 Self {
100 provider: LlmProvider::Anthropic,
101 model: "claude-sonnet-4-6".into(),
102 api_key: None,
103 base_url: None,
104 max_tokens: 4096,
105 temperature: 0.7,
106 system_prompt: None,
107 }
108 }
109}
110
111pub struct LlmAgent {
113 pub(crate) config: ActorConfig,
114 pub(crate) llm_config: LlmConfig,
115 pub(crate) http: reqwest::Client,
116 pub(crate) state: ActorState,
117 pub(crate) metrics: Arc<ActorMetrics>,
118 pub(crate) mailbox_tx: mpsc::Sender<Message>,
119 pub(crate) mailbox_rx: Option<mpsc::Receiver<Message>>,
120 pub(crate) history: Vec<ChatMessage>,
122 pub(crate) publisher: Option<EventPublisher>,
123 pub(crate) consecutive_errors: u32,
125}
126
127impl LlmAgent {
128 fn describe_reqwest_error(err: &reqwest::Error) -> String {
129 let kind = if err.is_timeout() {
130 "timeout"
131 } else if err.is_connect() {
132 "connect"
133 } else if err.is_request() {
134 "request"
135 } else if err.is_body() {
136 "body"
137 } else if err.is_decode() {
138 "decode"
139 } else {
140 "other"
141 };
142 let mut parts = vec![format!("kind={kind}")];
143 if let Some(url) = err.url() {
144 parts.push(format!("url={url}"));
145 }
146 let mut chain = Vec::new();
147 let mut source = err.source();
148 while let Some(src) = source {
149 chain.push(src.to_string());
150 source = src.source();
151 }
152 if !chain.is_empty() {
153 parts.push(format!("causes={}", chain.join(" | ")));
154 }
155 format!("{} ({})", err, parts.join(", "))
156 }
157
158 pub fn new(config: ActorConfig, llm_config: LlmConfig) -> Self {
159 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
160 Self {
161 config,
162 llm_config,
163 http: reqwest::Client::new(),
164 state: ActorState::Initializing,
165 metrics: Arc::new(ActorMetrics::new()),
166 mailbox_tx: tx,
167 mailbox_rx: Some(rx),
168 history: Vec::new(),
169 publisher: None,
170 consecutive_errors: 0,
171 }
172 }
173
174 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
176 self.publisher = Some(p);
177 self
178 }
179
180 fn provider_api_key(config: &LlmConfig) -> Option<String> {
181 match config.provider {
182 LlmProvider::Anthropic => config
183 .api_key
184 .clone()
185 .or_else(|| std::env::var("ANTHROPIC_API_KEY").ok()),
186 LlmProvider::OpenAI => config
187 .api_key
188 .clone()
189 .or_else(|| std::env::var("OPENAI_API_KEY").ok()),
190 LlmProvider::Gemini => config
191 .api_key
192 .clone()
193 .or_else(|| std::env::var("GEMINI_API_KEY").ok())
194 .or_else(|| std::env::var("GOOGLE_GEMINI_API_KEY").ok())
195 .or_else(|| std::env::var("GOOGLE_API_KEY").ok()),
196 LlmProvider::Nim => config
197 .api_key
198 .clone()
199 .or_else(|| std::env::var("NIM_API_KEY").ok())
200 .or_else(|| std::env::var("OPENAI_API_KEY").ok()),
201 LlmProvider::Ollama => None,
202 }
203 }
204
205 fn provider_base_url(config: &LlmConfig) -> Option<String> {
206 match config.provider {
207 LlmProvider::OpenAI => config
208 .base_url
209 .clone()
210 .or_else(|| std::env::var("OPENAI_BASE_URL").ok()),
211 LlmProvider::Ollama => Some(
212 config
213 .base_url
214 .clone()
215 .or_else(|| std::env::var("OLLAMA_BASE_URL").ok())
216 .unwrap_or_else(|| "http://127.0.0.1:11434/v1".to_string()),
217 ),
218 LlmProvider::Nim => config
219 .base_url
220 .clone()
221 .or_else(|| std::env::var("NIM_BASE_URL").ok()),
222 _ => config.base_url.clone(),
223 }
224 }
225
226 fn fallback_configs(&self) -> Vec<LlmConfig> {
227 let mut configs = Vec::new();
228
229 let mut add = |provider: LlmProvider, model_env: &str, default_model: &str| {
230 if provider == self.llm_config.provider {
231 return;
232 }
233 let cfg = LlmConfig {
234 provider: provider.clone(),
235 model: std::env::var(model_env).unwrap_or_else(|_| default_model.to_string()),
236 api_key: None,
237 base_url: None,
238 max_tokens: self.llm_config.max_tokens,
239 temperature: self.llm_config.temperature,
240 system_prompt: self.llm_config.system_prompt.clone(),
241 };
242 let has_key = Self::provider_api_key(&cfg).is_some();
243 let has_endpoint = matches!(provider, LlmProvider::Ollama)
244 .then(|| std::env::var("OLLAMA_BASE_URL").is_ok())
245 .unwrap_or(false);
246 if has_key || has_endpoint {
247 configs.push(LlmConfig {
248 api_key: Self::provider_api_key(&cfg),
249 base_url: Self::provider_base_url(&cfg),
250 ..cfg
251 });
252 }
253 };
254
255 add(
256 LlmProvider::Anthropic,
257 "ANTHROPIC_MODEL",
258 "claude-sonnet-4-6",
259 );
260 add(LlmProvider::OpenAI, "OPENAI_MODEL", "gpt-4o-mini");
261 add(LlmProvider::Gemini, "GEMINI_MODEL", "gemini-2.0-flash");
262 add(LlmProvider::Ollama, "OLLAMA_MODEL", "llama3.1:8b");
263 add(LlmProvider::Nim, "NIM_MODEL", "meta/llama-3.1-8b-instruct");
264 configs
265 }
266
267 fn should_try_fallback(err: &anyhow::Error) -> bool {
268 let msg = err.to_string().to_lowercase();
269 msg.contains("not set")
270 || msg.contains("error sending request")
271 || msg.contains("connection")
272 || msg.contains("dns")
273 || msg.contains("timed out")
274 || msg.contains("429")
275 || msg.contains("rate limit")
276 || msg.contains("502")
277 || msg.contains("503")
278 || msg.contains("504")
279 }
280
281 async fn complete_with_config(
282 &self,
283 prompt: &str,
284 config: &LlmConfig,
285 ) -> Result<(String, u64, u64)> {
286 match config.provider {
287 LlmProvider::Anthropic => self.complete_anthropic(prompt, config).await,
288 LlmProvider::OpenAI | LlmProvider::Ollama => {
289 self.complete_openai_compat(prompt, config, None).await
290 }
291 LlmProvider::Nim => {
292 let base = "https://integrate.api.nvidia.com/v1";
293 self.complete_openai_compat(prompt, config, Some(base))
294 .await
295 }
296 LlmProvider::Gemini => self.complete_gemini(prompt, config).await,
297 }
298 }
299
300 pub async fn complete(&self, prompt: &str) -> Result<String> {
303 let primary = LlmConfig {
304 api_key: Self::provider_api_key(&self.llm_config),
305 base_url: Self::provider_base_url(&self.llm_config),
306 ..self.llm_config.clone()
307 };
308
309 let mut candidates = vec![primary];
310 candidates.extend(self.fallback_configs());
311
312 let mut last_err: Option<anyhow::Error> = None;
313 for (idx, cfg) in candidates.iter().enumerate() {
314 match self.complete_with_config(prompt, cfg).await {
315 Ok((text, input_tok, output_tok)) => {
316 if idx > 0 {
317 tracing::warn!(
318 "[{}] LLM fallback succeeded with provider={} model={}",
319 self.config.name,
320 cfg.provider,
321 cfg.model
322 );
323 }
324 let cost_nano = calc_cost_nano_usd(&cfg.model, input_tok, output_tok);
325 self.metrics
326 .record_llm_usage(input_tok, output_tok, cost_nano);
327 return Ok(text);
328 }
329 Err(err) => {
330 let retryable = Self::should_try_fallback(&err);
331 tracing::warn!(
332 "[{}] LLM attempt failed provider={} model={} retryable={} error={}",
333 self.config.name,
334 cfg.provider,
335 cfg.model,
336 retryable,
337 err
338 );
339 last_err = Some(err);
340 if !retryable {
341 break;
342 }
343 }
344 }
345 }
346
347 Err(last_err.unwrap_or_else(|| anyhow::anyhow!("no LLM providers available")))
348 }
349
350 fn now_ms() -> u64 {
351 std::time::SystemTime::now()
352 .duration_since(std::time::UNIX_EPOCH)
353 .unwrap_or_default()
354 .as_millis() as u64
355 }
356
357 fn publish_llm_error(&self, error: &str) {
359 if let Some(pub_) = &self.publisher {
360 pub_.publish(
361 wactorz_mqtt::topics::SYSTEM_LLM_ERROR,
362 &serde_json::json!({
363 "provider": self.llm_config.provider.to_string(),
364 "model": self.llm_config.model,
365 "error": error,
366 "consecutiveErrors": self.consecutive_errors + 1,
367 "timestampMs": Self::now_ms(),
368 }),
369 );
370 }
371 }
372
373 async fn complete_anthropic(
375 &self,
376 prompt: &str,
377 config: &LlmConfig,
378 ) -> Result<(String, u64, u64)> {
379 let api_key = config
380 .api_key
381 .as_deref()
382 .ok_or_else(|| anyhow::anyhow!("LLM_API_KEY not set for Anthropic"))?;
383
384 let mut messages = serde_json::json!([]);
385 for m in &self.history {
386 messages
387 .as_array_mut()
388 .unwrap()
389 .push(serde_json::json!({"role": m.role, "content": m.content}));
390 }
391 messages
392 .as_array_mut()
393 .unwrap()
394 .push(serde_json::json!({"role": "user", "content": prompt}));
395
396 let mut body = serde_json::json!({
397 "model": config.model,
398 "max_tokens": config.max_tokens,
399 "messages": messages,
400 });
401 if let Some(sys) = &config.system_prompt {
402 body["system"] = serde_json::Value::String(sys.clone());
403 }
404
405 let resp = self
406 .http
407 .post("https://api.anthropic.com/v1/messages")
408 .header("x-api-key", api_key)
409 .header("anthropic-version", "2023-06-01")
410 .json(&body)
411 .send()
412 .await
413 .map_err(|err| {
414 anyhow::anyhow!(
415 "Anthropic transport error: {}",
416 Self::describe_reqwest_error(&err)
417 )
418 })?;
419
420 if !resp.status().is_success() {
421 let s = resp.status();
422 let t = resp.text().await.unwrap_or_default();
423 anyhow::bail!("Anthropic {s}: {t}");
424 }
425 let raw = resp.text().await.unwrap_or_default();
426 let json: serde_json::Value = serde_json::from_str(&raw).map_err(|err| {
427 anyhow::anyhow!(
428 "Anthropic decode error: {err}; body={}",
429 raw.chars().take(400).collect::<String>()
430 )
431 })?;
432 let text = json["content"][0]["text"]
433 .as_str()
434 .ok_or_else(|| anyhow::anyhow!("unexpected Anthropic response: {json}"))?
435 .to_string();
436 let input_tok = json["usage"]["input_tokens"].as_u64().unwrap_or(0);
437 let output_tok = json["usage"]["output_tokens"].as_u64().unwrap_or(0);
438 Ok((text, input_tok, output_tok))
439 }
440
441 async fn complete_gemini(
443 &self,
444 prompt: &str,
445 config: &LlmConfig,
446 ) -> Result<(String, u64, u64)> {
447 let api_key = config
448 .api_key
449 .as_deref()
450 .ok_or_else(|| anyhow::anyhow!("LLM_API_KEY not set for Gemini"))?;
451
452 let model = &config.model;
453 let url = format!(
454 "https://generativelanguage.googleapis.com/v1beta/models/{}:generateContent?key={}",
455 model, api_key
456 );
457
458 let mut contents: Vec<serde_json::Value> = self
459 .history
460 .iter()
461 .map(|m| {
462 let role = if m.role == "assistant" {
463 "model"
464 } else {
465 "user"
466 };
467 serde_json::json!({ "role": role, "parts": [{ "text": m.content }] })
468 })
469 .collect();
470 contents.push(serde_json::json!({
471 "role": "user",
472 "parts": [{ "text": prompt }]
473 }));
474
475 let mut body = serde_json::json!({ "contents": contents });
476 if let Some(sys) = &config.system_prompt {
477 body["systemInstruction"] = serde_json::json!({ "parts": [{ "text": sys }] });
478 }
479
480 let resp = self
481 .http
482 .post(&url)
483 .json(&body)
484 .send()
485 .await
486 .map_err(|err| {
487 anyhow::anyhow!(
488 "Gemini transport error: {}",
489 Self::describe_reqwest_error(&err)
490 )
491 })?;
492 if !resp.status().is_success() {
493 let s = resp.status();
494 let t = resp.text().await.unwrap_or_default();
495 anyhow::bail!("Gemini {s}: {t}");
496 }
497 let raw = resp.text().await.unwrap_or_default();
498 let json: serde_json::Value = serde_json::from_str(&raw).map_err(|err| {
499 anyhow::anyhow!(
500 "Gemini decode error: {err}; body={}",
501 raw.chars().take(400).collect::<String>()
502 )
503 })?;
504 let text = json["candidates"][0]["content"]["parts"][0]["text"]
505 .as_str()
506 .ok_or_else(|| anyhow::anyhow!("unexpected Gemini response: {json}"))?
507 .to_string();
508 let input_tok = json["usageMetadata"]["promptTokenCount"]
509 .as_u64()
510 .unwrap_or(0);
511 let output_tok = json["usageMetadata"]["candidatesTokenCount"]
512 .as_u64()
513 .unwrap_or(0);
514 Ok((text, input_tok, output_tok))
515 }
516
517 async fn complete_openai_compat(
521 &self,
522 prompt: &str,
523 config: &LlmConfig,
524 base_url_override: Option<&str>,
525 ) -> Result<(String, u64, u64)> {
526 let base = base_url_override
527 .or(config.base_url.as_deref())
528 .unwrap_or("https://api.openai.com/v1");
529
530 let mut msgs = Vec::new();
531 if let Some(sys) = &config.system_prompt {
532 msgs.push(serde_json::json!({"role": "system", "content": sys}));
533 }
534 for m in &self.history {
535 msgs.push(serde_json::json!({"role": m.role, "content": m.content}));
536 }
537 msgs.push(serde_json::json!({"role": "user", "content": prompt}));
538
539 let body = serde_json::json!({
540 "model": config.model,
541 "messages": msgs,
542 "max_tokens": config.max_tokens,
543 "temperature": config.temperature,
544 });
545
546 let mut req = self
547 .http
548 .post(format!("{base}/chat/completions"))
549 .json(&body);
550 if let Some(key) = &config.api_key {
551 req = req.header("Authorization", format!("Bearer {key}"));
552 }
553 let resp = req.send().await.map_err(|err| {
554 anyhow::anyhow!(
555 "OpenAI-compat transport error: {}",
556 Self::describe_reqwest_error(&err)
557 )
558 })?;
559 if !resp.status().is_success() {
560 let s = resp.status();
561 let t = resp.text().await.unwrap_or_default();
562 anyhow::bail!("OpenAI-compat {s}: {t}");
563 }
564 let raw = resp.text().await.unwrap_or_default();
565 let json: serde_json::Value = serde_json::from_str(&raw).map_err(|err| {
566 anyhow::anyhow!(
567 "OpenAI-compat decode error: {err}; body={}",
568 raw.chars().take(400).collect::<String>()
569 )
570 })?;
571 let text = json["choices"][0]["message"]["content"]
572 .as_str()
573 .ok_or_else(|| anyhow::anyhow!("unexpected response: {json}"))?
574 .to_string();
575 let input_tok = json["usage"]["prompt_tokens"].as_u64().unwrap_or(0);
576 let output_tok = json["usage"]["completion_tokens"].as_u64().unwrap_or(0);
577 Ok((text, input_tok, output_tok))
578 }
579}
580
581#[async_trait]
582impl Actor for LlmAgent {
583 fn id(&self) -> String {
584 self.config.id.clone()
585 }
586 fn name(&self) -> &str {
587 &self.config.name
588 }
589 fn state(&self) -> ActorState {
590 self.state.clone()
591 }
592 fn metrics(&self) -> Arc<ActorMetrics> {
593 Arc::clone(&self.metrics)
594 }
595 fn mailbox(&self) -> mpsc::Sender<Message> {
596 self.mailbox_tx.clone()
597 }
598 fn is_protected(&self) -> bool {
599 self.config.protected
600 }
601
602 async fn handle_message(&mut self, message: Message) -> Result<()> {
603 use wactorz_core::message::MessageType;
604
605 if let MessageType::Task {
607 task_id, payload, ..
608 } = &message.payload
609 && task_id == "wik/switch"
610 {
611 let provider_str = payload
612 .get("provider")
613 .and_then(|v| v.as_str())
614 .unwrap_or("");
615 let new_provider = match provider_str {
616 "anthropic" => LlmProvider::Anthropic,
617 "openai" => LlmProvider::OpenAI,
618 "gemini" => LlmProvider::Gemini,
619 "ollama" => LlmProvider::Ollama,
620 "nim" => LlmProvider::Nim,
621 other => {
622 tracing::warn!(
623 "[{}] wik/switch: unknown provider '{other}'",
624 self.config.name
625 );
626 return Ok(());
627 }
628 };
629 let reason = payload
630 .get("reason")
631 .and_then(|v| v.as_str())
632 .unwrap_or("WIK switch");
633 tracing::info!(
634 "[{}] ⚡ provider switch: {} → {provider_str} ({reason})",
635 self.config.name,
636 self.llm_config.provider,
637 );
638 self.llm_config.provider = new_provider;
639 if let Some(model) = payload.get("model").and_then(|v| v.as_str()) {
640 self.llm_config.model = model.to_string();
641 }
642 if let Some(key) = payload.get("apiKey").and_then(|v| v.as_str()) {
643 self.llm_config.api_key = Some(key.to_string());
644 }
645 if let Some(url) = payload.get("baseUrl").and_then(|v| v.as_str()) {
646 self.llm_config.base_url = Some(url.to_string());
647 }
648 self.consecutive_errors = 0;
649 return Ok(());
650 }
651
652 let prompt = match &message.payload {
653 MessageType::Text { content } => content.clone(),
654 MessageType::Task { description, .. } => description.clone(),
655 _ => return Ok(()),
656 };
657
658 match self.complete(&prompt).await {
659 Ok(reply_text) => {
660 self.consecutive_errors = 0;
661 self.history.push(ChatMessage {
662 role: "user".into(),
663 content: prompt,
664 });
665 self.history.push(ChatMessage {
666 role: "assistant".into(),
667 content: reply_text.clone(),
668 });
669 if let Some(sender_id) = message.from {
670 tracing::debug!(
671 "[{}] generated reply ({} chars)",
672 self.config.name,
673 reply_text.len()
674 );
675 let reply =
676 Message::text(Some(self.config.id.clone()), Some(sender_id), reply_text);
677 let _ = reply;
678 }
679 }
680 Err(e) => {
681 self.consecutive_errors += 1;
682 let err_str = e.to_string();
683 tracing::error!(
684 "[{}] LLM error (consecutive: {}) — {err_str}",
685 self.config.name,
686 self.consecutive_errors
687 );
688 self.publish_llm_error(&err_str);
689 return Err(e);
690 }
691 }
692 Ok(())
693 }
694
695 async fn on_heartbeat(&mut self) -> Result<()> {
696 if let Some(pub_) = &self.publisher {
698 let snap = self.metrics.snapshot();
699 pub_.publish(
700 wactorz_mqtt::topics::heartbeat(&self.config.id),
701 &serde_json::json!({
702 "agentId": self.config.id,
703 "agentName": self.config.name,
704 "state": self.state,
705 "provider": self.llm_config.provider.to_string(),
706 "model": self.llm_config.model,
707 "llmInputTokens": snap.llm_input_tokens,
708 "llmOutputTokens": snap.llm_output_tokens,
709 "llmCostUsd": snap.llm_cost_usd,
710 "restartCount": snap.restart_count,
711 "sequence": snap.heartbeats,
712 "timestampMs": std::time::SystemTime::now()
713 .duration_since(std::time::UNIX_EPOCH)
714 .unwrap_or_default().as_millis() as u64,
715 }),
716 );
717 }
718 Ok(())
719 }
720
721 async fn run(&mut self) -> Result<()> {
722 self.on_start().await?;
723 self.state = ActorState::Running;
724 let mut rx = self
725 .mailbox_rx
726 .take()
727 .ok_or_else(|| anyhow::anyhow!("LlmAgent already running"))?;
728 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
729 self.config.heartbeat_interval_secs,
730 ));
731 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
732 loop {
733 tokio::select! {
734 biased;
735 msg = rx.recv() => {
736 match msg {
737 None => break,
738 Some(m) => {
739 self.metrics.record_received();
740 if let wactorz_core::message::MessageType::Command {
741 command: wactorz_core::message::ActorCommand::Stop
742 } = &m.payload {
743 break;
744 }
745 match self.handle_message(m).await {
746 Ok(_) => self.metrics.record_processed(),
747 Err(e) => {
748 tracing::error!("[{}] {e}", self.config.name);
749 self.metrics.record_failed();
750 }
751 }
752 }
753 }
754 }
755 _ = hb.tick() => {
756 self.metrics.record_heartbeat();
757 if let Err(e) = self.on_heartbeat().await {
758 tracing::error!("[{}] heartbeat: {e}", self.config.name);
759 }
760 }
761 }
762 }
763 self.state = ActorState::Stopped;
764 self.on_stop().await
765 }
766}