Coverage for agentos/multimodal/__init__.py: 29%
375 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v1.14.3 — Multimodal Context Manager.
4Unified multimodal context layer for AgentOS agents. Handles images,
5audio, video, and structured documents as first-class context objects.
7Features:
8- Multi-format image processing (PNG, JPEG, GIF, WebP, SVG, HEIC)
9- Audio transcription & processing (WAV, MP3, FLAC, M4A)
10- Video keyframe extraction & captioning
11- PDF/DOCX document text extraction with layout awareness
12- File type auto-detection (magic bytes)
13- Image preprocessing pipeline (resize, compress, format convert)
14- Vision LLM adapter for base64 images
15- Thumbnail generation
16- Metadata extraction (EXIF, duration, dimensions)
18Architecture:
19 File Input
20 ├── MediaDetector (magic bytes identification)
21 ├── MediaProcessor (format-specific pipeline)
22 │ ├── ImageProcessor (resize/compress/convert/base64)
23 │ ├── AudioProcessor (transcription via whisper)
24 │ ├── VideoProcessor (keyframe extraction)
25 │ └── DocumentProcessor (PDF/DOCX extraction)
26 └── MediaContext (unified context object)
28Inspired by: GPT-4V multimodal API, Claude Vision, Gemini 1.5 Pro
29"""
31from __future__ import annotations
33import base64
34import io
35import json
36import mimetypes
37import os
38import struct
39import subprocess
40import tempfile
41import uuid
42from abc import ABC, abstractmethod
43from dataclasses import dataclass, field
44from enum import Enum
45from pathlib import Path
46from typing import (
47 Any, Dict, List, Optional, Tuple, Union,
48)
51# ── Types ───────────────────────────────────
class MediaType(str, Enum):
    """Top-level media categories handled by the multimodal layer.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value (``MediaType.IMAGE == "image"``).
    """

    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    DOCUMENT = "document"
    # Used when no detection stage could classify the file.
    UNKNOWN = "unknown"
class ImageFormat(str, Enum):
    """Image container formats the detector can name.

    Members are also ``str`` instances whose value is the lowercase
    format name.
    """

    PNG = "png"
    JPEG = "jpeg"
    GIF = "gif"
    WEBP = "webp"
    SVG = "svg"
    BMP = "bmp"
    HEIC = "heic"
    TIFF = "tiff"
@dataclass
class MediaMetadata:
    """Metadata describing a single media file.

    Every field has a neutral default (empty string, zero, ``False`` or an
    empty dict), so processors only fill in what applies to their media type.
    """

    file_path: str = ""
    media_type: MediaType = MediaType.UNKNOWN
    mime_type: str = ""
    file_size_bytes: int = 0

    # Image
    width: int = 0
    height: int = 0
    color_mode: str = ""

    # Audio/Video
    duration_s: float = 0.0
    sample_rate: int = 0
    channels: int = 0
    bitrate_kbps: int = 0

    # General
    has_alpha: bool = False
    page_count: int = 0
    extra: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return every metadata field as a plain dictionary.

        ``media_type`` is emitted as its string value. The keys that were
        always present come first, in their historical order; the remaining
        fields follow so that no collected metadata is lost on serialization.
        ``extra`` is shallow-copied so callers cannot mutate this instance
        through the returned dict.
        """
        return {
            "file_path": self.file_path,
            "media_type": self.media_type.value,
            "mime_type": self.mime_type,
            "file_size_bytes": self.file_size_bytes,
            "width": self.width,
            "height": self.height,
            "duration_s": self.duration_s,
            "page_count": self.page_count,
            # Fields that used to be dropped silently.
            "color_mode": self.color_mode,
            "sample_rate": self.sample_rate,
            "channels": self.channels,
            "bitrate_kbps": self.bitrate_kbps,
            "has_alpha": self.has_alpha,
            "extra": dict(self.extra),
        }
