# sqlmodel-nexus — LLM Reference

> sqlmodel-nexus：从 SQLModel 类自动生成 GraphQL API，提供 Core API 声明式数据组装，以及 MCP 服务。
> 渐进式框架 — ER Diagram → GraphQL API → 声明式数据组装。

版本：1.3.1 | Python >= 3.10 | 许可证：MIT

---

## 1. 概念映射

### GraphQL 模式

| 用户概念 | 框架概念 | 代码入口 |
|----------|----------|----------|
| 定义数据模型 | SQLModel + Relationship | `class MyEntity(BaseEntity, table=True)` |
| 暴露查询接口 | `@query` 装饰器 | `@query` on class method |
| 暴露变更接口 | `@mutation` 装饰器 | `@mutation` on class method |
| 自动 CRUD | `AutoQueryConfig` | `GraphQLHandler(auto_query_config=...)` |
| 关系加载 | DataLoader 批量查询 | `session_factory` 参数 |
| 列表分页 | ROW_NUMBER 窗口函数 | `enable_pagination=True` |
| AI 集成 | MCP Server | `config_simple_mcp_server()` |
| 交互调试 | GraphiQL | `handler.get_graphiql_html()` |

### Core API 模式

| 用户概念 | 框架概念 | 代码入口 |
|----------|----------|----------|
| 构建 DTO | `DefineSubset` + `SubsetConfig` | `class MyDTO(DefineSubset)` |
| 关系管理 | `ErManager` | `ErManager(base=..., session_factory=...)` |
| 数据遍历 | `Resolver` | `er.create_resolver()` |
| 声明加载依赖 | `Loader` | `def resolve_x(self, loader=Loader('rel_name'))` |
| 派生字段 | `post_*` 方法 | `def post_count(self)` |
| 祖先上下文传递 | `ExposeAs` | `Annotated[str, ExposeAs('key')]` |
| 后代值聚合 | `SendTo` + `Collector` | `Annotated[User, SendTo('name')]` |
| 自定义关系 | `Relationship` | `__relationships__` 列表 |
| 可视化 ER 图 | `ErDiagram` | `ErDiagram.from_sqlmodel(entities)` |

### RPC + Voyager 模式

| 用户概念 | 框架概念 | 代码入口 |
|----------|----------|----------|
| 业务服务定义 | `RpcService` | `class MyService(RpcService)` |
| RPC MCP 服务 | `create_rpc_mcp_server` | `create_rpc_mcp_server(services=...)` |
| 交互式可视化 | `create_rpc_voyager` | `create_rpc_voyager(services=..., er_manager=...)` |

---

## 2. 安装

```bash
pip install sqlmodel-nexus
pip install sqlmodel-nexus[fastmcp]  # 包含 MCP 支持
```

---

## 3. API Reference

### 3.1 `@query`

将 class method 标记为 GraphQL 查询。

```python
from sqlmodel_nexus import query

class User(BaseEntity, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str

    @query
    async def get_all(cls, limit: int = 10) -> list['User']:
        """Docstring becomes GraphQL field description."""
        async with get_session() as session:
            return (await session.exec(select(cls).limit(limit))).all()
```

规则：
- 第一个参数必须是 `cls`（自动转为 classmethod）
- 返回类型决定 GraphQL 字段类型（`list[User]` → `[User!]!`，`Optional[User]` → `User`）
- 方法参数成为 GraphQL 字段参数
- docstring 成为字段描述
- 生成的字段名：`{EntityName}{MethodName}`（如 `userGetAll`）

### 3.2 `@mutation`

与 `@query` 用法相同，标记为 GraphQL mutation。

```python
from sqlmodel_nexus import mutation

class User(BaseEntity, table=True):
    @mutation
    async def create(cls, name: str, email: str) -> 'User':
        """Create a new user."""
        async with get_session() as session:
            user = cls(name=name, email=email)
            session.add(user)
            await session.commit()
            await session.refresh(user)
            return user
```

支持 Input 类型参数：

```python
class CreateUserInput(BaseModel):
    name: str
    email: str

class User(BaseEntity, table=True):
    @mutation
    async def create(cls, input: CreateUserInput) -> 'User':
        ...
```

### 3.3 `GraphQLHandler`

核心类，负责实体发现、SDL 生成、查询执行。

```python
from sqlmodel_nexus import GraphQLHandler, AutoQueryConfig

handler = GraphQLHandler(
    base=BaseEntity,                           # 必填：SQLModel 基类
    session_factory=async_session,             # DataLoader 查询需要
    enable_pagination=True,                    # 列表关系分页
    auto_query_config=AutoQueryConfig(
        session_factory=async_session,
        default_limit=10,
        generate_by_id=True,
        generate_by_filter=True,
    ),
    query_description="My Query type",        # 可选
    mutation_description="My Mutation type",   # 可选
)
```

**参数说明：**

