Coverage for agentos/multimodal/__init__.py: 29%

375 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v1.14.3 — Multimodal Context Manager. 

3 

4Unified multimodal context layer for AgentOS agents. Handles images, 

5audio, video, and structured documents as first-class context objects. 

6 

7Features: 

8- Multi-format image processing (PNG, JPEG, GIF, WebP, SVG, HEIC) 

9- Audio transcription & processing (WAV, MP3, FLAC, M4A) 

10- Video keyframe extraction & captioning 

11- PDF/DOCX document text extraction with layout awareness 

12- File type auto-detection (file extension first, magic bytes as fallback) 

13- Image preprocessing pipeline (resize, compress, format convert) 

14- Vision LLM adapter for base64 images 

15- Thumbnail generation 

16- Metadata extraction (EXIF, duration, dimensions) 

17 

18Architecture: 

19 File Input 

20 ├── MediaDetector (magic bytes identification) 

21 ├── MediaProcessor (format-specific pipeline) 

22 │ ├── ImageProcessor (resize/compress/convert/base64) 

23 │ ├── AudioProcessor (transcription via whisper) 

24 │ ├── VideoProcessor (keyframe extraction) 

25 │ └── DocumentProcessor (PDF/DOCX extraction) 

26 └── MediaContext (unified context object) 

27 

28Inspired by: GPT-4V multimodal API, Claude Vision, Gemini 1.5 Pro 

29""" 

30 

31from __future__ import annotations 

32 

33import base64 

34import io 

35import json 

36import mimetypes 

37import os 

38import struct 

39import subprocess 

40import tempfile 

41import uuid 

42from abc import ABC, abstractmethod 

43from dataclasses import dataclass, field 

44from enum import Enum 

45from pathlib import Path 

46from typing import ( 

47 Any, Dict, List, Optional, Tuple, Union, 

48) 

49 

50 

51# ── Types ─────────────────────────────────── 

52 

53 

class MediaType(str, Enum):
    """Top-level category of a media file.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value (for example ``MediaType.IMAGE == "image"``).
    """

    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    DOCUMENT = "document"
    # Fallback category used when nothing more specific applies.
    UNKNOWN = "unknown"

60 

61 

class ImageFormat(str, Enum):
    """Image container formats known to this module.

    Members are also ``str`` instances whose value is the lowercase format
    name (for example ``ImageFormat.JPEG == "jpeg"``).
    """

    PNG = "png"
    JPEG = "jpeg"
    GIF = "gif"
    WEBP = "webp"
    SVG = "svg"
    BMP = "bmp"
    HEIC = "heic"
    TIFF = "tiff"

71 

72 

@dataclass
class MediaMetadata:
    """Metadata describing a single media file.

    Every field has a neutral default (empty string, zero, ``False`` or an
    empty dict), so a processor only fills in what applies to its media type.
    """

    file_path: str = ""
    media_type: MediaType = MediaType.UNKNOWN
    mime_type: str = ""
    file_size_bytes: int = 0

    # Image
    width: int = 0
    height: int = 0
    color_mode: str = ""

    # Audio/Video
    duration_s: float = 0.0
    sample_rate: int = 0
    channels: int = 0
    bitrate_kbps: int = 0

    # General
    has_alpha: bool = False
    page_count: int = 0
    extra: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return every metadata field as a plain dictionary.

        ``media_type`` is emitted as its string value.  The keys that earlier
        versions produced come first and are unchanged; the remaining fields
        (colour mode, audio parameters, alpha flag, ``extra``) used to be
        dropped silently and are now appended after them.

        Returns:
            A new dict; ``extra`` is shallow-copied so that mutating the
            result does not alter this instance.
        """
        return {
            "file_path": self.file_path,
            "media_type": self.media_type.value,
            "mime_type": self.mime_type,
            "file_size_bytes": self.file_size_bytes,
            "width": self.width,
            "height": self.height,
            "duration_s": self.duration_s,
            "page_count": self.page_count,
            # Fields below were previously missing from the export.
            "color_mode": self.color_mode,
            "sample_rate": self.sample_rate,
            "channels": self.channels,
            "bitrate_kbps": self.bitrate_kbps,
            "has_alpha": self.has_alpha,
            "extra": dict(self.extra),
        }

109 

110 

