Coverage for agentos/multimodal/manager.py: 35%
167 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v0.40 Multimodal — 多模态输入支持。
3支持:图片理解、语音转文字、PDF/文档解析。
4"""
6from __future__ import annotations
8import base64
9import json
10import logging
11from dataclasses import dataclass, field
12from enum import Enum
13from typing import Optional, Any
16logger = logging.getLogger(__name__)
class Modality(str, Enum):
    """Enumeration of the input modality kinds.

    Members also subclass ``str``, so each one compares equal to its
    lowercase string value.
    """

    # Plain text content.
    TEXT = "text"
    # Still images.
    IMAGE = "image"
    # Audio clips.
    AUDIO = "audio"
    # Video clips.
    VIDEO = "video"
    # Documents (PDF, Word, Markdown, ...).
    DOCUMENT = "document"
@dataclass
class MultimodalBlock:
    """Multimodal input block, following the OpenAI/Anthropic content-block format.

    ``type`` selects how the block is rendered by ``to_openai_format``;
    ``text`` is used by text blocks, ``source`` carries the payload of every
    other block type, and ``mime_type`` is filled in by the ``audio`` factory.
    """

    type: str  # text | image_url | audio | image
    text: str = ""
    source: dict = field(default_factory=dict)
    mime_type: str = ""

    @classmethod
    def text_block(cls, text: str) -> "MultimodalBlock":
        """Create a block holding plain text."""
        return cls(type="text", text=text)

    @classmethod
    def image_url(cls, url: str, detail: str = "auto") -> "MultimodalBlock":
        """Create an image block that references ``url`` with the given detail level."""
        payload = {
            "type": "image_url",
            "image_url": {"url": url, "detail": detail},
        }
        return cls(type="image_url", source=payload)

    @classmethod
    def image_base64(cls, data: bytes, mime: str = "image/jpeg") -> "MultimodalBlock":
        """Create an image block that embeds ``data`` as a base64 ``data:`` URL."""
        encoded = base64.b64encode(data).decode()
        payload = {
            "type": "image_url",
            "image_url": {"url": f"data:{mime};base64,{encoded}"},
        }
        return cls(type="image_url", source=payload)

    @classmethod
    def audio(cls, data: bytes, mime: str = "audio/wav") -> "MultimodalBlock":
        """Create an audio block carrying ``data`` base64-encoded under ``source["data"]``."""
        encoded = base64.b64encode(data).decode()
        return cls(type="audio", mime_type=mime, source={"data": encoded})

    def to_openai_format(self) -> dict:
        """Render the block as a content-part dictionary.

        Text and ``image_url`` blocks get their dedicated layouts; any other
        type is rendered as ``{"type": <type>}`` merged with ``source``
        (keys in ``source`` win on conflict).
        """
        kind = self.type
        if kind == "text":
            return {"type": "text", "text": self.text}
        if kind == "image_url":
            return {"type": "image_url", "image_url": self.source["image_url"]}
        rendered = {"type": kind}
        rendered.update(self.source)
        return rendered
class ImageProcessor:
    """Image helpers: base64 encoding, token-cost estimation and metadata stripping."""

    # Longest side (in pixels) an image is fitted into before tiling.
    MAX_SIZE = 2048
    JPEG_QUALITY = 85

    @staticmethod
    def encode_file(path: str) -> tuple[str, str]:
        """Read the file at ``path`` and return ``(base64_text, mime_type)``.

        The MIME type is guessed from the file name and falls back to
        ``"image/png"`` when it cannot be guessed.
        """
        import mimetypes
        mime = mimetypes.guess_type(path)[0] or "image/png"
        with open(path, "rb") as f:
            data = f.read()
        return base64.b64encode(data).decode(), mime

    @staticmethod
    def encode_bytes(data: bytes, mime: str = "image/jpeg") -> str:
        """Return ``data`` base64-encoded as text.

        ``mime`` is accepted for signature compatibility but is not used.
        """
        return base64.b64encode(data).decode()

    @staticmethod
    def estimate_tokens(width: int, height: int, detail: str = "auto") -> int:
        """Estimate the token cost of an image (OpenAI tile pricing model).

        ``detail="low"`` costs a flat 85 tokens. Any other value is treated as
        high detail: the image is first shrunk to fit inside a
        ``MAX_SIZE`` x ``MAX_SIZE`` square, then shrunk so its shortest side is
        at most 768 px, and finally charged 170 tokens per 512 px tile plus a
        base of 85 tokens. Images are never scaled up.
        """
        if detail == "low":
            return 85

        w, h = width, height

        # Step 1: fit inside the MAX_SIZE square, keeping the aspect ratio.
        # Integer arithmetic avoids float rounding at tile boundaries.
        limit = ImageProcessor.MAX_SIZE
        longest = max(w, h)
        if longest > limit:
            w, h = w * limit // longest, h * limit // longest

        # Step 2: bring the shortest side down to 768 px.
        shortest = min(w, h)
        if shortest > 768:
            w, h = w * 768 // shortest, h * 768 // shortest

        # Step 3: count the 512 px tiles needed to cover the scaled image.
        tiles = ((w + 511) // 512) * ((h + 511) // 512)
        return 85 + 170 * tiles

    @staticmethod
    def purge_metadata(data: bytes) -> bytes:
        """Strip EXIF metadata by copying only the pixel data into a new image.

        Returns the re-encoded image bytes, saved in the source format (PNG
        when the format is unknown). When Pillow is not installed the input
        is returned unchanged.
        """
        try:
            from PIL import Image
            import io
            img = Image.open(io.BytesIO(data))
            pixels = list(img.getdata())
            cleaned = Image.new(img.mode, img.size)
            cleaned.putdata(pixels)
            # Palette images store colour indices; without the palette the
            # copy would show the wrong colours.
            if img.mode == "P":
                palette = img.getpalette()
                if palette is not None:
                    cleaned.putpalette(palette)
            buf = io.BytesIO()
            cleaned.save(buf, format=img.format or "PNG")
            return buf.getvalue()
        except ImportError:
            return data
class AudioProcessor:
    """Audio processor — speech-to-text and base64 file encoding."""

    SUPPORTED_FORMATS = ["wav", "mp3", "ogg", "flac", "m4a"]

    @staticmethod
    def transcribe(path: str, whisper_model: str = "base") -> str:
        """Transcribe the audio file at ``path`` to text with whisper.

        Loads the whisper model named ``whisper_model`` and returns the
        ``"text"`` entry of its result. When an ``ImportError`` is raised
        (whisper missing), a warning is logged and the placeholder
        ``"[whisper not available]"`` is returned instead.
        """
        try:
            import whisper
            transcription = whisper.load_model(whisper_model).transcribe(path)
            return transcription["text"]
        except ImportError:
            logger.warning("whisper not installed, returning empty")
            return "[whisper not available]"

    @staticmethod
    def encode_file(path: str) -> tuple[str, str]:
        """Read the file at ``path`` and return ``(base64_text, mime_type)``.

        The MIME type is guessed from the file name and falls back to
        ``"audio/wav"`` when it cannot be guessed.
        """
        import mimetypes
        guessed, _ = mimetypes.guess_type(path)
        with open(path, "rb") as handle:
            encoded = base64.b64encode(handle.read()).decode()
        return encoded, guessed or "audio/wav"
class DocumentParser:
    """Document parser — text extraction for PDF, Word and Markdown/plain text."""

    @staticmethod
    def parse_pdf(path: str) -> str:
        """Extract the text of every page of a PDF, joined by blank lines.

        Pages with no extractable text are skipped. When PyPDF2 is not
        installed a warning is logged and ``"[PyPDF2 not available]"`` is
        returned.
        """
        try:
            import PyPDF2
            with open(path, "rb") as f:
                reader = PyPDF2.PdfReader(f)
                # Extract while the file is still open; the reader reads from it.
                extracted = (page.extract_text() for page in reader.pages)
                pages = [page_text for page_text in extracted if page_text]
            return "\n\n".join(pages)
        except ImportError:
            logger.warning("PyPDF2 not installed")
            return "[PyPDF2 not available]"

    @staticmethod
    def parse_docx(path: str) -> str:
        """Return the non-empty paragraphs of a Word document, one per line.

        When python-docx is not installed a warning is logged and
        ``"[python-docx not available]"`` is returned.
        """
        try:
            from docx import Document
            doc = Document(path)
            return "\n".join(p.text for p in doc.paragraphs if p.text)
        except ImportError:
            logger.warning("python-docx not installed")
            return "[python-docx not available]"

    @staticmethod
    def parse_auto(path: str) -> tuple[str, str]:
        """Detect the file type from its extension and parse it.

        Returns ``(content, format)``:

        * ``pdf`` -> ``parse_pdf`` result, ``"pdf"``
        * ``docx`` / ``doc`` -> ``parse_docx`` result, ``"docx"``
        * ``md`` / ``markdown`` / ``txt`` -> file text, the extension itself
        * anything else -> file text and ``"text"``, or ``("", "unknown")``
          when the file cannot be read or decoded

        Text files are decoded as UTF-8 so the result does not depend on the
        platform's locale encoding.
        """
        ext = path.rsplit(".", 1)[-1].lower() if "." in path else ""
        if ext == "pdf":
            return DocumentParser.parse_pdf(path), "pdf"
        if ext in ("docx", "doc"):
            return DocumentParser.parse_docx(path), "docx"
        if ext in ("md", "markdown", "txt"):
            with open(path, encoding="utf-8") as f:
                return f.read(), ext
        try:
            with open(path, encoding="utf-8") as f:
                return f.read(), "text"
        except (OSError, ValueError):
            # Best effort for unknown types: a missing/unreadable file (OSError)
            # or undecodable bytes (UnicodeDecodeError is a ValueError).
            return "", "unknown"
class MultimodalManager:
    """Multimodal manager — the unified entry point for the processors."""

    def __init__(self):
        """Create one instance of each processor/parser."""
        self.image = ImageProcessor()
        self.audio = AudioProcessor()
        self.document = DocumentParser()

    def prepare_input(self, blocks: list[MultimodalBlock]) -> list[dict]:
        """Convert blocks to OpenAI-compatible content-part dictionaries."""
        return [b.to_openai_format() for b in blocks]

    def from_files(self, paths: list[str]) -> list[MultimodalBlock]:
        """Build one block per file, inferring the modality from the extension.

        Images become base64 ``image_url`` blocks, audio files become base64
        ``audio`` blocks, documents are parsed to text, and every other file
        is read as UTF-8 text. A file that fails to load does not abort the
        batch: it yields a text block of the form ``[Error reading <path>: <error>]``.
        """
        blocks = []
        image_exts = {"png", "jpg", "jpeg", "gif", "webp", "bmp"}
        audio_exts = {"wav", "mp3", "ogg", "flac", "m4a"}
        doc_exts = {"pdf", "docx", "doc", "md", "txt"}

        for p in paths:
            ext = p.rsplit(".", 1)[-1].lower() if "." in p else ""
            try:
                if ext in image_exts:
                    b64, mime = ImageProcessor.encode_file(p)
                    blocks.append(MultimodalBlock(
                        type="image_url",
                        source={"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}},
                    ))
                elif ext in audio_exts:
                    b64, mime = AudioProcessor.encode_file(p)
                    blocks.append(MultimodalBlock(type="audio", mime_type=mime, source={"data": b64}))
                elif ext in doc_exts:
                    # Only the extracted text is needed; the detected format is discarded.
                    text, _ = DocumentParser.parse_auto(p)
                    blocks.append(MultimodalBlock.text_block(text))
                else:
                    # Decode explicitly as UTF-8 so the result does not depend
                    # on the platform's locale encoding.
                    with open(p, encoding="utf-8") as f:
                        blocks.append(MultimodalBlock.text_block(f.read()))
            except Exception as e:
                # Deliberate best effort: report the failure inline and keep going.
                blocks.append(MultimodalBlock.text_block(f"[Error reading {p}: {e}]"))
        return blocks

    def stats(self) -> dict:
        """Return the list of modality names."""
        return {"modalities": ["text", "image", "audio", "video", "document"]}