| 参数 | 类型 | 必填 | 说明 |
|------|------|------|------|
| `base` | `type` | 是 | SQLModel 基类，用于实体发现 |
| `session_factory` | `Callable | None` | 否 | 异步 session 工厂。关系加载必需 |
| `enable_pagination` | `bool` | 否 | 启用列表关系分页，默认 False |
| `auto_query_config` | `AutoQueryConfig | None` | 否 | 自动生成 by_id/by_filter 查询 |
| `query_description` | `str | None` | 否 | Query 类型描述 |
| `mutation_description` | `str | None` | 否 | Mutation 类型描述 |

**方法：**

```python
# 执行 GraphQL 查询
result: dict = await handler.execute(
    query: str,
    variables: dict | None = None,
    operation_name: str | None = None,
)
# 返回 {"data": {...}} 或 {"data": {...}, "errors": [...]}

# 获取 SDL
sdl: str = handler.get_sdl()

# 获取 GraphiQL 页面
html: str = handler.get_graphiql_html(endpoint="/graphql")
```

### 3.4 `AutoQueryConfig`

自动为所有实体生成标准查询。

```python
from sqlmodel_nexus import AutoQueryConfig

config = AutoQueryConfig(
    session_factory=async_session,    # 必填
    default_limit=10,                 # by_filter 默认 limit
    generate_by_id=True,              # 生成 by_id 查询
    generate_by_filter=True,          # 生成 by_filter 查询
    enabled=True,                     # 总开关
)
```

**生成的查询（以 User 实体为例）：**

| GraphQL 字段 | 签名 | 说明 |
|-------------|------|------|
| `userById` | `(id: Int!): User` | 按主键查单个 |
| `userByFilter` | `(filter: UserFilterInput, limit: Int): [User!]!` | 按字段精确匹配过滤 |

**FilterInput：** 自动生成，所有字段可选，仅非 None 值作为 WHERE 条件（精确匹配）。

**约束：** `by_id` 要求实体有且仅有一个主键字段。复合主键实体会跳过 by_id。

### 3.5 MCP 集成

#### 单应用 MCP Server

```python
from sqlmodel_nexus.mcp import config_simple_mcp_server

mcp = config_simple_mcp_server(
    base=BaseEntity,
    name="My API",
    desc="API description",
    allow_mutation=False,  # 只读模式
)
mcp.run()                    # stdio 模式
# mcp.run(transport="streamable-http")  # HTTP 模式
```

提供 3 个工具：`get_schema()`、`graphql_query()`、`graphql_mutation()`

**HTTP 部署模式：**

```python
# 方式一：直接 run（简单场景）
mcp.run(transport="streamable-http")

# 方式二：http_app + uvicorn（推荐，支持 CORS、stateless 等）
mcp_app = mcp.http_app(transport="streamable-http", stateless_http=True)
mcp_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], allow_methods=["*"], allow_headers=["*"],
)
uvicorn.run(mcp_app, host="0.0.0.0", port=8006)
```

`stateless_http=True` 允许一次性客户端（如 list-tools 探测）无需管理 `mcp-session-id` 即可调用端点。

#### 多应用 MCP Server

```python
from sqlmodel_nexus.mcp import create_mcp_server, AppConfig

mcp = create_mcp_server(
    apps=[
        AppConfig(name="blog", base=BlogBase, description="Blog API"),
        AppConfig(name="shop", base=ShopBase, description="Shop API"),
    ],
    name="Multi-App API",
    allow_mutation=False,
)
mcp.run()
```

提供 8 个工具：`list_apps`、`list_queries`、`list_mutations`、`get_query_schema`、`get_mutation_schema`、`graphql_query`、`graphql_mutation`

### 3.6 `DefineSubset`

从 SQLModel 实体生成独立的 Pydantic DTO 模型。选择特定字段，自动隐藏 FK 列，并支持声明关系字段。

```python
from sqlmodel_nexus import DefineSubset

# 基本用法：选择字段
class UserSummary(DefineSubset):
    __subset__ = (User, ('id', 'name'))

# 带关系字段：owner 自动从 Task.owner 关系加载
class TaskSummary(DefineSubset):
    __subset__ = (Task, ('id', 'title', 'owner_id'))
    owner: UserSummary | None = None
```

**FK 字段处理：**
- FK 字段（如 `owner_id`）自动设置 `exclude=True`，序列化时隐藏
- 内部仍可通过 `self.owner_id` 在 `resolve_*` 方法中访问

**关系字段声明规则：**
- 关系字段声明在类体中（不在 `__subset__` 内）
- 类型**必须**使用 DTO 类型（DefineSubset 子类），不能直接用 SQLModel 实体

```python
# 错误 — 不能直接使用 SQLModel 实体
class TaskSummary(DefineSubset):
    __subset__ = (Task, ('id', 'title'))
    owner: User | None = None          # TypeError!

# 正确 — 使用 DTO 类型
class TaskSummary(DefineSubset):
    __subset__ = (Task, ('id', 'title'))
    owner: UserSummary | None = None   # OK
```

**`resolve_*` 和 `post_*` 方法：**

```python
class PostSummary(DefineSubset):
    __subset__ = (Post, ('id', 'title', 'author_id'))
    author: UserSummary | None = None
    word_count: int = 0

    # resolve_*: 加载外部数据
    def resolve_author(self, loader=Loader('author')):
        return loader.load(self.author_id)

    # post_*: 在子树解析完成后计算派生字段
    def post_word_count(self):
        return len(self.title.split())
```

