Metadata-Version: 2.4
Name: reaktiv
Version: 0.14.2
Summary: Reactive signals for Python with async support
Author-email: Tuan Anh Bui <mail@bui.app>
License: MIT
Project-URL: Homepage, https://github.com/buiapp/reaktiv
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.9
Description-Content-Type: text/markdown

# reaktiv

![Python Version](https://img.shields.io/badge/python-3.9%2B-blue) [![PyPI Version](https://img.shields.io/pypi/v/reaktiv.svg)](https://pypi.org/project/reaktiv/) ![PyPI - Downloads](https://img.shields.io/pypi/dm/reaktiv) ![Documentation Status](https://readthedocs.org/projects/reaktiv/badge/) ![License](https://img.shields.io/badge/license-MIT-green) [![Checked with pyright](https://microsoft.github.io/pyright/img/pyright_badge.svg)](https://microsoft.github.io/pyright/)

**Reactive Signals for Python** with first-class async support, inspired by Angular's reactivity model.

![reaktiv](assets/logo.png)

## Documentation

Full documentation is available at [https://reaktiv.readthedocs.io/](https://reaktiv.readthedocs.io/).

## Installation

```bash
pip install reaktiv
# or with uv
uv pip install reaktiv
```

## Quick Start

### Basic Reactivity

```python
import asyncio
from reaktiv import signal, effect

async def main():
    name = signal("Alice")

    async def greet():
        print(f"Hello, {name()}!")

    # Create and schedule effect
    # IMPORTANT: Assign the effect to a variable to ensure it is not garbage collected.
    greeter = effect(greet)

    name.set("Bob")  # Prints: "Hello, Bob!"
    await asyncio.sleep(0)  # Process effects

asyncio.run(main())
```

### Using `update()`

Instead of calling `set(new_value)`, `update()` lets you modify a signal based on its current value.

```python
from reaktiv import signal

counter = signal(0)

# Standard way
counter.set(counter() + 1)

# Using update() for cleaner syntax
counter.update(lambda x: x + 1)

print(counter())  # 2
```

### Computed Values

```python
from reaktiv import signal, computed

# Synchronous context example
price = signal(100)
tax_rate = signal(0.2)
total = computed(lambda: price() * (1 + tax_rate()))

print(total())  # 120.0
tax_rate.set(0.25)
print(total())  # 125.0
```

## Core Concepts

```mermaid
graph LR
    A[Signal] -->|Value| B[Computed Signal]
    A -->|Change| C[Effect]
    B -->|Value| C
    B -->|Change| C
    C -->|Update| D[External System]
    
    classDef signal fill:#4CAF50,color:white;
    classDef computed fill:#2196F3,color:white;
    classDef effect fill:#FF9800,color:white;
    
    class A,B signal;
    class B computed;
    class C effect;
```

## How it Works

`reaktiv` provides three core primitives:

1. **Signals**: Store a value and notify dependents when it changes
2. **Computed Signals**: Derive values that automatically update when dependencies change
3. **Effects**: Run side effects when signals or computed signals change

## Why reaktiv?

If you've worked with modern frontend frameworks like React, Vue, or Angular, you're familiar with the power of reactive state management. The idea is simple but transformative: when data changes, everything that depends on it updates automatically. 

While this pattern revolutionized frontend development, **its benefits are equally powerful in backend systems** where complex state management is often overlooked or implemented with brittle, ad-hoc solutions.

`reaktiv` brings these **reactive programming** advantages to your Python backend projects:

- **Automatic state dependency tracking:** No more manually tracing which components need updating when data changes
- **Declarative state relationships:** Define how data is transformed once, not every time it changes
- **Efficient fine-grained updates:** Only recompute what actually needs to change
- **Async-first design:** Seamlessly integrates with Python's `asyncio` for managing real-time data flows
- **Zero external dependencies:** Lightweight with minimal overhead
- **Type-safe:** Fully annotated for clarity and maintainability

## Benefits for Backend Development

`reaktiv` addresses key challenges in backend state management:

1. **Eliminates manual dependency tracking**: No more forgotten update logic when state changes
2. **Prevents state synchronization bugs**: Updates happen automatically and consistently
3. **Improves performance**: Only affected computations are recalculated
4. **Reduces cognitive load**: Declare relationships once, not throughout your codebase
5. **Simplifies testing**: Clean separation of state, derivation, and effects

Even in "stateless" architectures, ephemeral state still exists during request processing. `reaktiv` helps manage this complexity without the boilerplate of observers, callbacks, or event dispatchers.

## Beyond Pub/Sub: State Management in Backend Systems

Many backend developers view reactive libraries as just another pub/sub system and question their value in "stateless" architectures. However, `reaktiv` addresses fundamentally different problems:

### Traditional Pub/Sub vs. Reaktiv

| Pub/Sub Systems | Reaktiv |
|----------------|---------|
| Message delivery between components | Automatic state dependency tracking |
| Point-to-point or broadcast messaging | Fine-grained computation graphs |
| Manual subscription management | Automatic dependency detection |
| Focus on message transport | Focus on state derivation |
| Stateless by design | Intentional state management |

### State in "Stateless" Systems

Even in "stateless" microservices and serverless functions, state exists during request processing:

- Configuration management
- Request context propagation
- In-memory caching
- Rate limiting and circuit breaking
- Feature flag evaluation
- Connection pooling
- Runtime metrics collection

`reaktiv` helps manage this ephemeral state with less code, fewer bugs, and better maintainability.

## Basic Examples

### Feature Flag System with Dynamic Rules

```python
from reaktiv import signal, computed, effect
import time

# Core state
user_segments = signal({"user1": ["premium", "beta_tester"]})
feature_flags = signal({
    "new_dashboard": {"enabled": True, "segments": ["premium"]},
    "dark_mode": {"enabled": True, "segments": []},
    "beta_feature": {"enabled": True, "segments": ["beta_tester"]}
})

# Computed user permissions that update automatically
user_features = computed(lambda: {
    user_id: [
        flag_name 
        for flag_name, flag in feature_flags().items()
        if flag["enabled"] and (
            not flag["segments"] or 
            any(segment in user_segments().get(user_id, []) for segment in flag["segments"])
        )
    ]
    for user_id in user_segments()
})

# Add real-time monitoring
def monitor_features():
    features = user_features()
    for user_id, enabled_features in features.items():
        if "beta_feature" in enabled_features:
            print(f"User {user_id} has access to beta features")

feature_monitor = effect(monitor_features)

# When segments change, permissions automatically update
user_segments.update(lambda segments: {**segments, "user2": ["premium"]})
```

## Real-World Backend Use Cases

### 1. Intelligent Cache Management

```python
from reaktiv import signal, computed, effect

# Database-derived state (imagine this comes from your database)
user_data = signal({})
permissions = signal({})
content_items = signal({})

# Computed caches that automatically update when source data changes
user_permissions = computed(lambda: {
    user_id: [perm for perm_id, perm in permissions().items() 
              if perm['user_id'] == user_id]
    for user_id in user_data()
})

authorized_content = computed(lambda: {
    user_id: [item for item_id, item in content_items().items()
              if any(p['level'] >= item['required_level'] for p in user_permissions().get(user_id, []))]
    for user_id in user_data()
})

# Demonstration of automatic cache updates
def demo_reactive_cache():
    # Initial data setup
    user_data.set({"user1": {"name": "Alice"}, "user2": {"name": "Bob"}})
    permissions.set({
        "p1": {"user_id": "user1", "level": 5},  # Admin
        "p2": {"user_id": "user2", "level": 2}   # Editor
    })
    content_items.set({
        "c1": {"title": "Public Content", "required_level": 1},
        "c2": {"title": "Admin Content", "required_level": 5}
    })
    
    # Access permissions and content (cache is computed on first access)
    print("Bob can access:", [item["title"] for item in authorized_content().get("user2", [])])
    # Output: Bob can access: ['Public Content']
    
    # Update Bob's permission level - cache automatically updates!
    permissions.update(lambda p: {**p, "p2": {"user_id": "user2", "level": 5}})
    
    # Cache is automatically recalculated only for Bob
    print("Bob can now access:", [item["title"] for item in authorized_content().get("user2", [])])
    # Output: Bob can now access: ['Public Content', 'Admin Content']
    
    # No manual cache invalidation needed anywhere!

demo_reactive_cache()
```

### 2. Adaptive Rate Limiting & Circuit Breaking

```python
from reaktiv import signal, computed, effect
import time
import asyncio

# Track API calls and failures
endpoint_calls = signal({})  # endpoint: [timestamp, timestamp...]
endpoint_failures = signal({})  # endpoint: [timestamp, timestamp...]

# Computed circuit breakers that automatically update
circuit_status = computed(lambda: {
    endpoint: "open" if len(failures) >= 5 and (time.time() - failures[-1]) < 30 else "closed"
    for endpoint, failures in endpoint_failures().items()
})

# Dynamic rate limiting based on traffic patterns
rate_limits = computed(lambda: {
    endpoint: max(10, min(1000, len(calls) // 10)) 
    for endpoint, calls in endpoint_calls().items() 
    if calls and time.time() - calls[0] < 60
})

# Add monitoring effect to see changes in real-time
def monitor_circuit_status():
    status = circuit_status()
    if status:
        print(f"Circuit Status: {status}")

circuit_monitor = effect(monitor_circuit_status)

# Simulate API calls and failures
async def simulate_traffic():
    print("\n=== Simulating API Traffic and Failures ===")
    
    # Record some successful calls
    endpoint_calls.update(lambda calls: {
        **calls, 
        "api/users": [time.time() - i for i in range(20)]
    })
    print(f"Rate limit for api/users: {rate_limits()['api/users']} requests/min")
    
    # Simulate failures for api/orders
    endpoint_failures.update(lambda failures: {
        **failures,
        "api/orders": [time.time() - i for i in range(3)]
    })
    print(f"Circuit status for api/orders: {circuit_status()['api/orders']}")
    
    # Simulate more failures to trigger circuit breaker
    print("Adding more failures to api/orders...")
    endpoint_failures.update(lambda failures: {
        **failures,
        "api/orders": failures["api/orders"] + [time.time() for _ in range(3)]
    })
    print(f"Circuit status for api/orders: {circuit_status()['api/orders']}")

# Run this example with: asyncio.run(simulate_traffic())
```

### 3. Multi-Layer Configuration Management

```python
from reaktiv import signal, computed, effect
import asyncio

# Configuration at different levels
global_config = signal({"log_level": "INFO", "timeout": 30})
service_config = signal({"auth": {"timeout": 10}})
instance_config = signal({"log_level": "DEBUG"})

# Computed effective configuration with correct precedence
effective_config = computed(lambda: {
    **global_config(),
    **{k: v for k, v in service_config().items() if not isinstance(v, dict)},
    **instance_config()
})

# Nested configs are merged properly
auth_config = computed(lambda: {
    **global_config(),
    **(service_config().get("auth", {}))
})

# When any config source changes, all systems update automatically
def log_config_changes():
    print(f"Current effective config: {effective_config()}")
    print(f"Auth specific config: {auth_config()}")

logger_config = effect(log_config_changes)

async def demo_configuration():
    print("\n=== Configuration Management Demo ===")
    
    # Initial configuration state
    print(f"Initial effective config: {effective_config()}")
    print(f"Initial auth config: {auth_config()}")
    
    # Change global config
    print("\nUpdating global timeout to 60 seconds...")
    global_config.update(lambda cfg: {**cfg, "timeout": 60})
    await asyncio.sleep(0.1)  # Allow effects to process
    
    # Override at service level
    print("\nAdding database configuration at service level...")
    service_config.update(lambda cfg: {
        **cfg, 
        "database": {"host": "localhost", "port": 5432}
    })
    await asyncio.sleep(0.1)  # Allow effects to process
    
    # Change instance config
    print("\nChanging instance log_level to TRACE...")
    instance_config.update(lambda cfg: {**cfg, "log_level": "TRACE"})
    await asyncio.sleep(0.1)  # Allow effects to process

# Run this example with: asyncio.run(demo_configuration())
```

## Advanced Examples

### 1. Resource Pool Management

```python
from reaktiv import signal, computed, effect
import time

# Connection pool state
db_connections = signal({})  # id: {created_at, last_used, state}
connection_requests = signal(0)

# Auto-scaling connection pool
idle_connections = computed(lambda: [
    conn_id for conn_id, conn in db_connections().items() 
    if conn['state'] == 'idle' and time.time() - conn['last_used'] < 60
])

connections_needed = computed(lambda: max(0, connection_requests() - len(idle_connections())))

# Effect that manages pool size based on demand
def manage_pool():
    current_needed = connections_needed()
    if current_needed > 0:
        create_new_connections(current_needed)
    elif len(idle_connections()) > 10 and connection_requests() < 5:
        close_excess_connections()

pool_manager = effect(manage_pool)
```

### 2. Multi-Stage Data Processing Pipeline

```python
from reaktiv import signal, computed, effect, batch
import json

# Raw event stream
raw_events = signal([])

# Normalized data
normalized_events = computed(lambda: [
    {**event, "timestamp": parse_timestamp(event.get("ts", 0))}
    for event in raw_events()
])

# Filtered data
error_events = computed(lambda: [
    event for event in normalized_events()
    if event.get("level") == "ERROR"
])

# Aggregated metrics
error_count_by_service = computed(lambda: {
    service: len([e for e in error_events() if e.get("service") == service])
    for service in set(e.get("service", "unknown") for e in error_events())
})

# Effect to trigger alerts
def check_alerts():
    counts = error_count_by_service()
    for service, count in counts.items():
        if count > 5:
            print(f"ALERT: {service} has {count} errors")

alert_system = effect(check_alerts)

# Adding data triggers the entire pipeline automatically
def ingest_batch(new_events):
    raw_events.update(lambda events: events + new_events)

# Multiple updates in a batch to prevent intermediate recalculations
with batch():
    raw_events.update(lambda events: events + [
        {"service": "auth", "level": "ERROR", "ts": 1619712000},
        {"service": "auth", "level": "ERROR", "ts": 1619712060}
    ])
```

### 3. Real-Time System Monitoring

```python
from reaktiv import signal, computed, effect
import asyncio

# System metrics
cpu_usage = signal([])
memory_usage = signal([])
disk_io = signal([])

# Derived analytics
avg_cpu = computed(lambda: sum(cpu_usage()[-5:]) / 5 if len(cpu_usage()) >= 5 else 0)
avg_memory = computed(lambda: sum(memory_usage()[-5:]) / 5 if len(memory_usage()) >= 5 else 0)

# System status derived from multiple metrics
system_status = computed(lambda: 
    "critical" if avg_cpu() > 90 or avg_memory() > 90 else
    "warning" if avg_cpu() > 70 or avg_memory() > 70 else
    "normal"
)

# Single monitoring effect that updates based on derived status
def update_monitoring_dashboard():
    status = system_status()
    print(f"System status: {status}")
    print(f"CPU: {avg_cpu():.1f}%, Memory: {avg_memory():.1f}%")
    
    if status == "critical":
        print("⚠️ ALERT: System resources critical!")

dashboard = effect(update_monitoring_dashboard)

# When new metrics arrive, all derived values and the dashboard update automatically
async def simulate_metrics():
    for i in range(10):
        cpu_usage.update(lambda readings: readings + [50 + i * 5])
        memory_usage.update(lambda readings: readings + [60 + i * 4])
        await asyncio.sleep(1)

# No need to manually update the dashboard - it reacts to changes automatically
```

### 4. API Gateway Rate Limiting Example

```python
from reaktiv import signal, computed, effect
import time

# Per-client request tracking
client_requests = signal({})  # client_id: [(timestamp, endpoint), ...]

# Computed rate limits that automatically update
requests_per_minute = computed(lambda: {
    client_id: len([req for timestamp, _ in requests 
                   if time.time() - timestamp < 60])
    for client_id, requests in client_requests().items()
})

endpoint_requests = computed(lambda: {
    endpoint: sum(1 for client_reqs in client_requests().values() 
                 for _, ep in client_reqs if ep == endpoint)
    for endpoint in set(ep for reqs in client_requests().values() 
                       for _, ep in reqs)
})

# Rate limit decision making
should_rate_limit = computed(lambda: {
    client_id: requests_per_minute().get(client_id, 0) > 100
    for client_id in client_requests()
})

# Real-time monitoring
def monitor_rate_limits():
    limits = should_rate_limit()
    throttled = [client for client, limited in limits.items() if limited]
    if throttled:
        print(f"Rate limiting clients: {throttled}")

rate_limit_monitor = effect(monitor_rate_limits)

# When new requests come in, all rate limits automatically recalculate
def track_request(client_id, endpoint):
    client_requests.update(lambda reqs: {
        **reqs,
        client_id: reqs.get(client_id, []) + [(time.time(), endpoint)]
    })
```

## Example Application: Health Monitoring System

This example shows how `reaktiv` simplifies building a real-time health monitoring system that ingests metrics, computes derived health indicators, and triggers alerts.

```python
from reaktiv import signal, computed, effect
import asyncio
import time

# Core state signals
server_metrics = signal({})  # server_id -> {cpu, memory, disk, last_seen}
alert_thresholds = signal({"cpu": 80, "memory": 90, "disk": 95})
maintenance_mode = signal({})  # server_id -> bool

# Derived state
servers_online = computed(lambda: {
    server_id: time.time() - metrics["last_seen"] < 60
    for server_id, metrics in server_metrics().items()
})

health_status = computed(lambda: {
    server_id: (
        "maintenance" if maintenance_mode().get(server_id, False) else
        "offline" if not servers_online().get(server_id, False) else
        "alert" if (
            metrics["cpu"] > alert_thresholds()["cpu"] or
            metrics["memory"] > alert_thresholds()["memory"] or
            metrics["disk"] > alert_thresholds()["disk"]
        ) else 
        "healthy"
    )
    for server_id, metrics in server_metrics().items()
})

servers_by_status = computed(lambda: {
    status: [server_id for server_id, s in health_status().items() if s == status]
    for status in ["healthy", "alert", "offline", "maintenance"]
})

# Effects for monitoring and alerting
def update_dashboard():
    statuses = servers_by_status()
    print(f"Dashboard: {len(statuses['healthy'])} healthy, {len(statuses['alert'])} in alert")
    
    if statuses["alert"]:
        print(f"⚠️ ALERT: Servers in alert state: {statuses['alert']}")

dashboard_effect = effect(update_dashboard)

async def main():
    # Update metrics - all derived values and effects update automatically
    server_metrics.set({
        "server1": {"cpu": 70, "memory": 65, "disk": 80, "last_seen": time.time()},
        "server2": {"cpu": 85, "memory": 50, "disk": 70, "last_seen": time.time()}
    })
    
    await asyncio.sleep(1)
    
    # Put a server in maintenance - dashboard updates automatically
    maintenance_mode.set({"server2": True})
    
    await asyncio.sleep(1)
    
    # Adjust thresholds - alerts recalculate automatically
    alert_thresholds.set({"cpu": 75, "memory": 90, "disk": 95})

asyncio.run(main())
```

## More Examples

You can find more example scripts in the examples folder to help you get started with using this project.

---

**Inspired by** Angular Signals • **Built for** Python's async-first world • **Made in** Hamburg
