OMS Documentation v2.0 · Multi-Tenant Omni-Channel Order Management System
Production Grade · FastAPI 0.111 · Python 3.12 · Multi-Tenant

OMS — Omni-Channel Order Management System

A production-grade, fully async, multi-tenant Order Management System built with Python 3.12, FastAPI, and a polyglot persistence layer — PostgreSQL, MongoDB, Redis, and Elasticsearch. Designed for high-throughput omni-channel retail with intelligent multi-node sourcing, real-time inventory reservation, a fully automated fulfillment pipeline, and complete data isolation between organizations and environments.

At a glance

  • API Endpoints: 80+ — 11 routers incl. orgs, envs, admin
  • Docker Services: 9+ — main stack + per-tenant stacks
  • Celery Queues: 7 — with per-env fan-out tasks
  • Sourcing Strategies: 7 — incl. AI_ADAPTIVE + AI_HYBRID
  • RBAC Layers: 3 — Platform → Org → Environment
  • Data Isolation: DB-level — separate PG DB per environment

Quick start: Run docker compose up -d --build then seed with docker compose exec api python scripts/seed.py to get a fully operational system with 8 nodes, 64 inventory records, and 5 sourcing rules. A default organization and Production environment are seeded automatically on first startup.

1 Architecture Overview

┌─────────────────────────────────────────────────────────────────────────┐
│                              CLIENT LAYER                               │
│                WEB │ MOBILE │ POS │ API │ MARKETPLACE                   │
└──────────────────────────────┬──────────────────────────────────────────┘
                               │ HTTP/REST + X-OMS-Environment header
┌──────────────────────────────▼──────────────────────────────────────────┐
│                     CONTROL PLANE (shared oms_db)                       │
│             /auth   /admin   /organizations   /environments             │
│           users · user_groups · organizations · environments            │
│           user_organization_roles · user_environment_roles              │
└──────────────────────────────┬──────────────────────────────────────────┘
                               │ environment resolved → get_db() selects engine
       ┌───────────────────────┼───────────────────────────────┐
       ▼                       ▼                               ▼
oms_db (default prod)     oms_acme_dev                 oms_widgetco_prod
─────────────────────     ────────────                 ─────────────────
/orders /inventory        /orders /inventory           /orders /inventory
/nodes /sourcing-rules    /nodes                       /nodes
/connectors /architect
       │                       │                               │
       ▼                       ▼                               ▼
   PostgreSQL           MongoDB              Redis            Elasticsearch
   (per-env DB)         (per-env             (Cache /         (per-env index
                        MongoDB DBs)         Celery broker)   prefix)
                               │
                               ▼
           CELERY WORKERS (7 queues, per-environment fan-out)
  sourcing → fulfillment → carrier → notifications → webhooks → connectors → learning
                               ▲
                        AI Learning Loop
           (label outcomes · discover patterns ·
            evaluate experiments · propose rules)

AI-Native 5-Layer Architecture

┌─────────────────────────────────────────────────────────────────┐
│ Layer 5: Continuous Learning Loop                               │
│ (Nightly pattern discovery · Outcome labeling · A/B testing)    │
├─────────────────────────────────────────────────────────────────┤
│ Layer 4: Meta-AI Framework (Self-Modification)                  │
│ (Natural language → proposals → human approval → safe apply)    │
├─────────────────────────────────────────────────────────────────┤
│ Layer 3: AI Architect UI                                        │
│ (Proposals · Patterns · Experiments · Performance dashboards)   │
├─────────────────────────────────────────────────────────────────┤
│ Layer 2: AI Sourcing Engine (AI_ADAPTIVE strategy)              │
│ (KubeAI-scored nodes · confidence threshold · rule fallback)    │
├─────────────────────────────────────────────────────────────────┤
│ Layer 1: Intelligence Data Foundation                           │
│ (Outcome tracking · Pattern storage · Feature extraction)       │
└─────────────────────────────────────────────────────────────────┘

Request Lifecycle

  1. Client POSTs an order to POST /orders
  2. FastAPI validates the request via Pydantic v2 schemas
  3. Order is written to PostgreSQL and indexed in Elasticsearch
  4. Order-created event is written to MongoDB audit log
  5. A source_order task is enqueued on the Redis-backed sourcing Celery queue
  6. Sourcing Engine evaluates active rules — may route via A/B experiment or AI_ADAPTIVE strategy
  7. Pipeline tasks flow: sourcing → fulfillment (pick → pack) → carrier (label + ship) → notifications → webhooks
  8. Delivery outcome is labeled by the learning worker and fed back into the pattern store
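The hand-off order in steps 5–7 can be sketched as a plain-Python simulation. In production these stages are Celery tasks on dedicated queues; the stage functions below are illustrative stand-ins, not the actual worker code.

```python
# Illustrative pipeline hand-off: each stage finishes before the next is
# enqueued. Status names come from the Order Status Values section.
def source(order: dict) -> dict:
    order["status"] = "SOURCED"
    return order

def fulfill(order: dict) -> dict:       # pick → pack
    order["status"] = "READY_TO_SHIP"
    return order

def ship(order: dict) -> dict:          # label + ship
    order["status"] = "SHIPPED"
    return order

def run_pipeline(order: dict) -> dict:
    """Walk an order through the stages in sequence, recording the trail."""
    order["trail"] = []
    for stage in (source, fulfill, ship):
        order = stage(order)
        order["trail"].append(order["status"])
    return order

result = run_pipeline({"order_number": "ORD-20240101-ABC123", "status": "CONFIRMED"})
```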

Directory Structure

D:\OMS\
├── Dockerfile                      # Multi-stage Python 3.12 image
├── docker-compose.yml              # All 9 services
├── requirements.txt                # All Python dependencies
├── .env                            # Local environment variables
│
├── app/
│   ├── main.py                     # FastAPI app, lifespan, router registration
│   ├── config.py                   # Pydantic Settings (reads .env)
│   │
│   ├── database/
│   │   ├── postgres.py             # Async SQLAlchemy engine + session factory
│   │   ├── mongodb.py              # Motor async client + index creation
│   │   ├── redis_client.py         # aioredis pool + cache helpers
│   │   └── elasticsearch_client.py # Async ES client + index mapping creation
│   │
│   ├── models/postgres/
│   │   ├── order_models.py         # Order, OrderItem, FulfillmentAllocation, Shipment
│   │   ├── inventory_models.py     # InventoryItem, InventoryAdjustment, InventoryReservation
│   │   ├── node_models.py          # FulfillmentNode + NodeType/NodeStatus enums
│   │   ├── sourcing_rule_models.py # SourcingRule + SourcingStrategy/ConditionOperator enums
│   │   └── ai_models.py            # AIProposal, AIExperiment, CustomAttributeDefinition,
│   │                               #   UIWidget, SourcingOutcomeLabel + enums
│   │
│   ├── schemas/                    # Pydantic v2 request/response schemas
│   ├── routers/                    # FastAPI route handlers (7 files, 44 endpoints)
│   ├── services/
│   │   ├── sourcing_engine.py      # Intelligence core: rule eval, node scoring, allocation,
│   │   │                           #   AI_ADAPTIVE + experiment traffic splitting
│   │   ├── ai_sourcing.py          # AISourcingAdvisor — KubeAI node scoring
│   │   ├── pattern_discovery.py    # PatternDiscoveryService — nightly cluster aggregation
│   │   ├── schema_evolution.py     # SchemaEvolutionEngine — safe additive schema changes
│   │   └── webhook.py              # HMAC-SHA256 signed delivery
│   │
│   └── workers/
│       ├── celery_app.py           # Celery factory + 7 queues + beat schedule
│       ├── sourcing.py             # source_order task (writes sourcing_outcomes to MongoDB)
│       ├── fulfillment.py          # start_picking, complete_packing, reset counters
│       ├── carrier.py              # book_shipment, simulate_delivery, sync tracking
│       ├── notifications.py        # Email/SMS notification tasks
│       ├── webhooks.py             # dispatch_webhook, retry_failed_webhooks
│       ├── connectors.py           # sync_fulfillment_to_connector, poll_amazon_orders
│       └── learning.py             # label_sourcing_outcomes, discover_patterns,
│                                   #   update_node_performance, evaluate_ai_experiments
│
├── frontend/                       # React + Vite + TypeScript GUI (port 3000)
├── scripts/
│   └── seed.py                     # Seeds all 4 databases with realistic data
└── tests/
    └── test_imports.py             # 13 import + unit tests

2 Tech Stack

Layer | Technology | Purpose
API Framework | FastAPI 0.111 + Uvicorn | Async REST API, OpenAPI docs, Pydantic v2 validation
ORM | SQLAlchemy 2.0 (async) | PostgreSQL ORM with asyncpg driver
Primary DB | PostgreSQL 16 | Orders, inventory, nodes, sourcing rules, webhooks
Document DB (events) | MongoDB 7 (Motor) — oms_events | Order audit trail, product catalog, webhook log, error monitoring
Document DB (AI) | MongoDB 7 (Motor) — oms_ai_learning | AI learning pipeline: sourcing decisions, cluster patterns, node metrics. Isolated to prevent analytical writes from affecting transactional read latency. 90-day TTL on outcomes, 180-day on patterns.
Cache / Queue | Redis 7.2 | Celery broker/backend, cache, rate-limiting
Search | Elasticsearch 8.12 | Full-text order and product search
Task Queue | Celery 5.4 + Flower | Async pipeline workers, beat scheduler, learning loop
AI / LLM | KubeAI (Anthropic) | AI node scoring (AI_ADAPTIVE), NL → proposals
Validation | Pydantic v2 | Request/response schemas, settings management
Frontend | React 18 + Vite + TypeScript | Professional GUI (this app)
Containers | Docker Compose | All 9 services in one command
HMAC Signing | hashlib + hmac (stdlib) | Webhook payload integrity verification
Geo Math | haversine (stdlib math) | Sourcing distance calculations

3 PostgreSQL Models

fulfillment_nodes

Column | Type | Description
id | UUID PK | Node identifier
code | VARCHAR(50) UNIQUE | Short code, e.g. DC-EAST
name | VARCHAR(200) | Display name
node_type | ENUM | DISTRIBUTION_CENTER, RETAIL_STORE, DARK_STORE, WAREHOUSE, PICKUP_POINT
status | ENUM | ACTIVE, INACTIVE, MAINTENANCE, CLOSED
latitude / longitude | FLOAT | Geographic coordinates for distance sourcing
can_ship / pickup / curbside / same_day | BOOL | Capability flags for fulfillment type matching
daily_order_capacity | INT | Max orders per day
current_daily_orders | INT | Reset to 0 at midnight by Celery beat
shipping_cost_multiplier | FLOAT | Relative cost weight for sourcing score

orders

Column | Type | Description
id | UUID PK | Order identifier
order_number | VARCHAR(50) UNIQUE | Human-readable, e.g. ORD-20240101-ABC123
channel | ENUM | WEB, MOBILE, POS, API, MARKETPLACE
fulfillment_type | ENUM | SHIP_TO_HOME, STORE_PICKUP, SHIP_FROM_STORE, CURBSIDE_PICKUP, SAME_DAY_DELIVERY
status | ENUM | 15-state machine (see Fulfillment Pipeline)
total_amount | NUMERIC(12,2) | Order total in order currency
shipping_latitude / longitude | FLOAT | Customer location for haversine sourcing
pickup_node_id | UUID FK | For BOPIS / curbside orders
sourcing_rule_id | UUID FK | Which sourcing rule was applied
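The shipping coordinates above feed the haversine distance used by distance-based sourcing. A minimal sketch with stdlib math (the function name is illustrative; the real implementation lives in the sourcing engine):

```python
# Haversine great-circle distance between a node and an order's
# shipping coordinates, in kilometers. Pure stdlib math.
from math import radians, sin, cos, asin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius

def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Distance in km between two (lat, lon) points in degrees."""
    rlat1, rlon1, rlat2, rlon2 = map(radians, (lat1, lon1, lat2, lon2))
    dlat, dlon = rlat2 - rlat1, rlon2 - rlon1
    a = sin(dlat / 2) ** 2 + cos(rlat1) * cos(rlat2) * sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))

# Customer in NYC (coordinates from the create-order example) vs. a
# hypothetical node in Boston — roughly 300 km apart.
d = haversine_km(40.7484, -73.9967, 42.3601, -71.0589)
```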

inventory_items

Per-node, per-SKU stock levels with three counters:

  • quantity_on_hand — physical stock in the warehouse
  • quantity_reserved — soft-reserved by active allocations
  • quantity_available = on_hand − reserved (what sourcing can use)
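The three-counter model can be sketched in a few lines. The field names mirror the inventory_items columns; the reserve() helper is illustrative, not the actual service code (which also writes an InventoryReservation row).

```python
# Minimal sketch of the on_hand / reserved / available invariant.
from dataclasses import dataclass

@dataclass
class InventoryItem:
    quantity_on_hand: int
    quantity_reserved: int

    @property
    def quantity_available(self) -> int:
        # What sourcing is allowed to allocate.
        return self.quantity_on_hand - self.quantity_reserved

    def reserve(self, qty: int) -> bool:
        """Soft-reserve stock for an allocation; refuse if insufficient."""
        if qty > self.quantity_available:
            return False
        self.quantity_reserved += qty
        return True

item = InventoryItem(quantity_on_hand=10, quantity_reserved=3)
ok = item.reserve(5)        # available was 7 → succeeds
too_many = item.reserve(5)  # available is now 2 → refused
```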

fulfillment_allocations

Bridges an order item to a specific fulfillment node. One order can have multiple allocations (split fulfillment). Tracks the full picking → packing → shipping timeline with timestamps for each transition.

sourcing_rules

Configurable rules evaluated in priority order (ascending). Each rule has:

  • conditions — JSON array of {field, operator, value} tuples
  • strategy — DISTANCE_OPTIMAL, COST_OPTIMAL, STORE_NEAREST, INVENTORY_RESERVATION, LEAST_COST_SPLIT, AI_ADAPTIVE, or AI_HYBRID
  • allowed_node_types, required_capabilities — node pre-filter
  • max_split_nodes, cost_weight, distance_weight — algorithm parameters
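A sketch of how a rule's {field, operator, value} conditions might be evaluated against an order. The condition shape comes from the list above; the operator names shown and the AND semantics are assumptions (the full ConditionOperator enum is not reproduced here).

```python
# Hedged sketch of condition evaluation for a sourcing rule.
# Operator names ("eq", "gte", ...) are illustrative placeholders.
OPS = {
    "eq":  lambda a, b: a == b,
    "gte": lambda a, b: a >= b,
    "lte": lambda a, b: a <= b,
    "in":  lambda a, b: a in b,
}

def rule_matches(order: dict, conditions: list[dict]) -> bool:
    """Assume a rule applies only when every condition matches."""
    return all(OPS[c["operator"]](order.get(c["field"]), c["value"])
               for c in conditions)

rule = {
    "strategy": "DISTANCE_OPTIMAL",
    "priority": 10,
    "conditions": [
        {"field": "channel", "operator": "eq", "value": "WEB"},
        {"field": "total_amount", "operator": "gte", "value": 100},
    ],
}
matched = rule_matches({"channel": "WEB", "total_amount": 149.99}, rule["conditions"])
no_match = rule_matches({"channel": "POS", "total_amount": 149.99}, rule["conditions"])
```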

Control-Plane Models (org_models.py)

These four tables live exclusively in oms_db — the shared control-plane database — and are never replicated into tenant databases.

Model | Table | Description
Organization | organizations | Top-level tenant grouping. Has a unique slug used as the namespace prefix for all database names (oms_{slug}_{env}). Instantaneous to create — no database is provisioned at this stage.
Environment | environments | An isolated data silo within an org. Stores db_name, mongo_events_db, mongo_ai_db, es_index_prefix, and optional per-cluster pg_host/port/user/password overrides. status: PROVISIONING → ACTIVE → SUSPENDED → ARCHIVED.
UserOrganizationRole | user_organization_roles | Links a user to an org with a role: ORG_OWNER, ORG_ADMIN, ORG_MEMBER. One row per (user, org) pair — enforced by the uq_user_org unique constraint.
UserEnvironmentRole | user_environment_roles | Links a user to a specific environment with a role: OWNER, ADMIN, MEMBER, VIEWER. One row per (user, env) pair — enforced by uq_user_env.

environments table — key columns

Column | Type | Notes
id | UUID PK | Sent as X-OMS-Environment header
organization_id | UUID FK | Parent org, CASCADE DELETE
slug | VARCHAR(80) | Unique within org (uq_env_org_slug)
env_type | ENUM | DEV · QA · STAGING · PROD
status | ENUM | PROVISIONING · ACTIVE · SUSPENDED · ARCHIVED
db_name | VARCHAR UNIQUE | PostgreSQL database name, e.g. oms_acme_prod
mongo_events_db | VARCHAR | e.g. oms_events_acme_prod
mongo_ai_db | VARCHAR | e.g. oms_ai_acme_prod
es_index_prefix | VARCHAR | e.g. acme_prod
pg_host / pg_port / pg_user / pg_password | VARCHAR NULL | Per-cluster overrides; NULL = use same host as control plane
base_url | VARCHAR NULL | Deployment URL of the tenant pod; used by the frontend switcher for cross-origin redirects
is_default | BOOL | Fallback when no X-OMS-Environment header is sent (one per org)
provisioned_at | TIMESTAMP NULL | Set when database creation completes; NULL while PROVISIONING
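The db_name plus optional pg_* overrides determine which PostgreSQL cluster an environment's engine connects to. A sketch of how get_db() might build the connection string — the fallback-to-control-plane behavior follows the table above, but the helper names, defaults, and DSN format are illustrative assumptions:

```python
# Hedged sketch: per-environment DSN selection with control-plane fallback.
from dataclasses import dataclass
from typing import Optional

# Assumed control-plane connection defaults (placeholders).
CONTROL_PLANE = {"host": "postgres", "port": 5432, "user": "oms", "password": "oms"}

@dataclass
class Environment:
    db_name: str
    pg_host: Optional[str] = None      # NULL → same cluster as control plane
    pg_port: Optional[int] = None
    pg_user: Optional[str] = None
    pg_password: Optional[str] = None

def dsn_for(env: Environment) -> str:
    host = env.pg_host or CONTROL_PLANE["host"]
    port = env.pg_port or CONTROL_PLANE["port"]
    user = env.pg_user or CONTROL_PLANE["user"]
    pwd = env.pg_password or CONTROL_PLANE["password"]
    return f"postgresql+asyncpg://{user}:{pwd}@{host}:{port}/{env.db_name}"

same_cluster = dsn_for(Environment(db_name="oms_acme_prod"))
own_cluster = dsn_for(Environment(db_name="oms_widgetco_prod", pg_host="pg-widgetco"))
```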

AI Models (ai_models.py)

Model | Description
ai_proposals | All AI-proposed changes. Lifecycle: PENDING → APPROVED → APPLIED (or REJECTED / ROLLED_BACK). Stores proposal_data JSONB and rollback_data JSONB. Nothing is applied without human approval.
ai_experiments | A/B tests between two sourcing strategies. Traffic split via traffic_split_pct (1–50%). Stores per-arm results JSONB and winner when concluded.
custom_attribute_definitions | Dynamic schema extensions for orders/products/nodes — uses existing metadata_ JSONB columns. No DDL required.
ui_widgets | JSON-configured dashboard widgets. widget_type: metric_card, time_series, bar_chart, table, distribution.
sourcing_outcome_labels | PostgreSQL mirror of labeled MongoDB sourcing_outcomes docs for fast analytical queries.

Other Models

Model | Description
order_items | Line items linked to an order. Tracks quantity_fulfilled as allocations are shipped.
shipments | One per allocation. Carrier, tracking number, label URL, JSON array of tracking events.
inventory_adjustments | Immutable audit log of every stock change with before/after quantities.
webhook_endpoints | Registered webhook receivers with HMAC secret.
webhook_events | Persistent delivery records with retry state machine.
connectors | External platform integrations (Shopify, etc.) with masked config and counters.
connector_events | Per-connector inbound/outbound event log with payload and response snapshots.
connector_inventory_mappings | Maps an OMS InventoryItem (by SKU + connector) to platform-specific IDs needed for inventory sync: shopify_inventory_item_id, shopify_location_id, amazon_asin, amazon_fnsku, etc.

4 MongoDB Collections

MongoDB is split into two databases to isolate transactional read latency from analytical write load. Config keys: MONGODB_DB=oms_events and MONGODB_AI_DB=oms_ai_learning. Both currently live on the same MongoDB instance/container; moving the AI database to a dedicated instance later only requires pointing it at its own connection string.

Database: oms_events — transactional / operational

Latency-sensitive collections written on the hot request path. Fast point lookups by order_id.

Collection | Purpose | TTL | Key Indexes
order_events | Append-only audit trail for every order state change | — | (order_id, timestamp), event_type
error_events | Raw error/exception records from all services | 30 days | timestamp, fingerprint, source_service, level
error_issues | Aggregated issues grouped by fingerprint (one doc per unique error type) | — | fingerprint (unique), status+last_seen
product_catalog | Rich product data: images, attributes, descriptions | — | Text index on name + description
webhook_deliveries | Per-event delivery attempt history | — | event_id, status
notifications | Email/SMS notification log | — | order_id, created_at

Database: oms_ai_learning — analytical / AI pipeline

High-volume write collections used by the AI learning loop. Aggregation-heavy reads run here without contending with order audit lookups. TTLs bound storage growth automatically.

Collection | Purpose | TTL | Key Indexes
sourcing_outcomes | Per-allocation sourcing decision snapshot + delivery outcome labels. Written by the sourcing worker at order time; enriched by the carrier worker on SHIPPED/DELIVERED; labeled with outcome_score hourly by the learning worker. | 90 days — raw decisions expire; aggregated patterns retain the learnings | (cluster_key, outcome_score), (node_id, sourced_at), (experiment_id, strategy_used)
sourcing_patterns | Aggregated node performance per order-feature cluster (channel|region|amount|type). Upserted nightly by discover_patterns. Feeds the AI sourcing prompt and the Patterns tab in AI Architect. | 180 days — stale clusters expire; the nightly run re-creates active ones | cluster_key (unique), sample_count
node_performance_metrics | Rolling 7-day and 30-day stats per node: avg outcome score, avg delivery hours, backorder rate %, return rate %. One doc per (node_id, period_days) pair, replaced on each run. Feeds the AI sourcing prompt and the Node Performance tab. | — | (node_id, period_days) (unique)

All MongoDB writes are non-blocking. The Motor async driver ensures no latency impact on the main request path. AI learning writes are additionally isolated in a separate database so nightly aggregations never compete with order audit reads for buffer pool or I/O.

5 Redis Key Schema

Key Pattern | Type | TTL | Purpose
oms:version | STRING | 24h | Current application version
oms:stats | HASH | — | Aggregate counters (orders, revenue)
oms:active_strategies | STRING | 1h | Cached sourcing strategy list
env:{env_uuid} | STRING (JSON) | 60s | Cached Environment record for the middleware hot path. Written on first miss from the control DB. Evicted on environment update or status change. Prevents a control-plane DB query on every API request in a high-throughput environment.
celery:* | Various | — | Celery broker/result state (DB 1 + DB 2)

Redis is used as the Celery broker (DB 1) and result backend (DB 2), keeping pipeline task state separate from application cache (DB 0). The env:{id} cache key is the critical performance optimization for the multi-tenant middleware — without it, every API request would require a synchronous lookup into oms_db to resolve the active environment.

6 Elasticsearch Indexes

oms_orders

Optimized for order search. Documents are indexed on order creation and updated on every status transition.

Field | Type | Notes
order_number | keyword | Exact match
customer_name | text | Full-text, fuzzy
customer_email | keyword | Exact match
channel, status, fulfillment_type | keyword | Filter / aggregation
total_amount | float | Range filter
created_at | date | Date range filter, sort
tags | keyword[] | Multi-value filter
line_items | nested | Nested SKU / product_name search

oms_products

Product catalog search backed by MongoDB documents synced to Elasticsearch.

Field | Type | Notes
sku | keyword | Exact match
name | text | Full-text search
description | text | Full-text search
category | keyword | Filter
price | float | Range filter

7 Orders API /orders

Method | Path | Description
POST | /orders/ | Create new order — triggers sourcing pipeline
GET | /orders/ | List orders with filters (status, channel, date range, email)
GET | /orders/{order_id} | Get single order with all relationships
GET | /orders/number/{order_number} | Get order by human-readable order number
PATCH | /orders/{order_id}/status | Transition order status (with validation)
POST | /orders/{order_id}/cancel | Cancel an order (notifies customer + fires webhook)
GET | /orders/{order_id}/events | Get MongoDB audit trail events

Create Order — Request Body

{
  "channel": "WEB",
  "fulfillment_type": "SHIP_TO_HOME",
  "customer_email": "alice@example.com",
  "customer_name": "Alice Smith",
  "line_items": [
    {
      "sku": "SKU-WIDGET-A",
      "product_name": "Premium Widget A",
      "quantity": 2,
      "unit_price": 29.99
    }
  ],
  "shipping_address": {
    "address1": "123 Main St",
    "city": "New York",
    "state": "NY",
    "postal_code": "10001",
    "latitude": 40.7484,
    "longitude": -73.9967
  }
}

Order Status Values

PENDING · CONFIRMED · SOURCING · SOURCED · PICKING · PACKING · READY_TO_SHIP · SHIPPED · OUT_FOR_DELIVERY · DELIVERED · READY_FOR_PICKUP · PICKED_UP · CANCELLED · RETURNED · REFUNDED

8 Inventory API /inventory

Method | Path | Description
POST | /inventory/ | Create inventory item for a node/SKU
GET | /inventory/ | List inventory (filter by node, SKU, low-stock)
GET | /inventory/sku/{sku} | All node stock for a specific SKU
GET | /inventory/products | Aggregated product list grouped by SKU — totals across all nodes (search, node_id, low_stock_only filters)
PATCH | /inventory/products/{sku} | Update product-level attributes (name, cost, weight, reorder point) for all nodes at once
GET | /inventory/{item_id} | Single inventory item with history
PATCH | /inventory/{item_id} | Update item metadata (reorder point, cost, etc.)
POST | /inventory/{item_id}/adjust | Apply stock adjustment — reason must be a valid enum value (see below)
POST | /inventory/check-availability | Bulk availability check across all nodes
POST | /inventory/transfer | Transfer stock between nodes

Adjustment Reason Codes

RECEIVED · SOLD · RETURNED · DAMAGED · CYCLE_COUNT · TRANSFER_IN · TRANSFER_OUT · CORRECTION

Adjust Stock Example

{
  "quantity_delta": 50,
  "reason": "RECEIVED",
  "notes": "PO-2024-0042 inbound shipment",
  "created_by": "ops_user"
}

9 Fulfillment Nodes API /nodes

Method | Path | Description
POST | /nodes/ | Register a new DC or store
GET | /nodes/ | List nodes (filter by type, status, capabilities)
GET | /nodes/{node_id} | Get node details with current capacity
PATCH | /nodes/{node_id} | Update node configuration or status
DELETE | /nodes/{node_id} | Deactivate node (soft delete, sets status=INACTIVE)
GET | /nodes/{node_id}/capacity | Get daily capacity utilization percentage

10 Sourcing Rules API /sourcing-rules

Method | Path | Description
POST | /sourcing-rules/ | Create new sourcing rule
GET | /sourcing-rules/ | List rules sorted by priority (ascending)
GET | /sourcing-rules/{rule_id} | Get rule with full condition/strategy detail
PATCH | /sourcing-rules/{rule_id} | Update rule parameters
DELETE | /sourcing-rules/{rule_id} | Delete rule permanently
POST | /sourcing-rules/{rule_id}/toggle | Enable / disable rule without deleting
POST | /sourcing-rules/evaluate | Manually run sourcing engine for a test order

12 Analytics API /analytics

Method | Path | Description
GET | /analytics/dashboard | Full KPI dashboard: revenue, order counts, breakdowns, top nodes, alerts
GET | /analytics/orders/volume | Daily order volume and revenue over N days (default 30)
GET | /analytics/inventory/summary | Aggregate inventory health: total SKUs, on-hand, available, low stock count

Dashboard Response Shape

{
  "period_start": "2024-01-01",
  "period_end": "2024-12-31",
  "total_orders": 1247,
  "total_revenue": 184293.50,
  "avg_order_value": 147.79,
  "orders_by_status": { "DELIVERED": 891, "SHIPPED": 124, ... },
  "orders_by_channel": [
    { "channel": "WEB", "count": 623, "total_revenue": 98241.00, "percentage": 49.9 }
  ],
  "top_nodes": [
    { "node_id": "...", "node_name": "DC-EAST", "total_allocations": 412, "capacity_utilization": 0.61 }
  ],
  "inventory_alerts": [
    { "sku": "SKU-GADGET-X", "node": "STR-MIA-01", "available": 2, "reorder_point": 10 }
  ]
}

13 Webhooks API /webhooks

Method | Path | Description
POST | /webhooks/endpoints | Register webhook endpoint with secret + event types
GET | /webhooks/endpoints | List all registered endpoints
GET | /webhooks/endpoints/{id} | Get endpoint details
PATCH | /webhooks/endpoints/{id} | Update URL, secret, or subscribed events
DELETE | /webhooks/endpoints/{id} | Delete endpoint permanently
POST | /webhooks/endpoints/{id}/test | Send a test order.test event
GET | /webhooks/events | List all delivery events with status
POST | /webhooks/events/{id}/retry | Manually retry a failed delivery

14 Connectors API /connectors

Superadmin-only CRUD for managing platform integrations. The inbound webhook receiver is public (authenticated via HMAC) — all other endpoints require a superadmin JWT.

Method | Path | Auth | Description
POST | /connectors/ | Superadmin | Create a new connector
GET | /connectors/ | Superadmin | List connectors (filter by status)
GET | /connectors/{id} | Superadmin | Get single connector with masked config
PATCH | /connectors/{id} | Superadmin | Update name, direction, or config
DELETE | /connectors/{id} | Superadmin | Delete connector and all events
POST | /connectors/{id}/toggle | Superadmin | Enable / disable connector
POST | /connectors/{id}/test | Superadmin | Test live API connection to the platform
GET | /connectors/{id}/events | Superadmin | Paginated inbound / outbound event log
POST | /connectors/generate-secret | Superadmin | Generate a cryptographically secure webhook secret
POST | /connectors/{id}/webhook | Public | HMAC-validated inbound webhook receiver

Create Connector Example

curl -X POST http://localhost:8001/connectors/ \
  -H "Authorization: Bearer {superadmin_token}" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My Shopify Store",
    "connector_type": "SHOPIFY",
    "direction": "BIDIRECTIONAL",
    "config": {
      "shop_url": "mystore.myshopify.com",
      "access_token": "shpat_xxxxxxxxxxxx",
      "webhook_secret": "my-hmac-secret",
      "api_version": "2024-01"
    }
  }'

Note: Sensitive config keys (access_token, webhook_secret, api_key, etc.) are always masked as *** in all API responses.
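The inbound webhook receiver validates payloads with HMAC-SHA256, matching the stdlib hashlib + hmac signing noted in the tech stack. A minimal verification sketch — the hex encoding and the exact header the signature arrives in are assumptions to check against the sender's documentation:

```python
# HMAC-SHA256 webhook signing / verification sketch (stdlib only).
import hashlib
import hmac

def sign(secret: str, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

def verify(secret: str, body: bytes, received_sig: str) -> bool:
    # compare_digest avoids leaking the match position via timing.
    return hmac.compare_digest(sign(secret, body), received_sig)

secret = "my-hmac-secret"                 # from the connector config example
body = b'{"event": "order.created"}'      # illustrative payload
sig = sign(secret, body)
ok = verify(secret, body, sig)
bad = verify(secret, body, "0" * 64)
```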

15 AI Architect /architect

The AI Architect is a superadmin-only control center that closes the loop between order data and sourcing intelligence. It is the place where the system observes its own performance, proposes improvements, and waits for a human to say yes before changing anything.

Access: Navigate to AI Architect in the sidebar (visible to superadmin users only). The page has four tabs: Proposals, Patterns, Experiments, and Node Performance.

What does the AI Architect do?

Normally, sourcing rules are manually written by admins — you define conditions, pick a strategy, and the engine applies it. The AI Architect adds a self-improving layer on top:

  1. It watches every sourcing decision — every order allocation writes a record to MongoDB (sourcing_outcomes) capturing which node was chosen, at what cost, via which strategy.
  2. It labels outcomes once the order is delivered — after delivery, the learning worker scores the decision: was it fast? was the cost accurate? did the item backorder or get returned?
  3. It discovers patterns nightly — orders are grouped by channel, region, order-size bucket, and fulfillment type. Within each group, it ranks nodes by their historical outcome scores and compares AI-sourced orders against rule-based ones.
  4. It proposes improvements — when a pattern has enough data and AI consistently outperforms the baseline by ≥10%, the system drafts a new sourcing rule proposal and queues it for your review.
  5. You approve or reject it — nothing changes in the system until you click Approve → Apply. Every action is reversible.

Tab 1 — Proposals

Proposals are the core output of the learning loop. They represent changes the AI wants to make, held in a queue until a human reviews them.

Where do proposals come from?

  • Pattern discovery (nightly) — when the discover_patterns Celery task finds a cluster where AI_ADAPTIVE achieves ≥10% better outcome scores than DISTANCE_OPTIMAL across ≥50 labeled orders, it auto-creates a pending sourcing rule proposal.
  • Experiment completion (daily) — when an A/B experiment accumulates ≥50 labeled orders per arm and declares a winner, it creates a proposal to promote the winning strategy as a permanent rule.

Proposal lifecycle — step by step

① System creates proposal (status: PENDING)
   │
   │ Admin reviews: title, confidence score, rationale, data evidence
   │
   ├──► Click REJECT (provide reason) ──► status: REJECTED  [no changes made]
   │
   └──► Click APPROVE ──► status: APPROVED
           │
           │ Admin is ready to apply the change
           │
           └──► Click APPLY ──► status: APPLIED  [change written to database]
                   │
                   │ Something went wrong / want to undo
                   │
                   └──► Click ROLLBACK ──► status: ROLLED_BACK  [change undone]

Reading a proposal

Each proposal card shows:

  • Title — plain-language summary, e.g. "Use AI_ADAPTIVE for WEB|NY|100-250|SHIP_TO_HOME cluster (+14.2% better outcomes)"
  • Confidence score — 0–1 bar combining improvement magnitude and sample size. A score of 0.8+ means strong evidence; below 0.5 means the data is suggestive but not conclusive.
  • Rationale — the numbers behind the proposal: how many orders, what average outcome scores, which thresholds were met.
  • Proposal data — the exact JSON that will be written to the database if you click Apply. For a sourcing rule proposal this includes: name, strategy, priority, and the auto-generated conditions derived from the cluster's dimensions.
  • Status badge — PENDING (yellow), APPROVED (blue), APPLIED (green), REJECTED (red), ROLLED_BACK (gray).

Safety guarantee — applied proposals are additive only:
A sourcing rule proposal inserts a new row into sourcing_rules with is_active = False. It does not activate itself. After applying, you still need to go to the Sourcing Rules page and manually toggle it active — giving you a final review point before live traffic is affected.
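The additive-only apply path can be sketched in a few lines: the proposal's JSON becomes a new row with is_active forced to False. The in-memory "table" and field names below are illustrative stand-ins for the real SQLAlchemy insert:

```python
# Sketch of the additive apply: insert only, never self-activating.
sourcing_rules: list[dict] = []  # stand-in for the sourcing_rules table

def apply_proposal(proposal_data: dict) -> dict:
    row = {**proposal_data, "is_active": False}  # forced off until an admin toggles it
    sourcing_rules.append(row)                   # insert only — no updates or deletes
    return row

applied = apply_proposal({
    "name": "AI_ADAPTIVE for WEB|NY|100-250|SHIP_TO_HOME",
    "strategy": "AI_ADAPTIVE",
    "priority": 50,
})
```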

Filtering proposals

Use the status filter tabs (All / Pending / Approved / Applied / Rejected) at the top of the Proposals tab to focus on what needs action. "Pending" is the most important queue — these are waiting for your decision.

Tab 2 — Patterns

The Patterns tab shows you what the system has learned from historical orders, grouped by order characteristics. This is the raw intelligence that proposals are built from.

What is a pattern cluster?

A cluster is a combination of four dimensions that describes a type of order:

Dimension | Example values | Where it comes from
Channel | WEB, MOBILE, POS, MARKETPLACE | order.channel
Region | NY, CA, TX, UNKNOWN | order.shipping_state
Amount bucket | 0-50, 50-100, 100-250, 250-500, 500+ | bucketed from order.total_amount
Fulfillment type | SHIP_TO_HOME, STORE_PICKUP, SAME_DAY_DELIVERY | order.fulfillment_type

Example cluster key: WEB|NY|100-250|SHIP_TO_HOME — web orders shipped to New York customers, order value $100–$250.
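Building a cluster key from the four dimensions can be sketched as below. The bucket labels come from the table above; the exact boundary behavior at bucket edges (e.g. whether $250 falls in 100-250 or 250-500) and the helper names are assumptions:

```python
# Sketch of cluster-key construction: channel|region|amount-bucket|type.
def amount_bucket(total: float) -> str:
    # Boundary handling at exact bucket edges is an assumption.
    for hi, label in [(50, "0-50"), (100, "50-100"), (250, "100-250"), (500, "250-500")]:
        if total < hi:
            return label
    return "500+"

def cluster_key(order: dict) -> str:
    region = order.get("shipping_state") or "UNKNOWN"
    return "|".join([
        order["channel"],
        region,
        amount_bucket(order["total_amount"]),
        order["fulfillment_type"],
    ])

key = cluster_key({"channel": "WEB", "shipping_state": "NY",
                   "total_amount": 149.99, "fulfillment_type": "SHIP_TO_HOME"})
```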

Reading a pattern card

  • Sample count — total number of labeled (delivered) orders in this cluster. More samples = more reliable data. Below 50 samples, treat it as indicative only.
  • Node performance table — nodes ranked by avg_outcome_score for this cluster. Shows average delivery hours, average cost, and how many orders they've handled.
  • Best node — highlighted at the top. This is the node the system would prefer for AI_ADAPTIVE sourcing within this cluster.

Patterns are updated nightly. If a cluster has fewer than 10 samples, the AI sourcing advisor will not use it (fallback to DISTANCE_OPTIMAL instead).

What to look for

  • Clusters with high sample counts (100+) and a clear top node are good candidates for a dedicated sourcing rule.
  • If the top node's avg_delivery_hours is significantly lower than others, AI_ADAPTIVE will reliably pick it.
  • Clusters where multiple nodes have similar scores indicate the order type is not sensitive to node choice — no specific rule needed.

Tab 3 — Experiments

Experiments let you A/B test two sourcing strategies on live traffic before committing to one. A percentage of orders are routed to the "treatment" strategy; the rest continue with the "control". The system measures outcomes on both arms and declares a winner when the data is conclusive.

Creating an experiment

Click New Experiment and fill in:

Field | Description | Example
Name | Human-readable label for this test | AI Sourcing Test — West Coast
Strategy A (control) | The current / baseline strategy | DISTANCE_OPTIMAL
Strategy B (treatment) | The strategy you're testing | AI_ADAPTIVE
Traffic split % | Percentage of matching orders routed to strategy B. Range: 1–50%. Start conservatively (10%). | 10
Filter conditions (optional) | Restrict the experiment to specific order types. Available keys: channel, fulfillment_type, region, amount_min, amount_max. Leave empty to apply to all orders. | {"channel": "WEB", "region": "CA"}

Tip — start small: Begin with 10% traffic split on a specific channel or region. Once the experiment accumulates enough samples and you see a clear winner, you can promote it to a sourcing rule. You don't need 50/50 splits — even 10% treatment vs 90% control gives you a valid comparison.

How traffic splitting works

When an order is sourced, the engine checks all running experiments whose filter conditions match the order. For the first matching experiment, it uses a random draw:

# In sourcing_engine._check_experiment():
if random.random() * 100 < exp.traffic_split_pct:
    strategy = exp.strategy_b   # treatment arm (e.g. AI_ADAPTIVE)
else:
    strategy = exp.strategy_a   # control arm (e.g. DISTANCE_OPTIMAL)

The experiment ID is stamped on the sourcing_outcomes MongoDB document so the learning worker can attribute the outcome to the correct arm.

Reading experiment results

The results panel shows per-arm data once labeled outcomes are available (orders must reach DELIVERED status first):

  • Total orders — how many orders were routed to each arm (labeled + unlabeled).
  • Labeled orders — orders that have reached DELIVERED and have an outcome_score computed.
  • Avg outcome score — the primary metric. Higher is better. A difference of 0.05 or more (on a 0–1 scale) triggers automatic winner declaration.
  • Avg delivery hours — secondary indicator. Lower is better.
  • Backorder rate % — what % of orders in this arm had an item backordered.

When does an experiment end?

The evaluate_ai_experiments Celery task runs daily at 03:00 UTC and checks every running experiment. It declares a winner when both conditions are met:

  • Each arm has ≥ 50 labeled outcomes
  • The absolute score difference between arms is ≥ 0.05

If either condition is unmet, the experiment continues collecting data.

When a winner is declared, the experiment status changes to COMPLETED and a new sourcing_experiment proposal is created, suggesting you promote the winning strategy as a permanent sourcing rule.

Managing experiments

  • Pause — stops new orders from being routed to this experiment. Existing labeled outcomes are preserved. Useful if you see an unexpected issue with the treatment arm.
  • Resume — re-activates a paused experiment from where it left off.

Tab 4 — Node Performance

Shows rolling performance metrics for every fulfillment node, computed by the update_node_performance task every 4 hours.

  • 7-day view — reflects recent changes in node behavior. Use this to spot a node that has degraded in the last week (rising backorder rate, slower delivery).
  • 30-day view — a more stable baseline. Use this to compare nodes over a meaningful time horizon.

Each node card shows: orders fulfilled, avg outcome score, avg delivery hours, avg cost, backorder rate %, and return rate %. Nodes are ranked by avg outcome score descending. A node consistently in the top 3 by outcome score is a strong candidate to be the "best node" for matching order clusters.

Setting up AI sourcing on your orders

Follow these steps to enable AI-powered sourcing for a subset of your orders:

Option A — Direct rule (if you already trust the data)

  1. Go to Sourcing Rules in the sidebar.
  2. Create a new rule. Set Strategy to AI_ADAPTIVE.
  3. Add conditions to restrict which orders this rule applies to — e.g. channel = WEB and shipping_state = CA.
  4. Set a priority number lower than your other rules (lower number = higher priority, e.g. 10).
  5. Toggle the rule Active.
  6. From this point, matching orders will be scored by KubeAI using historical pattern data. If pattern data is insufficient, it automatically falls back to DISTANCE_OPTIMAL.

Option B — A/B experiment first (recommended for production)

  1. Go to AI Architect → Experiments tab.
  2. Create an experiment: Strategy A = DISTANCE_OPTIMAL, Strategy B = AI_ADAPTIVE, traffic split = 10%.
  3. Add filter conditions to target a specific order segment (e.g. a single channel or region).
  4. Wait until the experiment accumulates ≥50 labeled orders per arm (orders must be delivered). This typically takes 1–4 weeks depending on volume.
  5. When the experiment completes, review the auto-generated proposal in the Proposals tab.
  6. If the results are good, approve and apply the proposal. The new sourcing rule will be inserted (inactive); activate it in Sourcing Rules to go live.

Option C — Wait for automatic proposals

If you use AI_ADAPTIVE on some orders (via a rule), the learning pipeline will observe outcomes over time. Nightly pattern discovery will automatically create proposals when a specific order cluster shows ≥10% improvement. You just need to review the Proposals tab periodically.

Common workflows

"I want to know if AI sourcing is working better than my current setup"

  1. Go to AI Architect → Experiments, create an experiment comparing your current rule strategy against AI_ADAPTIVE.
  2. Monitor the Node Performance tab to ensure AI-sourced orders are going to high-performing nodes.
  3. After sufficient data is collected, check the experiment results. Compare avg_outcome_score, avg_delivery_hours, and backorder_rate between arms.
  4. If AI_ADAPTIVE wins, approve and apply the auto-generated proposal.

"I received a proposal — should I approve it?"

Look at three things:

  • Confidence score: Above 0.7 — generally safe to approve. Below 0.5 — let more data accumulate first (reject and wait).
  • Sample count in rationale: More than 100 total samples with 20+ AI samples is a solid basis. If the AI arm had only 10 orders, the improvement could be noise.
  • Improvement %: A 10–15% improvement is meaningful. 50%+ suggests something unusual — verify that the cluster key is sensible and not covering a very small order type.

When you click Apply, the sourcing rule is created inactive. Review it in Sourcing Rules to confirm the exact conditions look right before activating it.

"I applied a proposal but the new sourcing rule isn't performing as expected"

  1. Go to AI Architect → Proposals, find the applied proposal, and click Rollback. This deletes the inserted sourcing rule.
  2. The original routing behavior resumes immediately (no restart needed).
  3. Alternatively, go to Sourcing Rules and deactivate the rule rather than rolling back — this preserves it for future reference.

"How do I see what node was chosen for a specific order and why?"

Go to Orders → [order] → Sourcing Decision. The decision trail shows: strategy used, rule matched, every candidate node with its distance/cost/score, and — for AI-sourced orders — the AI score and KubeAI's one-sentence reasoning per node.

API Reference

All endpoints require superadmin JWT. Base path: /architect.

Proposals

Method | Path | Description
GET | /architect/proposals | List proposals — filter by status and proposal_type. Params: limit (max 200), offset.
GET | /architect/proposals/{id} | Full proposal detail including rationale, proposal_data JSON, and rollback_data.
POST | /architect/proposals/{id}/approve | Transition PENDING → APPROVED. Records approver email. No DB changes yet.
POST | /architect/proposals/{id}/reject | Transition to REJECTED. Body: {"reason": "string"}. Can reject PENDING or APPROVED.
POST | /architect/proposals/{id}/apply | Execute the change (APPROVED only). Writes rollback_data before committing. Transitions to APPLIED.
POST | /architect/proposals/{id}/rollback | Undo an APPLIED proposal using stored rollback_data. Transitions to ROLLED_BACK.

Patterns & Performance

Method | Path | Description
GET | /architect/patterns | Discovered clusters from MongoDB. Params: channel filter, limit (max 100). Sorted by sample_count desc.
GET | /architect/node-performance | Rolling node metrics. Params: period_days=7 or 30, limit (max 200).
GET | /architect/ai-sourcing/performance | Per-strategy outcome aggregates + AI vs baseline improvement %. Reads from sourcing_outcomes.

Experiments

Method | Path | Description
GET | /architect/experiments | List experiments. Filter: status=running|paused|completed.
POST | /architect/experiments | Create experiment. Body: name, strategy_a, strategy_b, traffic_split_pct (1–50), filter_conditions. Starts in RUNNING status immediately.
POST | /architect/experiments/{id}/pause | Pause a running experiment. New orders stop being routed to it; existing outcomes are preserved.
POST | /architect/experiments/{id}/resume | Resume a paused experiment.
GET | /architect/experiments/{id}/results | Live per-arm aggregation from MongoDB: outcome scores, delivery hours, backorder rates, labeled vs total order counts.

Strictly additive — the safety contract:
Applied proposals only ever INSERT new rows or ADD columns. They never UPDATE existing sourcing rules, never DELETE data, and never run DDL that could affect existing rows. Every apply stores rollback_data before committing, so any change can be undone in one click. Sourcing rules are always inserted with is_active=False — a final human gate before live traffic is affected.

16 Sourcing Rules Engine

File: app/services/sourcing_engine.py

The engine is the intelligence core. It runs every time an order needs to be sourced, evaluating prioritized rules to determine which fulfillment nodes should handle each SKU.

Processing Pipeline

Order
  │
  ▼ RuleSelector      ──► Find highest-priority SourcingRule where ALL conditions match
  ▼ NodeFilter        ──► Apply: node type filter, capability filter, distance filter,
  │                       capacity filter, excluded node list
  ▼ InventoryLoader   ──► Load quantity_available per (node, SKU) in one query
  ▼ NodeScorer        ──► Compute normalized score per strategy
  ▼ AllocationDecider ──► Single-node or split allocation
  ▼ Persist           ──► Write FulfillmentAllocation rows + reserve inventory

Seven Sourcing Strategies

DISTANCE_OPTIMAL

score = distance_norm × 0.7 + inventory_norm × 0.3

Picks the node closest to the customer's shipping address. Uses Haversine great-circle distance. Falls back to split if no single node can fulfill all items.
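
To illustrate the weighting, here is a minimal scoring sketch. The min-max normalization scheme and field names are assumptions for illustration, not the engine's actual code.

```python
# Illustrative DISTANCE_OPTIMAL scoring: closer node and deeper stock score higher.
def score_nodes(candidates: list[dict]) -> list[tuple[str, float]]:
    max_dist = max(c["distance_km"] for c in candidates) or 1.0
    max_inv = max(c["available_qty"] for c in candidates) or 1
    scored = []
    for c in candidates:
        distance_norm = 1.0 - c["distance_km"] / max_dist   # closer → higher
        inventory_norm = c["available_qty"] / max_inv       # deeper stock → higher
        scored.append((c["node_id"], 0.7 * distance_norm + 0.3 * inventory_norm))
    return sorted(scored, key=lambda s: s[1], reverse=True)
```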

COST_OPTIMAL

score = cost_norm × cost_weight + distance_norm × distance_weight

Minimizes total estimated shipping cost (base_rate + per_km × distance × node_multiplier). Weights configurable per rule (default 50/50).

STORE_NEAREST

score = distance_norm × 0.7 + inventory_norm × 0.3

Identical to DISTANCE_OPTIMAL but pre-restricts nodes to RETAIL_STORE and DARK_STORE types. Used for same-day delivery.

INVENTORY_RESERVATION

score = inventory_norm × 0.8 + distance_norm × 0.2

Prefers nodes with deepest available stock — reduces chance of a reservation failing downstream. Useful for high-velocity SKUs.

LEAST_COST_SPLIT

Greedy algorithm that assigns each SKU to the cheapest eligible node:

  1. Sort SKUs by fulfillability (hardest first = fewest nodes with stock)
  2. For each SKU, iterate nodes in score order
  3. Allocate as much as possible from each node until the full quantity is covered
  4. Enforce the max_split_nodes limit
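
The greedy steps above can be sketched as follows; all data shapes and helper names are assumptions for illustration, not the real engine.

```python
# Hedged sketch of the greedy split described in steps 1–4.
def least_cost_split(skus: dict[str, int], stock: dict[str, dict[str, int]],
                     node_rank: dict[str, list[str]], max_split_nodes: int):
    """skus: sku → qty needed; stock: sku → {node: available};
    node_rank: sku → nodes sorted cheapest-first (by score)."""
    # 1. hardest SKUs first: fewest nodes with stock
    order = sorted(skus, key=lambda s: sum(1 for q in stock[s].values() if q > 0))
    allocations, used_nodes = [], set()
    for sku in order:
        remaining = skus[sku]
        for node in node_rank[sku]:          # 2. iterate nodes in score order
            if remaining <= 0:
                break
            if node not in used_nodes and len(used_nodes) >= max_split_nodes:
                continue                     # 4. enforce max_split_nodes
            take = min(remaining, stock[sku].get(node, 0))
            if take > 0:                     # 3. allocate as much as possible
                allocations.append((sku, node, take))
                used_nodes.add(node)
                remaining -= take
    return allocations
```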

AI_ADAPTIVE

final_score = 0.6 × ai_score + 0.4 × rule_score

Uses KubeAI to score candidate nodes based on historical pattern clusters and rolling 7-day performance data. Automatically falls back to DISTANCE_OPTIMAL if: pattern sample count < 10, KubeAI API error, or max AI score < 0.4.

AI_HYBRID

final_score = 0.6 × ai_score + 0.4 × rule_score

Identical to AI_ADAPTIVE with the same blend formula. Intended for transitional rollouts where full AI trust is not yet established. Distinguishable in sourcing_outcomes by strategy_used field.

Condition Operators

Operator | Example
EQUALS | channel == WEB
NOT_EQUALS | channel != MARKETPLACE
GREATER_THAN | total_amount > 200
LESS_THAN | total_amount < 50
GREATER_THAN_OR_EQUAL | total_amount >= 100
LESS_THAN_OR_EQUAL | total_amount <= 500
IN | shipping_state IN [NY, NJ, CT]
NOT_IN | channel NOT_IN [POS]
CONTAINS | customer_email CONTAINS example.com
STARTS_WITH | shipping_state STARTS_WITH N
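
A condition evaluator covering these operators might look like the sketch below; it is a simplified illustration, not the engine's actual implementation.

```python
# Minimal condition evaluator matching the operator table above (a sketch).
OPS = {
    "EQUALS": lambda v, t: v == t,
    "NOT_EQUALS": lambda v, t: v != t,
    "GREATER_THAN": lambda v, t: v > t,
    "LESS_THAN": lambda v, t: v < t,
    "GREATER_THAN_OR_EQUAL": lambda v, t: v >= t,
    "LESS_THAN_OR_EQUAL": lambda v, t: v <= t,
    "IN": lambda v, t: v in t,
    "NOT_IN": lambda v, t: v not in t,
    "CONTAINS": lambda v, t: t in str(v),
    "STARTS_WITH": lambda v, t: str(v).startswith(t),
}

def rule_matches(order: dict, conditions: list[dict]) -> bool:
    # A rule matches only when ALL of its conditions hold.
    return all(OPS[c["op"]](order.get(c["field"]), c["value"]) for c in conditions)
```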

Haversine Distance Formula

Used for all distance-based sourcing calculations. Haversine computes the great-circle distance on a spherical Earth, accurate to within roughly 0.5% of the true geodesic distance; actual road distance is typically longer.

from math import radians, sin, cos, asin, sqrt

def haversine_km(lat1, lon1, lat2, lon2):
    R = 6371.0  # mean Earth radius in km
    φ1, φ2 = radians(lat1), radians(lat2)
    Δφ = radians(lat2 - lat1)
    Δλ = radians(lon2 - lon1)
    a = sin(Δφ/2)**2 + cos(φ1)*cos(φ2)*sin(Δλ/2)**2
    return R * 2 * asin(sqrt(a))

17 AI-Native Architecture — How It Works

The closed loop

The OMS has a continuous feedback loop between order execution and sourcing intelligence. Every sourcing decision is an experiment. Every delivery is a data point. The system accumulates evidence, identifies patterns, and proposes improvements — all without touching any existing configuration until a human approves.

ORDERS FLOW IN
  │
  ▼
sourcing_engine.source_order
  · experiment check (A/B traffic split) → strategy selected
  · AI_ADAPTIVE? → KubeAI scores nodes → fallback if needed
  · allocations made
  │
  ▼
sourcing_outcomes (MongoDB: oms_ai_learning)
  per-allocation snapshot: strategy, node, cost, AI score, cluster_key, experiment_id
  │ (order delivered)
  ▼
label_sourcing_outcomes (hourly)
  computes outcome_score 0–1 based on delivery time, cost variance, backorders, returns
  │ (nightly 02:00 UTC)
  ▼
discover_patterns
  group by cluster_key × node_id · rank nodes by avg outcome_score
  compare AI vs DISTANCE_OPTIMAL → create AIProposal if AI wins
  │
  ▼
AI Architect UI (Proposals tab)
  Admin: Approve → Apply → new sourcing rule inserted (is_active=False until enabled)

Design principles

Principle | What it means in practice
Additive-only | AI can only INSERT new rows or ADD nullable columns. It cannot UPDATE existing rules, DROP tables, or DELETE data. Every database write is audited.
Human-gated | Proposals sit in PENDING status indefinitely. Nothing is applied until a superadmin explicitly clicks Approve → Apply. The admin sees the full rationale and exact data to be written before confirming.
Fallback-safe | Every AI code path has a fallback. If KubeAI returns an error, times out, or gives low-confidence scores (max < 0.4), the sourcing engine silently falls back to DISTANCE_OPTIMAL. Orders are never held waiting for the AI.
Fully audited | Every sourcing decision writes a sourcing_outcomes document. Every proposal records who approved it, who applied it, and stores rollback_data before the change is committed.
Evidence-backed | Proposals are only generated when statistical thresholds are met: ≥50 total samples, ≥10 AI-sourced samples, ≥10% improvement. Confidence scores reflect both margin of improvement and sample size.

Layer 1 — Intelligence data foundation

The entire learning system is built on one collection: sourcing_outcomes in the oms_ai_learning MongoDB database. Every time the sourcing engine assigns an order to a node, it writes a document capturing a snapshot of the decision. This database is separate from oms_events so that nightly aggregation queries do not contend with transactional order audit reads.

sourcing_outcomes document lifecycle

Written at sourcing time (by sourcing worker):
  order_id, allocation_id, node_id, node_name, sku
  strategy_used, rule_applied
  sourcing_score, predicted_cost, predicted_distance_miles
  channel, region, amount_bucket, fulfillment_type, cluster_key
  sourced_at, experiment_id (if any)
  ai_score, ai_reasoning (if AI_ADAPTIVE was used)

Updated at delivery (by carrier worker):
  actual_delivery_hours ← set when order status → DELIVERED
  actual_cost, cost_variance_pct

Updated at backorder/return (by fulfillment/order workers):
  was_backordered = true ← set if item goes to backorder
  was_returned = true ← set if order is returned

Labeled (by label_sourcing_outcomes task, hourly):
  outcome_score = computed 0–1 quality score
  labeled_at

Outcome score formula

outcome_score = (
    0.4 * delivery_score    # 1.0 if ≤24h | 0.5 if ≤48h | 0.2 if ≤72h | 0.0 if >72h
  + 0.3 * cost_score        # 1.0 if variance ≤5% | 0.7 if ≤15% | 0.3 if ≤25% | 0.0 if >25%
  + 0.2 * (1 - backordered) # 0.2 if no backorder, 0.0 if backordered
  + 0.1 * (1 - returned)    # 0.1 if not returned, 0.0 if returned
)

A perfect sourcing decision (delivered in under 24 hours, cost within 5% of estimate, no backorder, no return) scores 1.0. A bad one (72+ hour delivery, 25%+ cost overrun, backorder, and return) scores 0.0.
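
Spelling out the tiers as code, assuming the boundaries shown in the formula's comments:

```python
# Sketch of the outcome-score computation above (tier boundaries from the formula).
def outcome_score(delivery_hours: float, cost_variance_pct: float,
                  backordered: bool, returned: bool) -> float:
    if delivery_hours <= 24: d = 1.0
    elif delivery_hours <= 48: d = 0.5
    elif delivery_hours <= 72: d = 0.2
    else: d = 0.0
    if cost_variance_pct <= 5: c = 1.0
    elif cost_variance_pct <= 15: c = 0.7
    elif cost_variance_pct <= 25: c = 0.3
    else: c = 0.0
    return 0.4 * d + 0.3 * c + 0.2 * (not backordered) + 0.1 * (not returned)
```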

Layer 2 — AI sourcing engine

When a sourcing rule uses AI_ADAPTIVE or AI_HYBRID strategy, the AISourcingAdvisor is invoked before the final node is chosen.

What KubeAI receives

The advisor builds a structured prompt containing:

  • Order context — channel, fulfillment type, customer region, order amount, SKU count.
  • Historical patterns — the top matching cluster(s) for this order type, with node rankings and sample counts. Up to 3 clusters are included (exact match first, then broader channel+fulfillment_type matches).
  • Node performance — rolling 7-day metrics for each candidate node: orders fulfilled, avg outcome score, avg delivery hours, backorder rate.
  • Candidates — each eligible node with its distance, estimated cost, and available inventory.

Score blending

# AI_ADAPTIVE: KubeAI score weighted 60%, rule-based score 40%
final_score = 0.6 * ai_score + 0.4 * rule_score

# AI_HYBRID: same formula, different label in sourcing_outcomes
# (useful for distinguishing "AI primary" vs "transitional AI" in analytics)

Fallback triggers

Condition | Fallback behaviour
Pattern sample count < 10 | Skip KubeAI entirely, use DISTANCE_OPTIMAL. Logged as fallback_reason: "Insufficient pattern data".
KubeAI API error or timeout (10s) | Skip KubeAI, use DISTANCE_OPTIMAL. Error logged with exception type.
Max AI score across all nodes < 0.4 | KubeAI scored all nodes poorly — use DISTANCE_OPTIMAL as more reliable. Logged with the max score.
KubeAI returns invalid JSON | Skip KubeAI, use DISTANCE_OPTIMAL. Logged with the raw response snippet.

Fallback decisions are visible in the order's sourcing decision trail under rule_details.ai_sourcing.fallback_used and fallback_reason.

Layer 3 — Pattern discovery

The PatternDiscoveryService runs nightly (Celery beat, 02:00 UTC) via the discover_patterns task in the learning queue.

Step 1 — Aggregate patterns

MongoDB $group aggregation on sourcing_outcomes where outcome_score exists, grouped by (cluster_key, node_id). For each cluster, nodes are ranked by avg_outcome_score and upserted into sourcing_patterns.

Step 2 — Compare strategies

A second aggregation groups by (cluster_key, strategy_used) to get avg outcome scores per strategy per cluster. Qualifying clusters must meet all three thresholds:

  • Total samples (AI + baseline) ≥ 50
  • AI_ADAPTIVE samples ≥ 10
  • AI improvement over DISTANCE_OPTIMAL ≥ 10%

Step 3 — Generate proposals

For each qualifying cluster, one pending AIProposal is created — but only if no active (PENDING or APPROVED) proposal already exists for that cluster key. This prevents duplicate proposals. The proposal includes auto-generated sourcing rule conditions derived from the cluster dimensions (channel, region, amount range, fulfillment type).
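
The three thresholds from Step 2 can be expressed as a single predicate; this is a sketch, and the field names are assumptions.

```python
# Hedged sketch of the qualifying-cluster check (field names assumed).
def qualifies(cluster: dict) -> bool:
    total = cluster["ai_samples"] + cluster["baseline_samples"]
    if total < 50 or cluster["ai_samples"] < 10:
        return False  # not enough evidence yet
    base = cluster["baseline_avg_score"]
    improvement = (cluster["ai_avg_score"] - base) / base if base else 0.0
    return improvement >= 0.10  # AI must beat DISTANCE_OPTIMAL by ≥10%
```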

Layer 4 — A/B experiments

Experiments allow side-by-side comparison of any two sourcing strategies on real orders without committing to a permanent rule change. See §15 AI Architect for the full experiment usage guide.

Experiment evaluation (daily 03:00 UTC)

The evaluate_ai_experiments task checks all running experiments. Winner declaration requires:

  • Each arm has ≥ 50 labeled outcomes (orders that have reached DELIVERED)
  • Absolute outcome score difference between arms ≥ 0.05

If neither arm has a clear edge, the experiment continues until both thresholds are met or an admin manually stops it.

Layer 5 — Node performance metrics

The update_node_performance task runs every 4 hours. It aggregates sourcing_outcomes over the last 7 and 30 days per node, computing: orders fulfilled, avg outcome score, avg delivery hours, avg actual cost, backorder rate %, return rate %. These metrics feed into the AI sourcing prompt as the "NODE PERFORMANCE" context block.

MongoDB collections reference

All three AI learning collections live in oms_ai_learning (config key: MONGODB_AI_DB), separate from the oms_events transactional database.

Collection | Database | TTL | Written by | Read by | Purpose
sourcing_outcomes | oms_ai_learning | 90 days | sourcing worker (skeleton at allocation time), carrier worker (actual cost on SHIPPED, delivery hours on DELIVERED), learning worker (outcome_score hourly) | AISourcingAdvisor, pattern discovery, experiment evaluator, AI Architect UI | Per-allocation decision log and learning data. The primary raw material for the entire AI pipeline.
sourcing_patterns | oms_ai_learning | 180 days | discover_patterns (nightly 02:00 UTC) | AISourcingAdvisor._fetch_context(), AI Architect Patterns tab | Aggregated node performance per order cluster. Stale clusters expire automatically; nightly run re-creates any cluster with recent activity.
node_performance_metrics | oms_ai_learning | | update_node_performance (every 4h) | AISourcingAdvisor._fetch_context(), AI Architect Node Performance tab | Rolling 7-day and 30-day node stats. One doc per (node_id, period_days); replaced in full on each run.

PostgreSQL tables reference

Table | Purpose
ai_proposals | All AI-generated change proposals with full lifecycle tracking (status, approved_by, rollback_data)
ai_experiments | A/B experiment definitions (strategy pair, traffic split, filter conditions, results)
sourcing_outcome_labels | PostgreSQL mirror of labeled MongoDB outcomes for fast SQL analytics
custom_attribute_definitions | Dynamic schema extensions (proposed by AI, stored here, rendered in forms via existing JSONB columns)
ui_widgets | JSON-configured dashboard widgets (proposed by AI, rendered by DynamicWidget component)

Celery learning tasks

Task | Queue | Schedule | What it does
label_sourcing_outcomes | learning | Every hour | Finds delivered orders with unlabeled outcomes, computes and writes outcome_score. Processes up to 500 docs per run.
discover_patterns | learning | Nightly 02:00 UTC | Aggregates patterns, ranks nodes per cluster, generates proposals for qualifying clusters.
update_node_performance | learning | Every 4 hours | Recomputes rolling 7-day and 30-day node metrics from labeled outcomes.
evaluate_ai_experiments | learning | Daily 03:00 UTC | Checks running experiments, declares winners, generates sourcing_experiment proposals.

18 Fulfillment Pipeline

Order Status State Machine

PENDING
  │ (order confirmed / payment authorized)
  ▼
CONFIRMED
  │ (sourcing engine runs)
  ▼
SOURCING ──► SOURCED
  ▼
PICKING
  ▼
PACKING
  ▼
READY_TO_SHIP
  │ (carrier booked)
  ▼
SHIPPED
  │ (tracking events)
  ▼
OUT_FOR_DELIVERY
  ▼
DELIVERED ◄─── PICKED_UP (for BOPIS)
  ▼
RETURNED ──► REFUNDED

From any pre-shipped state ──► CANCELLED
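
A transition guard for the state machine could be sketched as below. This covers only the happy-path edges plus the cancel rule; how an order enters PICKED_UP (the BOPIS branch) is not detailed here, so only its exit edge is encoded.

```python
# Sketch of a transition guard for the order state machine (illustrative only).
PRE_SHIPPED = {"PENDING", "CONFIRMED", "SOURCING", "SOURCED",
               "PICKING", "PACKING", "READY_TO_SHIP"}
NEXT = {
    "PENDING": {"CONFIRMED"},
    "CONFIRMED": {"SOURCING"},
    "SOURCING": {"SOURCED"},
    "SOURCED": {"PICKING"},
    "PICKING": {"PACKING"},
    "PACKING": {"READY_TO_SHIP"},
    "READY_TO_SHIP": {"SHIPPED"},
    "SHIPPED": {"OUT_FOR_DELIVERY"},
    "OUT_FOR_DELIVERY": {"DELIVERED"},
    "PICKED_UP": {"DELIVERED"},
    "DELIVERED": {"RETURNED"},
    "RETURNED": {"REFUNDED"},
}

def can_transition(current: str, target: str) -> bool:
    if target == "CANCELLED":
        return current in PRE_SHIPPED  # cancel allowed only before shipping
    return target in NEXT.get(current, set())
```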

Fulfillment Types & Required Capabilities

Type | Description | Required Capability
SHIP_TO_HOME | Standard home delivery | can_ship = true
STORE_PICKUP | Buy online, pick up in store (BOPIS) | can_pickup = true
SHIP_FROM_STORE | Retail store ships the order | can_ship = true
CURBSIDE_PICKUP | Drive-up curbside pickup | can_curbside = true
SAME_DAY_DELIVERY | Same-day home delivery | can_same_day = true

19 Celery Workers

Queue | Worker File | Tasks
sourcing | workers/sourcing.py | source_order — runs the full sourcing engine; writes sourcing_outcomes to MongoDB
fulfillment | workers/fulfillment.py | start_picking, complete_packing, reset_node_daily_counters
carrier | workers/carrier.py | book_shipment, simulate_delivery, sync_all_tracking
notifications | workers/notifications.py | send_order_confirmation, send_shipment_notification, send_delivery_notification, send_cancellation_notification
webhooks | workers/webhooks.py | dispatch_webhook, retry_failed_webhooks, retry_webhook_event
connectors | workers/connectors.py | sync_fulfillment_to_connector, sync_order_cancel_to_connector, poll_amazon_orders
learning | workers/learning.py | label_sourcing_outcomes, discover_patterns, update_node_performance, evaluate_ai_experiments — low-priority AI learning loop

Celery Beat Schedule

Task | Schedule | Description
reset_node_daily_counters | Daily 00:00 UTC | Reset current_daily_orders on all active nodes
retry_failed_webhooks | Every 5 minutes | Retry FAILED webhook events that are due for retry
sync_all_tracking | Every 15 minutes | Sync carrier tracking for all in-transit shipments
retry_backordered_orders | Every 30 minutes | Re-run sourcing engine for orders stuck in backorder state
poll_amazon_orders | Every 15 minutes | Poll all active Amazon SP-API connectors for new Unshipped/PartiallyShipped orders
label_sourcing_outcomes | Every hour | Compute outcome_score for DELIVERED orders; write labels to MongoDB + PostgreSQL
update_node_performance | Every 4 hours | Compute rolling 7d/30d node stats from labeled outcomes
discover_patterns | Daily 02:00 UTC | Aggregate patterns, compare strategies, auto-generate pending AIProposal records
evaluate_ai_experiments | Daily 03:00 UTC | Compute per-arm outcomes; declare winner when ≥50 samples per arm + score diff ≥0.05

Start Workers

celery -A app.workers.celery_app worker \
  --loglevel=info \
  -Q sourcing,fulfillment,carrier,notifications,webhooks,connectors,learning \
  --concurrency=4

Flower monitoring UI: http://localhost:5555 — real-time task queue visualization, worker status, and task history.

21 Webhook System

HMAC-SHA256 Signing

Every outbound webhook request is cryptographically signed. Receivers can verify payload integrity:

POST https://your-endpoint.com/hooks HTTP/1.1
Content-Type: application/json
X-OMS-Signature: sha256=abc123...
X-OMS-Timestamp: 1708000000
X-OMS-Event: order.shipped

Verification (Python)

import hmac, hashlib

def verify_webhook(body: bytes, signature: str, secret: str) -> bool:
    expected = "sha256=" + hmac.new(
        secret.encode(), body, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)
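
For symmetry, here is a sender-side signing sketch that produces the headers shown in the example request. The OMS's actual signing code is not shown in this document, so treat this as an illustration of the scheme rather than the implementation.

```python
import hmac, hashlib, json, time

def sign_payload(payload: dict, event: str, secret: str) -> tuple[bytes, dict]:
    # Serialize once and sign exactly those bytes; the receiver must verify
    # the raw request body, not a re-serialized copy.
    body = json.dumps(payload, separators=(",", ":")).encode()
    sig = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    headers = {
        "Content-Type": "application/json",
        "X-OMS-Signature": sig,
        "X-OMS-Timestamp": str(int(time.time())),
        "X-OMS-Event": event,
    }
    return body, headers
```

Paired with verify_webhook above, a body signed this way round-trips cleanly.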

Supported Event Types

order.created order.confirmed order.sourced order.picking order.packed order.shipped order.delivered order.cancelled order.test

Retry Strategy (Exponential Backoff)

Attempt | Delay | Status
1st retry | 5 minutes | PENDING
2nd retry | 10 minutes | PENDING
3rd retry | 20 minutes | PENDING
4+ retries | (no further retry) | ABANDONED

22 Connector System

The Connector System enables bidirectional integration between the OMS and external platforms — e-commerce engines, carriers, WMS, and TMS systems. Each connector has a configurable direction (inbound, outbound, or bidirectional) and handles its own authentication protocol.

Architecture

Shopify                              OMS (single URL per connector)
────────                             ─────────────────────────────
orders/create  ──┐
orders/paid    ──┼──► POST /connectors/{id}/webhook
products/create──┤        │ HMAC-SHA256 validated
products/update──┘        │ routes by X-Shopify-Topic header
                          │
             ┌────────────┴────────────────────────┐
             │                                      │
         Order topics                         Product topics
             │                                      │
             ▼                                      ▼
 normalize_order() → OMS Order         normalize_product() → InventoryItems
 (channel=MARKETPLACE,                 (upsert per active node,
  connector_id set, dedup check)        seed stock from Shopify qty)
             │
             ▼  (when order.status → SHIPPED)
 Celery: sync_fulfillment_to_connector
             │
             ▼
 push_fulfillment() → POST /orders/{shopify_id}/fulfillments
             │
             ▼
 Shopify notifies buyer of tracking info

Supported Platforms

Platform | Category | Direction | Status
Shopify | E-commerce | Bidirectional | Live
WooCommerce | E-commerce | Bidirectional | Planned
Amazon SP-API | E-commerce | Bidirectional (polling) | Live
Magento 2 | E-commerce | Bidirectional | Planned
BigCommerce | E-commerce | Bidirectional | Planned
FedEx | Carrier | Outbound | Planned
UPS | Carrier | Outbound | Planned
DHL | Carrier | Outbound | Planned
Custom | Generic | Inbound | Framework ready

Shopify Setup

1. In the OMS Admin Console → Connectors, click Add Connector and select Shopify.

2. Enter your Shopify store URL, Admin API access token, and generate a webhook secret.

3. Copy the webhook URL displayed in the connector card:

https://your-oms.example.com/connectors/{connector_id}/webhook

4. In Shopify Admin → Settings → Notifications → Webhooks, register the same URL for each topic you want to receive. All topics share one URL and one signing secret:

orders/create orders/paid products/create products/update

Order topics → import as OMS orders. Product topics → upsert InventoryItem records across all active fulfillment nodes.

5. Enable the connector (toggle to Active). Orders and product catalog changes from Shopify will automatically sync into the OMS.

Amazon SP-API Setup

Amazon uses polling, not webhooks. The OMS polls the SP-API every 15 minutes automatically via a Celery Beat scheduled task — no webhook registration is required.

1. In Seller Central, navigate to Apps & Services → Develop Apps. Create a new app to obtain your client_id and client_secret.

2. Generate a refresh_token by completing the OAuth authorization flow for your selling account.

3. In the OMS Admin Console → Connectors, click Add Connector and select Amazon SP-API. Provide the following fields:

Config Field | Description
client_id | LWA (Login with Amazon) client ID from Seller Central app
client_secret | LWA client secret from Seller Central app
refresh_token | OAuth refresh token authorizing OMS to act on behalf of your seller account
marketplace_id | SP-API marketplace ID (e.g. ATVPDKIKX0DER for US)
seller_id | Your Amazon seller / merchant ID
region | API region endpoint: na, eu, or fe

4. Click Test Connection. The OMS calls GET /sellers/v1/marketplaceParticipations to verify credentials and retrieve marketplace details.

5. Enable the connector. The Celery Beat schedule (poll_amazon_orders) will automatically pull new orders every 15 minutes and import them as OMS orders with channel=MARKETPLACE.

No webhook registration needed for Amazon. All inbound order data flows through the polling task. The OMS deduplicates by external_order_id (Amazon order ID) so repeated polls are safe.

Multi-Topic Support (Same URL)

A single connector webhook URL handles every event topic Shopify sends to it. The OMS routes by the X-Shopify-Topic header after HMAC validation:

Topic | Handler | OMS Action
orders/create | normalize_order() | Create OMS Order (dedup by external_order_id)
orders/paid | normalize_order() | Create OMS Order (alternate payment trigger)
products/create | normalize_product() | Insert InventoryItem rows for each variant × active node
products/update | normalize_product() | Update product_name / price / active flag; preserve stock levels
any other topic | (none) | Accepted (200 OK) and logged; payload ignored

Note — Amazon SP-API: Amazon does not use webhooks. Instead, the OMS polls the SP-API every 15 minutes via a Celery Beat scheduled task to pull new orders. See the Amazon SP-API Setup section above for configuration details.

Inventory seeding: When a new SKU arrives via products/create, the OMS creates one InventoryItem per active fulfillment node seeded with Shopify's inventory_quantity. On subsequent products/update events, only metadata (name, price, active flag) is updated — stock quantities are left untouched because the OMS is the system of record for inventory after initial import.
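
The create/update split described above can be sketched as follows; the data shapes and variant field names here are assumptions for illustration.

```python
# Illustrative seeding/update split: stock is seeded once on products/create,
# then only metadata changes on products/update (OMS owns inventory afterward).
def apply_product_event(topic: str, variant: dict, active_nodes: list[str],
                        inventory: dict) -> None:
    sku = variant["sku"]
    if topic == "products/create":
        for node in active_nodes:  # one InventoryItem per active node
            inventory[(node, sku)] = {
                "name": variant["title"],
                "price": variant["price"],
                "quantity": variant["inventory_quantity"],  # seeded once
            }
    elif topic == "products/update":
        for node in active_nodes:
            item = inventory.get((node, sku))
            if item:  # metadata only; stock untouched
                item["name"] = variant["title"]
                item["price"] = variant["price"]
```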

Inbound Webhook Security

Connector webhook endpoints (POST /connectors/{id}/webhook) are exempt from JWT authentication and instead validated using platform-native HMAC signatures.

# Shopify HMAC validation (inside ShopifyConnector)
import hmac, hashlib, base64

def validate_webhook(self, headers: dict, raw_body: bytes) -> bool:
    hmac_header = headers.get("x-shopify-hmac-sha256", "")
    secret = self.config.get("webhook_secret", "")
    digest = base64.b64encode(
        hmac.new(secret.encode(), raw_body, hashlib.sha256).digest()
    ).decode()
    return hmac.compare_digest(digest, hmac_header)

Outbound Fulfillment Sync

When an OMS order (sourced from a connector) transitions to SHIPPED status, a Celery task is automatically enqueued on the connectors queue to push the tracking update back to the originating platform.

# Triggered automatically from PATCH /orders/{id}/status
if payload.status == OrderStatus.SHIPPED and order.connector_id:
    background_tasks.add_task(_trigger_connector_sync, str(order.id))

Outbound Sync — OMS → Platform

When the OMS changes an order's state or adjusts inventory, it pushes the update back to the originating platform. Each push is handled by a dedicated Celery task on the connectors queue.

Shopify

Trigger | Celery Task | API Call | Effect
OMS order → SHIPPED | sync_fulfillment | POST /orders/{id}/fulfillments.json | Shopify marks the order fulfilled and emails tracking info to the buyer
Inventory adjusted in OMS | push_inventory_to_connectors | POST /inventory_levels/set.json | Syncs on-hand quantity for the SKU at the given Shopify location. Requires primary_location_id (fetched during Test Connection) and shopify_inventory_item_id (populated when the products/create webhook is received)
OMS order → CANCELLED (sourced from Shopify) | sync_order_cancel | POST /orders/{id}/cancel.json | Cancels the corresponding order in Shopify

Amazon SP-API

Trigger | API Call | Effect
OMS order → SHIPPED | POST /orders/v0/orders/{amazonOrderId}/shipment | Confirms shipment to Amazon with carrier code and tracking number; Amazon notifies the buyer
Inventory adjusted in OMS | PATCH /listings/2021-08-01/items/{sellerId}/{sku} (Listings Items API) | Updates the fulfillment_availability attribute for the listing to reflect current on-hand quantity
OMS order → CANCELLED | — | Not supported via SP-API for seller-initiated cancels. Buyers must cancel through Amazon. The OMS records the cancellation locally and logs a connector_events entry, but no API call is made.

Extending with New Connectors

All connectors inherit from BaseConnector in app/services/connectors/base.py. Five methods are abstract (required); the remaining methods are optional overrides for product/catalog sync and outbound push:

class BaseConnector(ABC):
    # ── Required (abstract) ──────────────────────────────────────
    def validate_webhook(self, headers, raw_body) -> bool: ...
    def get_event_type(self, headers) -> str: ...
    def normalize_order(self, payload) -> dict: ...       # → OMS OrderCreate dict
    async def push_fulfillment(self, order, shipment) -> dict: ...
    async def test_connection(self) -> dict: ...

    # ── Optional (default: empty set / empty list) ────────────────
    def get_inbound_topics(self) -> set[str]: ...         # order-creation topics
    def get_product_topics(self) -> set[str]: ...         # product-sync topics
    def normalize_product(self, payload) -> list[dict]: ...  # → variant dicts

    # ── Outbound sync (default: no-op) ───────────────────────────
    async def push_inventory_update(self, sku, quantity_available, mapping) -> dict: ...
    async def push_order_cancel(self, order) -> dict: ...
Method signature | Description
validate_webhook(headers, raw_body) → bool | Validate platform HMAC/signature on inbound webhook
get_event_type(headers) → str | Extract topic/event type from request headers
normalize_order(payload) → dict | Transform platform order payload into OMS OrderCreate dict
push_fulfillment(order, shipment) → dict | Push tracking/fulfillment confirmation to platform
test_connection() → dict | Verify credentials and return platform metadata
push_inventory_update(sku, quantity_available, mapping) → dict | Push current stock level for a SKU to the platform. mapping is the connector_inventory_mappings row for the SKU.
push_order_cancel(order) → dict | Push cancellation to the platform for an OMS order that was cancelled. No-op by default; the Amazon override only logs locally.

Register the new class in app/services/connectors/registry.py to make it available.
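A minimal new connector might look like the sketch below. To keep the example self-contained, a tiny stand-in base class mirrors only the abstract methods shown above (the real one lives in app/services/connectors/base.py), and `ExampleConnector`, its header names, and its payload shape are all hypothetical.

```python
from abc import ABC, abstractmethod


class BaseConnector(ABC):
    """Stand-in mirroring the real base class for this example only."""

    def __init__(self, config: dict):
        self.config = config

    @abstractmethod
    def validate_webhook(self, headers: dict, raw_body: bytes) -> bool: ...
    @abstractmethod
    def get_event_type(self, headers: dict) -> str: ...
    @abstractmethod
    def normalize_order(self, payload: dict) -> dict: ...


class ExampleConnector(BaseConnector):
    """Hypothetical platform that authenticates webhooks with a static token."""

    def validate_webhook(self, headers, raw_body):
        return headers.get("x-example-token") == self.config.get("webhook_secret")

    def get_event_type(self, headers):
        return headers.get("x-example-topic", "")

    def normalize_order(self, payload):
        # Map the platform payload onto the OMS OrderCreate shape.
        return {
            "channel": "MARKETPLACE",
            "customer_email": payload["buyer"]["email"],
            "line_items": [
                {"sku": li["sku"], "quantity": li["qty"], "unit_price": li["price"]}
                for li in payload["items"]
            ],
        }
```

In the real codebase you would subclass the actual BaseConnector, implement the remaining abstract methods (push_fulfillment, test_connection), and register the class in app/services/connectors/registry.py.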

Event Audit Log

Every inbound and outbound connector activity is recorded in the connector_events table:

event_type | direction | Description
orders/create | inbound | Order received from platform and imported into OMS
orders/paid | inbound | Payment confirmed — alternative trigger for order import
products/create | inbound | Product created in Shopify — variants upserted as InventoryItems in all active nodes
products/update | inbound | Product updated — metadata refreshed; stock levels preserved
fulfillment.pushed | outbound | Tracking/fulfillment update sent to platform
webhook.error | inbound | Signature validation or parsing failed

23 Monitoring & Traceability

The OMS has a built-in error monitoring console accessible at Admin → Monitoring. Every unhandled exception across all services — FastAPI routes, Celery workers, background tasks — is automatically captured, fingerprinted, and stored in MongoDB for analysis.

Error Capture Pipeline

Exception raised (anywhere)
        │
        ▼
capture_error() / capture_error_sync()   ← app/services/monitoring.py
        │
        ├─► error_events (MongoDB)   — one document per occurrence, 30-day TTL
        │     event_id, fingerprint, timestamp, level, source_service,
        │     error_type, error_message, stack_frames,
        │     request_context, task_context, order_context
        │
        └─► error_issues (MongoDB)   — one document per unique fingerprint
              fingerprint, status (open/resolved/muted),
              occurrence_count, first_seen_at, last_seen_at

Source Services

Source | Constant | Where instrumented
api | SOURCE_API | Global FastAPI exception handler + all unhandled route errors
sourcing_worker | SOURCE_SOURCING | Celery source_order task
fulfillment_worker | SOURCE_FULFILLMENT | Celery start_picking and complete_packing
carrier_worker | SOURCE_CARRIER | Celery book_shipment
webhook_worker | SOURCE_WEBHOOK | Celery dispatch_webhook
connector_worker | SOURCE_CONNECTOR | Celery sync_fulfillment_to_connector + inbound background tasks

Monitoring API /monitoring

All endpoints require superadmin authentication.

Method | Path | Description
GET | /monitoring/summary | Quick stats: open issues, errors/1h, errors/24h, warnings/24h
GET | /monitoring/events | Paginated raw event list — filter by time, level, source, error_type, order_id
GET | /monitoring/events/{event_id} | Single event with full stack trace
GET | /monitoring/issues | Aggregated issues — filter by status, source, level
GET | /monitoring/issues/{fingerprint} | Single issue with 10 most recent occurrences
PATCH | /monitoring/issues/{fingerprint} | Update status: open, resolved, muted (+ mute_hours)
GET | /monitoring/metrics/rate | Error rate over time, bucketed by hour
GET | /monitoring/metrics/top | Top N issues by occurrence count
GET | /monitoring/metrics/sources | Error breakdown by source service
POST | /monitoring/test-error | Inject a synthetic test error to verify the capture pipeline

Fingerprinting

Each error is assigned a stable 16-character fingerprint so that repeated occurrences of the same error (e.g. a recurring database connection timeout) are grouped into a single issue rather than creating noise:

fingerprint = SHA256(f"{source_service}:{error_type}:{top_stack_frame}")[:16]

The top stack frame is the deepest frame inside application code (excluding library internals), extracted from the Python traceback.
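The fingerprint rule can be sketched in a few lines. This is illustrative: the real frame-filtering logic in app/services/monitoring.py is more involved, and the "path contains /app/" heuristic below is an assumption standing in for "inside application code".

```python
import hashlib


def fingerprint(source_service: str, error_type: str, frames: list[str]) -> str:
    """Stable 16-char fingerprint: SHA256 of source:type:top_app_frame."""
    # Deepest frame inside application code; fall back to the deepest frame overall.
    app_frames = [f for f in frames if "/app/" in f]
    top = app_frames[-1] if app_frames else (frames[-1] if frames else "")
    raw = f"{source_service}:{error_type}:{top}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]
```

Because the hash ignores the error message, a recurring timeout with varying message text (different hosts, different durations) still collapses into one issue, while the same exception type raised from a different service or code path gets its own fingerprint.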

Issue Lifecycle

Status | Meaning | Transitions
open | Active — being investigated | → resolved, → muted
resolved | Fixed — will reopen if the error recurs | → open (auto on new occurrence)
muted | Suppressed for N hours | → open (after muted_until expires)

Verifying the Pipeline

curl -X POST http://localhost:8001/monitoring/test-error \
  -H "Authorization: Bearer {superadmin_token}" \
  -H "Content-Type: application/json" \
  -d '{"message": "Smoke test", "source": "api", "level": "ERROR"}'

After calling this endpoint, navigate to Monitoring → Issues or Monitoring → Events in the UI to confirm the entry appears.

24 Configuration

All configuration is managed via environment variables. For local development, create a .env file in the project root.

Variable | Default | Description
Data Layer
DATABASE_URL | postgresql+asyncpg://... | Async PostgreSQL URL — tenant data-plane database (oms_db on main pod, oms_{org}_{env} on tenant pods)
SYNC_DATABASE_URL | postgresql+psycopg2://... | Sync PostgreSQL URL for Celery workers (same DB as DATABASE_URL)
CONTROL_DATABASE_URL | blank (= DATABASE_URL) | Multi-tenant only. Points every pod at the shared oms_db control-plane database. Set on tenant pods: postgresql+asyncpg://oms_user:oms_pass@postgres:5432/oms_db. Blank on the main pod (falls back to DATABASE_URL).
MONGODB_URL | mongodb://... | MongoDB connection string
MONGODB_DB | oms_events | Events MongoDB database name (overridden per-environment to oms_events_{org}_{env})
MONGODB_AI_DB | oms_ai_learning | AI learning MongoDB database name (overridden per-environment)
REDIS_URL | redis://:pass@.../0 | Redis URL for application cache and environment resolution (DB 0)
CELERY_BROKER_URL | redis://:pass@.../1 | Redis DB 1 for Celery broker
CELERY_RESULT_BACKEND | redis://:pass@.../2 | Redis DB 2 for task results
ELASTICSEARCH_URL | http://localhost:9200 | Elasticsearch connection URL
Multi-Tenant Identity
TENANT_SLUG | unset on main pod | Tenant pods only. Set to the org slug, e.g. abc. Used in docker-compose labels and container naming. Has no runtime effect — informational only.
ENVIRONMENT | unset on main pod | Tenant pods only. Set to the env slug, e.g. dev. Informational only.
PLAN_TIER | unset | Subscription tier for rate limiting: STARTER · GROWTH · ENTERPRISE. Not yet enforced.
FRONTEND_URL | http://localhost:3001 | Origin URL of the frontend. Used in CORS allowed origins and redirect URLs.
ALLOWED_ORIGINS | http://localhost:3001 | Comma-separated CORS origins allowed to call this API pod. Include the main frontend URL and the tenant pod URL if different.
Security & App Settings
SECRET_KEY | — | JWT signing key. Use a random 32-byte hex string in production.
WEBHOOK_SECRET | — | Default HMAC signing secret for outbound webhooks
WEBHOOK_TIMEOUT_SECONDS | 10 | HTTP timeout per webhook delivery attempt
WEBHOOK_MAX_RETRIES | 3 | Max retry attempts before ABANDONED
DEFAULT_SOURCING_STRATEGY | DISTANCE_OPTIMAL | Fallback strategy when no rule matches
MAX_SPLIT_NODES | 3 | Global max nodes for split fulfillment
ANTHROPIC_API_KEY | — | Required for AI_ADAPTIVE / AI_HYBRID strategies and AI Architect proposals

25 Running the System

Prerequisites: Docker Desktop must be running. Python 3.12+ is needed only for local dev/testing outside Docker.

Start All Services

cd D:\OMS
docker compose up -d --build

Services started after this command:

Container | Port | Description
oms_postgres | 5432 | PostgreSQL 16 primary database
oms_mongodb | 27017 | MongoDB 7 document store
oms_redis | 6379 | Redis 7.2 cache + broker
oms_elasticsearch | 9200 | Elasticsearch 8.12
oms_api | 8001 | FastAPI application
oms_celery_worker | — | All 7 queue workers
oms_celery_beat | — | Scheduled task runner
oms_flower | 5555 | Celery monitoring UI
oms_frontend | 3001 | React GUI (this app)

Seed All Databases

docker compose exec api python scripts/seed.py

Seeds the following data:

  • PostgreSQL: 8 fulfillment nodes, 64 inventory items (8 SKUs × 8 nodes), 5 sourcing rules, 1 webhook endpoint
  • MongoDB: 8 product catalog documents, 3 sample order events
  • Redis: version, stats, and cache warmup keys
  • Elasticsearch: 8 product documents, 3 sample order documents

Service URLs

Service | URL
Frontend GUI | http://localhost:3001
FastAPI (Swagger UI) | http://localhost:8001/docs
FastAPI (ReDoc) | http://localhost:8001/redoc
OpenAPI JSON | http://localhost:8001/openapi.json
Health Check | http://localhost:8001/health
Flower (Celery Monitor) | http://localhost:5555

Run Tests

# PowerShell (Windows) — set PYTHONPATH to the project root first
$env:PYTHONPATH = "D:\OMS"; python -m pytest tests/test_imports.py -v

26 End-to-End Order Flow

Step 1 — Create an Order

curl -X POST http://localhost:8001/orders/ \
  -H "Content-Type: application/json" \
  -d '{
    "channel": "WEB",
    "fulfillment_type": "SHIP_TO_HOME",
    "customer_email": "customer@example.com",
    "customer_name": "John Doe",
    "line_items": [
      {"sku": "SKU-WIDGET-A", "product_name": "Premium Widget A",
       "quantity": 2, "unit_price": 29.99},
      {"sku": "SKU-GADGET-X", "product_name": "Gadget X Pro",
       "quantity": 1, "unit_price": 99.99}
    ],
    "shipping_address": {
      "address1": "456 Park Ave", "city": "New York", "state": "NY",
      "postal_code": "10022", "latitude": 40.7614, "longitude": -73.9776
    }
  }'

Response (HTTP 201): Returns the new order with "status": "PENDING" and a generated order_number like ORD-20240215-XK9M2A.

Step 2 — Sourcing Fires Automatically (~2 seconds)

  1. Celery sourcing worker picks up the source_order task
  2. Loads all active sourcing rules sorted by priority
  3. Evaluates the catch-all "Default — Distance Optimal" rule
  4. Loads all ACTIVE nodes with inventory for each requested SKU
  5. Computes haversine distance from customer coordinates to each node
  6. Scores nodes — STR-NYC-01 at ~1.8km wins for a NYC customer
  7. Creates FulfillmentAllocation rows (one per SKU)
  8. Reserves inventory: quantity_available -= quantity_allocated
  9. Order transitions to SOURCED
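Step 5's distance computation can be sketched with the standard haversine formula. This is an illustrative standalone version; the production scorer lives in the sourcing worker.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometres between two (lat, lon) points."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))
```

For the sample order above, the customer at (40.7614, -73.9776) is roughly 1–2 km from a Midtown Manhattan store, which is why STR-NYC-01 wins under DISTANCE_OPTIMAL while DC-EAST in Edison, NJ scores tens of kilometres away.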

Step 3 — Fulfillment Pipeline (~5–10 seconds)

  • Picking (t+2s): Allocations → PICKING, order → PICKING
  • Packing (t+7s): Allocations → PACKED, order → PACKING → READY_TO_SHIP

Step 4 — Carrier Booking

  • Selects a random carrier (UPS, FedEx, USPS, DHL)
  • Generates a mock tracking number
  • Creates a Shipment record with estimated delivery date
  • Order → SHIPPED, allocations → SHIPPED
  • Fires order.shipped notification + webhook

Step 5 — Delivery Simulation (~10 seconds later)

Adds tracking events: IN_TRANSIT → OUT_FOR_DELIVERY → DELIVERED. Order → DELIVERED. Fires order.delivered webhook.

Step 6 — Verify via Search

curl -X POST http://localhost:8001/search/orders \
  -H "Content-Type: application/json" \
  -d '{"query": "John Doe", "status": "DELIVERED"}'

Step 7 — Check Audit Trail

curl http://localhost:8001/orders/{order_id}/events

Returns chronological MongoDB events: order.created → order.sourced → order.shipped → order.delivered

Step 8 — Analytics

curl "http://localhost:8001/analytics/dashboard?from_date=2024-01-01"

27 Seed Data Reference

Fulfillment Nodes (8 total)

DC-EAST Distribution Center
Edison, NJ — 2000 orders/day
SHIP
DC-WEST Distribution Center
Los Angeles, CA — 2500 orders/day
SHIP SAME-DAY
DC-MID Distribution Center
Chicago, IL — 1800 orders/day
SHIP
STR-NYC-01 Retail Store
New York, NY — 300 orders/day
SHIP PICKUP CURBSIDE SAME-DAY
STR-LA-01 Retail Store
Beverly Hills, CA — 250 orders/day
SHIP PICKUP CURBSIDE SAME-DAY
STR-CHI-01 Retail Store
Chicago, IL — 200 orders/day
SHIP PICKUP SAME-DAY
STR-MIA-01 Retail Store
Miami Beach, FL — 150 orders/day
SHIP PICKUP CURBSIDE
DARK-SF-01 Dark Store
San Francisco, CA — 500 orders/day
SHIP SAME-DAY

Sourcing Rules (5 active)

Priority | Name | Strategy | Conditions
10 | Same-Day — West Coast | STORE_NEAREST | fulfillment_type = SAME_DAY_DELIVERY AND state IN [CA,WA,OR]
20 | BOPIS / Curbside | INVENTORY_RESERVATION | fulfillment_type IN [STORE_PICKUP, CURBSIDE_PICKUP]
30 | High-Value Orders | COST_OPTIMAL | total_amount > 200
40 | Marketplace | LEAST_COST_SPLIT | channel = MARKETPLACE
100 | Default (catch-all) | DISTANCE_OPTIMAL | No conditions — matches everything

Product SKUs (8 SKUs × 8 nodes = 64 inventory records)

SKU | Product Name | Price
SKU-WIDGET-A | Premium Widget A | $29.99
SKU-WIDGET-B | Standard Widget B | $19.99
SKU-GADGET-X | Gadget X Pro | $99.99
SKU-GADGET-Y | Gadget Y Basic | $49.99
SKU-GIZMO-1 | Gizmo 1 | $14.99
SKU-GIZMO-2 | Gizmo 2 Deluxe | $39.99
SKU-TOOL-Z | Power Tool Z | $149.99
SKU-ACCESSORY-1 | Accessory Pack 1 | $9.99

28 Admin Console

The Admin Console (/admin) is the central hub for user management, permission configuration, and access control across the entire platform. It is accessible only to users with SUPERADMIN or PLATFORM_OWNER platform roles.

Navigation: Click Admin in the sidebar. The page is organized into four tabs: Users, Groups & Permissions, Access Control, and Access Matrix.

RBAC Model — Three-Layer Hierarchy

Access in the OMS is controlled by three layers that stack on top of each other:

LAYER 1 — Platform Role (global, stored on users.platform_role)
  PLATFORM_OWNER — Full control. Create orgs, environments, assign roles.
  SUPERADMIN — Manage users, webhooks, connectors, monitoring. Cannot create orgs/environments.
  USER — Standard access. Sees only envs explicitly granted.
        │
        ▼
LAYER 2 — Org Role (per organization, user_organization_roles table)
  ORG_OWNER — Manage all environments in the org, invite/remove org members.
  ORG_ADMIN — Manage environments, cannot delete org or remove ORG_OWNER.
  ORG_MEMBER — Read-only view of org and its environments.
        │
        ▼
LAYER 3 — Env Role (per environment, user_environment_roles table)
  OWNER — Full control of the environment, including member management.
  ADMIN — Manage orders, inventory, nodes, sourcing rules.
  MEMBER — Create and update records. Cannot delete or change config.
  VIEWER — Read-only access to all environment data.

Effective access: A user's final access to an environment is the highest applicable role across all three layers. A SUPERADMIN implicitly has full access to every environment. An ORG_OWNER has full access to every environment within that org even without an explicit env role.

Tab 1 — Users

Shows all users registered on the platform. Each row displays the username, email, platform role badge, group membership, account status (active / suspended), and the last login time.

Platform Role Management

Click any user row to open the User Access Drawer — a right-side panel showing the user's complete access picture across all organizations and environments. From the drawer you can:

  • Change platform role — upgrade/downgrade between PLATFORM_OWNER, SUPERADMIN, USER.
  • Grant org role — assign the user an organization-level role (ORG_OWNER / ORG_ADMIN / ORG_MEMBER) for any existing organization. This gives the user visibility into all environments within that org.
  • Revoke org role — removes the user's access to all environments within that organization (unless they also have individual env roles).
  • Grant env role — assign a specific role (OWNER / ADMIN / MEMBER / VIEWER) for a specific environment, regardless of org membership.
  • Revoke env role — removes the user's individual access to that environment.

Creating a User with Org/Environment Access

To add a new user and grant them access to a specific organization and environment:

  1. Create the user account

    In the Users tab, click Create User. Fill in name, email, password, platform role (USER for standard access), and optionally assign a group.

  2. Open the User Access Drawer

    Click the user row in the Users table. The drawer opens on the right side.

  3. Grant an Org Role (optional)

    In the Organization Access section, click Grant Org Role. Select the organization and role (ORG_MEMBER for read-only, ORG_ADMIN to let them manage environments). This lets the user see the org in their environment switcher.

  4. Grant an Env Role

    In the Environment Access section, click Grant Env Role. Select the specific environment and role (ADMIN to manage orders/inventory, MEMBER for day-to-day operations, VIEWER for read-only). The user can now switch to this environment using the environment switcher.

Tab 3 — Access Control

A higher-level view showing who has access to each organization and environment, organized by the tenant hierarchy rather than by user. Useful for auditing who can access what without going user-by-user.

The tab has two sub-tabs:

  • Org View — shows each organization with a table of all users who have an org-level role. Use Add Member to directly grant an org role here. Use the trash icon to revoke it.
  • Env View — shows each environment (grouped by org) with its member list. Use Add Member to grant an env role. The environment type badge (DEV/QA/STAGING/PROD) is color-coded for quick scanning.

Granting access directly from Access Control

  1. Switch to "Org View" or "Env View"

    Use the sub-tabs at the top of the Access Control tab.

  2. Find the org or environment

    Each card shows the org/env name and the current member list.

  3. Click "Add Member"

    A modal appears. Start typing a name or email to search for existing users. Select the user and pick a role using the radio buttons.

  4. Confirm

    Click Grant Access. The member list updates immediately. The user will see the environment in their switcher on next page load.

Tab 2 — Groups & Permissions

Permission groups let you define reusable sets of feature permissions that can be assigned to users. A permission is a feature:action string, e.g. orders:write, inventory:read, connectors:manage.

Permission | What it grants
orders:read | View orders, order detail, audit trail
orders:write | Create orders, update status, cancel
inventory:read | View inventory levels, products, nodes
inventory:write | Adjust stock, transfer, update products
sourcing:read | View sourcing rules and decisions
sourcing:write | Create/update/delete sourcing rules
connectors:manage | Create, configure, and toggle platform connectors
webhooks:manage | Register and manage webhook endpoints
analytics:read | Access analytics dashboard and reports
admin:users | View and manage user accounts
admin:roles | Grant and revoke org/env roles
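A feature:action check over a user's combined group permissions might look like this sketch. The write-implies-read shortcut is an assumption for illustration, not documented behaviour.

```python
def has_permission(user_permissions: set[str], required: str) -> bool:
    """required is a 'feature:action' string, e.g. 'orders:write'."""
    if required in user_permissions:
        return True
    feature, _, action = required.partition(":")
    # Assumption: a write grant on a feature implies read on the same feature.
    return action == "read" and f"{feature}:write" in user_permissions
```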

Tab 4 — Access Matrix

A grid view showing all users (rows) × all environments (columns). Each cell displays the user's effective role for that environment — considering both their org role and their direct env role. Empty cells mean the user has no access to that environment.

This view is ideal for a quick compliance audit: you can see at a glance whether any user has unexpectedly broad access (e.g. OWNER on the Production environment) and immediately open their drawer to adjust.

29 Multi-Tenant Platform Architecture

The OMS is a multi-tenant, multi-environment platform. A single infrastructure deployment serves multiple client organizations, each with fully isolated data environments — their own PostgreSQL databases, MongoDB databases, and Elasticsearch index namespaces. No data ever crosses organizational boundaries at the storage layer.

+-----------------------------------------------------------------------+
| CONTROL PLANE (oms_db -- single shared database)                      |
|                                                                       |
|   organizations   environments   users   user_groups                  |
|   user_organization_roles   user_environment_roles                    |
|                                                                       |
|   All pods connect here for:                                          |
|     * Authentication (JWT issuance + validation)                      |
|     * Environment resolution (X-OMS-Environment header)               |
|     * Org and environment management API                              |
|     * Admin Console (user, role, access management)                   |
+------------------------+----------------------------------------------+
                         |
                         | each env resolves to its own data plane
        +----------------+-----------------+------------------+
        |                |                 |                  |
     oms_db         oms_acme_dev      oms_acme_prod    oms_widgetco_qa
  (default prod)    PostgreSQL DB     PostgreSQL DB    PostgreSQL DB
  -------------     MongoDB x2        MongoDB x2       MongoDB x2
  Existing data     ES prefix:        ES prefix:       ES prefix:
  untouched         acme_dev_*        acme_prod_*      widgetco_qa_*

Key design principles

Principle | What it means in practice
Database-per-environment | The strongest possible isolation. No shared tables, no row-level tenant filters. A SQL error in one environment cannot leak data from another.
Single control plane | oms_db is the one shared database for auth, user management, and environment metadata. All pods connect to it regardless of which tenant they serve.
Zero breaking changes | The existing oms_db data is registered as the default Production environment at first startup. No migration needed. JWT tokens without an X-OMS-Environment header continue to work.
Human-gated provisioning | Environments are created by an admin through the UI or API. The system provisions the database asynchronously and reports status. Nothing happens automatically without an explicit action.

29a Role Hierarchy

Platform roles (global — stored on users.platform_role)

Role | Who holds it | Capabilities
PLATFORM_OWNER | The operator of the OMS deployment | Create orgs & environments, assign any platform role, full admin, manage all users
SUPERADMIN | Client power users / support staff | Admin console, webhooks, connectors, monitoring, AI Architect — cannot create orgs/environments
USER | Regular operators | Access only to environments where they have been explicitly granted a role

Organization roles (per org — user_organization_roles)

Role | Capabilities within the org
ORG_OWNER | Manage all environments, invite/remove any org member, delete the org. Implicitly has OWNER access to every environment in the org.
ORG_ADMIN | Create & configure environments, manage members below admin level. Cannot delete the org or demote ORG_OWNER.
ORG_MEMBER | Read-only view of the org and its environments. Can switch to any environment they have an env role on.

Environment roles (per environment — user_environment_roles)

Role | Capabilities within the environment
OWNER | Full control: manage orders, inventory, sourcing rules, connectors, webhooks, and the environment member list.
ADMIN | Manage all business data. Cannot manage environment members or delete the environment.
MEMBER | Create and update records (orders, inventory adjustments, nodes). Cannot delete records or change configuration.
VIEWER | Read-only access to all environment data, including orders, inventory, analytics, and audit trail.

Effective access: A user's final access to an environment is the highest applicable role across all layers. A SUPERADMIN implicitly has full access to every environment. An ORG_OWNER has full access to every environment within that org even without an explicit env role.

29b Creating an Organization — Step by Step

An organization is a logical tenant container. No database is provisioned at this point — it is a pure control-plane record. Creating one is instantaneous.

Via the UI (Platform Console)

  1. Navigate to Platform Console

    In the sidebar, click Platform. Visible only to PLATFORM_OWNER and SUPERADMIN users. Shows all organizations and a summary of their environments.

  2. Click “New Organization”

    Fill in Name (e.g. Acme Corp), Slug (e.g. acme — globally unique, pattern ^[a-z0-9][a-z0-9-]*[a-z0-9]$), and optional description. The slug becomes the prefix for all database names in this org.

  3. Submit

    POST /organizations → 201 Created. The org appears in the list with zero environments. No database has been created yet.

  4. Add the first environment

    Click the org card, then New Environment. See the provisioning section below.

Via the API

curl -X POST http://localhost:8001/organizations \
  -H "Authorization: Bearer {platform_owner_token}" \
  -H "Content-Type: application/json" \
  -d '{"name": "Acme Corp", "slug": "acme", "description": "Our first tenant"}'

Slug naming convention

Resource | Pattern | Example (org=acme, env=dev)
PostgreSQL DB | oms_{slug}_{env_slug} | oms_acme_dev
MongoDB Events DB | oms_events_{slug}_{env_slug} | oms_events_acme_dev
MongoDB AI DB | oms_ai_{slug}_{env_slug} | oms_ai_acme_dev
Elasticsearch prefix | {slug}_{env_slug} | acme_dev
Docker container (API) | oms_api_{slug}_{env_slug} | oms_api_acme_dev
Docker Compose file | docker-compose.{slug}_{env_slug}.yml | docker-compose.acme_dev.yml

Choose slugs carefully. Once an environment is provisioned, its database name is fixed. Short, lowercase, alphanumeric slugs work best: acme, widgetco, retailer1.
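The naming convention and the slug pattern quoted in step 2 above can be sketched together; `resource_names` is an illustrative helper, not a function in the codebase.

```python
import re

# Pattern from the "New Organization" form: lowercase alphanumeric with
# interior hyphens, at least two characters.
SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$")


def resource_names(org_slug: str, env_slug: str) -> dict:
    """Derive every per-environment resource name from the two slugs."""
    base = f"{org_slug}_{env_slug}"
    return {
        "postgres_db": f"oms_{base}",
        "mongo_events_db": f"oms_events_{base}",
        "mongo_ai_db": f"oms_ai_{base}",
        "es_prefix": base,
        "api_container": f"oms_api_{base}",
        "compose_file": f"docker-compose.{base}.yml",
    }
```

For org `acme` and env `dev`, this yields `oms_acme_dev`, `oms_events_acme_dev`, and so on, matching the table above.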

Organizations API

Method | Path | Auth | Description
GET | /organizations | Any | Superadmin: all orgs. Others: orgs where they have a role.
POST | /organizations | Platform Owner / Superadmin | Create organization (instant, no DB provisioned)
GET | /organizations/{id} | Org Member+ | Organization detail
PATCH | /organizations/{id} | Org Admin+ | Update name, description, is_active
GET | /organizations/{id}/members | Org Member+ | List UserOrganizationRole records
POST | /organizations/{id}/members | Org Admin+ | Grant {user_id, role} org membership
DELETE | /organizations/{id}/members/{uid} | Org Admin+ | Revoke org membership

29c Provisioning an Environment — Step by Step

An environment is where business data actually lives — orders, inventory, sourcing rules, connectors. Creating one triggers a full infrastructure provisioning sequence that runs asynchronously. The API returns 202 Accepted with status: "PROVISIONING"; the UI polls every 5 seconds until status becomes "ACTIVE".

Via the UI (Environments page)

  1. Open the Environments page

    Click Environments in the sidebar. Lists all environments the current user can access, grouped by organization.

  2. Click “New Environment”

    Fill in: Organization, Name (e.g. Development), Slug (e.g. dev, unique within org), Environment Type (DEV/QA/STAGING/PROD), and optionally Set as default.

  3. Submit — provisioning begins

    Environment created with status: PROVISIONING. Background task starts. UI shows a spinner polling every 5 seconds.

  4. Wait for ACTIVE status

    Typical provisioning time: 5–15 seconds. The environment card turns green when status = ACTIVE and shows the provisioned_at timestamp.

  5. Generate Docker Compose file (for isolated deployment)

    Click Generate Docker Compose on the environment card. Downloads docker-compose.acme_dev.yml pre-configured with the correct database names, ports, and network settings.

  6. Set Deployment URL

    If the environment runs as a separate pod, set the Deployment URL field (e.g. http://localhost:3463). The environment switcher uses this for cross-origin pod redirects with seamless auth handoff.

Provisioning sequence (what happens in the background)

app/database/env_registry.py — provision_environment()
Step 1 -- Create PostgreSQL database
  Raw asyncpg connection to postgres system DB
  CREATE DATABASE oms_acme_dev;   (idempotent -- checks existence first)

Step 2 -- Build SQLAlchemy engine
  create_async_engine("postgresql+asyncpg://.../oms_acme_dev",
    pool_size=5, max_overflow=10, pool_pre_ping=True, pool_recycle=3600)
  Register: EnvironmentEngineRegistry._engines[env_id] = engine

Step 3 -- Create all data-plane tables  (checkfirst=True -- skips if exists)
  orders, order_items, fulfillment_allocations, shipments
  inventory_items, inventory_adjustments, inventory_reservations
  fulfillment_nodes, sourcing_rules
  webhook_endpoints, webhook_events
  connectors, connector_events, connector_inventory_mappings
  ai_proposals, ai_experiments
  sourcing_outcome_labels, custom_attribute_definitions, ui_widgets

  NOT created here (stay in oms_db only):
  users, user_groups, organizations, environments
  user_environment_roles, user_organization_roles

Step 4 -- Create MongoDB indexes
  oms_events_acme_dev:
    order_events -> compound (order_id, timestamp), index on event_type
  oms_ai_acme_dev:
    sourcing_outcomes -> order_id index
    sourcing_patterns -> unique cluster_key index

Step 5 -- Create Elasticsearch index template
  PUT /_index_template/acme_dev_template
  {"index_patterns": ["acme_dev_*"], "mappings": {...}}

Step 6 -- Mark environment ACTIVE
  UPDATE environments SET status='ACTIVE', provisioned_at=now()
  WHERE id = <env_id>   (committed on control DB oms_db)

Idempotent re-provisioning: If provisioning fails mid-way, call POST /environments/{id}/provision to retry safely. Every step can be repeated without side effects.
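The idempotent Step 1 pattern — check existence, then create only if missing — can be sketched as pure statement generation, so it runs without a live PostgreSQL. The real code issues these over a raw asyncpg connection to the postgres system DB; `provision_statements` is an illustrative helper.

```python
def provision_statements(db_name: str, existing: set[str]) -> list[str]:
    """Return the SQL to run for Step 1; empty list when the DB already exists.

    existing stands in for the result of querying pg_database for db_name.
    """
    if db_name in existing:
        return []  # re-run is a no-op, which is what makes retries safe
    return [f'CREATE DATABASE "{db_name}"']
```

Steps 2–5 follow the same shape: `checkfirst=True` for table creation, index creation that tolerates pre-existing indexes, and a `PUT` of the index template that simply overwrites it.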

Connection pool per environment

Engine pool settings per environment
pool_size=5        # 5 persistent connections
max_overflow=10    # burst up to 15 total
pool_pre_ping=True # discard stale connections
pool_recycle=3600  # recycle every hour

20 active environments can open at most 300 PostgreSQL connections at full burst (20 × (5 + 10)). Size max_connections accordingly — the PostgreSQL default of 100 is too low at this scale, so raise it to 400–500 or keep fewer environments active.
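The connection budget is simple arithmetic: each environment engine can open at most pool_size + max_overflow connections.

```python
def max_pg_connections(envs: int, pool_size: int = 5, max_overflow: int = 10) -> int:
    """Worst-case PostgreSQL connection count across all environment engines."""
    return envs * (pool_size + max_overflow)
```

For example, 20 environments at the defaults shown above gives 20 × 15 = 300 connections at full burst.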

Environments API

Method | Path | Auth | Description
GET | /environments | Any | List all environments the calling user can access
POST | /environments | Platform Owner / Superadmin | Create + provision (202 Accepted, background task)
GET | /environments/{id} | Any with access | Detail + status + provisioned_at
PATCH | /environments/{id} | Env OWNER+ | Update name, is_default, base_url
POST | /environments/{id}/provision | Platform Owner | Re-provision failed / archived environment (idempotent)
DELETE | /environments/{id} | Platform Owner | Delete environment record + DROP the PostgreSQL database
GET | /environments/{id}/members | Env OWNER+ | List UserEnvironmentRole records
POST | /environments/{id}/members | Env OWNER+ | Grant {user_id, role}
DELETE | /environments/{id}/members/{uid} | Env OWNER+ | Revoke env access for a specific user

29d Environment Types & Lifecycle

Type | Badge | Warning Banner | Typical Use
DEV | DEV | None | Developer testing, feature branches. Data can be reset freely.
QA | QA | None | Quality assurance, integration tests, UAT.
STAGING | STAGING | None | Pre-production validation, load testing.
PROD | PROD | ⚠ Production — changes affect live customers (persistent red banner) | Live customer-facing environment.

Environment status lifecycle

PROVISIONING  -->  ACTIVE       -->  SUSPENDED             -->  ARCHIVED
(background)       (operational)     (503, data preserved)      (503, pool evicted)

Status | Meaning | API behaviour
PROVISIONING | Database creation in progress | Middleware returns 503 for all data-plane requests
ACTIVE | Fully operational | All requests routed normally
SUSPENDED | Temporarily disabled | 503; data preserved intact
ARCHIVED | Permanently disabled | 503; connection pool evicted to free resources
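The routing rule implied by the table is binary: only ACTIVE environments receive traffic. A minimal sketch of that gate (the real check lives in the environment middleware; the function name here is hypothetical):

```python
# Only ACTIVE environments are routable; every other status yields a 503
# before any route handler runs.
ROUTABLE = {"ACTIVE"}

def gate(env_status):
    """Return the HTTP status the middleware would produce for this environment."""
    return 200 if env_status in ROUTABLE else 503
```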

29e Managing Environment Members

Grant org membership (gives org-wide visibility)

curl -X POST http://localhost:8001/organizations/{org_id}/members \
  -H "Authorization: Bearer {admin_token}" \
  -H "Content-Type: application/json" \
  -d '{"user_id": "abc123...", "role": "ORG_MEMBER"}'

Grant environment access

curl -X POST http://localhost:8001/admin/users/{user_id}/env-roles \
  -H "Authorization: Bearer {admin_token}" \
  -H "Content-Type: application/json" \
  -d '{"env_id": "def456...", "role": "ADMIN"}'

View a user's full access

GET /admin/users/{user_id}/access
{
  "user_id": "...",
  "platform_role": "USER",
  "org_roles": [
    {"org_id": "...", "org_name": "Acme Corp", "role": "ORG_MEMBER"}
  ],
  "env_roles": [
    {"env_id": "...", "env_name": "Development",
     "org_name": "Acme Corp", "env_type": "DEV", "role": "ADMIN"}
  ]
}

Admin Console access management endpoints

Method | Path | Description
GET | /admin/users/{id}/access | Full access summary: platform role + all org roles + all env roles
POST | /admin/users/{id}/org-roles | Grant/update org role: body {org_id, role}
DELETE | /admin/users/{id}/org-roles/{org_id} | Revoke org membership
POST | /admin/users/{id}/env-roles | Grant/update env role: body {env_id, role}
DELETE | /admin/users/{id}/env-roles/{env_id} | Revoke env access

29f Environment Switcher & Cross-Pod Auth

Switching environments

The Environment Switcher lives in the top header bar, grouped by organization. On switch, the frontend:

  1. Stores the selected env_id in localStorage under the key 'oms_environment_id'
  2. Calls queryClient.clear() to flush all cached data from the previous environment
  3. Re-fetches all active queries with the new environment header
  4. If the environment has a base_url on a different origin, performs a cross-origin redirect with token handoff (no re-login)

How X-OMS-Environment works

axiosInstance.interceptors.request.use(config => {
  const envId = localStorage.getItem('oms_environment_id')
  if (envId) config.headers['X-OMS-Environment'] = envId
  const token = localStorage.getItem('oms_auth_token')
  if (token) config.headers['Authorization'] = `Bearer ${token}`
  return config
})

Cross-pod auth handoff (no re-login on switch)

frontend/src/main.tsx — bootstrap from URL before React renders
;(function bootstrapFromUrl() {
  const params = new URLSearchParams(window.location.search)
  const token   = params.get('_oms_token')
  const userRaw = params.get('_oms_user')
  if (token && userRaw) {
    localStorage.setItem('oms_auth_token', token)
    localStorage.setItem('oms_auth_user', userRaw)
    params.delete('_oms_token'); params.delete('_oms_user')
    const clean = params.toString()
    window.history.replaceState({}, '', window.location.pathname +
      (clean ? '?' + clean : '') + window.location.hash)
  }
})()

Security note: The token is a short-lived JWT transferred once via HTTPS redirect and immediately stripped from the URL. Subsequent requests use the token from localStorage.
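On the sending side, the switching pod only needs to append the two one-time query parameters to the target environment's base_url. A backend-side sketch of building that redirect URL (the helper name is hypothetical; the real token would be a short-lived JWT):

```python
from urllib.parse import urlencode

def handoff_url(base_url, token, user_json):
    """Build the cross-origin redirect URL carrying one-time handoff params."""
    # The receiving pod's bootstrapFromUrl() stores these two params in
    # localStorage and immediately strips them from the URL.
    qs = urlencode({"_oms_token": token, "_oms_user": user_json})
    return f"{base_url}/?{qs}"

url = handoff_url("http://localhost:3463", "tok123", '{"id":"abc"}')
```

Because `urlencode` percent-escapes the serialized user payload, the resulting URL is safe to emit as a plain `Location` header.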

29g Deploying a Tenant Environment (Docker Compose)

Each environment can run as an isolated Docker Compose stack — its own API and frontend containers — attached to the shared infrastructure via the oms_default external network.

Example: docker-compose.acme_dev.yml

version: "3.9"

services:
  api_acme_dev:
    build: { context: ., dockerfile: Dockerfile }
    container_name: oms_api_acme_dev
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    environment:
      - TENANT_SLUG=acme
      - ENVIRONMENT=dev
      - DATABASE_URL=postgresql+asyncpg://oms_user:oms_pass@postgres:5432/oms_acme_dev
      - SYNC_DATABASE_URL=postgresql+psycopg2://oms_user:oms_pass@postgres:5432/oms_acme_dev
      - CONTROL_DATABASE_URL=postgresql+asyncpg://oms_user:oms_pass@postgres:5432/oms_db
      - MONGODB_DB=oms_events_acme_dev
      - MONGODB_AI_DB=oms_ai_acme_dev
      - FRONTEND_URL=http://localhost:3463
    ports: ["8463:8000"]
    networks: [oms_default]

  frontend_acme_dev:
    build: { context: ./frontend, dockerfile: Dockerfile }
    container_name: oms_frontend_acme_dev
    environment:
      - API_HOST=oms_api_acme_dev
    ports: ["3463:80"]
    networks: [oms_default]
    depends_on: [api_acme_dev]

networks:
  oms_default:
    external: true
    name: oms_default

Key points:
  - DATABASE_URL points at the tenant DB.
  - CONTROL_DATABASE_URL points back at oms_db so this pod can verify JWTs and read org/env metadata.
  - Both containers join oms_default to access shared infra without re-exposing ports.
  - The nginx frontend uses ${API_HOST} via envsubst to proxy /api/* to the correct API container.

Start the tenant stack

docker-compose -f docker-compose.acme_dev.yml up -d --build

# Then set the Deployment URL:
# Platform Console > Environments > Acme Dev > Deployment URL = http://localhost:3463

Delete an environment (with data)

  1. Stop the tenant stack
    docker-compose -f docker-compose.acme_dev.yml down
  2. Delete from the UI

    Environments page → trash icon on the environment card → confirm. Calls DELETE /environments/{id}, which drops the PostgreSQL database and removes all control-plane records. This is irreversible.

  3. Remove the Compose file
    rm docker-compose.acme_dev.yml

29h How Requests Are Routed to the Right Database

Every API request carries an X-OMS-Environment HTTP header. The EnvironmentMiddleware resolves it to the correct database before any route handler runs.

Request lifecycle — EnvironmentMiddleware → get_db() → route handler
Browser
  X-OMS-Environment: <env_uuid>
  |
EnvironmentMiddleware (app/middleware/environment.py)
  |
  +-- Exempt paths (bypass resolution):
  |   /health, /docs, /redoc, /openapi.json
  |   /auth/*, /testing, /connectors/{id}/webhook
  |
  +-- 1. Redis: GET env:<env_uuid>  TTL=60s
  |      HIT  -> deserialize cached env object (lock-free hot path)
  |      MISS -> SELECT * FROM environments WHERE id=?  (control DB)
  |             cache -> SET env:<env_uuid> EX 60
  |
  +-- 2. env.status != ACTIVE -> return 503 "Environment unavailable"
  |
  +-- 3. request.state.environment = env -> call_next(request)

Route handler: db = Depends(get_db)
  |
  +-- env.db_name == "oms_db"  (default production)
  |    -> yield from main session factory
  |
  +-- env-specific database
       -> registry.get_or_create_engine(env)  (lock-free if already cached)
       -> yield session from env-specific session factory

All SQL issued by the route handler then runs against the environment's own database (oms_acme_dev in this example), with zero changes to business-logic code.
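The engine-selection step in the lifecycle above can be sketched as a cached registry keyed by database name. This is a simplified stand-in: the real registry caches async SQLAlchemy engines, and the string values here are placeholders for engine objects.

```python
# Simplified sketch of the engine-selection logic behind get_db():
# the default engine is preloaded; per-environment engines are created
# on first use and served from the cache afterwards (the "lock-free if
# already cached" hot path).
class EngineRegistry:
    def __init__(self, default_db="oms_db"):
        self.default_db = default_db
        self._engines = {default_db: f"engine:{default_db}"}  # preloaded

    def get_or_create_engine(self, db_name):
        if db_name not in self._engines:          # cache miss: build once
            self._engines[db_name] = f"engine:{db_name}"
        return self._engines[db_name]             # cache hit: no locking

registry = EngineRegistry()
engine = registry.get_or_create_engine("oms_acme_dev")
```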

Which routes use which database

Router | Dependency | Database
/auth/* | get_db | oms_db (users table is control-plane only)
/admin | get_db | oms_db
/organizations | get_control_db | oms_db (must bypass env routing)
/environments | get_control_db | oms_db
/orders | get_db (env-aware) | oms_{org}_{env}
/inventory | get_db (env-aware) | oms_{org}_{env}
/nodes | get_db (env-aware) | oms_{org}_{env}
/sourcing-rules | get_db (env-aware) | oms_{org}_{env}
/connectors | get_db (env-aware) | oms_{org}_{env}
/architect | get_db (env-aware) | oms_{org}_{env}

29i Data Isolation & Backward Compatibility

Database-per-environment isolation

Isolation model comparison
Row-level security (shared table + tenant_id column)
  > Weakest. A missing WHERE clause leaks all tenant data.
  > OMS does NOT use this.

Schema-per-tenant (shared DB, different schemas)
  > Medium. Connection pool shared; search_path misconfiguration leaks data.
  > OMS does NOT use this.

Database-per-tenant  <--  OMS uses this for each environment
  > Strongest. Separate connection pool, separate WAL, separate backups.
  > No SQL query can cross the database boundary.

Backward compatibility — zero breaking changes

The first startup seeds a "Default Organization" with a "Production" environment pointing at the existing oms_db database. All existing data (orders, inventory, nodes, sourcing rules) is untouched. Requests without an X-OMS-Environment header fall back to the environment where is_default=true, which points at oms_db. Existing JWT tokens and API clients all continue to work.
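The header-fallback rule can be stated in a few lines. A sketch under illustrative names (the real resolution happens in the environment middleware against the control DB, not an in-memory list):

```python
# Fallback resolution: an explicit X-OMS-Environment header wins;
# a missing header routes to the environment flagged is_default=True,
# which points at the legacy oms_db database.
def resolve_env(header_value, environments):
    if header_value:
        return next(e for e in environments if e["id"] == header_value)
    return next(e for e in environments if e["is_default"])

envs = [
    {"id": "e1", "db_name": "oms_db", "is_default": True},
    {"id": "e2", "db_name": "oms_acme_dev", "is_default": False},
]
```

Old API clients that never send the header therefore keep hitting oms_db exactly as before the multi-tenant upgrade.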

Celery — environment-aware fan-out

app/workers/sourcing.py — fan-out pattern
@celery_app.task
def source_pending_orders_fanout():
    # Beat fires this every 2 min. One task per active environment.
    env_ids = list_active_environment_ids()   # sync SQL on oms_db
    for env_id in env_ids:
        source_pending_orders.delay(env_id)

@celery_app.task
def source_pending_orders(environment_id: str = ""):
    # Empty env_id = backward-compat fallback to settings.DATABASE_URL
    db_url = get_env_db_url(environment_id)
    # ... unchanged sourcing logic ...

MongoDB & Elasticsearch per environment

Environment | MongoDB Events DB | MongoDB AI DB | ES Prefix
Default (oms_db) | oms_events | oms_ai_learning | default_prod
acme / dev | oms_events_acme_dev | oms_ai_acme_dev | acme_dev
acme / prod | oms_events_acme_prod | oms_ai_acme_prod | acme_prod
widgetco / qa | oms_events_widgetco_qa | oms_ai_widgetco_qa | widgetco_qa
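Apart from the seeded default environment, every name in the table follows one `{org_slug}_{env_slug}` convention. A sketch of that derivation (the helper name is hypothetical; slugs are assumed to be lowercase):

```python
# Derive the per-environment resource names from the org and env slugs,
# matching the naming convention shown in the table above.
def env_resources(org_slug, env_slug):
    suffix = f"{org_slug}_{env_slug}"
    return {
        "postgres_db":  f"oms_{suffix}",
        "mongo_events": f"oms_events_{suffix}",
        "mongo_ai":     f"oms_ai_{suffix}",
        "es_prefix":    suffix,
    }

acme_dev = env_resources("acme", "dev")
```

Keeping the derivation in one place means a new environment needs only its slugs; every downstream store name falls out mechanically.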