# pydantic-resolve

pydantic-resolve is a powerful Python library that transforms Pydantic models from static data containers into dynamic, composable components with automatic data resolution and post-processing capabilities. It provides a declarative approach to building complex nested data structures without imperative glue code, enabling developers to compose sophisticated business models from base entities through resolve hooks and post-processing methods.

The library integrates seamlessly with modern Python web frameworks like FastAPI, Litestar, and Django-ninja. It leverages DataLoader for efficient batch loading to eliminate N+1 query problems, supports bidirectional data flow through ancestor/descendant communication, and provides powerful post-processing hooks for data transformation, aggregation, and computed fields. Version 2.0 introduces ErDiagram for declarative entity relationships with automatic dataloader inference, significantly reducing boilerplate code.

## Resolver

Core resolver for executing data resolution lifecycle

```python
from pydantic import BaseModel
from pydantic_resolve import Resolver, Loader
from aiodataloader import DataLoader

# Define base models
class User(BaseModel):
    id: int
    name: str

class Task(BaseModel):
    id: int
    title: str
    assignee_id: int

# Create DataLoader
class UserLoader(DataLoader):
    async def batch_load_fn(self, user_ids):
        users = await fetch_users_by_ids(user_ids)
        return [users.get(uid) for uid in user_ids]

# Extend model with resolve method
class TaskWithUser(Task):
    user: User | None = None
    def resolve_user(self, loader=Loader(UserLoader)):
        return loader.load(self.assignee_id)

# Execute resolution
tasks = [TaskWithUser(id=1, title="Fix bug", assignee_id=10)]
resolved_tasks = await Resolver().resolve(tasks)
# Output: tasks[0].user is now populated with User data
```

## Resolver with Context and Loader Parameters

Pass global context and configure DataLoader parameters

```python
from pydantic import BaseModel
from pydantic_resolve import Resolver, Loader
from aiodataloader import DataLoader

class FilteredUserLoader(DataLoader):
    level: str  # class attribute for filtering

    async def batch_load_fn(self, user_ids):
        users = await fetch_users_by_ids(user_ids, level=self.level)
        return [users.get(uid) for uid in user_ids]

class Project(BaseModel):
    id: int
    name: str
    members: list[User] = []

    def resolve_members(self, loader=Loader(FilteredUserLoader), context):
        # Context can add request-specific business rules on top of loader params.
        if context['min_level'] > 10:
            return []
        return loader.load(self.id)

# Configure resolver with parameters
projects = [Project(id=1, name="Alpha")]
resolver = Resolver(
    loader_params={FilteredUserLoader: {'level': 'senior'}},
    context={'min_level': 5},
    debug=True  # Enable performance profiling
)
resolved = await resolver.resolve(projects)
# Output: projects[0].members contains only senior-level users
# Debug output shows timing for each node
```

## build_list and build_object

Helper functions for aggregating DataLoader results

```python
from pydantic_resolve import build_list, build_object
from aiodataloader import DataLoader

class CommentsByPostLoader(DataLoader):
    async def batch_load_fn(self, post_ids):
        # Fetch all comments for requested post_ids
        comments = await db.query(
            "SELECT * FROM comments WHERE post_id IN (?)", post_ids
        )
        # build_list groups items by key, returns list per key
        return build_list(comments, post_ids, lambda c: c.post_id)

class AuthorLoader(DataLoader):
    async def batch_load_fn(self, author_ids):
        authors = await db.query(
            "SELECT * FROM authors WHERE id IN (?)", author_ids
        )
        # build_object maps items by key, returns single item per key
        return build_object(authors, author_ids, lambda a: a.id)

# Usage in models
class Post(BaseModel):
    id: int
    author_id: int

    comments: list[Comment] = []
    def resolve_comments(self, loader=Loader(CommentsByPostLoader)):
        return loader.load(self.id)  # Returns list of comments

    author: Author | None = None
    def resolve_author(self, loader=Loader(AuthorLoader)):
        return loader.load(self.author_id)  # Returns single author
```

## Post Methods for Data Transformation

Post-process resolved data after descendants complete

```python
from pydantic import BaseModel
from pydantic_resolve import Resolver, Loader

class Task(BaseModel):
    id: int
    estimate_hours: int

class Sprint(BaseModel):
    id: int
    name: str

    tasks: list[Task] = []
    def resolve_tasks(self, loader=Loader(TaskLoader)):
        return loader.load(self.id)

    # Post method executes after tasks are fully resolved
    total_hours: int = 0
    def post_total_hours(self):
        return sum(task.estimate_hours for task in self.tasks)

    completed_count: int = 0
    def post_completed_count(self):
        return len([t for t in self.tasks if t.status == 'done'])

sprints = [Sprint(id=1, name="Sprint 1")]
resolved = await Resolver().resolve(sprints)
# Output: sprints[0].total_hours and completed_count are computed
```

