Metadata-Version: 2.4
Name: amsdal_workflow
Version: 0.1.0
Summary: amsdal_workflow plugin for AMSDAL Framework
Requires-Python: >=3.11
Requires-Dist: amsdal[cli]>=0.5.14
Requires-Dist: langgraph>=0.6.8
Provides-Extra: openai
Requires-Dist: langchain[openai]; extra == 'openai'
Description-Content-Type: text/markdown

# AMSDAL Workflow

[![Python Version](https://img.shields.io/badge/python-3.11%2B-blue.svg)](https://www.python.org/downloads/)
[![License](https://img.shields.io/badge/license-MIT-green.svg)](LICENSE)

A LangGraph checkpoint persistence plugin for the [AMSDAL Framework](https://github.com/amsdal/amsdal). This plugin enables persistent and recoverable LangGraph workflows by storing checkpoint state in AMSDAL-managed databases.

## Features

- **Persistent Workflow State**: Store LangGraph checkpoints in any AMSDAL-supported database (SQLite, PostgreSQL, etc.)
- **Dual Mode Support**: Both synchronous and asynchronous operations
- **Thread-based Organization**: Manage multiple workflow threads with checkpoint namespacing
- **Drop-in Replacement**: Compatible with LangGraph's `BaseCheckpointSaver` interface
- **Production Ready**: Built on the robust AMSDAL ORM with comprehensive testing

## Installation

Install via pip:

```bash
pip install amsdal-workflow
```

Or with optional dependencies:

```bash
# With OpenAI support
pip install "amsdal-workflow[openai]"
```

## Quick Start

### Basic Usage

```python
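from typing import TypedDict

from langgraph.graph import END, START, StateGraph
from amsdal_workflow.checkpoint import AmsdalCheckpointSaver


class State(TypedDict):  # Placeholder state schema; replace with your own
    message: str


def greet(state: State) -> State:  # Placeholder node
    return {'message': f"Hello, {state['message']}!"}


# Initialize the checkpoint saver (AMSDAL must already be set up; see Configuration)
checkpointer = AmsdalCheckpointSaver()

# Create your LangGraph workflow (a single-node graph stands in for yours)
workflow = StateGraph(State)
workflow.add_node('greet', greet)
workflow.add_edge(START, 'greet')
workflow.add_edge('greet', END)

# Compile with checkpoint support
app = workflow.compile(checkpointer=checkpointer)

# Run with persistence
config = {'configurable': {'thread_id': 'user-123'}}
input_data = {'message': 'world'}
result = app.invoke(input_data, config=config)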
```

### Async Usage

```python
import asyncio

from amsdal_workflow.checkpoint import AmsdalCheckpointSaver

# Same checkpointer works for async
checkpointer = AmsdalCheckpointSaver()

# Compile async workflow (`workflow` is the StateGraph built in Basic Usage)
app = workflow.compile(checkpointer=checkpointer)


async def main() -> None:
    # Run async with persistence (`input_data` is your graph's initial state)
    config = {'configurable': {'thread_id': 'user-123'}}
    result = await app.ainvoke(input_data, config=config)


asyncio.run(main())
```

### Advanced Configuration

```python
from langchain_core.runnables import RunnableConfig
from amsdal_workflow.checkpoint import AmsdalCheckpointSaver

checkpointer = AmsdalCheckpointSaver()

# Configuration with checkpoint namespace
config: RunnableConfig = {
    'configurable': {
        'thread_id': 'conversation-456',
        'checkpoint_ns': 'production',  # Optional namespace
    }
}

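# Run workflow (`app` and `input_data` are defined as in Basic Usage)
result = app.invoke(input_data, config=config)

# Look up the latest checkpoint stored for this thread and namespace
checkpoint_tuple = checkpointer.get_tuple(config)
if checkpoint_tuple:
    # State exists, so this call continues the thread from its last checkpoint.
    # To resume an interrupted run without new input, pass None instead of input_data.
    result = app.invoke(input_data, config=config)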
```

## Architecture

### Core Components

- **AmsdalCheckpointSaver**: Main class implementing LangGraph's `BaseCheckpointSaver` interface
- **Checkpoint Model**: Stores checkpoint snapshots with metadata
- **CheckpointWrites Model**: Stores pending write operations for each checkpoint

### Data Models

#### Checkpoint
Stores workflow state snapshots:
- `thread_id`: Workflow thread identifier
- `checkpoint_ns`: Optional namespace for organization
- `checkpoint_id`: Unique checkpoint identifier
- `parent_checkpoint_id`: Reference to parent checkpoint
- `checkpoint`: Serialized checkpoint data
- `meta`: Checkpoint metadata

#### CheckpointWrites
Stores pending writes associated with checkpoints:
- `thread_id`, `checkpoint_ns`, `checkpoint_id`: Links to checkpoint
- `task_id`: Task identifier
- `idx`: Write operation index
- `channel`: Channel name
- `value`: Serialized write value
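
The sketch below illustrates how the three linking fields tie each write to its checkpoint, and how `task_id` and `idx` give the writes a stable order. `WriteRecord` and `writes_by_checkpoint` are hypothetical names for illustration, not the plugin's model or API.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class WriteRecord:
    """Illustrative stand-in for one stored pending write (not the real model)."""

    thread_id: str
    checkpoint_ns: str
    checkpoint_id: str
    task_id: str
    idx: int
    channel: str
    value: str  # serialized write value


def writes_by_checkpoint(
    rows: list[WriteRecord],
) -> dict[tuple[str, str, str], list[tuple[str, str, str]]]:
    """Group writes under their checkpoint key, ordered by task and index."""
    grouped = defaultdict(list)
    for row in sorted(rows, key=lambda r: (r.task_id, r.idx)):
        key = (row.thread_id, row.checkpoint_ns, row.checkpoint_id)
        grouped[key].append((row.task_id, row.channel, row.value))
    return dict(grouped)


rows = [
    WriteRecord('user-123', '', 'cp-2', 'task-a', 1, 'messages', '"second"'),
    WriteRecord('user-123', '', 'cp-2', 'task-a', 0, 'messages', '"first"'),
]
pending = writes_by_checkpoint(rows)
```

Each grouped entry is a `(task_id, channel, value)` triple, the same shape LangGraph uses for the `pending_writes` of a `CheckpointTuple`.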

## API Reference

### AmsdalCheckpointSaver

#### Methods

##### Synchronous Methods

- `get_tuple(config: RunnableConfig) -> CheckpointTuple | None`
  - Retrieve a checkpoint tuple by configuration

- `list(config: RunnableConfig | None, *, filter: dict | None = None, before: RunnableConfig | None = None, limit: int | None = None) -> Iterator[CheckpointTuple]`
  - List checkpoints with optional filtering

- `put(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: ChannelVersions) -> RunnableConfig`
  - Store a new checkpoint

- `put_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str) -> None`
  - Store pending writes for a checkpoint

- `delete_thread(thread_id: str) -> None`
  - Delete all checkpoints and writes for a thread
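
As a usage sketch, the helpers below combine `list`, `get_tuple`, and `delete_thread`. The function names are hypothetical, and the code assumes that each returned `CheckpointTuple` carries its `checkpoint_id` under `config['configurable']`, as LangGraph's reference savers do. The saver is passed in as an argument, so any `BaseCheckpointSaver` should work.

```python
from typing import Any


def recent_checkpoint_ids(saver: Any, thread_id: str, limit: int = 5) -> list[str]:
    """Return up to `limit` checkpoint IDs for a thread, in the order the saver yields them."""
    config = {'configurable': {'thread_id': thread_id}}
    return [
        item.config['configurable']['checkpoint_id']
        for item in saver.list(config, limit=limit)
    ]


def reset_thread(saver: Any, thread_id: str) -> bool:
    """Delete a thread's checkpoints and writes; report whether anything was stored."""
    config = {'configurable': {'thread_id': thread_id}}
    had_state = saver.get_tuple(config) is not None
    if had_state:
        saver.delete_thread(thread_id)
    return had_state
```

For example, `recent_checkpoint_ids(AmsdalCheckpointSaver(), 'user-123')` would list the stored checkpoint IDs for that thread once AMSDAL is initialized.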

##### Asynchronous Methods

All synchronous methods have async equivalents prefixed with `a`:
- `aget_tuple(...)`
- `alist(...)`
- `aput(...)`
- `aput_writes(...)`
- `adelete_thread(...)`
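
The async variants can be awaited concurrently. The sketch below checks several threads for stored state with `aget_tuple`. `threads_with_state` is a hypothetical helper, and whether concurrent lookups are actually faster depends on your AMSDAL connection setup.

```python
import asyncio
from typing import Any


async def threads_with_state(saver: Any, thread_ids: list[str]) -> list[str]:
    """Return the thread IDs that already have a stored checkpoint."""
    configs = [{'configurable': {'thread_id': tid}} for tid in thread_ids]
    # Look up all threads concurrently rather than one at a time.
    tuples = await asyncio.gather(*(saver.aget_tuple(cfg) for cfg in configs))
    return [tid for tid, found in zip(thread_ids, tuples) if found is not None]
```

Call it from async code as `await threads_with_state(checkpointer, ['user-123', 'user-456'])`.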

## Configuration

### Database Setup

AMSDAL Workflow uses your existing AMSDAL configuration. Configure your database connection and initialize AMSDAL before creating the checkpoint saver:

```python
from amsdal.manager import AmsdalManager
from amsdal_workflow.checkpoint import AmsdalCheckpointSaver

# Initialize AMSDAL
manager = AmsdalManager()
manager.setup()

# Now use AmsdalCheckpointSaver
checkpointer = AmsdalCheckpointSaver()
```

### Migration

The plugin includes migration files for creating the required tables. Run migrations before first use:

```bash
amsdal migrate
```

## Development

### Setup Development Environment

```bash
# Clone the repository
git clone https://github.com/amsdal/amsdal-workflow.git
cd amsdal-workflow

# Install dependencies
hatch run sync
```

### Running Tests

```bash
# Run all tests
hatch run test

# Run with coverage
hatch run cov

# Run specific test file
hatch run test tests/test_checkpoint.py

# Run with verbose output
hatch run test -v
```

### Code Quality

```bash
# Format code
hatch run fmt

# Check code style
hatch run style

# Run type checking
hatch run typing

# Run all checks
hatch run all
```

### Project Structure

```
amsdal_workflow/
├── amsdal_workflow/           # Main package
│   ├── __init__.py
│   ├── checkpoint.py          # AmsdalCheckpointSaver implementation
│   ├── utils.py               # Utility functions
│   ├── models/                # Data models
│   │   ├── checkpoint.py      # Checkpoint model
│   │   └── checkpoint_writes.py  # CheckpointWrites model
│   └── migrations/            # Database migrations
├── tests/                     # Test suite
│   ├── conftest.py            # Test fixtures
│   └── test_checkpoint.py     # Checkpoint tests
├── pyproject.toml             # Project configuration
└── README.md                  # This file
```

## Contributing

Contributions are welcome! Please follow these steps:

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Make your changes
4. Run tests and code quality checks
5. Commit your changes (`git commit -m 'Add amazing feature'`)
6. Push to the branch (`git push origin feature/amazing-feature`)
7. Open a Pull Request

### Code Standards

- Python 3.11+ required
- Follow PEP 8 style guide (enforced by Ruff)
- Add type hints to all functions
- Write tests for new features
- Maintain test coverage above 90%

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Acknowledgments

- Built on [LangGraph](https://github.com/langchain-ai/langgraph) by LangChain
- Powered by [AMSDAL Framework](https://github.com/amsdal/amsdal)

## Support

- **Documentation**: [AMSDAL Docs](https://docs.amsdal.com)
- **Issues**: [GitHub Issues](https://github.com/amsdal/amsdal-workflow/issues)
- **Discussions**: [GitHub Discussions](https://github.com/amsdal/amsdal-workflow/discussions)

## Changelog

See [CHANGELOG.md](CHANGELOG.md) for a list of changes in each release.