Metadata-Version: 2.4
Name: temporal-workdir
Version: 0.3.1
Summary: Remote-backed workspace sync for Temporal activities
Project-URL: Homepage, https://github.com/saeedseyfi/temporal-workdir
Project-URL: Repository, https://github.com/saeedseyfi/temporal-workdir
Author-email: Saeed Seyfi <me@saeedseyfi.com>
License-Expression: MIT
License-File: LICENSE
Keywords: distributed,fsspec,temporal,workflow,workspace
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Python: >=3.10
Requires-Dist: fsspec>=2024.1.0
Requires-Dist: temporalio>=1.0.0
Provides-Extra: dev
Requires-Dist: pyright>=1.1; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.21; extra == 'dev'
Requires-Dist: pytest>=7.0; extra == 'dev'
Requires-Dist: ruff>=0.5.0; extra == 'dev'
Description-Content-Type: text/markdown

# Workspace Sync for Temporal Activities

Sync a local directory with remote storage before and after a Temporal activity. This lets file-based activities work across distributed workers that don't share a disk.

## Problem

Temporal activities that read/write files on local disk break when you scale to multiple worker instances. Each worker has its own disk. This module syncs a remote storage location to a local temp directory before the activity runs, and pushes changes back after.

## Install

```bash
pip install temporal-workdir

# With a specific cloud backend:
pip install temporal-workdir gcsfs    # Google Cloud Storage
pip install temporal-workdir s3fs     # Amazon S3
pip install temporal-workdir adlfs    # Azure Blob Storage
```

## Usage

### As a context manager (generic, works anywhere)

```python
import json

from temporal_workdir import Workspace

async with Workspace("gs://my-bucket/pipeline/component-x") as ws:
    # ws.path is a local Path — read and write files normally
    data = json.loads((ws.path / "component.json").read_text())
    (ws.path / "result.csv").write_text("col1,col2\nval1,val2")
    # On clean exit: local dir is archived and uploaded
    # On exception: no upload (remote state unchanged)
```
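
Because failures skip the upload, a crashed activity can't corrupt the remote archive. A minimal sketch of the failure path, mirroring the behavior described in the comments above:

```python
# If the body raises, the upload is skipped and the remote archive
# is left exactly as it was before the attempt.
try:
    async with Workspace("gs://my-bucket/pipeline/component-x") as ws:
        (ws.path / "partial.csv").write_text("incomplete")
        raise RuntimeError("processing failed")
except RuntimeError:
    pass  # next run pulls the last successfully pushed state
```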

### As a Temporal activity decorator

```python
from temporalio import activity
from temporal_workdir import workspace, get_workspace_path

@workspace("gs://my-bucket/{workflow_id}/{activity_type}")
@activity.defn
async def extract(input: ExtractInput) -> ExtractOutput:
    ws = get_workspace_path()
    # Template vars resolved from activity.info()
    source = (ws / "source.json").read_text()
    (ws / "output.csv").write_text(process(source))
    return ExtractOutput(success=True)
```
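
The decorated function registers with a worker like any other activity. A minimal sketch, assuming a local Temporal server and a hypothetical `extract-queue` task queue:

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    # Server address and task queue name are assumptions for this sketch
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="extract-queue",
        activities=[extract],  # the @workspace-decorated activity above
    )
    await worker.run()

asyncio.run(main())
```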

### Custom template variables

```python
@workspace(
    "gs://my-bucket/{workflow_id}/components/{component}",
    key_fn=lambda input: {"component": input.component_name},
)
@activity.defn
async def register(input: RegisterInput) -> RegisterOutput:
    ws = get_workspace_path()
    ...
```
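
Values returned by `key_fn` are merged with the defaults taken from `activity.info()`. A minimal sketch of the assumed resolution logic (illustrative only, not the library's actual code):

```python
from temporalio import activity

def resolve_url(template: str, input, key_fn=None) -> str:
    # Defaults come from the running activity's context
    info = activity.info()
    variables = {
        "workflow_id": info.workflow_id,
        "activity_type": info.activity_type,
    }
    if key_fn is not None:
        variables.update(key_fn(input))  # custom vars extend/override the defaults
    return template.format(**variables)
```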

## How It Works

1. **Pull**: On entry, downloads `{remote_url}.tar.gz` and unpacks to a temp directory
2. **Execute**: Your activity reads/writes files in the local directory
3. **Push**: On clean exit, packs the directory into `tar.gz` and uploads

If the archive doesn't exist yet (first run), the local directory starts empty. If the activity raises an exception, no push happens and the remote state is left untouched.
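
For intuition, here is a minimal sketch of that cycle built directly on `fsspec` and `tarfile`. It illustrates the mechanism; it is not the library's actual implementation:

```python
import tarfile
import tempfile
from pathlib import Path

import fsspec

def pull(remote_url: str) -> Path:
    """Download {remote_url}.tar.gz and unpack it into a fresh temp dir."""
    local = Path(tempfile.mkdtemp(prefix="workdir-"))
    fs, path = fsspec.core.url_to_fs(remote_url + ".tar.gz")
    if fs.exists(path):  # first run: no archive yet, start empty
        with fs.open(path, "rb") as f, tarfile.open(fileobj=f, mode="r|gz") as tar:
            tar.extractall(local)
    return local

def push(local: Path, remote_url: str) -> None:
    """Pack the local dir into a tar.gz and upload, replacing the remote archive."""
    fs, path = fsspec.core.url_to_fs(remote_url + ".tar.gz")
    with fs.open(path, "wb") as f, tarfile.open(fileobj=f, mode="w|gz") as tar:
        tar.add(local, arcname=".")
```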

## Storage Backends

Any backend supported by [fsspec](https://filesystem-spec.readthedocs.io/):

| Scheme | Backend | Extra package |
|--------|---------|--------------|
| `gs://` | Google Cloud Storage | `gcsfs` |
| `s3://` | Amazon S3 | `s3fs` |
| `az://` | Azure Blob Storage | `adlfs` |
| `file://` | Local filesystem | (none) |
| `memory://` | In-memory (testing) | (none) |

Pass backend-specific options as keyword arguments:

```python
Workspace("gs://bucket/key", project="my-gcp-project", token="cloud")
```
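
The `memory://` backend makes the full pull/push cycle testable in-process without cloud credentials. A small round-trip sketch:

```python
import asyncio

from temporal_workdir import Workspace

async def main():
    # First run: no archive exists yet, so the workspace starts empty
    async with Workspace("memory://bucket/test-run") as ws:
        (ws.path / "hello.txt").write_text("hi")
    # Second run: the archive pushed on clean exit is pulled back down
    async with Workspace("memory://bucket/test-run") as ws:
        assert (ws.path / "hello.txt").read_text() == "hi"

asyncio.run(main())
```

Note that fsspec's in-memory filesystem is process-local, so this round-trip only works within a single process.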
