Metadata-Version: 2.4
Name: flowgram-runtime
Version: 0.1.0
Summary: Python runtime for bytedance/flowgram.ai workflows — execute visually-authored graph workflows with LLM, HTTP, condition, loop and code (Python sandbox) nodes.
License-Expression: MIT
License-File: LICENSE
Keywords: workflow,workflow-engine,flowgram,llm,ai,agent,automation,graph,fastapi,openai
Author: flowgram-runtime contributors
Requires-Python: >=3.11
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Framework :: FastAPI
Classifier: Framework :: AsyncIO
Classifier: Typing :: Typed
Provides-Extra: dev
Requires-Dist: RestrictedPython (>=7.1)
Requires-Dist: build (>=1.2) ; extra == "dev"
Requires-Dist: click (>=8.1)
Requires-Dist: fastapi (>=0.110)
Requires-Dist: httpx (>=0.27)
Requires-Dist: json5 (>=0.9) ; extra == "dev"
Requires-Dist: jsonschema (>=4.21)
Requires-Dist: openai (>=1.30)
Requires-Dist: pydantic (>=2.6)
Requires-Dist: pytest (>=8.0) ; extra == "dev"
Requires-Dist: pytest-asyncio (>=0.23) ; extra == "dev"
Requires-Dist: pytest-cov (>=4.1) ; extra == "dev"
Requires-Dist: ruff (>=0.4) ; extra == "dev"
Requires-Dist: twine (>=5.0) ; extra == "dev"
Requires-Dist: uvicorn[standard] (>=0.27)
Project-URL: Documentation, https://flowgram.ai/guide/runtime/introduction.html
Project-URL: Homepage, https://github.com/wn0x00/flowgram-runtime
Project-URL: Issues, https://github.com/wn0x00/flowgram-runtime/issues
Project-URL: Repository, https://github.com/wn0x00/flowgram-runtime
Project-URL: Upstream, https://github.com/bytedance/flowgram.ai
Description-Content-Type: text/markdown

# flowgram-runtime

Python port of [bytedance/flowgram.ai](https://github.com/bytedance/flowgram.ai) `packages/runtime`.

A workflow runtime engine that parses and executes graph-based workflows. It supports the same
node set as the upstream TypeScript reference implementation:

| Node | Status | Notes |
|---|---|---|
| `start` / `end` | ✅ | full input/output declaration support |
| `condition` | ✅ | 14 operators × 9 types via rules table |
| `loop` / `break` / `continue` | ✅ | sub-context scope chain, locals (`item`, `index`) |
| `block-start` / `block-end` | ✅ | loop sub-graph boundaries |
| `llm` | ✅ | OpenAI-compatible chat completions (async) |
| `http` | ✅ | 6 body types (none / json / form-data / x-www-form-urlencoded / raw-text / binary), retry with exponential backoff |
| `code` | ✅ | **Python** sandbox via [RestrictedPython](https://restrictedpython.readthedocs.io) (60-second timeout; imports and dunder access are blocked). Upstream uses QuickJS; the language difference is intentional. |

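The `http` node's "retry with exponential backoff" can be pictured with the small sketch below. It only illustrates the general technique: the function name `backoff_delays`, the 0.5-second base, and the factor of 2 are assumptions made for this example, not values taken from the runtime.

```python
def backoff_delays(max_retries: int, base_seconds: float = 0.5, factor: float = 2.0) -> list[float]:
    """Return the wait before each retry: base, base*factor, base*factor**2, ...

    Hypothetical helper; the runtime's real retry parameters are not documented here.
    """
    return [base_seconds * factor**attempt for attempt in range(max_retries)]


delays = backoff_delays(4)
print(delays)  # [0.5, 1.0, 2.0, 4.0]
```

Each failed attempt waits longer than the previous one, which gives a struggling server more time to recover between retries.
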
## Install

```bash
pip install -e ".[dev]"   # development
# or after publish:
pip install flowgram-runtime
```

## CLI

After installing the package (`pip install flowgram-runtime`), the `flowgram` command becomes
available:

```bash
flowgram --version
flowgram serve --host 0.0.0.0 --port 4000
```

- OpenAPI / Swagger UI: <http://localhost:4000/docs>
- OpenAPI JSON spec: <http://localhost:4000/openapi.json>
- tRPC-compatible endpoint: <http://localhost:4000/trpc/*>

## REST API

Paths match upstream exactly, so frontends configured for the TS runtime only need to swap the host.

| Method | Path | Body / Query | Response |
|---|---|---|---|
| GET  | `/info` | — | server info |
| POST | `/task/run` | `{schema: string, inputs: object}` | `{taskID}` |
| GET  | `/task/report` | `?taskID=` | `IReport` |
| GET  | `/task/result` | `?taskID=` | workflow outputs |
| PUT  | `/task/cancel` | `{taskID}` | `{success: bool}` |
| POST | `/task/validate` | `{schema: string, inputs: object}` | `{valid, errors?}` |

`schema` is a **JSON string** of the workflow definition (mirrors upstream contract).
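
Because `schema` travels as a string while `inputs` stays an object, the request body needs one extra serialization step. A minimal sketch of building a `/task/run` body follows; the helper name `build_run_request` and the empty placeholder workflow are assumptions for illustration.

```python
import json


def build_run_request(workflow: dict, inputs: dict) -> dict:
    """Build a body for POST /task/run (hypothetical helper, not part of the package)."""
    # `schema` is serialized to a JSON string; `inputs` is left as a plain object.
    return {"schema": json.dumps(workflow), "inputs": inputs}


workflow = {"nodes": [], "edges": []}  # placeholder; a real workflow has start/end nodes
body = build_run_request(workflow, {"msg": "hello"})
print(type(body["schema"]).__name__)  # str
```

With the server from the CLI section running, the body could then be sent with, for example, `httpx.post("http://localhost:4000/task/run", json=body)`.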

## tRPC compatibility

The `/trpc/<procedure>` namespace exposes the same procedures as the REST API; procedure names may use slash or dot notation:

- `info`, `task.run`, `task.report`, `task.result`, `task.cancel`, `task.validate`

Supported request formats:

- `GET /trpc/<proc>?input=<url-encoded-json>`
- `POST /trpc/<proc>` with body `{"json": <payload>}` (tRPC v10 wrapper) or bare `<payload>`
- Batch mode: `?batch=1` with comma-separated procedure list and indexed input map

Responses are wrapped as `{"result": {"data": <output>}}` per tRPC convention.
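
The request and response shapes above can be sketched with the standard library alone. The helper names (`trpc_get_url`, `trpc_post_body`, `unwrap`) and the sample `taskID` are assumptions for this example; the base URL reuses the host and port from the CLI section.

```python
import json
from urllib.parse import quote

BASE = "http://localhost:4000/trpc"  # host/port as in the CLI example


def trpc_get_url(procedure: str, payload: dict) -> str:
    """Build `GET /trpc/<proc>?input=<url-encoded-json>`."""
    encoded = quote(json.dumps(payload, separators=(",", ":")), safe="")
    return f"{BASE}/{procedure}?input={encoded}"


def trpc_post_body(payload: dict) -> dict:
    """Wrap a payload in the `{"json": ...}` envelope accepted for POST requests."""
    return {"json": payload}


def unwrap(response: dict):
    """Extract `<output>` from `{"result": {"data": <output>}}`."""
    return response["result"]["data"]


url = trpc_get_url("task.report", {"taskID": "abc"})
print(url)
# http://localhost:4000/trpc/task.report?input=%7B%22taskID%22%3A%22abc%22%7D
```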

## Python SDK

```python
import flowgram_runtime as fg

schema = {
    "nodes": [
        {"id": "start_0", "type": "start", "meta": {"position": {"x": 0, "y": 0}},
         "data": {"outputs": {"type": "object", "properties": {"msg": {"type": "string"}}}}},
        {"id": "end_0", "type": "end", "meta": {"position": {"x": 200, "y": 0}},
         "data": {
             "inputs": {"type": "object", "properties": {"echo": {"type": "string"}}},
             "inputsValues": {"echo": {"type": "ref", "content": ["start_0", "msg"]}},
         }},
    ],
    "edges": [{"sourceNodeID": "start_0", "targetNodeID": "end_0"}],
}
```

### Fire-and-await

```python
outputs = await fg.run(schema, inputs={"msg": "hello"})
# {'echo': 'hello'}
```

### With a task handle (status / report / cancel)

```python
task = fg.start(schema, inputs={"msg": "hello"})
outputs = await task.wait(timeout=10)        # raises TimeoutError on timeout
print(task.status)                            # WorkflowStatus.Succeeded
print(task.report().reports.keys())           # per-node reports
task.cancel()                                 # idempotent
```

### From sync code

```python
outputs = fg.run_sync(schema, inputs={"msg": "hi"})
```

`fg.run_sync` refuses to run inside an active event loop; from async code, use `await fg.run(...)` instead.
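
For readers curious how a guard of this kind can work, here is a minimal sketch using only `asyncio`. It is an illustration of the general pattern, not the package's actual implementation; `run_blocking` and `answer` are hypothetical names.

```python
import asyncio


def run_blocking(coro_factory):
    """Run a coroutine to completion, refusing if an event loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one.
        return asyncio.run(coro_factory())
    raise RuntimeError("already inside an event loop; use `await` instead")


async def answer() -> int:
    return 42


print(run_blocking(answer))  # 42
```

Taking a factory rather than a coroutine object means nothing is created (and left un-awaited) when the call is refused.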

### Validate without running

```python
result = fg.validate(schema, inputs={"msg": "x"})
if not result.valid:
    print(result.errors)
```

### Custom node

```python
class SentimentNode(fg.INodeExecutor):
    type = "sentiment"  # matches `"type": "sentiment"` in the workflow schema

    async def execute(self, ctx: fg.ExecutionContext) -> fg.ExecutionResult:
        text = ctx.inputs.get("text", "")
        # Placeholder: a real executor would score `text`; this returns a fixed value.
        return fg.ExecutionResult(outputs={"score": 0.87})

fg.register_executor(SentimentNode())
# Workflows with `"type": "sentiment"` nodes can now run.
```

### Embed the HTTP server inside your FastAPI

```python
your_app.mount("/_flowgram", fg.create_app())
```

`fg.create_app` is loaded lazily: `import flowgram_runtime` does not import FastAPI or uvicorn
until you actually call it.

### Schema input forms

`fg.run` / `fg.start` / `fg.validate` accept the schema as a dict, a JSON string,
or a `fg.WorkflowSchema` instance — pick whatever fits your storage layer.
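
Conceptually, accepting all three forms comes down to normalizing them to one representation. The sketch below shows one way to do that; `coerce_schema` is a hypothetical helper, and the `model_dump` branch assumes a Pydantic v2 model, so treat it as an illustration rather than the package's own logic.

```python
import json
from typing import Any


def coerce_schema(schema: Any) -> dict:
    """Normalize a dict, JSON string, or Pydantic-style model to a plain dict."""
    if isinstance(schema, str):
        schema = json.loads(schema)
    elif hasattr(schema, "model_dump"):
        # Assumption: the object is a Pydantic v2 model.
        schema = schema.model_dump()
    if not isinstance(schema, dict):
        raise TypeError(f"unsupported schema type: {type(schema).__name__}")
    return schema


as_dict = coerce_schema({"nodes": [], "edges": []})
as_text = coerce_schema('{"nodes": [], "edges": []}')
print(as_dict == as_text)  # True
```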

### Lower-level handles (advanced)

The facade covers most use cases. For full control, use the underlying types directly:

```python
from flowgram_runtime import WorkflowApplication, InvokeParams, WorkflowSchema
app = WorkflowApplication.instance()
task_id = app.run(InvokeParams(schema=WorkflowSchema.model_validate(schema), inputs={...}))
```

## Code node — Python sandbox contract

The script must define a function `main(p)` where `p["params"]` is the parsed inputs dict.
It should return a dict (which becomes the node outputs); non-dict returns are wrapped as
`{"result": <value>}`.

```python
# data.script.content
def main(p):
    return {"sum": p["params"]["a"] + p["params"]["b"]}
```
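
The calling convention and the wrapping of non-dict returns can be sketched as follows. This example deliberately involves no sandboxing and is not the runtime's code; `call_main`, `add`, and `double` are hypothetical names used only to show the contract.

```python
from typing import Any, Callable


def call_main(main: Callable[[dict], Any], params: dict) -> dict:
    """Call `main(p)` with `p["params"]` set, wrapping non-dict returns."""
    result = main({"params": params})
    # A dict becomes the node outputs as-is; anything else is wrapped.
    return result if isinstance(result, dict) else {"result": result}


def add(p):
    return {"sum": p["params"]["a"] + p["params"]["b"]}


def double(p):
    return p["params"]["a"] * 2  # non-dict return


print(call_main(add, {"a": 1, "b": 2}))  # {'sum': 3}
print(call_main(double, {"a": 21}))      # {'result': 42}
```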

Restrictions enforced by RestrictedPython:

- No `import` statements
- No access to dunder attributes (`__class__`, `__bases__`, …)
- Restricted builtins (no `eval`, `exec`, `open`, `__import__`, …)
- 60-second hard timeout

## Architecture

```
src/flowgram_runtime/
├── interface/          # Pydantic models + ABCs (mirrors @flowgram.ai/runtime-interface)
│   ├── schema/         # WorkflowSchema / NodeSchema / EdgeSchema / IFlowValue / IJsonSchema
│   ├── node/           # FlowGramNode enum + node-specific Pydantic models
│   ├── api/            # 6 API I/O models + FlowGramAPIs registry
│   └── runtime/        # IEngine / IExecutor / IContext / IState / IVariableStore / ...
├── core/               # implementation (mirrors @flowgram.ai/runtime-js)
│   ├── domain/         # engine, executor, context, document, state, variable, snapshot,
│   │                   # status, message, report, io_center, cache, validation, task, container
│   ├── application/    # WorkflowApplication façade (singleton)
│   ├── nodes/          # 11 node executors
│   └── infrastructure/ # uuid, runtime_type, json_schema_validator, compare_node_groups
└── server/             # FastAPI app: REST + tRPC compat + Swagger UI
```

## Testing

```bash
pytest -q
pytest --cov=flowgram_runtime --cov-report=term-missing
```

## Differences vs upstream

1. **Code node language is Python** (upstream is JavaScript via QuickJS WASM). The runtime
   contract — `main(p)`, `p["params"]` — is preserved.
2. **No native tRPC protocol.** We expose a tRPC-compatible HTTP shim on `/trpc/*` that
   accepts the same procedure names and produces tRPC-style responses, which is sufficient
   for `@trpc/client` to talk to it.
3. Loop scheduling is sequential within a single loop iteration (matches upstream); fan-out
   between sibling nodes is concurrent via `asyncio.gather`.
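
The concurrent fan-out mentioned in the last item can be illustrated with plain `asyncio`. The node names and delays below are made up for the example, and the coroutine `run_node` merely stands in for a real node executor.

```python
import asyncio


async def run_node(name: str, delay: float) -> str:
    """Stand-in for a node executor: waits, then reports its name."""
    await asyncio.sleep(delay)
    return name


async def run_siblings() -> list[str]:
    # Both siblings start together; total time is roughly the longest delay.
    return await asyncio.gather(run_node("a", 0.02), run_node("b", 0.01))


results = asyncio.run(run_siblings())
print(results)  # ['a', 'b']
```

`asyncio.gather` returns results in argument order, so `"a"` comes first even though `"b"` finishes earlier.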

## License

MIT (matching upstream).