@dataclass
class MediaContext:
    """Unified multimodal context object.

    This is what gets passed to LLM context windows.
    """

    context_id: str = field(default_factory=lambda: f"mctx-{uuid.uuid4().hex[:12]}")
    media_type: MediaType = MediaType.UNKNOWN
    metadata: MediaMetadata = field(default_factory=MediaMetadata)

    # Processed representations
    text_description: str = ""  # natural-language description
    base64_data: str = ""  # Base64 payload (used for vision LLMs)
    extracted_text: str = ""  # OCR / transcription text
    thumbnail_path: str = ""  # path of the generated thumbnail

    # Structured
    entities: List[Dict[str, Any]] = field(default_factory=list)
    captions: List[str] = field(default_factory=list)

    def to_llm_message(self) -> dict:
        """Convert this context into an LLM API message dict.

        Returns:
            For an image that carries Base64 data: a ``user`` message whose
            content is a two-part list (an ``image_url`` part holding a
            ``data:`` URL, then a ``text`` part).  For everything else: a
            ``user`` message whose content is the description, else the
            extracted text, else an empty string.
        """
        carries_inline_image = (
            self.media_type == MediaType.IMAGE and bool(self.base64_data)
        )
        if not carries_inline_image:
            plain_text = self.text_description or self.extracted_text or ""
            return {"role": "user", "content": plain_text}

        data_url = f"data:{self.metadata.mime_type};base64,{self.base64_data}"
        image_part = {
            "type": "image_url",
            "image_url": {"url": data_url, "detail": "auto"},
        }
        text_part = {
            "type": "text",
            "text": self.text_description or "Describe this image.",
        }
        return {"role": "user", "content": [image_part, text_part]}

155 

156 

157# ── Media Detector ────────────────────────── 

158 

159 

