Metadata-Version: 2.4
Name: dmr-toolkit
Version: 0.1.0
Summary: Reusable, composable design pattern helpers for Django REST APIs
Project-URL: Homepage, https://github.com/h7ussein-ahmad/dmr-toolkit
Project-URL: Repository, https://github.com/h7ussein-ahmad/dmr-toolkit
Author: h7ussein.ahmad
License: MIT
Keywords: cache,command,component,django,event-sourcing,observer,patterns,plugin,repository,rest,specification,state-machine,strategy,unit-of-work
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: Django
Classifier: Framework :: Django :: 4.2
Classifier: Framework :: Django :: 5.0
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: django>=4.2
Provides-Extra: dev
Requires-Dist: hypothesis; extra == 'dev'
Requires-Dist: pytest; extra == 'dev'
Requires-Dist: pytest-asyncio; extra == 'dev'
Requires-Dist: pytest-django; extra == 'dev'
Provides-Extra: otel
Requires-Dist: opentelemetry-api>=1.20; extra == 'otel'
Description-Content-Type: text/markdown

# dmr-toolkit

Reusable, composable design pattern helpers for Django REST APIs.

`dmr-toolkit` gives Django API developers first-class, opinionated implementations of the most common architectural patterns — Repository, Specification, State Machine, Observer/EventBus, Plugin Registry, Lazy Loader, Strategy, Cache Layer, Component, Command, Unit of Work, and Event Sourcing — all wired together and ready to use.

---

## Installation

```bash
pip install dmr-toolkit
```

For OpenTelemetry observability support:

```bash
pip install "dmr-toolkit[otel]"
```

**Requirements:** Python 3.10+, Django 4.2+

---

## Quick Start

Add `dmr_toolkit_patterns` to your `INSTALLED_APPS` to enable Django migrations for audit and event-sourcing models:

```python
INSTALLED_APPS = [
    ...
    "dmr_toolkit_patterns",
]
```

Run migrations:

```bash
python manage.py migrate
```

Optional configuration in `settings.py`:

```python
DMR_TOOLKIT = {
    "patterns": {
        "audit": True,               # Enable audit logs for StateMachine + Command
        "cache_backend": "default",  # Django cache alias
        "event_bus": "default",
        "event_store_backend": "django",
        "plugin_group": "dmr_toolkit.plugins",
    }
}
```

---

## Patterns

### Repository

Decouples domain logic from data access, so tests can swap the Django-backed repository for an in-memory one without changing calling code.

```python
from dmr_toolkit.patterns import Repository, DjangoRepository, NotFound

# Abstract base — implement your own
class OrderRepository(Repository[Order, int]):
    def get(self, id: int) -> Order: ...
    def list(self, spec=None) -> list[Order]: ...
    # ... add, update, remove, and async variants

# Concrete Django-backed implementation
class DjangoOrderRepository(DjangoRepository[Order, int]):
    model = Order  # binds to Order.objects manager

repo = DjangoOrderRepository()

try:
    order = repo.get(42)
except NotFound:
    print("Order not found")

orders = repo.list()          # all orders
await repo.aget(42)           # async variant
await repo.alist()            # async list
```

---

### Specification

Composable, testable query predicates. Build complex filters from small units without raw ORM expressions in business logic.

```python
from dmr_toolkit.patterns import Specification
from django.db.models import Q

class ActiveSpec(Specification):
    def is_satisfied_by(self, entity) -> bool:
        return entity.status == "active"

    def to_query(self) -> Q:
        return Q(status="active")

class PremiumSpec(Specification):
    def is_satisfied_by(self, entity) -> bool:
        return entity.tier == "premium"

    def to_query(self) -> Q:
        return Q(tier="premium")

# Compose with operators
active_premium = ActiveSpec() & PremiumSpec()
active_or_premium = ActiveSpec() | PremiumSpec()
not_active = ~ActiveSpec()

# Use with Repository
repo.list(active_premium)

# Serialise / deserialise
data = active_premium.to_dict()
restored = Specification.from_dict(data)
```
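
To make the operator composition above concrete, here is a minimal, framework-free sketch of how `&`, `|`, and `~` can build a predicate tree. `SpecSketch`, `AttrEquals`, `Combined`, and `Negated` are hypothetical names used only for illustration; they are not the library's classes, and the sketch covers only the in-memory `is_satisfied_by` side, not `to_query`.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any


class SpecSketch:
    """Hypothetical base class (not the library's Specification)."""

    def is_satisfied_by(self, entity: Any) -> bool:
        raise NotImplementedError

    def __and__(self, other: "SpecSketch") -> "SpecSketch":
        return Combined(self, other, all)

    def __or__(self, other: "SpecSketch") -> "SpecSketch":
        return Combined(self, other, any)

    def __invert__(self) -> "SpecSketch":
        return Negated(self)


@dataclass(frozen=True)
class AttrEquals(SpecSketch):
    name: str
    value: Any

    def is_satisfied_by(self, entity: Any) -> bool:
        return getattr(entity, self.name) == self.value


@dataclass(frozen=True)
class Combined(SpecSketch):
    left: SpecSketch
    right: SpecSketch
    reducer: Any  # the built-in all (AND) or any (OR)

    def is_satisfied_by(self, entity: Any) -> bool:
        return self.reducer(
            spec.is_satisfied_by(entity) for spec in (self.left, self.right)
        )


@dataclass(frozen=True)
class Negated(SpecSketch):
    inner: SpecSketch

    def is_satisfied_by(self, entity: Any) -> bool:
        return not self.inner.is_satisfied_by(entity)


active = AttrEquals("status", "active")
premium = AttrEquals("tier", "premium")
customer = SimpleNamespace(status="active", tier="basic")

both = (active & premium).is_satisfied_by(customer)
either = (active | premium).is_satisfied_by(customer)
inactive = (~active).is_satisfied_by(customer)
```

Because each combinator is itself a specification, composed specs can be nested and reused like the leaf specs.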

---

### State Machine

Declarative entity lifecycle management. Every transition is checked against the declared transition table and its guards, and can be written to an audit log.

```python
from dmr_toolkit.patterns import StateMachine, Transition

class OrderMachine(StateMachine):
    class Meta:
        states = ["pending", "paid", "shipped", "cancelled"]
        state_field = "status"
        audit = True  # writes StateTransitionAudit records
        transitions = [
            Transition("pay",    source="pending",  target="paid"),
            Transition("ship",   source="paid",     target="shipped"),
            Transition("cancel", source="pending",  target="cancelled",
                       guards=[lambda entity, user: entity.can_cancel]),
        ]

machine = OrderMachine()
machine.trigger(order, "pay", user=request.user)   # updates order.status → "paid"
machine.trigger(order, "ship", user=request.user)  # → "shipped"

# Raises InvalidTransition if source state is wrong
# Raises GuardFailed if a guard returns False

print(machine.diagram())  # Mermaid stateDiagram-v2 string
```
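
As a rough illustration of what a declarative transition table buys you, the sketch below validates a trigger against the table and renders the same table as Mermaid `stateDiagram-v2` text. `Edge`, `next_state`, and `to_mermaid` are hypothetical helpers, and the output format of the library's own `diagram()` may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    name: str
    source: str
    target: str


EDGES = [
    Edge("pay", "pending", "paid"),
    Edge("ship", "paid", "shipped"),
    Edge("cancel", "pending", "cancelled"),
]


def next_state(current: str, trigger: str, edges=EDGES) -> str:
    """Return the target state, or raise ValueError for an undeclared transition."""
    for edge in edges:
        if edge.name == trigger and edge.source == current:
            return edge.target
    raise ValueError(f"cannot {trigger!r} from state {current!r}")


def to_mermaid(edges=EDGES) -> str:
    """Render the transition table as a Mermaid state diagram."""
    lines = ["stateDiagram-v2"]
    lines += [f"    {e.source} --> {e.target}: {e.name}" for e in edges]
    return "\n".join(lines)


state_after_pay = next_state("pending", "pay")
diagram_text = to_mermaid()
```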

---

### Observer / EventBus

Typed publish/subscribe. Decouple domain events from side-effect handlers without Django signal boilerplate.

```python
from dmr_toolkit.patterns import EventBus

bus = EventBus()

@bus.on("order.paid")
def send_receipt(payload):
    email_service.send(payload["email"], "Your receipt")

@bus.on("order.*")          # wildcard — matches any order.* event
async def log_order_event(payload):
    await audit_log.write(payload)

bus.emit("order.paid", {"email": "user@example.com", "amount": 99.99})
await bus.aemit("order.shipped", {"tracking": "ABC123"})
```

Fault isolation: if one handler raises, the rest still run. All errors are collected into a single `EventDispatchError`.
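
The fault-isolation behaviour described above can be sketched in a few lines. This is an illustration under assumptions, not the library's implementation: `MiniBus` and `DispatchError` are hypothetical names, wildcard matching is delegated to the standard library's `fnmatchcase`, and only synchronous handlers are covered.

```python
from fnmatch import fnmatchcase
from typing import Any, Callable


class DispatchError(Exception):
    """Hypothetical aggregate error carrying every handler failure."""

    def __init__(self, errors: list):
        super().__init__(f"{len(errors)} handler(s) failed")
        self.errors = errors


class MiniBus:
    def __init__(self) -> None:
        self._handlers: list[tuple[str, Callable[[Any], None]]] = []

    def on(self, pattern: str):
        def decorator(fn):
            self._handlers.append((pattern, fn))
            return fn
        return decorator

    def emit(self, event: str, payload: Any) -> None:
        errors = []
        for pattern, handler in self._handlers:
            if not fnmatchcase(event, pattern):
                continue
            try:
                handler(payload)
            except Exception as exc:
                # Keep going: one failing handler must not block the others.
                errors.append(exc)
        if errors:
            raise DispatchError(errors)


bus = MiniBus()
seen = []
failures = []


@bus.on("order.*")
def broken(payload):
    raise RuntimeError("boom")


@bus.on("order.paid")
def record(payload):
    seen.append(payload["id"])


try:
    bus.emit("order.paid", {"id": 42})
except DispatchError as err:
    failures = err.errors
```

Here `record` still runs even though `broken` raised first, and the caller receives both outcomes: the side effect in `seen` and the collected exception in `failures`.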

---

### Plugin Registry

Runtime plugin discovery and management. Third-party packages extend behaviour without modifying core code.

```python
from dmr_toolkit.patterns import Plugin, PluginRegistry

# Implement the Plugin interface
class MyPlugin(Plugin):
    def activate(self): ...
    def deactivate(self): ...

# Third-party plugin (declared in pyproject.toml entry points)
# [project.entry-points."dmr_toolkit.plugins"]
# my_plugin = "my_package:MyPlugin"

registry = PluginRegistry()
registry.load_entry_points("dmr_toolkit.plugins")  # auto-discovers installed plugins

# Manual management
registry.register("my_plugin", MyPlugin)
plugin_class = registry.get("my_plugin")
registry.unregister("my_plugin")

for meta in registry.list_plugins():
    print(meta.name, meta.version, meta.status)
```

---

### Lazy Loader

Descriptor for deferred, cached resource initialisation. Expensive resources are initialised only on first access.

```python
from dmr_toolkit.patterns import lazy

class MyService:
    # Factory called once on first access; result cached on the instance
    client = lazy(lambda: ExpensiveAPIClient())

    # With TTL — re-initialises after 60 seconds
    config = lazy(lambda: load_remote_config(), ttl=60.0)

svc = MyService()
svc.client   # factory called here
svc.client   # returns cached value — factory NOT called again

# Clear cache on demand
MyService.client.reset(svc)
svc.client   # factory called again
```

Thread-safe: the factory is called at most once per instance under concurrent access.
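
One common way to get the at-most-once guarantee is a descriptor that re-checks the per-instance cache while holding a lock. The sketch below illustrates that technique only; `lazy_sketch` is a hypothetical name, TTL handling is omitted, and the library's actual locking strategy may differ.

```python
import threading


class lazy_sketch:
    """Hypothetical descriptor: caches factory() per instance, guarded by a lock."""

    def __init__(self, factory):
        self._factory = factory
        self._lock = threading.Lock()
        self._name = None

    def __set_name__(self, owner, name):
        self._name = f"_lazy_{name}"

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # class-level access returns the descriptor itself
        try:
            return instance.__dict__[self._name]  # fast path, no lock
        except KeyError:
            pass
        with self._lock:
            # Re-check inside the lock: another thread may have filled the cache.
            if self._name not in instance.__dict__:
                instance.__dict__[self._name] = self._factory()
            return instance.__dict__[self._name]

    def reset(self, instance):
        with self._lock:
            instance.__dict__.pop(self._name, None)


calls = []


class Service:
    client = lazy_sketch(lambda: calls.append(1) or object())


svc = Service()
threads = [threading.Thread(target=lambda: svc.client) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
calls_after_threads = len(calls)

Service.client.reset(svc)
svc.client
calls_after_reset = len(calls)
```

The sketch shares one lock per descriptor, which is simple but serialises first access across instances; a per-instance lock would trade memory for less contention.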

---

### Strategy

Interchangeable algorithms selected at runtime. No conditional branching in business logic.

```python
from dmr_toolkit.patterns import Strategy
from abc import abstractmethod

class PricingStrategy(Strategy):
    @abstractmethod
    def execute(self, order) -> float: ...

@PricingStrategy.register("standard")
class StandardPricing(PricingStrategy):
    def execute(self, order) -> float:
        return order.subtotal

@PricingStrategy.register("premium")
class PremiumPricing(PricingStrategy):
    def execute(self, order) -> float:
        return order.subtotal * 0.9  # 10% discount

# Resolve at runtime
strategy_cls = PricingStrategy.resolve("premium")
price = strategy_cls().execute(order)

# Raises StrategyNotFound with available names if not registered
```
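
A name-to-class registry of this shape is straightforward to sketch. The version below is an illustration under assumptions: `StrategySketch` is a hypothetical base, registries are keyed by the class that `register` is called on, and a plain `LookupError` stands in for `StrategyNotFound`.

```python
class StrategySketch:
    """Hypothetical base: one registry per strategy family."""

    _registries: dict = {}

    @classmethod
    def register(cls, name: str):
        def decorator(impl):
            StrategySketch._registries.setdefault(cls, {})[name] = impl
            return impl
        return decorator

    @classmethod
    def resolve(cls, name: str):
        registry = StrategySketch._registries.get(cls, {})
        try:
            return registry[name]
        except KeyError:
            available = ", ".join(sorted(registry)) or "<none>"
            raise LookupError(
                f"no strategy {name!r}; available: {available}"
            ) from None


class Pricing(StrategySketch):
    def execute(self, subtotal: float) -> float:
        raise NotImplementedError


@Pricing.register("standard")
class Standard(Pricing):
    def execute(self, subtotal: float) -> float:
        return subtotal


@Pricing.register("premium")
class Premium(Pricing):
    def execute(self, subtotal: float) -> float:
        return subtotal * 0.9


standard_price = Pricing.resolve("standard")().execute(100.0)

try:
    Pricing.resolve("missing")
    missing_message = ""
except LookupError as exc:
    missing_message = str(exc)
```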

---

### Cache Layer

Transparent, key-managed caching with minimal boilerplate.

```python
from dmr_toolkit.patterns import cached, cache_invalidate

@cached(ttl=300, key_prefix="orders:", vary_on=["user_id"])
def get_user_orders(user_id: int) -> list:
    return list(Order.objects.filter(user_id=user_id))  # evaluate the QuerySet so a plain list is cached

# Per-user cache isolation — different user_id → different cache entry
orders = get_user_orders(user_id=42)

@cache_invalidate("orders:*")
def create_order(user_id: int, data: dict) -> Order:
    return Order.objects.create(user_id=user_id, **data)

# Calling create_order() invalidates all "orders:*" cache keys
create_order(user_id=42, data={"item": "Widget"})
```

---

### Component

Self-contained, reusable API response fragments that fetch their own data and compose into larger responses.

```python
from dmr_toolkit.patterns import Component, DMRComponentMixin

class UserCard(Component):
    def render(self, context: dict) -> dict:
        user = User.objects.get(id=context["user_id"])
        return {"user": {"id": user.id, "name": user.name}}

class OrderSummary(Component):
    def render(self, context: dict) -> dict:
        orders = Order.objects.filter(user_id=context["user_id"])
        return {"order_count": orders.count()}

class ProfilePage(Component):
    children = [UserCard(), OrderSummary()]
    cache_ttl = 60  # cache rendered output for 60 seconds

    def render(self, context: dict) -> dict:
        return {"page": "profile"}

# Async rendering — children resolved concurrently
page = ProfilePage()
result = await page.arender({"user_id": 42})
# result = {"page": "profile", "user": {...}, "order_count": 5}

# In a DMR_Controller
class ProfileController(DMRComponentMixin):
    def get(self, request):
        return self.component_response(ProfilePage())
```

---

### Command

Encapsulated, auditable, optionally reversible business operations.

```python
from dmr_toolkit.patterns import Command

class CreateOrderCommand(Command[Order]):
    def __init__(self, user_id: int, items: list):
        self.user_id = user_id
        self.items = items
        self._created_order = None

    def execute(self) -> Order:
        self._created_order = Order.objects.create(
            user_id=self.user_id, items=self.items
        )
        return self._created_order

    def undo(self) -> None:
        if self._created_order:
            self._created_order.delete()

cmd = CreateOrderCommand(user_id=42, items=["Widget"])
order = cmd.execute()   # creates order, writes CommandAuditLog if audit=True
cmd.undo()              # deletes the order

# Async variants
order = await cmd.aexecute()
await cmd.aundo()
```

---

### Unit of Work

Groups multiple Repository operations into a single atomic transaction with automatic rollback on failure.

```python
from dmr_toolkit.patterns import UnitOfWork

uow = UnitOfWork()

with uow:
    order = order_repo.add(Order(user_id=42, total=99.99))
    inventory_repo.update(item)
    bus.emit("order.created", {"id": order.id})
    # All operations committed atomically on exit
    # EventBus events dispatched AFTER successful commit

# On exception — all operations rolled back, events suppressed
try:
    with uow:
        order_repo.add(bad_order)
        raise ValueError("something went wrong")
except ValueError:
    pass  # transaction rolled back, no events dispatched

# Async context manager
async with uow:
    await order_repo.aadd(order)

# Nested — inner block uses savepoints
with uow:
    order_repo.add(order)
    try:
        with uow:                    # inner savepoint
            inventory_repo.update(item)
            raise ValueError()       # rolls back inner savepoint only
    except ValueError:
        pass                         # caught here, so the outer block continues
    # outer transaction still intact
```
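
The "events dispatched after commit, suppressed on rollback" rule can be illustrated without a database. The sketch below only models the event buffering, not the transaction itself; `DeferredEvents` is a hypothetical class and is not how `UnitOfWork` is necessarily built.

```python
class DeferredEvents:
    """Hypothetical context manager: buffer events, flush only on clean exit."""

    def __init__(self, dispatch):
        self._dispatch = dispatch
        self._pending = []

    def emit(self, name, payload):
        self._pending.append((name, payload))

    def __enter__(self):
        self._pending.clear()
        return self

    def __exit__(self, exc_type, exc, tb):
        pending, self._pending = self._pending, []
        if exc_type is None:
            for name, payload in pending:
                self._dispatch(name, payload)
        return False  # never swallow the exception


delivered = []
events = DeferredEvents(lambda name, payload: delivered.append(name))

with events:
    events.emit("order.created", {"id": 1})
    delivered_inside_block = list(delivered)  # nothing dispatched yet

try:
    with events:
        events.emit("order.failed", {"id": 2})
        raise ValueError("something went wrong")
except ValueError:
    pass  # buffered event was discarded
```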

---

### Event Store

Append-only domain event log for Event Sourcing and audit trails.

```python
from dmr_toolkit.patterns import EventStore

store = EventStore()

# Append events
event = store.append(
    aggregate_id="order-42",
    aggregate_type="Order",
    event_type="OrderCreated",
    payload={"user_id": 42, "total": 99.99},
)

store.append("order-42", "Order", "OrderPaid", {"method": "card"})
store.append("order-42", "Order", "OrderShipped", {"tracking": "ABC"})

# Load and replay
events = store.load("order-42")
for e in events:
    print(f"seq={e.sequence} type={e.event_type}")

# Optimistic concurrency
store.append("order-42", "Order", "OrderCancelled", {}, expected_seq=3)
# Raises ConcurrencyConflict if current head != 3

# Projections
store.project("OrderPaid", lambda e: update_revenue_report(e.payload))

# Snapshots — skip replaying old events
store.save_snapshot("order-42", state={"status": "shipped"}, seq=3)
snapshot = store.load_snapshot("order-42")
events_after = store.load("order-42", after_seq=snapshot.sequence)

# Human-readable replay log
print(store.replay_log("order-42"))
# seq=1 type=OrderCreated
# seq=2 type=OrderPaid
# seq=3 type=OrderShipped
# seq=4 type=OrderCancelled
```
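
The optimistic concurrency check boils down to comparing the caller's `expected_seq` with the stream's current head before appending. A minimal in-memory sketch, with hypothetical names (`MiniEventStore`, `Conflict`) and dict-based events rather than the library's models:

```python
class Conflict(Exception):
    """Hypothetical stand-in for ConcurrencyConflict."""


class MiniEventStore:
    def __init__(self):
        self._streams = {}

    def append(self, aggregate_id, event_type, payload, expected_seq=None):
        stream = self._streams.setdefault(aggregate_id, [])
        head = len(stream)  # sequence number of the latest event, 0 if none
        if expected_seq is not None and expected_seq != head:
            raise Conflict(f"expected head {expected_seq}, found {head}")
        event = {"sequence": head + 1, "event_type": event_type, "payload": payload}
        stream.append(event)
        return event

    def load(self, aggregate_id, after_seq=0):
        return [
            e for e in self._streams.get(aggregate_id, [])
            if e["sequence"] > after_seq
        ]


store = MiniEventStore()
store.append("order-42", "OrderCreated", {"total": 99.99})
store.append("order-42", "OrderPaid", {"method": "card"})

# A writer that last saw sequence 1 is now stale and must reload.
try:
    store.append("order-42", "OrderCancelled", {}, expected_seq=1)
    stale_write_rejected = False
except Conflict:
    stale_write_rejected = True

store.append("order-42", "OrderShipped", {}, expected_seq=2)
tail = [e["event_type"] for e in store.load("order-42", after_seq=1)]
```

A database-backed store would additionally need the check and the insert to be atomic, for example via a uniqueness constraint on the aggregate and sequence pair; the in-memory sketch sidesteps that.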

---

## Exception Hierarchy

All exceptions carry structured context for easy debugging:

```
PatternError (base)
├── NotFound                  — Repository.get on missing entity
├── InvalidTransition         — StateMachine invalid source state
├── GuardFailed               — StateMachine guard returned False
├── DuplicatePlugin           — PluginRegistry duplicate name
├── StrategyNotFound          — Strategy.resolve unknown name
├── ConcurrencyConflict       — EventStore expected_seq mismatch
└── EventDispatchError        — Observer aggregate of handler errors
```

```python
from dmr_toolkit.patterns.exceptions import NotFound, InvalidTransition

try:
    order = repo.get(999)
except NotFound as e:
    print(repr(e))  # NotFound(model='Order', id=999)
```

---

## Test Helpers

`dmr_toolkit.testing` ships in-memory test doubles and helpers, so unit tests for code built on these patterns do not need to touch the database:

```python
from dmr_toolkit.testing import (
    InMemoryRepository,
    FakeEventStore,
    MockObserver,
    StrategyTestCase,
)

# InMemoryRepository — full CRUD, raises NotFound correctly
repo = InMemoryRepository()
entity = repo.add(MyEntity(id=1, name="test"))
assert repo.get(1) == entity

# FakeEventStore — full append/load/snapshot, no DB
store = FakeEventStore()
store.append("agg-1", "Order", "Created", {})
events = store.load("agg-1")

# MockObserver — records emitted events for assertion
mock_bus = MockObserver()
mock_bus.emit("order.paid", {"id": 42})
mock_bus.assert_emitted("order.paid", {"id": 42})

# StrategyTestCase — isolates strategy registry per test
class TestMyStrategy(StrategyTestCase):
    strategy_class = MyStrategy

    def test_resolve(self):
        # Registry is fresh for each test — no cross-test pollution
        MyStrategy.register("test")(ConcreteStrategy)
        assert MyStrategy.resolve("test") is ConcreteStrategy
```

---

## OpenTelemetry Observability

Install the optional dependency:

```bash
pip install "dmr-toolkit[otel]"
```

When your application configures an OTel tracer provider, `dmr-toolkit` automatically emits spans for:

- Repository queries (`repository.get`, `repository.list`, etc.)
- Command executions (`command.execute`, `command.undo`)
- State Machine transitions (`state_machine.trigger`)
- Cache hits and misses (`cache.hit`, `cache.miss`, `cache.invalidate`)

No configuration needed — spans are no-ops when OTel is not installed or not configured.

---

## Settings Reference

```python
DMR_TOOLKIT = {
    "patterns": {
        # Enable audit logging for StateMachine transitions and Command executions
        # Writes StateTransitionAudit and CommandAuditLog records
        "audit": False,

        # Django cache alias used by @cached and Component.cache_ttl
        "cache_backend": "default",

        # EventBus backend: "default" (in-process) or "celery"
        "event_bus": "default",

        # EventStore backend: "django" (ORM) or "redis" (requires redis extra)
        "event_store_backend": "django",

        # Entry-point group for PluginRegistry.load_entry_points()
        "plugin_group": "dmr_toolkit.plugins",
    }
}
```

---

## Django Migrations

The following models require migrations when using Django ORM persistence:

| Model | Purpose |
|---|---|
| `StateTransitionAudit` | Audit log for StateMachine transitions |
| `CommandAuditLog` | Audit log for Command executions |
| `DomainEvent` | Immutable event records for Event Sourcing |
| `Snapshot` | Aggregate snapshots for Event Sourcing |

```bash
python manage.py migrate dmr_toolkit_patterns
```

---

## License

MIT
