Metadata-Version: 2.4
Name: forktex-core
Version: 0.4.1
Summary: Postgres-native durable workflow execution for the ForkTex ecosystem — graph-first pipelines, state machines, and AI agent loops
License-Expression: AGPL-3.0-only
License-File: LICENSE
License-File: NOTICE
Keywords: forktex,durable-execution,flow,workflow,pipeline,state-machine,postgres,asyncpg,sqlalchemy
Author: FORKTEX
Author-email: info@forktex.com
Requires-Python: >=3.14,<4.0
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Database
Classifier: Topic :: Database :: Database Engines/Servers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Dist: asyncpg (>=0.29)
Requires-Dist: croniter (>=2.0)
Requires-Dist: pydantic (>=2.0)
Requires-Dist: redis[hiredis] (>=5.0)
Requires-Dist: sqlalchemy[asyncio] (>=2.0)
Project-URL: Bug Tracker, https://github.com/forktex/core-py/issues
Project-URL: Homepage, https://forktex.com
Project-URL: Repository, https://github.com/forktex/core-py
Description-Content-Type: text/markdown

# forktex-core

[![PyPI](https://img.shields.io/pypi/v/forktex-core.svg)](https://pypi.org/project/forktex-core/)
[![Python](https://img.shields.io/pypi/pyversions/forktex-core.svg)](https://pypi.org/project/forktex-core/)
[![License](https://img.shields.io/pypi/l/forktex-core.svg)](https://github.com/forktex/core-py/blob/master/LICENSE)

The shared substrate every ForkTex Python service builds on — async Postgres, caching, durable execution, secret encryption, secure object storage, background queues, and a virtual-data layer for tenant-defined schemas. **One library, seven modules, one set of opinions.**

`forktex-core` exists so individual services don't reinvent connection management, retry/replay scaffolding, signed-URL plumbing, or the umpteenth bespoke state machine. Each consuming service imports what it needs and inherits the same operational shape.

## Install

```bash
pip install forktex-core
```

PostgreSQL (asyncpg + SQLAlchemy 2) and Redis (hiredis) are bundled defaults — no optional extras to remember. **Requires Python 3.14+.**

## Architecture overview

`forktex_core` is a flat namespace of independent modules. Pull in only what you need; modules never import each other unless explicitly composed.

| Module | Status | Purpose |
|---|---|---|
| [`db`](#db--async-postgres) | **0.3** (currently `psql`, renaming) | Async SQLAlchemy engine, session lifecycle, base ORM (`BaseDBModel`, `TimestampMixin`, `AuditMixin`, `OrgScopedMixin`), CRUD helpers |
| [`cache`](#cache--redis) | **0.3** (currently `redis`, renaming) | Async Redis client, `@cached`, namespaced keys, sliding-window rate limiter |
| [`flow`](#flow--durable-execution) | **0.4** | Postgres-native durable execution: graph-first pipelines, state machines, and AI agent loops — `@flow.pipeline` / `@flow.graph` / `@flow.scheduled`, namespace-track runtime definitions, `InstanceQuery` filter/sort/page API |
| [`vault`](#vault--secret-encryption--kek-rotation) | **planned (0.4)** | Fernet-based symmetric encryption for credential blobs at rest, KEK (key-encryption-key) rotation flow, `EncryptedJSON` SQLAlchemy column type |
| [`storage`](#storage--object-storage--signed-urls) | **planned (0.4)** | S3/MinIO-compatible object storage client + secure callback URLs (consumer-driven public-config mapping) |
| [`queue`](#queue--background-jobs) | **planned (0.4)** | Lightweight background-job queue (likely arq-backed) for fire-and-forget work that isn't durable |
| [`data`](#data--virtual-schemas--query-result-abstraction) | **planned (0.5)** | Generalised "virtual database" layer: dynamic registers / fields / relationships with two layers of abstraction — `*Query` / `*Result` business objects above the Postgres primitives |

The first three are real today (with the `db` / `cache` renames in flight); the last four are the planned roadmap toward 1.0. The table is the source of truth — when in doubt, check what the module exports.

## Module reference

### `db` — async Postgres
*(currently exported as `forktex_core.psql` while the rename lands; both will be importable through 0.4 for migration breathing room.)*

```python
from forktex_core.db import (
    init_engine, close_engine, get_session,
    BaseDBModel, TimestampMixin, AuditMixin, OrgScopedMixin,
)

init_engine("postgresql+asyncpg://user:pass@localhost/db", pool_size=20)

async with get_session() as session:
    user = await session.get(User, user_id)
```

What's in the box:

- **Engine + session lifecycle** — single `init_engine` / `close_engine` pair, `get_session` context manager that commits or rolls back automatically.
- **Base model + mixins** — `BaseDBModel` (UUID PK + timestamps), `TimestampMixin` (created_at/updated_at), `AuditMixin` (created_by/updated_by), `OrgScopedMixin` (org_id FK + tenant filter helper). All composable.
- **CRUD helpers** — typed generic `Repository[Model]` with the usual list/get/create/update/delete + filter/sort/paginate built on SQLAlchemy 2's `select()` API (see the sketch after this list).
- **Migration friendliness** — every table the library defines lives in its own schema (e.g. `forktex_flow.*`) so the consumer's alembic only sees consumer-owned tables.
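
A minimal sketch of how the `Repository` helper composes with the base model and mixins; the `Server` model and the exact repository call signatures below are illustrative assumptions, not the library's confirmed API:

```python
# Illustrative sketch only — the Server model and the exact Repository call
# signatures are assumptions; the real helpers may differ in detail.
from sqlalchemy.orm import Mapped, mapped_column
from forktex_core.db import BaseDBModel, OrgScopedMixin, Repository, get_session

class Server(BaseDBModel, OrgScopedMixin):      # UUID PK + timestamps + org_id
    __tablename__ = "server"
    hostname: Mapped[str] = mapped_column()

servers = Repository(Server)                    # typed generic CRUD helper

async with get_session() as session:
    created = await servers.create(session, hostname="web-1", org_id=org_id)
    page = await servers.list(session, org_id=org_id, sort="created_at", limit=20)
```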

### `cache` — Redis
*(currently exported as `forktex_core.redis`; same parallel-import migration window as `db`.)*

```python
from forktex_core.cache import init, close, cached, rate_limit

await init("redis://localhost:6379/0")

@cached(ttl=300, namespace="user:profile")
async def get_profile(user_id: str): ...

ok, retry_after = await rate_limit("login", key=ip, limit=5, window_seconds=60)
```

What's in the box:

- **Connection pool** — `init` / `close` / `get_client`, hiredis-backed.
- **`@cached(ttl=…)`** — transparent decorator caching for any async function with serialisable args.
- **Typed ops** — `get_json`, `set_json`, `incr`, `mget`, `delete_pattern` (cluster-safe).
- **Namespaces** — `Namespace("user.profile").key(user_id)` helper so cross-service key collisions can't happen (see the sketch after this list).
- **Sliding-window rate limiter** — production-tested limiter for auth middleware and similar hot paths.
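
A small sketch tying the typed ops and the `Namespace` helper together for cache invalidation; this assumes the ops above are importable as module-level helpers and that `set_json` accepts a `ttl` keyword, which may differ in the actual API:

```python
from forktex_core.cache import Namespace, set_json, get_json, delete_pattern

profiles = Namespace("user.profile")

await set_json(profiles.key(user_id), {"name": "Ada"}, ttl=300)
profile = await get_json(profiles.key(user_id))

# Invalidate every cached profile in one sweep (delete_pattern is cluster-safe)
await delete_pattern(profiles.key("*"))
```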

### `flow` — durable execution

Postgres-native workflows that survive crashes, replay-on-resume from a checkpoint log, and elect one leader at a time across N workers via advisory locks. Schema-isolated under `forktex_flow.*` — the consumer's alembic never sees these tables.

**Everything is a graph.** A linear deploy pipeline is a chain graph. A user-onboarding state machine is a cyclic graph with event-driven edges. An AI agent loop is a cyclic graph with model-driven routing. All three use the same `@step` primitive and the same `Flow` runtime.

#### Quick start

```python
from forktex_core.flow import Flow, Ctx, step, node, parallel, edge, conditional, wait_edge, START, END
from typing import TypedDict, Annotated
import operator

flow = Flow(database_url="postgresql+asyncpg://user:pass@host/db")
```

#### Standalone steps — the single durable primitive

```python
# @step / @node are aliases — standalone, not bound to any Flow instance.
# Returns a partial state-update dict merged into the run's accumulated state.
# Every @step call is implicitly a durable checkpoint (cached on crash recovery).

@step
async def provision(ctx: Ctx, state: dict) -> dict:
    server_id = await provider.create(state["manifest"])
    return {"server_id": server_id}           # ← only the keys that changed

@step(max_attempts=5, backoff=(10.0, 60.0, 300.0))
async def configure(ctx: Ctx, state: dict) -> dict:
    await ansible.run(state["server_id"])
    return {}
```

#### Three workflow declaration styles

**`@flow.scheduled` — one function, one step, one workflow**

```python
@flow.scheduled("cloud.backup.create", version=1, cron="0 2 * * *")
async def backup_create(ctx: Ctx, state: dict) -> dict:
    return await run_backup(state)
```

Compiles to `START → backup_create → END`. Also manually triggerable via `flow.run()`.
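
For example, a manual trigger uses the same dispatch API shown later (the state payload here is illustrative):

```python
# Manual trigger outside the cron window
instance = await flow.run("cloud.backup.create", state={"target": "db-primary"})
await instance.wait(timeout=600)
```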

**`@flow.pipeline` — linear steps array**

```python
from forktex_core.flow import step, parallel

class DeployState(TypedDict):
    org_id: str
    manifest: dict
    server_id: str | None
    logs: Annotated[list[str], operator.add]   # reducer: append, not overwrite

@flow.pipeline("cloud.deploy.up", version=1, state=DeployState)
class DeployUp:
    steps = [
        provision,
        configure,
        step(setup_dns, when=lambda s: s["manifest"].get("dns")),  # optional: skipped if False
        parallel(verify_dns, verify_ssl),                           # fan-out + implicit join
        health_check,
    ]
```

- `step(fn, when=cond)` — node is skipped (state passes through unchanged) if `cond(state)` is `False`
- `parallel(a, b, c)` — all execute concurrently; merged state continues to next step
- Optional `cron=` kwarg makes it also fire on a schedule
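
For instance, the same pipeline shape fired nightly as well as on demand; the workflow name and cron expression below are illustrative:

```python
@flow.pipeline("cloud.deploy.refresh", version=1, state=DeployState, cron="0 3 * * *")
class DeployRefresh:
    steps = [provision, configure, health_check]
```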

**`@flow.graph` — full explicit topology**

Graduation point from pipeline when you need branching, cycles, or event-driven transitions:

```python
from forktex_core.flow import edge, conditional, wait_edge

# Branching deploy
@flow.graph("cloud.deploy.advanced", version=1, state=DeployState)
class DeployAdvanced:
    nodes = {
        "provision":    provision,
        "configure":    configure,
        "rollback":     rollback,
        "health_check": health_check,
    }
    topology = [
        edge(START,           "provision"),
        edge("provision",     "configure"),
        conditional("configure", router_fn, {
            "rollback":     "rollback",
            "health_check": "health_check",
        }),
        edge("health_check",  END),
        edge("rollback",      END),
    ]

# State machine (event-driven)
@flow.graph("user.onboarding", version=1, state=OnboardingState)
class UserOnboarding:
    nodes    = {"email_pending": email_pending, "verified": verified}
    entry    = "email_pending"
    terminal = "verified"
    topology = [
        wait_edge("email_pending", "verified", on="email.verified"),  # suspends until signal
    ]

# AI agent loop (cyclic)
@flow.graph("intelligence.agent", version=1, state=AgentState)
class AgentLoop:
    nodes    = {"llm_call": llm_call, "tool_executor": tool_executor}
    topology = [
        edge(START, "llm_call"),
        conditional("llm_call", should_continue, {
            "tool_executor": "tool_executor",
            END:             END,
        }),
        edge("tool_executor", "llm_call"),   # cycle back
    ]
```

#### Namespace track — runtime definitions

For workflows defined by users at runtime (e.g. a network UI where each org configures their own automation):

```python
# Platform engineers register step templates (the building blocks)
@flow.step_template("network.reroute_traffic")
@step
async def reroute(ctx: Ctx, state: dict) -> dict: ...

@flow.step_template("network.send_alert")
@step
async def alert(ctx: Ctx, state: dict) -> dict: ...

# Org users define their workflow at runtime (via API / UI)
await flow.define(
    name      = "link_failure_response",
    namespace = "org-abc",
    version   = 1,
    config    = {
        "type":  "pipeline",
        "steps": [
            "network.reroute_traffic",
            {"step": "network.send_alert", "when": {"field": "ok", "is": False}},
        ],
    },
)

# Delete (raises if active runs exist)
await flow.undefine("link_failure_response", namespace="org-abc")
```

Config format for graphs:

```python
config = {
    "type": "graph",
    "topology": [
        {"from": "__START__", "to": "detect"},
        {"from": "detect",    "to": "reroute",   "on": "peer.down"},  # wait_edge
        {"from": "reroute",   "to": "__END__"},
    ],
}
```

#### Dispatch — identical for all workflow types

```python
# Platform-track (no namespace)
instance = await flow.run(
    "cloud.deploy.up",
    state    = {"org_id": "...", "manifest": {...}},
    metadata = {"org_id": "...", "kind": "up"},
)

# Namespace-track
instance = await flow.run(
    "link_failure_response",
    namespace = "org-abc",
    state     = {"link_id": "eth0-router-1"},
)

# Via definition handle
defn     = flow.workflow("cloud.deploy.up")
instance = await defn.run(state={...}, metadata={...})
```

#### WorkflowInstance — the run handle

```python
instance.instance_id      # UUID
instance.workflow_name    # "cloud.deploy.up"
instance.status           # "pending" | "running" | "completed" | "failed" | "cancelled"
instance.state            # current accumulated state dict
instance.current_node     # "configure" — which node is executing now
instance.nodes            # list[NodeInstance] with per-node status, duration, delta

await instance.cancel()
await instance.wait(timeout=300)                         # blocks until terminal
await instance.send("email.verified", payload={...})     # advance a wait_edge
await instance.refresh()                                 # re-fetch from DB
async for event in instance.stream(): ...                # live SSE events
```

#### Query — filter / sort / page / aggregate

```python
# Cloud dashboard: recent deploy runs for an org
page = await (
    flow.query()
    .workflow("cloud.deploy.up")
    .status("running", "failed")
    .metadata(org_id="abc")
    .sort("started_at", desc=True)
    .limit(20)
    .fetch()
)
page.items        # list[WorkflowInstance]
page.total        # total matching (for pagination UI)
page.next_cursor  # str | None — pass to .fetch(cursor=...) for next page

# Network dashboard: summary card
summary = await (
    flow.query()
    .namespace("org-abc")
    .since(datetime.now() - timedelta(days=7))
    .summary()
)
summary.total                  # 23
summary.by_status              # {"completed": 20, "failed": 3}
summary.avg_duration_seconds   # 183.4

# Find by payload field (JSONB @> filter on run.input)
instance = await (
    flow.query()
    .state(link_id="eth0-router-1")
    .status("running")
    .first()
)

# Stuck in a specific node
stuck = await (
    flow.query()
    .namespace("org-abc")
    .current_node("configure_routes")
    .status("running")
    .fetch()
)
```

Available filter methods: `.workflow(name, version=None)` · `.namespace(ns)` · `.status(*statuses)` · `.metadata(**kv)` · `.state(**kv)` · `.current_node(*names)` · `.since(dt)` · `.until(dt)` · `.triggered_by(*triggers)` · `.sort(field, desc=True)` · `.limit(n)`

Terminal methods: `.fetch(cursor=None)` → `InstancePage` · `.count()` → `int` · `.first()` → `WorkflowInstance | None` · `.summary()` → `InstanceSummary`
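
The terminal methods compose with any filter chain; for example, a count of cron-triggered failures over the last week (the trigger value `"cron"` is an assumed convention):

```python
from datetime import datetime, timedelta

failed_cron_runs = await (
    flow.query()
    .workflow("cloud.backup.create")
    .triggered_by("cron")
    .status("failed")
    .since(datetime.now() - timedelta(days=7))
    .count()
)
```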

#### Child workflow orchestration (dynamic fan-out)

```python
# From inside a @step body:
@step
async def configure_all_peers(ctx: Ctx, state: NetworkState) -> dict:
    # Scatter: launch N child workflows in parallel
    results = await ctx.map(
        "network.peer.configure",
        states=[{"peer_id": p} for p in state["peers"]],
    )
    return {"peer_results": results}

# Or: spawn + wait for more control
@step
async def deploy_region(ctx: Ctx, state: dict) -> dict:
    instance_id = await ctx.spawn("cloud.deploy.svc", state={"region": state["region"]})
    result_state = await ctx.wait(instance_id)      # durable: survives restart
    return {"region_result": result_state}
```

#### Lifecycle

```python
await flow.init()    # apply schema migrations (idempotent)
await flow.start_driver()   # start leader-elected driver
await flow.stop_driver()    # graceful drain

# Or use as async context manager:
async with flow:
    await flow.run(...)

# For FastAPI:
@asynccontextmanager
async def lifespan(app):
    await flow.start_driver()
    yield
    await flow.stop_driver()
```

#### State reducers

```python
from typing import Annotated
import operator

class DeployState(TypedDict):
    server_id: str | None           # last-write-wins (default)
    logs: Annotated[list[str], operator.add]  # append reducer: [a] + [b] = [a, b]
```

Any `Annotated[T, fn]` field uses `fn(existing, new)` to merge partial updates. All other fields use last-write-wins. This makes parallel fan-out safe — multiple nodes can update different keys without stepping on each other.
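
A rough illustration of those merge semantics in plain Python (not the library's actual merge code):

```python
import operator

reducers = {"logs": operator.add}        # from Annotated[list[str], operator.add]

state   = {"server_id": None, "logs": ["provisioned"]}
updates = [{"logs": ["dns ok"]}, {"logs": ["ssl ok"], "server_id": "srv-42"}]

for update in updates:                   # e.g. two parallel nodes returning deltas
    for key, value in update.items():
        merge = reducers.get(key)
        state[key] = merge(state[key], value) if merge else value

# state == {"server_id": "srv-42", "logs": ["provisioned", "dns ok", "ssl ok"]}
```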

### `vault` — secret encryption + KEK rotation
*(planned for 0.4)*

The minimal contract:

```python
from forktex_core.vault import Vault, EncryptedJSON

# One process-wide Vault, configured with the platform-wide Fernet KEK.
# (KEK = key-encryption-key — encrypts the blobs, NOT a tenant secret.
#  Leaking it still requires DB access to use; rotation is supported.)
vault = Vault.from_key(settings.master_key)

# Encrypt / decrypt arbitrary JSON-serialisable payloads:
token = vault.encrypt({"provider_token": "...", "ssh_key_id": 12345})
payload = vault.decrypt(token)

# Or use the SQLAlchemy column type for transparent at-rest encryption —
# reads decrypt automatically, writes encrypt automatically:
class OrgProviderCredential(BaseDBModel):
    __tablename__ = "org_provider_credential"
    payload: Mapped[dict] = mapped_column(EncryptedJSON(vault))
    # ... rest of the model

# Generate a fresh Fernet key (e.g. for ops bootstrapping a new env):
new_key = Vault.generate_key()  # base64-encoded 32-byte key
```

KEK rotation is a first-class flow:

```python
# When ops rotate the master key: re-encrypt every blob against the new
# Vault, then atomically swap the env var. The old Vault stays usable
# until every consumer process restarts with the new key.
old_vault = Vault.from_key(old_master_key)
new_vault = Vault.from_key(new_master_key)

await rotate_blobs(
    session,
    table=OrgProviderCredential,
    column="payload",
    old=old_vault,
    new=new_vault,
)
```

What it solves: every service that stores credentials (per-org provider tokens, per-tenant SMTP / API keys, per-pipeline upstream creds) ends up reinventing the same Fernet wrapper, the same "decrypt error means rotated KEK" diagnostic, and the same rotation script. `vault` packages those once.

Decision boundaries:

- **Symmetric only** — Fernet under the hood. No asymmetric / public-key crypto here; that's a different problem with different threat models.
- **At-rest encryption, not transport** — for transport-level secrecy, terminate TLS at the edge.
- **Single KEK by default** — per-tenant key derivation (DEK-per-org wrapped by KEK) is a v2 concern; today's threat model in the ForkTex products is well-served by one platform-wide KEK plus row-level access control.

### `storage` — object storage + signed URLs
*(planned for 0.4)*

The minimal contract:

```python
from forktex_core.storage import StorageClient, PublicAccess

storage = StorageClient(endpoint="s3://...", credentials=...)

# Upload from a service:
await storage.put("invoices/2026-04/INV-123.pdf", body, content_type="application/pdf")

# Mint a time-limited signed URL for a callback or a tenant download:
url = await storage.signed_url(
    "invoices/2026-04/INV-123.pdf",
    access=PublicAccess.read,
    ttl_seconds=600,
)
```

What it solves: every service that ships file uploads/downloads ends up reinventing the same five things — bucket layout, content-type sniffing, secure short-lived URLs, public-vs-private access policy, and a callback path that the consumer's public config (CDN host, custom domain) can override. `storage` packages those once, with the **public-link mapping driven by consumer-side configuration** so the same code works against MinIO in dev and S3 + CloudFront in prod.

S3-compatible by default (MinIO + AWS S3); GCS / Azure Blob behind the same `StorageClient` interface as a follow-up.

### `queue` — background jobs
*(planned for 0.4)*

For fire-and-forget work that doesn't need `flow`'s durability guarantees: send-an-email, refresh-a-cache, kick-off-a-report. Likely arq-backed (Redis-native, async-first, lighter than Celery), wrapped in a thin opinionated client so consumers get the same `enqueue` / `process` ergonomics across services.
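
An aspirational sketch of those ergonomics; the names and decorator below are illustrative, not a committed API:

```python
from forktex_core.queue import job, enqueue   # hypothetical names

@job("emails.send_welcome", max_tries=3)
async def send_welcome(user_id: str) -> None:
    await mailer.send(template="welcome", user_id=user_id)

# Fire-and-forget from a request handler — redelivery is cheap if a worker dies
await enqueue("emails.send_welcome", user_id="u-123")
```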

Decision boundary vs. `flow`:

- **`flow`**: must survive crashes, must replay-on-resume, has steps with retry policies, lives for minutes-to-hours, observability matters.
- **`queue`**: short-lived, idempotent, "best-effort" fire-and-forget; if a worker dies mid-job, redelivery is cheap and the job runs again.

If you're unsure which you need, you probably need `flow`.

### `data` — virtual schemas + Query/Result abstraction
*(planned for 0.5)*

A common need across ForkTex services is a layer for **tenant-defined data**: registers (logical tables), fields (logical columns), relationships, all CRUD-able by end users at runtime. Flexible business-process tools pivot on this every day.

`data` lifts that pattern to two clean abstraction levels:

1. **Storage layer** (Postgres-backed, leans on `db`): the dynamic-schema mechanics — register definitions, field definitions, relationship graph, JSONB-backed row storage with per-field type coercion, indexable derived columns.
2. **Business-logic layer**: composable `*Query` objects (filtering / sorting / paginating-or-scrolling / soft-archiving / hard-deleting / search / faceting) returning typed `*Result` objects. The same `*Query` shape applies whether the underlying register is Postgres-stored, virtualised over an external API, or computed on the fly — so consumers don't have to know which.

```python
# Aspirational sketch — final API will be informed by the extraction.
from forktex_core.data import Register, RowQuery

invoices = Register.load("invoices", tenant_id=...)
result = await (
    RowQuery(invoices)
    .where(status="paid")
    .since(date(2026, 1, 1))
    .order_by("-issued_at")
    .scroll(page_size=50)
)
```

Why this lives in `core` rather than each consumer rolling its own: every B2B product reaches a point where end users want to add a field. Once you've solved that once, you don't want to solve it three more times in three more services.

## Configuration

`forktex-core` does not read environment variables directly. Connection strings, credentials, KEKs, and pool settings are passed in by the host service at `init_engine` / `init` / `Flow(...)` / `Vault.from_key(...)` / `StorageClient(...)` construction time. Each consuming service decides its own configuration source (env vars, secrets manager, manifest, etc.).
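
In practice the host service wires everything up at startup. A minimal FastAPI-style sketch, assuming a consumer-defined `settings` object; the shutdown shape of `close_engine` is an assumption:

```python
from contextlib import asynccontextmanager
from forktex_core.db import init_engine, close_engine
from forktex_core.cache import init as cache_init, close as cache_close
from forktex_core.flow import Flow

flow = Flow(database_url=settings.database_url)   # settings source is the consumer's choice

@asynccontextmanager
async def lifespan(app):
    init_engine(settings.database_url, pool_size=20)
    await cache_init(settings.redis_url)
    await flow.start_driver()
    yield
    await flow.stop_driver()
    await cache_close()
    close_engine()
```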

## Development

The root [`Makefile`](Makefile) is generated by `forktex fsd makefile sync` from [`forktex.json`](forktex.json) — do not hand-edit.

```bash
make help              # list every available target
make deps              # editable install
make format            # ruff format
make lint              # ruff check
make test              # pytest tests/
make build             # python3 -m build → dist/
make ci                # format-check + lint + license-check + audit + test + build
make clean             # remove caches and dist/
```

`make ci` is the single command that gates a publish: format-check, lint, dual-license header verification across every source file, dependency CVE audit, full pytest sweep (uses Postgres testcontainers for `flow`), wheel + sdist build, and `twine check`.

`make start` / `make stop` / `make logs` are intentional no-ops — `forktex-core` is a library, there is nothing to run.

### License headers

Every source file carries the AGPL-3.0 + Commercial dual-license SPDX header, applied idempotently via:

```bash
make license-check    # CI gate — fails if any source file is missing the header
make license-fix      # add or refresh headers across src/, tests/, scripts/
make license-strip    # remove headers (used before license-model changes)
```

## FSD Self-Description

This repo follows the [ForkTex Standard Delivery (FSD)](https://github.com/forktex/forktex-python) shape:

- root manifest: [`forktex.json`](forktex.json)
- profile: `workspace/python-monorepo`
- target maturity: `L3`
- single publishable package: `forktex-core` at path `.`

Re-validate locally with:

```bash
forktex fsd --project-dir . check
forktex arch discover --project-dir . --output-dir /tmp/arch-corepy
```

## License

Dual-licensed — **AGPL-3.0-only** for open-source use, **commercial** for everything else (proprietary products, SaaS without source release, redistribution in closed-source form). See [`LICENSE`](LICENSE) and [`NOTICE`](NOTICE) for the full terms.

Commercial licensing inquiries: info@forktex.com.

