Metadata-Version: 2.4
Name: snap-task-orchestrator
Version: 0.1.0
Summary: Lightweight distributed task orchestrator with absolute process isolation and SQLite WAL persistence.
Home-page: https://github.com/annaqibz01/snap
Author: Muhammad Nagib
Author-email: nagib@arcelion.com
Classifier: Programming Language :: Python :: 3
Classifier: Operating System :: OS Independent
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: license-file
Dynamic: requires-python
Dynamic: summary

# snap — Lightweight, Zero-Dependency Distributed Task Orchestrator

## Executive Overview

**snap** is a lightweight, embedded, production-ready distributed task orchestrator built entirely on the Python Standard Library and SQLite. It requires **zero external dependencies**: no Redis, no RabbitMQ, no Celery, no message brokers. Designed for engineering teams that need reliable background task execution without infrastructure overhead, snap provides:

- **Process-level isolation** via the `multiprocessing` `spawn` start method, protecting the orchestrator from user-code memory leaks, segmentation faults, and hard crashes.
- **Atomic task scheduling** using SQLite Write-Ahead Logging (WAL) mode with `BEGIN IMMEDIATE` transactions to eliminate race conditions and double-claiming.
- **Directed Acyclic Graph (DAG) support** with built-in topological cycle detection for complex dependency chains.
- **Automatic crash recovery** with persistent worker heartbeats, orphaned task reclamation, and checkpoint/restart capabilities.

snap is ideal for microservices, data pipelines, CI/CD workflows, and any Python application requiring robust background processing without operational complexity.

## Architectural & Technical Highlights

### Zero-Dependency Infrastructure
snap is powered exclusively by the Python Standard Library and SQLite. There are **no third-party packages, message brokers, or external services** to install, configure, or maintain. The orchestrator runs in a single process with thread-safe SQLite connections and launches an isolated worker subprocess for each task.

### Absolute Memory & Process Isolation
Every task executes in its **own dedicated OS process**, started with the `multiprocessing` `spawn` start method, so each worker begins from a fresh interpreter rather than a forked copy of the orchestrator. This guarantees:
- Complete memory isolation between the orchestrator and user code.
- Protection against segmentation faults and hard crashes in task functions.
- Deterministic cleanup of resources via context managers (`__enter__`/`__exit__`).
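
The effect of process isolation can be illustrated with a small experiment. This is a sketch, not snap's worker code: it uses `subprocess` to launch a fresh interpreter (a stand-in for the `spawn` start method, which also starts a fresh interpreter) so that it runs unchanged from a script or an interactive session. `run_isolated` is a hypothetical helper.

```python
import subprocess
import sys


def run_isolated(source: str, timeout: float = 30.0) -> int:
    """Run Python source in a fresh interpreter and return its exit code.

    Hypothetical helper for illustration; snap itself uses multiprocessing.
    """
    completed = subprocess.run(
        [sys.executable, "-c", source],
        timeout=timeout,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return completed.returncode


# A well-behaved task exits with code 0.
ok_code = run_isolated("print('task finished')")

# os.abort() kills the child abruptly, much like a hard crash in native code.
crash_code = run_isolated("import os; os.abort()")

# The parent keeps running and only observes a non-zero exit code.
print("parent still alive; exit codes:", ok_code, crash_code)
```

The parent process never shares memory with the crashing child, which is the property the bullets above rely on.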

### Atomic Task Claiming with Write-Locking
snap eliminates race conditions through:
- `BEGIN IMMEDIATE` transactions that take SQLite's write lock at the start of the transaction, before any task is selected or claimed.
- Exponential backoff retry logic for lock contention scenarios.
- Filtered claims that respect DAG dependency constraints within the same transaction.
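
The claiming pattern described above can be sketched with the standard `sqlite3` module. This is a minimal illustration rather than snap's actual implementation: the column names (`id`, `status`, `priority`, `worker_id`) and the helpers `claim_next_task` and `with_backoff` are assumptions made for the example.

```python
import sqlite3
import time
from typing import Optional


def claim_next_task(conn: sqlite3.Connection, worker_id: str) -> Optional[str]:
    """Atomically claim the highest-priority PENDING task, or return None."""
    # BEGIN IMMEDIATE takes the write lock up front, so two claimers cannot
    # both select the same row and then both update it.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id FROM snap_tasks WHERE status = 'PENDING' "
            "ORDER BY priority DESC, id LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE snap_tasks SET status = 'RUNNING', worker_id = ? WHERE id = ?",
                (worker_id, row[0]),
            )
        conn.execute("COMMIT")
        return row[0] if row is not None else None
    except Exception:
        conn.execute("ROLLBACK")
        raise


def with_backoff(fn, attempts: int = 5, base_delay: float = 0.01):
    """Retry fn with exponential backoff on lock contention (hypothetical helper)."""
    for attempt in range(attempts):
        try:
            return fn()
        except sqlite3.OperationalError:  # e.g. "database is locked"
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))


# isolation_level=None disables the module's implicit transactions,
# so the explicit BEGIN IMMEDIATE / COMMIT above are the only ones issued.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE snap_tasks "
    "(id TEXT PRIMARY KEY, status TEXT, priority INTEGER, worker_id TEXT)"
)
conn.executemany(
    "INSERT INTO snap_tasks VALUES (?, 'PENDING', ?, NULL)",
    [("email", 10), ("report", 1)],
)

first = with_backoff(lambda: claim_next_task(conn, "worker-1"))
second = with_backoff(lambda: claim_next_task(conn, "worker-2"))
third = with_backoff(lambda: claim_next_task(conn, "worker-3"))
print(first, second, third)
```

Because the select and the update run inside one write transaction, a second claimer either waits for the lock or retries, and never sees the task as still `PENDING`.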

### Built-in Directed Acyclic Graph (DAG) Engine
The dependency system is mapped relationally in a dedicated `snap_task_dependencies` table with:
- Native topological cycle detection via `graphlib.TopologicalSorter` before enqueueing.
- Automatic filtering of blocked tasks during claiming (tasks wait until all parents complete).
- Efficient indexing for child-parent lookups.
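
Cycle detection with `graphlib.TopologicalSorter` can be sketched as follows. The helper `validate_dag` and the shape of its input (a mapping from task ID to parent task IDs) are assumptions for illustration, not snap's internal API.

```python
from graphlib import CycleError, TopologicalSorter


def validate_dag(dependencies: dict) -> list:
    """Return a valid execution order, or raise ValueError on a cycle.

    `dependencies` maps each task ID to the set of parent task IDs it waits on.
    """
    sorter = TopologicalSorter(dependencies)
    try:
        # static_order() prepares the graph lazily, so the cycle check
        # happens while the order is being materialized.
        return list(sorter.static_order())
    except CycleError as exc:
        # exc.args[1] holds the nodes that form the detected cycle.
        raise ValueError(f"dependency cycle detected: {exc.args[1]}") from exc


order = validate_dag({"report": {"image"}, "image": set()})
print(order)  # ['image', 'report']

try:
    validate_dag({"a": {"b"}, "b": {"a"}})
except ValueError as exc:
    print("rejected:", exc)
```

Running the check before enqueueing means a cyclic graph is rejected before any row reaches the database.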

### Fail-Safe Robustness
- **WAL mode** lets readers proceed while a write is in progress, and lets a write proceed while readers are active; SQLite still allows only one writer at a time.
- **Persistent heartbeats** from workers enable detection of stalled or dead processes.
- **Automatic reclamation** of tasks assigned to dead workers, with retry count incrementation.
- **Complete error traces** captured and persisted for failed tasks.
- **Progress checkpoints** allow long-running tasks to resume from the last successful step.
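
The WAL behavior mentioned in the first bullet can be observed directly with `sqlite3`. This sketch uses a throwaway database file and a hypothetical table `t`; it only demonstrates the SQLite mechanism and says nothing about how snap configures its own connections.

```python
import os
import sqlite3
import tempfile

# WAL needs a real file; an in-memory database cannot use it.
db_path = os.path.join(tempfile.mkdtemp(), "wal_demo.db")

# isolation_level=None lets the example control transactions explicitly.
writer = sqlite3.connect(db_path, isolation_level=None)
reader = sqlite3.connect(db_path, isolation_level=None)

mode = writer.execute("PRAGMA journal_mode=WAL").fetchone()[0]
writer.execute("CREATE TABLE t (x INTEGER)")

# Open a write transaction and leave it uncommitted for a moment.
writer.execute("BEGIN IMMEDIATE")
writer.execute("INSERT INTO t VALUES (1)")

# The reader is not blocked; it sees the last committed snapshot.
before = reader.execute("SELECT COUNT(*) FROM t").fetchall()[0][0]

writer.execute("COMMIT")
after = reader.execute("SELECT COUNT(*) FROM t").fetchall()[0][0]

print(mode, before, after)
writer.close()
reader.close()
```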

## Installation Guide

### Prerequisites
- Python 3.9 or higher
- No third-party packages required

### Installation

Install snap in editable mode for development, or as a regular package for deployment:

```bash
# From the project root directory: editable install for development
pip install -e .

# Or a regular install for deployment
pip install .
```

This installs the package and makes the `snap-scheduler` CLI entry point available. Verify installation:

```bash
python -c "import snap; print(snap.__version__)"
```

No additional configuration is needed. snap is ready to use immediately.

## Quick Start & Practical Usage

### Defining and Enqueueing Tasks

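Define your tasks in a separate, importable module rather than in the script you run. Worker processes started with `spawn` re-import the module that defines a task, and functions defined in `__main__` may not be resolvable there: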

**`tasks.py`**:
```python
import time
from snap.interface.decorators import task

@task(max_retries=3, priority=10)
def send_email(recipient: str, subject: str, body: str):
    """Simulate sending an email."""
    print(f"Sending email to {recipient}...")
    time.sleep(2)  # Simulate work
    print(f"Email sent to {recipient}")
    return {"status": "sent", "recipient": recipient}

@task(max_retries=5, priority=5)
def process_image(image_path: str, format: str = "webp"):
    """Simulate image processing."""
    print(f"Processing {image_path} to {format}...")
    time.sleep(3)
    print(f"Processed {image_path}")
    return {"status": "processed", "path": image_path}

@task(max_retries=2, priority=1)
def generate_report(data_id: str):
    """Simulate report generation."""
    print(f"Generating report for {data_id}...")
    time.sleep(1)
    print(f"Report generated for {data_id}")
    return {"status": "generated", "report_id": data_id}
```

**`run.py`**:
```python
from tasks import send_email, process_image, generate_report
from snap.orchestrator.engine import Engine

# Initialize the engine (this also sets the global DB path)
engine = Engine(db_path="/tmp/snap.db", max_workers=4)

# Enqueue tasks using the .delay() API
email_task_id = send_email.delay(
    "user@example.com",
    "Welcome!",
    "Thank you for signing up."
)

image_task_id = process_image.delay(
    "/images/photo.jpg",
    format="avif"
)

report_task_id = generate_report.delay("REPORT-2024-001")

print("Enqueued tasks:")
print(f"  Email task:  {email_task_id}")
print(f"  Image task:  {image_task_id}")
print(f"  Report task: {report_task_id}")

# Start the engine to process tasks
engine.start()

# Block until each task finishes; wait_for_completion returns True on success
task_ids = (email_task_id, image_task_id, report_task_id)
results = [engine.wait_for_completion(task_id, timeout=60) for task_id in task_ids]

engine.stop()
print("All tasks completed successfully." if all(results) else "Some tasks did not complete.")
```

### Handling Task Dependencies (DAG)

Tasks can declare explicit dependencies on other tasks. The orchestrator ensures a task only runs **after all its parent tasks have completed**:

**`pipeline.py`**:
```python
from tasks import process_image, generate_report
from snap.orchestrator.engine import Engine

engine = Engine(db_path="/tmp/snap.db", max_workers=2)
engine.start()

# Step 1: Enqueue image processing (no dependencies)
image_task = process_image.delay("/images/photo.jpg")

# Step 2: Enqueue report generation that depends on the image task
# This task will not execute until 'image_task' is COMPLETED
report_task = generate_report.delay(
    "REPORT-2024-001",
    depends_on=[image_task]  # <-- DAG dependency
)

print(f"Image task:  {image_task}")
print(f"Report task: {report_task} (waiting for image to complete)")

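# Waiting on the report task covers both: it cannot run until the image task completes
completed = engine.wait_for_completion(report_task, timeout=120)
engine.stop()
print("Pipeline completed successfully." if completed else "Pipeline did not complete.")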
```

The orchestrator automatically:
1. Validates the dependency graph for cycles before enqueueing.
2. Treats a task as `READY` (a virtual state that is not persisted) only when all parent tasks are `COMPLETED`.
3. Blocks claiming of tasks with uncompleted dependencies.
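
The dependency filter in steps 2 and 3 can be expressed as a single SQL query. The sketch below is an assumption about how such a filter might look: the table names come from this document, but the column names (`child_id`, `parent_id`, `status`) and the helper `ready_task_ids` are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE snap_tasks (id TEXT PRIMARY KEY, status TEXT);
    CREATE TABLE snap_task_dependencies (child_id TEXT, parent_id TEXT);
    INSERT INTO snap_tasks VALUES ('image', 'PENDING'), ('report', 'PENDING');
    INSERT INTO snap_task_dependencies VALUES ('report', 'image');
    """
)

# A task is ready when it is PENDING and has no parent that is not COMPLETED.
READY_QUERY = """
    SELECT t.id
    FROM snap_tasks AS t
    WHERE t.status = 'PENDING'
      AND NOT EXISTS (
          SELECT 1
          FROM snap_task_dependencies AS d
          JOIN snap_tasks AS p ON p.id = d.parent_id
          WHERE d.child_id = t.id AND p.status != 'COMPLETED'
      )
    ORDER BY t.id
"""


def ready_task_ids(conn: sqlite3.Connection) -> list:
    """Return the IDs of tasks whose parents have all completed."""
    return [row[0] for row in conn.execute(READY_QUERY)]


ready_before = ready_task_ids(conn)  # only 'image' has no unfinished parent
conn.execute("UPDATE snap_tasks SET status = 'COMPLETED' WHERE id = 'image'")
ready_after = ready_task_ids(conn)   # 'report' is unblocked now
print(ready_before, ready_after)
```

Because readiness is computed at query time, no separate `READY` value has to be written back to the task row.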

### Using Progress Checkpoints

For long-running tasks, you can persist intermediate state to enable recovery from failures:

**`long_task.py`**:
```python
import time
from snap.interface.decorators import task
from snap.worker.process import checkpoint

@task(max_retries=3, priority=5)
def process_large_dataset(dataset_id: str):
    """Process a large dataset with checkpoint support."""
    
    # Phase 1: Data loading
    print(f"[{dataset_id}] Loading data...")
    time.sleep(10)
    checkpoint("data_loaded", {
        "dataset_id": dataset_id,
        "loaded_rows": 1000000,
        "phase": "loading_complete"
    })
    
    # Phase 2: Transformation
    print(f"[{dataset_id}] Transforming data...")
    time.sleep(15)
    checkpoint("transformation_complete", {
        "dataset_id": dataset_id,
        "transformed_rows": 950000,
        "phase": "transformation_complete"
    })
    
    # Phase 3: Export
    print(f"[{dataset_id}] Exporting results...")
    time.sleep(5)
    
    return {"status": "complete", "dataset": dataset_id}
```

If the worker crashes during Phase 2, the orchestrator does the following on retry:
1. Detects the checkpoint saved at the end of Phase 1.
2. Passes the checkpoint state as the first argument to the function.
3. Lets the function resume from Phase 2 and skip Phase 1, provided the function inspects that state.

## Running the Orchestrator (The Daemon Loop)

### Starting the Scheduler

After installing snap, start the orchestrator daemon using the CLI entry point:

```bash
# Start the scheduler with default settings
snap-scheduler

# Or specify a custom database path
export SNAP_ACTIVE_DB_PATH=/custom/path/snap.db
snap-scheduler
```

### What Happens Behind the Scenes

When `snap-scheduler` starts, it:

1. **Initializes the database schema** (`snap_tasks`, `snap_workers`, `snap_checkpoints`, `snap_task_dependencies` tables) if not present.
2. **Sets the global database path** via `DBManager.set_db_path()` so all `.delay()` calls resolve to the correct database.
3. **Begins an orchestration loop** that:
   - Updates the engine's heartbeat to avoid self-reclamation.
   - Reclaims orphaned tasks assigned to dead workers.
   - Calculates available worker slots.
   - **Atomically claims** the next batch of ready tasks (with DAG dependency filtering).
   - **Spawns isolated `WorkerProcess` instances** for each claimed task.
4. **Monitors worker health** via persistent heartbeat updates.
5. **Collects telemetry** from workers via a unidirectional `multiprocessing.Pipe`.
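
The one-way telemetry channel in step 5 can be sketched with `multiprocessing.Pipe(duplex=False)`. To stay short, this example keeps both ends in one process; in a real setup the sending end would be handed to the worker process. The message fields (`task_id`, `event`) are hypothetical.

```python
from multiprocessing import Pipe

# duplex=False returns (receive-only end, send-only end).
receiver, sender = Pipe(duplex=False)

# A worker would hold `sender` and report events as it runs.
sender.send({"task_id": "demo-task", "event": "started"})
sender.send({"task_id": "demo-task", "event": "finished"})

# The collector drains whatever is available without blocking:
# poll() with no timeout returns immediately.
events = []
while receiver.poll():
    events.append(receiver.recv())

sender.close()
receiver.close()
print([event["event"] for event in events])
```

A unidirectional pipe keeps the contract simple: workers can report to the orchestrator, but the orchestrator cannot accidentally write into a worker.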

With the default `loop_interval` of 0.01 seconds, the loop runs at up to roughly 100 cycles per second. A larger `loop_interval` trades claim latency for lower idle CPU usage.

### Graceful Shutdown

The scheduler handles `SIGINT` (Ctrl+C) and `SIGTERM` gracefully:
1. Sets the stop flag to prevent new task claims.
2. Terminates all active workers (graceful `terminate()` first, then `kill()` if needed).
3. Joins the orchestration thread.
4. Restores signal handlers and cleans up resources.
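
The stop-flag part of this sequence (steps 1 and 4) can be sketched with the standard `signal` and `threading` modules. The helpers `install_stop_handlers` and `restore_handlers` are hypothetical names, and the sketch assumes it runs in the main thread, which is where Python allows signal handlers to be installed.

```python
import signal
import threading


def install_stop_handlers(stop_event: threading.Event) -> dict:
    """Route SIGINT/SIGTERM to stop_event and return the previous handlers."""

    def _request_stop(signum, frame):
        # Only set a flag here; the main loop does the actual shutdown work.
        stop_event.set()

    previous = {}
    for sig in (signal.SIGINT, signal.SIGTERM):
        previous[sig] = signal.signal(sig, _request_stop)
    return previous


def restore_handlers(previous: dict) -> None:
    """Put back the handlers that were active before installation."""
    for sig, handler in previous.items():
        if handler is not None:  # None means the handler was not set from Python
            signal.signal(sig, handler)


stop_event = threading.Event()
previous = install_stop_handlers(stop_event)
try:
    signal.raise_signal(signal.SIGINT)  # simulate Ctrl+C
    stop_event.wait(timeout=1.0)
    # A real engine would now stop claiming, terminate workers, and join its thread.
finally:
    restore_handlers(previous)

print("stop requested:", stop_event.is_set())
```

For step 2, `multiprocessing.Process` provides both `terminate()` and `kill()`, so a worker that ignores the first request can still be stopped after a `join(timeout)`.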

## Core API Reference & Technical Specification

### `@task(max_retries=3, priority=0)`

The primary decorator for converting a Python function into a snap task.

| Parameter     | Type   | Default | Description |
|---------------|--------|---------|-------------|
| `max_retries` | `int`  | `3`     | Maximum number of automatic retries when a task fails. |
| `priority`    | `int`  | `0`     | Scheduling priority. Higher values are dequeued first by the orchestrator. |

**Behavior**: The decorator wraps the function into a `TaskDefinition` object that preserves the original function's metadata. The task's fully qualified name (e.g., `mymodule.myfunc`) is automatically resolved.

**Returns**: A `TaskDefinition` instance with a `.delay()` method.

---

### `task.delay(*args, **kwargs)`

Enqueue a task for asynchronous execution by the orchestrator.

| Parameter    | Type        | Description |
|--------------|-------------|-------------|
| `*args`      | `tuple`     | Positional arguments passed to the task function. |
| `**kwargs`   | `dict`      | Keyword arguments passed to the task function. If only `**kwargs` are provided, they are automatically wrapped into a single positional argument for convenience. |
| `depends_on` | `list[str]` | Optional list of parent task IDs, as used in the DAG example. The task is not claimed until every parent is `COMPLETED`. |

**Returns**: A `str` UUID representing the task ID. This ID can be used for:
- Waiting for completion via `engine.wait_for_completion(task_id)`.
- Defining DAG dependencies via `depends_on=[task_id]`.

**Side Effects**:
1. Sensitive data (keys matching patterns like `password`, `secret`, `token`) are **automatically masked** before serialization via `SecurityEngine`.
2. Arguments and keyword arguments are **pickled** into binary BLOBs for SQLite storage.
3. The database path is resolved **dynamically at execution time** from the `SNAP_ACTIVE_DB_PATH` environment variable or the global path set by the Engine.
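
The first two side effects can be sketched as follows. This is not the `SecurityEngine` implementation: `mask_sensitive` and `serialize_call` are hypothetical helpers, the mask string `"***"` is an assumption, and only the key patterns (`password`, `secret`, `token`) are taken from the list above.

```python
import pickle

# Key patterns named in this document; matching is case-insensitive here.
SENSITIVE_PATTERNS = ("password", "secret", "token")


def mask_sensitive(kwargs: dict) -> dict:
    """Return a copy of kwargs with values of sensitive-looking keys masked."""
    masked = {}
    for key, value in kwargs.items():
        if any(pattern in key.lower() for pattern in SENSITIVE_PATTERNS):
            masked[key] = "***"  # assumed mask value
        else:
            masked[key] = value
    return masked


def serialize_call(args: tuple, kwargs: dict) -> tuple:
    """Pickle positional and (masked) keyword arguments into two BLOBs."""
    return pickle.dumps(args), pickle.dumps(mask_sensitive(kwargs))


args_blob, kwargs_blob = serialize_call(
    ("user@example.com",),
    {"subject": "Welcome!", "api_token": "abc123"},
)

restored_args = pickle.loads(args_blob)
restored_kwargs = pickle.loads(kwargs_blob)
print(restored_args, restored_kwargs)
```

The resulting `bytes` objects can be bound directly to SQLite `BLOB` columns.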

---

### `checkpoint(step_name, state_dict)`

Persist intermediate computation state for a running task. Enables recovery from failures.

| Parameter    | Type   | Description |
|--------------|--------|-------------|
| `step_name`  | `str`  | A human-readable label for this checkpoint step (e.g., `"data_loaded"`, `"transformation_complete"`). |
| `state_dict` | `dict` | Arbitrary application state to persist. Must be picklable. |

**Raises**: `RuntimeError` if called outside a snap worker process (that is, when the `SNAP_TASK_ID` environment variable is not set).

**Behavior**:
1. Serializes the state dictionary with `pickle`.
2. Opens a dedicated SQLite connection and performs `INSERT OR REPLACE` into `snap_checkpoints`.
3. The checkpoint ID is constructed as `{task_id}_{step_name}`.

**Recovery**: When a failed task is retried, the worker loads the **most recent checkpoint** and passes it as the first positional argument to the task function. The task can then inspect this state to skip already-completed phases.
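
The storage and recovery behavior described above can be sketched with `sqlite3` and `pickle`. The `snap_checkpoints` column layout, the use of `rowid` to find the most recent entry, and the helpers `save_checkpoint` and `load_latest_checkpoint` are assumptions for illustration only.

```python
import pickle
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE snap_checkpoints "
    "(id TEXT PRIMARY KEY, task_id TEXT, step_name TEXT, state BLOB)"
)


def save_checkpoint(conn, task_id: str, step_name: str, state: dict) -> None:
    """Upsert one checkpoint keyed by '{task_id}_{step_name}'."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO snap_checkpoints VALUES (?, ?, ?, ?)",
            (f"{task_id}_{step_name}", task_id, step_name, pickle.dumps(state)),
        )


def load_latest_checkpoint(conn, task_id: str):
    """Return (step_name, state) for the newest checkpoint, or None."""
    # rowid grows with each insert in this table, so it orders checkpoints by age.
    row = conn.execute(
        "SELECT step_name, state FROM snap_checkpoints "
        "WHERE task_id = ? ORDER BY rowid DESC LIMIT 1",
        (task_id,),
    ).fetchone()
    return None if row is None else (row[0], pickle.loads(row[1]))


save_checkpoint(conn, "task-1", "data_loaded", {"loaded_rows": 1000000})
save_checkpoint(conn, "task-1", "transformation_complete", {"transformed_rows": 950000})

latest = load_latest_checkpoint(conn, "task-1")
missing = load_latest_checkpoint(conn, "task-2")
print(latest, missing)
```

A task that receives such a state on retry can compare the step name against its own phases to decide where to continue.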

---

### `Engine(db_path, max_workers=10, loop_interval=0.01, heartbeat_timeout=10.0, auto_initialize=True)`

The central orchestrator class.

| Parameter           | Type    | Default | Description |
|---------------------|---------|---------|-------------|
| `db_path`           | `str`   | Required | Path to the SQLite3 database file. |
| `max_workers`       | `int`   | `10`    | Maximum number of concurrent worker processes. |
| `loop_interval`     | `float` | `0.01`  | Sleep interval (seconds) between orchestration cycles. |
| `heartbeat_timeout` | `float` | `10.0`  | Seconds without heartbeat before a worker is considered dead. |
| `auto_initialize`   | `bool`  | `True`  | If True, automatically creates the database schema on construction. |

**Key Methods and Properties**:

| Member | Description |
|--------|-------------|
| `start()` | Start the orchestration loop and telemetry collector. |
| `stop(timeout=5.0)` | Gracefully stop the engine, terminate workers, and clean up resources. |
| `wait_for_completion(task_id, poll_interval=0.1, timeout=30.0)` | Block until a task reaches `COMPLETED` or `FAILED` status. Returns `True` if completed. |
| `active_worker_count` | Property returning the current number of alive worker processes. |
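
The polling behavior of `wait_for_completion` can be approximated with a short loop. This is a sketch under stated assumptions, not the Engine's code: `poll_until_done` is a hypothetical helper, the `snap_tasks` columns are simplified, and returning `False` on timeout is an assumption.

```python
import sqlite3
import time


def poll_until_done(conn, task_id: str, poll_interval: float = 0.1,
                    timeout: float = 30.0) -> bool:
    """Poll the task row until it is COMPLETED (True) or FAILED/timed out (False)."""
    deadline = time.monotonic() + timeout
    while True:
        row = conn.execute(
            "SELECT status FROM snap_tasks WHERE id = ?", (task_id,)
        ).fetchone()
        status = row[0] if row else None
        if status == "COMPLETED":
            return True
        if status == "FAILED":
            return False
        if time.monotonic() >= deadline:
            return False  # assumed timeout behavior
        time.sleep(poll_interval)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE snap_tasks (id TEXT PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO snap_tasks VALUES (?, ?)",
    [("t-done", "COMPLETED"), ("t-failed", "FAILED"), ("t-stuck", "PENDING")],
)

done = poll_until_done(conn, "t-done")
failed = poll_until_done(conn, "t-failed")
stuck = poll_until_done(conn, "t-stuck", poll_interval=0.01, timeout=0.05)
print(done, failed, stuck)
```

Using `time.monotonic()` for the deadline keeps the timeout correct even if the system clock is adjusted while waiting.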

## Database State Machine Lifecycle

Each task in snap progresses through a well-defined state machine:

```
                    ┌─────────────┐
                    │   PENDING   │
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │   READY *   │ (DAG dependencies satisfied)
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
               ┌───▶│   RUNNING   │◀───┐
               │    └──────┬──────┘    │
               │           │           │
               │    ┌──────▼──────┐    │
               │    │  COMPLETED  │    │
               │    └─────────────┘    │
               │                       │
               │    ┌─────────────┐    │
               ├───▶│   FAILED    │    │
               │    └─────────────┘    │
               │                       │
               │    ┌─────────────┐    │
               └───▶│ RECOVERING  │────┘ (retries < max_retries)
                    └─────────────┘
                           │
                    ┌──────▼──────┐
                    │  ║ DEAD ║   │ (max retries exhausted)
                    └─────────────┘
```

### State Descriptions

| State | Description |
|-------|-------------|
| **PENDING** | Task has been enqueued but is not yet eligible for execution. It may be waiting for DAG dependencies. |
| **READY** \* | A **virtual state** (not persisted) indicating the task has no uncompleted parent dependencies. Tasks in this state are eligible for claiming. |
| **RUNNING** | Task has been claimed by a worker and is currently executing. The `worker_id` field is set. |
| **COMPLETED** | Task executed successfully without exceptions. The result is not persisted (future feature). |
| **FAILED** | Task raised an exception during execution. The `error_trace` field contains the full traceback. |
| **RECOVERING** | A previously `RUNNING` task whose worker was detected as dead (heartbeat timeout). The task will be retried if `retries < max_retries`. |
| **DEAD** | Task has exhausted its maximum number of retries. No further execution attempts will be made. |

### Transition Triggers

| Transition | Trigger |
|------------|---------|
| PENDING → RUNNING | Worker claims the task (DAG dependencies must be satisfied) |
| RUNNING → COMPLETED | Task function returns without raising an exception |
| RUNNING → FAILED | Task function raises an exception |
| RUNNING → RECOVERING | Orchestrator detects worker heartbeat timeout |
| RECOVERING → PENDING | Orchestrator resets task to pending state |
| PENDING → FAILED | Maximum retries exceeded (future enhancement) |
| RECOVERING → FAILED | Maximum retries exceeded (future enhancement) |
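
The transition table above can be encoded as a lookup that rejects any move it does not list. This is an illustrative sketch; `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names, and the mapping simply mirrors the rows of the table, including those marked as future enhancements.

```python
# Each key is a current state; each value is the set of states it may move to.
ALLOWED_TRANSITIONS = {
    "PENDING": {"RUNNING", "FAILED"},
    "RUNNING": {"COMPLETED", "FAILED", "RECOVERING"},
    "RECOVERING": {"PENDING", "FAILED"},
}


def can_transition(current: str, target: str) -> bool:
    """Return True if the table lists a transition from current to target."""
    return target in ALLOWED_TRANSITIONS.get(current, set())


print(can_transition("PENDING", "RUNNING"))      # True
print(can_transition("COMPLETED", "RUNNING"))    # False: COMPLETED has no exits
print(can_transition("RECOVERING", "PENDING"))   # True
```

A guard like this is useful in tests, where it catches status updates that bypass the documented lifecycle.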

### Heartbeat & Reclamation

snap's recovery mechanism operates as follows:
1. Workers update their heartbeat in `snap_workers` every 500 ms.
2. The orchestrator's main loop checks the `last_heartbeat` of all workers.
3. Any worker with `last_heartbeat` older than `heartbeat_timeout` is marked as `DEAD`.
4. All `RUNNING` tasks assigned to that worker are moved to `RECOVERING` with `retries += 1`.
5. On the next cycle, these `RECOVERING` tasks become eligible for claiming by a new worker.
6. If `retries >= max_retries`, the task transitions to `FAILED`.
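
The reclamation steps above can be sketched in SQL. This is an approximation under stated assumptions: the column layout of `snap_workers` and `snap_tasks` is simplified, `reclaim_orphans` is a hypothetical helper, and the sketch collapses steps 4 and 6 into one update instead of waiting for the next cycle.

```python
import sqlite3
import time


def reclaim_orphans(conn, heartbeat_timeout: float, now: float = None) -> list:
    """Mark stale workers DEAD and move their RUNNING tasks out of RUNNING."""
    now = time.time() if now is None else now
    cutoff = now - heartbeat_timeout
    with conn:  # one transaction for the whole sweep
        dead = [row[0] for row in conn.execute(
            "SELECT id FROM snap_workers WHERE status != 'DEAD' AND last_heartbeat < ?",
            (cutoff,),
        )]
        for worker_id in dead:
            conn.execute("UPDATE snap_workers SET status = 'DEAD' WHERE id = ?", (worker_id,))
            # Right-hand sides see the old row, so `retries + 1` is the new count.
            conn.execute(
                "UPDATE snap_tasks SET retries = retries + 1, worker_id = NULL, "
                "status = CASE WHEN retries + 1 >= max_retries "
                "THEN 'FAILED' ELSE 'RECOVERING' END "
                "WHERE worker_id = ? AND status = 'RUNNING'",
                (worker_id,),
            )
    return dead


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE snap_workers (id TEXT PRIMARY KEY, status TEXT, last_heartbeat REAL);
    CREATE TABLE snap_tasks (id TEXT PRIMARY KEY, status TEXT, worker_id TEXT,
                             retries INTEGER, max_retries INTEGER);
    """
)
conn.executemany("INSERT INTO snap_workers VALUES (?, 'ALIVE', ?)",
                 [("w-alive", 999.0), ("w-dead", 940.0)])
conn.executemany("INSERT INTO snap_tasks VALUES (?, 'RUNNING', ?, ?, 3)",
                 [("t1", "w-dead", 0), ("t2", "w-dead", 2), ("t3", "w-alive", 0)])
conn.commit()

dead_workers = reclaim_orphans(conn, heartbeat_timeout=10.0, now=1000.0)
states = dict(conn.execute("SELECT id, status FROM snap_tasks"))
print(dead_workers, states)
```

Here `t1` still has retries left and becomes `RECOVERING`, `t2` reaches its limit and becomes `FAILED`, and `t3` is untouched because its worker is still sending heartbeats.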

---

**snap** is engineered for teams who demand reliability, performance, and simplicity. With zero dependencies, process-level isolation, and automatic crash recovery, it provides enterprise-grade task orchestration without the operational burden of traditional message brokers.
