OMS — Omni-Channel Order Management System
A production-grade, fully async, multi-tenant Order Management System built with Python 3.12, FastAPI, and a polyglot persistence layer — PostgreSQL, MongoDB, Redis, and Elasticsearch. Designed for high-throughput omni-channel retail with intelligent multi-node sourcing, real-time inventory reservation, a fully automated fulfillment pipeline, and complete data isolation between organizations and environments.
Quick start: Run docker compose up -d --build then seed with docker compose exec api python scripts/seed.py to get a fully operational system with 8 nodes, 64 inventory records, and 5 sourcing rules. A default organization and Production environment are seeded automatically on first startup.
1 Architecture Overview
AI-Native 5-Layer Architecture
Request Lifecycle
1. Client POSTs an order to `POST /orders` — FastAPI validates the request via Pydantic v2 schemas
2. Order is written to PostgreSQL and indexed in Elasticsearch
3. Order-created event is written to the MongoDB audit log
4. A `source_order` task is enqueued on the Redis-backed `sourcing` Celery queue
5. Sourcing Engine evaluates active rules — may route via A/B experiment or `AI_ADAPTIVE` strategy
6. Pipeline tasks flow: sourcing → fulfillment (pick → pack) → carrier (label + ship) → notifications → webhooks
7. Delivery outcome is labeled by the `learning` worker and fed back into the pattern store
Directory Structure
D:\OMS\
├── Dockerfile # Multi-stage Python 3.12 image
├── docker-compose.yml # All 9 services
├── requirements.txt # All Python dependencies
├── .env # Local environment variables
│
├── app/
│ ├── main.py # FastAPI app, lifespan, router registration
│ ├── config.py # Pydantic Settings (reads .env)
│ │
│ ├── database/
│ │ ├── postgres.py # Async SQLAlchemy engine + session factory
│ │ ├── mongodb.py # Motor async client + index creation
│ │ ├── redis_client.py # aioredis pool + cache helpers
│ │ └── elasticsearch_client.py # Async ES client + index mapping creation
│ │
│ ├── models/postgres/
│ │ ├── order_models.py # Order, OrderItem, FulfillmentAllocation, Shipment
│ │ ├── inventory_models.py # InventoryItem, InventoryAdjustment, InventoryReservation
│ │ ├── node_models.py # FulfillmentNode + NodeType/NodeStatus enums
│ │ ├── sourcing_rule_models.py # SourcingRule + SourcingStrategy/ConditionOperator enums
│ │ └── ai_models.py # AIProposal, AIExperiment, CustomAttributeDefinition,
│ │ # UIWidget, SourcingOutcomeLabel + enums
│ │
│ ├── schemas/ # Pydantic v2 request/response schemas
│ ├── routers/ # FastAPI route handlers (7 files, 44 endpoints)
│ ├── services/
│ │ ├── sourcing_engine.py # Intelligence core: rule eval, node scoring, allocation,
│ │ # AI_ADAPTIVE + experiment traffic splitting
│ │ ├── ai_sourcing.py # AISourcingAdvisor — KubeAI node scoring
│ │ ├── pattern_discovery.py # PatternDiscoveryService — nightly cluster aggregation
│ │ ├── schema_evolution.py # SchemaEvolutionEngine — safe additive schema changes
│ │ └── webhook.py # HMAC-SHA256 signed delivery
│ │
│ │
│ └── workers/
│ ├── celery_app.py # Celery factory + 7 queues + beat schedule
│ ├── sourcing.py # source_order task (writes sourcing_outcomes to MongoDB)
│ ├── fulfillment.py # start_picking, complete_packing, reset counters
│ ├── carrier.py # book_shipment, simulate_delivery, sync tracking
│ ├── notifications.py # Email/SMS notification tasks
│ ├── webhooks.py # dispatch_webhook, retry_failed_webhooks
│ ├── connectors.py # sync_fulfillment_to_connector, poll_amazon_orders
│ └── learning.py # label_sourcing_outcomes, discover_patterns,
│ # update_node_performance, evaluate_ai_experiments
│
├── frontend/ # React + Vite + TypeScript GUI (port 3000)
├── scripts/
│ └── seed.py # Seeds all 4 databases with realistic data
└── tests/
└── test_imports.py # 13 import + unit tests
2 Tech Stack
| Layer | Technology | Purpose |
|---|---|---|
| API Framework | FastAPI 0.111 + Uvicorn | Async REST API, OpenAPI docs, Pydantic v2 validation |
| ORM | SQLAlchemy 2.0 (async) | PostgreSQL ORM with asyncpg driver |
| Primary DB | PostgreSQL 16 | Orders, inventory, nodes, sourcing rules, webhooks |
| Document DB (events) | MongoDB 7 (Motor) — oms_events | Order audit trail, product catalog, webhook log, error monitoring |
| Document DB (AI) | MongoDB 7 (Motor) — oms_ai_learning | AI learning pipeline: sourcing decisions, cluster patterns, node metrics. Isolated to prevent analytical writes from affecting transactional read latency. 90-day TTL on outcomes, 180-day on patterns. |
| Cache / Queue | Redis 7.2 | Celery broker/backend, cache, rate-limiting |
| Search | Elasticsearch 8.12 | Full-text order and product search |
| Task Queue | Celery 5.4 + Flower | Async pipeline workers, beat scheduler, learning loop |
| AI / LLM | KubeAI (Anthropic) | AI node scoring (AI_ADAPTIVE), NL → proposals |
| Validation | Pydantic v2 | Request/response schemas, settings management |
| Frontend | React 18 + Vite + TypeScript | Professional GUI (this app) |
| Containers | Docker Compose | All 9 services in one command |
| HMAC Signing | hashlib + hmac (stdlib) | Webhook payload integrity verification |
| Geo Math | Haversine formula (implemented with stdlib math) | Sourcing distance calculations |
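The haversine distance used by the sourcing engine can be reproduced with the stdlib `math` module alone. A minimal sketch — the function name and signature below are illustrative, not the actual engine API:

```python
import math

def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in km between two (lat, lon) points."""
    r = 6371.0  # mean Earth radius in km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))

# e.g. a New York customer vs. a hypothetical Philadelphia-area node (~130 km)
distance = haversine_km(40.7484, -73.9967, 39.9526, -75.1652)
```

The engine combines this distance with `shipping_cost_multiplier` and capacity data when scoring nodes.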
3 PostgreSQL Models
fulfillment_nodes
| Column | Type | Description |
|---|---|---|
id | UUID PK | Node identifier |
code | VARCHAR(50) UNIQUE | Short code e.g. DC-EAST |
name | VARCHAR(200) | Display name |
node_type | ENUM | DISTRIBUTION_CENTER, RETAIL_STORE, DARK_STORE, WAREHOUSE, PICKUP_POINT |
status | ENUM | ACTIVE, INACTIVE, MAINTENANCE, CLOSED |
latitude / longitude | FLOAT | Geographic coordinates for distance sourcing |
can_ship / pickup / curbside / same_day | BOOL | Capability flags for fulfillment type matching |
daily_order_capacity | INT | Max orders per day |
current_daily_orders | INT | Reset to 0 at midnight by Celery beat |
shipping_cost_multiplier | FLOAT | Relative cost weight for sourcing score |
orders
| Column | Type | Description |
|---|---|---|
id | UUID PK | Order identifier |
order_number | VARCHAR(50) UNIQUE | Human-readable e.g. ORD-20240101-ABC123 |
channel | ENUM | WEB, MOBILE, POS, API, MARKETPLACE |
fulfillment_type | ENUM | SHIP_TO_HOME, STORE_PICKUP, SHIP_FROM_STORE, CURBSIDE_PICKUP, SAME_DAY_DELIVERY |
status | ENUM | 15-state machine (see Fulfillment Pipeline) |
total_amount | NUMERIC(12,2) | Order total in order currency |
shipping_latitude / longitude | FLOAT | Customer location for haversine sourcing |
pickup_node_id | UUID FK | For BOPIS / curbside orders |
sourcing_rule_id | UUID FK | Which sourcing rule was applied |
inventory_items
Per-node, per-SKU stock levels with three counters:
- `quantity_on_hand` — physical stock in the warehouse
- `quantity_reserved` — soft-reserved by active allocations
- `quantity_available` = on_hand − reserved (what sourcing can use)
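The counter invariant can be sketched with a toy dataclass — this is not the actual SQLAlchemy model, just the arithmetic:

```python
from dataclasses import dataclass

@dataclass
class InventoryCounters:
    quantity_on_hand: int = 0
    quantity_reserved: int = 0

    @property
    def quantity_available(self) -> int:
        # what the sourcing engine is allowed to allocate
        return self.quantity_on_hand - self.quantity_reserved

    def reserve(self, qty: int) -> None:
        """Soft-reserve stock for an allocation; never oversell."""
        if qty > self.quantity_available:
            raise ValueError("insufficient available stock")
        self.quantity_reserved += qty

item = InventoryCounters(quantity_on_hand=10, quantity_reserved=3)
item.reserve(5)
print(item.quantity_available)  # 2
```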
fulfillment_allocations
Bridges an order item to a specific fulfillment node. One order can have multiple allocations (split fulfillment). Tracks the full picking → packing → shipping timeline with timestamps for each transition.
sourcing_rules
Configurable rules evaluated in priority order (ascending). Each rule has:
- `conditions` — JSON array of `{field, operator, value}` tuples
- `strategy` — `DISTANCE_OPTIMAL`, `COST_OPTIMAL`, `STORE_NEAREST`, `INVENTORY_RESERVATION`, `LEAST_COST_SPLIT`, `AI_ADAPTIVE`, or `AI_HYBRID`
- `allowed_node_types`, `required_capabilities` — node pre-filter
- `max_split_nodes`, `cost_weight`, `distance_weight` — algorithm parameters
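A minimal sketch of how the `{field, operator, value}` conditions might be AND-combined against an order. The operator names below are assumptions suggested by the `ConditionOperator` enum, not verified against the source:

```python
# Hypothetical operator names — check ConditionOperator for the real set
OPERATORS = {
    "EQ":  lambda a, b: a == b,
    "NEQ": lambda a, b: a != b,
    "GT":  lambda a, b: a > b,
    "GTE": lambda a, b: a >= b,
    "LT":  lambda a, b: a < b,
    "IN":  lambda a, b: a in b,
}

def rule_matches(conditions: list[dict], order: dict) -> bool:
    """A rule matches only when every condition tuple holds for the order."""
    return all(
        OPERATORS[c["operator"]](order.get(c["field"]), c["value"])
        for c in conditions
    )

conditions = [
    {"field": "channel", "operator": "EQ", "value": "WEB"},
    {"field": "total_amount", "operator": "GTE", "value": 100},
]
print(rule_matches(conditions, {"channel": "WEB", "total_amount": 150}))  # True
```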
Control-Plane Models (org_models.py)
These four tables live exclusively in oms_db — the shared control-plane database — and are never replicated into tenant databases.
| Model | Table | Description |
|---|---|---|
Organization | organizations | Top-level tenant grouping. Has a unique slug used as the namespace prefix for all database names (oms_{slug}_{env}). Instantaneous to create — no database provisioned at this stage. |
Environment | environments | An isolated data silo within an org. Stores db_name, mongo_events_db, mongo_ai_db, es_index_prefix, and optional per-cluster pg_host/port/user/password overrides. status: PROVISIONING → ACTIVE → SUSPENDED → ARCHIVED. |
UserOrganizationRole | user_organization_roles | Links a user to an org with a role: ORG_OWNER, ORG_ADMIN, ORG_MEMBER. One row per (user, org) pair — enforced by uq_user_org unique constraint. |
UserEnvironmentRole | user_environment_roles | Links a user to a specific environment with a role: OWNER, ADMIN, MEMBER, VIEWER. One row per (user, env) pair — enforced by uq_user_env. |
environments table — key columns
| Column | Type | Notes |
|---|---|---|
id | UUID PK | Sent as X-OMS-Environment header |
organization_id | UUID FK | Parent org, CASCADE DELETE |
slug | VARCHAR(80) | Unique within org (uq_env_org_slug) |
env_type | ENUM | DEV · QA · STAGING · PROD |
status | ENUM | PROVISIONING · ACTIVE · SUSPENDED · ARCHIVED |
db_name | VARCHAR UNIQUE | PostgreSQL database name, e.g. oms_acme_prod |
mongo_events_db | VARCHAR | e.g. oms_events_acme_prod |
mongo_ai_db | VARCHAR | e.g. oms_ai_acme_prod |
es_index_prefix | VARCHAR | e.g. acme_prod |
pg_host / pg_port / pg_user / pg_password | VARCHAR NULL | Per-cluster overrides; NULL = use same host as control plane |
base_url | VARCHAR NULL | Deployment URL of the tenant pod; used by the frontend switcher for cross-origin redirects |
is_default | BOOL | Fallback when no X-OMS-Environment header is sent (one per org) |
provisioned_at | TIMESTAMP NULL | Set when database creation completes; NULL while PROVISIONING |
AI Models (ai_models.py)
| Model | Description |
|---|---|
ai_proposals | All AI-proposed changes. Lifecycle: PENDING → APPROVED → APPLIED (or REJECTED / ROLLED_BACK). Stores proposal_data JSONB and rollback_data JSONB. Nothing applied without human approval. |
ai_experiments | A/B tests between two sourcing strategies. Traffic split via traffic_split_pct (1–50%). Stores per-arm results JSONB and winner when concluded. |
custom_attribute_definitions | Dynamic schema extensions for orders/products/nodes — uses existing metadata_ JSONB columns. No DDL required. |
ui_widgets | JSON-configured dashboard widgets. widget_type: metric_card, time_series, bar_chart, table, distribution. |
sourcing_outcome_labels | PostgreSQL mirror of labeled MongoDB sourcing_outcomes docs for fast analytical queries. |
Other Models
| Model | Description |
|---|---|
order_items | Line items linked to an order. Tracks quantity_fulfilled as allocations are shipped. |
shipments | One per allocation. Carrier, tracking number, label URL, JSON array of tracking events. |
inventory_adjustments | Immutable audit log of every stock change with before/after quantities. |
webhook_endpoints | Registered webhook receivers with HMAC secret. |
webhook_events | Persistent delivery records with retry state machine. |
connectors | External platform integrations (Shopify, etc.) with masked config and counters. |
connector_events | Per-connector inbound/outbound event log with payload and response snapshots. |
connector_inventory_mappings | Maps an OMS InventoryItem (by SKU + connector) to platform-specific IDs needed for inventory sync: shopify_inventory_item_id, shopify_location_id, amazon_asin, amazon_fnsku, etc. |
4 MongoDB Collections
MongoDB is split into two databases to isolate transactional read latency from analytical write load. Config keys: MONGODB_DB=oms_events and MONGODB_AI_DB=oms_ai_learning. Both use the same MongoDB instance/container; a separate container is easy to provision later by pointing MONGODB_AI_DB to a different connection string.
Database: oms_events — transactional / operational
Latency-sensitive collections written on the hot request path. Fast point lookups by order_id.
| Collection | Purpose | TTL | Key Indexes |
|---|---|---|---|
order_events | Append-only audit trail for every order state change | — | (order_id, timestamp), event_type |
error_events | Raw error/exception records from all services | 30 days | timestamp, fingerprint, source_service, level |
error_issues | Aggregated issues grouped by fingerprint (one doc per unique error type) | — | fingerprint (unique), status+last_seen |
product_catalog | Rich product data: images, attributes, descriptions | — | Text index on name + description |
webhook_deliveries | Per-event delivery attempt history | — | event_id, status |
notifications | Email/SMS notification log | — | order_id, created_at |
Database: oms_ai_learning — analytical / AI pipeline
High-volume write collections used by the AI learning loop. Aggregation-heavy reads run here without contending with order audit lookups. TTLs bound storage growth automatically.
| Collection | Purpose | TTL | Key Indexes |
|---|---|---|---|
sourcing_outcomes | Per-allocation sourcing decision snapshot + delivery outcome labels. Written by sourcing worker at order time; enriched by carrier worker on SHIPPED/DELIVERED; labeled with outcome_score hourly by the learning worker. | 90 days — raw decisions expire; aggregated patterns retain the learnings | (cluster_key, outcome_score), (node_id, sourced_at), (experiment_id, strategy_used) |
sourcing_patterns | Aggregated node performance per order-feature cluster (channel|region|amount|type). Upserted nightly by discover_patterns. Feeds the AI sourcing prompt and the Patterns tab in AI Architect. | 180 days — stale clusters expire; nightly run re-creates active ones | cluster_key (unique), sample_count |
node_performance_metrics | Rolling 7-day and 30-day stats per node: avg outcome score, avg delivery hours, backorder rate %, return rate %. One doc per (node_id, period_days) pair, replaced on each run. Feeds the AI sourcing prompt and the Node Performance tab. | — | (node_id, period_days) (unique) |
All MongoDB writes are non-blocking. The Motor async driver ensures no latency impact on the main request path. AI learning writes are additionally isolated in a separate database so nightly aggregations never compete with order audit reads for buffer pool or I/O.
5 Redis Key Schema
| Key Pattern | Type | TTL | Purpose |
|---|---|---|---|
oms:version | STRING | 24h | Current application version |
oms:stats | HASH | — | Aggregate counters (orders, revenue) |
oms:active_strategies | STRING | 1h | Cached sourcing strategy list |
env:{env_uuid} | STRING (JSON) | 60s | Cached Environment record for the middleware hot path. Written on first miss from control DB. Evicted on environment update or status change. Prevents a control-plane DB query on every API request in a high-throughput environment. |
celery:* | Various | — | Celery broker/result state (DB 1 + DB 2) |
Redis is used as the Celery broker (DB 1) and result backend (DB 2), keeping pipeline task state separate from application cache (DB 0). The env:{id} cache key is the critical performance optimization for the multi-tenant middleware — without it, every API request would require a synchronous lookup into oms_db to resolve the active environment.
6 Elasticsearch Indexes
oms_orders
Optimized for order search. Documents are indexed on order creation and updated on every status transition.
| Field | Type | Notes |
|---|---|---|
order_number | keyword | Exact match |
customer_name | text | Full-text, fuzzy |
customer_email | keyword | Exact match |
channel, status, fulfillment_type | keyword | Filter / aggregation |
total_amount | float | Range filter |
created_at | date | Date range filter, sort |
tags | keyword[] | Multi-value filter |
line_items | nested | Nested SKU / product_name search |
oms_products
Product catalog search backed by MongoDB documents synced to Elasticsearch.
| Field | Type | Notes |
|---|---|---|
sku | keyword | Exact match |
name | text | Full-text search |
description | text | Full-text search |
category | keyword | Filter |
price | float | Range filter |
7 Orders API /orders
| Method | Path | Description |
|---|---|---|
| POST | /orders/ | Create new order — triggers sourcing pipeline |
| GET | /orders/ | List orders with filters (status, channel, date range, email) |
| GET | /orders/{order_id} | Get single order with all relationships |
| GET | /orders/number/{order_number} | Get order by human-readable order number |
| PATCH | /orders/{order_id}/status | Transition order status (with validation) |
| POST | /orders/{order_id}/cancel | Cancel an order (notifies customer + fires webhook) |
| GET | /orders/{order_id}/events | Get MongoDB audit trail events |
Create Order — Request Body
{
"channel": "WEB",
"fulfillment_type": "SHIP_TO_HOME",
"customer_email": "alice@example.com",
"customer_name": "Alice Smith",
"line_items": [
{
"sku": "SKU-WIDGET-A",
"product_name": "Premium Widget A",
"quantity": 2,
"unit_price": 29.99
}
],
"shipping_address": {
"address1": "123 Main St",
"city": "New York",
"state": "NY",
"postal_code": "10001",
"latitude": 40.7484,
"longitude": -73.9967
}
}
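Assuming `total_amount` is derived server-side as the sum of `quantity × unit_price` over the line items (the actual schema may accept or compute it differently), the arithmetic for the payload above is straightforward with `Decimal`, matching the `NUMERIC(12,2)` column:

```python
from decimal import Decimal

payload = {
    "line_items": [
        {"sku": "SKU-WIDGET-A", "quantity": 2, "unit_price": "29.99"},
    ],
}

# sum of quantity x unit_price per line, kept exact with Decimal
total = sum(
    Decimal(item["unit_price"]) * item["quantity"]
    for item in payload["line_items"]
)
print(total)  # 59.98
```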
Order Status Values
8 Inventory API /inventory
| Method | Path | Description |
|---|---|---|
| POST | /inventory/ | Create inventory item for a node/SKU |
| GET | /inventory/ | List inventory (filter by node, SKU, low-stock) |
| GET | /inventory/sku/{sku} | All node stock for a specific SKU |
| GET | /inventory/products | Aggregated product list grouped by SKU — totals across all nodes (search, node_id, low_stock_only filters) |
| PATCH | /inventory/products/{sku} | Update product-level attributes (name, cost, weight, reorder point) for all nodes at once |
| GET | /inventory/{item_id} | Single inventory item with history |
| PATCH | /inventory/{item_id} | Update item metadata (reorder point, cost, etc.) |
| POST | /inventory/{item_id}/adjust | Apply stock adjustment — reason must be a valid enum value (see below) |
| POST | /inventory/check-availability | Bulk availability check across all nodes |
| POST | /inventory/transfer | Transfer stock between nodes |
Adjustment Reason Codes
Adjust Stock Example
{
"quantity_delta": 50,
"reason": "RECEIVED",
"notes": "PO-2024-0042 inbound shipment",
"created_by": "ops_user"
}
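The before/after audit semantics of `inventory_adjustments` can be sketched as a pure function — hypothetical shape, not the actual service code:

```python
def apply_adjustment(on_hand: int, delta: int, reason: str) -> dict:
    """Return an immutable audit record capturing before/after quantities."""
    if on_hand + delta < 0:
        raise ValueError("adjustment would drive stock negative")
    return {
        "reason": reason,
        "quantity_before": on_hand,
        "quantity_delta": delta,
        "quantity_after": on_hand + delta,
    }

record = apply_adjustment(on_hand=120, delta=50, reason="RECEIVED")
print(record["quantity_after"])  # 170
```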
9 Fulfillment Nodes API /nodes
| Method | Path | Description |
|---|---|---|
| POST | /nodes/ | Register a new DC or store |
| GET | /nodes/ | List nodes (filter by type, status, capabilities) |
| GET | /nodes/{node_id} | Get node details with current capacity |
| PATCH | /nodes/{node_id} | Update node configuration or status |
| DELETE | /nodes/{node_id} | Deactivate node (soft delete, sets status=INACTIVE) |
| GET | /nodes/{node_id}/capacity | Get daily capacity utilization percentage |
10 Sourcing Rules API /sourcing-rules
| Method | Path | Description |
|---|---|---|
| POST | /sourcing-rules/ | Create new sourcing rule |
| GET | /sourcing-rules/ | List rules sorted by priority (ascending) |
| GET | /sourcing-rules/{rule_id} | Get rule with full condition/strategy detail |
| PATCH | /sourcing-rules/{rule_id} | Update rule parameters |
| DELETE | /sourcing-rules/{rule_id} | Delete rule permanently |
| POST | /sourcing-rules/{rule_id}/toggle | Enable / disable rule without deleting |
| POST | /sourcing-rules/evaluate | Manually run sourcing engine for a test order |
11 Search API /search
| Method | Path | Description |
|---|---|---|
| POST | /search/orders | Full-text order search with filters and pagination |
| GET | /search/orders | GET-style order search (query params) |
| POST | /search/products | Full-text product catalog search |
Supports: fuzzy matching, multi-field search, date/amount range filters, pagination, sort order. Results include a score field for relevance ranking.
Search Request Example
{
"query": "John Doe",
"status": "DELIVERED",
"from_date": "2024-01-01",
"to_date": "2024-12-31",
"min_amount": 50,
"page": 1,
"page_size": 20,
"sort_by": "created_at",
"sort_order": "desc"
}
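A hedged sketch of how such a request might translate into an Elasticsearch bool query — field names follow the `oms_orders` mapping above, but the exact query the service builds may differ:

```python
def build_order_query(req: dict) -> dict:
    """Translate the search request body into an ES bool query (sketch)."""
    must, flt = [], []
    if req.get("query"):
        must.append({
            "multi_match": {
                "query": req["query"],
                "fields": ["customer_name", "order_number"],
                "fuzziness": "AUTO",  # fuzzy matching per the docs above
            }
        })
    if req.get("status"):
        flt.append({"term": {"status": req["status"]}})
    if req.get("from_date") or req.get("to_date"):
        rng = {}
        if req.get("from_date"):
            rng["gte"] = req["from_date"]
        if req.get("to_date"):
            rng["lte"] = req["to_date"]
        flt.append({"range": {"created_at": rng}})
    if req.get("min_amount") is not None:
        flt.append({"range": {"total_amount": {"gte": req["min_amount"]}}})
    return {
        "query": {"bool": {"must": must, "filter": flt}},
        "from": (req["page"] - 1) * req["page_size"],
        "size": req["page_size"],
        "sort": [{req["sort_by"]: {"order": req["sort_order"]}}],
    }

q = build_order_query({
    "query": "John Doe", "status": "DELIVERED",
    "from_date": "2024-01-01", "to_date": "2024-12-31",
    "min_amount": 50, "page": 1, "page_size": 20,
    "sort_by": "created_at", "sort_order": "desc",
})
```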
12 Analytics API /analytics
| Method | Path | Description |
|---|---|---|
| GET | /analytics/dashboard | Full KPI dashboard: revenue, order counts, breakdowns, top nodes, alerts |
| GET | /analytics/orders/volume | Daily order volume and revenue over N days (default 30) |
| GET | /analytics/inventory/summary | Aggregate inventory health: total SKUs, on-hand, available, low stock count |
Dashboard Response Shape
{
"period_start": "2024-01-01",
"period_end": "2024-12-31",
"total_orders": 1247,
"total_revenue": 184293.50,
"avg_order_value": 147.79,
"orders_by_status": { "DELIVERED": 891, "SHIPPED": 124, ... },
"orders_by_channel": [
{ "channel": "WEB", "count": 623, "total_revenue": 98241.00, "percentage": 49.9 }
],
"top_nodes": [
{ "node_id": "...", "node_name": "DC-EAST", "total_allocations": 412, "capacity_utilization": 0.61 }
],
"inventory_alerts": [
{ "sku": "SKU-GADGET-X", "node": "STR-MIA-01", "available": 2, "reorder_point": 10 }
]
}
13 Webhooks API /webhooks
| Method | Path | Description |
|---|---|---|
| POST | /webhooks/endpoints | Register webhook endpoint with secret + event types |
| GET | /webhooks/endpoints | List all registered endpoints |
| GET | /webhooks/endpoints/{id} | Get endpoint details |
| PATCH | /webhooks/endpoints/{id} | Update URL, secret, or subscribed events |
| DELETE | /webhooks/endpoints/{id} | Delete endpoint permanently |
| POST | /webhooks/endpoints/{id}/test | Send a test order.test event |
| GET | /webhooks/events | List all delivery events with status |
| POST | /webhooks/events/{id}/retry | Manually retry a failed delivery |
14 Connectors API /connectors
Superadmin-only CRUD for managing platform integrations. The inbound webhook receiver is public (authenticated via HMAC) — all other endpoints require a superadmin JWT.
| Method | Path | Auth | Description |
|---|---|---|---|
| POST | /connectors/ | Superadmin | Create a new connector |
| GET | /connectors/ | Superadmin | List connectors (filter by status) |
| GET | /connectors/{id} | Superadmin | Get single connector with masked config |
| PATCH | /connectors/{id} | Superadmin | Update name, direction, or config |
| DELETE | /connectors/{id} | Superadmin | Delete connector and all events |
| POST | /connectors/{id}/toggle | Superadmin | Enable / disable connector |
| POST | /connectors/{id}/test | Superadmin | Test live API connection to the platform |
| GET | /connectors/{id}/events | Superadmin | Paginated inbound / outbound event log |
| POST | /connectors/generate-secret | Superadmin | Generate a cryptographically secure webhook secret |
| POST | /connectors/{id}/webhook | Public | HMAC-validated inbound webhook receiver |
Create Connector Example
curl -X POST http://localhost:8001/connectors/ \
-H "Authorization: Bearer {superadmin_token}" \
-H "Content-Type: application/json" \
-d '{
"name": "My Shopify Store",
"connector_type": "SHOPIFY",
"direction": "BIDIRECTIONAL",
"config": {
"shop_url": "mystore.myshopify.com",
"access_token": "shpat_xxxxxxxxxxxx",
"webhook_secret": "my-hmac-secret",
"api_version": "2024-01"
}
}'
Note: Sensitive config keys (access_token, webhook_secret, api_key, etc.) are always masked as *** in all API responses.
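HMAC-SHA256 validation of inbound webhook payloads follows the standard stdlib pattern. The signing scheme below (raw body, hex digest) is an assumption — check the connector's documentation for the exact header and encoding:

```python
import hashlib
import hmac

def sign_payload(secret: str, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

def verify_signature(secret: str, body: bytes, signature: str) -> bool:
    # compare_digest resists timing attacks on the signature comparison
    return hmac.compare_digest(sign_payload(secret, body), signature)

body = b'{"order_id": "123"}'
sig = sign_payload("my-hmac-secret", body)
print(verify_signature("my-hmac-secret", body, sig))  # True
print(verify_signature("wrong-secret", body, sig))    # False
```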
15 AI Architect /architect
The AI Architect is a superadmin-only control center that closes the loop between order data and sourcing intelligence. It is the place where the system observes its own performance, proposes improvements, and waits for a human to say yes before changing anything.
Access: Navigate to AI Architect in the sidebar (visible to superadmin users only). The page has four tabs: Proposals, Patterns, Experiments, and Node Performance.
What does the AI Architect do?
Normally, sourcing rules are manually written by admins — you define conditions, pick a strategy, and the engine applies it. The AI Architect adds a self-improving layer on top:
- It watches every sourcing decision — every order allocation writes a record to MongoDB (`sourcing_outcomes`) capturing which node was chosen, at what cost, via which strategy.
- It labels outcomes once the order is delivered — after delivery, the learning worker scores the decision: was it fast? was the cost accurate? did the item backorder or get returned?
- It discovers patterns nightly — orders are grouped by channel, region, order-size bucket, and fulfillment type. Within each group, it ranks nodes by their historical outcome scores and compares AI-sourced orders against rule-based ones.
- It proposes improvements — when a pattern has enough data and AI consistently outperforms the baseline by ≥10%, the system drafts a new sourcing rule proposal and queues it for your review.
- You approve or reject it — nothing changes in the system until you click Approve → Apply. Every action is reversible.
Tab 1 — Proposals
Proposals are the core output of the learning loop. They represent changes the AI wants to make, held in a queue until a human reviews them.
Where do proposals come from?
- Pattern discovery (nightly) — when the `discover_patterns` Celery task finds a cluster where `AI_ADAPTIVE` achieves ≥10% better outcome scores than `DISTANCE_OPTIMAL` across ≥50 labeled orders, it auto-creates a pending sourcing rule proposal.
- Experiment completion (daily) — when an A/B experiment accumulates ≥50 labeled orders per arm and declares a winner, it creates a proposal to promote the winning strategy as a permanent rule.
Proposal lifecycle — step by step
Reading a proposal
Each proposal card shows:
- Title — plain-language summary, e.g. "Use AI_ADAPTIVE for WEB|NY|100-250|SHIP_TO_HOME cluster (+14.2% better outcomes)"
- Confidence score — 0–1 bar combining improvement magnitude and sample size. A score of 0.8+ means strong evidence; below 0.5 means the data is suggestive but not conclusive.
- Rationale — the numbers behind the proposal: how many orders, what average outcome scores, which thresholds were met.
- Proposal data — the exact JSON that will be written to the database if you click Apply. For a sourcing rule proposal this includes: name, strategy, priority, and the auto-generated conditions derived from the cluster's dimensions.
- Status badge — PENDING (yellow), APPROVED (blue), APPLIED (green), REJECTED (red), ROLLED_BACK (gray).
Safety guarantee — applied proposals are additive only:
A sourcing rule proposal inserts a new row into sourcing_rules with is_active = False. It does not activate itself. After applying, you still need to go to the Sourcing Rules page and manually toggle it active — giving you a final review point before live traffic is affected.
Filtering proposals
Use the status filter tabs (All / Pending / Approved / Applied / Rejected) at the top of the Proposals tab to focus on what needs action. "Pending" is the most important queue — these are waiting for your decision.
Tab 2 — Patterns
The Patterns tab shows you what the system has learned from historical orders, grouped by order characteristics. This is the raw intelligence that proposals are built from.
What is a pattern cluster?
A cluster is a combination of four dimensions that describes a type of order:
| Dimension | Example values | Where it comes from |
|---|---|---|
| Channel | WEB, MOBILE, POS, MARKETPLACE | order.channel |
| Region | NY, CA, TX, UNKNOWN | order.shipping_state |
| Amount bucket | 0-50, 50-100, 100-250, 250-500, 500+ | bucketed from order.total_amount |
| Fulfillment type | SHIP_TO_HOME, STORE_PICKUP, SAME_DAY_DELIVERY | order.fulfillment_type |
Example cluster key: WEB|NY|100-250|SHIP_TO_HOME — web orders shipped to New York customers, order value $100–$250.
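The cluster key derivation can be sketched from the four dimensions. The bucket boundaries follow the table above, but their inclusivity at the edges (e.g. whether exactly $100 falls in 50-100 or 100-250) is an assumption:

```python
def amount_bucket(total: float) -> str:
    """Bucket order.total_amount per the dimension table (boundaries assumed)."""
    for hi, label in [(50, "0-50"), (100, "50-100"), (250, "100-250"), (500, "250-500")]:
        if total < hi:
            return label
    return "500+"

def cluster_key(order: dict) -> str:
    """channel | region | amount bucket | fulfillment type."""
    region = order.get("shipping_state") or "UNKNOWN"
    return "|".join([
        order["channel"],
        region,
        amount_bucket(order["total_amount"]),
        order["fulfillment_type"],
    ])

print(cluster_key({
    "channel": "WEB", "shipping_state": "NY",
    "total_amount": 180.0, "fulfillment_type": "SHIP_TO_HOME",
}))  # WEB|NY|100-250|SHIP_TO_HOME
```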
Reading a pattern card
- Sample count — total number of labeled (delivered) orders in this cluster. More samples = more reliable data. Below 50 samples, treat it as indicative only.
- Node performance table — nodes ranked by `avg_outcome_score` for this cluster. Shows average delivery hours, average cost, and how many orders they've handled.
- Best node — highlighted at the top. This is the node the system would prefer for AI_ADAPTIVE sourcing within this cluster.
Patterns are updated nightly. If a cluster has fewer than 10 samples, the AI sourcing advisor will not use it (fallback to DISTANCE_OPTIMAL instead).
What to look for
- Clusters with high sample counts (100+) and a clear top node are good candidates for a dedicated sourcing rule.
- If the top node's avg_delivery_hours is significantly lower than others, AI_ADAPTIVE will reliably pick it.
- Clusters where multiple nodes have similar scores indicate the order type is not sensitive to node choice — no specific rule needed.
Tab 3 — Experiments
Experiments let you A/B test two sourcing strategies on live traffic before committing to one. A percentage of orders are routed to the "treatment" strategy; the rest continue with the "control". The system measures outcomes on both arms and declares a winner when the data is conclusive.
Creating an experiment
Click New Experiment and fill in:
| Field | Description | Example |
|---|---|---|
| Name | Human-readable label for this test | AI Sourcing Test — West Coast |
| Strategy A (control) | The current / baseline strategy | DISTANCE_OPTIMAL |
| Strategy B (treatment) | The strategy you're testing | AI_ADAPTIVE |
| Traffic split % | Percentage of matching orders routed to strategy B. Range: 1–50%. Start conservatively (10%). | 10 |
| Filter conditions (optional) | Restrict the experiment to specific order types. Available keys: channel, fulfillment_type, region, amount_min, amount_max. Leave empty to apply to all orders. | {"channel": "WEB", "region": "CA"} |
Tip — start small: Begin with 10% traffic split on a specific channel or region. Once the experiment accumulates enough samples and you see a clear winner, you can promote it to a sourcing rule. You don't need 50/50 splits — even 10% treatment vs 90% control gives you a valid comparison.
How traffic splitting works
When an order is sourced, the engine checks all running experiments whose filter conditions match the order. For the first matching experiment, it uses a random draw:
# In sourcing_engine._check_experiment():
if random.random() * 100 < exp.traffic_split_pct:
    strategy = exp.strategy_b  # treatment arm (e.g. AI_ADAPTIVE)
else:
    strategy = exp.strategy_a  # control arm (e.g. DISTANCE_OPTIMAL)
The experiment ID is stamped on the sourcing_outcomes MongoDB document so the learning worker can attribute the outcome to the correct arm.
Reading experiment results
The results panel shows per-arm data once labeled outcomes are available (orders must reach DELIVERED status first):
- Total orders — how many orders were routed to each arm (labeled + unlabeled).
- Labeled orders — orders that have reached DELIVERED and have an `outcome_score` computed.
- Avg outcome score — the primary metric. Higher is better. A difference of 0.05 or more (on a 0–1 scale) triggers automatic winner declaration.
- Avg delivery hours — secondary indicator. Lower is better.
- Backorder rate % — what % of orders in this arm had an item backordered.
When does an experiment end?
The evaluate_ai_experiments Celery task runs daily at 03:00 UTC and checks every running experiment. It declares a winner when both conditions are met:
- Each arm has ≥ 50 labeled outcomes
- The absolute score difference between arms is ≥ 0.05

If either condition is not yet met, the experiment continues collecting data.
When a winner is declared, the experiment status changes to COMPLETED and a new sourcing_experiment proposal is created, suggesting you promote the winning strategy as a permanent sourcing rule.
Managing experiments
- Pause — stops new orders from being routed to this experiment. Existing labeled outcomes are preserved. Useful if you see an unexpected issue with the treatment arm.
- Resume — re-activates a paused experiment from where it left off.
Tab 4 — Node Performance
Shows rolling performance metrics for every fulfillment node, computed by the update_node_performance task every 4 hours.
- 7-day view — reflects recent changes in node behavior. Use this to spot a node that has degraded in the last week (rising backorder rate, slower delivery).
- 30-day view — a more stable baseline. Use this to compare nodes over a meaningful time horizon.
Each node card shows: orders fulfilled, avg outcome score, avg delivery hours, avg cost, backorder rate %, and return rate %. Nodes are ranked by avg outcome score descending. A node consistently in the top 3 by outcome score is a strong candidate to be the "best node" for matching order clusters.
Setting up AI sourcing on your orders
Follow these steps to enable AI-powered sourcing for a subset of your orders:
Option A — Direct rule (if you already trust the data)
- Go to Sourcing Rules in the sidebar.
- Create a new rule. Set Strategy to AI_ADAPTIVE.
- Add conditions to restrict which orders this rule applies to — e.g. channel = WEB and shipping_state = CA.
- Set a priority number lower than your other rules (lower number = higher priority, e.g. 10).
- Toggle the rule Active.
- From this point, matching orders will be scored by KubeAI using historical pattern data. If pattern data is insufficient, it automatically falls back to DISTANCE_OPTIMAL.
Option B — A/B experiment first (recommended for production)
- Go to AI Architect → Experiments tab.
- Create an experiment: Strategy A = DISTANCE_OPTIMAL, Strategy B = AI_ADAPTIVE, traffic split = 10%.
- Add filter conditions to target a specific order segment (e.g. a single channel or region).
- Wait until the experiment accumulates ≥50 labeled orders per arm (orders must be delivered). This typically takes 1–4 weeks depending on volume.
- When the experiment completes, review the auto-generated proposal in the Proposals tab.
- If the results are good, approve and apply the proposal. The new sourcing rule will be inserted (inactive); activate it in Sourcing Rules to go live.
Option C — Wait for automatic proposals
If you use AI_ADAPTIVE on some orders (via a rule), the learning pipeline will observe outcomes over time. Nightly pattern discovery will automatically create proposals when a specific order cluster shows ≥10% improvement. You just need to review the Proposals tab periodically.
Common workflows
"I want to know if AI sourcing is working better than my current setup"
- Go to AI Architect → Experiments, create an experiment comparing your current rule strategy against AI_ADAPTIVE.
- Monitor the Node Performance tab to ensure AI-sourced orders are going to high-performing nodes.
- After sufficient data is collected, check the experiment results. Compare avg_outcome_score, avg_delivery_hours, and backorder_rate between arms.
- If AI_ADAPTIVE wins, approve and apply the auto-generated proposal.
"I received a proposal — should I approve it?"
Look at three things:
- Confidence score: Above 0.7 — generally safe to approve. Below 0.5 — let more data accumulate first (reject and wait).
- Sample count in rationale: More than 100 total samples with 20+ AI samples is a solid basis. If the AI arm had only 10 orders, the improvement could be noise.
- Improvement %: A 10–15% improvement is meaningful. 50%+ suggests something unusual — verify that the cluster key is sensible and not covering a very small order type.
When you click Apply, the sourcing rule is created inactive. Before activating it, review it in Sourcing Rules to see the exact conditions and confirm they look right.
"I applied a proposal but the new sourcing rule isn't performing as expected"
- Go to AI Architect → Proposals, find the applied proposal, and click Rollback. This deletes the inserted sourcing rule.
- The original routing behavior resumes immediately (no restart needed).
- Alternatively, go to Sourcing Rules and deactivate the rule rather than rolling back — this preserves it for future reference.
"How do I see what node was chosen for a specific order and why?"
Go to Orders → [order] → Sourcing Decision. The decision trail shows: strategy used, rule matched, every candidate node with its distance/cost/score, and — for AI-sourced orders — the AI score and KubeAI's one-sentence reasoning per node.
API Reference
All endpoints require superadmin JWT. Base path: /architect.
Proposals
| Method | Path | Description |
|---|---|---|
| GET | /architect/proposals | List proposals — filter by status and proposal_type. Params: limit (max 200), offset. |
| GET | /architect/proposals/{id} | Full proposal detail including rationale, proposal_data JSON, and rollback_data. |
| POST | /architect/proposals/{id}/approve | Transition PENDING → APPROVED. Records approver email. No DB changes yet. |
| POST | /architect/proposals/{id}/reject | Transition to REJECTED. Body: {"reason": "string"}. Can reject PENDING or APPROVED. |
| POST | /architect/proposals/{id}/apply | Execute the change (APPROVED only). Writes rollback_data before committing. Transitions to APPLIED. |
| POST | /architect/proposals/{id}/rollback | Undo an APPLIED proposal using stored rollback_data. Transitions to ROLLED_BACK. |
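The status flow behind these endpoints can be summarized as a transition table. A sketch for orientation only — the actual service enforces this server-side; the function and dict names are illustrative.

```python
# Proposal lifecycle as a pure transition table, per the endpoint docs.
TRANSITIONS = {
    ("PENDING", "approve"): "APPROVED",
    ("PENDING", "reject"): "REJECTED",
    ("APPROVED", "reject"): "REJECTED",   # rejecting an approved proposal is allowed
    ("APPROVED", "apply"): "APPLIED",     # rollback_data is written before commit
    ("APPLIED", "rollback"): "ROLLED_BACK",
}

def next_status(current: str, action: str) -> str:
    try:
        return TRANSITIONS[(current, action)]
    except KeyError:
        raise ValueError(f"Cannot {action} a {current} proposal")
```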
Patterns & Performance
| Method | Path | Description |
|---|---|---|
| GET | /architect/patterns | Discovered clusters from MongoDB. Params: channel filter, limit (max 100). Sorted by sample_count desc. |
| GET | /architect/node-performance | Rolling node metrics. Params: period_days=7 or 30, limit (max 200). |
| GET | /architect/ai-sourcing/performance | Per-strategy outcome aggregates + AI vs baseline improvement %. Reads from sourcing_outcomes. |
Experiments
| Method | Path | Description |
|---|---|---|
| GET | /architect/experiments | List experiments. Filter: status=running|paused|completed. |
| POST | /architect/experiments | Create experiment. Body: name, strategy_a, strategy_b, traffic_split_pct (1–50), filter_conditions. Starts in RUNNING status immediately. |
| POST | /architect/experiments/{id}/pause | Pause a running experiment. New orders stop being routed to it; existing outcomes are preserved. |
| POST | /architect/experiments/{id}/resume | Resume a paused experiment. |
| GET | /architect/experiments/{id}/results | Live per-arm aggregation from MongoDB: outcome scores, delivery hours, backorder rates, labeled vs total order counts. |
Strictly additive — the safety contract:
Applied proposals only ever INSERT new rows or ADD columns. They never UPDATE existing sourcing rules, never DELETE data, and never run DDL that could affect existing rows. Every apply stores rollback_data before committing, so any change can be undone in one click. Sourcing rules are always inserted with is_active=False — a final human gate before live traffic is affected.
16 Sourcing Rules Engine
File: app/services/sourcing_engine.py
The engine is the intelligence core. It runs every time an order needs to be sourced, evaluating prioritized rules to determine which fulfillment nodes should handle each SKU.
Processing Pipeline
Seven Sourcing Strategies
DISTANCE_OPTIMAL
`score = distance_norm × 0.7 + inventory_norm × 0.3`
Picks the node closest to the customer's shipping address. Uses Haversine great-circle distance. Falls back to a split if no single node can fulfill all items.
COST_OPTIMAL
`score = cost_norm × cost_weight + distance_norm × distance_weight`
Minimizes total estimated shipping cost (base_rate + per_km × distance × node_multiplier). Weights are configurable per rule (default 50/50).
STORE_NEAREST
`score = distance_norm × 0.7 + inventory_norm × 0.3`
Identical to DISTANCE_OPTIMAL but pre-restricts nodes to RETAIL_STORE and DARK_STORE types. Used for same-day delivery.
INVENTORY_RESERVATION
`score = inventory_norm × 0.8 + distance_norm × 0.2`
Prefers nodes with the deepest available stock, reducing the chance of a reservation failing downstream. Useful for high-velocity SKUs.
LEAST_COST_SPLIT
Greedy algorithm that assigns each SKU to the cheapest eligible node:
- Sort SKUs by fulfillability (hardest first = fewest nodes with stock)
- For each SKU, iterate nodes in score order
- Allocate as much as possible from each node until the full quantity is covered
- Enforce the max_split_nodes limit
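The greedy steps above can be sketched as follows. This is a minimal illustration, not the engine's actual code: the data shapes (`stock[node][sku]` for availability, a pre-sorted `node_order` standing in for per-rule scoring) are assumptions.

```python
# Greedy LEAST_COST_SPLIT sketch: hardest SKUs first, cheapest nodes first.
def least_cost_split(items: dict, stock: dict, node_order: list,
                     max_split_nodes: int = 3):
    """items: {sku: qty}. Returns {sku: [(node, qty), ...]} or None if infeasible."""
    # Step 1 — hardest SKUs first: fewest nodes holding any stock
    skus = sorted(items, key=lambda s: sum(1 for n in node_order if stock[n].get(s, 0) > 0))
    plan, used_nodes = {}, set()
    for sku in skus:
        remaining, allocs = items[sku], []
        # Steps 2–3 — walk nodes in score order, allocating as much as possible
        for node in node_order:
            if remaining <= 0:
                break
            take = min(remaining, stock[node].get(sku, 0))
            if take > 0:
                allocs.append((node, take))
                used_nodes.add(node)
                remaining -= take
        # Step 4 — infeasible or too many split nodes → no plan (engine falls back)
        if remaining > 0 or len(used_nodes) > max_split_nodes:
            return None
        plan[sku] = allocs
    return plan
```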
AI_ADAPTIVE
`final_score = 0.6 × ai_score + 0.4 × rule_score`
Uses KubeAI to score candidate nodes based on historical pattern clusters and rolling 7-day performance data. Automatically falls back to DISTANCE_OPTIMAL if: pattern sample count < 10, KubeAI API error, or max AI score < 0.4.
AI_HYBRID
`final_score = 0.6 × ai_score + 0.4 × rule_score`
Identical to AI_ADAPTIVE with the same blend formula. Intended for transitional rollouts where full AI trust is not yet established. Distinguishable in sourcing_outcomes by the strategy_used field.
Condition Operators
| Operator | Example |
|---|---|
| EQUALS | channel == WEB |
| NOT_EQUALS | channel != MARKETPLACE |
| GREATER_THAN | total_amount > 200 |
| LESS_THAN | total_amount < 50 |
| GREATER_THAN_OR_EQUAL | total_amount >= 100 |
| LESS_THAN_OR_EQUAL | total_amount <= 500 |
| IN | shipping_state IN [NY, NJ, CT] |
| NOT_IN | channel NOT_IN [POS] |
| CONTAINS | customer_email CONTAINS example.com |
| STARTS_WITH | shipping_state STARTS_WITH N |
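The operator table maps naturally onto a dispatch of one-line predicates. A sketch of how such evaluation could work — the engine's actual implementation may differ, and the `matches` helper is illustrative.

```python
# One predicate per operator from the table above.
OPERATORS = {
    "EQUALS": lambda v, t: v == t,
    "NOT_EQUALS": lambda v, t: v != t,
    "GREATER_THAN": lambda v, t: v > t,
    "LESS_THAN": lambda v, t: v < t,
    "GREATER_THAN_OR_EQUAL": lambda v, t: v >= t,
    "LESS_THAN_OR_EQUAL": lambda v, t: v <= t,
    "IN": lambda v, t: v in t,
    "NOT_IN": lambda v, t: v not in t,
    "CONTAINS": lambda v, t: t in str(v),
    "STARTS_WITH": lambda v, t: str(v).startswith(t),
}

def matches(order: dict, field: str, op: str, target) -> bool:
    """Evaluate a single rule condition against an order's field value."""
    return OPERATORS[op](order.get(field), target)
```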
Haversine Distance Formula
Used for all distance-based sourcing calculations. Accuracy: within ~0.5% of the true geodesic distance (Haversine assumes a spherical Earth); actual road distance is typically longer.
from math import radians, sin, cos, asin, sqrt

def haversine_km(lat1, lon1, lat2, lon2):
    R = 6371.0  # Mean Earth radius in km
    φ1, φ2 = radians(lat1), radians(lat2)
    Δφ = radians(lat2 - lat1)
    Δλ = radians(lon2 - lon1)
    a = sin(Δφ/2)**2 + cos(φ1)*cos(φ2)*sin(Δλ/2)**2
    return R * 2 * asin(sqrt(a))
17 AI-Native Architecture — How It Works
The closed loop
The OMS has a continuous feedback loop between order execution and sourcing intelligence. Every sourcing decision is an experiment. Every delivery is a data point. The system accumulates evidence, identifies patterns, and proposes improvements — all without touching any existing configuration until a human approves.
Design principles
| Principle | What it means in practice |
|---|---|
| Additive-only | AI can only INSERT new rows or ADD nullable columns. It cannot UPDATE existing rules, DROP tables, or DELETE data. Every database write is audited. |
| Human-gated | Proposals sit in PENDING status indefinitely. Nothing is applied until a superadmin explicitly clicks Approve → Apply. The admin sees the full rationale and exact data to be written before confirming. |
| Fallback-safe | Every AI code path has a fallback. If KubeAI returns an error, times out, or gives low-confidence scores (max < 0.4), the sourcing engine silently falls back to DISTANCE_OPTIMAL. Orders are never held waiting for the AI. |
| Fully audited | Every sourcing decision writes a sourcing_outcomes document. Every proposal records who approved it, who applied it, and stores rollback_data before the change is committed. |
| Evidence-backed | Proposals are only generated when statistical thresholds are met: ≥50 total samples, ≥10 AI-sourced samples, ≥10% improvement. Confidence scores reflect both margin of improvement and sample size. |
Layer 1 — Intelligence data foundation
The entire learning system is built on one collection: sourcing_outcomes in the oms_ai_learning MongoDB database. Every time the sourcing engine assigns an order to a node, it writes a document capturing a snapshot of the decision. This database is separate from oms_events so that nightly aggregation queries do not contend with transactional order audit reads.
sourcing_outcomes document lifecycle
Outcome score formula
outcome_score = (
0.4 * delivery_score # 1.0 if ≤24h | 0.5 if ≤48h | 0.2 if ≤72h | 0.0 if >72h
+ 0.3 * cost_score # 1.0 if variance ≤5% | 0.7 if ≤15% | 0.3 if ≤25% | 0.0 if >25%
+ 0.2 * (1 - backordered) # 0.2 if no backorder, 0.0 if backordered
+ 0.1 * (1 - returned) # 0.1 if not returned, 0.0 if returned
)
A perfect sourcing decision (delivered in under 24 hours, cost within 5% of estimate, no backorder, no return) scores 1.0. A bad one (72+ hour delivery, 25%+ cost overrun, backorder, and return) scores 0.0.
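The scoring bands above translate directly into a small function. A sketch: the band thresholds are taken from the inline comments, while the function and parameter names are illustrative.

```python
# outcome_score per the documented formula: 0.4 delivery + 0.3 cost
# + 0.2 no-backorder + 0.1 no-return.
def outcome_score(delivery_hours: float, cost_variance_pct: float,
                  backordered: bool, returned: bool) -> float:
    if delivery_hours <= 24:
        delivery = 1.0
    elif delivery_hours <= 48:
        delivery = 0.5
    elif delivery_hours <= 72:
        delivery = 0.2
    else:
        delivery = 0.0
    if cost_variance_pct <= 5:
        cost = 1.0
    elif cost_variance_pct <= 15:
        cost = 0.7
    elif cost_variance_pct <= 25:
        cost = 0.3
    else:
        cost = 0.0
    return (0.4 * delivery + 0.3 * cost
            + 0.2 * (0.0 if backordered else 1.0)
            + 0.1 * (0.0 if returned else 1.0))
```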
Layer 2 — AI sourcing engine
When a sourcing rule uses AI_ADAPTIVE or AI_HYBRID strategy, the AISourcingAdvisor is invoked before the final node is chosen.
What KubeAI receives
The advisor builds a structured prompt containing:
- Order context — channel, fulfillment type, customer region, order amount, SKU count.
- Historical patterns — the top matching cluster(s) for this order type, with node rankings and sample counts. Up to 3 clusters are included (exact match first, then broader channel+fulfillment_type matches).
- Node performance — rolling 7-day metrics for each candidate node: orders fulfilled, avg outcome score, avg delivery hours, backorder rate.
- Candidates — each eligible node with its distance, estimated cost, and available inventory.
Score blending
# AI_ADAPTIVE: KubeAI score weighted 60%, rule-based score 40%
final_score = 0.6 * ai_score + 0.4 * rule_score

# AI_HYBRID: same formula, different label in sourcing_outcomes
# (useful for distinguishing "AI primary" vs "transitional AI" in analytics)
Fallback triggers
| Condition | Fallback behaviour |
|---|---|
| Pattern sample count < 10 | Skip KubeAI entirely, use DISTANCE_OPTIMAL. Logged as fallback_reason: "Insufficient pattern data". |
| KubeAI API error or timeout (10s) | Skip KubeAI, use DISTANCE_OPTIMAL. Error logged with exception type. |
| Max AI score across all nodes < 0.4 | KubeAI scored all nodes poorly — use DISTANCE_OPTIMAL as more reliable. Logged with the max score. |
| KubeAI returns invalid JSON | Skip KubeAI, use DISTANCE_OPTIMAL. Logged with the raw response snippet. |
Fallback decisions are visible in the order's sourcing decision trail under rule_details.ai_sourcing.fallback_used and fallback_reason.
Layer 3 — Pattern discovery
The PatternDiscoveryService runs nightly (Celery beat, 02:00 UTC) via the discover_patterns task in the learning queue.
Step 1 — Aggregate patterns
MongoDB $group aggregation on sourcing_outcomes where outcome_score exists, grouped by (cluster_key, node_id). For each cluster, nodes are ranked by avg_outcome_score and upserted into sourcing_patterns.
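For clarity, the Step-1 aggregation is shown below in pure Python; the real task runs it as a MongoDB `$group` pipeline on sourcing_outcomes, and the document shape used here is illustrative.

```python
# Group labeled outcomes by (cluster_key, node_id), then rank nodes per
# cluster by average outcome score (best first).
from collections import defaultdict

def aggregate_patterns(outcomes: list[dict]) -> dict:
    buckets = defaultdict(list)
    for doc in outcomes:
        if doc.get("outcome_score") is None:  # unlabeled docs are skipped
            continue
        buckets[(doc["cluster_key"], doc["node_id"])].append(doc["outcome_score"])
    clusters = defaultdict(list)
    for (cluster, node), scores in buckets.items():
        clusters[cluster].append({
            "node_id": node,
            "avg_outcome_score": sum(scores) / len(scores),
            "sample_count": len(scores),
        })
    return {c: sorted(nodes, key=lambda n: -n["avg_outcome_score"])
            for c, nodes in clusters.items()}
```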
Step 2 — Compare strategies
A second aggregation groups by (cluster_key, strategy_used) to get avg outcome scores per strategy per cluster. Qualifying clusters must meet all three thresholds:
- Total samples (AI + baseline) ≥ 50
- AI_ADAPTIVE samples ≥ 10
- AI improvement over DISTANCE_OPTIMAL ≥ 10%
Step 3 — Generate proposals
For each qualifying cluster, one pending AIProposal is created — but only if no active (PENDING or APPROVED) proposal already exists for that cluster key. This prevents duplicate proposals. The proposal includes auto-generated sourcing rule conditions derived from the cluster dimensions (channel, region, amount range, fulfillment type).
Layer 4 — A/B experiments
Experiments allow side-by-side comparison of any two sourcing strategies on real orders without committing to a permanent rule change. See §15 AI Architect for the full experiment usage guide.
Experiment evaluation (daily 03:00 UTC)
The evaluate_ai_experiments task checks all running experiments. Winner declaration requires:
- Each arm has ≥ 50 labeled outcomes (orders that have reached DELIVERED)
- Absolute outcome score difference between arms ≥ 0.05
If neither arm has a clear edge, the experiment continues until both thresholds are met or an admin manually stops it.
Layer 5 — Node performance metrics
The update_node_performance task runs every 4 hours. It aggregates sourcing_outcomes over the last 7 and 30 days per node, computing: orders fulfilled, avg outcome score, avg delivery hours, avg actual cost, backorder rate %, return rate %. These metrics feed into the AI sourcing prompt as the "NODE PERFORMANCE" context block.
MongoDB collections reference
All three AI learning collections live in oms_ai_learning (config key: MONGODB_AI_DB), separate from the oms_events transactional database.
| Collection | Database | TTL | Written by | Read by | Purpose |
|---|---|---|---|---|---|
| sourcing_outcomes | oms_ai_learning | 90 days | sourcing worker (skeleton at allocation time), carrier worker (actual cost on SHIPPED, delivery hours on DELIVERED), learning worker (outcome_score hourly) | AISourcingAdvisor, pattern discovery, experiment evaluator, AI Architect UI | Per-allocation decision log and learning data. The primary raw material for the entire AI pipeline. |
| sourcing_patterns | oms_ai_learning | 180 days | discover_patterns (nightly 02:00 UTC) | AISourcingAdvisor._fetch_context(), AI Architect Patterns tab | Aggregated node performance per order cluster. Stale clusters expire automatically; nightly run re-creates any cluster with recent activity. |
| node_performance_metrics | oms_ai_learning | — | update_node_performance (every 4h) | AISourcingAdvisor._fetch_context(), AI Architect Node Performance tab | Rolling 7-day and 30-day node stats. One doc per (node_id, period_days); replaced in full on each run. |
PostgreSQL tables reference
| Table | Purpose |
|---|---|
| ai_proposals | All AI-generated change proposals with full lifecycle tracking (status, approved_by, rollback_data) |
| ai_experiments | A/B experiment definitions (strategy pair, traffic split, filter conditions, results) |
| sourcing_outcome_labels | PostgreSQL mirror of labeled MongoDB outcomes for fast SQL analytics |
| custom_attribute_definitions | Dynamic schema extensions (proposed by AI, stored here, rendered in forms via existing JSONB columns) |
| ui_widgets | JSON-configured dashboard widgets (proposed by AI, rendered by DynamicWidget component) |
Celery learning tasks
| Task | Queue | Schedule | What it does |
|---|---|---|---|
| label_sourcing_outcomes | learning | Every hour | Finds delivered orders with unlabeled outcomes, computes and writes outcome_score. Processes up to 500 docs per run. |
| discover_patterns | learning | Nightly 02:00 UTC | Aggregates patterns, ranks nodes per cluster, generates proposals for qualifying clusters. |
| update_node_performance | learning | Every 4 hours | Recomputes rolling 7-day and 30-day node metrics from labeled outcomes. |
| evaluate_ai_experiments | learning | Daily 03:00 UTC | Checks running experiments, declares winners, generates sourcing_experiment proposals. |
18 Fulfillment Pipeline
Order Status State Machine
Fulfillment Types & Required Capabilities
| Type | Description | Required Capability |
|---|---|---|
| SHIP_TO_HOME | Standard home delivery | can_ship = true |
| STORE_PICKUP | Buy online, pick up in store (BOPIS) | can_pickup = true |
| SHIP_FROM_STORE | Retail store ships the order | can_ship = true |
| CURBSIDE_PICKUP | Drive-up curbside pickup | can_curbside = true |
| SAME_DAY_DELIVERY | Same-day home delivery | can_same_day = true |
19 Celery Workers
| Queue | Worker File | Tasks |
|---|---|---|
| sourcing | workers/sourcing.py | source_order — runs the full sourcing engine; writes sourcing_outcomes to MongoDB |
| fulfillment | workers/fulfillment.py | start_picking, complete_packing, reset_node_daily_counters |
| carrier | workers/carrier.py | book_shipment, simulate_delivery, sync_all_tracking |
| notifications | workers/notifications.py | send_order_confirmation, send_shipment_notification, send_delivery_notification, send_cancellation_notification |
| webhooks | workers/webhooks.py | dispatch_webhook, retry_failed_webhooks, retry_webhook_event |
| connectors | workers/connectors.py | sync_fulfillment_to_connector, sync_order_cancel_to_connector, poll_amazon_orders |
| learning | workers/learning.py | label_sourcing_outcomes, discover_patterns, update_node_performance, evaluate_ai_experiments — low-priority AI learning loop |
Celery Beat Schedule
| Task | Schedule | Description |
|---|---|---|
| reset_node_daily_counters | Daily 00:00 UTC | Reset current_daily_orders on all active nodes |
| retry_failed_webhooks | Every 5 minutes | Retry FAILED webhook events that are due for retry |
| sync_all_tracking | Every 15 minutes | Sync carrier tracking for all in-transit shipments |
| retry_backordered_orders | Every 30 minutes | Re-run sourcing engine for orders stuck in backorder state |
| poll_amazon_orders | Every 15 minutes | Poll all active Amazon SP-API connectors for new Unshipped/PartiallyShipped orders |
| label_sourcing_outcomes | Every hour | Compute outcome_score for DELIVERED orders; write labels to MongoDB + PostgreSQL |
| update_node_performance | Every 4 hours | Compute rolling 7d/30d node stats from labeled outcomes |
| discover_patterns | Daily 02:00 UTC | Aggregate patterns, compare strategies, auto-generate pending AIProposal records |
| evaluate_ai_experiments | Daily 03:00 UTC | Compute per-arm outcomes; declare winner when ≥50 samples per arm + score diff ≥0.05 |
Start Workers
celery -A app.workers.celery_app worker \
  --loglevel=info \
  -Q sourcing,fulfillment,carrier,notifications,webhooks,connectors,learning \
  --concurrency=4
Flower monitoring UI: http://localhost:5555 — real-time task queue visualization, worker status, and task history.
21 Webhook System
HMAC-SHA256 Signing
Every outbound webhook request is cryptographically signed. Receivers can verify payload integrity:
POST https://your-endpoint.com/hooks HTTP/1.1
Content-Type: application/json
X-OMS-Signature: sha256=abc123...
X-OMS-Timestamp: 1708000000
X-OMS-Event: order.shipped
Verification (Python)
import hmac, hashlib
def verify_webhook(body: bytes, signature: str, secret: str) -> bool:
    expected = "sha256=" + hmac.new(
        secret.encode(), body, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)
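A round-trip self-test is a quick way to sanity-check your receiver before pointing the OMS at it. The snippet below redefines the verification helper so it is self-contained; the `sign` helper mirrors how an HMAC-SHA256 `sha256=...` signature is computed and is illustrative, not part of the OMS API.

```python
import hmac, hashlib

def verify_webhook(body: bytes, signature: str, secret: str) -> bool:
    expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)

def sign(body: bytes, secret: str) -> str:
    # Produces the same "sha256=<hex>" shape as the X-OMS-Signature header
    return "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

body = b'{"event": "order.shipped", "order_id": "123"}'
assert verify_webhook(body, sign(body, "whsec_demo"), "whsec_demo")
assert not verify_webhook(body, sign(body, "wrong-secret"), "whsec_demo")
```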
Supported Event Types
Retry Strategy (Exponential Backoff)
| Attempt | Delay | Status |
|---|---|---|
| 1st retry | 5 minutes | PENDING |
| 2nd retry | 10 minutes | PENDING |
| 3rd retry | 20 minutes | PENDING |
| 4+ retries | — | ABANDONED |
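The schedule above is a doubling backoff from a 5-minute base, abandoned after the 3rd retry. A sketch; the worker's actual helper may be structured differently.

```python
# Exponential backoff per the retry table: 5, 10, 20 minutes, then abandon.
def retry_delay_minutes(attempt: int):
    """attempt is 1-based. Returns delay in minutes, or None → ABANDONED."""
    if attempt > 3:
        return None
    return 5 * 2 ** (attempt - 1)
```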
22 Connector System
The Connector System enables bidirectional integration between the OMS and external platforms — e-commerce engines, carriers, WMS, and TMS systems. Each connector has a configurable direction (inbound, outbound, or bidirectional) and handles its own authentication protocol.
Architecture
Shopify OMS (single URL per connector)
──────── ─────────────────────────────
orders/create ──┐
orders/paid ──┼──► POST /connectors/{id}/webhook
products/create──┤ │ HMAC-SHA256 validated
products/update──┘ │ routes by X-Shopify-Topic header
│
┌────────────┴────────────────────────┐
│ │
Order topics Product topics
│ │
▼ ▼
normalize_order() → OMS Order normalize_product() → InventoryItems
(channel=MARKETPLACE, (upsert per active node,
connector_id set, dedup check) seed stock from Shopify qty)
│
▼ (when order.status → SHIPPED)
Celery: sync_fulfillment_to_connector
│
▼
push_fulfillment() → POST /orders/{shopify_id}/fulfillments
│
▼
Shopify notifies buyer of tracking info
Supported Platforms
| Platform | Category | Direction | Status |
|---|---|---|---|
| Shopify | E-commerce | Bidirectional | Live |
| WooCommerce | E-commerce | Bidirectional | Planned |
| Amazon SP-API | E-commerce | Bidirectional (polling) | Live |
| Magento 2 | E-commerce | Bidirectional | Planned |
| BigCommerce | E-commerce | Bidirectional | Planned |
| FedEx | Carrier | Outbound | Planned |
| UPS | Carrier | Outbound | Planned |
| DHL | Carrier | Outbound | Planned |
| Custom | Generic | Inbound | Framework ready |
Shopify Setup
1. In the OMS Admin Console → Connectors, click Add Connector and select Shopify.
2. Enter your Shopify store URL, Admin API access token, and generate a webhook secret.
3. Copy the webhook URL displayed in the connector card:
https://your-oms.example.com/connectors/{connector_id}/webhook
4. In Shopify Admin → Settings → Notifications → Webhooks, register the same URL for each topic you want to receive. All topics share one URL and one signing secret:
Order topics → import as OMS orders. Product topics → upsert InventoryItem records across all active fulfillment nodes.
5. Enable the connector (toggle to Active). Orders and product catalog changes from Shopify will automatically sync into the OMS.
Amazon SP-API Setup
Amazon uses polling, not webhooks. The OMS polls the SP-API every 15 minutes automatically via a Celery Beat scheduled task — no webhook registration is required.
1. In Seller Central, navigate to Apps & Services → Develop Apps. Create a new app to obtain your client_id and client_secret.
2. Generate a refresh_token by completing the OAuth authorization flow for your selling account.
3. In the OMS Admin Console → Connectors, click Add Connector and select Amazon SP-API. Provide the following fields:
| Config Field | Description |
|---|---|
| client_id | LWA (Login with Amazon) client ID from Seller Central app |
| client_secret | LWA client secret from Seller Central app |
| refresh_token | OAuth refresh token authorizing OMS to act on behalf of your seller account |
| marketplace_id | SP-API marketplace ID (e.g. ATVPDKIKX0DER for US) |
| seller_id | Your Amazon seller / merchant ID |
| region | API region endpoint: na, eu, or fe |
4. Click Test Connection. The OMS calls GET /sellers/v1/marketplaceParticipations to verify credentials and retrieve marketplace details.
5. Enable the connector. The Celery Beat schedule (poll_amazon_orders) will automatically pull new orders every 15 minutes and import them as OMS orders with channel=MARKETPLACE.
No webhook registration needed for Amazon. All inbound order data flows through the polling task. The OMS deduplicates by external_order_id (Amazon order ID) so repeated polls are safe.
Multi-Topic Support (Same URL)
A single connector webhook URL handles every event topic Shopify sends to it. The OMS routes by the X-Shopify-Topic header after HMAC validation:
| Topic | Handler | OMS Action |
|---|---|---|
| orders/create | normalize_order() | Create OMS Order (dedup by external_order_id) |
| orders/paid | normalize_order() | Create OMS Order (alternate payment trigger) |
| products/create | normalize_product() | Insert InventoryItem rows for each variant × active node |
| products/update | normalize_product() | Update product_name / price / active flag; preserve stock levels |
| any other topic | — | Accepted (200 OK) and logged; payload ignored |
Note — Amazon SP-API: Amazon does not use webhooks. Instead, the OMS polls the SP-API every 15 minutes via a Celery Beat scheduled task to pull new orders. See the Amazon SP-API Setup section above for configuration details.
Inventory seeding: When a new SKU arrives via products/create, the OMS creates one InventoryItem per active fulfillment node seeded with Shopify's inventory_quantity. On subsequent products/update events, only metadata (name, price, active flag) is updated — stock quantities are left untouched because the OMS is the system of record for inventory after initial import.
Inbound Webhook Security
Connector webhook endpoints (POST /connectors/{id}/webhook) are exempt from JWT authentication and instead validated using platform-native HMAC signatures.
# Shopify HMAC validation (inside ShopifyConnector)
import hmac, hashlib, base64
def validate_webhook(self, headers: dict, raw_body: bytes) -> bool:
    hmac_header = headers.get("x-shopify-hmac-sha256", "")
    secret = self.config.get("webhook_secret", "")
    digest = base64.b64encode(
        hmac.new(secret.encode(), raw_body, hashlib.sha256).digest()
    ).decode()
    return hmac.compare_digest(digest, hmac_header)
Outbound Fulfillment Sync
When an OMS order (sourced from a connector) transitions to SHIPPED status, a Celery task is automatically enqueued on the connectors queue to push the tracking update back to the originating platform.
# Triggered automatically from PATCH /orders/{id}/status
if payload.status == OrderStatus.SHIPPED and order.connector_id:
    background_tasks.add_task(_trigger_connector_sync, str(order.id))
Outbound Sync — OMS → Platform
When the OMS changes an order's state or adjusts inventory, it pushes the update back to the originating platform. Each push is handled by a dedicated Celery task on the connectors queue.
Shopify
| Trigger | Celery Task | API Call | Effect |
|---|---|---|---|
| OMS order → SHIPPED | sync_fulfillment_to_connector | POST /orders/{id}/fulfillments.json | Shopify marks the order fulfilled and emails tracking info to the buyer |
| Inventory adjusted in OMS | push_inventory_to_connectors | POST /inventory_levels/set.json | Syncs on-hand quantity for the SKU at the given Shopify location. Requires primary_location_id (fetched during Test Connection) and shopify_inventory_item_id (populated when the products/create webhook is received) |
| OMS order → CANCELLED (sourced from Shopify) | sync_order_cancel_to_connector | POST /orders/{id}/cancel.json | Cancels the corresponding order in Shopify |
Amazon SP-API
| Trigger | API Call | Effect |
|---|---|---|
| OMS order → SHIPPED | POST /orders/v0/orders/{amazonOrderId}/shipment | Confirms shipment to Amazon with carrier code and tracking number; Amazon notifies the buyer |
| Inventory adjusted in OMS | PATCH /listings/2021-08-01/items/{sellerId}/{sku} (Listings Items API) | Updates the fulfillment_availability attribute for the listing to reflect current on-hand quantity |
| OMS order → CANCELLED | — | Not supported via SP-API for seller-initiated cancels. Buyers must cancel through Amazon. The OMS records the cancellation locally and logs a connector_events entry, but no API call is made. |
Extending with New Connectors
All connectors inherit from BaseConnector in app/services/connectors/base.py. Five methods are abstract (required); the remainder are optional overrides for product/catalog sync and outbound push:
class BaseConnector(ABC):
    # ── Required (abstract) ──────────────────────────────────────
    def validate_webhook(self, headers, raw_body) -> bool: ...
    def get_event_type(self, headers) -> str: ...
    def normalize_order(self, payload) -> dict: ...        # → OMS OrderCreate dict
    async def push_fulfillment(self, order, shipment) -> dict: ...
    async def test_connection(self) -> dict: ...

    # ── Optional (default: empty set / empty list) ───────────────
    def get_inbound_topics(self) -> set[str]: ...          # order-creation topics
    def get_product_topics(self) -> set[str]: ...          # product-sync topics
    def normalize_product(self, payload) -> list[dict]: ...  # → variant dicts

    # ── Outbound sync (default: no-op) ───────────────────────────
    async def push_inventory_update(self, sku, quantity_available, mapping) -> dict: ...
    async def push_order_cancel(self, order) -> dict: ...
| Method | Signature | Description |
|---|---|---|
| validate_webhook | (headers, raw_body) → bool | Validate platform HMAC/signature on inbound webhook |
| get_event_type | (headers) → str | Extract topic/event type from request headers |
| normalize_order | (payload) → dict | Transform platform order payload into OMS OrderCreate dict |
| push_fulfillment | (order, shipment) → dict | Push tracking/fulfillment confirmation to platform |
| test_connection | () → dict | Verify credentials and return platform metadata |
| push_inventory_update | (sku, quantity_available, mapping) → dict | Push current stock level for a SKU to the platform. mapping is the connector_inventory_mappings row for the SKU. |
| push_order_cancel | (order) → dict | Push cancellation to the platform for an OMS order that was cancelled. No-op by default; Amazon override is a local-only log. |
Register the new class in app/services/connectors/registry.py to make it available.
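A minimal custom connector might look like the sketch below. It follows the BaseConnector contract described above, but the platform name, header names, payload shape, and shared-secret scheme are all illustrative assumptions for a generic inbound integration.

```python
import hmac, hashlib

class MyPlatformConnector:  # would inherit BaseConnector in the real codebase
    """Illustrative inbound connector for a hypothetical platform."""

    def __init__(self, config: dict):
        self.config = config

    def validate_webhook(self, headers: dict, raw_body: bytes) -> bool:
        # Assumed scheme: hex HMAC-SHA256 of the raw body with a shared secret
        digest = hmac.new(self.config["webhook_secret"].encode(),
                          raw_body, hashlib.sha256).hexdigest()
        return hmac.compare_digest(digest, headers.get("x-myplatform-signature", ""))

    def get_event_type(self, headers: dict) -> str:
        return headers.get("x-myplatform-event", "unknown")

    def normalize_order(self, payload: dict) -> dict:
        # Map the platform payload onto an OMS OrderCreate-style dict
        return {
            "external_order_id": str(payload["id"]),
            "channel": "MARKETPLACE",
            "items": [{"sku": li["sku"], "quantity": li["qty"]}
                      for li in payload["line_items"]],
        }
```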
Event Audit Log
Every inbound and outbound connector activity is recorded in the connector_events table:
| event_type | direction | Description |
|---|---|---|
| orders/create | inbound | Order received from platform and imported into OMS |
| orders/paid | inbound | Payment confirmed — alternative trigger for order import |
| products/create | inbound | Product created in Shopify — variants upserted as InventoryItems in all active nodes |
| products/update | inbound | Product updated — metadata refreshed; stock levels preserved |
| fulfillment.pushed | outbound | Tracking/fulfillment update sent to platform |
| webhook.error | inbound | Signature validation or parsing failed |
23 Monitoring & Traceability
The OMS has a built-in error monitoring console accessible at Admin → Monitoring. Every unhandled exception across all services — FastAPI routes, Celery workers, background tasks — is automatically captured, fingerprinted, and stored in MongoDB for analysis.
Error Capture Pipeline
Exception raised (anywhere)
│
▼
capture_error() / capture_error_sync() ← app/services/monitoring.py
│
├─► error_events (MongoDB) — one document per occurrence, 30-day TTL
│ event_id, fingerprint, timestamp, level, source_service,
│ error_type, error_message, stack_frames,
│ request_context, task_context, order_context
│
└─► error_issues (MongoDB) — one document per unique fingerprint
fingerprint, status (open/resolved/muted),
occurrence_count, first_seen_at, last_seen_at
Source Services
| Source | Constant | Where instrumented |
|---|---|---|
| api | SOURCE_API | Global FastAPI exception handler + all unhandled route errors |
| sourcing_worker | SOURCE_SOURCING | Celery source_order task |
| fulfillment_worker | SOURCE_FULFILLMENT | Celery start_picking and complete_packing |
| carrier_worker | SOURCE_CARRIER | Celery book_shipment |
| webhook_worker | SOURCE_WEBHOOK | Celery dispatch_webhook |
| connector_worker | SOURCE_CONNECTOR | Celery sync_fulfillment_to_connector + inbound background tasks |
Monitoring API /monitoring
All endpoints require superadmin authentication.
| Method | Path | Description |
|---|---|---|
| GET | /monitoring/summary | Quick stats: open issues, errors/1h, errors/24h, warnings/24h |
| GET | /monitoring/events | Paginated raw event list — filter by time, level, source, error_type, order_id |
| GET | /monitoring/events/{event_id} | Single event with full stack trace |
| GET | /monitoring/issues | Aggregated issues — filter by status, source, level |
| GET | /monitoring/issues/{fingerprint} | Single issue with 10 most recent occurrences |
| PATCH | /monitoring/issues/{fingerprint} | Update status: open, resolved, muted (+ mute_hours) |
| GET | /monitoring/metrics/rate | Error rate over time bucketed by hour |
| GET | /monitoring/metrics/top | Top N issues by occurrence count |
| GET | /monitoring/metrics/sources | Error breakdown by source service |
| POST | /monitoring/test-error | Inject a synthetic test error to verify the capture pipeline |
Fingerprinting
Each error is assigned a stable 16-character fingerprint so that repeated occurrences of the same error (e.g. a recurring database connection timeout) are grouped into a single issue rather than creating noise:
fingerprint = SHA256(f"{source_service}:{error_type}:{top_stack_frame}")[:16]
The top stack frame is the deepest frame inside application code (excluding library internals), extracted from the Python traceback.
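A sketch of how such a fingerprint can be computed. The APP_PREFIX filter and helper names are assumptions for illustration, not the actual monitoring.py code:

```python
import hashlib
import traceback

APP_PREFIX = "app/"  # assumption: application code lives under app/

def top_app_frame(exc: BaseException) -> str:
    """Deepest traceback frame inside application code (library frames excluded)."""
    frames = traceback.extract_tb(exc.__traceback__)
    app_frames = [f for f in frames if APP_PREFIX in f.filename]
    frame = (app_frames or frames)[-1]  # fall back to the deepest frame overall
    return f"{frame.filename}:{frame.name}"

def fingerprint(source_service: str, exc: BaseException) -> str:
    """Stable 16-char ID grouping repeated occurrences of the same error."""
    raw = f"{source_service}:{type(exc).__name__}:{top_app_frame(exc)}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]
```

Because the fingerprint ignores the error message, two timeouts with different connection IDs in the message still collapse into one issue.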
Issue Lifecycle
| Status | Meaning | Transitions |
|---|---|---|
| open | Active — being investigated | → resolved, → muted |
| resolved | Fixed — will reopen if the error recurs | → open (auto on new occurrence) |
| muted | Suppressed for N hours | → open (after muted_until expires) |
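The transition rules can be sketched as a small state function. Field names mirror the error_issues document above; the body is an illustration, not the shipped code:

```python
from datetime import datetime, timedelta, timezone

def on_new_occurrence(issue: dict, now: datetime) -> dict:
    """Apply the lifecycle table to an issue when a new event arrives."""
    issue["occurrence_count"] += 1
    issue["last_seen_at"] = now
    if issue["status"] == "resolved":
        issue["status"] = "open"  # resolved issues auto-reopen on recurrence
    elif issue["status"] == "muted" and now >= issue["muted_until"]:
        issue["status"] = "open"  # mute window has expired
    return issue
```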
Verifying the Pipeline
curl -X POST http://localhost:8001/monitoring/test-error \
-H "Authorization: Bearer {superadmin_token}" \
-H "Content-Type: application/json" \
-d '{"message": "Smoke test", "source": "api", "level": "ERROR"}'
After calling this endpoint, navigate to Monitoring → Issues or Monitoring → Events in the UI to confirm the entry appears.
24 Configuration
All configuration is managed via environment variables. For local development, create a .env file in the project root.
| Variable | Default | Description |
|---|---|---|
| Data Layer | | |
| DATABASE_URL | postgresql+asyncpg://... | Async PostgreSQL URL — tenant data-plane database (oms_db on the main pod, oms_{org}_{env} on tenant pods) |
| SYNC_DATABASE_URL | postgresql+psycopg2://... | Sync PostgreSQL URL for Celery workers (same DB as DATABASE_URL) |
| CONTROL_DATABASE_URL | blank (= DATABASE_URL) | Multi-tenant only. Points every pod at the shared oms_db control-plane database. Set on tenant pods: postgresql+asyncpg://oms_user:oms_pass@postgres:5432/oms_db. Blank on the main pod (falls back to DATABASE_URL). |
| MONGODB_URL | mongodb://... | MongoDB connection string |
| MONGODB_DB | oms_events | Events MongoDB database name (overridden per environment to oms_events_{org}_{env}) |
| MONGODB_AI_DB | oms_ai_learning | AI-learning MongoDB database name (overridden per environment) |
| REDIS_URL | redis://:pass@.../0 | Redis URL for the application cache and environment resolution (DB 0) |
| CELERY_BROKER_URL | redis://:pass@.../1 | Redis DB 1 for the Celery broker |
| CELERY_RESULT_BACKEND | redis://:pass@.../2 | Redis DB 2 for task results |
| ELASTICSEARCH_URL | http://localhost:9200 | Elasticsearch connection URL |
| Multi-Tenant Identity | | |
| TENANT_SLUG | unset on main pod | Tenant pods only. Set to the org slug, e.g. abc. Used in docker-compose labels and container naming. Has no runtime effect — informational only. |
| ENVIRONMENT | unset on main pod | Tenant pods only. Set to the env slug, e.g. dev. Informational only. |
| PLAN_TIER | unset | Subscription tier for rate limiting: STARTER · GROWTH · ENTERPRISE. Not yet enforced. |
| FRONTEND_URL | http://localhost:3001 | Origin URL of the frontend. Used in CORS allowed origins and redirect URLs. |
| ALLOWED_ORIGINS | http://localhost:3001 | Comma-separated CORS origins allowed to call this API pod. Include the main frontend URL and the tenant pod URL if different. |
| Security & App Settings | | |
| SECRET_KEY | — | JWT signing key. Use a random 32-byte hex string in production. |
| WEBHOOK_SECRET | — | Default HMAC signing secret for outbound webhooks |
| WEBHOOK_TIMEOUT_SECONDS | 10 | HTTP timeout per webhook delivery attempt |
| WEBHOOK_MAX_RETRIES | 3 | Max retry attempts before ABANDONED |
| DEFAULT_SOURCING_STRATEGY | DISTANCE_OPTIMAL | Fallback strategy when no sourcing rule matches |
| MAX_SPLIT_NODES | 3 | Global maximum number of nodes for split fulfillment |
| ANTHROPIC_API_KEY | — | Required for AI_ADAPTIVE / AI_HYBRID strategies and AI Architect proposals |
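The real application loads these through Pydantic Settings in app/config.py. A dependency-free sketch of the same precedence rule (an environment variable, when set, overrides the coded default):

```python
import os
from dataclasses import dataclass

@dataclass
class Settings:
    # Defaults mirror a few rows of the table above.
    WEBHOOK_TIMEOUT_SECONDS: int = 10
    WEBHOOK_MAX_RETRIES: int = 3
    DEFAULT_SOURCING_STRATEGY: str = "DISTANCE_OPTIMAL"
    MAX_SPLIT_NODES: int = 3

    def __post_init__(self):
        # An env var, if present, overrides the default
        # (cast to the default's type, as Pydantic would coerce it).
        for name, default in list(vars(self).items()):
            raw = os.environ.get(name)
            if raw is not None:
                setattr(self, name, type(default)(raw))
```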
25 Running the System
Prerequisites: Docker Desktop must be running. Python 3.12+ is needed only for local dev/testing outside Docker.
Start All Services
cd D:\OMS
docker compose up -d --build
Services started after this command:
| Container | Port | Description |
|---|---|---|
| oms_postgres | 5432 | PostgreSQL 16 primary database |
| oms_mongodb | 27017 | MongoDB 7 document store |
| oms_redis | 6379 | Redis 7.2 cache + broker |
| oms_elasticsearch | 9200 | Elasticsearch 8.12 |
| oms_api | 8001 | FastAPI application |
| oms_celery_worker | — | All 7 queue workers |
| oms_celery_beat | — | Scheduled task runner |
| oms_flower | 5555 | Celery monitoring UI |
| oms_frontend | 3001 | React GUI (this app) |
Seed All Databases
docker compose exec api python scripts/seed.py
Seeds the following data:
- PostgreSQL: 8 fulfillment nodes, 64 inventory items (8 SKUs × 8 nodes), 5 sourcing rules, 1 webhook endpoint
- MongoDB: 8 product catalog documents, 3 sample order events
- Redis: version, stats, and cache warmup keys
- Elasticsearch: 8 product documents, 3 sample order documents
Service URLs
| Service | URL |
|---|---|
| Frontend GUI | http://localhost:3001 |
| FastAPI (Swagger UI) | http://localhost:8001/docs |
| FastAPI (ReDoc) | http://localhost:8001/redoc |
| OpenAPI JSON | http://localhost:8001/openapi.json |
| Health Check | http://localhost:8001/health |
| Flower (Celery Monitor) | http://localhost:5555 |
Run Tests
PYTHONPATH=D:\OMS python -m pytest tests/test_imports.py -v
26 End-to-End Order Flow
Step 1 — Create an Order
curl -X POST http://localhost:8001/orders/ \
-H "Content-Type: application/json" \
-d '{
"channel": "WEB",
"fulfillment_type": "SHIP_TO_HOME",
"customer_email": "customer@example.com",
"customer_name": "John Doe",
"line_items": [
{"sku": "SKU-WIDGET-A", "product_name": "Premium Widget A",
"quantity": 2, "unit_price": 29.99},
{"sku": "SKU-GADGET-X", "product_name": "Gadget X Pro",
"quantity": 1, "unit_price": 99.99}
],
"shipping_address": {
"address1": "456 Park Ave", "city": "New York", "state": "NY",
"postal_code": "10022", "latitude": 40.7614, "longitude": -73.9776
}
}'
Response (HTTP 201): Returns the new order with "status": "PENDING" and a generated order_number like ORD-20240215-XK9M2A.
Step 2 — Sourcing Fires Automatically (~2 seconds)
- The Celery sourcing worker picks up the source_order task
- Loads all active sourcing rules sorted by priority
- Evaluates the catch-all "Default — Distance Optimal" rule
- Loads all ACTIVE nodes with inventory for each requested SKU
- Computes the haversine distance from the customer coordinates to each node
- Scores nodes — STR-NYC-01 at ~1.8 km wins for a NYC customer
- Creates FulfillmentAllocation rows (one per SKU)
- Reserves inventory: quantity_available -= quantity_allocated
- Order transitions to SOURCED
Step 3 — Fulfillment Pipeline (~5–10 seconds)
- Picking (t+2s): Allocations → PICKING, order → PICKING
- Packing (t+7s): Allocations → PACKED, order → PACKING → READY_TO_SHIP
Step 4 — Carrier Booking
- Selects a random carrier (UPS, FedEx, USPS, DHL)
- Generates a mock tracking number
- Creates a Shipment record with an estimated delivery date
- Order → SHIPPED, allocations → SHIPPED
- Fires the order.shipped notification + webhook
Step 5 — Delivery Simulation (~10 seconds later)
Adds tracking events: IN_TRANSIT → OUT_FOR_DELIVERY → DELIVERED. Order → DELIVERED. Fires order.delivered webhook.
Step 6 — Verify via Search
curl -X POST http://localhost:8001/search/orders \
-H "Content-Type: application/json" \
-d '{"query": "John Doe", "status": "DELIVERED"}'
Step 7 — Check Audit Trail
curl http://localhost:8001/orders/{order_id}/events
Returns chronological MongoDB events: order.created → order.sourced → order.shipped → order.delivered
Step 8 — Analytics
curl "http://localhost:8001/analytics/dashboard?from_date=2024-01-01"
27 Seed Data Reference
Fulfillment Nodes (8 total)
Sourcing Rules (5 active)
| Priority | Name | Strategy | Conditions |
|---|---|---|---|
| 10 | Same-Day — West Coast | STORE_NEAREST | fulfillment_type = SAME_DAY_DELIVERY AND state IN [CA,WA,OR] |
| 20 | BOPIS / Curbside | INVENTORY_RESERVATION | fulfillment_type IN [STORE_PICKUP, CURBSIDE_PICKUP] |
| 30 | High-Value Orders | COST_OPTIMAL | total_amount > 200 |
| 40 | Marketplace | LEAST_COST_SPLIT | channel = MARKETPLACE |
| 100 | Default (catch-all) | DISTANCE_OPTIMAL | No conditions — matches everything |
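Rule evaluation walks this table in ascending priority order and the first match wins; the priority-100 catch-all guarantees a result. A sketch with conditions as plain predicates (the real rules store conditions as data, not lambdas):

```python
def match_rule(order: dict, rules: list[dict]) -> dict:
    """Return the first rule (lowest priority number) whose conditions all pass."""
    for rule in sorted(rules, key=lambda r: r["priority"]):
        if all(cond(order) for cond in rule["conditions"]):
            return rule
    raise LookupError("no rule matched")  # unreachable while a catch-all exists

RULES = [
    {"name": "High-Value Orders", "priority": 30, "strategy": "COST_OPTIMAL",
     "conditions": [lambda o: o["total_amount"] > 200]},
    {"name": "Default (catch-all)", "priority": 100, "strategy": "DISTANCE_OPTIMAL",
     "conditions": []},  # no conditions; all([]) is True, so it always matches
]
```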
Product SKUs (8 SKUs × 8 nodes = 64 inventory records)
| SKU | Product Name | Price |
|---|---|---|
| SKU-WIDGET-A | Premium Widget A | $29.99 |
| SKU-WIDGET-B | Standard Widget B | $19.99 |
| SKU-GADGET-X | Gadget X Pro | $99.99 |
| SKU-GADGET-Y | Gadget Y Basic | $49.99 |
| SKU-GIZMO-1 | Gizmo 1 | $14.99 |
| SKU-GIZMO-2 | Gizmo 2 Deluxe | $39.99 |
| SKU-TOOL-Z | Power Tool Z | $149.99 |
| SKU-ACCESSORY-1 | Accessory Pack 1 | $9.99 |
28 Admin Console
The Admin Console (/admin) is the central hub for user management, permission configuration, and access control across the entire platform. It is accessible only to users with SUPERADMIN or PLATFORM_OWNER platform roles.
Navigation: Click Admin in the sidebar. The page is organized into four tabs: Users, Groups & Permissions, Access Control, and Access Matrix.
RBAC Model — Three-Layer Hierarchy
Access in the OMS is controlled by three stacked layers: a global platform role, per-organization roles, and per-environment roles (detailed in the Role Hierarchy section).
Effective access: A user's final access to an environment is the highest applicable role across all three layers. A SUPERADMIN implicitly has full access to every environment. An ORG_OWNER has full access to every environment within that org even without an explicit env role.
Tab 1 — Users
Shows all users registered on the platform. Each row displays the username, email, platform role badge, group membership, account status (active / suspended), and the last login time.
Platform Role Management
Click any user row to open the User Access Drawer — a right-side panel showing the user's complete access picture across all organizations and environments. From the drawer you can:
- Change platform role — upgrade/downgrade between PLATFORM_OWNER, SUPERADMIN, USER.
- Grant org role — assign the user an organization-level role (ORG_OWNER / ORG_ADMIN / ORG_MEMBER) for any existing organization. This gives the user visibility into all environments within that org.
- Revoke org role — removes the user's access to all environments within that organization (unless they also have individual env roles).
- Grant env role — assign a specific role (OWNER / ADMIN / MEMBER / VIEWER) for a specific environment, regardless of org membership.
- Revoke env role — removes the user's individual access to that environment.
Creating a User with Org/Environment Access
To add a new user and grant them access to a specific organization and environment:
1. Create the user account: In the Users tab, click Create User. Fill in name, email, password, platform role (USER for standard access), and optionally assign a group.
2. Open the User Access Drawer: Click the user row in the Users table. The drawer opens on the right side.
3. Grant an Org Role (optional): In the Organization Access section, click Grant Org Role. Select the organization and role (ORG_MEMBER for read-only, ORG_ADMIN to let them manage environments). This lets the user see the org in their environment switcher.
4. Grant an Env Role: In the Environment Access section, click Grant Env Role. Select the specific environment and role (ADMIN to manage orders/inventory, MEMBER for day-to-day operations, VIEWER for read-only). The user can now switch to this environment using the environment switcher.
Tab 3 — Access Control
A higher-level view showing who has access to each organization and environment, organized by the tenant hierarchy rather than by user. Useful for auditing who can access what without going user-by-user.
The tab has two sub-tabs:
- Org View — shows each organization with a table of all users who have an org-level role. Use Add Member to directly grant an org role here. Use the trash icon to revoke it.
- Env View — shows each environment (grouped by org) with its member list. Use Add Member to grant an env role. The environment type badge (DEV/QA/STAGING/PROD) is color-coded for quick scanning.
Granting access directly from Access Control
1. Switch to "Org View" or "Env View": Use the sub-tabs at the top of the Access Control tab.
2. Find the org or environment: Each card shows the org/env name and the current member list.
3. Click "Add Member": A modal appears. Start typing a name or email to search for existing users. Select the user and pick a role using the radio buttons.
4. Confirm: Click Grant Access. The member list updates immediately. The user will see the environment in their switcher on next page load.
Tab 2 — Groups & Permissions
Permission groups let you define reusable sets of feature permissions that can be assigned to users. A permission is a feature:action string, e.g. orders:write, inventory:read, connectors:manage.
| Permission | What it grants |
|---|---|
| orders:read | View orders, order detail, audit trail |
| orders:write | Create orders, update status, cancel |
| inventory:read | View inventory levels, products, nodes |
| inventory:write | Adjust stock, transfer, update products |
| sourcing:read | View sourcing rules and decisions |
| sourcing:write | Create/update/delete sourcing rules |
| connectors:manage | Create, configure, and toggle platform connectors |
| webhooks:manage | Register and manage webhook endpoints |
| analytics:read | Access analytics dashboard and reports |
| admin:users | View and manage user accounts |
| admin:roles | Grant and revoke org/env roles |
Tab 4 — Access Matrix
A grid view showing all users (rows) × all environments (columns). Each cell displays the user's effective role for that environment — considering both their org role and their direct env role. Empty cells mean the user has no access to that environment.
This view is ideal for a quick compliance audit: you can see at a glance whether any user has unexpectedly broad access (e.g. OWNER on the Production environment) and immediately open their drawer to adjust.
29 Multi-Tenant Platform Architecture
The OMS is a multi-tenant, multi-environment platform. A single infrastructure deployment serves multiple client organizations, each with fully isolated data environments — their own PostgreSQL databases, MongoDB databases, and Elasticsearch index namespaces. No data ever crosses organizational boundaries at the storage layer.
Key design principles
| Principle | What it means in practice |
|---|---|
| Database-per-environment | The strongest possible isolation. No shared tables, no row-level tenant filters. A SQL error in one environment cannot leak data from another. |
| Single control plane | oms_db is the one shared database for auth, user management, and environment metadata. All pods connect to it regardless of which tenant they serve. |
| Zero breaking changes | The existing oms_db data is registered as the default Production environment at first startup. No migration needed. JWT tokens without an X-OMS-Environment header continue to work. |
| Human-gated provisioning | Environments are created by an admin through the UI or API. The system provisions the database asynchronously and reports status. Nothing happens automatically without an explicit action. |
29a Role Hierarchy
Platform roles (global — stored on users.platform_role)
| Role | Who holds it | Capabilities |
|---|---|---|
| PLATFORM_OWNER | The operator of the OMS deployment | Create orgs & environments, assign any platform role, full admin, manage all users |
| SUPERADMIN | Client power users / support staff | Admin console, webhooks, connectors, monitoring, AI Architect — cannot create orgs/environments |
| USER | Regular operators | Access only to environments where they have been explicitly granted a role |
Organization roles (per org — user_organization_roles)
| Role | Capabilities within the org |
|---|---|
| ORG_OWNER | Manage all environments, invite/remove any org member, delete the org. Implicitly has OWNER access to every environment in the org. |
| ORG_ADMIN | Create & configure environments, manage members below admin level. Cannot delete the org or demote ORG_OWNER. |
| ORG_MEMBER | Read-only view of the org and its environments. Can switch to any environment they have an env role on. |
Environment roles (per environment — user_environment_roles)
| Role | Capabilities within the environment |
|---|---|
| OWNER | Full control: manage orders, inventory, sourcing rules, connectors, webhooks, and the environment member list. |
| ADMIN | Manage all business data. Cannot manage environment members or delete the environment. |
| MEMBER | Create and update records (orders, inventory adjustments, nodes). Cannot delete records or change configuration. |
| VIEWER | Read-only access to all environment data, including orders, inventory, analytics, and audit trail. |
Effective access: A user's final access to an environment is the highest applicable role across all layers. A SUPERADMIN implicitly has full access to every environment. An ORG_OWNER has full access to every environment within that org even without an explicit env role.
29b Creating an Organization — Step by Step
An organization is a logical tenant container. No database is provisioned at this point — it is a pure control-plane record. Creating one is instantaneous.
Via the UI (Platform Console)
1. Navigate to Platform Console: In the sidebar, click Platform. Visible only to PLATFORM_OWNER and SUPERADMIN users. Shows all organizations and a summary of their environments.
2. Click “New Organization”: Fill in Name (e.g. Acme Corp), Slug (e.g. acme — globally unique, pattern ^[a-z0-9][a-z0-9-]*[a-z0-9]$), and an optional description. The slug becomes the prefix for all database names in this org.
3. Submit: POST /organizations → 201 Created. The org appears in the list with zero environments. No database has been created yet.
4. Add the first environment: Click the org card, then New Environment. See the provisioning section below.
Via the API
curl -X POST http://localhost:8001/organizations \
-H "Authorization: Bearer {platform_owner_token}" \
-H "Content-Type: application/json" \
-d '{"name": "Acme Corp", "slug": "acme", "description": "Our first tenant"}'
Slug naming convention
| Resource | Pattern | Example (org=acme, env=dev) |
|---|---|---|
| PostgreSQL DB | oms_{slug}_{env_slug} | oms_acme_dev |
| MongoDB Events DB | oms_events_{slug}_{env_slug} | oms_events_acme_dev |
| MongoDB AI DB | oms_ai_{slug}_{env_slug} | oms_ai_acme_dev |
| Elasticsearch prefix | {slug}_{env_slug} | acme_dev |
| Docker container (API) | oms_api_{slug}_{env_slug} | oms_api_acme_dev |
| Docker Compose file | docker-compose.{slug}_{env_slug}.yml | docker-compose.acme_dev.yml |
Choose slugs carefully. Once an environment is provisioned, its database name is fixed. Short, lowercase, alphanumeric slugs work best: acme, widgetco, retailer1.
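The naming convention can be derived mechanically from the two slugs, validating the pattern up front. A sketch (the function name is illustrative):

```python
import re

SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$")  # pattern from the UI

def resource_names(org_slug: str, env_slug: str) -> dict:
    """Derive every database/container name for an (org, env) pair."""
    for slug in (org_slug, env_slug):
        if not SLUG_RE.match(slug):
            raise ValueError(f"invalid slug: {slug!r}")
    base = f"{org_slug}_{env_slug}"
    return {
        "postgres_db": f"oms_{base}",
        "mongo_events_db": f"oms_events_{base}",
        "mongo_ai_db": f"oms_ai_{base}",
        "es_prefix": base,
        "api_container": f"oms_api_{base}",
        "compose_file": f"docker-compose.{base}.yml",
    }
```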
Organizations API
| Method | Path | Auth | Description |
|---|---|---|---|
| GET | /organizations | Any | Superadmin: all orgs. Others: orgs where they have a role. |
| POST | /organizations | Platform Owner / Superadmin | Create organization (instant, no DB provisioned) |
| GET | /organizations/{id} | Org Member+ | Organization detail |
| PATCH | /organizations/{id} | Org Admin+ | Update name, description, is_active |
| GET | /organizations/{id}/members | Org Member+ | List UserOrganizationRole records |
| POST | /organizations/{id}/members | Org Admin+ | Grant {user_id, role} org membership |
| DELETE | /organizations/{id}/members/{uid} | Org Admin+ | Revoke org membership |
29c Provisioning an Environment — Step by Step
An environment is where business data actually lives — orders, inventory, sourcing rules, connectors. Creating one triggers a full infrastructure provisioning sequence that runs asynchronously. The API returns 202 Accepted with status: "PROVISIONING"; the UI polls every 5 seconds until status becomes "ACTIVE".
Via the UI (Environments page)
1. Open the Environments page: Click Environments in the sidebar. Lists all environments the current user can access, grouped by organization.
2. Click “New Environment”: Fill in Organization, Name (e.g. Development), Slug (e.g. dev, unique within the org), Environment Type (DEV/QA/STAGING/PROD), and optionally Set as default.
3. Submit — provisioning begins: The environment is created with status: PROVISIONING. A background task starts, and the UI shows a spinner polling every 5 seconds.
4. Wait for ACTIVE status: Typical provisioning time is 5–15 seconds. The environment card turns green when status = ACTIVE and shows the provisioned_at timestamp.
5. Generate the Docker Compose file (for isolated deployment): Click Generate Docker Compose on the environment card. Downloads docker-compose.acme_dev.yml pre-configured with the correct database names, ports, and network settings.
6. Set the Deployment URL: If the environment runs as a separate pod, set the Deployment URL field (e.g. http://localhost:3463). The environment switcher uses this for cross-origin pod redirects with seamless auth handoff.
Provisioning sequence (what happens in the background)
Step 1 -- Create PostgreSQL database
Raw asyncpg connection to postgres system DB
CREATE DATABASE oms_acme_dev; (idempotent -- checks existence first)
Step 2 -- Build SQLAlchemy engine
create_async_engine("postgresql+asyncpg://.../oms_acme_dev",
pool_size=5, max_overflow=10, pool_pre_ping=True, pool_recycle=3600)
Register: EnvironmentEngineRegistry._engines[env_id] = engine
Step 3 -- Create all data-plane tables (checkfirst=True -- skips if exists)
orders, order_items, fulfillment_allocations, shipments
inventory_items, inventory_adjustments, inventory_reservations
fulfillment_nodes, sourcing_rules
webhook_endpoints, webhook_events
connectors, connector_events, connector_inventory_mappings
ai_proposals, ai_experiments
sourcing_outcome_labels, custom_attribute_definitions, ui_widgets
NOT created here (stay in oms_db only):
users, user_groups, organizations, environments
user_environment_roles, user_organization_roles
Step 4 -- Create MongoDB indexes
oms_events_acme_dev:
order_events -> compound (order_id, timestamp), index on event_type
oms_ai_acme_dev:
sourcing_outcomes -> order_id index
sourcing_patterns -> unique cluster_key index
Step 5 -- Create Elasticsearch index template
PUT /_index_template/acme_dev_template
{"index_patterns": ["acme_dev_*"], "mappings": {...}}
Step 6 -- Mark environment ACTIVE
UPDATE environments SET status='ACTIVE', provisioned_at=now()
WHERE id = <env_id> (committed on control DB oms_db)
Idempotent re-provisioning: If provisioning fails mid-way, call POST /environments/{id}/provision to retry safely. Every step can be repeated without side effects.
Connection pool per environment
pool_size=5        # 5 persistent connections
max_overflow=10    # burst up to 15 total
pool_pre_ping=True # discard stale connections
pool_recycle=3600  # recycle every hour
20 active environments can open at most 300 PostgreSQL connections (15 per environment at peak). Note that this exceeds the PostgreSQL default max_connections of 100, so raise max_connections (e.g. to 500) before running many environments concurrently.
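The arithmetic: each environment engine can hold pool_size + max_overflow connections at peak.

```python
def max_pg_connections(n_envs: int, pool_size: int = 5, max_overflow: int = 10) -> int:
    """Worst case: every environment pool bursts to its cap simultaneously."""
    return n_envs * (pool_size + max_overflow)
```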
Environments API
| Method | Path | Auth | Description |
|---|---|---|---|
| GET | /environments | Any | List all environments the calling user can access |
| POST | /environments | Platform Owner / Superadmin | Create + provision (202 Accepted, background task) |
| GET | /environments/{id} | Any with access | Detail + status + provisioned_at |
| PATCH | /environments/{id} | Env OWNER+ | Update name, is_default, base_url |
| POST | /environments/{id}/provision | Platform Owner | Re-provision failed / archived environment (idempotent) |
| DELETE | /environments/{id} | Platform Owner | Delete environment record + DROP the PostgreSQL database |
| GET | /environments/{id}/members | Env OWNER+ | List UserEnvironmentRole records |
| POST | /environments/{id}/members | Env OWNER+ | Grant {user_id, role} |
| DELETE | /environments/{id}/members/{uid} | Env OWNER+ | Revoke env access for a specific user |
29d Environment Types & Lifecycle
| Type | Badge | Warning Banner | Typical Use |
|---|---|---|---|
| DEV | DEV | None | Developer testing, feature branches. Data can be reset freely. |
| QA | QA | None | Quality assurance, integration tests, UAT. |
| STAGING | STAGING | None | Pre-production validation, load testing. |
| PROD | PROD | ⚠ Production — changes affect live customers (persistent red banner) | Live customer-facing environment. |
Environment status lifecycle
| Status | Meaning | API behaviour |
|---|---|---|
| PROVISIONING | Database creation in progress | Middleware returns 503 for all data-plane requests |
| ACTIVE | Fully operational | All requests routed normally |
| SUSPENDED | Temporarily disabled | 503; data preserved intact |
| ARCHIVED | Permanently disabled | 503; connection pool evicted to free resources |
29e Managing Environment Members
Grant org membership (gives org-wide visibility)
curl -X POST http://localhost:8001/organizations/{org_id}/members \
-H "Authorization: Bearer {admin_token}" \
-H "Content-Type: application/json" \
-d '{"user_id": "abc123...", "role": "ORG_MEMBER"}'
Grant environment access
curl -X POST http://localhost:8001/admin/users/{user_id}/env-roles \
-H "Authorization: Bearer {admin_token}" \
-H "Content-Type: application/json" \
-d '{"env_id": "def456...", "role": "ADMIN"}'
View a user's full access
GET /admin/users/{user_id}/access
{
"user_id": "...",
"platform_role": "USER",
"org_roles": [
{"org_id": "...", "org_name": "Acme Corp", "role": "ORG_MEMBER"}
],
"env_roles": [
{"env_id": "...", "env_name": "Development",
"org_name": "Acme Corp", "env_type": "DEV", "role": "ADMIN"}
]
}
Admin Console access management endpoints
| Method | Path | Description |
|---|---|---|
| GET | /admin/users/{id}/access | Full access summary: platform role + all org roles + all env roles |
| POST | /admin/users/{id}/org-roles | Grant/update org role: body {org_id, role} |
| DELETE | /admin/users/{id}/org-roles/{org_id} | Revoke org membership |
| POST | /admin/users/{id}/env-roles | Grant/update env role: body {env_id, role} |
| DELETE | /admin/users/{id}/env-roles/{env_id} | Revoke env access |
29f Environment Switcher & Cross-Pod Auth
Switching environments
The Environment Switcher lives in the top header bar, grouped by organization. On switch, the frontend:
- Stores the selected
env_idinlocalStorage('oms_environment_id') - Calls
queryClient.clear()to flush all cached data from the previous environment - Re-fetches all active queries with the new environment header
- If the environment has a
base_urlon a different origin, performs a cross-origin redirect with token handoff (no re-login)
How X-OMS-Environment works
axiosInstance.interceptors.request.use(config => {
const envId = localStorage.getItem('oms_environment_id')
if (envId) config.headers['X-OMS-Environment'] = envId
const token = localStorage.getItem('oms_auth_token')
if (token) config.headers['Authorization'] = `Bearer ${token}`
return config
})
Cross-pod auth handoff (no re-login on switch)
;(function bootstrapFromUrl() {
const params = new URLSearchParams(window.location.search)
const token = params.get('_oms_token')
const userRaw = params.get('_oms_user')
if (token && userRaw) {
localStorage.setItem('oms_auth_token', token)
localStorage.setItem('oms_auth_user', userRaw)
params.delete('_oms_token'); params.delete('_oms_user')
const clean = params.toString()
window.history.replaceState({}, '', window.location.pathname +
(clean ? '?' + clean : '') + window.location.hash)
}
})()
Security note: The token is a short-lived JWT transferred once via HTTPS redirect and immediately stripped from the URL. Subsequent requests use the token from localStorage.
29g Deploying a Tenant Environment (Docker Compose)
Each environment can run as an isolated Docker Compose stack — its own API and frontend containers — attached to the shared infrastructure via the oms_default external network.
Example: docker-compose.acme_dev.yml
version: "3.9"
services:
api_acme_dev:
build: { context: ., dockerfile: Dockerfile }
container_name: oms_api_acme_dev
command: uvicorn app.main:app --host 0.0.0.0 --port 8000
environment:
- TENANT_SLUG=acme
- ENVIRONMENT=dev
- DATABASE_URL=postgresql+asyncpg://oms_user:oms_pass@postgres:5432/oms_acme_dev
- SYNC_DATABASE_URL=postgresql+psycopg2://oms_user:oms_pass@postgres:5432/oms_acme_dev
- CONTROL_DATABASE_URL=postgresql+asyncpg://oms_user:oms_pass@postgres:5432/oms_db
- MONGODB_DB=oms_events_acme_dev
- MONGODB_AI_DB=oms_ai_acme_dev
- FRONTEND_URL=http://localhost:3463
ports: ["8463:8000"]
networks: [oms_default]
frontend_acme_dev:
build: { context: ./frontend, dockerfile: Dockerfile }
container_name: oms_frontend_acme_dev
environment:
- API_HOST=oms_api_acme_dev
ports: ["3463:80"]
networks: [oms_default]
depends_on: [api_acme_dev]
networks:
oms_default:
external: true
name: oms_default
Key points: DATABASE_URL points at the tenant DB. CONTROL_DATABASE_URL points back at oms_db so this pod can verify JWTs and read org/env metadata. Both containers join oms_default to access shared infra without re-exposing ports. The nginx frontend uses ${API_HOST} via envsubst to proxy /api/* to the correct API container.
Start the tenant stack
docker-compose -f docker-compose.acme_dev.yml up -d --build

# Then set the Deployment URL:
# Platform Console > Environments > Acme Dev > Deployment URL = http://localhost:3463
Delete an environment (with data)
1. Stop the tenant stack: docker-compose -f docker-compose.acme_dev.yml down
2. Delete from the UI: Environments page → trash icon on the environment card → confirm. Calls DELETE /environments/{id}, which drops the PostgreSQL database and removes all control-plane records. This is irreversible.
3. Remove the Compose file: rm docker-compose.acme_dev.yml
Every API request carries an X-OMS-Environment HTTP header. The EnvironmentMiddleware resolves it to the correct database before any route handler runs.
Browser
X-OMS-Environment: <env_uuid>
|
EnvironmentMiddleware (app/middleware/environment.py)
|
+-- Exempt paths (bypass resolution):
| /health, /docs, /redoc, /openapi.json
| /auth/*, /testing, /connectors/{id}/webhook
|
+-- 1. Redis: GET env:<env_uuid> TTL=60s
| HIT -> deserialize cached env object (lock-free hot path)
| MISS -> SELECT * FROM environments WHERE id=? (control DB)
| cache -> SET env:<env_uuid> EX 60
|
+-- 2. env.status != ACTIVE -> return 503 "Environment unavailable"
|
+-- 3. request.state.environment = env -> call_next(request)
Route handler: db = Depends(get_db)
|
+-- env.db_name == "oms_db" (default production)
| -> yield from main session factory
|
+-- env-specific database
-> registry.get_or_create_engine(env) (lock-free if already cached)
-> yield session from env-specific session factory
All SQL runs against oms_acme_dev. Zero code changes in business logic.
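The cache-aside hot path above can be sketched in process-local form. The real middleware uses Redis with a 60-second TTL; here db_lookup stands in for the control-plane SELECT and a dict stands in for Redis:

```python
import time

CACHE_TTL = 60  # seconds, matching the Redis EX 60 in the diagram

class EnvResolver:
    """Cache-aside lookup: serve from cache within TTL, else hit the control DB."""
    def __init__(self, db_lookup, clock=time.monotonic):
        self._db_lookup = db_lookup
        self._clock = clock
        self._cache: dict[str, tuple[float, dict]] = {}

    def resolve(self, env_id: str) -> dict:
        hit = self._cache.get(env_id)
        now = self._clock()
        if hit is not None and now - hit[0] < CACHE_TTL:
            return hit[1]  # hot path: no control-DB round-trip
        env = self._db_lookup(env_id)  # cache miss: query the environments table
        self._cache[env_id] = (now, env)
        return env
```

The injected clock makes the TTL behaviour easy to verify without waiting a minute.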
Which routes use which database
| Router | Dependency | Database |
|---|---|---|
| `/auth/*` | get_db | oms_db (users table is control-plane only) |
| `/admin` | get_db | oms_db |
| `/organizations` | get_control_db | oms_db (must bypass env routing) |
| `/environments` | get_control_db | oms_db |
| `/orders` | get_db (env-aware) | oms_{org}_{env} |
| `/inventory` | get_db (env-aware) | oms_{org}_{env} |
| `/nodes` | get_db (env-aware) | oms_{org}_{env} |
| `/sourcing-rules` | get_db (env-aware) | oms_{org}_{env} |
| `/connectors` | get_db (env-aware) | oms_{org}_{env} |
| `/architect` | get_db (env-aware) | oms_{org}_{env} |
29i Data Isolation & Backward Compatibility
Database-per-environment isolation
- Row-level security (shared table + `tenant_id` column): weakest. A missing WHERE clause leaks all tenant data. OMS does NOT use this.
- Schema-per-tenant (shared DB, different schemas): medium. The connection pool is shared; a `search_path` misconfiguration leaks data. OMS does NOT use this.
- Database-per-tenant: strongest. Separate connection pool, separate WAL, separate backups; no SQL query can cross the database boundary. **OMS uses this for each environment.**
Backward compatibility — zero breaking changes
The first startup seeds a "Default Organization" with a "Production" environment pointing at the existing oms_db database. All existing data (orders, inventory, nodes, sourcing rules) is untouched. Requests without an X-OMS-Environment header fall back to the environment where is_default=true, which points at oms_db. Existing JWT tokens and API clients all continue to work.
Celery — environment-aware fan-out
```python
@celery_app.task
def source_pending_orders_fanout():
    # Beat fires this every 2 min. One task per active environment.
    env_ids = list_active_environment_ids()  # sync SQL on oms_db
    for env_id in env_ids:
        source_pending_orders.delay(env_id)


@celery_app.task
def source_pending_orders(environment_id: str = ""):
    # Empty env_id = backward-compat fallback to settings.DATABASE_URL
    db_url = get_env_db_url(environment_id)
    # ... unchanged sourcing logic ...
```
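`get_env_db_url` itself is not shown in the snippet above. One plausible shape, parameterized here for illustration (the `lookup` helper, the extra arguments, and the URL layout are all assumptions):

```python
def get_env_db_url(environment_id: str, lookup, default_url: str) -> str:
    """Empty environment_id keeps the legacy single-DB behaviour."""
    if not environment_id:
        return default_url                   # settings.DATABASE_URL fallback
    env = lookup(environment_id)             # sync read on the control DB (oms_db)
    base = default_url.rsplit("/", 1)[0]     # strip the trailing database name
    return f"{base}/{env['db_name']}"        # same server, env-specific database
```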
MongoDB & Elasticsearch per environment
| Environment | MongoDB Events DB | MongoDB AI DB | ES Prefix |
|---|---|---|---|
| Default (oms_db) | oms_events | oms_ai_learning | default_prod |
| acme / dev | oms_events_acme_dev | oms_ai_acme_dev | acme_dev |
| acme / prod | oms_events_acme_prod | oms_ai_acme_prod | acme_prod |
| widgetco / qa | oms_events_widgetco_qa | oms_ai_widgetco_qa | widgetco_qa |