## Collector for Cross-Generation Data Aggregation

Collect data from descendants to ancestors

```python
from typing import Annotated
from pydantic import BaseModel
from pydantic_resolve import Resolver, Loader, Collector, SendTo

class Task(BaseModel):
    id: int
    assignee_id: int
    assignee: Annotated[User | None, SendTo('all_assignees')] = None

    def resolve_assignee(self, loader=Loader(UserLoader)):
        return loader.load(self.assignee_id)

class Story(BaseModel):
    id: int
    tasks: list[Task] = []
    def resolve_tasks(self, loader=Loader(TaskLoader)):
        return loader.load(self.id)

class Project(BaseModel):
    id: int
    stories: list[Story] = []
    def resolve_stories(self, loader=Loader(StoryLoader)):
        return loader.load(self.id)

    # Collector aggregates 'assignee' from all descendant tasks
    all_assignees: list[User] = []
    def post_all_assignees(self, collector=Collector(alias='all_assignees')):
        return collector.values()

projects = [Project(id=1)]
resolved = await Resolver().resolve(projects)
# Output: projects[0].all_assignees contains the assignees collected from nested tasks
```

## Expose and Ancestor Context

Share ancestor data with descendants

```python
from typing import Annotated
from pydantic import BaseModel
from pydantic_resolve import Resolver, Loader, ExposeAs

class Task(BaseModel):
    id: int
    name: str

    # Access ancestor context in post method
    full_name: str = ''
    def post_full_name(self, ancestor_context):
        project_name = ancestor_context['project_name']
        story_name = ancestor_context['story_name']
        return f"{project_name}/{story_name}/{self.name}"

class Story(BaseModel):
    id: int
    name: Annotated[str, ExposeAs('story_name')]
    tasks: list[Task] = []
    def resolve_tasks(self, loader=Loader(TaskLoader)):
        return loader.load(self.id)

class Project(BaseModel):
    id: int
    name: Annotated[str, ExposeAs('project_name')]
    stories: list[Story] = []
    def resolve_stories(self, loader=Loader(StoryLoader)):
        return loader.load(self.id)

projects = [Project(id=1, name="Alpha", stories=[
    Story(id=1, name="Feature X", tasks=[Task(id=1, name="Implement")])
])]
resolved = await Resolver().resolve(projects)
# Output: resolved[0].stories[0].tasks[0].full_name = "Alpha/Feature X/Implement"
```

## Parent Access for Tree Structures

Access direct parent node in hierarchical data

```python
from pydantic import BaseModel
from pydantic_resolve import Resolver

class TreeNode(BaseModel):
    name: str
    children: list['TreeNode'] = []

    # Build path from root using parent reference
    path: str = ''
    def resolve_path(self, parent):
        if parent is not None:
            return f'{parent.path}/{self.name}'
        return self.name

    # Compute depth using parent
    depth: int = 0
    def post_depth(self, parent):
        if parent is not None:
            return parent.depth + 1
        return 0

tree = TreeNode(name="root", children=[
    TreeNode(name="a", children=[TreeNode(name="a1")]),
    TreeNode(name="b", children=[TreeNode(name="b1")])
])
resolved = await Resolver().resolve(tree)
# Output:
# resolved.path = "root"
# resolved.children[0].path = "root/a"
# resolved.children[0].children[0].path = "root/a/a1"
# resolved.children[0].children[0].depth = 2
```

## ErDiagram with AutoLoad for Automatic Loader Inference

Declare entity relationships once, use everywhere

```python
from typing import Annotated
from pydantic import BaseModel
from pydantic_resolve import (
    Resolver, Entity, Relationship, ErDiagram,
    config_global_resolver
)

# Define base entities
class User(BaseModel):
    id: int
    name: str

class Task(BaseModel):
    id: int
    owner_id: int
    story_id: int

class Story(BaseModel):
    id: int
    owner_id: int

# Declare entity relationships centrally
diagram = ErDiagram(configs=[
    Entity(
        kls=Story,
        relationships=[
            Relationship(fk='id', target=list[Task], name='tasks', loader=story_to_task_loader),
            Relationship(fk='owner_id', target=User, name='owner', loader=user_batch_loader)
        ]
    ),
    Entity(
        kls=Task,
        relationships=[
            Relationship(fk='owner_id', target=User, name='owner', loader=user_batch_loader)
        ]
    )
])

AutoLoad = diagram.create_auto_load()

# Apply diagram globally
config_global_resolver(diagram)

# Use AutoLoad annotation - loaders auto-inferred from diagram
class StoryDetail(Story):
    tasks: Annotated[list[Task], AutoLoad()] = []
    owner: Annotated[User, AutoLoad()] = None

stories = [StoryDetail(id=1, owner_id=10)]
resolved = await Resolver().resolve(stories)
# Output: tasks and owner automatically resolved using diagram loaders
```

