Coverage for agentos/core/session.py: 67%
27 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2会话管理 — 多会话隔离与状态持久化。
3"""
5from __future__ import annotations
7import uuid
8from dataclasses import dataclass, field
9from datetime import datetime
12@dataclass
13class Session:
14 """Agent 会话记录。"""
16 id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
17 created_at: str = field(default_factory=lambda: datetime.now().isoformat())
18 state: str = "active" # active | completed | failed
19 task: str = ""
20 metadata: dict = field(default_factory=dict)
23class SessionStore:
24 """会话存储后端(内存实现,可替换为SQLite/Postgres)。"""
26 def __init__(self):
27 self._sessions: dict[str, Session] = {}
29 def create(self, task: str, metadata: dict | None = None) -> Session:
30 session = Session(task=task, metadata=metadata or {})
31 self._sessions[session.id] = session
32 return session
34 def get(self, session_id: str) -> Session | None:
35 return self._sessions.get(session_id)
37 def update_state(self, session_id: str, state: str):
38 if session := self._sessions.get(session_id):
39 session.state = state
41 def list_active(self) -> list[Session]:
42 return [s for s in self._sessions.values() if s.state == "active"]
44 def delete(self, session_id: str):
45 self._sessions.pop(session_id, None)