**Implicit Auto-Loading：** 当以下条件全部满足时，Resolver 自动加载关系字段（无需手写 `resolve_*`）：

1. 字段没有对应的 `resolve_*` 方法
2. 字段是额外字段（不在 `__subset__` 定义中）
3. 字段名匹配已注册的 ORM/自定义关系
4. 字段类型是 BaseModel DTO 且与关系目标实体兼容

```python
# 无需 resolve_author — Resolver 自动加载
class TaskSummary(DefineSubset):
    __subset__ = (Task, ('id', 'title', 'owner_id'))
    owner: UserSummary | None = None  # 自动匹配 Task.owner 关系
```

### 3.7 `SubsetConfig`

`DefineSubset` 的声明式配置，替代 `__subset__` 元组语法。支持更丰富的字段控制。

```python
from sqlmodel_nexus import DefineSubset, SubsetConfig

class UserSummary(DefineSubset):
    __subset__ = SubsetConfig(
        kls=User,                          # 源 SQLModel 实体
        fields=['id', 'name'],             # 包含的字段（与 omit_fields 互斥）
    )
```

**参数说明：**

| 参数 | 类型 | 说明 |
|------|------|------|
| `kls` | `type` | 源 SQLModel 实体类 |
| `fields` | `list[str] | "all"` | 包含的字段名列表。`"all"` 包含所有字段 |
| `omit_fields` | `list[str]` | 排除的字段名列表。与 `fields` 互斥 |
| `excluded_fields` | `list[str]` | 存在于 DTO 但隐藏于序列化输出（`exclude=True`） |
| `expose_as` | `list[tuple[str, str]]` | `(field_name, alias)` 对，等价于 `ExposeAs` 注解 |
| `send_to` | `list[tuple[str, str]]` | `(field_name, collector_name)` 对，等价于 `SendTo` 注解 |

**`expose_as` 用法（声明式 ExposeAs）：**

```python
class SprintDetail(DefineSubset):
    __subset__ = SubsetConfig(
        kls=Sprint,
        fields=['id', 'name'],
        expose_as=[('name', 'sprint_name')],  # name 暴露为 sprint_name
    )
    tasks: list[TaskDetail] = []
```

**`send_to` 用法（声明式 SendTo）：**

```python
class TaskDetail(DefineSubset):
    __subset__ = SubsetConfig(
        kls=Task,
        fields=['id', 'title', 'owner_id'],
        send_to=[('owner', 'contributors')],  # owner 发送到 contributors 收集器
    )
    owner: UserSummary | None = None
```

**`omit_fields` 用法：**

```python
class UserPublic(DefineSubset):
    __subset__ = SubsetConfig(
        kls=User,
        omit_fields=['password', 'secret_key'],  # 排除敏感字段
    )
```

### 3.8 `ErManager`

实体关系管理器。检查 SQLModel ORM 元数据，自动发现关系，创建 DataLoader，生成 Resolver。

```python
from sqlmodel_nexus import ErManager

# 方式一：通过基类自动发现实体
er = ErManager(base=SQLModel, session_factory=async_session)

# 方式二：显式指定实体列表
er = ErManager(entities=[User, Post, Comment], session_factory=async_session)
```

**参数说明：**

| 参数 | 类型 | 必填 | 说明 |
|------|------|------|------|
| `base` | `type | None` | 二选一 | SQLModel 基类，自动发现子类 |
| `entities` | `list[type] | None` | 二选一 | 显式实体列表 |
| `session_factory` | `Callable` | 是 | 异步 session 工厂 |
| `enable_pagination` | `bool` | 否 | 启用列表关系分页，默认 False |
| `split_loader_by_type` | `bool` | 否 | 按 DTO 类型拆分 Loader 实例，默认 False |

**注意：** `base` 和 `entities` 互斥，不能同时提供。

**核心方法：**

```python
# 创建 Resolver 类（应用启动时调用一次）
Resolver = er.create_resolver()

# 每次 request 创建 Resolver 实例
resolver = Resolver(context={"user_id": 1})
result = await resolver.resolve(dtos)

# 查询关系信息
er.get_relationships(entity=User)           # → dict[str, RelationshipInfo]
er.get_all_entities()                        # → list[type[SQLModel]]
er.get_relationship(entity=User, name='posts')  # → RelationshipInfo | None
```

**自动关系发现：**
- 通过 SQLAlchemy inspect 发现 ORM 关系（MANYTOONE、ONETOMANY、MANYTOMANY）
- 通过 `__relationships__` 发现自定义关系
- 为每个关系自动创建 DataLoader 类

### 3.9 Resolver 执行引擎

由 `ErManager.create_resolver()` 创建，遍历 Pydantic 模型树，执行 `resolve_*` 加载数据，`post_*` 计算派生字段。

**执行顺序：**

```
1. resolve_* 方法 + implicit auto-load  →  加载外部数据
2. traverse 已有对象字段                   →  递归处理子节点
3. post_* 方法                            →  计算派生字段（子树已就绪）
4. collect SendTo 值                      →  聚合后代数据到祖先 Collector
```

