Metadata-Version: 2.4
Name: dt-forge
Version: 0.4.3
Summary: Domain-agnostic Python framework for digital twin architectures
License: MIT
Requires-Python: >=3.11
Description-Content-Type: text/markdown
Requires-Dist: paho-mqtt>=2.0
Requires-Dist: influxdb-client>=1.40
Requires-Dist: pymongo>=4.6
Requires-Dist: motor>=3.3
Requires-Dist: redis>=5.0
Requires-Dist: minio>=7.2
Requires-Dist: pydantic>=2.5
Requires-Dist: pydantic-settings>=2.1
Requires-Dist: python-dotenv>=1.0
Requires-Dist: fastapi>=0.109
Requires-Dist: uvicorn[standard]>=0.27
Requires-Dist: httpx>=0.26
Requires-Dist: sse-starlette>=2.0
Requires-Dist: scipy>=1.12
Requires-Dist: numpy>=1.26
Requires-Dist: simpy>=4.1
Requires-Dist: onnxruntime>=1.17
Requires-Dist: scikit-learn>=1.4
Requires-Dist: prophet>=1.1
Requires-Dist: transitions>=0.9
Requires-Dist: simple-pid>=2.0
Requires-Dist: neo4j>=5.17
Requires-Dist: vaderSentiment>=3.3
Requires-Dist: langchain>=0.3
Requires-Dist: langchain-classic>=0.1
Requires-Dist: langchain-community>=0.3
Requires-Dist: langchain-openai>=0.2
Requires-Dist: langchain-anthropic>=0.2
Requires-Dist: langchain-ollama>=0.2
Requires-Dist: stable-baselines3>=2.2
Requires-Dist: gymnasium>=0.29
Requires-Dist: pyyaml>=6.0
Requires-Dist: click>=8.1
Requires-Dist: pyserial>=3.5
Provides-Extra: dev
Requires-Dist: pytest>=7.4; extra == "dev"
Requires-Dist: pytest-asyncio>=0.23; extra == "dev"
Requires-Dist: pytest-cov>=4.1; extra == "dev"
Requires-Dist: ruff>=0.2; extra == "dev"
Requires-Dist: mypy>=1.8; extra == "dev"

# DT-Forge

A domain-agnostic Python framework for building digital twins of any physical
or process asset — pumps, soil, plants, sales pipelines, manufacturing lines.

DT-Forge implements a **six-layer Digital Twin architecture** plus a
cross-cutting **connector subsystem** for inter-twin communication and four
**collection-twin patterns** for grouping twins together. You declare the
sensors, write the asset-specific glue, and the framework provides the rest:
storage, modelling, services, reactive control, multi-agent diagnosis, and
autonomous decision-making.

## Architecture

```
┌─────────────────────────────────────────────────────────┐
│              AUTONOMOUS LAYER (Layer 6)                 │
│  OODA loop · GoalPlanner · AutonomousOverseer (LLM)     │
│  RL policy (SAC/TD3/PPO/A2C) · HumanNotifier            │
├─────────────────────────────────────────────────────────┤
│             INTELLIGENT LAYER (Layer 5)                 │
│  MultiAgentSystem · LangChain DiagnosticAgent           │
│  Neo4j knowledge graph · per-agent status / history     │
├─────────────────────────────────────────────────────────┤
│              REACTIVE LAYER (Layer 4)                   │
│  ThresholdRuleEngine · MultiStateFSMRuleEngine          │
│  simple-pid PIDController · PostgreSQL RuleRepository   │
├─────────────────────────────────────────────────────────┤
│              SERVICES LAYER (Layer 3)                   │
│  Eclipse Ditto sync · FastAPI REST + SSE chat           │
├─────────────────────────────────────────────────────────┤
│         SIMULATION & MODEL LAYER (Layer 2)              │
│  scipy ODE · ONNX/sklearn surrogate · SimPy · Prophet   │
├─────────────────────────────────────────────────────────┤
│                 DATA LAYER (Layer 1)                    │
│  MQTT ingest · TelemetryRouter · DataManagementPipeline │
│  InfluxDB · MongoDB · Redis · MinIO · PostgreSQL        │
│  ProvenanceLog · SessionStore · TextIngestor            │
└─────────────────────────────────────────────────────────┘

    Cross-cutting:  Connector subsystem  (MQTT · Ditto · HTTP API)
                    Collection twins     (Aggregate · Collection ·
                                          Composite · Network)
```

Every layer is optional: pick the ones your twin needs and skip the rest.
A monitoring-only twin needs only the data layer and MQTT ingest, which the
CLI calls `data` and `network`. A fully autonomous asset needs all six. The
bundled `dtforge` CLI generates a `docker-compose.yml` for exactly the layers
you ask for.

## Collection patterns

| Pattern        | Use case                                                       |
|----------------|----------------------------------------------------------------|
| `AggregateDT`  | Fleet of identical assets — fused state, shared control        |
| `CollectionDT` | Batch monitoring, outlier detection, statistical comparison    |
| `CompositeDT`  | Hierarchical systems with boundary-condition exchange + swap   |
| `NetworkDT`    | Graph-topology systems, cascade risk, bottleneck detection     |

## Installation

Requires **Python 3.11+** and Docker (for the backing infrastructure).

```bash
pip install dt-forge
```

This installs the framework plus the `dtforge` CLI.

## Quick start

### 1. Configure

Configuration is driven by environment variables with the `DT_` prefix and
double-underscore nested-field delimiter. Put them in a `.env` file in your
project directory:

```bash
DT_ASSET_ID=pump_001
DT_ASSET_TYPE=centrifugal_pump
DT_ASSET_NAME="Plant A Pump"

DT_MQTT__BROKER=localhost
DT_MQTT__PORT=1883

DT_INFLUX__URL=http://localhost:8086
DT_INFLUX__TOKEN=my-token
DT_INFLUX__ORG=digital_twin
DT_INFLUX__BUCKET=asset_telemetry

DT_MONGO__URI=mongodb://admin:password@localhost:27017
DT_REDIS__URL=redis://localhost:6379
DT_DITTO__URL=http://localhost:8080

# Optional — for the intelligent layer
DT_LLM__PROVIDER=anthropic        # openai | anthropic | ollama
DT_LLM__MODEL=claude-sonnet-4-6
DT_LLM__API_KEY=sk-ant-...
DT_NEO4J__URI=bolt://localhost:7687
```
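To make the naming scheme concrete, here is a minimal stdlib-only sketch of how `DT_`-prefixed variables with a `__` delimiter fold into nested settings. It is an illustration, not DT-Forge's loader: the dependency list includes pydantic-settings, which supports this scheme through `env_prefix` and `env_nested_delimiter` and also coerces types, whereas the hypothetical `parse_twin_env` helper below leaves every value as a string.

```python
import os
from typing import Any, Mapping


def parse_twin_env(environ: Mapping[str, str] = os.environ,
                   prefix: str = "DT_",
                   delimiter: str = "__") -> dict[str, Any]:
    """Fold PREFIX_SECTION__FIELD variables into a nested dict (illustrative only)."""
    settings: dict[str, Any] = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue  # not a twin setting
        # "DT_MQTT__BROKER" -> ["mqtt", "broker"]; "DT_ASSET_ID" -> ["asset_id"]
        path = key[len(prefix):].lower().split(delimiter)
        node = settings
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return settings


example = {
    "DT_ASSET_ID": "pump_001",
    "DT_MQTT__BROKER": "localhost",
    "DT_MQTT__PORT": "1883",
    "PATH": "/usr/bin",  # ignored: no DT_ prefix
}
print(parse_twin_env(example))
# {'asset_id': 'pump_001', 'mqtt': {'broker': 'localhost', 'port': '1883'}}
```

A single underscore stays part of the field name (`ASSET_ID` becomes `asset_id`); only the double underscore opens a nested section.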

### 2. Start the infrastructure you need

`dtforge infra up` writes a `docker-compose.yml` tuned for the layers you ask
for and runs `docker compose up -d`:

```bash
# Minimal monitoring twin (data + MQTT + Ditto sync):
dtforge infra up --layers data,network,services

# Add the intelligent layer (also brings Neo4j up):
dtforge infra up --layers data,network,services,intelligent

# Write the compose file without starting containers:
dtforge infra up --layers data,network,services --generate-only
docker compose up -d
```

Wait for everything to come up, then verify:

```bash
dtforge infra check --layers data,network,services,intelligent
```

Every service shows `✓` once the twin can reach it.
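If you want a quick reachability probe without the CLI, a plain TCP connect is a rough approximation. This is a sketch only: an open port does not prove the service is healthy, and `dtforge infra check` may well do more than this. The `is_reachable` helper and the `targets` mapping are illustrative names, with ports taken from the example `.env` above.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Ports taken from the example .env above; adjust to your deployment.
targets = {"mosquitto": 1883, "influxdb": 8086, "mongodb": 27017, "redis": 6379}

if __name__ == "__main__":
    for name, port in targets.items():
        mark = "✓" if is_reachable("localhost", port) else "✗"
        print(f"{mark} {name} (localhost:{port})")
```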

**Containers started by `infra up`:**

| Service        | Host port  | Layer needed         | Purpose                          |
|----------------|------------|----------------------|----------------------------------|
| Mosquitto      | 1883, 9001 | `network`            | MQTT broker + WebSocket          |
| InfluxDB       | 8086       | `data` / `network`   | Time-series telemetry            |
| MongoDB        | 27017      | `data` / `network`   | Events + provenance              |
| Redis          | 6379       | `data` / `network`   | Cache, FSM state, sessions       |
| MinIO          | 9000, 9002 | `data` / `network`   | Trained models, large objects    |
| Eclipse Ditto  | 8080       | `services`           | Canonical twin state (nginx+auth)|
| Neo4j          | 7474, 7687 | `intelligent`        | Knowledge graph                  |
| Grafana        | 3000       | always               | Observability                    |
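The table can be read as a lookup from requested layers to containers. The sketch below transcribes it into Python to show which services a given `--layers` value implies. It assumes that "`data` / `network`" means "either layer is enough", and it is not the CLI's actual implementation; `SERVICE_LAYERS` and `services_for` are illustrative names.

```python
# Service -> layers that pull it in, transcribed from the table above.
# None means "always started". Illustrative lookup, not dtforge's own code.
SERVICE_LAYERS = {
    "mosquitto": {"network"},
    "influxdb": {"data", "network"},
    "mongodb": {"data", "network"},
    "redis": {"data", "network"},
    "minio": {"data", "network"},
    "ditto": {"services"},
    "neo4j": {"intelligent"},
    "grafana": None,
}


def services_for(layers: str) -> list[str]:
    """Return the services implied by a comma-separated --layers value."""
    requested = {name.strip() for name in layers.split(",") if name.strip()}
    return sorted(
        service
        for service, needed_by in SERVICE_LAYERS.items()
        if needed_by is None or requested & needed_by
    )


print(services_for("data,network,services"))
# ['ditto', 'grafana', 'influxdb', 'minio', 'mongodb', 'mosquitto', 'redis']
```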

### 3. Scaffold a new twin

```bash
dtforge init \
  --asset-type centrifugal_pump \
  --name "Plant A Pump" \
  --asset-id pump_001
```

That writes:

| File         | Purpose                                                 |
|--------------|---------------------------------------------------------|
| `twin.py`    | Twin class scaffold — edit `build_layers()` to wire it  |
| `.env`       | Environment configuration                               |

The scaffold imports the core layers (Data, Network ingest, Data management,
Ditto sync, Reactive) and gives you a runnable starting point. Add the
intelligent and autonomous layers as your twin grows.

### 4. Run

```bash
python twin.py            # direct
# or:
dtforge run twin          # via the CLI (loads twin.py, calls run_forever)
```

The FastAPI service exposes `/health`, `/api/twin/state`,
`/api/twin/telemetry`, `/api/twin/events`, and the SSE chat endpoint
`/api/chat` once the services layer is wired in.
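A small stdlib client is enough to poke at these endpoints. The sketch below assumes the API listens on `localhost:8000` (Uvicorn's default port) and that the endpoints return JSON; this README does not state either, so treat `BASE_URL` and the `get_json` helper as placeholders to adapt.

```python
import json
from urllib.parse import urljoin
from urllib.request import urlopen

# Assumption: the API is served on localhost:8000 (Uvicorn's default port).
BASE_URL = "http://localhost:8000"


def endpoint(path: str, base_url: str = BASE_URL) -> str:
    """Build an absolute URL for one of the documented paths."""
    return urljoin(base_url.rstrip("/") + "/", path.lstrip("/"))


def get_json(path: str, base_url: str = BASE_URL, timeout: float = 5.0):
    """GET a path and decode the body as JSON (assumes a JSON response)."""
    with urlopen(endpoint(path, base_url), timeout=timeout) as response:
        return json.load(response)


if __name__ == "__main__":
    try:
        print(get_json("/health"))
        print(get_json("/api/twin/state"))
    except (OSError, ValueError) as exc:  # connection errors or non-JSON body
        print(f"twin API not usable: {exc}")
```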

## A minimal monitoring twin

```python
import asyncio
import logging
from dotenv import load_dotenv
from dt_forge.core.config import TwinConfig, SensorFieldSpec
from dt_forge.core.base import AbstractDigitalTwin
from dt_forge.core.lifecycle import TwinLifecycle
from dt_forge.data import InfluxAdapter, MongoAdapter, RedisAdapter
from dt_forge.data.writer import TelemetryRouter
from dt_forge.data.management import DataManagementPipeline
from dt_forge.network import MQTTIngestor
from dt_forge.reactive import ThresholdRuleEngine

load_dotenv()
logging.basicConfig(level=logging.INFO)

config = TwinConfig(sensor_fields=[
    SensorFieldSpec(name="temperature_c", nominal=25.0, noise_std=0.5,
                    warn_threshold=60.0, crit_threshold=75.0),
    SensorFieldSpec(name="pressure_bar", nominal=4.0, noise_std=0.05,
                    warn_threshold=3.0, crit_threshold=2.0,
                    threshold_direction="low"),
])

class PumpTwin(AbstractDigitalTwin):
    def build_layers(self):
        ts, doc, cache = InfluxAdapter(self.config), MongoAdapter(self.config), RedisAdapter(self.config)
        router = TelemetryRouter(self.config, self.bus,
                                 ts_store=ts, doc_store=doc, cache=cache)
        return {
            "data":      router,
            "network":   MQTTIngestor(self.config, self.bus, router=router),
            "data_mgmt": DataManagementPipeline(self.config, self.bus,
                                                ts_store=ts, cache=cache),
            "reactive":  ThresholdRuleEngine(self.config, self.bus,
                                              ts_store=ts, cache=cache, doc_store=doc),
        }

if __name__ == "__main__":
    lc = TwinLifecycle()
    lc.add(PumpTwin(config))
    asyncio.run(lc.run_forever())
```
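The two `SensorFieldSpec` entries above use opposite threshold directions: temperature becomes a problem as it rises, pressure as it falls. The sketch below shows one plausible reading of those fields. It assumes that the default direction is "high" and that the boundaries are inclusive; neither is confirmed here, and `classify` is a hypothetical helper rather than the logic inside `ThresholdRuleEngine`.

```python
def classify(value: float, warn: float, crit: float,
             direction: str = "high") -> str:
    """Map a reading to "ok", "warn", or "crit".

    direction="high": larger values are worse (e.g. temperature).
    direction="low":  smaller values are worse (e.g. pressure).
    """
    if direction == "low":
        # Mirror the values so the "high" comparison below applies.
        value, warn, crit = -value, -warn, -crit
    elif direction != "high":
        raise ValueError(f"unknown direction: {direction!r}")
    if value >= crit:
        return "crit"
    if value >= warn:
        return "warn"
    return "ok"


print(classify(62.0, warn=60.0, crit=75.0))                 # temperature_c -> warn
print(classify(2.5, warn=3.0, crit=2.0, direction="low"))   # pressure_bar  -> warn
```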

Have a device (or `dt_forge.physical.simulator.GenericSimulator`) publish to
the MQTT topic `dt/pump_001/telemetry`, and the twin will route, store, and
smooth the readings, score health, and drive the FSM automatically. Add a
simulation model, a knowledge graph, a LangChain agent, and an OODA loop on
top of this skeleton when you need them.
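For a quick manual test, you can publish a reading yourself with paho-mqtt 2.x. The topic comes from the text above, but the payload layout is an assumption: a flat JSON object keyed by the `SensorFieldSpec` names plus a timestamp. Check what `MQTTIngestor` actually expects before relying on it. `build_payload` and `publish_once` are illustrative helpers, not framework functions.

```python
import json
import time

# Topic from the text above; the payload layout below is an assumption.
TOPIC = "dt/pump_001/telemetry"


def build_payload(temperature_c: float, pressure_bar: float) -> str:
    """Serialize one reading as JSON, keyed by the SensorFieldSpec names.

    Hypothetical schema: a flat object plus a Unix timestamp.
    """
    return json.dumps({
        "timestamp": time.time(),
        "temperature_c": temperature_c,
        "pressure_bar": pressure_bar,
    })


def publish_once(payload: str, host: str = "localhost", port: int = 1883) -> None:
    """Publish a single message with paho-mqtt 2.x and wait for delivery."""
    import paho.mqtt.client as mqtt  # imported lazily; requires paho-mqtt>=2.0

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.connect(host, port)
    client.loop_start()                       # background network loop
    info = client.publish(TOPIC, payload, qos=1)
    info.wait_for_publish(timeout=5)          # block until sent or timeout
    client.loop_stop()
    client.disconnect()
```

With the broker running, `publish_once(build_payload(61.2, 3.8))` sends one reading whose temperature sits just above the warning threshold configured earlier.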

## Extension points

The framework is built around protocols and abstract base classes. Replace any
piece without touching the rest:

| Extension              | Mechanism                                             |
|------------------------|-------------------------------------------------------|
| Storage backend        | Implement `TimeSeriesStore` / `DocumentStore` / `CacheStore` / `ObjectStore` protocol |
| Physics model          | Subclass `ODEModel` or implement `TwinModel`          |
| ML surrogate           | Implement `TwinModel` with ONNX/sklearn/Torch         |
| Discrete-event model   | Wrap a generator with `SimPyModel`                    |
| Forecaster             | Wrap any time-series model with `ProphetForecaster`   |
| Custom reactive rule   | Implement `Rule.evaluate(readings)` (sync)            |
| N-state FSM            | Subclass `MultiStateFSMRuleEngine`                    |
| Diagnostic agent tools | Override `DiagnosticAgent._build_extra_tools()`       |
| LLM provider           | Set `DT_LLM__PROVIDER` (openai/anthropic/ollama)      |
| Goal-based assessment  | Subclass `GoalPlanner`, override `assess()`           |
| Strategic LLM decisions| Wire an `AutonomousOverseer` into `OODALoop`          |
| RL policy              | `PolicyTrainer` + `PolicyDeployer` (SB3)              |
| Reward function        | Pass `reward_fn` to `GenericTwinEnv`                  |
| Notification channel   | Implement `NotificationBackend` (email/Slack/webhook) |
| Cross-twin transport   | Implement `ConnectorProtocol`                         |
| New collection pattern | Subclass `AbstractCollectionTwin`                     |
| Domain session state   | Subclass `SessionContext`, use `SessionStore[T]`      |
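As an illustration of the "custom reactive rule" row, the sketch below shows the general shape of a synchronous rule. The `Rule` protocol here is a local stand-in: the table only confirms a sync `evaluate(readings)` method, so the readings type, the return value (a message or `None`), and the field names are all assumptions. `DeltaRule` is a hypothetical example class.

```python
from dataclasses import dataclass
from typing import Mapping, Optional, Protocol


class Rule(Protocol):
    """Local stand-in for the framework's rule interface (shape assumed)."""

    def evaluate(self, readings: Mapping[str, float]) -> Optional[str]: ...


@dataclass
class DeltaRule:
    """Fire when two sensor fields drift apart by more than max_delta."""

    field_a: str
    field_b: str
    max_delta: float

    def evaluate(self, readings: Mapping[str, float]) -> Optional[str]:
        # Synchronous: no I/O, just arithmetic on the latest readings.
        if self.field_a not in readings or self.field_b not in readings:
            return None  # not enough data to decide
        delta = abs(readings[self.field_a] - readings[self.field_b])
        if delta > self.max_delta:
            return f"{self.field_a}/{self.field_b} differ by {delta:.2f}"
        return None


rule: Rule = DeltaRule("inlet_temp_c", "outlet_temp_c", max_delta=15.0)
print(rule.evaluate({"inlet_temp_c": 22.0, "outlet_temp_c": 41.5}))
# inlet_temp_c/outlet_temp_c differ by 19.50
```

Keeping the rule free of I/O is what makes a synchronous `evaluate` practical: anything that needs a database or network call belongs outside the rule.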

## Technology stack

| Concern          | Library                                            |
|------------------|----------------------------------------------------|
| Messaging        | Eclipse Mosquitto / paho-mqtt 2.x                  |
| Time-series      | InfluxDB 2 (`influxdb-client`)                     |
| Documents        | MongoDB (`pymongo` + `motor`)                      |
| Cache / pub-sub  | Redis                                              |
| Object storage   | MinIO (S3 API)                                     |
| Relational       | PostgreSQL (`asyncpg`) — used by `RuleRepository`  |
| Twin state       | Eclipse Ditto                                      |
| Knowledge graph  | Neo4j 5                                            |
| Web API          | FastAPI + Uvicorn + sse-starlette                  |
| Config           | Pydantic v2 + pydantic-settings                    |
| State machine    | `transitions`                                      |
| PID control      | `simple-pid`                                       |
| Physics          | SciPy (`solve_ivp`)                                |
| Surrogate / ML   | ONNX Runtime + scikit-learn                        |
| Forecasting      | Prophet                                            |
| Discrete event   | SimPy                                              |
| Sentiment / NLP  | vaderSentiment (swappable for any callable)        |
| Agents           | LangChain + langchain-classic                      |
| LLM providers    | OpenAI · Anthropic · Ollama                        |
| RL               | Stable-Baselines3 + Gymnasium                      |
| CLI              | Click                                              |

## CLI reference

```bash
dtforge init        --asset-type TYPE --name NAME --asset-id ID [--out DIR]
dtforge infra up    --layers data,network,services[,intelligent] [--out FILE] [--generate-only]
dtforge infra check [--layers ...]              # default: read .dtforge-layers
dtforge run         [TWIN_MODULE]               # default module: "twin"
dtforge train       [--algorithm SAC|TD3|PPO|A2C] [--timesteps N]
                    [--env-module M] [--save NAME]
```

Run any command with `--help` for the full flag list.

## License

MIT