class MediaDetector:
    """Detect the media type of a file.

    ``detect`` trusts the file extension first, then inspects the leading
    bytes of the file (magic bytes), and finally falls back to the
    ``mimetypes`` guess for the path.

    Usage:
        detector = MediaDetector()
        mt = detector.detect("photo.jpg")  # MediaType.IMAGE
    """

    # Magic-byte prefixes -> (media type, image format or None).
    # RIFF, ISO-BMFF ("ftyp") and XML prefixes are ambiguous on their own and
    # are resolved by ``_sniff_container`` before this table is consulted.
    MAGIC_SIGNATURES = {
        b'\xFF\xD8\xFF': (MediaType.IMAGE, ImageFormat.JPEG),
        b'\x89PNG\r\n\x1A\n': (MediaType.IMAGE, ImageFormat.PNG),
        b'GIF87a': (MediaType.IMAGE, ImageFormat.GIF),
        b'GIF89a': (MediaType.IMAGE, ImageFormat.GIF),
        # The original literal listed b'RIFF' twice (WEBP, then WAV); the
        # second entry silently replaced the first.  Only the effective entry
        # is kept; it is the fallback for RIFF files of unrecognised form.
        b'RIFF': (MediaType.AUDIO, None),
        b'\x42\x4D': (MediaType.IMAGE, ImageFormat.BMP),
        b'<?xml': (MediaType.IMAGE, ImageFormat.SVG),  # only when <svg follows
        b'<svg': (MediaType.IMAGE, ImageFormat.SVG),
        b'II*\x00': (MediaType.IMAGE, ImageFormat.TIFF),
        b'MM\x00*': (MediaType.IMAGE, ImageFormat.TIFF),
        # Audio
        b'ID3': (MediaType.AUDIO, None),  # MP3 with ID3
        b'\xFF\xFB': (MediaType.AUDIO, None),  # MP3
        b'\xFF\xF3': (MediaType.AUDIO, None),  # MP3
        b'fLaC': (MediaType.AUDIO, None),  # FLAC
        b'OggS': (MediaType.AUDIO, None),  # OGG
        # Video
        b'\x00\x00\x00\x18ftyp': (MediaType.VIDEO, None),  # MP4
        b'\x00\x00\x00\x20ftyp': (MediaType.VIDEO, None),
        b'\x1A\x45\xDF\xA3': (MediaType.VIDEO, None),  # WebM/MKV
        # Documents
        b'%PDF': (MediaType.DOCUMENT, None),
        b'PK\x03\x04': (MediaType.DOCUMENT, None),  # DOCX/XLSX/PPTX (ZIP)
    }

    # RIFF form type (header bytes 8..11) -> media type.
    _RIFF_FORMS = {
        b'WEBP': MediaType.IMAGE,
        b'WAVE': MediaType.AUDIO,
        b'AVI ': MediaType.VIDEO,
    }

    # ISO-BMFF major brands (header bytes 8..11) that are not video.
    _FTYP_IMAGE_BRANDS = frozenset({
        b'heic', b'heix', b'hevc', b'hevx', b'heim', b'heis', b'hevm', b'hevs',
        b'mif1', b'msf1', b'avif', b'avis',
    })
    _FTYP_AUDIO_BRANDS = frozenset({b'M4A ', b'M4B '})

    # Bytes read for sniffing; large enough to find "<svg" after an XML prolog.
    _HEADER_BYTES = 1024

    # Audio extensions
    AUDIO_EXTENSIONS = {'.mp3', '.wav', '.flac', '.m4a', '.ogg', '.aac', '.wma', '.opus'}

    # Video extensions
    VIDEO_EXTENSIONS = {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.webm', '.flv', '.m4v', '.3gp'}

    # Image extensions
    IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg', '.tiff', '.heic', '.ico'}

    # Document extensions
    DOCUMENT_EXTENSIONS = {'.pdf', '.docx', '.doc', '.xlsx', '.xls', '.pptx', '.ppt', '.txt', '.md', '.html', '.epub'}

    @classmethod
    def _sniff_container(cls, header: bytes) -> Optional[MediaType]:
        """Resolve prefixes that several unrelated formats share.

        Args:
            header: Leading bytes of the file.

        Returns:
            The media type when the container's inner type is recognised,
            otherwise ``None`` so the caller can continue with the generic
            signature table.
        """
        if header.startswith(b'RIFF'):
            # Layout: "RIFF" <4-byte size> <4-byte form type>.
            return cls._RIFF_FORMS.get(header[8:12])

        if header[4:8] == b'ftyp':
            # Layout: <4-byte box size> "ftyp" <4-byte major brand>.  The box
            # size varies between encoders, so it must not be matched.
            brand = header[8:12]
            if brand in cls._FTYP_IMAGE_BRANDS:
                return MediaType.IMAGE
            if brand in cls._FTYP_AUDIO_BRANDS:
                return MediaType.AUDIO
            return MediaType.VIDEO

        if header.startswith(b'<?xml') and b'<svg' in header:
            return MediaType.IMAGE

        return None

    @classmethod
    def detect(cls, file_path: str) -> MediaType:
        """Detect the media type of *file_path*.

        Args:
            file_path: Path of the file to classify.  The file only needs to
                exist when its extension is not one of the known ones.

        Returns:
            The detected ``MediaType``; ``MediaType.UNKNOWN`` when the file
            cannot be read or nothing matches.
        """
        ext = Path(file_path).suffix.lower()
        by_extension = (
            (cls.IMAGE_EXTENSIONS, MediaType.IMAGE),
            (cls.AUDIO_EXTENSIONS, MediaType.AUDIO),
            (cls.VIDEO_EXTENSIONS, MediaType.VIDEO),
            (cls.DOCUMENT_EXTENSIONS, MediaType.DOCUMENT),
        )
        for extensions, mtype in by_extension:
            if ext in extensions:
                return mtype

        # Fallback to magic bytes
        try:
            with open(file_path, 'rb') as f:
                header = f.read(cls._HEADER_BYTES)
        except (OSError, ValueError):
            # Missing/unreadable file, or a path the OS rejects outright.
            return MediaType.UNKNOWN

        sniffed = cls._sniff_container(header)
        if sniffed is not None:
            return sniffed

        for magic, (mtype, _) in cls.MAGIC_SIGNATURES.items():
            # A bare XML prolog is not proof of SVG; that case was handled
            # (or rejected) by _sniff_container above.
            if magic != b'<?xml' and header.startswith(magic):
                return mtype

        # MIME type fallback
        mime, _ = mimetypes.guess_type(file_path)
        if mime:
            if mime.startswith('image/'):
                return MediaType.IMAGE
            if mime.startswith('audio/'):
                return MediaType.AUDIO
            if mime.startswith('video/'):
                return MediaType.VIDEO

        return MediaType.UNKNOWN

    @classmethod
    def batch_detect(cls, file_paths: List[str]) -> Dict[str, MediaType]:
        """Detect several files at once.

        Returns:
            A dict mapping each given path to its detected ``MediaType``.
        """
        return {fp: cls.detect(fp) for fp in file_paths}

255 

256 

257# ── Media Processors ──────────────────────── 

258 

259 

class MediaProcessor(ABC):
    """Abstract base class for media processors."""

    @abstractmethod
    def process(self, file_path: str) -> MediaContext:
        """Process the file at *file_path* and return its ``MediaContext``."""

    @abstractmethod
    def extract_metadata(self, file_path: str) -> MediaMetadata:
        """Return the ``MediaMetadata`` of the file at *file_path*."""

270 

271 

class ImageProcessor(MediaProcessor):
    """Image processor.

    Extracts metadata, Base64-encodes the original file, generates a
    thumbnail, and offers resize / compress / format-conversion helpers.
    Pillow is imported lazily inside the methods that need it.

    Usage:
        processor = ImageProcessor()
        ctx = processor.process("photo.jpg")
        base64_str = ctx.base64_data  # can be passed straight to an LLM API
    """

    # Pillow has no "JPG"/"TIF" writers; map the common aliases.
    _FORMAT_ALIASES = {"JPG": "JPEG", "TIF": "TIFF"}

    # File suffix used for thumbnails, keyed by Pillow format name.
    _FORMAT_SUFFIXES = {
        "JPEG": ".jpg",
        "PNG": ".png",
        "WEBP": ".webp",
        "GIF": ".gif",
        "BMP": ".bmp",
        "TIFF": ".tiff",
    }

    # Image modes Pillow's JPEG writer accepts without conversion.
    _JPEG_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})

    def __init__(
        self,
        max_size: int = 2048,
        quality: int = 85,
        output_format: str = "JPEG",
    ):
        """Configure thumbnail generation.

        Args:
            max_size: Maximum thumbnail width and height in pixels.
            quality: Quality setting passed to Pillow when saving thumbnails.
            output_format: Pillow format name used for thumbnails.
        """
        self._max_size = max_size
        self._quality = quality
        self._output_format = output_format

    @classmethod
    def _canonical_format(cls, name: str) -> str:
        """Return the Pillow format name for *name* (e.g. ".jpg" -> "JPEG")."""
        key = name.upper().replace('.', '')
        return cls._FORMAT_ALIASES.get(key, key)

    def process(self, file_path: str) -> MediaContext:
        """Build a ``MediaContext`` for the image at *file_path*.

        Each step is best-effort: a failure leaves the corresponding field
        at its empty default instead of raising.
        """
        ctx = MediaContext(media_type=MediaType.IMAGE)
        ctx.metadata = self.extract_metadata(file_path)
        ctx.base64_data = self._encode_base64(file_path)
        ctx.text_description = self._generate_description(file_path)
        ctx.thumbnail_path = self._generate_thumbnail(file_path)
        return ctx

    def extract_metadata(self, file_path: str) -> MediaMetadata:
        """Collect size, MIME type and (when Pillow can open it) image details.

        Returns:
            A ``MediaMetadata`` whose dimensions, colour mode, alpha flag and
            EXIF entries are filled in only if Pillow is installed and can
            read the file.
        """
        meta = MediaMetadata(
            file_path=file_path,
            media_type=MediaType.IMAGE,
            mime_type=mimetypes.guess_type(file_path)[0] or "application/octet-stream",
            file_size_bytes=os.path.getsize(file_path) if os.path.exists(file_path) else 0,
        )

        # Try to get dimensions using PIL
        try:
            from PIL import Image
            with Image.open(file_path) as img:
                meta.width = img.width
                meta.height = img.height
                meta.color_mode = img.mode
                meta.has_alpha = img.mode in ('RGBA', 'LA', 'PA')

                # EXIF extraction: keys are numeric tag ids, stored as strings.
                exif = img.getexif()
                if exif:
                    for tag_id, value in exif.items():
                        meta.extra[str(tag_id)] = str(value)
        except Exception:
            # Deliberately best-effort: a missing Pillow install or an
            # unreadable image must not prevent the basic metadata from
            # being returned.
            pass

        return meta

    def _encode_base64(self, file_path: str) -> str:
        """Return the raw file content Base64-encoded, or "" on any error."""
        try:
            with open(file_path, 'rb') as f:
                return base64.b64encode(f.read()).decode('utf-8')
        except Exception:
            return ""

    def _generate_description(self, file_path: str) -> str:
        """Return a placeholder description built from the image dimensions.

        A real natural-language description should come from a vision LLM.
        """
        meta = self.extract_metadata(file_path)
        return f"Image: {meta.width}x{meta.height}, format: {Path(file_path).suffix}"

    def _generate_thumbnail(self, file_path: str) -> str:
        """Write a thumbnail into the temp directory and return its path.

        The suffix now follows ``output_format``; previously the file was
        always named ``.jpg`` regardless of the format actually written.

        Returns:
            The thumbnail path, or "" if it could not be generated.
        """
        try:
            from PIL import Image

            fmt = self._canonical_format(self._output_format)
            suffix = self._FORMAT_SUFFIXES.get(fmt, f".{fmt.lower()}")

            thumb_dir = Path(tempfile.gettempdir()) / "agentos_thumbnails"
            thumb_dir.mkdir(exist_ok=True)
            thumb_path = thumb_dir / f"thumb_{uuid.uuid4().hex[:8]}{suffix}"

            with Image.open(file_path) as img:
                # thumbnail() shrinks in place and keeps the aspect ratio.
                img.thumbnail((self._max_size, self._max_size))
                img.convert("RGB").save(thumb_path, fmt, quality=self._quality)

            return str(thumb_path)
        except Exception:
            return ""

    def resize(self, file_path: str, width: int, height: int, output_path: Optional[str] = None) -> str:
        """Resize the image to exactly *width* x *height* pixels.

        Args:
            file_path: Source image.
            width: Target width in pixels.
            height: Target height in pixels.
            output_path: Destination; defaults to a new file in the temp
                directory with the source suffix.

        Returns:
            The path the resized image was written to.

        Raises:
            RuntimeError: If Pillow is unavailable or the operation fails.
        """
        try:
            from PIL import Image

            out = output_path or str(
                Path(tempfile.gettempdir()) / f"resized_{uuid.uuid4().hex[:8]}{Path(file_path).suffix}"
            )

            with Image.open(file_path) as img:
                img.resize((width, height), Image.LANCZOS).save(out)

            return out
        except Exception as e:
            raise RuntimeError(f"Image resize failed: {e}") from e

    def compress(
        self,
        file_path: str,
        quality: int = 70,
        output_path: Optional[str] = None,
    ) -> str:
        """Re-encode the image as an optimised JPEG.

        Args:
            file_path: Source image.
            quality: JPEG quality passed to Pillow.
            output_path: Destination; defaults to a new ``.jpg`` file in the
                temp directory.

        Returns:
            The path the compressed image was written to.

        Raises:
            RuntimeError: If Pillow is unavailable or the operation fails.
        """
        try:
            from PIL import Image

            out = output_path or str(
                Path(tempfile.gettempdir()) / f"compressed_{uuid.uuid4().hex[:8]}.jpg"
            )

            with Image.open(file_path) as img:
                img.convert("RGB").save(out, "JPEG", quality=quality, optimize=True)

            return out
        except Exception as e:
            raise RuntimeError(f"Image compression failed: {e}") from e

    def convert_format(self, file_path: str, target_format: str, output_path: Optional[str] = None) -> str:
        """Convert the image to *target_format*.

        Args:
            file_path: Source image.
            target_format: Format name or suffix, e.g. "png", ".jpg", "JPEG".
            output_path: Destination; defaults to a new file in the temp
                directory whose suffix is the lowercased *target_format*.

        Returns:
            The path the converted image was written to.

        Raises:
            RuntimeError: If Pillow is unavailable or the operation fails.
        """
        try:
            from PIL import Image

            # "JPG"/"TIF" are not Pillow format names; normalise them.
            fmt = self._canonical_format(target_format)
            ext = f".{target_format.lower().lstrip('.')}"
            out = output_path or str(
                Path(tempfile.gettempdir()) / f"converted_{uuid.uuid4().hex[:8]}{ext}"
            )

            with Image.open(file_path) as img:
                converted = img
                # JPEG cannot store alpha or palette data (RGBA, LA, P, ...).
                if fmt == "JPEG" and img.mode not in self._JPEG_MODES:
                    converted = img.convert("RGB")
                converted.save(out, fmt)

            return out
        except Exception as e:
            raise RuntimeError(f"Format conversion failed: {e}") from e