**关键特性：**
- 每层 `resolve_*` 并发执行（`asyncio.gather`）
- DataLoader 批量化同层相同 loader 的请求（避免 N+1）
- `post_*` 在子树完全解析后才执行，可安全读取已加载的字段
- 支持 `context`、`parent`、`ancestor_context` 参数注入

**返回值：** `resolve()` 返回处理后的 DTO 对象（单个对象或列表）。传入 list 则返回 list，传入单个对象则返回单个对象。FastAPI endpoint 中可直接 return，Pydantic 会自动序列化。

```python
# 完整使用示例
from sqlmodel_nexus import DefineSubset, ErManager

er = ErManager(base=SQLModel, session_factory=async_session)
Resolver = er.create_resolver()

# 在 request handler 中
resolver = Resolver(context={"current_user_id": user_id})
result = await resolver.resolve([
    SprintSummary.model_validate(s) for s in sprints
])
```

### 3.10 `Loader`

在 `resolve_*` 方法参数中声明 DataLoader 依赖。

```python
from sqlmodel_nexus import Loader

class PostSummary(DefineSubset):
    __subset__ = (Post, ('id', 'title', 'author_id'))
    author: UserSummary | None = None

    # 按名称查找 ErManager 中注册的关系
    def resolve_author(self, loader=Loader('author')):
        return loader.load(self.author_id)
```

**`Loader` 支持三种依赖类型：**

| 类型 | 用法 | 说明 |
|------|------|------|
| `str` | `Loader('author')` | 按 ErManager 关系名查找 |
| `DataLoader` 子类 | `Loader(UserLoader)` | 实例化并缓存 |
| `async callable` | `Loader(load_users)` | 包装为 DataLoader |

**匹配规则：**
- `Loader('author')` 查找 ErManager 中注册的关系名为 `author` 的 DataLoader
- 当 DefineSubset DTO 使用时，优先在源实体的关系中查找
- 若多个实体有同名关系，发出警告并返回第一个匹配

### 3.11 跨层数据流：ExposeAs / SendTo / Collector

实现父→子和子→父的跨层数据传递。

#### ExposeAs — 祖先向下传递上下文

将字段值暴露给后代节点，通过 `ancestor_context` 参数访问。

```python
from typing import Annotated
from sqlmodel_nexus import ExposeAs

class SprintDetail(DefineSubset):
    __subset__ = (Sprint, ('id', 'name'))
    name: Annotated[str, ExposeAs('sprint_name')]  # 暴露给后代
    tasks: list[TaskDetail] = []

class TaskDetail(DefineSubset):
    __subset__ = (Task, ('id', 'title'))

    def post_full_title(self, ancestor_context):
        # ancestor_context 包含祖先 ExposeAs 的值
        sprint_name = ancestor_context.get('sprint_name', '')
        return f"{sprint_name} / {self.title}"
```

#### SendTo + Collector — 后代向上聚合值

`SendTo` 标记字段值发送到祖先的 `Collector`，`Collector` 在 `post_*` 中接收聚合结果。

```python
from typing import Annotated
from sqlmodel_nexus import SendTo, Collector

class SprintDetail(DefineSubset):
    __subset__ = (Sprint, ('id', 'name'))
    tasks: list[TaskDetail] = []
    contributors: list[UserSummary] = []

    # Collector 接收所有后代 SendTo 的值
    def post_contributors(self, collector=Collector('contributors')):
        return collector.values()

class TaskDetail(DefineSubset):
    __subset__ = (Task, ('id', 'title', 'owner_id'))
    # owner 值发送到名为 'contributors' 的 Collector
    owner: Annotated[UserSummary | None, SendTo('contributors')] = None
```

**Collector 参数：**

| 参数 | 类型 | 说明 |
|------|------|------|
| `alias` | `str` | 收集器名称，匹配 SendTo 的目标 |
| `flat` | `bool` | 为 True 时展开列表值（用于列表字段的 SendTo） |

**完整跨层协作示例：**

```python
class SprintDetail(DefineSubset):
    __subset__ = SubsetConfig(
        kls=Sprint,
        fields=['id', 'name'],
        expose_as=[('name', 'sprint_name')],  # 声明式 ExposeAs
    )
    tasks: list[TaskDetail] = []
    contributors: list[UserSummary] = []

    def post_contributors(self, collector=Collector('contributors')):
        return collector.values()

class TaskDetail(DefineSubset):
    __subset__ = SubsetConfig(
        kls=Task,
        fields=['id', 'title', 'owner_id'],
        send_to=[('owner', 'contributors')],  # 声明式 SendTo
    )
    owner: UserSummary | None = None
    full_title: str = ""

    def post_full_title(self, ancestor_context):
        sprint_name = ancestor_context.get('sprint_name', '')
        return f"{sprint_name} / {self.title}"
```

### 3.12 `Relationship`

定义自定义（非 ORM）关系。用于没有 SQLAlchemy Relationship 的关联，通过手写 async 批量加载器实现。

