Metadata-Version: 2.4
Name: cf-pipeline-manager
Version: 0.1.0
Summary: Cogniflow pipeline manager daemon and orchestration runtime
Requires-Python: >=3.11
Description-Content-Type: text/markdown
Requires-Dist: cf-service-contracts>=0.1.0
Requires-Dist: duckdb>=0.9
Provides-Extra: test
Requires-Dist: pytest<9.0,>=8.0; extra == "test"

# cf_pipeline_manager

`cf_pipeline_manager` runs the local Cogniflow pipeline manager daemon.

It is the only supported project entrypoint for starting pipeline execution.

It owns:

- daemon lifecycle
- polling queued runs and pending events
- launching `cf_pipeline_v2`
- exposing manager commands over CLI

It does not replace `cf_semantics_runtime`. Runtime state, snapshots, queue
state, and event persistence live in `cf_semantics_runtime`.

## Runtime Files

The manager writes local daemon state under `workspace/pipeline_manager`:

- `daemon.json`
- `daemon.lock`
- `daemon.log`
- `runs/<run_id>/pipeline.engine.json`
- `runs/<run_id>/step_<index>.engine.json`
- `runs/<run_id>/engine.command.json`
- `runs/<run_id>/engine.stdout.log`
- `runs/<run_id>/engine.stderr.log`

The manager uses `workspace/pipeline_manager` for its local daemon state.
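
As a rough illustration of the layout above, the following sketch collects the per-run file paths for one run. The helper name `run_artifacts` and the `workspace` argument are hypothetical and not part of the package; only the file names come from the list above.

```python
from pathlib import Path

# Fixed per-run file names, copied from the list above.
RUN_FILES = (
    "pipeline.engine.json",
    "engine.command.json",
    "engine.stdout.log",
    "engine.stderr.log",
)


def run_artifacts(workspace: Path, run_id: str) -> dict[str, Path]:
    """Map artifact file names to their expected paths for one run.

    `workspace` is assumed to be the directory that contains
    `pipeline_manager/`. Paths are returned whether or not the files exist.
    """
    run_dir = workspace / "pipeline_manager" / "runs" / run_id
    paths = {name: run_dir / name for name in RUN_FILES}
    # Per-step files follow the step_<index>.engine.json pattern; only
    # files that are actually present on disk are added here.
    for step_file in sorted(run_dir.glob("step_*.engine.json")):
        paths[step_file.name] = step_file
    return paths
```

For example, `run_artifacts(Path("workspace"), "<run_id>")["engine.stderr.log"]` gives the path to check first when a run fails.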

## CLI Entry Points

You can use either surface:

- standalone: `python -m cf_pipeline_manager ...`
- unified CLI: `cf pipeline manager ...`

The command set is the same for both.

## Command Overview

```text
status
ensure-daemon
stop-daemon
activate
submit-run
snapshot
summaries
stop-run
emit-event
```

All commands return JSON.
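
Because the output is JSON, the commands are easy to script. The sketch below wraps the standalone entry point from Python. The helper names are hypothetical, and it assumes that the JSON document is written to stdout and that a successful command exits with code 0; neither is confirmed here.

```python
import json
import subprocess
import sys


def manager_argv(command: str, *args: str) -> list[str]:
    """Build the argument list for `python -m cf_pipeline_manager <command> ...`."""
    return [sys.executable, "-m", "cf_pipeline_manager", command, *args]


def run_manager(command: str, *args: str) -> object:
    """Run one manager command and return its parsed JSON output.

    Assumption: JSON goes to stdout and failures use a non-zero exit code,
    in which case subprocess raises CalledProcessError.
    """
    proc = subprocess.run(
        manager_argv(command, *args),
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(proc.stdout)
```

With the package installed, `run_manager("status")` would return the parsed status document.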

## Typical Flow

### 1. Start the daemon

```powershell
python -m cf_pipeline_manager ensure-daemon
```

Or:

```powershell
cf pipeline manager ensure-daemon
```

This starts the daemon if it is not already running and writes
`workspace/pipeline_manager/daemon.json`.
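
A script that starts the daemon may want to wait until that file appears. The following is a minimal polling sketch; the function name and the `workspace` argument are hypothetical, and treating the file's existence as a readiness signal is an assumption rather than documented behavior.

```python
import time
from pathlib import Path


def wait_for_daemon_file(
    workspace: Path, timeout: float = 10.0, interval: float = 0.2
) -> bool:
    """Poll until workspace/pipeline_manager/daemon.json exists.

    Returns True once the file is present, or False if `timeout` seconds
    pass first. Assumption: the file appearing means the daemon is up.
    """
    target = workspace / "pipeline_manager" / "daemon.json"
    deadline = time.monotonic() + timeout
    while True:
        if target.is_file():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

The `status` command remains the authoritative check; this only avoids calling it before the daemon has written anything.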

### 2. Check status

```powershell
python -m cf_pipeline_manager status
```

Example output shape:

```json
{
  "running": true,
  "workspace_dir": "C:\\repo\\workspace\\pipeline_manager",
  "active_workers": [],
  "worker_errors": {}
}
```
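
A caller could reduce this document to a single yes/no check. In the sketch below, the function name and the notion of "healthy" (running with no recorded worker errors) are assumptions made for illustration; only the field names come from the example output.

```python
import json


def daemon_is_healthy(status: dict) -> bool:
    """Return True if the daemon reports running and no worker errors.

    Missing keys are treated as "not running" and "no errors" respectively.
    """
    return bool(status.get("running")) and not status.get("worker_errors")


# Sample document mirroring the example output shape above.
sample = json.loads(
    '{"running": true, "workspace_dir": "C:\\\\repo\\\\workspace\\\\pipeline_manager",'
    ' "active_workers": [], "worker_errors": {}}'
)
healthy = daemon_is_healthy(sample)  # True for this sample
```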

### 3. Activate a pipeline for manager scheduling

```powershell
python -m cf_pipeline_manager activate opcua_fifo_avg_to_duckdb_parquet_triggered
```

You can also set runtime state explicitly:

```powershell
python -m cf_pipeline_manager activate opcua_fifo_avg_to_duckdb_parquet_triggered --desired-state enabled --concurrency-limit 1 --priority 0
```

### 4. Submit a manual run

```powershell
python -m cf_pipeline_manager submit-run opcua_fifo_avg_to_duckdb_parquet_triggered --source manual --request-id req-001
```

With JSON overrides:

```powershell
python -m cf_pipeline_manager submit-run opcua_fifo_avg_to_duckdb_parquet_triggered `
  --runner-overrides-json '{\"engine_iterations\":1}' `
  --input-overrides-json '[{\"node_id\":\"pipe:opcua_fifo_avg_to_duckdb_parquet_triggered/n1\",\"parameters\":{\"endpoint\":\"opc.tcp://127.0.0.1:4841/VirtualPhServer\"}}]' `
  --metadata-json '{\"origin\":\"cli\"}'
```

### 5. Inspect summaries or one pipeline snapshot

```powershell
python -m cf_pipeline_manager summaries
python -m cf_pipeline_manager snapshot opcua_fifo_avg_to_duckdb_parquet_triggered
```

To inspect a specific run:

```powershell
python -m cf_pipeline_manager snapshot opcua_fifo_avg_to_duckdb_parquet_triggered --selected-run-id <run_id>
```

### 6. Emit an event instead of a manual run

```powershell
python -m cf_pipeline_manager emit-event opcua_fifo_avg_to_duckdb_parquet_triggered manual --source external --source-event-id evt-001 --dedupe-key evt-001 --payload-json '{\"kind\":\"manual\"}'
```

The manager persists the event through `cf_semantics_runtime` and then tries to
drain it into the run queue.

### 7. Stop a queued run

```powershell
python -m cf_pipeline_manager stop-run opcua_fifo_avg_to_duckdb_parquet_triggered <run_id>
```

Stopping a queued run returns a stopped result. Stopping a running run returns a
structured unsupported response instead of throwing.

### 8. Stop the daemon

```powershell
python -m cf_pipeline_manager stop-daemon
```

## Using The Unified CLI

If `cf_core_cli` is installed, the manager is also available here:

```powershell
cf pipeline manager --help
cf pipeline manager ensure-daemon
cf pipeline manager activate opcua_fifo_avg_to_duckdb_parquet_triggered
cf pipeline manager submit-run opcua_fifo_avg_to_duckdb_parquet_triggered --source manual
cf pipeline manager summaries
```

## Example Pipeline Assets

The canonical example pipeline used for manager testing is:

- `sandcastle/cf_pipeline/cf_pipeline_engine/examples/opcua_fifo_avg_to_duckdb_parquet_triggered.nq`

The mirrored setup demo is:

- `sandcastle/cf_setup/src/cf_setup/demo/opcua_fifo_avg_to_duckdb_parquet_triggered.nq`

The setup demo intentionally differs from the engine example in a few runtime
values such as OPC UA endpoint, window size, and engine duration.

## Notes

- The manager launches `cf_pipeline_v2` directly. It does not shell out through
  the `cf_pipeline_engine` CLI wrapper.
- The daemon binds to loopback HTTP only.
- Provider runtime path discovery still follows the existing
  `workspace/runtime/provider_registry.v1.json` convention.
