Metadata-Version: 2.4
Name: base-repository
Version: 1.1.0
Summary: A repository library that wraps SQLAlchemy and provides built-in CRUD and a query DSL.
Author-email: jjade <jjades1205@gmail.com>
Project-URL: Homepage, https://github.com/4jades/base-repository
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Python: <3.14,>=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pydantic>=1.7.4
Requires-Dist: sqlalchemy>=1.4
Requires-Dist: typing-extensions>=4.6.0
Requires-Dist: pydantic<3.0.0,>=2.8.0; python_version >= "3.13"
Dynamic: license-file

<div align="right">
  <a href="./README.md">English</a> | <a href="./README.ko.md">한국어</a>
</div>

# Base Repository

![Python](https://img.shields.io/badge/python-3.10%20%7C%203.11%20%7C%203.12-informational)
![Coverage](https://img.shields.io/badge/coverage-100%25-brightgreen)
![License](https://img.shields.io/github/license/4jades/base-repository)


A repository library that wraps SQLAlchemy and provides built-in CRUD and a query DSL.    
Once you inherit from `BaseRepository`, you can use create/read/update/delete right away.

- No need to re-implement simple CRUD over and over.
- Mypy-friendly. Provides generic-aware return types.
- If you need custom repo methods, you can add them in your project and extend freely.
- Supports introducing a Mapper for conversions between Pydantic schemas and SQLAlchemy ORM models.

## Supported Dependency Versions

The table below lists the minimum required versions for each supported dependency.

To see more about [how to test](./tox.ini) and the [result](./docs/dependency_test.md).

| python_version | sqlalchemy         | pydantic          |
|----------------|--------------------|-------------------|
| 3.10           | >= 1.4             | >= 1.10           |
| >= 3.13        | >= 1.4             | >= 2.8, 1.7-1.10  |



## Links
- [How to use](https://4jades.github.io/base-repository/how_to_use.html)
- [Performance](https://4jades.github.io/base-repository/about_performance.html)

---

## Installation

```bash
pip install base-repository
```

---

## Quick Start

### 1) Define Filter and Repo

```python
from dataclasses import dataclass
from pydantic import BaseModel
from base_repository import BaseRepoFilter, BaseRepository

# Example: existing SQLAlchemy ORM model and Pydantic schema in your project
class Base(DeclarativeBase):
    pass

class UserModel(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(nullable=False)

class UserSchema(BaseModel):
    id: int
    name: str
    model_config = ConfigDict(from_attributes=True)


# What you need to add for Base Repository: Filter + Repo
@dataclass
class UserFilter(BaseRepoFilter):
    id: int | iterable[int] | None = None
    name: str | iterable[str] | None = None

class UserRepo(BaseRepository[UserModel, UserSchema]):  # Optional: return Pydantic objects by default
    filter_class = UserFilter

# If you don't need schema mapping
class UserRepo(BaseRepository[UserModel]):
    filter_class = UserFilter

# If you need mapper
class UserRepo(BaseRepository[UserModel, UserSchema]):
    filter_class = UserFilter
    mapper = Usermapper

```

### 2) Plug in a SessionProvider

Decide how the repository should obtain an `AsyncSession`.    
Recommended: inject a `SessionProvider` once, so the repo can fetch sessions when needed.

```python
from typing import Protocol
from base_repository import SessionProvider, BaseRepository

# Example SessionProvider implementation
class MysqlSessionProvider(SessionProvider):
    def get_session(self) -> AsyncSession:
        return Mysql.current_session()

BaseRepository.configure_session_provider(MysqlSessionProvider())
```

Alternatively, you can pass a session directly when creating the repo.

```python
async with AsyncSession(engine) as session:
    repo = UserRepo(session)
```

Alternatively, you can pass the session directly when calling a method.

```python
await repo.create({"name": "Alice", "email": "a@test.com"}, session=session)
```

### 3) Use CRUD immediately

```python
repo = UserRepo()

# Get one
user = await repo.get(UserFilter(name=["Alice", "Bob"]))
user_orm = await repo.get(UserFilter(name="Alice"), convert_schema=False)

# Get one (required)
user2 = await repo.get_or_fail(UserFilter(id=1))

# Create (dict)
created = await repo.create({"name": "Alice", "email": "a@test.com"})

# Create (schema)
created2 = await repo.create(UserSchema(name="Bob", email="b@test.com"))

# Create (schema -> ORM)
created2_orm = await repo.create(
    UserSchema(name="Bob", email="b@test.com"),
    convert_schema=False,
)

# Create many
created_many = await repo.create_many(
    [
        {"name": "Alice", "email": "a@test.com"},
        UserSchema(name="Bob", email="b@test.com"),
    ]
)

# Create many (skip schema conversion -> ORM list)
created_many_orm = await repo.create_many(
    [
        {"name": "Alice", "email": "a@test.com"},
        {"name": "Bob", "email": "b@test.com"},
    ],
    skip_convert=True,
)

# Create from ORM model (as-is)
created3 = await repo.create_from_model(UserModel(name="Chris", email="c@test.com"))

# List (OFFSET paging)
q = (
    repo.list()
        .where(UserFilter(name="A"))
        .order_by(["id", "name"]) # order_by("id"), order_by(User.id), order_by(User.id.desc()) ..
        .paging(page=1, size=20)
)
users = await repo.execute(q)

# List (CURSOR paging)
q1 = (
    repo.list()
        .order_by(["id", "name"]) # order_by("id"), order_by(User.id), order_by(User.id.desc()) ..
        .with_cursor({})  # first page
        .limit(20)
)
users1 = await repo.execute(q1)

# List (convenience: get_list)
users2 = await repo.get_list(flt=UserFilter(name="A"), order_by=["id", User.name], page=1, size=20)

# List (convenience: get_list + cursor)
users3 = await repo.get_list(order_by="id", cursor={}, size=20)

# Update / Delete / Count
cnt = await repo.count(UserFilter(name="Alice"))
updated_rows = await repo.update(UserFilter(name="Bob"), {"email": "bob@new"})
deleted_rows = await repo.delete(UserFilter(name="Alice"))

# Update (dirty checking)
base = await repo.get_or_fail(UserFilter(id=1), convert_schema=False)
after = await repo.update_from_model(base=base, update={"email": "new@test.com"}, convert_schema=True)

# add / add_all (caller controls flush/commit)
repo.add(User(name="D", email="d@test.com"))
repo.add_all([User(name="E", email="e@test.com"), User(name="F", email="f@test.com")])
await repo.session.flush()
```


---