```python
from sqlmodel_nexus import Relationship

async def tags_by_post_loader(post_ids: list[int]) -> list[list[Tag]]:
    """加载 post 的标签。签名：list[KEY] → list[list[TARGET]]"""
    ...

class Post(SQLModel, table=True):
    __relationships__ = [
        Relationship(
            fk='id',                      # 源实体上的字段名，作为 loader 的 key
            target=list[Tag],             # 目标类型，list 表示一对多
            name='tags',                  # 关系名（唯一）
            loader=tags_by_post_loader,   # async 批量加载函数
            description='Post tags',      # 可选描述（ER 图用）
        )
    ]
    id: int | None = Field(default=None, primary_key=True)
    title: str
```

**参数说明：**

| 参数 | 类型 | 说明 |
|------|------|------|
| `fk` | `str` | 源实体字段名，其值传给 loader 作为 batch key |
| `target` | `type` 或 `list[type]` | 目标实体。`list[Tag]` 表示一对多 |
| `name` | `str` | 关系名，在同一实体内唯一 |
| `loader` | `Callable` | async 批量加载函数 |
| `description` | `str | None` | 可选描述 |

**Loader 签名：**
- 标量目标（多对一）：`async def fn(keys: list[K]) -> list[V | None]`
- 列表目标（一对多）：`async def fn(keys: list[K]) -> list[list[V]]`

**DefineSubset 中使用自定义关系：**

```python
class TagDTO(DefineSubset):
    __subset__ = (Tag, ('id', 'name'))

class TaskWithTags(DefineSubset):
    __subset__ = (Task, ('id', 'title'))
    tags: list[TagDTO] = []   # 自动匹配 Task.__relationships__['tags']

    def post_tag_count(self):
        return len(self.tags)
```

### 3.13 `ErDiagram`

从 SQLModel 实体生成 Mermaid ER 图。

```python
from sqlmodel_nexus import ErDiagram

diagram = ErDiagram.from_sqlmodel(entities=[User, Post, Comment])
print(diagram.to_mermaid())
```

**输出示例：**

```
erDiagram
    User {
        id
        name
    }
    Post {
        id
        title
    }
    User ||--o{ Post : posts
```

**特性：**
- 自动发现 ORM 关系和 `__relationships__` 自定义关系
- 显示实体标量字段（排除关系字段）
- 去重双向关系线

### 3.14 `RpcService`

业务服务基类。子类定义的 async classmethod 会被自动发现并暴露为 RPC 方法。

```python
from sqlmodel_nexus import RpcService

class SprintService(RpcService):
    """Sprint management service."""

    @classmethod
    async def list_sprints(cls) -> list[SprintSummary]:
        """Get all sprints."""
        async with get_session() as session:
            return (await session.exec(select(Sprint))).all()

    @classmethod
    async def get_sprint(cls, sprint_id: int) -> SprintSummary | None:
        """Get a sprint by ID."""
        async with get_session() as session:
            return await session.get(Sprint, sprint_id)
```

**规则：**
- 只有 `async classmethod` 会被自动发现
- `get_tag_name()` 方法被排除（保留用于自定义 tag 名）
- 默认 tag 名：类名去掉 `Service`/`Rpc` 后缀，转 snake_case（`SprintService` → `sprint`）
- 继承时自动合并基类方法

**`get_tag_name()` 与 FastAPI 配合：**

```python
# get_tag_name() 返回 snake_case 的服务名，用于 FastAPI 路由的 OpenAPI 分组
@app.get("/api/users", tags=[UserService.get_tag_name()])
async def get_users():
    return await UserService.list_users()

@app.get("/api/tasks", tags=[TaskService.get_tag_name()])
async def get_tasks():
    return await TaskService.list_tasks()
```

这使同一个 RpcService 既能通过 MCP 暴露给 AI，也能通过 FastAPI 路由提供 HTTP API，业务逻辑只写一次。

**RPC MCP Server：**

```python
from sqlmodel_nexus import create_rpc_mcp_server

# services 参数接受 list[dict] 或 list[RpcServiceConfig]
# 两种写法等价：

# 写法一：dict 形式
mcp = create_rpc_mcp_server(
    services=[
        {"name": "sprint", "service": SprintService,
         "description": "Sprint management"},
    ],
    name="Project RPC API",
)

# 写法二：RpcServiceConfig TypedDict
from sqlmodel_nexus.rpc import RpcServiceConfig

mcp = create_rpc_mcp_server(
    services=[
        RpcServiceConfig(
            name="sprint",
            service=SprintService,
            description="Sprint management",
        ),
    ],
    name="Project RPC API",
)
mcp.run()
```

三层渐进式 AI 工具：
1. `list_services()` → 发现可用服务
2. `describe_service(service_name)` → 查看方法签名和参数 schema
3. `call_rpc(service_name, method_name, params)` → 执行方法

### 3.15 `create_rpc_voyager`

创建交互式可视化 FastAPI 子应用，展示 RPC 服务结构和 ER 图。

