Coverage for agentos/multimodal/provider.py: 0%
180 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2v1.10.0: Multimodal Provider — Vision & Audio abstraction layer.
4Supports:
5- VisionProvider: image→text (base class + OpenAI/VLLM adapters)
6- AudioProvider: TTS + STT (base class + adapters)
7- MultiModalMessage: unified multimodal message format
8"""
10from __future__ import annotations
12import base64
13import io
14from dataclasses import dataclass, field
15from enum import Enum
16from pathlib import Path
17from typing import Any, Optional, Protocol, runtime_checkable
20# ── Enums & Data Classes ──────────────────────────────────────────
class Modality(str, Enum):
    """Kind of payload carried by a multimodal content block.

    The ``str`` mix-in makes each member compare equal to its plain string
    value (for example ``Modality.IMAGE == "image"``).
    """

    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
class ImageFormat(str, Enum):
    """Image format identifiers; each member equals its lowercase string value."""

    PNG = "png"
    JPEG = "jpeg"
    WEBP = "webp"
    GIF = "gif"
    SVG = "svg"
class AudioFormat(str, Enum):
    """Audio format identifiers; each member equals its lowercase string value."""

    MP3 = "mp3"
    WAV = "wav"
    OGG = "ogg"
    FLAC = "flac"
    AAC = "aac"
45@dataclass
46class MultiModalContent:
47 """A piece of multimodal content."""
48 type: Modality
49 text: str = ""
50 data: bytes = field(default=b"", repr=False)
51 data_url: str = "" # data:image/png;base64,...
52 mime_type: str = ""
53 metadata: dict[str, Any] = field(default_factory=dict)
55 @staticmethod
56 def text(content: str) -> "MultiModalContent":
57 return MultiModalContent(type=Modality.TEXT, text=content)
59 @staticmethod
60 def from_path(path: str | Path) -> "MultiModalContent":
61 path = Path(path)
62 data = path.read_bytes()
63 ext = path.suffix.lower().lstrip(".")
64 fmt_map = {"png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg",
65 "webp": "image/webp", "gif": "image/gif", "mp3": "audio/mpeg",
66 "wav": "audio/wav", "ogg": "audio/ogg", "flac": "audio/flac"}
67 mime = fmt_map.get(ext, "application/octet-stream")
68 b64 = base64.b64encode(data).decode()
69 modality = Modality.IMAGE if mime.startswith("image/") else (
70 Modality.AUDIO if mime.startswith("audio/") else Modality.TEXT
71 )
72 return MultiModalContent(
73 type=modality, data=data,
74 data_url=f"data:{mime};base64,{b64}",
75 mime_type=mime,
76 )
78 @staticmethod
79 def from_bytes(data: bytes, mime_type: str = "image/png") -> "MultiModalContent":
80 b64 = base64.b64encode(data).decode()
81 modality = Modality.IMAGE if "image" in mime_type else (
82 Modality.AUDIO if "audio" in mime_type else Modality.TEXT
83 )
84 return MultiModalContent(
85 type=modality, data=data,
86 data_url=f"data:{mime_type};base64,{b64}",
87 mime_type=mime_type,
88 )
@dataclass
class MultiModalMessage:
    """A chat message made of an ordered list of multimodal content blocks.

    Fields:
        role: Speaker role (``system`` / ``user`` / ``assistant``).
        content: Content blocks in the order they were added.
        metadata: Free-form extra information attached to the message.
    """

    role: str = "user"  # system / user / assistant
    content: list[MultiModalContent] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    # MIME subtypes that differ from the short format name the OpenAI
    # ``input_audio`` block expects. Unannotated, so it is not a dataclass field.
    _OPENAI_AUDIO_ALIASES = {
        "mpeg": "mp3",
        "mpeg3": "mp3",
        "x-mpeg": "mp3",
        "x-mp3": "mp3",
        "wave": "wav",
        "x-wav": "wav",
        "vnd.wave": "wav",
    }

    def add_text(self, text: str) -> "MultiModalMessage":
        """Append a text block and return ``self`` for chaining."""
        self.content.append(MultiModalContent.text(text))
        return self

    def add_image_path(self, path: str | Path) -> "MultiModalMessage":
        """Append the file at ``path`` as a content block; return ``self``.

        The block's modality is whatever ``MultiModalContent.from_path``
        derives from the file extension.
        """
        self.content.append(MultiModalContent.from_path(path))
        return self

    def add_audio_path(self, path: str | Path) -> "MultiModalMessage":
        """Append the file at ``path`` as a content block; return ``self``.

        The block's modality is whatever ``MultiModalContent.from_path``
        derives from the file extension.
        """
        self.content.append(MultiModalContent.from_path(path))
        return self

    @staticmethod
    def _openai_audio_format(mime_type: str) -> str:
        """Map a MIME type to the short audio format name used by OpenAI.

        ``audio/mpeg`` (the MIME type assigned to ``.mp3`` files) becomes
        ``mp3`` rather than the raw subtype ``mpeg``. An empty MIME type or
        empty subtype yields ``wav``.
        """
        # Drop any ";param=value" suffix, then keep what follows the last "/".
        subtype = mime_type.split(";", 1)[0].strip().split("/")[-1].lower()
        if not subtype:
            return "wav"
        return MultiModalMessage._OPENAI_AUDIO_ALIASES.get(subtype, subtype)

    def to_openai_format(self) -> dict[str, Any]:
        """Convert to OpenAI chat completion message format.

        Text, image and audio blocks are converted; blocks of any other
        modality are skipped.
        """
        blocks: list[dict[str, Any]] = []
        for c in self.content:
            if c.type == Modality.TEXT:
                blocks.append({"type": "text", "text": c.text})
            elif c.type == Modality.IMAGE:
                blocks.append({
                    "type": "image_url",
                    "image_url": {"url": c.data_url, "detail": "auto"},
                })
            elif c.type == Modality.AUDIO:
                blocks.append({
                    "type": "input_audio",
                    "input_audio": {
                        "data": base64.b64encode(c.data).decode(),
                        "format": self._openai_audio_format(c.mime_type),
                    },
                })
        return {"role": self.role, "content": blocks}

    def to_gemini_format(self) -> dict[str, Any]:
        """Convert to Gemini API message format.

        Every role other than ``user`` is emitted as ``model``. Text, image
        and audio blocks are converted; blocks of any other modality are
        skipped.
        """
        parts: list[dict[str, Any]] = []
        for c in self.content:
            if c.type == Modality.TEXT:
                parts.append({"text": c.text})
            elif c.type == Modality.IMAGE:
                parts.append({
                    "inline_data": {
                        "mime_type": c.mime_type or "image/png",
                        "data": base64.b64encode(c.data).decode(),
                    }
                })
            elif c.type == Modality.AUDIO:
                parts.append({
                    "inline_data": {
                        "mime_type": c.mime_type or "audio/wav",
                        "data": base64.b64encode(c.data).decode(),
                    }
                })
        return {"role": "user" if self.role == "user" else "model", "parts": parts}
151# ── Vision Provider ───────────────────────────────────────────────
@runtime_checkable
class VisionProvider(Protocol):
    """Structural interface for image-to-text backends.

    Any object exposing ``describe`` and ``ask`` conforms. Because the
    protocol is runtime-checkable, ``isinstance`` tests only verify that
    those attributes exist, not their signatures.
    """

    async def describe(self, image: MultiModalContent, prompt: str = "") -> str:
        """Return a text description of ``image``, optionally steered by ``prompt``."""
        ...

    async def ask(self, images: list[MultiModalContent], question: str) -> str:
        """Return the answer to ``question`` about one or more ``images``."""
        ...
class OpenAIVisionProvider:
    """Vision provider that posts to an OpenAI-style chat-completions endpoint.

    Args:
        api_key: Bearer token sent in the ``Authorization`` header.
        model: Model name placed in the request payload.
        base_url: API root (for example ``https://host/v1``); when empty the
            public OpenAI endpoint is used. A trailing slash is tolerated.
        max_tokens: Completion length limit placed in the request payload.
    """

    _DEFAULT_BASE_URL = "https://api.openai.com/v1"

    def __init__(
        self,
        api_key: str = "",
        model: str = "gpt-4o",
        base_url: str = "",
        max_tokens: int = 1024,
    ):
        self.api_key = api_key
        self.model = model
        self.base_url = base_url
        self.max_tokens = max_tokens

    def _chat_completions_url(self) -> str:
        """Return the chat-completions URL for the configured base URL."""
        # Strip trailing slashes so "https://host/v1/" does not become
        # "https://host/v1//chat/completions".
        base = (self.base_url or self._DEFAULT_BASE_URL).rstrip("/")
        return f"{base}/chat/completions"

    async def describe(self, image: MultiModalContent, prompt: str = "") -> str:
        """Describe one image; a generic prompt is used when ``prompt`` is empty."""
        return await self.ask([image], prompt or "Describe this image in detail.")

    async def ask(self, images: list[MultiModalContent], question: str) -> str:
        """Send ``images`` followed by ``question`` and return the reply text.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an HTTP
                error status.
            KeyError: If a successful response lacks the expected
                ``choices[0].message.content`` structure.
        """
        # Imported lazily so this module can be imported without aiohttp.
        import aiohttp

        # Images go first and the question last, in a single user message.
        message = MultiModalMessage(role="user", content=list(images))
        message.add_text(question)

        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": "You are a helpful vision assistant."},
                message.to_openai_format(),
            ],
            "max_tokens": self.max_tokens,
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(
                self._chat_completions_url(), json=payload, headers=headers
            ) as resp:
                # Fail with the HTTP status instead of a KeyError on the error body.
                resp.raise_for_status()
                result = await resp.json()
        return result["choices"][0]["message"]["content"]
class LocalVisionProvider:
    """Vision provider that posts to a local ``/api/generate`` HTTP endpoint.

    Args:
        endpoint: Server root URL; a trailing slash is tolerated.
        model: Model name placed in the request payload.
    """

    def __init__(self, endpoint: str = "http://localhost:11434", model: str = "llava"):
        self.endpoint = endpoint
        self.model = model

    def _generate_url(self) -> str:
        """Return the generate URL for the configured endpoint."""
        return f"{self.endpoint.rstrip('/')}/api/generate"

    @staticmethod
    def _image_b64(img: MultiModalContent) -> str:
        """Return the base64 payload of ``img``, or ``""`` if it has none.

        The payload after the first comma of ``data_url`` is preferred; when
        the URL is missing or has no comma, the raw ``data`` bytes are encoded
        instead.
        """
        _, comma, payload = img.data_url.partition(",")
        if comma and payload:
            return payload
        if img.data:
            return base64.b64encode(img.data).decode()
        return ""

    async def describe(self, image: MultiModalContent, prompt: str = "") -> str:
        """Describe one image; a generic prompt is used when ``prompt`` is empty."""
        return await self.ask([image], prompt or "Describe this image.")

    async def ask(self, images: list[MultiModalContent], question: str) -> str:
        """Send ``question`` with the encoded ``images`` and return the reply.

        Images without any usable payload are left out of the request. An
        empty string is returned when the reply has no ``response`` key.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an HTTP
                error status.
        """
        # Imported lazily so this module can be imported without aiohttp.
        import aiohttp

        encoded = [b64 for b64 in map(self._image_b64, images) if b64]
        payload = {
            "model": self.model,
            "prompt": question,
            "images": encoded,
            "stream": False,
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(self._generate_url(), json=payload) as resp:
                resp.raise_for_status()
                result = await resp.json()
        return result.get("response", "")
231# ── Audio Provider ─────────────────────────────────────────────────
@runtime_checkable
class AudioProvider(Protocol):
    """Structural interface for speech backends (speech-to-text and text-to-speech).

    Any object exposing ``transcribe`` and ``synthesize`` conforms. Because
    the protocol is runtime-checkable, ``isinstance`` tests only verify that
    those attributes exist, not their signatures.
    """

    async def transcribe(self, audio: MultiModalContent, language: str = "") -> str:
        """Speech-to-text: return the text spoken in ``audio``."""
        ...

    async def synthesize(self, text: str, voice: str = "alloy", speed: float = 1.0) -> MultiModalContent:
        """Text-to-speech: return an audio block speaking ``text``."""
        ...
class OpenAIAudioProvider:
    """Audio provider using OpenAI-style transcription and speech endpoints.

    Args:
        api_key: Bearer token sent in the ``Authorization`` header.
        tts_model: Model name used for text-to-speech requests.
        stt_model: Model name used for transcription requests.
        base_url: API root (for example ``https://host/v1``); when empty the
            public OpenAI endpoint is used. A trailing slash is tolerated.
    """

    _DEFAULT_BASE_URL = "https://api.openai.com/v1"

    def __init__(
        self,
        api_key: str = "",
        tts_model: str = "tts-1",
        stt_model: str = "whisper-1",
        base_url: str = "",
    ):
        self.api_key = api_key
        self.tts_model = tts_model
        self.stt_model = stt_model
        self.base_url = base_url

    def _url(self, path: str) -> str:
        """Join ``path`` onto the configured (or default) API root."""
        base = (self.base_url or self._DEFAULT_BASE_URL).rstrip("/")
        return f"{base}/{path}"

    async def transcribe(self, audio: MultiModalContent, language: str = "") -> str:
        """Upload ``audio`` for transcription and return the recognised text.

        ``language`` is only sent when non-empty. An empty string is returned
        when the reply has no ``text`` key.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an HTTP
                error status.
        """
        # Imported lazily so this module can be imported without aiohttp.
        import aiohttp

        # The upload's file name extension is taken from the MIME subtype.
        subtype = audio.mime_type.split("/")[-1] or "wav"
        form = aiohttp.FormData()
        form.add_field("model", self.stt_model)
        form.add_field(
            "file",
            audio.data,
            filename=f"audio.{subtype}",
            content_type=audio.mime_type or "audio/wav",
        )
        if language:
            form.add_field("language", language)

        headers = {"Authorization": f"Bearer {self.api_key}"}
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self._url("audio/transcriptions"), data=form, headers=headers
            ) as resp:
                resp.raise_for_status()
                result = await resp.json()
        return result.get("text", "")

    async def synthesize(self, text: str, voice: str = "alloy", speed: float = 1.0) -> MultiModalContent:
        """Request MP3 speech for ``text`` and return it as an audio block.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an HTTP
                error status.
        """
        # Imported lazily so this module can be imported without aiohttp.
        import aiohttp

        payload = {
            "model": self.tts_model,
            "input": text,
            "voice": voice,
            "speed": speed,
            "response_format": "mp3",
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self._url("audio/speech"), json=payload, headers=headers
            ) as resp:
                # Without this check an error body would be returned as "audio".
                resp.raise_for_status()
                audio_data = await resp.read()
        return MultiModalContent.from_bytes(audio_data, "audio/mpeg")
class EdgeTTSProvider:
    """Text-to-speech provider built on the ``edge_tts`` package.

    Only ``synthesize`` is implemented; this class has no ``transcribe``.

    Args:
        voice: Voice name used when ``synthesize`` is called without one.
    """

    def __init__(self, voice: str = "zh-CN-XiaoxiaoNeural"):
        self.voice = voice

    @staticmethod
    def _rate_for_speed(speed: float) -> str:
        """Convert a speed multiplier to a signed percentage string.

        ``1.0`` maps to ``"+0%"``, ``1.5`` to ``"+50%"``, ``0.9`` to ``"-10%"``.
        """
        # round() rather than int(): (0.9 - 1.0) * 100 is -9.999999999999998
        # in floating point, which int() would truncate to -9.
        return f"{round((speed - 1.0) * 100):+d}%"

    async def synthesize(self, text: str, voice: str = "", speed: float = 1.0) -> MultiModalContent:
        """Synthesize ``text`` and return the audio as an ``audio/mpeg`` block.

        An empty ``voice`` selects the voice given to the constructor.
        """
        # Imported lazily so this module can be imported without edge_tts.
        import edge_tts  # type: ignore[import-untyped]

        communicate = edge_tts.Communicate(
            text, voice or self.voice, rate=self._rate_for_speed(speed)
        )
        # The stream yields chunks of several types; keep only audio bytes.
        audio_chunks = [
            chunk["data"]
            async for chunk in communicate.stream()
            if chunk["type"] == "audio"
        ]
        return MultiModalContent.from_bytes(b"".join(audio_chunks), "audio/mpeg")
307# ── MultiModal Client ─────────────────────────────────────────────
class MultiModalClient:
    """Unified multimodal client: vision + audio in one interface.

    Args:
        vision: Vision backend; a ``LocalVisionProvider`` with default
            settings is created when ``None``.
        audio: Audio backend; when ``None``, ``hear`` and ``speak`` raise
            ``RuntimeError``.
    """

    def __init__(
        self,
        vision: VisionProvider | None = None,
        audio: AudioProvider | None = None,
    ):
        # Compare against None explicitly so a provider object that happens
        # to evaluate falsy is still used.
        self.vision = vision if vision is not None else LocalVisionProvider()
        self.audio = audio

    async def see(self, image_path: str | Path, question: str = "What's in this image?") -> str:
        """Load the image at ``image_path`` and ask the vision provider ``question``."""
        img = MultiModalContent.from_path(image_path)
        return await self.vision.ask([img], question)

    async def hear(self, audio_path: str | Path, language: str = "") -> str:
        """Load the audio file at ``audio_path`` and return its transcription.

        Raises:
            RuntimeError: If no audio provider is configured.
        """
        if self.audio is None:
            raise RuntimeError("No audio provider configured")
        audio = MultiModalContent.from_path(audio_path)
        return await self.audio.transcribe(audio, language)

    async def speak(self, text: str, voice: str = "alloy", speed: float = 1.0) -> MultiModalContent:
        """Generate speech for ``text`` through the audio provider.

        ``voice`` and ``speed`` are forwarded to the provider unchanged.

        Raises:
            RuntimeError: If no audio provider is configured.
        """
        # NOTE(review): the default voice "alloy" is always forwarded, so a
        # provider's own default voice is never used via this method - confirm
        # that is intended for providers with different voice names.
        if self.audio is None:
            raise RuntimeError("No audio provider configured")
        return await self.audio.synthesize(text, voice, speed)