416 

417 

class AudioProcessor(MediaProcessor):
    """Audio processor.

    Extracts metadata with ``ffprobe`` and transcribes speech with the
    optional ``whisper`` package.

    Usage:
        processor = AudioProcessor()
        ctx = processor.process("recording.mp3")
        print(ctx.extracted_text)  # transcription text
    """

    def __init__(self, transcription_model: str = "base"):
        """Store the whisper model name; the model itself is loaded lazily.

        Args:
            transcription_model: Name passed to ``whisper.load_model``.
        """
        self._model = transcription_model
        # Loaded on the first transcription and reused afterwards, so batch
        # processing does not reload the model for every file.
        self._whisper_model = None

    def process(self, file_path: str) -> MediaContext:
        """Build a ``MediaContext`` with metadata and transcription text."""
        ctx = MediaContext(media_type=MediaType.AUDIO)
        ctx.metadata = self.extract_metadata(file_path)
        ctx.extracted_text = self._transcribe(file_path)
        return ctx

    def extract_metadata(self, file_path: str) -> MediaMetadata:
        """Collect size, MIME type and, when ``ffprobe`` works, audio details.

        Returns:
            A ``MediaMetadata``; duration, bitrate, sample rate and channel
            count stay at their defaults when ``ffprobe`` is missing, fails,
            or produces output that cannot be parsed.
        """
        meta = MediaMetadata(
            file_path=file_path,
            media_type=MediaType.AUDIO,
            mime_type=mimetypes.guess_type(file_path)[0] or "application/octet-stream",
            file_size_bytes=os.path.getsize(file_path) if os.path.exists(file_path) else 0,
        )

        # Extract with ffprobe if available
        try:
            result = subprocess.run(
                ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", file_path],
                capture_output=True, text=True, timeout=10,
            )
            if result.returncode == 0:
                info = json.loads(result.stdout)
                fmt = info.get("format", {})
                meta.duration_s = float(fmt.get("duration", 0))
                meta.bitrate_kbps = int(int(fmt.get("bit_rate", 0)) / 1000)

                # Only the first audio stream is described.
                for stream in info.get("streams", []):
                    if stream.get("codec_type") == "audio":
                        meta.sample_rate = int(stream.get("sample_rate", 0))
                        meta.channels = int(stream.get("channels", 0))
                        break
        except Exception:
            # Deliberately best-effort: ffprobe may be absent, time out, or
            # report values that are not numeric.
            pass

        return meta

    def _transcribe(self, file_path: str) -> str:
        """Transcribe the audio file with whisper.

        Returns:
            The transcription text.  When whisper is not installed or the
            transcription fails, a bracketed explanatory message is returned
            instead of raising.
        """
        try:
            import whisper

            # getattr keeps this safe for instances created without __init__.
            model = getattr(self, "_whisper_model", None)
            if model is None:
                model = whisper.load_model(self._model)
                self._whisper_model = model
            result = model.transcribe(file_path)
            return result["text"]
        except ImportError:
            return "[Transcription requires: pip install openai-whisper]"
        except Exception as e:
            return f"[Transcription error: {e}]"