```python
from sqlmodel_nexus import create_rpc_voyager

voyager_app = create_rpc_voyager(
    services=[
        {"name": "user", "service": UserService},
        {"name": "task", "service": TaskService},
    ],
    er_manager=er,                       # 可选：展示 ER 图
    name="My Project API",
    initial_page_policy="first",         # "first" | "full" | "empty"
    online_repo_url="https://github.com/...",  # 可选：源码链接
)

app = FastAPI()
app.mount("/voyager", voyager_app)
```

**参数说明：**

| 参数 | 类型 | 说明 |
|------|------|------|
| `services` | `list[RpcServiceConfig]` | RPC 服务配置列表 |
| `er_manager` | `ErManager | None` | 可选，用于 ER 图可视化 |
| `name` | `str` | 界面显示名称 |
| `module_color` | `dict[str, str] | None` | 模块颜色映射 |
| `initial_page_policy` | `"first" / "full" / "empty"` | 初始页面策略 |
| `online_repo_url` | `str | None` | 在线仓库 URL（源码链接） |
| `version` | `str` | 版本号（用于缓存） |

**Voyager 功能：**
- RPC 服务结构图（DOT 格式）
- ER 图（基于 ErManager）
- 服务搜索和过滤
- 源码查看（支持 VSCode 链接）
- DefineSubset 源实体追踪（DTO → Entity 连线）
- @query/@mutation 方法在 ER 图节点上显示

---

## 4. 类型映射

### Python → GraphQL

| Python 类型 | GraphQL 类型 |
|------------|-------------|
| `int` | `Int!` |
| `str` | `String!` |
| `bool` | `Boolean!` |
| `float` | `Float!` |
| `Optional[int]` | `Int` |
| `list[int]` | `[Int!]!` |
| `list[User]`（Entity） | `[User!]!` |
| `Optional[User]` | `User` |
| `Enum` 子类 | `EnumName!` |
| `SQLModel` 子类（参数中） | `InputName!` |

### 分页类型（enable_pagination=True 时）

列表关系自动包装为 Result 类型：

```graphql
type PostResult {
  items: [Post!]!
  pagination: Pagination!
}

type Pagination {
  has_more: Boolean!
  total_count: Int
}
```

查询时传入 `limit` 和 `offset`：

```graphql
{
  userGetAll {
    posts(limit: 3, offset: 0) {
      items { title }
      pagination { has_more total_count }
    }
  }
}
```

---

## 5. 用法模式

### 模式 A：最小可运行 GraphQL API

```python
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from sqlmodel import SQLModel, Field, select
from sqlmodel_nexus import query, GraphQLHandler

class BaseEntity(SQLModel): pass

class User(BaseEntity, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str

    @query
    async def get_all(cls) -> list['User']:
        async with get_session() as session:
            return (await session.exec(select(cls))).all()

handler = GraphQLHandler(base=BaseEntity)

class GraphQLRequest(BaseModel):
    query: str

app = FastAPI()

@app.get("/graphql", response_class=HTMLResponse)
async def graphiql():
    return handler.get_graphiql_html()

@app.post("/graphql")
async def graphql(req: GraphQLRequest):
    return await handler.execute(req.query)
```

### 模式 B：带关系的嵌套查询

```python
class User(BaseEntity, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    posts: list["Post"] = Relationship(
        back_populates="author",
        sa_relationship_kwargs={"order_by": "Post.id"},
    )

class Post(BaseEntity, table=True):
    id: int | None = Field(default=None, primary_key=True)
    title: str
    author_id: int = Field(foreign_key="user.id")
    author: Optional[User] = Relationship(back_populates="posts")
```

查询示例：

```graphql
{
  userGetAll(limit: 5) {
    id
    name
    posts(limit: 3) {
      items {
        title
        author { name }
      }
      pagination { has_more total_count }
    }
  }
}
```

关系通过 DataLoader 自动批量加载，无需手动 eager loading。

### 模式 C：AutoQueryConfig 零手写查询

```python
from sqlmodel_nexus import GraphQLHandler, AutoQueryConfig

handler = GraphQLHandler(
    base=BaseEntity,
    session_factory=async_session,
    auto_query_config=AutoQueryConfig(
        session_factory=async_session,
        default_limit=20,
    ),
    enable_pagination=True,
)

# 自动为所有 BaseEntity 子类生成：
# - entityById(id: PK!): Entity
# - entityByFilter(filter: EntityFilterInput, limit: Int): [Entity!]!
```

### 模式 D：MCP 服务

```python
from sqlmodel_nexus.mcp import config_simple_mcp_server

mcp = config_simple_mcp_server(
    base=BaseEntity,
    name="My App API",
    desc="Application description",
    allow_mutation=True,
)
mcp.run()
```

AI 助手使用流程：
1. `get_schema()` → 获取完整 SDL
2. `graphql_query(query="...")` → 执行查询
3. `graphql_mutation(mutation="...")` → 执行变更

### 模式 E：Core API DefineSubset DTO

从实体构建 DTO，使用 ErManager + Resolver 自动加载关系。