`AutoLoad()` matches the response field name (`tasks`, `owner`) against `Relationship.name`. If the field name differs, use `AutoLoad(origin='relationship_name')`.

## ensure_subset for Type Safety

Enforce field subset relationships between models

```python
from pydantic import BaseModel
from pydantic_resolve import ensure_subset

class FullUser(BaseModel):
    id: int
    name: str
    email: str
    password_hash: str
    created_at: str

# Ensure PublicUser only uses fields from FullUser
@ensure_subset(FullUser)
class PublicUser(BaseModel):
    id: int
    name: str
    # If FullUser renames 'name' to 'username', this raises AttributeError

@ensure_subset(FullUser)
class UserSummary(BaseModel):
    id: int
    name: str
    created_at: str

# Safe to use as response models - guaranteed valid subset
users = [FullUser(**data) for data in fetch_users()]
public_users = [PublicUser(**u.model_dump()) for u in users]
# Output: definition-time safety that fields exist in the base model
```

## mapper for Custom Data Transformation

Apply custom transformations to resolved data

```python
from pydantic import BaseModel
from pydantic_resolve import Resolver, Loader, mapper

def format_timestamps(comments):
    """Transform raw comments to formatted versions"""
    return [{
        **c,
        'created_at': c['created_at'].isoformat(),
        'content': c['content'].strip()
    } for c in comments]

def uppercase_names(users):
    """Normalize user names"""
    for user in users:
        user.name = user.name.upper()
    return users

class Post(BaseModel):
    id: int

    comments: list[Comment] = []
    @mapper(format_timestamps)
    def resolve_comments(self, loader=Loader(CommentLoader)):
        return loader.load(self.id)

    contributors: list[User] = []
    @mapper(uppercase_names)
    def resolve_contributors(self, loader=Loader(UserLoader)):
        return loader.load(self.id)

posts = [Post(id=1)]
resolved = await Resolver().resolve(posts)
# Output: comments have ISO timestamps, contributor names are uppercase
```

## Custom Collector for Specialized Aggregation

Implement custom collection logic

```python
from typing import Annotated
from pydantic import BaseModel
from pydantic_resolve import Resolver, ICollector, SendTo

class SumCollector(ICollector):
    def __init__(self, alias):
        self.alias = alias
        self.total = 0

    def add(self, val):
        self.total += val

    def values(self):
        return self.total

class Task(BaseModel):
    id: int
    estimate: Annotated[int, SendTo('total_estimate')]

class Project(BaseModel):
    id: int
    tasks: list[Task] = []
    def resolve_tasks(self, loader=Loader(TaskLoader)):
        return loader.load(self.id)

    total_estimate: int = 0
    def post_total_estimate(self, collector=SumCollector(alias='total_estimate')):
        return collector.values()

projects = [Project(id=1)]
resolved = await Resolver().resolve(projects)
# Output: projects[0].total_estimate = sum of all task estimates
```

## DefineSubset for Selective Field Inheritance

Dynamically create subsets of base models

```python
from pydantic import BaseModel
from pydantic_resolve import DefineSubset
from typing import Annotated

# Prerequisite:
# AutoLoad = diagram.create_auto_load()
# config_global_resolver(diagram)

class FullStory(BaseModel):
    id: int
    name: str
    description: str
    owner_id: int
    created_at: str
    updated_at: str
    sprint_id: int

# Define subset with only needed fields.
# owner_id is kept explicit here for clarity, although DefineSubset can auto-add
# missing FK fields required by AutoLoad with exclude=True.
class StoryListItem(DefineSubset):
    __subset__ = (FullStory, ('id', 'name', 'owner_id'))

    owner: Annotated[User, AutoLoad()] = None

class StoryDetail(DefineSubset):
    __subset__ = (
        FullStory,
        ('id', 'name', 'description', 'owner_id', 'sprint_id')
    )

    owner: Annotated[User, AutoLoad()] = None
    tasks: Annotated[list[Task], AutoLoad()] = []

# Use different views for different endpoints
stories = await fetch_stories()
list_view = [StoryListItem(**s.model_dump()) for s in stories]
detail_view = [StoryDetail(**s.model_dump()) for s in stories]
```

