Coverage for agentos/multimodal/manager.py: 35%

167 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v0.40 Multimodal — 多模态输入支持。 

3支持:图片理解、语音转文字、PDF/文档解析。 

4""" 

5 

6from __future__ import annotations 

7 

8import base64 

9import json 

10import logging 

11from dataclasses import dataclass, field 

12from enum import Enum 

13from typing import Optional, Any 

14 

15 

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

17 

18 

class Modality(str, Enum):
    """Enumeration of modality types.

    Members also subclass ``str``, so each one compares equal to its plain
    string value.
    """

    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    DOCUMENT = "document"

28 

29 

@dataclass
class MultimodalBlock:
    """One multimodal input block, shaped after OpenAI/Anthropic content blocks."""

    type: str  # text | image_url | audio | image
    text: str = ""
    source: dict = field(default_factory=dict)
    mime_type: str = ""

    @classmethod
    def text_block(cls, text: str) -> "MultimodalBlock":
        """Build a plain-text block carrying *text*."""
        return cls(type="text", text=text)

    @classmethod
    def image_url(cls, url: str, detail: str = "auto") -> "MultimodalBlock":
        """Build an image block that points at *url* with the given *detail* level."""
        reference = {"url": url, "detail": detail}
        return cls(
            type="image_url",
            source={"type": "image_url", "image_url": reference},
        )

    @classmethod
    def image_base64(cls, data: bytes, mime: str = "image/jpeg") -> "MultimodalBlock":
        """Build an image block that embeds *data* as a base64 ``data:`` URL."""
        encoded = base64.b64encode(data).decode()
        reference = {"url": f"data:{mime};base64,{encoded}"}
        return cls(
            type="image_url",
            source={"type": "image_url", "image_url": reference},
        )

    @classmethod
    def audio(cls, data: bytes, mime: str = "audio/wav") -> "MultimodalBlock":
        """Build an audio block holding *data* base64-encoded under ``source["data"]``."""
        encoded = base64.b64encode(data).decode()
        return cls(type="audio", mime_type=mime, source={"data": encoded})

    def to_openai_format(self) -> dict:
        """Return this block as a content-part dictionary.

        Text and ``image_url`` blocks get their dedicated shapes. Any other type
        is emitted as ``{"type": <type>}`` merged with ``source``; keys in
        ``source`` win on collision.
        """
        kind = self.type
        if kind == "text":
            return {"type": "text", "text": self.text}
        if kind == "image_url":
            return {"type": "image_url", "image_url": self.source["image_url"]}
        payload = {"type": kind}
        payload.update(self.source)
        return payload

62 

63 

class ImageProcessor:
    """Image helper: base64 encoding, token-cost estimation and metadata removal."""

    # Longest side, in pixels, that estimate_tokens fits an image into before tiling.
    MAX_SIZE = 2048
    # Not referenced by any method of this class.
    JPEG_QUALITY = 85

    @staticmethod
    def encode_file(path: str) -> tuple[str, str]:
        """Read the file at *path* and return ``(base64_text, mime_type)``.

        The MIME type is guessed from the file name; ``"image/png"`` is used
        when no guess is available.
        """
        import mimetypes

        mime = mimetypes.guess_type(path)[0] or "image/png"
        with open(path, "rb") as f:
            data = f.read()
        return base64.b64encode(data).decode(), mime

    @staticmethod
    def encode_bytes(data: bytes, mime: str = "image/jpeg") -> str:
        """Return *data* base64-encoded as text.

        *mime* is accepted for interface compatibility and is not used.
        """
        return base64.b64encode(data).decode()

    @staticmethod
    def estimate_tokens(width: int, height: int, detail: str = "auto") -> int:
        """Estimate the token cost of an image (OpenAI pricing model).

        ``detail="low"`` costs a flat 85 tokens. Any other value is treated as
        high detail: the image is first fitted inside a ``MAX_SIZE`` square,
        then shrunk so its shortest side is at most 768 px, and finally charged
        170 tokens per 512 px tile plus a base of 85.
        """
        if detail == "low":
            return 85

        w, h = float(width), float(height)

        # Fit inside MAX_SIZE x MAX_SIZE first. Without this step very
        # elongated images are tiled at far too large a size.
        longest = max(w, h)
        if longest > ImageProcessor.MAX_SIZE:
            ratio = ImageProcessor.MAX_SIZE / longest
            w, h = w * ratio, h * ratio

        # Then shrink (never enlarge) so the shortest side is at most 768 px.
        shortest = min(w, h)
        if shortest > 768:
            ratio = 768 / shortest
            w, h = w * ratio, h * ratio

        tiles_x = (int(w) + 511) // 512
        tiles_y = (int(h) + 511) // 512
        return 85 + 170 * tiles_x * tiles_y

    @staticmethod
    def purge_metadata(data: bytes) -> bytes:
        """Return the image re-encoded from its pixel data only, dropping EXIF.

        The pixels are copied into a fresh image of the same mode and size and
        saved in the source format (PNG when the format is unknown). If Pillow
        cannot be imported, *data* is returned unchanged.
        """
        try:
            from PIL import Image
            import io

            img = Image.open(io.BytesIO(data))
            pixels = list(img.getdata())
            cleaned = Image.new(img.mode, img.size)
            cleaned.putdata(pixels)
            # Palette images store colour indices; without the palette the
            # copy would come out with the wrong colours.
            if img.mode == "P":
                palette = img.getpalette()
                if palette is not None:
                    cleaned.putpalette(palette)
            buf = io.BytesIO()
            cleaned.save(buf, format=img.format or "PNG")
            return buf.getvalue()
        except ImportError:
            return data

111 

112 

class AudioProcessor:
    """Audio helper: speech-to-text transcription and base64 file encoding."""

    SUPPORTED_FORMATS = ["wav", "mp3", "ogg", "flac", "m4a"]

    @staticmethod
    def transcribe(path: str, whisper_model: str = "base") -> str:
        """Transcribe the audio file at *path* with the named whisper model.

        When an ImportError is raised (for example because ``whisper`` is not
        installed), a warning is logged and a placeholder string is returned.
        """
        try:
            import whisper

            transcription = whisper.load_model(whisper_model).transcribe(path)
            return transcription["text"]
        except ImportError:
            logger.warning("whisper not installed, returning empty")
            return "[whisper not available]"

    @staticmethod
    def encode_file(path: str) -> tuple[str, str]:
        """Read the file at *path* and return ``(base64_text, mime_type)``.

        The MIME type is guessed from the file name; ``"audio/wav"`` is used
        when no guess is available.
        """
        import mimetypes

        guessed, _ = mimetypes.guess_type(path)
        mime = guessed or "audio/wav"
        with open(path, "rb") as handle:
            encoded = base64.b64encode(handle.read())
        return encoded.decode(), mime

137 

138 

class DocumentParser:
    """Document parser for PDF, Word and Markdown/plain-text files."""

    @staticmethod
    def parse_pdf(path: str) -> str:
        """Extract text from the PDF at *path*, pages joined by blank lines.

        Pages that yield no text are skipped. When PyPDF2 cannot be imported,
        a warning is logged and a placeholder string is returned.
        """
        try:
            import PyPDF2

            text = []
            with open(path, "rb") as f:
                reader = PyPDF2.PdfReader(f)
                for page in reader.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text.append(page_text)
            return "\n\n".join(text)
        except ImportError:
            logger.warning("PyPDF2 not installed")
            return "[PyPDF2 not available]"

    @staticmethod
    def parse_docx(path: str) -> str:
        """Return the non-empty paragraphs of the Word file at *path*, one per line.

        When python-docx cannot be imported, a warning is logged and a
        placeholder string is returned.
        """
        try:
            from docx import Document

            doc = Document(path)
            return "\n".join(p.text for p in doc.paragraphs if p.text)
        except ImportError:
            logger.warning("python-docx not installed")
            return "[python-docx not available]"

    @staticmethod
    def parse_auto(path: str) -> tuple[str, str]:
        """Pick a parser from the file extension and return ``(content, format)``.

        ``pdf`` and ``docx``/``doc`` go to the dedicated parsers. ``md``,
        ``markdown`` and ``txt`` are read as UTF-8 text and report their own
        extension as the format. Anything else is read as UTF-8 text with
        format ``"text"``; if that read fails the result is ``("", "unknown")``.
        """
        ext = path.rsplit(".", 1)[-1].lower() if "." in path else ""
        if ext == "pdf":
            return DocumentParser.parse_pdf(path), "pdf"
        if ext in ("docx", "doc"):
            return DocumentParser.parse_docx(path), "docx"
        if ext in ("md", "markdown", "txt"):
            # Explicit UTF-8 so the result does not depend on the platform locale.
            with open(path, encoding="utf-8") as f:
                return f.read(), ext
        try:
            with open(path, encoding="utf-8") as f:
                return f.read(), "text"
        except (OSError, ValueError):
            # Best effort for unknown types: OSError covers unreadable files,
            # ValueError covers undecodable bytes (UnicodeDecodeError).
            return "", "unknown"

185 

186 

class MultimodalManager:
    """Multimodal manager: a single entry point over the image, audio and document helpers."""

    def __init__(self):
        self.image = ImageProcessor()
        self.audio = AudioProcessor()
        self.document = DocumentParser()

    def prepare_input(self, blocks: "list[MultimodalBlock]") -> list[dict]:
        """Convert *blocks* to OpenAI-compatible dictionaries, preserving order."""
        return [b.to_openai_format() for b in blocks]

    def from_files(self, paths: list[str]) -> "list[MultimodalBlock]":
        """Build one block per path, choosing the modality from the file extension.

        Images become base64 ``data:`` URL blocks, audio becomes base64 audio
        blocks, documents are parsed to text, and any other file is read as
        UTF-8 text. A file that cannot be processed yields a text block
        describing the error instead of aborting the whole batch.
        """
        image_exts = {"png", "jpg", "jpeg", "gif", "webp", "bmp"}
        audio_exts = {"wav", "mp3", "ogg", "flac", "m4a"}
        doc_exts = {"pdf", "docx", "doc", "md", "txt"}

        blocks = []
        for p in paths:
            ext = p.rsplit(".", 1)[-1].lower() if "." in p else ""
            try:
                if ext in image_exts:
                    b64, mime = ImageProcessor.encode_file(p)
                    block = MultimodalBlock(
                        type="image_url",
                        source={
                            "type": "image_url",
                            "image_url": {"url": f"data:{mime};base64,{b64}"},
                        },
                    )
                elif ext in audio_exts:
                    b64, mime = AudioProcessor.encode_file(p)
                    block = MultimodalBlock(
                        type="audio", mime_type=mime, source={"data": b64}
                    )
                elif ext in doc_exts:
                    text, _ = DocumentParser.parse_auto(p)
                    block = MultimodalBlock.text_block(text)
                else:
                    # Explicit UTF-8 so the result does not depend on the platform locale.
                    with open(p, encoding="utf-8") as f:
                        block = MultimodalBlock.text_block(f.read())
            except Exception as e:
                # Deliberate per-file best effort: report the failure in-band.
                block = MultimodalBlock.text_block(f"[Error reading {p}: {e}]")
            blocks.append(block)
        return blocks

    def stats(self) -> dict:
        """Return the fixed list of modality names."""
        return {"modalities": ["text", "image", "audio", "video", "document"]}