479 

480 

class VideoProcessor(MediaProcessor):
    """Video processor.

    Extracts metadata with ``ffprobe`` and samples frames with ``ffmpeg``,
    producing one caption string per extracted frame.

    Usage:
        processor = VideoProcessor()
        ctx = processor.process("demo.mp4")
        for caption in ctx.captions:
            print(caption)
    """

    def __init__(self, keyframe_interval_s: float = 2.0, max_keyframes: int = 10):
        """Configure frame sampling.

        Args:
            keyframe_interval_s: Seconds between sampled frames.
            max_keyframes: Upper bound on the number of sampled frames.
        """
        self._keyframe_interval = keyframe_interval_s
        self._max_keyframes = max_keyframes
        self._image_processor = ImageProcessor()

    def process(self, file_path: str) -> MediaContext:
        """Build a ``MediaContext`` with metadata and frame captions."""
        ctx = MediaContext(media_type=MediaType.VIDEO)
        ctx.metadata = self.extract_metadata(file_path)
        ctx.captions = self._extract_keyframes(file_path)
        return ctx

    def extract_metadata(self, file_path: str) -> MediaMetadata:
        """Collect size, MIME type and, when ``ffprobe`` works, video details.

        Returns:
            A ``MediaMetadata``; duration, bitrate, width and height stay at
            their defaults when ``ffprobe`` is missing, fails, or produces
            output that cannot be parsed.
        """
        meta = MediaMetadata(
            file_path=file_path,
            media_type=MediaType.VIDEO,
            mime_type=mimetypes.guess_type(file_path)[0] or "application/octet-stream",
            file_size_bytes=os.path.getsize(file_path) if os.path.exists(file_path) else 0,
        )

        try:
            result = subprocess.run(
                ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", file_path],
                capture_output=True, text=True, timeout=10,
            )
            if result.returncode == 0:
                info = json.loads(result.stdout)
                fmt = info.get("format", {})
                meta.duration_s = float(fmt.get("duration", 0))
                meta.bitrate_kbps = int(int(fmt.get("bit_rate", 0)) / 1000)

                # Only the first video stream is described.
                for stream in info.get("streams", []):
                    if stream.get("codec_type") == "video":
                        meta.width = int(stream.get("width", 0))
                        meta.height = int(stream.get("height", 0))
                        break
        except Exception:
            # Deliberately best-effort: ffprobe may be absent, time out, or
            # report values that are not numeric.
            pass

        return meta

    @staticmethod
    def _remove_quietly(path: Path) -> None:
        """Delete *path* if present, ignoring filesystem errors."""
        try:
            path.unlink(missing_ok=True)
        except OSError:
            pass

    def _extract_keyframes(self, file_path: str) -> List[str]:
        """Sample frames at a fixed interval and describe each one.

        Frames are taken at ``0, interval, 2*interval, ...`` while the
        timestamp is below the video duration, capped at ``max_keyframes``.
        A video shorter than one interval therefore still yields its first
        frame (it used to yield none).

        Returns:
            One caption per successfully extracted frame; an empty list when
            the duration is unknown or the sampling settings are not
            positive.
        """
        captions: List[str] = []
        duration = self.extract_metadata(file_path).duration_s
        interval = self._keyframe_interval

        # A non-positive interval used to divide by zero or loop zero times.
        if duration <= 0 or interval <= 0 or self._max_keyframes <= 0:
            return captions

        # Count the timestamps i*interval that fall inside [0, duration).
        wanted = int(duration // interval)
        if wanted * interval < duration:
            wanted += 1
        num_frames = min(wanted, self._max_keyframes)

        thumb_dir = Path(tempfile.gettempdir()) / "agentos_video_frames"
        thumb_dir.mkdir(exist_ok=True)

        for i in range(num_frames):
            timestamp = i * interval
            frame_path = thumb_dir / f"frame_{uuid.uuid4().hex[:8]}.jpg"
            thumbnail_path = ""

            try:
                subprocess.run(
                    [
                        "ffmpeg", "-y", "-loglevel", "quiet",
                        "-ss", str(timestamp),
                        "-i", file_path,
                        "-vframes", "1",
                        "-q:v", "2",
                        str(frame_path),
                    ],
                    timeout=30,
                    check=True,
                )

                if frame_path.exists():
                    # Encode frame as base64
                    ctx = self._image_processor.process(str(frame_path))
                    thumbnail_path = ctx.thumbnail_path
                    captions.append(
                        f"[{self._format_time(timestamp)}] {ctx.text_description} "
                        f"base64:{ctx.base64_data[:50]}..."
                    )
            except Exception:
                # Best-effort: one failed frame must not abort the others.
                pass
            finally:
                # Always remove the temporary frame, and the thumbnail the
                # image processor wrote for it; neither is referenced by the
                # caption.
                self._remove_quietly(frame_path)
                if thumbnail_path:
                    self._remove_quietly(Path(thumbnail_path))

        return captions

    @staticmethod
    def _format_time(seconds: float) -> str:
        """Format *seconds* as ``M:SS``, or ``H:MM:SS`` from one hour up."""
        m, s = divmod(int(seconds), 60)
        h, m = divmod(m, 60)
        if h:
            return f"{h}:{m:02d}:{s:02d}"
        return f"{m}:{s:02d}"

589 

590 

class DocumentProcessor(MediaProcessor):
    """Document processor.

    Extracts text from PDF, DOCX and plain-text style documents.

    Usage:
        processor = DocumentProcessor()
        ctx = processor.process("report.pdf")
        print(ctx.extracted_text[:500])
    """

    # Suffixes that are read directly as text.
    _PLAIN_TEXT_SUFFIXES = ('.txt', '.md', '.py', '.json', '.yaml', '.xml', '.html', '.csv')

    def process(self, file_path: str) -> MediaContext:
        """Build a ``MediaContext`` with metadata and extracted text."""
        ctx = MediaContext(media_type=MediaType.DOCUMENT)
        ctx.metadata = self.extract_metadata(file_path)
        ctx.extracted_text = self._extract_text(file_path)
        return ctx

    def extract_metadata(self, file_path: str) -> MediaMetadata:
        """Return path, MIME type and file size for the document."""
        return MediaMetadata(
            file_path=file_path,
            media_type=MediaType.DOCUMENT,
            mime_type=mimetypes.guess_type(file_path)[0] or "application/octet-stream",
            file_size_bytes=os.path.getsize(file_path) if os.path.exists(file_path) else 0,
        )

    def _extract_text(self, file_path: str) -> str:
        """Extract the document text, dispatching on the file suffix.

        Returns:
            The extracted text, or a bracketed message for unsupported
            suffixes.

        Raises:
            OSError: If a plain-text file cannot be read.
        """
        ext = Path(file_path).suffix.lower()

        if ext == '.pdf':
            return self._extract_pdf(file_path)
        if ext in ('.docx', '.doc'):
            return self._extract_docx(file_path)
        if ext in self._PLAIN_TEXT_SUFFIXES:
            source = Path(file_path)
            try:
                return source.read_text(encoding='utf-8')
            except UnicodeDecodeError:
                # latin-1 maps every byte, so this decode cannot fail.  Only
                # decode errors are retried; I/O errors propagate as before.
                return source.read_text(encoding='latin-1')
        return f"[Unsupported document format: {ext}]"

    def _extract_pdf(self, file_path: str) -> str:
        """Extract text from a PDF.

        Uses PyMuPDF when installed, otherwise the ``pdftotext`` command.

        Returns:
            The page texts joined with ``--- Page N ---`` headers, or a
            bracketed message when nothing could be extracted.
        """
        try:
            import fitz  # PyMuPDF
        except ImportError:
            return self._extract_pdf_with_pdftotext(file_path)

        try:
            doc = fitz.open(file_path)
            try:
                text_parts = []
                for page_number, page in enumerate(doc, start=1):
                    text = page.get_text()
                    if text.strip():
                        text_parts.append(f"--- Page {page_number} ---\n{text}")
            finally:
                # Close even when a page fails, so the handle is not leaked.
                doc.close()
            return "\n\n".join(text_parts) if text_parts else "[No extractable text in PDF]"
        except Exception as e:
            return f"[PDF extraction error: {e}]"

    def _extract_pdf_with_pdftotext(self, file_path: str) -> str:
        """Fallback PDF extraction through the ``pdftotext`` executable."""
        try:
            result = subprocess.run(
                ["pdftotext", file_path, "-"],
                capture_output=True, text=True, timeout=30,
            )
            if result.returncode == 0:
                return result.stdout
        except Exception:
            # Tool missing or timed out: fall through to the install hint.
            pass
        return "[PDF extraction requires: pip install PyMuPDF]"

    def _extract_docx(self, file_path: str) -> str:
        """Extract the non-empty paragraphs of a DOCX file.

        Returns:
            The paragraphs joined by blank lines, or a bracketed message
            when python-docx is missing, the file has no text, or reading
            fails.
        """
        try:
            from docx import Document
            doc = Document(file_path)
            paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
            return "\n\n".join(paragraphs) if paragraphs else "[No text in document]"
        except ImportError:
            return "[DOCX extraction requires: pip install python-docx]"
        except Exception as e:
            return f"[DOCX extraction error: {e}]"

670 

671 

672# ── Multimodal Context Manager ────────────── 

673 

674 

class MultimodalContextManager:
    """Multimodal context manager.

    Single entry point: takes a file path and returns a ``MediaContext``.

    Usage:
        mgr = MultimodalContextManager()
        ctx = mgr.load("photo.jpg")
        message = ctx.to_llm_message()
    """

    def __init__(self):
        """Create the detector and one default processor per media type."""
        self._detector = MediaDetector()
        self._processors: Dict[MediaType, MediaProcessor] = {
            MediaType.IMAGE: ImageProcessor(),
            MediaType.AUDIO: AudioProcessor(),
            MediaType.VIDEO: VideoProcessor(),
            MediaType.DOCUMENT: DocumentProcessor(),
        }

    def load(self, file_path: str) -> MediaContext:
        """Load and process a single media file.

        Returns:
            The processor's ``MediaContext``.  When no processor is
            registered for the detected type, an ``UNKNOWN`` context whose
            ``extracted_text`` names the unsupported type.
        """
        mtype = self._detector.detect(file_path)
        processor = self._processors.get(mtype)

        if processor is None:
            ctx = MediaContext(media_type=MediaType.UNKNOWN)
            ctx.metadata = MediaMetadata(file_path=file_path, media_type=MediaType.UNKNOWN)
            # Use .value: formatting a str-mixin enum member in an f-string
            # yields "unknown" on Python <= 3.10 but "MediaType.UNKNOWN"
            # from 3.11 on, which made this message version-dependent.
            ctx.extracted_text = f"[Unsupported media type: {mtype.value}]"
            return ctx

        return processor.process(file_path)

    def load_batch(self, file_paths: List[str]) -> List[MediaContext]:
        """Load several files, returning contexts in input order."""
        return [self.load(fp) for fp in file_paths]

    def load_as_message(self, file_path: str) -> dict:
        """Load a file and convert it to the LLM message format."""
        return self.load(file_path).to_llm_message()

    def load_batch_as_messages(self, file_paths: List[str]) -> List[dict]:
        """Load several files as LLM messages, in input order."""
        return [self.load_as_message(fp) for fp in file_paths]

    def register_processor(self, media_type: MediaType, processor: MediaProcessor) -> None:
        """Register a custom processor, replacing any existing one for the type."""
        self._processors[media_type] = processor

    def analyze_directory(self, directory: str) -> Dict[str, List[str]]:
        """Group the files under *directory* (recursively) by media type.

        Returns:
            A dict with the keys ``images``, ``audio``, ``video``,
            ``documents`` and ``unknown``, each holding a list of file
            paths.  All lists are empty when *directory* is not an existing
            directory.
        """
        result: Dict[str, List[str]] = {
            "images": [],
            "audio": [],
            "video": [],
            "documents": [],
            "unknown": [],
        }

        dir_path = Path(directory)
        if not dir_path.is_dir():
            return result

        bucket_for = {
            MediaType.IMAGE: "images",
            MediaType.AUDIO: "audio",
            MediaType.VIDEO: "video",
            MediaType.DOCUMENT: "documents",
        }

        for file_path in dir_path.rglob("*"):
            if not file_path.is_file():
                continue

            mtype = self._detector.detect(str(file_path))
            result[bucket_for.get(mtype, "unknown")].append(str(file_path))

        return result

756 

757 

758# ── Quick Start ───────────────────────────── 

759 

760 

def create_multimodal_manager() -> MultimodalContextManager:
    """Create a multimodal context manager with its default setup."""
    manager = MultimodalContextManager()
    return manager

764 

765 

def quick_load(file_path: str) -> MediaContext:
    """Load a single file using a freshly created manager."""
    manager = MultimodalContextManager()
    return manager.load(file_path)

769 

770 

771# ── Compatibility aliases (required by agentos/__init__.py) ── 

772 

MultimodalManager = MultimodalContextManager  # alternate name for the manager class

Modality = MediaType  # alternate name for the media-type enum