```python
from fastapi import FastAPI
from sqlmodel import SQLModel, Field, Relationship
from sqlmodel_nexus import DefineSubset, SubsetConfig, ErManager

# 实体定义
class User(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str

class Task(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    title: str
    owner_id: int | None = Field(default=None, foreign_key="user.id")
    owner: User | None = Relationship()

# DTO 定义
class UserSummary(DefineSubset):
    __subset__ = SubsetConfig(kls=User, fields=['id', 'name'])

class TaskSummary(DefineSubset):
    __subset__ = SubsetConfig(kls=Task, fields=['id', 'title'])
    owner: UserSummary | None = None  # 自动匹配 Task.owner 关系

# 应用启动时创建 ErManager 和 Resolver
er = ErManager(base=SQLModel, session_factory=async_session)
Resolver = er.create_resolver()

# 在 endpoint 中使用
app = FastAPI()

@app.get("/tasks")
async def get_tasks():
    async with async_session() as session:
        tasks = (await session.exec(select(Task))).all()
    dtos = [TaskSummary.model_validate(t) for t in tasks]
    resolver = Resolver()
    return await resolver.resolve(dtos)
```

### 模式 F：Core API 跨层数据流

使用 ExposeAs + SendTo + Collector 实现父子节点协作。

```python
from sqlmodel_nexus import DefineSubset, SubsetConfig, Collector, ErManager

class UserSummary(DefineSubset):
    __subset__ = SubsetConfig(kls=User, fields=['id', 'name'])

class TaskDetail(DefineSubset):
    __subset__ = SubsetConfig(
        kls=Task,
        fields=['id', 'title'],
        send_to=[('owner', 'contributors')],
    )
    owner: UserSummary | None = None
    full_title: str = ""

    def post_full_title(self, ancestor_context):
        sprint_name = ancestor_context.get('sprint_name', '')
        return f"{sprint_name} / {self.title}"

class SprintDetail(DefineSubset):
    __subset__ = SubsetConfig(
        kls=Sprint,
        fields=['id', 'name'],
        expose_as=[('name', 'sprint_name')],
    )
    tasks: list[TaskDetail] = []
    contributors: list[UserSummary] = []

    def post_contributors(self, collector=Collector('contributors')):
        return collector.values()

# 使用
er = ErManager(base=SQLModel, session_factory=async_session)
Resolver = er.create_resolver()

resolver = Resolver()
result = await resolver.resolve([
    SprintDetail.model_validate(s) for s in sprints
])
# result 中每个 SprintDetail 包含:
# - tasks: 已加载的 TaskDetail 列表
# - contributors: 从 tasks 中聚合的 owner 列表
# - 每个 TaskDetail.full_title 包含 sprint_name 前缀
```

### 模式 G：RPC 服务 + MCP + Voyager

定义业务服务，同时服务 MCP 和 FastAPI。

```python
from fastapi import FastAPI
from sqlmodel_nexus import RpcService, create_rpc_mcp_server, create_rpc_voyager

class SprintService(RpcService):
    """Sprint management."""

    @classmethod
    async def list_sprints(cls) -> list[dict]:
        async with get_session() as session:
            sprints = (await session.exec(select(Sprint))).all()
            return [s.model_dump() for s in sprints]

    @classmethod
    async def get_sprint(cls, sprint_id: int) -> dict | None:
        async with get_session() as session:
            sprint = await session.get(Sprint, sprint_id)
            return sprint.model_dump() if sprint else None

services = [
    {"name": "sprint", "service": SprintService, "description": "Sprint management"},
]

# MCP 服务（stdio 模式）
mcp = create_rpc_mcp_server(services=services, name="Project API")

# Voyager 可视化（FastAPI 挂载）
app = FastAPI()
voyager = create_rpc_voyager(
    services=services,
    er_manager=er,
    name="Project API",
)
app.mount("/voyager", voyager)
```

AI 助手使用 RPC 的三层流程：
1. `list_services()` → 发现 sprint 服务
2. `describe_service(service_name="sprint")` → 查看 list_sprints、get_sprint 方法
3. `call_rpc(service_name="sprint", method_name="get_sprint", params='{"sprint_id": 1}')`

---

## 6. 约束与限制

### GraphQL 模式

| 约束 | 说明 |
|------|------|
| session_factory | DataLoader 关系加载需要 session_factory。不提供时，关系字段将无法加载 |
| 列表关系分页 | 需要在 Relationship 上配置 `sa_relationship_kwargs={"order_by": "Entity.column"}` |
| 单主键 | AutoQueryConfig 的 by_id 仅支持单主键实体 |
| 精确匹配 | by_filter 仅支持字段精确匹配，复杂查询需自定义 @query 方法 |
| snake_case 字段名 | 不会自动转为 camelCase |
| @query 方法签名 | 第一个参数必须是 cls，装饰器自动转为 classmethod |
| 实体发现 | 实体必须有 @query/@mutation 装饰器才会被主动发现。仅有 Relationship 引用的实体会被动纳入 |
| Input 类型 | 方法参数中的 SQLModel/BaseModel 子类自动成为 GraphQL Input 类型 |

### Core API 模式

