Metadata-Version: 2.4
Name: sandai-operator-sdk
Version: 0.3.0
Summary: A Python SDK for building high-performance, asynchronous batch processing operators
Home-page: https://github.com/world-sim-dev/sandai-data-project
Author: Sandai Team
Author-email: dev@sand.ai
Project-URL: Bug Reports, https://github.com/world-sim-dev/sandai-data-project/issues
Project-URL: Source, https://github.com/world-sim-dev/sandai-data-project
Project-URL: Documentation, https://sandai-data-project.readthedocs.io/
Keywords: batch processing,async,operator,data processing,pipeline
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: pydantic>=2.0.0
Requires-Dist: requests>=2.25.0
Requires-Dist: watchdog>=2.1.0
Requires-Dist: celery>=5.2.0
Requires-Dist: redis>=4.0.0
Requires-Dist: boto3>=1.26.0
Requires-Dist: oss2
Requires-Dist: python-dotenv
Requires-Dist: sentry-sdk
Provides-Extra: dev
Requires-Dist: pytest>=6.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.18.0; extra == "dev"
Requires-Dist: black>=22.0.0; extra == "dev"
Requires-Dist: flake8>=4.0.0; extra == "dev"
Requires-Dist: mypy>=0.950; extra == "dev"
Requires-Dist: build>=0.8.0; extra == "dev"
Requires-Dist: twine>=4.0.0; extra == "dev"
Provides-Extra: examples
Requires-Dist: opencv-python>=4.5.0; extra == "examples"
Requires-Dist: ffmpeg-python>=0.2.0; extra == "examples"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: project-url
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# SandAI Operator SDK

Python framework for developing data operators under the Dataflow architecture. Part of the SandAI Data Project's three-layer separation design.

## Overview

The Operator SDK provides the foundation for building data processing operators that form the **Dataflow layer** in the SandAI architecture. These operators are atomic, reusable components that can be composed into complex pipelines and workflows.

## Features

- **Asynchronous Batch Processing**: Concurrent processing with configurable batch size and concurrency
- **Smart File Monitoring**: Real-time file change detection that stays correct when files are saved from Vim
- **Task Working Directories**: Isolated working directories for each task
- **Error Recovery**: Automatic handling of file operations and network interruptions
- **Standardized Interface**: Consistent operator lifecycle and API design
- **Celery Integration**: Built-in support for distributed task execution
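
Editor compatibility matters because Vim's default save sequence produces swap files, a `~` backup, and a short-lived probe file named `4913`, and it may replace the target by rename rather than modify it in place. The sketch below shows one way a watcher could filter that noise. It is not the SDK's `FileChannel` logic: `is_editor_temp_file`, `paths_to_process`, and the simplified `(event_type, src, dest)` tuples are all assumptions made for illustration.

```python
from pathlib import PurePath
from typing import Iterable, List, Optional, Tuple

# Simplified event: (event_type, source path, destination path for moves).
Event = Tuple[str, str, Optional[str]]

_SWAP_SUFFIXES = (".swp", ".swo", ".swx")


def is_editor_temp_file(path: str) -> bool:
    """Return True for files Vim creates as a side effect of editing."""
    name = PurePath(path).name
    return (
        name.endswith("~")                # backup copy of the original
        or name.endswith(_SWAP_SUFFIXES)  # swap files
        or name == "4913"                 # Vim's directory-writability probe
    )


def paths_to_process(events: Iterable[Event]) -> List[str]:
    """Collapse a burst of raw events into the real files that changed."""
    seen: List[str] = []
    for event_type, src, dest in events:
        # For a rename, the destination is the path that now holds content.
        target = dest if event_type == "moved" and dest else src
        if event_type == "deleted" or is_editor_temp_file(target):
            continue
        if target not in seen:
            seen.append(target)
    return seen


# Roughly what saving in/task.json from Vim can look like to a watcher.
vim_save = [
    ("created", "in/4913", None),
    ("deleted", "in/4913", None),
    ("moved", "in/task.json", "in/task.json~"),
    ("created", "in/task.json", None),
    ("modified", "in/task.json", None),
    ("deleted", "in/task.json~", None),
    ("modified", "in/.task.json.swp", None),
]
changed = paths_to_process(vim_save)
```

Seven raw events collapse to a single change for `in/task.json`, so the task file is handled once per save.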

## Installation

```bash
cd operator-sdk
pip install -e .
```

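To work in a dedicated conda environment, create one before installing. The build string pinned below selects a specific CPython 3.14 build from conda-forge (the `cp314t` suffix marks a free-threaded build):

```
conda create -n sandai-operator python=3.14=h0369b99_1_cp314t -c conda-forge
```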

## Quick Start

```python
from sandai.operator import BatchProcessor, TaskInput, TaskOutput
from pydantic import BaseModel
from typing import List, Generator

class Options(BaseModel):
    param: str = "default"

class Results(BaseModel):
    output: str

processor = BatchProcessor(name="my-processor", version="1.0.0")

@processor.on_batch(
    max_concurrency=4,
    max_batch_size=8,
    prepare_concurrency=4,
    output_concurrency=4,
)
def process_batch(
    batch_inputs: List[TaskInput[Options]], 
    operator_config: dict,
    context
) -> Generator[TaskOutput[Results], None, None]:
    
    for task_input in batch_inputs:
        # Get task working directory
        workdir = context.get_task_workdir(task_input.task_id)
        
        # Your processing logic here
        result = Results(output=f"processed-{task_input.options.param}")
        
        yield TaskOutput[Results](
            task_id=task_input.task_id,
            results=result,
            status="success"
        )

if __name__ == "__main__":
    processor.run()
```

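`prepare_concurrency` and `output_concurrency` inherit `max_concurrency` by default, so leaving them unset preserves the behavior of earlier versions. In the current implementation, prepare-stage downloads and input conversion, output-stage uploads and cleanup, and channel pull/push each run on their own executor. You can therefore raise `prepare_concurrency` or `output_concurrency` independently instead of having all of this work compete in a single I/O pool.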
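
The fallback and the pool separation can be pictured with a small standard-library sketch. It is not the SDK's implementation: `resolve_concurrency` and `StagePools` are hypothetical names, and `ThreadPoolExecutor` stands in for whichever executor type the SDK uses internally.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Dict, Optional


def resolve_concurrency(
    max_concurrency: int,
    prepare_concurrency: Optional[int] = None,
    output_concurrency: Optional[int] = None,
) -> Dict[str, int]:
    """Unset stage limits fall back to max_concurrency (hypothetical helper)."""
    return {
        "process": max_concurrency,
        "prepare": max_concurrency if prepare_concurrency is None else prepare_concurrency,
        "output": max_concurrency if output_concurrency is None else output_concurrency,
    }


@dataclass
class StagePools:
    """One executor per I/O stage, so slow uploads cannot starve downloads."""

    prepare: ThreadPoolExecutor
    output: ThreadPoolExecutor

    @classmethod
    def from_limits(cls, limits: Dict[str, int]) -> "StagePools":
        return cls(
            prepare=ThreadPoolExecutor(max_workers=limits["prepare"], thread_name_prefix="prepare"),
            output=ThreadPoolExecutor(max_workers=limits["output"], thread_name_prefix="output"),
        )

    def shutdown(self) -> None:
        self.prepare.shutdown(wait=True)
        self.output.shutdown(wait=True)


# Only the output stage is widened; prepare inherits max_concurrency.
limits = resolve_concurrency(max_concurrency=4, output_concurrency=16)
pools = StagePools.from_limits(limits)

# Each stage submits work to its own pool.
downloaded = pools.prepare.submit(lambda: "input-ready").result()
uploaded = pools.output.submit(lambda: "output-stored").result()
pools.shutdown()
```

Widening only the output pool is the kind of tuning this separation is meant to allow, for example when uploads are the bottleneck and downloads are not.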

## Core Components

- **BatchProcessor**: Asynchronous batch processor with configurable concurrency
- **FileChannel**: File monitoring with real-time change detection
- **ProcessingContext**: Task-level working directory management
- **CeleryChannel**: Distributed task execution via Celery
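
To illustrate task-level working directories, the sketch below shows one way per-task isolation could work. `SketchContext` and `cleanup_task` are hypothetical and are not the SDK's `ProcessingContext`; only the method name `get_task_workdir` comes from the Quick Start.

```python
import shutil
import tempfile
from pathlib import Path


class SketchContext:
    """Hypothetical stand-in for a context that hands out per-task directories."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def get_task_workdir(self, task_id: str) -> Path:
        # Each task gets its own subdirectory; repeated calls return the same path.
        workdir = self.root / task_id
        workdir.mkdir(parents=True, exist_ok=True)
        return workdir

    def cleanup_task(self, task_id: str) -> None:
        # Removing one task's directory leaves every other task untouched.
        shutil.rmtree(self.root / task_id, ignore_errors=True)


root = Path(tempfile.mkdtemp(prefix="operator-sketch-"))
ctx = SketchContext(root)

dir_a = ctx.get_task_workdir("task-a")
dir_b = ctx.get_task_workdir("task-b")
(dir_a / "frame.txt").write_text("scratch data for task-a")

# Finishing task-a discards its scratch files only.
ctx.cleanup_task("task-a")
```

Because every task writes under its own directory, intermediate files from concurrent tasks in the same batch cannot collide, and cleanup can happen per task.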

## Architecture Integration

This SDK enables the **Dataflow layer** of the SandAI architecture:
- **Operators** built with this SDK are deployed in the `operators/` directory
- **Pipelines** in the `pipelines/` directory compose these operators
- **Workflows** in the `workflows/` directory orchestrate complete business processes

## Example Operators

See the `operators/` directory for complete implementations:
- `video-clipper/`: Video processing operator
- `data-transformer/`: Data format conversion operator

## Testing

```bash
make test          # Run all tests
make test-sdk      # Run SDK core tests
```

## Development

Set Up Local MinIO
```
brew install minio/stable/minio
brew install minio/stable/mc
minio server var/minio
```

Set Up Local Redis
```
brew install redis
brew services start redis
```

Set Up Local Postgres
```
brew install postgresql
brew services start postgresql
```

List Services
```
brew services list
```

### Creating New Operators

1. Create operator directory in `../operators/my-operator/`
2. Implement using this SDK
3. Deploy as Celery service
4. Use in pipelines and workflows

### Best Practices

- Keep operators focused on single responsibilities
- Use proper error handling and logging
- Implement comprehensive tests
- Document operator interfaces clearly
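
For the error-handling point, one common pattern is to isolate failures per task so that a single bad input does not abort the rest of the batch. The sketch below uses plain dataclasses rather than the SDK's `TaskOutput`. `SketchOutput`, `run_isolated`, and the `"failed"` status value are assumptions; the Quick Start only shows `"success"`.

```python
import logging
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional

logger = logging.getLogger("operator-sketch")


@dataclass
class SketchOutput:
    """Hypothetical per-task result record."""

    task_id: str
    status: str
    error: Optional[str] = None


def run_isolated(
    task_ids: Iterable[str],
    handler: Callable[[str], None],
) -> Iterator[SketchOutput]:
    """Run handler per task, converting exceptions into failed outputs."""
    for task_id in task_ids:
        try:
            handler(task_id)
        except Exception as exc:
            # Log with traceback, then keep going with the next task.
            logger.exception("task %s failed", task_id)
            yield SketchOutput(task_id, "failed", str(exc))
        else:
            yield SketchOutput(task_id, "success")


def handler(task_id: str) -> None:
    # Stand-in for real processing logic.
    if task_id == "bad":
        raise ValueError("unreadable input")


outputs = list(run_isolated(["ok-1", "bad", "ok-2"], handler))
```

The same shape is easy to unit test: feed in one failing task and assert that the surrounding tasks still report success.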

## License

MIT License




## Packaging and Upload

    make build
    ossutil cp dist/sandai_operator_sdk-0.2.7-py3-none-any.whl oss://python-artifacts/ -e oss-cn-shanghai.aliyuncs.com --acl public-read

## Local Development Install

    pip install -e /path/to/operator-sdk
