Coverage for agentos/core/streaming.py: 62%
56 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v0.20 流式输出系统。
3支持 SSE (Server-Sent Events) 格式流式传输。
4"""
6from __future__ import annotations
8from dataclasses import dataclass, field
9from enum import Enum
10from typing import Any
class StreamEvent(str, Enum):
    """Kinds of events a stream chunk can carry.

    Every member is also a ``str``; its value is the name written to the
    ``"event"`` field when a chunk is serialized.
    """

    # Events that do not end the stream.
    START = "start"
    STEP_START = "step_start"
    THINKING = "thinking"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"
    TEXT = "text"

    # Events that StreamChunk.is_terminal reports as terminal.
    ERROR = "error"
    COMPLETE = "complete"
    CANCELLED = "cancelled"
@dataclass
class StreamChunk:
    """A single piece of streamed output.

    Attributes:
        event: The kind of event this chunk represents.
        data: Extra key/value fields merged into the serialized payload.
        timestamp: Filled in by ``StreamEmitter.emit``; stays ``0.0`` when
            the chunk is constructed directly.
    """

    event: StreamEvent
    data: dict[str, Any] = field(default_factory=dict)
    timestamp: float = 0.0  # auto-filled by emitter

    def to_sse(self) -> str:
        """Serialize the chunk as one SSE ``data:`` frame.

        The payload is a flat JSON object holding ``"event"`` first, then
        the entries of ``data``, then ``"ts"``.  The reserved keys
        ``"event"`` and ``"ts"`` always carry the chunk's own values, even
        when ``data`` contains entries with the same names.  Values that
        ``json`` cannot encode natively are converted with ``str``.

        Returns:
            The text ``data: <json>`` terminated by a blank line.
        """
        import json

        payload = {
            "event": self.event.value,
            **self.data,
            "ts": self.timestamp,
        }
        # A "event" entry in data would have overwritten the real event
        # name during the merge above.  Re-assigning an existing key keeps
        # its position, so the output is unchanged when there is no clash.
        payload["event"] = self.event.value
        return f"data: {json.dumps(payload, default=str)}\n\n"

    @property
    def is_terminal(self) -> bool:
        """Whether this chunk is a COMPLETE, ERROR or CANCELLED event."""
        return self.event in (
            StreamEvent.COMPLETE,
            StreamEvent.ERROR,
            StreamEvent.CANCELLED,
        )
class StreamEmitter:
    """Builds ``StreamChunk`` objects stamped with elapsed time.

    Every method is a plain synchronous call that returns the new chunk;
    nothing here writes the chunk anywhere.  Timestamps are milliseconds
    elapsed since this emitter was created.
    """

    def __init__(self) -> None:
        import time

        # Use the monotonic clock: wall-clock time can jump when the system
        # clock is adjusted, which would yield negative or wrong elapsed
        # values.
        self._start = time.monotonic()

    def emit(self, event: StreamEvent, **data) -> StreamChunk:
        """Create a chunk for ``event`` carrying ``data`` as its payload.

        Args:
            event: The event kind of the new chunk.
            **data: Keyword arguments stored as the chunk's ``data`` dict.

        Returns:
            The new chunk, with ``timestamp`` set to the milliseconds
            elapsed since the emitter was constructed.
        """
        import time

        elapsed_ms = (time.monotonic() - self._start) * 1000
        return StreamChunk(event=event, data=data, timestamp=elapsed_ms)

    def thinking(self, text: str) -> StreamChunk:
        """Return a THINKING chunk whose data is ``{"text": text}``."""
        return self.emit(StreamEvent.THINKING, text=text)

    def text(self, text: str) -> StreamChunk:
        """Return a TEXT chunk whose data is ``{"text": text}``."""
        return self.emit(StreamEvent.TEXT, text=text)

    def tool_call(self, name: str, args: dict) -> StreamChunk:
        """Return a TOOL_CALL chunk with ``name`` and ``arguments`` fields."""
        return self.emit(StreamEvent.TOOL_CALL, name=name, arguments=args)

    def tool_result(self, name: str, result: str) -> StreamChunk:
        """Return a TOOL_RESULT chunk with ``name`` and ``result`` fields."""
        return self.emit(StreamEvent.TOOL_RESULT, name=name, result=result)

    def error(self, message: str) -> StreamChunk:
        """Return an ERROR chunk whose data is ``{"error": message}``."""
        return self.emit(StreamEvent.ERROR, error=message)
class ResponseCollector:
    """Gathers streamed chunks and rebuilds the final response text."""

    def __init__(self):
        # Text fragments are buffered separately from the full chunk log so
        # the response can be joined without rescanning every chunk.
        self._text_buf: list[str] = []
        self.chunks: list[StreamChunk] = []

    def feed(self, chunk: StreamChunk):
        """Record ``chunk``; for TEXT events also buffer its ``"text"`` field.

        A TEXT chunk without a ``"text"`` entry contributes an empty string.
        """
        self.chunks.append(chunk)
        is_text = chunk.event == StreamEvent.TEXT
        if is_text:
            fragment = chunk.data.get("text", "")
            self._text_buf.append(fragment)

    @property
    def full_text(self) -> str:
        """All buffered text fragments concatenated in arrival order."""
        return str.join("", self._text_buf)