| 约束 | 说明 |
|------|------|
| DTO 字段类型 | 关系字段必须使用 DefineSubset DTO 类型，不能用 SQLModel 实体 |
| resolve_* 先于 post_* | post_* 在子树完全解析后执行，可安全读取 resolve_* 赋值的字段 |
| Loader 依赖名匹配 | `Loader('author')` 要求 ErManager 中有名为 `author` 的关系 |
| ErManager base/entities 互斥 | 不能同时提供 `base` 和 `entities` 参数 |
| FK 字段自动隐藏 | `__subset__` 中的 FK 字段自动设为 `exclude=True`，但内部可访问 |
| Implicit Auto-Loading 条件 | 必须同时满足：无 resolve_*、非 subset 字段、匹配关系名、DTO 类型兼容 |

---

## 7. 实体发现机制

GraphQLHandler 构造时执行实体发现：

1. 扫描 `base` 的所有 SQLModel 子类
2. 找到有 `@query` 或 `@mutation` 装饰器的类
3. 递归遍历这些类的 `Relationship` 字段，纳入关联实体
4. 为所有发现的实体生成 SDL 类型定义
5. 为所有发现的实体创建 DataLoader（基于 SQLAlchemy inspect 的关系元数据）

ErManager 构造时执行实体发现：

1. 接收实体列表（通过 `base` 自动发现或 `entities` 显式指定）
2. 检查每个实体的 SQLAlchemy ORM 关系
3. 检查每个实体的 `__relationships__` 自定义关系
4. 为每个关系创建对应的 DataLoader 类
5. 若 `enable_pagination=True`，验证所有列表关系都配置了 `order_by`

关系类型与 DataLoader 映射：

| SQLAlchemy 关系方向 | DataLoader 类型 | SQL 模式 |
|-------------------|----------------|---------|
| MANYTOONE | `create_many_to_one_loader` | `WHERE target.id IN (:fk_values)` |
| ONETOMANY（列表） | `create_one_to_many_loader` | `WHERE fk_col IN (:parent_ids)` |
| ONETOMANY（标量） | `create_many_to_one_loader` | 同 MANYTOONE |
| MANYTOMANY | `create_many_to_many_loader` | 通过关联表 JOIN |
| CUSTOM（`__relationships__`） | 自定义 DataLoader | 由用户 loader 函数定义 |

---

## 8. GraphQL 执行流程

```
GraphQL Query String
       │
       ├─ 内省查询 (__schema / __type)
       │   → IntrospectionGenerator 直接处理
       │
       └─ 普通查询
           │
           ├─ 1. QueryParser.parse() → FieldSelection 树
           ├─ 2. 找到 @query 方法，执行用户代码（只加载标量字段）
           ├─ 3. resolve_relationships：逐层 DataLoader 批量加载
           │     每一层收集所有 FK 值，一次 SQL 批量查询
           └─ 4. 按请求字段序列化结果
```

分页 SQL（启用时）：

```sql
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY fk_col ORDER BY sort_col) AS _rn,
           COUNT(*) OVER (PARTITION BY fk_col) AS _tc
    FROM target_table
    WHERE fk_col IN (:fk_values)
) sub WHERE _rn BETWEEN :start AND :end
```

一条 SQL 为所有父实体获取各自的分页数据。

---

## 9. Core API 执行流程

```
resolver.resolve(dtos)
       │
       └─ _traverse(node, parent)
           │
           ├─ 1. 准备阶段
           │     ├─ 设置 parent contextvar
           │     ├─ ExposeAs：将字段值写入 ancestor_context
           │     └─ 创建 Collector 实例
           │
           ├─ 2. resolve_* 方法 + implicit auto-load（并发）
           │     ├─ 执行所有 resolve_* 方法
           │     ├─ 扫描 implicit auto-load 字段
           │     └─ traverse 已有对象字段
           │
           ├─ 3. post_* 方法（并发）
           │     └─ 在子树完全就绪后执行
           │
           ├─ 4. SendTo → Collector
           │     └─ 将 SendTo 字段值添加到祖先 Collector
           │
           └─ 5. 清理阶段
                 ├─ 释放 per-node Collector
                 └─ 重置 contextvar
```

**`build_dto_select` 优化查询：**

根据 DefineSubset DTO 的字段定义，生成只包含所需列的 SQL SELECT 语句（而非 `SELECT *`）。

```python
from sqlmodel_nexus import build_dto_select

# 参数：dto_class（DefineSubset 子类）、where（可选过滤条件）
# 返回：SQLModel select 语句，仅包含 DTO __subset__ 中定义的列

# 查全部
stmt = build_dto_select(TaskSummary)
async with async_session() as session:
    rows = (await session.exec(stmt)).all()
dtos = [TaskSummary(**dict(row._mapping)) for row in rows]

# 带 where 条件
stmt = build_dto_select(SprintDetail, where=Sprint.id == sprint_id)
async with async_session() as session:
    rows = (await session.exec(stmt)).all()
if not rows:
    return None
dto = SprintDetail(**dict(rows[0]._mapping))
return await Resolver().resolve(dto)
```

**用法模式：** `build_dto_select` → session.exec → `dict(row._mapping)` → DTO 构造 → `Resolver().resolve()`