## post_default_handler for Final Processing

Execute final logic after all post methods complete

```python
from typing import Annotated
from pydantic import BaseModel
from pydantic_resolve import Resolver, Collector, SendTo

class Task(BaseModel):
    id: Annotated[int, SendTo('task_ids')]
    status: str

class Sprint(BaseModel):
    id: int
    tasks: list[Task] = []

    total_tasks: int = 0
    def post_total_tasks(self):
        return len(self.tasks)

    completed_tasks: int = 0
    def post_completed_tasks(self):
        return len([t for t in self.tasks if t.status == 'done'])

    task_ids: list[int] = []
    def post_task_ids(self, collector=Collector(alias='task_ids')):
        return collector.values()

    completion_rate: float = 0.0
    summary: str = ''

    # Runs after ALL other post methods finish
    def post_default_handler(self):
        self.completion_rate = (
            self.completed_tasks / self.total_tasks
            if self.total_tasks > 0 else 0
        )
        self.summary = f"Sprint {self.id}: {self.completion_rate:.0%} complete"

sprints = [Sprint(id=1, tasks=[Task(id=1, status='done'), Task(id=2, status='pending')])]
resolved = await Resolver().resolve(sprints)
# Output: sprints[0].summary = "Sprint 1: 50% complete"
```

## FastAPI Integration Example

Complete FastAPI endpoint with pydantic-resolve

```python
from typing import Annotated
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from pydantic_resolve import Resolver

# Prerequisite:
# diagram defines relationships named 'tasks' on Story and 'assignee' on Task.
# AutoLoad = diagram.create_auto_load()
# config_global_resolver(diagram)

app = FastAPI()

class TaskDetail(BaseModel):
    id: int
    name: str
    estimate: int
    assignee_id: int  # FK required by AutoLoad on a plain BaseModel
    assignee: Annotated[User | None, AutoLoad()] = None

class StoryResponse(BaseModel):
    id: int
    name: str
    tasks: Annotated[list[TaskDetail], AutoLoad()] = []

    total_estimate: int = 0
    def post_total_estimate(self):
        return sum(t.estimate for t in self.tasks)

@app.get('/stories', response_model=list[StoryResponse])
async def get_stories(session: AsyncSession = Depends(get_session)):
    # Fetch base data as dicts or Pydantic-compatible objects.
    story_rows = await fetch_story_rows(session)
    story_data = [StoryResponse.model_validate(row) for row in story_rows]

    # Resolve nested data and compute fields
    resolved = await Resolver(
        context={'session': session},
        debug=False
    ).resolve(story_data)

    return resolved
# Output: JSON with stories including nested tasks, assignees, and computed totals
```

## GraphQL Query Execution

Execute GraphQL queries directly from the shared ERD.

```python
# Prerequisite: diagram exposes a root query named `stories`, defined via
# QueryConfig(method=list_stories, name='stories') or an entity @query method.
from pydantic_resolve import GraphQLHandler

handler = GraphQLHandler(diagram)
result = await handler.execute(
    "{ stories { id tasks { id name } owner { name } } }"
)
# Output: {'data': {...}} or {'data': None, 'errors': [...]}
```

## MCP Integration

Expose the same ERD-powered GraphQL API to AI agents through MCP.

```python
from pydantic_resolve import AppConfig, create_mcp_server

mcp = create_mcp_server(
    apps=[
        AppConfig(
            name="blog",
            er_diagram=diagram,
            description="Blog API powered by the shared ERD",
        )
    ],
    name="Blog API",
)
mcp.run()
# Output: AI agents can inspect the schema and execute GraphQL operations through MCP tools
```

pydantic-resolve excels at building complex data structures by combining base entities, efficient batch loading via DataLoader, flexible post-processing for computed fields, and bidirectional data flow between ancestors and descendants. The ErDiagram feature in v2.0 enables declarative entity relationship definitions with automatic loader inference, drastically reducing boilerplate code.

Common use cases include API response composition, aggregation queries with computed fields, hierarchical data structures with path building, collecting nested data across multiple levels, building flexible view models from persistent entities, and eliminating N+1 query problems in GraphQL-style data fetching. The library integrates naturally with FastAPI, SQLAlchemy, and other async Python frameworks, providing a clean separation between data fetching (resolve methods) and business logic (post methods) that enhances maintainability and testability.