@dataclass
class MediaContext:
    """Unified multimodal context object.

    This is what gets passed to LLM context windows.
    """

    context_id: str = field(default_factory=lambda: f"mctx-{uuid.uuid4().hex[:12]}")
    media_type: MediaType = MediaType.UNKNOWN
    metadata: MediaMetadata = field(default_factory=MediaMetadata)

    # Processed representations
    text_description: str = ""  # natural-language description
    base64_data: str = ""       # Base64 payload (for vision LLMs)
    extracted_text: str = ""    # OCR / transcription text
    thumbnail_path: str = ""    # path of the generated thumbnail

    # Structured
    entities: List[Dict[str, Any]] = field(default_factory=list)
    captions: List[str] = field(default_factory=list)

    def to_llm_message(self) -> dict:
        """Convert this context into an LLM API chat message.

        An image that carries Base64 data becomes a two-part ``user`` message
        (inline ``image_url`` data URL plus a text part). Anything else
        becomes a plain-text ``user`` message built from the description,
        falling back to the extracted text and then to an empty string.
        """
        has_inline_image = self.media_type == MediaType.IMAGE and bool(self.base64_data)
        if not has_inline_image:
            plain_text = self.text_description or self.extracted_text or ""
            return {
                "role": "user",
                "content": plain_text,
            }

        data_url = f"data:{self.metadata.mime_type};base64,{self.base64_data}"
        image_part = {
            "type": "image_url",
            "image_url": {
                "url": data_url,
                "detail": "auto",
            },
        }
        text_part = {
            "type": "text",
            "text": self.text_description or "Describe this image.",
        }
        return {
            "role": "user",
            "content": [image_part, text_part],
        }
