Metadata-Version: 2.4
Name: sandai-operator-sdk
Version: 0.4.16
Summary: A Python SDK for building high-performance, asynchronous batch processing operators
Home-page: https://github.com/world-sim-dev/sandai-data-project
Author: Sandai Team
Author-email: dev@sand.ai
Project-URL: Bug Reports, https://github.com/world-sim-dev/sandai-data-project/issues
Project-URL: Source, https://github.com/world-sim-dev/sandai-data-project
Project-URL: Documentation, https://sandai-data-project.readthedocs.io/
Keywords: batch processing,async,operator,data processing,pipeline
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: pydantic>=2.0.0
Requires-Dist: requests>=2.25.0
Requires-Dist: watchdog>=2.1.0
Requires-Dist: celery>=5.2.0
Requires-Dist: redis>=4.0.0
Requires-Dist: msgpack>=1.0.0
Requires-Dist: boto3>=1.26.0
Requires-Dist: oss2
Requires-Dist: python-dotenv
Requires-Dist: sentry-sdk
Provides-Extra: tracing
Requires-Dist: opentelemetry-api>=1.0.0; extra == "tracing"
Provides-Extra: dev
Requires-Dist: pytest>=6.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.18.0; extra == "dev"
Requires-Dist: black>=22.0.0; extra == "dev"
Requires-Dist: flake8>=4.0.0; extra == "dev"
Requires-Dist: mypy>=0.950; extra == "dev"
Requires-Dist: build>=0.8.0; extra == "dev"
Requires-Dist: twine>=4.0.0; extra == "dev"
Provides-Extra: examples
Requires-Dist: opencv-python>=4.5.0; extra == "examples"
Requires-Dist: ffmpeg-python>=0.2.0; extra == "examples"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: project-url
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# SandAI Operator SDK

A Python framework for developing data operators under the Dataflow architecture, part of the SandAI Data Project's three-layer separation design.

## Overview

The Operator SDK provides the foundation for building data processing operators that form the **Dataflow layer** in the SandAI architecture. These operators are atomic, reusable components that can be composed into complex pipelines and workflows.

For the full documentation set, topic guides, and architecture notes, start with [`docs/README.md`](docs/README.md).

## Features

- **Asynchronous Batch Processing**: Concurrent processing with configurable batch size and concurrency
- **Smart File Monitoring**: Real-time file change detection with vim editor compatibility
- **Task Working Directories**: Isolated working directories for each task
- **Error Recovery**: Automatic handling of file operations and network interruptions
- **Standardized Interface**: Consistent operator lifecycle and API design
- **Celery Integration**: Built-in support for distributed task execution

## Installation

```bash
cd operator-sdk
pip install -e .
```

Optionally, work inside a dedicated conda environment:

```bash
conda create -n sandai-operator python=3.8 -c conda-forge
```

The SDK requires Python 3.8 or newer. Enabling free-threading (no-GIL) on a recent Python build may improve CPU parallelism, but it is not required to run the SDK.

## Quick Start

```python
from sandai.operator import BatchProcessor, TaskInput, TaskOutput
from pydantic import BaseModel
from typing import List, Generator

class Options(BaseModel):
    param: str = "default"

class Results(BaseModel):
    output: str

processor = BatchProcessor(name="my-processor", version="1.0.0")

@processor.on_batch(
    max_concurrency=4,
    max_batch_size=8,
    prepare_concurrency=4,
    output_concurrency=4,
)
def process_batch(
    batch_inputs: List[TaskInput[Options]], 
    operator_config: dict,
    context
) -> Generator[TaskOutput[Results], None, None]:
    
    for task_input in batch_inputs:
        # Get task working directory
        workdir = context.get_task_workdir(task_input.task_id)
        
        # Your processing logic here
        result = Results(output=f"processed-{task_input.options.param}")
        
        yield TaskOutput[Results](
            task_id=task_input.task_id,
            results=result,
            status="success"
        )

if __name__ == "__main__":
    processor.run()
```

`prepare_concurrency` and `output_concurrency` inherit from `max_concurrency` by default, so behavior remains backward-compatible when they are not configured. In the current implementation, prepare download/input conversion, output upload/cleanup, and channel pull/push already run on separate executors. That means you can increase `prepare_concurrency` or `output_concurrency` independently instead of forcing all IO work to compete in the same pool.
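
For example, an operator that does modest CPU work but uploads large artifacts can widen only the output executor. A minimal sketch, reusing the Quick Start processor above:

```python
# Compute stays 4-wide; uploads fan out to 16 because they are IO-bound.
@processor.on_batch(
    max_concurrency=4,        # compute executor
    output_concurrency=16,    # output upload/cleanup executor
    # prepare_concurrency is omitted, so it inherits max_concurrency (4)
)
def process_batch(batch_inputs, operator_config, context):
    ...
```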

At the Celery protocol layer, the server still encodes with `json` by default, but it now accepts both `json` and `msgpack` content types. The newer `operator-client` defaults to `msgpack`, so the two components interoperate out of the box.
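
The SDK wires this up internally; for reference only, the equivalent settings in a plain Celery application look like this (standard Celery configuration keys, not SDK API):

```python
from celery import Celery

app = Celery("operator", broker="redis://localhost:6379/0")

# Server side: accept both encodings during the transition.
app.conf.accept_content = ["json", "msgpack"]
# Client side: publish msgpack-encoded payloads (requires the msgpack package).
app.conf.task_serializer = "msgpack"
app.conf.result_serializer = "msgpack"
```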

If a Celery task is redelivered to another worker after `visibility_timeout` expires, the server can now cap the number of delivery attempts through `max_delivery_attempts`. Like other runtime parameters, it can be configured through `BatchProcessor(...)`, `@processor.on_batch(...)`, or the `SANDAI_OPERATOR_CELERY_MAX_DELIVERY_ATTEMPTS` environment variable. Once the limit is exceeded, the server fails the task with `TaskDeliveryLimitExceededError` and stops further redelivery. The default is `0`, which disables this protection.
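
A sketch of the three configuration routes; the keyword name `max_delivery_attempts` on the constructor and decorator is assumed to match the documented parameter:

```python
import os

from sandai.operator import BatchProcessor

# 1. Environment variable, read at startup:
os.environ["SANDAI_OPERATOR_CELERY_MAX_DELIVERY_ATTEMPTS"] = "3"

# 2. Processor-wide default via the constructor:
processor = BatchProcessor(name="my-processor", version="1.0.0",
                           max_delivery_attempts=3)

# 3. Per-handler override via the decorator:
@processor.on_batch(max_delivery_attempts=3)
def process_batch(batch_inputs, operator_config, context):
    ...
```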

## Core Components

- **BatchProcessor**: Asynchronous batch processor with configurable concurrency
- **FileChannel**: File monitoring with real-time change detection
- **ProcessingContext**: Task-level working directory management
- **CeleryChannel**: Distributed task execution via Celery

## Architecture Integration

This SDK enables the **Dataflow layer** of the SandAI architecture:
- **Operators** built with this SDK are deployed in the `operators/` directory
- **Pipelines** in the `pipelines/` directory compose these operators
- **Workflows** in the `workflows/` directory orchestrate complete business processes

## Example Operators

See the `operators/` directory for complete implementations:
- `video-clipper/`: Video processing operator
- `data-transformer/`: Data format conversion operator

## Testing

```bash
make test          # Run all tests
make test-sdk      # Run SDK core tests
```

## Supervisor CLI

`operator-sdk` provides `sdrun` for launching multiple identical worker processes, aggregating their logs, forwarding signals, and enforcing worker lifecycle policies.

```bash
sdrun -w 4 --restart always -- python main.py -j --mode file
```

- `-w` / `--worker`: number of worker processes to launch, default `1`
- `--restart never`: default; do not restart workers after a non-zero exit
- `--restart always`: always restart a worker after a non-zero exit
- `--restart N`: restart a worker at most `N` times after non-zero exits
- `--success-exit ignore`: default; when a worker exits with code `0`, do not affect other workers
- `--success-exit shutdown`: when a worker exits with code `0`, stop the remaining workers
- `--failure-exit ignore`: default; when a worker exits non-zero and will not be restarted, do not affect other workers
- `--failure-exit shutdown`: when a worker exits non-zero and will not be restarted, stop the remaining workers and return that worker's exit code
- `--startup-stagger SECONDS`: sequential startup delay, default `0`; for example `0.5` starts `worker-1` after `0.5s` and `worker-2` after `1.0s`

Policy model:

- `--restart` only controls whether the exited worker itself should be restarted after a non-zero exit.
- `--success-exit` controls whether a clean exit from one worker should stop the rest.
- `--failure-exit` controls whether a non-zero exit from one worker, once no more restarts apply, should stop the rest.
- If all workers eventually exit without supervisor-forced shutdown, `sdrun` exits with the sum of all final worker exit codes.
- If `--failure-exit shutdown` is used, `sdrun` exits with the first non-restarted failing worker's exit code.
- `SIGTERM`, `SIGINT`, `SIGHUP`, and `SIGQUIT` received by `sdrun` are forwarded to all workers.
- Logs are prefixed with worker identity, for example `[worker-2#1][stdout] ...`.
- On POSIX, `sdrun` starts each worker in its own process group. On Linux it also installs a parent-death signal before `exec` so workers are terminated if the supervisor disappears unexpectedly.
- Child processes receive `SDRUN_MODE=true`, `SDRUN_WORLD_SIZE`, `SDRUN_RANK`, and `SDRUN_LOCAL_RANK`.
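
One common use of these variables is pinning each worker to its own GPU. A minimal sketch using only the environment variables listed above; the GPU mapping itself is an illustration, not SDK behavior:

```python
import os

if os.environ.get("SDRUN_MODE") == "true":
    rank = int(os.environ["SDRUN_RANK"])
    world_size = int(os.environ["SDRUN_WORLD_SIZE"])
    local_rank = int(os.environ["SDRUN_LOCAL_RANK"])
    # Must run before any CUDA initialization to take effect.
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", str(local_rank))
    print(f"worker {rank}/{world_size} pinned to GPU {local_rank}")
```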

Common combinations:

- Independent workers: `--restart never --success-exit ignore --failure-exit ignore`
- Fail-fast workers: `--restart never --success-exit ignore --failure-exit shutdown`
- Elastic recovery on failures: `--restart always --success-exit ignore --failure-exit shutdown`
- First clean completion wins: `--restart never --success-exit shutdown --failure-exit shutdown`

If running multiple workers under `sdrun` inflates GPU memory usage because each process holds its own copy of large tensors or model weights, consider using `shared-tensor` to share those tensors across processes: `https://github.com/world-sim-dev/shared-tensor`. This is especially useful for single-GPU, multi-process inference when the model runtime is not thread-safe and threads cannot be used instead.

### FileChannel With SDRUN

When workers are launched by `sdrun` and the operator runs in file mode:

- `FileChannel` shards input lines by line index using `line_index % SDRUN_WORLD_SIZE == SDRUN_RANK`.
- Each worker processes only the JSONL rows assigned to its rank.
- Output files are renamed by inserting the rank before the extension, for example `output.jsonl` becomes `output.0.jsonl` and `output.1.jsonl`.
- If the output file has no extension, the rank suffix is appended directly to the filename.

This means `sdrun -w 4 -- python main.py --mode file ...` produces 4 parallel output files that must be merged by the caller if a single combined result is needed.
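
A minimal merge sketch, assuming the per-rank files follow the `output.N.jsonl` naming above and downstream consumers do not depend on row order:

```python
import glob

# Concatenate output.0.jsonl, output.1.jsonl, ... into one file.
with open("merged.jsonl", "w") as merged:
    for path in sorted(glob.glob("output.*.jsonl")):
        with open(path) as shard:
            merged.writelines(shard)
```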

## Development

### Set Up Local MinIO

```bash
brew install minio/stable/minio
brew install minio/stable/mc
minio server var/minio
```

### Set Up Local Redis

```bash
brew install redis
brew services start redis
```

### Set Up Local Postgres

```bash
brew install postgresql
brew services start postgresql
```

### List Services

```bash
brew services list
```

### Creating New Operators

1. Create operator directory in `../operators/my-operator/`
2. Implement using this SDK
3. Deploy as Celery service
4. Use in pipelines and workflows

### Best Practices

- Keep operators focused on single responsibilities
- Use proper error handling and logging
- Implement comprehensive tests
- Document operator interfaces clearly

## License

MIT License




## Build and Upload

```bash
make build
ossutil cp dist/sandai_operator_sdk-0.4.16-py3-none-any.whl oss://python-artifacts/ -e oss-cn-shanghai.aliyuncs.com --acl public-read
```

## Local Development Install

```bash
pip install -e /path/to/operator-sdk
```