Coverage for agentos/core/streaming.py: 62%

56 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v0.20 流式输出系统。 

3支持 SSE (Server-Sent Events) 格式流式传输。 

4""" 

5 

6from __future__ import annotations 

7 

8from dataclasses import dataclass, field 

9from enum import Enum 

10from typing import Any 

11 

12 

class StreamEvent(str, Enum):
    """Kinds of events that can be carried by a stream chunk.

    Members are also plain ``str`` instances, so each one compares equal to
    its lowercase wire name (e.g. ``StreamEvent.TEXT == "text"``).
    """

    # Events emitted while a run is in progress.
    START = "start"
    STEP_START = "step_start"
    THINKING = "thinking"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"
    TEXT = "text"

    # Events that StreamChunk.is_terminal treats as ending the stream.
    ERROR = "error"
    COMPLETE = "complete"
    CANCELLED = "cancelled"

26 

27 

@dataclass
class StreamChunk:
    """A single block of streamed output.

    Attributes:
        event: The kind of event this chunk carries.
        data: Event-specific payload; its keys are merged into the top level
            of the serialized JSON object.
        timestamp: 0.0 by default; ``StreamEmitter.emit`` overwrites it with
            the milliseconds elapsed since the emitter was created.
    """

    event: StreamEvent
    data: dict[str, Any] = field(default_factory=dict)
    timestamp: float = 0.0  # auto-filled by emitter

    def to_sse(self) -> str:
        """Serialize the chunk as one SSE ``data:`` frame.

        The JSON object contains ``event``, then every key of ``data``, then
        ``ts``. ``event`` and ``ts`` are reserved: they always reflect this
        chunk's own fields, even when ``data`` has entries with the same
        names. Values ``json`` cannot encode natively are converted with
        ``str``.

        Returns:
            ``"data: <json>"`` followed by the blank line that ends the frame.
        """
        import json

        payload = {
            "event": self.event.value,
            **self.data,
            "ts": self.timestamp,
        }
        # A stray "event" entry in ``data`` must not replace the real event
        # type. Re-assigning an existing key keeps it first in the object.
        payload["event"] = self.event.value
        return f"data: {json.dumps(payload, default=str)}\n\n"

    @property
    def is_terminal(self) -> bool:
        """Whether the event is COMPLETE, ERROR or CANCELLED."""
        return self.event in (
            StreamEvent.COMPLETE,
            StreamEvent.ERROR,
            StreamEvent.CANCELLED,
        )

48 

49 

class StreamEmitter:
    """Factory for ``StreamChunk`` objects stamped with elapsed time.

    Every method is synchronous: it only builds and returns the chunk.
    Serializing or sending the chunk is left to the caller.
    """

    def __init__(self) -> None:
        import time

        # Use the monotonic clock: chunk timestamps are elapsed durations, and
        # the wall clock can jump (NTP, manual changes), which would produce
        # negative or wildly wrong offsets.
        self._start = time.monotonic()

    def emit(self, event: StreamEvent, **data) -> StreamChunk:
        """Build a chunk for ``event`` carrying ``data`` as its payload.

        Args:
            event: The kind of event to emit.
            **data: Arbitrary keyword payload stored as the chunk's ``data``.

        Returns:
            A chunk whose ``timestamp`` is the number of milliseconds elapsed
            since this emitter was created.
        """
        import time

        elapsed_ms = (time.monotonic() - self._start) * 1000
        return StreamChunk(event=event, data=data, timestamp=elapsed_ms)

    def thinking(self, text: str) -> StreamChunk:
        """Emit a THINKING chunk with ``text`` under the ``text`` key."""
        return self.emit(StreamEvent.THINKING, text=text)

    def text(self, text: str) -> StreamChunk:
        """Emit a TEXT chunk with ``text`` under the ``text`` key."""
        return self.emit(StreamEvent.TEXT, text=text)

    def tool_call(self, name: str, args: dict) -> StreamChunk:
        """Emit a TOOL_CALL chunk with keys ``name`` and ``arguments``."""
        return self.emit(StreamEvent.TOOL_CALL, name=name, arguments=args)

    def tool_result(self, name: str, result: str) -> StreamChunk:
        """Emit a TOOL_RESULT chunk with keys ``name`` and ``result``."""
        return self.emit(StreamEvent.TOOL_RESULT, name=name, result=result)

    def error(self, message: str) -> StreamChunk:
        """Emit an ERROR chunk with ``message`` under the ``error`` key."""
        return self.emit(StreamEvent.ERROR, error=message)

77 

78 

class ResponseCollector:
    """Collects streamed chunks and joins TEXT payloads into the final response.

    Attributes:
        chunks: Every chunk passed to ``feed``, in arrival order.
    """

    def __init__(self) -> None:
        self.chunks: list[StreamChunk] = []
        self._text_buf: list[str] = []

    def feed(self, chunk: StreamChunk) -> None:
        """Record ``chunk`` and, for TEXT events, buffer its ``text`` payload.

        A missing or ``None`` ``text`` contributes nothing. Any other
        non-string value is converted with ``str`` so that ``full_text`` can
        never fail on a bad fragment.
        """
        self.chunks.append(chunk)
        if chunk.event != StreamEvent.TEXT:
            return
        fragment = chunk.data.get("text")
        if fragment is None:
            return
        # str.join raises TypeError on non-strings; normalizing here keeps the
        # failure from surfacing later in full_text, far from its cause.
        self._text_buf.append(str(fragment))

    @property
    def full_text(self) -> str:
        """All buffered TEXT fragments concatenated in arrival order."""
        return "".join(self._text_buf)