Metadata-Version: 2.1
Name: prefect-cwl
Version: 0.2.0
Summary: Orchestrate CWL with Prefect
Author-email: Francesco Bruni <bruni@planetek.it>, Domenico Monaco <monaco@planetek.it>, Paolo Minel <minel@planetek.it>, Giuseppe Mastrogiacomo <mastrogiacomo@planetek.it>
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: prefect>=3.6.13
Requires-Dist: pydantic>=2.12.5
Requires-Dist: yml>=0.0.1
Provides-Extra: docker
Requires-Dist: prefect-docker>=0.7.1; extra == "docker"
Provides-Extra: k8s
Requires-Dist: prefect-kubernetes>=0.7.2; extra == "k8s"
Requires-Dist: kubernetes>=34.1.0; extra == "k8s"

<div align="center">

  ![Logo](./static/logo.png "Logo")
  # Prefect CWL
</div>

A lightweight adapter that bridges the *Common Workflow Language* (CWL) world with *Prefect*.
It not only executes CWL but lets you orchestrate it with Prefect’s scheduling, retries, observability, and deployments.
Execution is pluggable via backends, with both Docker and Kubernetes available.

In this library, the atomic unit is a single CWL step (a `CommandLineTool` or workflow step), not an entire workflow/flow.
Prefect orchestrates those steps according to the CWL-defined dependencies.

## What this achieves
- **Bridge CWL and Prefect**: Parse CWL, build a dependency graph, and run steps under Prefect orchestration.
- **Orchestrate, not just execute**: Use Prefect’s UI, scheduling, retries, mapping, and deployments to operate CWL workloads.
- **Pluggable execution backends**: Run each CWL step via Docker or Kubernetes.

## Key concepts
- **Atomic unit = CWL step**: Each CWL step is executed as a Prefect task invocation via a backend. Prefect orchestrates the order and parallelism.
- **Dependency “waves”**: Steps run in parallel when their dependencies are satisfied; no artificial serialization.
- **Typed IR**: CWL is parsed into a typed internal representation that drives orchestration and I/O wiring.
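
The idea of dependency "waves" can be illustrated with a minimal sketch. It is not the library's implementation: `compute_waves` and the step names below are hypothetical, and the input is assumed to be a plain mapping from each step to the set of steps it depends on.

```python
def compute_waves(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group steps into waves; every step depends only on earlier waves."""
    remaining = {step: set(needs) for step, needs in deps.items()}
    done: set[str] = set()
    waves: list[list[str]] = []
    while remaining:
        # A step is ready once all of its dependencies have completed.
        ready = sorted(s for s, needs in remaining.items() if needs <= done)
        if not ready:
            raise ValueError("cycle or unknown dependency in step graph")
        waves.append(ready)
        done.update(ready)
        for step in ready:
            del remaining[step]
    return waves


# Hypothetical four-step workflow: two steps fan out, one fans in.
example = {
    "fetch": set(),
    "tile": {"fetch"},
    "mask": {"fetch"},
    "merge": {"tile", "mask"},
}
waves = compute_waves(example)
print(waves)  # [['fetch'], ['mask', 'tile'], ['merge']]
```

Steps inside the same wave have no dependency on each other, so they are the ones that can be submitted in parallel.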

## Features
- Parse a practical subset of CWL v1.2 (tools, workflows, requirements, inputs/outputs).
- Build a dependency graph and infer parallel “waves”.
- Generate a Prefect flow whose signature mirrors CWL workflow inputs.
- Execute steps via a backend that handles containers, arguments, volumes, and exit codes.
- Available backends: **Docker** and **Kubernetes**.

## Current limitations

- The adapter needs explicit working directory setup (`WRK_DIR` or backend-specific mount roots).
- This is still a pragmatic CWL subset, not full conformance.
- Scatter support is partial: single-input scatter is supported; multi-input dotproduct/crossproduct is not.
- Data exchange between steps is filesystem-based (host paths/PVC), not in-memory streaming.

A more in-depth list is available in `DESIGN.md`.

See the *sample_cwl* folder for these limits in practice.

## Backends
- **Docker backend**: Uses Prefect’s Docker primitives to pull images, mount volumes, and execute commands.
- **Kubernetes backend**: Same interface; schedules Jobs to run each CWL step.

## Output collection behavior

- Output `glob` values are resolved at runtime (including wildcard patterns and interpolated values).
- Relative globs are required; absolute paths and `..` traversal are rejected.
- Scalar outputs (`File` / `Directory`) must match exactly one artifact unless optional (`?`).
- Array outputs (`File[]` / `Directory[]`) collect all matches in stable sorted order.
- Collected artifacts are propagated to downstream steps as `Path` or `List[Path]` values.
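
The rules above can be sketched with the standard library alone. This is an illustration under assumptions, not the library's code: `collect_output` and its parameters are hypothetical names, and the glob is assumed to be already interpolated.

```python
import tempfile
from pathlib import Path, PurePosixPath


def collect_output(workdir: Path, pattern: str, *, is_array: bool, optional: bool = False):
    """Resolve an output glob inside workdir, following the rules listed above."""
    parts = PurePosixPath(pattern)
    # Only relative globs are accepted; absolute paths and `..` are rejected.
    if parts.is_absolute() or ".." in parts.parts:
        raise ValueError(f"glob must be relative to the work directory: {pattern!r}")
    matches = sorted(workdir.glob(pattern))  # stable sorted order
    if is_array:
        return matches  # File[] / Directory[]: all matches
    if not matches and optional:
        return None  # optional scalar (`?`) with no match
    if len(matches) != 1:
        raise ValueError(f"expected exactly one match for {pattern!r}, got {len(matches)}")
    return matches[0]  # File / Directory: exactly one artifact


with tempfile.TemporaryDirectory() as tmp:
    workdir = Path(tmp)
    for name in ("b.tif", "a.tif", "report.txt"):
        (workdir / name).touch()
    tiles = [p.name for p in collect_output(workdir, "*.tif", is_array=True)]
    report = collect_output(workdir, "report.txt", is_array=False).name
    missing = collect_output(workdir, "missing.json", is_array=False, optional=True)

print(tiles, report, missing)  # ['a.tif', 'b.tif'] report.txt None
```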

## Quick start

After installing all the requirements, start the *Prefect Server* first:

```sh
prefect server start
```

Then, create a new project:

```sh
mkdir this-is-just-the-client-calling
cd this-is-just-the-client-calling
uv init
```

Next, install the library with the *uv* CLI, choosing the *Docker* backend, the *K8s* backend, or both:

```sh
uv add "prefect-cwl[docker]"
uv add "prefect-cwl[k8s]"
```

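Then, in a Python script:

```python
import asyncio
from pathlib import Path

from prefect_cwl import create_flow_with_docker_backend

# Parse the CWL document and build a Prefect flow backed by Docker.
with open("myflow.cwl") as inp:
    runnable_flow = create_flow_with_docker_backend(
        inp.read(), Path("/tmp"), workflow_id="#flow_id"
    )

# Placeholder: keyword arguments matching the CWL workflow inputs.
inputs = {}
asyncio.run(runnable_flow(**inputs))
```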

The *runnable_flow* is a Prefect flow that can be scheduled, deployed, and run as any other Prefect flow.

Should you want to use the *K8s* backend, additional requirements apply:

- a running K8s cluster
- a PVC deployed in the cluster and usable by Prefect
- the following environment variables set, if needed:
  - `KUBECONFIG`, for custom configuration
  - `PREFECT_CWL_K8S_NAMESPACE`, for custom namespace (default: `prefect`)
  - `PREFECT_CWL_K8S_PVC_NAME`, for custom PVC name (default: `prefect-shared-pvc`)
  - `PREFECT_CWL_K8S_PVC_MOUNT_PATH`, for custom PVC mount path (default: `/data`)
  - `PREFECT_CWL_K8S_SERVICE_ACCOUNT_NAME`, for custom service account name (default: `prefect-flow-runner`)
  - `PREFECT_CWL_K8S_PULL_SECRETS`, for custom pull secrets (default: `[]`)
  - `PREFECT_CWL_K8S_LOG_LEVEL`, for step summary log level (default: `INFO`)
  - `PREFECT_CWL_K8S_STREAM_LOG_LEVEL`, for streamed job output log level (default: `DEBUG`)

K8s precedence note (deployed runs):
- Merge order for supported keys is:
  - Prefect base job-template defaults (including `variables.properties.*.default`, when a template is available)
  - runtime `flow_run.job_variables`
  - local/backend overrides (constructor args, `PREFECT_CWL_K8S_*`, and optional `job_variables` passed to `K8sBackend`)
- Important: local/backend explicit overrides always win when present; fallback defaults are used only when a value is not provided by template/runtime/explicit override.
- Supported merged fields include:
  - `namespace`, `service_account_name`, `env`, `volumes`, `volume_mounts`/`volumeMounts`, `image_pull_secrets`, `labels`, `finished_job_ttl`, `image_pull_policy`
- `PREFECT_CWL_K8S_PVC_NAME` and `PREFECT_CWL_K8S_PVC_MOUNT_PATH` are always enforced by `prefect-cwl` for its required work volume/mount.
- `PREFECT_CWL_K8S_PVC_MOUNT_PATH` is the authoritative in-container root used by `prefect-cwl` to create per-run data directories in the shared PVC.
- For deployed runs, you can control `prefect-cwl` log verbosity at deployment level by setting those env vars in worker/work-pool `job_variables.env`.
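
The merge order can be pictured as successive dictionary updates, where later layers win. The sketch below is a simplified model, not the backend's code: `merge_job_settings` is a hypothetical helper, unset values are assumed to be `None`, and the flat merge ignores how list- or dict-valued fields such as `env` or `labels` are combined.

```python
# Fallback values, taken from the defaults documented above.
FALLBACK_DEFAULTS = {
    "namespace": "prefect",
    "service_account_name": "prefect-flow-runner",
}


def merge_job_settings(template_defaults, runtime_job_variables, explicit_overrides):
    """Apply layers from lowest to highest precedence."""
    merged = dict(FALLBACK_DEFAULTS)
    for layer in (template_defaults, runtime_job_variables, explicit_overrides):
        # Assumption: a value of None means "not provided" and is skipped.
        merged.update({k: v for k, v in layer.items() if v is not None})
    return merged


merged = merge_job_settings(
    template_defaults={"namespace": "team-a", "image_pull_policy": "IfNotPresent"},
    runtime_job_variables={"namespace": "team-b"},
    explicit_overrides={"service_account_name": "cwl-runner"},
)
print(merged)
```

Here the runtime `namespace` replaces the template default, the explicit override replaces the fallback service account, and `image_pull_policy` survives from the template because no later layer sets it.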

For running a local K8s cluster, configured with Prefect and all the above requirements, check the *prefect-k8s-demo* folder.

## Runtime concurrency controls

`prefect-cwl` supports CWL scatter with runtime guardrails:

- `PREFECT_CWL_SCATTER_CONCURRENCY` (default: `4`): local in-flow throttling for submitted scattered runs. Set `0` or negative to disable this local gate.
- `PREFECT_CWL_SCATTER_TAG` (default: `prefect-cwl-scatter`): Prefect tag attached to `run_step` task submissions. Set to empty to disable tag attachment.

To enforce a hard orchestration limit, create a Prefect concurrency limit on that tag:

```bash
prefect concurrency-limit create prefect-cwl-scatter 8
prefect concurrency-limit inspect prefect-cwl-scatter
```

Notes:
- The Prefect tag limit is the hard control across workers/flows using the same API/server.
- The local `PREFECT_CWL_SCATTER_CONCURRENCY` gate is process-local and complements (does not replace) Prefect tag limits.
- With current implementation, `PREFECT_CWL_SCATTER_TAG` is applied to all `run_step` task submissions, including non-scattered runs.
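
A process-local gate of this kind is commonly built on a semaphore. The following sketch shows the pattern under assumptions: `scatter_limit` and `run_scattered` are hypothetical helpers, not part of `prefect-cwl`, and the worker stands in for a scattered step run.

```python
import asyncio
import os
from typing import Optional


def scatter_limit(env=os.environ) -> Optional[int]:
    """Read the local gate size; 0 or a negative value disables the gate."""
    value = int(env.get("PREFECT_CWL_SCATTER_CONCURRENCY", "4"))
    return value if value > 0 else None


async def run_scattered(items, worker, limit: Optional[int]):
    """Run worker over items, with at most `limit` running at once."""
    sem = asyncio.Semaphore(limit) if limit else None

    async def guarded(item):
        if sem is None:
            return await worker(item)
        async with sem:
            return await worker(item)

    return await asyncio.gather(*(guarded(i) for i in items))


async def _demo():
    active = peak = 0

    async def worker(x):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)  # track the highest observed concurrency
        await asyncio.sleep(0.01)
        active -= 1
        return x * 2

    results = await run_scattered(range(10), worker, limit=3)
    return results, peak


results, peak = asyncio.run(_demo())
print(results, peak)
```

Because the semaphore lives in one process, it cannot limit runs submitted by other workers or flows; that is what the Prefect tag limit is for.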

## Install the library locally

Prerequisite: install `uv` (https://github.com/astral-sh/uv). Once `uv` is installed, move into the project folder and run:

```bash
uv sync --all-extras --group dev
```

Be sure the *PYTHONPATH* variable points to the *prefect_cwl* directory.
From inside that folder, `export PYTHONPATH=$PWD` sets it to the current folder (a plain `echo` only prints the value and does not set it).
Alternatively, install the package in *editable* mode. To run the tests, install the *dev* dependencies.

Start the Prefect server using the command:

```bash
uv run prefect server start
```

Now run your Python script with:

```bash
uv run <file_path>
```

## Sample CWL (WIP)
See `sample_cwl/` for ready-to-run examples you can use to test the library. These are work-in-progress and may evolve as the adapter expands CWL coverage and features.
This includes `sample_cwl/nbr/`, currently added as a working subset for next-iteration refinement.

## Test selection
- Sample-flow end-to-end tests are marked with `e2e` (`tests/test_sample_cwl_e2e.py`).
- Heavy remote I/O/network cases are additionally marked with `heavy_io`.
- Run default non-E2E tests:
  ```bash
  uv run --group dev python -m pytest -q -m "not e2e"
  ```
- Run the full suite (unit/integration + e2e):
  ```bash
  uv run --group dev python -m pytest -q -m "e2e or not e2e"
  ```
- Enable heavy I/O e2e cases:
  ```bash
  PREFECT_CWL_E2E_HEAVY_IO=1 uv run --group dev python -m pytest -q -m "e2e or not e2e"
  ```
  (`PREFECT_CWL_E2E_NETWORK=1` is also supported as an alias.)

## Project status
Early-stage and evolving. Expect changes in models, supported CWL features, and backend interfaces as we harden the adapter.

## Design
The package design is detailed in `DESIGN.md` and reflects the latest codebase, including planning vs execution for Docker and Kubernetes backends.