157# ── Media Detector ──────────────────────────
class MediaDetector:
    """Classify a file into a ``MediaType``.

    Detection runs in three stages and returns at the first match:

    1. the file extension (no file access needed),
    2. magic bytes read from the start of the file,
    3. the ``mimetypes`` guess for the file name.

    Usage:
        detector = MediaDetector()
        mt = detector.detect("photo.jpg")  # MediaType.IMAGE
    """

    # Number of leading bytes read for content sniffing; large enough to
    # find an ``<svg`` root tag that follows an XML declaration.
    _SNIFF_BYTES = 1024

    # Magic-byte signatures, matched with ``bytes.startswith``.
    MAGIC_SIGNATURES = {
        b'\xFF\xD8\xFF': (MediaType.IMAGE, ImageFormat.JPEG),
        b'\x89PNG\r\n\x1A\n': (MediaType.IMAGE, ImageFormat.PNG),
        b'GIF87a': (MediaType.IMAGE, ImageFormat.GIF),
        b'GIF89a': (MediaType.IMAGE, ImageFormat.GIF),
        # RIFF is a container shared by WebP, WAV and AVI. The form type is
        # resolved through RIFF_FORM_TYPES; this entry is only the default
        # for unrecognised form types. (The table used to list b'RIFF' twice,
        # so the later audio entry silently replaced the WebP one.)
        b'RIFF': (MediaType.AUDIO, None),
        b'\x42\x4D': (MediaType.IMAGE, ImageFormat.BMP),
        b'<?xml': (MediaType.IMAGE, ImageFormat.SVG),
        b'<svg': (MediaType.IMAGE, ImageFormat.SVG),
        b'II*\x00': (MediaType.IMAGE, ImageFormat.TIFF),
        b'MM\x00*': (MediaType.IMAGE, ImageFormat.TIFF),
        # Audio
        b'ID3': (MediaType.AUDIO, None),       # MP3 with ID3
        b'\xFF\xFB': (MediaType.AUDIO, None),  # MP3
        b'\xFF\xF3': (MediaType.AUDIO, None),  # MP3
        b'fLaC': (MediaType.AUDIO, None),      # FLAC
        b'OggS': (MediaType.AUDIO, None),      # OGG
        # Video
        b'\x00\x00\x00\x18ftyp': (MediaType.VIDEO, None),  # MP4
        b'\x00\x00\x00\x20ftyp': (MediaType.VIDEO, None),
        b'\x1A\x45\xDF\xA3': (MediaType.VIDEO, None),      # WebM/MKV
        # Documents
        b'%PDF': (MediaType.DOCUMENT, None),
        b'PK\x03\x04': (MediaType.DOCUMENT, None),  # DOCX/XLSX/PPTX (ZIP)
    }

    # RIFF form type (bytes 8-12 of the file) -> media type.
    RIFF_FORM_TYPES = {
        b'WEBP': MediaType.IMAGE,
        b'WAVE': MediaType.AUDIO,
        b'AVI ': MediaType.VIDEO,
    }

    # ISO base media ("ftyp") major brands that are not video.
    _BMFF_IMAGE_BRANDS = frozenset({b'heic', b'heix', b'mif1', b'avif'})
    _BMFF_AUDIO_BRANDS = frozenset({b'M4A ', b'M4B '})

    # Audio extensions
    AUDIO_EXTENSIONS = {'.mp3', '.wav', '.flac', '.m4a', '.ogg', '.aac', '.wma', '.opus'}

    # Video extensions
    VIDEO_EXTENSIONS = {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.webm', '.flv', '.m4v', '.3gp'}

    # Image extensions
    IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg', '.tiff', '.heic', '.ico'}

    # Document extensions
    DOCUMENT_EXTENSIONS = {'.pdf', '.docx', '.doc', '.xlsx', '.xls', '.pptx', '.ppt', '.txt', '.md', '.html', '.epub'}

    @classmethod
    def detect(cls, file_path: str) -> MediaType:
        """Detect the media type of a file.

        Args:
            file_path: Path of the file to classify. The file only has to
                exist when its extension is not one of the known ones.

        Returns:
            The detected ``MediaType``. ``MediaType.UNKNOWN`` is returned when
            the file cannot be read for sniffing or no stage matches.
        """
        ext = Path(file_path).suffix.lower()

        by_extension = (
            (cls.IMAGE_EXTENSIONS, MediaType.IMAGE),
            (cls.AUDIO_EXTENSIONS, MediaType.AUDIO),
            (cls.VIDEO_EXTENSIONS, MediaType.VIDEO),
            (cls.DOCUMENT_EXTENSIONS, MediaType.DOCUMENT),
        )
        for extensions, mtype in by_extension:
            if ext in extensions:
                return mtype

        # Fallback to magic bytes
        try:
            with open(file_path, 'rb') as f:
                header = f.read(cls._SNIFF_BYTES)
        except (OSError, ValueError):
            # Missing/unreadable file, or a path the OS rejects outright.
            return MediaType.UNKNOWN

        sniffed = cls._sniff(header)
        if sniffed is not None:
            return sniffed

        # MIME type fallback
        mime, _ = mimetypes.guess_type(file_path)
        if mime:
            if mime.startswith('image/'):
                return MediaType.IMAGE
            if mime.startswith('audio/'):
                return MediaType.AUDIO
            if mime.startswith('video/'):
                return MediaType.VIDEO

        return MediaType.UNKNOWN

    @classmethod
    def _sniff(cls, header: bytes) -> Optional[MediaType]:
        """Classify a file from its leading bytes; ``None`` if unrecognised."""
        # RIFF containers: the form type at offset 8 tells WebP/WAV/AVI apart.
        if header.startswith(b'RIFF'):
            form_type = cls.RIFF_FORM_TYPES.get(header[8:12])
            if form_type is not None:
                return form_type

        # ISO base media files start with a size-prefixed "ftyp" box, so the
        # tag sits at offset 4 whatever the box size is; the major brand that
        # follows distinguishes HEIC/AVIF images and M4A audio from video.
        if header[4:8] == b'ftyp':
            brand = header[8:12]
            if brand in cls._BMFF_IMAGE_BRANDS:
                return MediaType.IMAGE
            if brand in cls._BMFF_AUDIO_BRANDS:
                return MediaType.AUDIO
            return MediaType.VIDEO

        # An XML declaration alone does not make the file an SVG image.
        if header.startswith(b'<?xml'):
            return MediaType.IMAGE if b'<svg' in header else MediaType.DOCUMENT

        for magic, (mtype, _) in cls.MAGIC_SIGNATURES.items():
            if header.startswith(magic):
                return mtype
        return None

    @classmethod
    def batch_detect(cls, file_paths: List[str]) -> Dict[str, MediaType]:
        """Detect several files at once; returns a ``{path: MediaType}`` mapping."""
        return {fp: cls.detect(fp) for fp in file_paths}
257# ── Media Processors ────────────────────────
class MediaProcessor(ABC):
    """Abstract base class for media processors.

    Concrete processors implement both methods for one media type.
    """

    @abstractmethod
    def process(self, file_path: str) -> MediaContext:
        """Process the file at ``file_path`` and return its ``MediaContext``."""

    @abstractmethod
    def extract_metadata(self, file_path: str) -> MediaMetadata:
        """Return the ``MediaMetadata`` of the file at ``file_path``."""
class ImageProcessor(MediaProcessor):
    """Image processor.

    Extracts metadata, Base64-encodes the source file, generates thumbnails
    and offers resize / compress / format-conversion helpers. Metadata and
    thumbnail generation are best effort: they fall back to defaults when
    Pillow is not installed or the file cannot be decoded.

    Usage:
        processor = ImageProcessor()
        ctx = processor.process("photo.jpg")
        base64_str = ctx.base64_data  # can be passed straight to an LLM API
    """

    # Pillow only registers the canonical format names, not these aliases.
    _FORMAT_ALIASES = {"JPG": "JPEG", "TIF": "TIFF"}
    # Image modes that are converted to RGB before being written as JPEG.
    _JPEG_SAFE_MODES = ("1", "L", "RGB", "CMYK", "YCbCr")

    def __init__(
        self,
        max_size: int = 2048,
        quality: int = 85,
        output_format: str = "JPEG",
    ):
        """
        Args:
            max_size: Maximum width/height (pixels) of generated thumbnails.
            quality: Encoder quality used when saving thumbnails.
            output_format: Pillow format name used for thumbnails.
        """
        self._max_size = max_size
        self._quality = quality
        self._output_format = output_format

    @classmethod
    def _normalize_format(cls, name: str) -> str:
        """Map a user-supplied format/extension to a Pillow format name."""
        fmt = name.upper().replace('.', '')
        return cls._FORMAT_ALIASES.get(fmt, fmt)

    def process(self, file_path: str) -> MediaContext:
        """Build the ``MediaContext`` (metadata, Base64, description, thumbnail)."""
        ctx = MediaContext(media_type=MediaType.IMAGE)
        ctx.metadata = self.extract_metadata(file_path)
        ctx.base64_data = self._encode_base64(file_path)
        # Reuse the metadata gathered above instead of decoding the file again.
        ctx.text_description = self._generate_description(file_path, ctx.metadata)
        ctx.thumbnail_path = self._generate_thumbnail(file_path)
        return ctx

    def extract_metadata(self, file_path: str) -> MediaMetadata:
        """Collect size, MIME type and (with Pillow) dimensions, mode and EXIF.

        Never raises for a missing or undecodable file; the affected fields
        simply keep their defaults.
        """
        try:
            size = os.path.getsize(file_path)
        except OSError:
            size = 0

        meta = MediaMetadata(
            file_path=file_path,
            media_type=MediaType.IMAGE,
            mime_type=mimetypes.guess_type(file_path)[0] or "application/octet-stream",
            file_size_bytes=size,
        )

        # Try to get dimensions using PIL
        try:
            from PIL import Image
        except ImportError:
            # Pillow is optional; basic metadata is still useful without it.
            return meta

        try:
            with Image.open(file_path) as img:
                meta.width = img.width
                meta.height = img.height
                meta.color_mode = img.mode
                # Palette/RGB images can carry transparency through the
                # "transparency" info entry instead of an alpha band.
                meta.has_alpha = (
                    img.mode in ('RGBA', 'LA', 'PA') or 'transparency' in img.info
                )

                # EXIF extraction
                for tag_id, value in img.getexif().items():
                    meta.extra[str(tag_id)] = str(value)
        except Exception:
            # Deliberately best effort: an unreadable or unsupported image
            # must not prevent the caller from getting the basic metadata.
            pass

        return meta

    def _encode_base64(self, file_path: str) -> str:
        """Return the raw file content as Base64 text, or "" if unreadable."""
        try:
            with open(file_path, 'rb') as f:
                return base64.b64encode(f.read()).decode('utf-8')
        except (OSError, ValueError):
            return ""

    def _generate_description(self, file_path: str, meta: Optional[MediaMetadata] = None) -> str:
        """Return a placeholder description (a vision LLM should produce the real one).

        Args:
            file_path: Path of the image.
            meta: Already-extracted metadata; extracted on demand when omitted.
        """
        if meta is None:
            meta = self.extract_metadata(file_path)
        return f"Image: {meta.width}x{meta.height}, format: {Path(file_path).suffix}"

    def _generate_thumbnail(self, file_path: str) -> str:
        """Write a thumbnail to the temp directory and return its path.

        Returns "" when Pillow is missing or the thumbnail cannot be created.
        """
        try:
            from PIL import Image

            fmt = self._normalize_format(self._output_format)
            # Name the file after the format actually written.
            suffix = ".jpg" if fmt == "JPEG" else f".{fmt.lower()}"

            thumb_dir = Path(tempfile.gettempdir()) / "agentos_thumbnails"
            thumb_dir.mkdir(parents=True, exist_ok=True)
            thumb_path = thumb_dir / f"thumb_{uuid.uuid4().hex[:8]}{suffix}"

            with Image.open(file_path) as img:
                img.thumbnail((self._max_size, self._max_size))
                img.convert("RGB").save(thumb_path, fmt, quality=self._quality)

            return str(thumb_path)
        except Exception:
            # Thumbnails are optional; callers treat "" as "no thumbnail".
            return ""

    def resize(self, file_path: str, width: int, height: int, output_path: Optional[str] = None) -> str:
        """Resize the image to exactly ``width`` x ``height``.

        Returns:
            The output path (a temp file with the source suffix by default).

        Raises:
            RuntimeError: If Pillow is missing or the resize/save fails.
        """
        try:
            from PIL import Image

            out = output_path or str(
                Path(tempfile.gettempdir()) / f"resized_{uuid.uuid4().hex[:8]}{Path(file_path).suffix}"
            )

            with Image.open(file_path) as img:
                img.resize((width, height), Image.LANCZOS).save(out)

            return out
        except Exception as e:
            raise RuntimeError(f"Image resize failed: {e}") from e

    def compress(
        self,
        file_path: str,
        quality: int = 70,
        output_path: Optional[str] = None,
    ) -> str:
        """Re-encode the image as an optimized JPEG.

        Returns:
            The output path (a temp ``.jpg`` file by default).

        Raises:
            RuntimeError: If Pillow is missing or the conversion/save fails.
        """
        try:
            from PIL import Image

            out = output_path or str(
                Path(tempfile.gettempdir()) / f"compressed_{uuid.uuid4().hex[:8]}.jpg"
            )

            with Image.open(file_path) as img:
                img.convert("RGB").save(out, "JPEG", quality=quality, optimize=True)

            return out
        except Exception as e:
            raise RuntimeError(f"Image compression failed: {e}") from e

    def convert_format(self, file_path: str, target_format: str, output_path: Optional[str] = None) -> str:
        """Convert the image to another format.

        Args:
            file_path: Source image.
            target_format: Target format or extension, e.g. ``"png"``,
                ``".jpg"`` or ``"JPEG"`` (``jpg``/``tif`` aliases accepted).
            output_path: Destination; a temp file named after
                ``target_format`` by default.

        Returns:
            The output path.

        Raises:
            RuntimeError: If Pillow is missing or the conversion fails.
        """
        try:
            from PIL import Image

            fmt = self._normalize_format(target_format)
            ext = f".{target_format.lower().lstrip('.')}"
            out = output_path or str(
                Path(tempfile.gettempdir()) / f"converted_{uuid.uuid4().hex[:8]}{ext}"
            )

            with Image.open(file_path) as img:
                to_save = img
                # JPEG has no alpha/palette support; flatten to RGB first.
                if fmt == "JPEG" and img.mode not in self._JPEG_SAFE_MODES:
                    to_save = img.convert("RGB")
                to_save.save(out, fmt)

            return out
        except Exception as e:
            raise RuntimeError(f"Format conversion failed: {e}") from e
class AudioProcessor(MediaProcessor):
    """Audio processor.

    Extracts metadata through ``ffprobe`` and transcribes speech with the
    optional ``whisper`` package.

    Usage:
        processor = AudioProcessor()
        ctx = processor.process("recording.mp3")
        print(ctx.extracted_text)  # transcription text
    """

    def __init__(self, transcription_model: str = "base"):
        """
        Args:
            transcription_model: Name passed to ``whisper.load_model``.
        """
        self._model = transcription_model
        # Loaded lazily on first transcription and then reused.
        self._loaded_model = None

    def process(self, file_path: str) -> MediaContext:
        """Build the ``MediaContext`` (metadata plus transcription text)."""
        ctx = MediaContext(media_type=MediaType.AUDIO)
        ctx.metadata = self.extract_metadata(file_path)
        ctx.extracted_text = self._transcribe(file_path)
        return ctx

    @staticmethod
    def _to_float(value: Any, default: float = 0.0) -> float:
        """Parse a numeric field, tolerating missing or non-numeric values."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _to_int(value: Any, default: int = 0) -> int:
        """Parse an integer field, tolerating missing or non-numeric values."""
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return default

    @staticmethod
    def _probe(file_path: str) -> Dict[str, Any]:
        """Run ``ffprobe`` and return its JSON report, or ``{}`` on any failure."""
        try:
            result = subprocess.run(
                ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", file_path],
                capture_output=True, text=True, timeout=10,
            )
            if result.returncode != 0:
                return {}
            info = json.loads(result.stdout)
        except (OSError, subprocess.SubprocessError, ValueError):
            # ffprobe missing, timed out, or produced unparsable output.
            return {}
        return info if isinstance(info, dict) else {}

    def extract_metadata(self, file_path: str) -> MediaMetadata:
        """Collect size and MIME type, plus duration/bitrate/stream info via ffprobe.

        Best effort: fields that ffprobe cannot provide keep their defaults,
        and one unparsable field does not discard the others.
        """
        try:
            size = os.path.getsize(file_path)
        except OSError:
            size = 0

        meta = MediaMetadata(
            file_path=file_path,
            media_type=MediaType.AUDIO,
            mime_type=mimetypes.guess_type(file_path)[0] or "application/octet-stream",
            file_size_bytes=size,
        )

        # Extract with ffprobe if available
        info = self._probe(file_path)
        if not info:
            return meta

        fmt = info.get("format") or {}
        meta.duration_s = self._to_float(fmt.get("duration"))
        meta.bitrate_kbps = int(self._to_int(fmt.get("bit_rate")) / 1000)

        # Only the first audio stream is reported.
        for stream in info.get("streams") or []:
            if stream.get("codec_type") == "audio":
                meta.sample_rate = self._to_int(stream.get("sample_rate"))
                meta.channels = self._to_int(stream.get("channels"))
                break

        return meta

    def _transcribe(self, file_path: str) -> str:
        """Transcribe the audio file.

        Returns:
            The transcription text, or a bracketed diagnostic message when
            whisper is not installed or transcription fails.
        """
        try:
            import whisper

            # getattr: tolerate instances created without running __init__.
            model = getattr(self, "_loaded_model", None)
            if model is None:
                # Loading a model is expensive; do it once per processor.
                model = whisper.load_model(self._model)
                self._loaded_model = model
            result = model.transcribe(file_path)
            return result["text"]
        except ImportError:
            return "[Transcription requires: pip install openai-whisper]"
        except Exception as e:
            return f"[Transcription error: {e}]"
class VideoProcessor(MediaProcessor):
    """Video processor.

    Extracts metadata through ``ffprobe`` and samples frames with ``ffmpeg``,
    turning each sampled frame into a caption string.

    Usage:
        processor = VideoProcessor()
        ctx = processor.process("demo.mp4")
        for caption in ctx.captions:
            print(caption)
    """

    def __init__(self, keyframe_interval_s: float = 2.0, max_keyframes: int = 10):
        """
        Args:
            keyframe_interval_s: Seconds between sampled frames.
            max_keyframes: Upper bound on the number of sampled frames.
        """
        self._keyframe_interval = keyframe_interval_s
        self._max_keyframes = max_keyframes
        self._image_processor = ImageProcessor()

    def process(self, file_path: str) -> MediaContext:
        """Build the ``MediaContext`` (metadata plus per-frame captions)."""
        ctx = MediaContext(media_type=MediaType.VIDEO)
        ctx.metadata = self.extract_metadata(file_path)
        # Pass the duration along so ffprobe is not run a second time.
        ctx.captions = self._extract_keyframes(file_path, ctx.metadata.duration_s)
        return ctx

    @staticmethod
    def _to_float(value: Any, default: float = 0.0) -> float:
        """Parse a numeric field, tolerating missing or non-numeric values."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _to_int(value: Any, default: int = 0) -> int:
        """Parse an integer field, tolerating missing or non-numeric values."""
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return default

    @staticmethod
    def _remove_quietly(path: Path) -> None:
        """Delete a temporary file, ignoring filesystem errors."""
        try:
            path.unlink(missing_ok=True)
        except OSError:
            pass

    def extract_metadata(self, file_path: str) -> MediaMetadata:
        """Collect size and MIME type, plus duration/bitrate/dimensions via ffprobe.

        Best effort: fields that ffprobe cannot provide keep their defaults.
        """
        try:
            size = os.path.getsize(file_path)
        except OSError:
            size = 0

        meta = MediaMetadata(
            file_path=file_path,
            media_type=MediaType.VIDEO,
            mime_type=mimetypes.guess_type(file_path)[0] or "application/octet-stream",
            file_size_bytes=size,
        )

        try:
            result = subprocess.run(
                ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", file_path],
                capture_output=True, text=True, timeout=10,
            )
            if result.returncode != 0:
                return meta
            info = json.loads(result.stdout)
        except (OSError, subprocess.SubprocessError, ValueError):
            # ffprobe missing, timed out, or produced unparsable output.
            return meta
        if not isinstance(info, dict):
            return meta

        fmt = info.get("format") or {}
        meta.duration_s = self._to_float(fmt.get("duration"))
        meta.bitrate_kbps = int(self._to_int(fmt.get("bit_rate")) / 1000)

        # Only the first video stream is reported.
        for stream in info.get("streams") or []:
            if stream.get("codec_type") == "video":
                meta.width = self._to_int(stream.get("width"))
                meta.height = self._to_int(stream.get("height"))
                break

        return meta

    def _extract_keyframes(self, file_path: str, duration_s: Optional[float] = None) -> List[str]:
        """Sample frames at a fixed interval and describe each one.

        Args:
            file_path: Path of the video.
            duration_s: Known duration in seconds; probed when omitted.

        Returns:
            One caption per successfully extracted frame, formatted as
            ``"[m:ss] <description> base64:<first 50 chars>..."``. Empty when
            the duration is unknown, the interval is not positive, or
            ``ffmpeg`` is unavailable.
        """
        captions: List[str] = []
        if duration_s is None:
            duration_s = self.extract_metadata(file_path).duration_s

        interval = self._keyframe_interval
        if duration_s <= 0 or interval <= 0:
            # No duration or no usable sampling grid (also avoids dividing by 0).
            return captions

        # A clip shorter than one interval still gets its first frame.
        num_frames = min(max(1, int(duration_s / interval)), self._max_keyframes)

        frame_dir = Path(tempfile.gettempdir()) / "agentos_video_frames"
        frame_dir.mkdir(parents=True, exist_ok=True)

        for i in range(num_frames):
            timestamp = i * interval
            frame_path = frame_dir / f"frame_{uuid.uuid4().hex[:8]}.jpg"

            try:
                subprocess.run(
                    [
                        "ffmpeg", "-y", "-loglevel", "quiet",
                        "-ss", str(timestamp),
                        "-i", file_path,
                        "-vframes", "1",
                        "-q:v", "2",
                        str(frame_path),
                    ],
                    timeout=30,
                    check=True,
                )
            except FileNotFoundError:
                # ffmpeg is not installed: later frames would fail the same way.
                break
            except (OSError, subprocess.SubprocessError, ValueError):
                self._remove_quietly(frame_path)
                continue

            try:
                if frame_path.exists():
                    # Encode frame as base64
                    ctx = self._image_processor.process(str(frame_path))
                    captions.append(
                        f"[{self._format_time(timestamp)}] {ctx.text_description} "
                        f"base64:{ctx.base64_data[:50]}..."
                    )
                    # The image processor also writes a thumbnail that nothing
                    # references afterwards; remove it so temp files don't pile up.
                    if ctx.thumbnail_path:
                        self._remove_quietly(Path(ctx.thumbnail_path))
            except Exception:
                # Best effort per frame: skip frames that cannot be described.
                pass
            finally:
                # Cleanup frame file, whether or not processing succeeded.
                self._remove_quietly(frame_path)

        return captions

    @staticmethod
    def _format_time(seconds: float) -> str:
        """Format seconds as ``m:ss``, or ``h:mm:ss`` from one hour up."""
        m, s = divmod(int(seconds), 60)
        h, m = divmod(m, 60)
        if h:
            return f"{h}:{m:02d}:{s:02d}"
        return f"{m}:{s:02d}"
class DocumentProcessor(MediaProcessor):
    """Document processor.

    Extracts text from PDF, DOCX and plain-text style documents.

    Usage:
        processor = DocumentProcessor()
        ctx = processor.process("report.pdf")
        print(ctx.extracted_text[:500])
    """

    # Extensions that are read directly as text.
    _TEXT_EXTENSIONS = ('.txt', '.md', '.py', '.json', '.yaml', '.xml', '.html', '.csv')

    def process(self, file_path: str) -> MediaContext:
        """Build the ``MediaContext`` (metadata plus extracted text)."""
        ctx = MediaContext(media_type=MediaType.DOCUMENT)
        ctx.metadata = self.extract_metadata(file_path)
        ctx.extracted_text = self._extract_text(file_path)
        return ctx

    def extract_metadata(self, file_path: str) -> MediaMetadata:
        """Return path, MIME type and file size (0 when the file is missing)."""
        try:
            size = os.path.getsize(file_path)
        except OSError:
            size = 0

        return MediaMetadata(
            file_path=file_path,
            media_type=MediaType.DOCUMENT,
            mime_type=mimetypes.guess_type(file_path)[0] or "application/octet-stream",
            file_size_bytes=size,
        )

    def _extract_text(self, file_path: str) -> str:
        """Extract the document text, dispatching on the file extension.

        Returns:
            The text, or a bracketed diagnostic message for unsupported
            formats and extraction failures.

        Raises:
            OSError: If a plain-text file cannot be read.
        """
        ext = Path(file_path).suffix.lower()

        if ext == '.pdf':
            return self._extract_pdf(file_path)
        if ext in ('.docx', '.doc'):
            return self._extract_docx(file_path)
        if ext in self._TEXT_EXTENSIONS:
            path = Path(file_path)
            try:
                return path.read_text(encoding='utf-8')
            except UnicodeDecodeError:
                # latin-1 maps every byte, so this retry cannot fail on decoding.
                return path.read_text(encoding='latin-1')
        return f"[Unsupported document format: {ext}]"

    def _extract_pdf(self, file_path: str) -> str:
        """Extract text from a PDF with PyMuPDF, falling back to ``pdftotext``."""
        try:
            import fitz  # PyMuPDF

            doc = fitz.open(file_path)
            try:
                text_parts = []
                for page_num, page in enumerate(doc, start=1):
                    text = page.get_text()
                    if text.strip():
                        text_parts.append(f"--- Page {page_num} ---\n{text}")
            finally:
                # Release the file handle even if a page fails to parse.
                doc.close()
            return "\n\n".join(text_parts) if text_parts else "[No extractable text in PDF]"
        except ImportError:
            return self._extract_pdf_with_pdftotext(file_path)
        except Exception as e:
            return f"[PDF extraction error: {e}]"

    @staticmethod
    def _extract_pdf_with_pdftotext(file_path: str) -> str:
        """Fallback used when PyMuPDF is missing: shell out to ``pdftotext``."""
        try:
            result = subprocess.run(
                ["pdftotext", file_path, "-"],
                capture_output=True, text=True, timeout=30,
            )
            if result.returncode == 0:
                return result.stdout
        except (OSError, subprocess.SubprocessError, ValueError):
            # pdftotext missing, timed out, or rejected the path.
            pass
        return "[PDF extraction requires: pip install PyMuPDF]"

    def _extract_docx(self, file_path: str) -> str:
        """Extract paragraph text from a DOCX file using python-docx."""
        try:
            from docx import Document

            doc = Document(file_path)
            paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
            return "\n\n".join(paragraphs) if paragraphs else "[No text in document]"
        except ImportError:
            return "[DOCX extraction requires: pip install python-docx]"
        except Exception as e:
            return f"[DOCX extraction error: {e}]"
672# ── Multimodal Context Manager ──────────────
class MultimodalContextManager:
    """Multimodal context manager.

    Single entry point: takes a file path and returns a ``MediaContext``.

    Usage:
        mgr = MultimodalContextManager()
        ctx = mgr.load("photo.jpg")
        message = ctx.to_llm_message()
    """

    def __init__(self):
        self._detector = MediaDetector()
        self._processors: Dict[MediaType, MediaProcessor] = {
            MediaType.IMAGE: ImageProcessor(),
            MediaType.AUDIO: AudioProcessor(),
            MediaType.VIDEO: VideoProcessor(),
            MediaType.DOCUMENT: DocumentProcessor(),
        }

    def load(self, file_path: str) -> MediaContext:
        """Load and process a single media file.

        Returns:
            The processor's ``MediaContext``. When no processor is registered
            for the detected type, an ``UNKNOWN`` context whose
            ``extracted_text`` explains the problem.
        """
        mtype = self._detector.detect(file_path)
        processor = self._processors.get(mtype)

        if processor is None:
            ctx = MediaContext(media_type=MediaType.UNKNOWN)
            ctx.metadata = MediaMetadata(file_path=file_path, media_type=MediaType.UNKNOWN)
            # Format the plain value: f-string formatting of a (str, Enum)
            # member gives "unknown" on some Python versions and
            # "MediaType.UNKNOWN" on others.
            type_name = getattr(mtype, "value", mtype)
            ctx.extracted_text = f"[Unsupported media type: {type_name}]"
            return ctx

        return processor.process(file_path)

    def load_batch(self, file_paths: List[str]) -> List[MediaContext]:
        """Load several files, preserving input order."""
        return [self.load(fp) for fp in file_paths]

    def load_as_message(self, file_path: str) -> dict:
        """Load a file and convert it to the LLM message format."""
        return self.load(file_path).to_llm_message()

    def load_batch_as_messages(self, file_paths: List[str]) -> List[dict]:
        """Load several files as LLM messages, preserving input order."""
        return [self.load_as_message(fp) for fp in file_paths]

    def register_processor(self, media_type: MediaType, processor: MediaProcessor) -> None:
        """Register (or replace) the processor used for ``media_type``."""
        self._processors[media_type] = processor

    def analyze_directory(self, directory: str) -> Dict[str, List[str]]:
        """Group every file below ``directory`` (recursively) by media type.

        Returns:
            A dict with the keys ``images``, ``audio``, ``video``,
            ``documents`` and ``unknown``, each holding a list of file paths.
            All lists are empty when ``directory`` is not an existing
            directory.
        """
        result: Dict[str, List[str]] = {
            "images": [],
            "audio": [],
            "video": [],
            "documents": [],
            "unknown": [],
        }

        dir_path = Path(directory)
        if not dir_path.is_dir():
            return result

        bucket_for = {
            MediaType.IMAGE: "images",
            MediaType.AUDIO: "audio",
            MediaType.VIDEO: "video",
            MediaType.DOCUMENT: "documents",
        }

        for file_path in dir_path.rglob("*"):
            if not file_path.is_file():
                continue

            path_str = str(file_path)
            mtype = self._detector.detect(path_str)
            result[bucket_for.get(mtype, "unknown")].append(path_str)

        return result
758# ── Quick Start ─────────────────────────────
def create_multimodal_manager() -> MultimodalContextManager:
    """Create a multimodal context manager with its default processors."""
    manager = MultimodalContextManager()
    return manager
def quick_load(file_path: str) -> MediaContext:
    """Load a single file through a freshly created manager."""
    manager = MultimodalContextManager()
    return manager.load(file_path)
# ── Compatibility aliases (required by agentos/__init__.py) ──
# Alternate names bound to the same objects; neither creates a new type.
Modality = MediaType
MultimodalManager = MultimodalContextManager