Coverage for agentos/multimodal/provider.py: 0%

180 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2v1.10.0: Multimodal Provider — Vision & Audio abstraction layer. 

3 

4Supports: 

5- VisionProvider: image→text (base class + OpenAI/VLLM adapters) 

6- AudioProvider: TTS + STT (base class + adapters) 

7- MultiModalMessage: unified multimodal message format 

8""" 

9 

10from __future__ import annotations 

11 

12import base64 

13import io 

14from dataclasses import dataclass, field 

15from enum import Enum 

16from pathlib import Path 

17from typing import Any, Optional, Protocol, runtime_checkable 

18 

19 

20# ── Enums & Data Classes ────────────────────────────────────────── 

21 

class Modality(str, Enum):
    """Kinds of content a multimodal block can carry.

    Members subclass ``str``, so each one compares equal to its plain
    string value (for example ``Modality.TEXT == "text"``).
    """

    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"

27 

28 

class ImageFormat(str, Enum):
    """Image format identifiers, expressed as lowercase string values.

    Members subclass ``str`` and therefore compare equal to their values.
    """

    PNG = "png"
    JPEG = "jpeg"
    WEBP = "webp"
    GIF = "gif"
    SVG = "svg"

35 

36 

class AudioFormat(str, Enum):
    """Audio format identifiers, expressed as lowercase string values.

    Members subclass ``str`` and therefore compare equal to their values.
    """

    MP3 = "mp3"
    WAV = "wav"
    OGG = "ogg"
    FLAC = "flac"
    AAC = "aac"

43 

44 

@dataclass
class MultiModalContent:
    """A piece of multimodal content (text, image or audio).

    Instances are normally built through the factories ``text``,
    ``from_path`` and ``from_bytes``. The ``text`` factory is attached to
    the class right after its definition (see the note below the class).
    """

    type: Modality  # modality carried by this block
    text: str = ""  # text payload; stays "" for binary blocks
    data: bytes = field(default=b"", repr=False)  # raw payload, kept out of repr
    data_url: str = ""  # data:image/png;base64,...
    mime_type: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

    @staticmethod
    def from_path(path: str | Path) -> "MultiModalContent":
        """Load a file and wrap it as a content block.

        The MIME type is chosen from the file extension; unknown extensions
        fall back to ``application/octet-stream``.

        Args:
            path: Location of the file to read.

        Returns:
            A block holding the file bytes, a base64 data URL and the MIME type.

        Raises:
            OSError: If the file cannot be read.
        """
        path = Path(path)
        extension = path.suffix.lower().lstrip(".")
        # svg and aac are included so every member of ImageFormat / AudioFormat
        # has a matching extension here.
        mime_by_extension = {
            "png": "image/png",
            "jpg": "image/jpeg",
            "jpeg": "image/jpeg",
            "webp": "image/webp",
            "gif": "image/gif",
            "svg": "image/svg+xml",
            "mp3": "audio/mpeg",
            "wav": "audio/wav",
            "ogg": "audio/ogg",
            "flac": "audio/flac",
            "aac": "audio/aac",
        }
        mime = mime_by_extension.get(extension, "application/octet-stream")
        # Modality detection and data-URL building live in from_bytes.
        return MultiModalContent.from_bytes(path.read_bytes(), mime)

    @staticmethod
    def from_bytes(data: bytes, mime_type: str = "image/png") -> "MultiModalContent":
        """Wrap raw bytes as a content block.

        Args:
            data: Raw payload.
            mime_type: MIME type of ``data``. A value containing ``"image"``
                yields an image block, one containing ``"audio"`` an audio
                block; anything else is tagged as text.

        Returns:
            A block holding ``data``, its base64 data URL and ``mime_type``.
        """
        if "image" in mime_type:
            modality = Modality.IMAGE
        elif "audio" in mime_type:
            modality = Modality.AUDIO
        else:
            modality = Modality.TEXT
        encoded = base64.b64encode(data).decode()
        return MultiModalContent(
            type=modality,
            data=data,
            data_url=f"data:{mime_type};base64,{encoded}",
            mime_type=mime_type,
        )


def _text_content(content: str) -> MultiModalContent:
    """Build a text-only content block."""
    return MultiModalContent(type=Modality.TEXT, text=content)


# The ``text`` factory shares its name with the ``text`` field. Defining it in
# the class body rebinds ``text`` before @dataclass runs, which makes the
# staticmethod object the field default, so blocks created without explicit
# text would not have text == "". Attaching it after the class is built keeps
# the "" default. Instances always store ``text`` in their own __dict__, so
# ``instance.text`` still returns the string while ``MultiModalContent.text(...)``
# remains the factory.
MultiModalContent.text = staticmethod(_text_content)  # type: ignore[assignment]

89 

90 

@dataclass
class MultiModalMessage:
    """A multimodal message with mixed content blocks.

    The ``add_*`` methods append a block and return ``self`` so calls can be
    chained. The ``to_*_format`` methods serialise text, image and audio
    blocks; blocks of any other modality are skipped.
    """

    role: str = "user"  # system / user / assistant
    content: list[MultiModalContent] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    def add_text(self, text: str) -> "MultiModalMessage":
        """Append a text block and return this message."""
        self.content.append(MultiModalContent.text(text))
        return self

    def add_image_path(self, path: str | Path) -> "MultiModalMessage":
        """Append the file at ``path`` as a content block and return this message."""
        self.content.append(MultiModalContent.from_path(path))
        return self

    def add_audio_path(self, path: str | Path) -> "MultiModalMessage":
        """Append the file at ``path`` as a content block and return this message."""
        self.content.append(MultiModalContent.from_path(path))
        return self

    @staticmethod
    def _openai_audio_format(mime_type: str) -> str:
        """Translate a MIME type into an ``input_audio`` format name.

        MIME subtypes are not always the short format name: ``audio/mpeg`` is
        MP3 audio, so sending the raw subtype ``"mpeg"`` would mislabel it.
        An empty MIME type or empty subtype defaults to ``"wav"``.
        """
        if not mime_type:
            return "wav"
        subtype = mime_type.split("/")[-1].strip().lower()
        aliases = {"mpeg": "mp3", "x-wav": "wav", "wave": "wav", "x-flac": "flac"}
        return aliases.get(subtype, subtype) or "wav"

    def to_openai_format(self) -> dict[str, Any]:
        """Convert to OpenAI chat completion message format.

        Returns:
            ``{"role": ..., "content": [...]}`` with one entry per text,
            image or audio block, in order.
        """
        blocks: list[dict[str, Any]] = []
        for c in self.content:
            if c.type == Modality.TEXT:
                blocks.append({"type": "text", "text": c.text})
            elif c.type == Modality.IMAGE:
                blocks.append({
                    "type": "image_url",
                    "image_url": {"url": c.data_url, "detail": "auto"},
                })
            elif c.type == Modality.AUDIO:
                blocks.append({
                    "type": "input_audio",
                    "input_audio": {
                        "data": base64.b64encode(c.data).decode(),
                        "format": self._openai_audio_format(c.mime_type),
                    },
                })
        return {"role": self.role, "content": blocks}

    def to_gemini_format(self) -> dict[str, Any]:
        """Convert to Gemini API message format.

        Returns:
            ``{"role": ..., "parts": [...]}``. The role is ``"user"`` when this
            message's role is ``"user"`` and ``"model"`` for every other role.
        """
        parts: list[dict[str, Any]] = []
        for c in self.content:
            if c.type == Modality.TEXT:
                parts.append({"text": c.text})
            elif c.type == Modality.IMAGE:
                parts.append({
                    "inline_data": {
                        "mime_type": c.mime_type or "image/png",
                        "data": base64.b64encode(c.data).decode(),
                    }
                })
            elif c.type == Modality.AUDIO:
                parts.append({
                    "inline_data": {
                        "mime_type": c.mime_type or "audio/wav",
                        "data": base64.b64encode(c.data).decode(),
                    }
                })
        return {"role": "user" if self.role == "user" else "model", "parts": parts}

149 

150 

151# ── Vision Provider ─────────────────────────────────────────────── 

152 

@runtime_checkable
class VisionProvider(Protocol):
    """Structural interface for image-to-text backends.

    Any object exposing ``describe`` and ``ask`` satisfies this protocol;
    because it is runtime-checkable, ``isinstance`` can be used to test for
    the presence of both methods.
    """

    async def describe(self, image: MultiModalContent, prompt: str = "") -> str:
        """Return a text description of ``image``; ``prompt`` is optional guidance."""
        ...

    async def ask(self, images: list[MultiModalContent], question: str) -> str:
        """Return the answer to ``question`` about one or more images."""
        ...

164 

165 

class OpenAIVisionProvider:
    """OpenAI GPT-4V / GPT-4o vision provider.

    Sends images and a question to a ``/chat/completions`` endpoint, either
    the default OpenAI URL or the one given by ``base_url``.
    """

    _DEFAULT_BASE_URL = "https://api.openai.com/v1"

    def __init__(self, api_key: str = "", model: str = "gpt-4o", base_url: str = ""):
        """Store connection settings.

        Args:
            api_key: Bearer token sent in the ``Authorization`` header.
            model: Model name placed in the request payload.
            base_url: API root; empty means the default OpenAI URL.
        """
        self.api_key = api_key
        self.model = model
        self.base_url = base_url

    def _completions_url(self) -> str:
        """Return the chat-completions URL, tolerating a trailing slash in ``base_url``."""
        base = (self.base_url or self._DEFAULT_BASE_URL).rstrip("/")
        return f"{base}/chat/completions"

    async def describe(self, image: MultiModalContent, prompt: str = "") -> str:
        """Describe a single image, using a generic prompt when none is given."""
        return await self.ask([image], prompt or "Describe this image in detail.")

    async def ask(self, images: list[MultiModalContent], question: str) -> str:
        """Ask a question about one or more images.

        Args:
            images: Content blocks to send ahead of the question.
            question: Text appended after the images.

        Returns:
            The message content of the first choice in the response.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an HTTP
                error status.
        """
        import aiohttp

        message = MultiModalMessage(role="user", content=list(images))
        message.add_text(question)

        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": "You are a helpful vision assistant."},
                message.to_openai_format(),
            ],
            "max_tokens": 1024,
        }
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}

        async with aiohttp.ClientSession() as session:
            async with session.post(self._completions_url(), json=payload, headers=headers) as resp:
                # Fail with the HTTP error itself rather than a KeyError on the
                # missing "choices" key of an error body.
                resp.raise_for_status()
                result = await resp.json()
        return result["choices"][0]["message"]["content"]

202 

203 

class LocalVisionProvider:
    """Local vision provider (placeholder for vLLM/Ollama).

    Posts the question and base64-encoded images to ``<endpoint>/api/generate``.
    """

    def __init__(self, endpoint: str = "http://localhost:11434", model: str = "llava"):
        """Store the server endpoint and the model name to request."""
        self.endpoint = endpoint
        self.model = model

    @staticmethod
    def _encode_images(images: list[MultiModalContent]) -> list[str]:
        """Return the base64 payload of each image.

        The payload is taken from the part of ``data_url`` after the first
        comma when there is one; otherwise it is encoded from the raw
        ``data`` bytes. Images with neither are skipped.
        """
        encoded: list[str] = []
        for img in images:
            _, comma, payload = img.data_url.partition(",")
            if comma:
                encoded.append(payload)
            elif img.data:
                # No usable data URL: encode the raw bytes instead of dropping
                # the image (or failing on a URL that has no comma).
                encoded.append(base64.b64encode(img.data).decode())
        return encoded

    async def describe(self, image: MultiModalContent, prompt: str = "") -> str:
        """Describe a single image, using a generic prompt when none is given."""
        return await self.ask([image], prompt or "Describe this image.")

    async def ask(self, images: list[MultiModalContent], question: str) -> str:
        """Ask a question about one or more images.

        Returns:
            The ``"response"`` field of the JSON reply, or ``""`` if absent.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an HTTP
                error status.
        """
        import aiohttp

        payload = {
            "model": self.model,
            "prompt": question,
            "images": self._encode_images(images),
            "stream": False,
        }
        # Tolerate an endpoint configured with a trailing slash.
        url = f"{self.endpoint.rstrip('/')}/api/generate"

        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload) as resp:
                resp.raise_for_status()
                result = await resp.json()
        return result.get("response", "")

229 

230 

231# ── Audio Provider ───────────────────────────────────────────────── 

232 

@runtime_checkable
class AudioProvider(Protocol):
    """Structural interface for speech backends (speech-to-text and text-to-speech).

    Any object exposing ``transcribe`` and ``synthesize`` satisfies this
    protocol; because it is runtime-checkable, ``isinstance`` can be used to
    test for the presence of both methods.
    """

    async def transcribe(self, audio: MultiModalContent, language: str = "") -> str:
        """Speech-to-text: return the text spoken in ``audio``."""
        ...

    async def synthesize(self, text: str, voice: str = "alloy", speed: float = 1.0) -> MultiModalContent:
        """Text-to-speech: return audio content generated from ``text``."""
        ...

244 

245 

class OpenAIAudioProvider:
    """OpenAI Whisper + TTS audio provider.

    ``transcribe`` posts to ``audio/transcriptions`` and ``synthesize`` to
    ``audio/speech`` under the configured API root.
    """

    _DEFAULT_BASE_URL = "https://api.openai.com/v1"

    def __init__(
        self,
        api_key: str = "",
        tts_model: str = "tts-1",
        stt_model: str = "whisper-1",
        base_url: str = "",
    ):
        """Store connection settings.

        Args:
            api_key: Bearer token sent in the ``Authorization`` header.
            tts_model: Model name used by ``synthesize``.
            stt_model: Model name used by ``transcribe``.
            base_url: API root; empty means the default OpenAI URL.
        """
        self.api_key = api_key
        self.tts_model = tts_model
        self.stt_model = stt_model
        self.base_url = base_url

    def _url(self, path: str) -> str:
        """Join ``path`` onto the API root without doubling slashes."""
        base = (self.base_url or self._DEFAULT_BASE_URL).rstrip("/")
        return f"{base}/{path.lstrip('/')}"

    async def transcribe(self, audio: MultiModalContent, language: str = "") -> str:
        """Speech-to-text: upload ``audio`` and return the transcribed text.

        Args:
            audio: Content whose ``data`` bytes are uploaded as the file.
            language: Optional language hint; omitted from the form when empty.

        Returns:
            The ``"text"`` field of the JSON reply, or ``""`` if absent.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an HTTP
                error status.
        """
        import aiohttp

        extension = audio.mime_type.split("/")[-1] or "wav"
        form = aiohttp.FormData()
        form.add_field("model", self.stt_model)
        form.add_field(
            "file",
            audio.data,
            filename=f"audio.{extension}",
            content_type=audio.mime_type or "audio/wav",
        )
        if language:
            form.add_field("language", language)

        headers = {"Authorization": f"Bearer {self.api_key}"}
        async with aiohttp.ClientSession() as session:
            async with session.post(self._url("audio/transcriptions"), data=form, headers=headers) as resp:
                # Surface API failures instead of returning an empty transcript.
                resp.raise_for_status()
                result = await resp.json()
        return result.get("text", "")

    async def synthesize(self, text: str, voice: str = "alloy", speed: float = 1.0) -> MultiModalContent:
        """Text-to-speech: request MP3 audio for ``text``.

        Returns:
            The response body wrapped as ``audio/mpeg`` content.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an HTTP
                error status.
        """
        import aiohttp

        payload = {
            "model": self.tts_model,
            "input": text,
            "voice": voice,
            "speed": speed,
            "response_format": "mp3",
        }
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
        async with aiohttp.ClientSession() as session:
            async with session.post(self._url("audio/speech"), json=payload, headers=headers) as resp:
                # Without this check an error body would be returned as if it
                # were MP3 audio.
                resp.raise_for_status()
                audio_data = await resp.read()
        return MultiModalContent.from_bytes(audio_data, "audio/mpeg")

285 

286 

class EdgeTTSProvider:
    """Microsoft Edge TTS (free, local).

    Only ``synthesize`` is implemented; this class defines no ``transcribe``.
    """

    def __init__(self, voice: str = "zh-CN-XiaoxiaoNeural"):
        """Store the voice used when ``synthesize`` is called without one."""
        self.voice = voice

    @staticmethod
    def _format_rate(speed: float) -> str:
        """Convert a speed multiplier into a signed percentage such as ``"+20%"``.

        ``round`` is used rather than ``int`` because float error would
        otherwise truncate values like ``(1.2 - 1.0) * 100`` (just under 20)
        down to 19.
        """
        return f"{round((speed - 1.0) * 100):+d}%"

    async def synthesize(self, text: str, voice: str = "", speed: float = 1.0) -> MultiModalContent:
        """Generate speech for ``text``.

        Args:
            text: Text to speak.
            voice: Voice name; empty means the voice given to the constructor.
            speed: Speed multiplier, where ``1.0`` maps to a rate of ``+0%``.

        Returns:
            The concatenated audio chunks wrapped as ``audio/mpeg`` content.
        """
        import edge_tts  # type: ignore[import-untyped]

        communicate = edge_tts.Communicate(text, voice or self.voice, rate=self._format_rate(speed))
        # The stream interleaves chunk types; only "audio" chunks carry sound data.
        audio_chunks = [
            chunk["data"]
            async for chunk in communicate.stream()
            if chunk["type"] == "audio"
        ]
        return MultiModalContent.from_bytes(b"".join(audio_chunks), "audio/mpeg")

305 

306 

307# ── MultiModal Client ───────────────────────────────────────────── 

308 

class MultiModalClient:
    """Single entry point combining a vision provider and an optional audio provider."""

    def __init__(
        self,
        vision: VisionProvider | None = None,
        audio: AudioProvider | None = None,
    ):
        """Store the providers.

        A falsy ``vision`` is replaced by a default ``LocalVisionProvider``;
        ``audio`` is stored as given and may be ``None``.
        """
        self.vision = vision if vision else LocalVisionProvider()
        self.audio = audio

    def _audio_or_raise(self) -> AudioProvider:
        """Return the audio provider, raising ``RuntimeError`` when none is set."""
        if self.audio:
            return self.audio
        raise RuntimeError("No audio provider configured")

    async def see(self, image_path: str | Path, question: str = "What's in this image?") -> str:
        """Load the image at ``image_path`` and ask the vision provider ``question``."""
        picture = MultiModalContent.from_path(image_path)
        answer = await self.vision.ask([picture], question)
        return answer

    async def hear(self, audio_path: str | Path, language: str = "") -> str:
        """Load the audio file at ``audio_path`` and return its transcription.

        Raises:
            RuntimeError: If no audio provider is configured.
        """
        provider = self._audio_or_raise()
        clip = MultiModalContent.from_path(audio_path)
        transcript = await provider.transcribe(clip, language)
        return transcript

    async def speak(self, text: str, voice: str = "alloy", speed: float = 1.0) -> MultiModalContent:
        """Ask the audio provider to synthesise speech for ``text``.

        Raises:
            RuntimeError: If no audio provider is configured.
        """
        provider = self._audio_or_raise()
        spoken = await provider.synthesize(text, voice, speed)
        return spoken