Code-first, traceable BI and analytics framework for Python
TraceBi is a Python library for building BI and analytics workflows entirely in code — no GUI builders, no config files. Every data operation from raw ingest to formatted report is tracked in a full, immutable lineage chain. You always know exactly where a number came from and how it was calculated.
The same DataModel and lineage chain powers every output — whether that's an Excel file, an HTML report, a live Dash dashboard, or a scheduled pipeline writing back to a database.
Connectors, DataModel, DataSet with immutable lineage chain
Excel + HTML renderers, lineage manifest saved with every render
Bronze/Silver/Gold layers, StarSchema queries, LineageDiagram
Live Dash app with associative filters driven by the same DataModel
PipelineRunner with APScheduler, DB write-back, cross-layer lineage
FastAPI + Jinja2 browser UI with embedded Dash dashboards, medallion-aware demo, and registry pattern
Connectors are the entry point to any data source. Every connector implements
connect(), load(source), and write(df, table).
They are registered in a DataModel by name.
| Class | Source | Extra dep | Notes |
|---|---|---|---|
CSVConnector |
CSV / Excel files | openpyxl | directory= + filename as source |
SQLConnector |
Any SQL database | sqlalchemy | SQLite, Postgres, MySQL, etc. Supports write() |
BigQueryConnector |
Google BigQuery | google-cloud-bigquery | Uses Application Default Credentials |
SnowflakeConnector |
Snowflake | snowflake-connector-python | Keyword auth supported |
MemoryConnector |
In-memory DataFrames | none | Dict of DataFrames. Supports write(). Used in tests and demos. |
from tracebi import SQLConnector, MemoryConnector, DataModel
db = SQLConnector("sales_db", url="sqlite:///data/tracebi.db")
model = DataModel("SalesModel")
model.add_connector(db)
model.add_table("orders", connector="sales_db", source="orders")
model.add_table("customers", connector="sales_db", source="customers")
model.add_relationship("orders_customers", "orders", "customers",
left_key="customer_id", how="left")
DataSet is an immutable pandas DataFrame wrapper. Every fluent operation
returns a new DataSet — the original is never mutated. Each operation
appends a LineageNode to the chain.
| Method | What it does |
|---|---|
.filter(expr) | pandas query string — records rows before/after |
.transform(func) | arbitrary (DataFrame) → DataFrame |
.sort(by, ascending) | sort by one or more columns |
.select(columns) | keep only the specified columns |
.rename(columns) | {old: new} column mapping |
.print_lineage() | pretty-print the full lineage chain |
.fingerprint() | MD5 hash of DataFrame content |
orders = (
model.load("orders")
.filter("status == 'shipped'", description="Shipped orders only")
.transform(
lambda df: df.assign(margin=df["revenue"] - df["cost"]),
description="margin = revenue - cost",
)
.sort("margin", ascending=False)
)
orders.print_lineage()
# ============================================================
# Lineage for DataSet: 'orders' Shape: 198 rows × 7 cols
# ============================================================
# Step 1: [LOAD] Loaded 'orders' from connector 'sales_db'
# Step 2: [FILTER] Shipped orders only (250 → 198 rows)
# Step 3: [TRANSFORM] margin = revenue - cost
# Step 4: [SORT] Sorted by margin (desc)
# ============================================================
| Field | Type | Description |
|---|---|---|
operation | str | load, filter, transform, join, bronze, silver, gold, … |
description | str | human-readable step description |
connector | dict | connector name + type (load/bronze steps) |
source | str | table name, file path, or query |
timestamp | str | UTC ISO-8601, auto-set |
metadata | dict | rows_before/after, agg spec, cast map, … |
Used in LineageDiagram to color-code each step:
Reports are built with a fluent builder and rendered to Excel or HTML.
Every render saves a .manifest.json alongside the output file
containing the full lineage for each section's dataset.
TextSection — heading, paragraph, or noteTableSection — DataFrame table with optional totals rowChartSection — line, bar, or pie chart via matplotlib.spacer() — blank row between sectionsExcelRenderer().render(report, path) — multi-sheet .xlsxHTMLRenderer().render(report, path) — standalone .htmlHTMLRenderer().serve(report, port=8080) — local browserHTMLRenderer().preview(report) — Jupyter inlinefrom tracebi.reports.report import Report, TextSection, TableSection, ChartSection
from tracebi.reports.excel_renderer import ExcelRenderer
from tracebi.reports.html_renderer import HTMLRenderer
report = (
Report("Q2 Sales Report")
.author("Data Team")
.parameter("period", "Q2 2024")
.add(TextSection(title="Summary", content="Summary", style="heading1"))
.add(TextSection(content="Revenue up 12% vs Q1.", style="normal"))
.add(ChartSection(title="Revenue Trend", dataset=trend_ds,
chart_type="line", x="month", y="revenue"))
.add(TableSection(title="Orders", dataset=orders_ds,
columns=["region", "product", "revenue"],
totals=["revenue"],
number_formats={"revenue": "{:,.2f}"}))
)
ExcelRenderer().render(report, "output/q2_sales.xlsx")
HTMLRenderer().serve(report, port=8080) # opens browser
Three distinct layer classes enforce the Bronze → Silver → Gold contract.
Each layer can run standalone or as part of a PipelineRunner
that coordinates scheduling and writes results back to a database.
Load data from any connector with zero transforms. Stamps lineage with
operation="bronze" + ingestion timestamp.
Declarative pipeline: .cast(), .drop_nulls(),
.deduplicate(), .rename(), .transform().
Each step adds a operation="silver" lineage node.
Delegates to StarSchema.query(). Stamps
operation="gold" with query parameters in metadata.
Dimension references use dot notation: "dim_customer.region".
Measures are a dict: {"revenue": "sum", "order_id": "count"}.
Supported aggregations: sum, count, mean,
min, max, nunique.
from tracebi.model.star_schema import StarSchema
schema = StarSchema("Sales", model=model)
schema.add_dimension("dim_customer", table_name="customers_silver",
key_col="customer_id", attributes=["region", "segment"])
schema.add_fact("fact_orders", table_name="orders_silver",
measures=["revenue", "qty"],
foreign_keys={"dim_customer": "customer_id"})
ds = schema.query(
fact="fact_orders",
measures={"revenue": "sum", "qty": "sum"},
dimensions=["dim_customer.region"],
filters={"status": "shipped"},
aggregate=True,
)
from tracebi.lineage.diagram import LineageDiagram
diag = LineageDiagram(ds) # or LineageDiagram(report)
diag.show() # matplotlib window / Jupyter inline
diag.to_html("lineage.html") # standalone HTML with embedded SVG
print(diag.to_mermaid()) # paste into GitHub markdown
PipelineRunner registers each layer with its own independent cron schedule
and persists every run to a SQLite database. Layers are linked via
depends_on, enabling refresh=True to walk the full chain.
tracebi_layers — registered layer config + schedulestracebi_runs — every run: status, rows_in/out, timestamps, upstream_run_idtracebi_schemas — StarSchema definitionstracebi_dimensions — dimension table registrationstracebi_facts — fact table registrationstracebi_relationships — DataModel join definitions.register(layer, name, schedule, depends_on).run(name, refresh=False) — on-demand.start(blocking=True) — APScheduler.stop().lineage(name) — print run history.status() — all layers + last run.register_schema(schema).register_model(model)from tracebi.pipeline.runner import PipelineRunner
runner = PipelineRunner(db_url="sqlite:///data/tracebi.db")
# Each layer registered with its own schedule
runner.register(orders_bronze, name="orders_bronze", schedule="0 * * * *")
runner.register(orders_silver, name="orders_silver", schedule="15 * * * *",
depends_on="orders_bronze")
runner.register(revenue_by_region, name="revenue_by_region", schedule="30 6 * * *",
depends_on="orders_silver")
# On-demand: single layer only
runner.run("orders_silver")
# On-demand: full chain refresh (bronze → silver → gold)
runner.run("revenue_by_region", refresh=True)
# Start all scheduled jobs (blocking — Ctrl+C to stop)
runner.start()
Each run record stores an upstream_run_id pointing to the most recent
successful run of its upstream layer. This means you can always trace a gold table
row back through silver → bronze to the exact source ingestion.
runner.lineage("revenue_by_region")
# ============================================================
# Pipeline Lineage — 'revenue_by_region'
# Chain: orders_bronze → orders_silver → revenue_by_region
# ============================================================
# [orders_bronze] schedule=0 * * * * runs=3
# ✓ run_id= 1 2026-05-12T02:27:22 rows 11 → 11 [success]
#
# [orders_silver] schedule=15 * * * * ← orders_bronze runs=2
# ✓ run_id= 3 2026-05-12T02:27:25 rows 11 → 10 [success] upstream_run=1
#
# [revenue_by_region] schedule=30 6 * * * ← orders_silver runs=1
# ✓ run_id= 5 2026-05-12T02:27:27 rows 10 → 4 [success] upstream_run=3
This script: creates data/tracebi.db, seeds source tables
(orders_raw, customers_raw), registers all six layers
with the runner, and runs the initial Bronze load so the DB is ready immediately.
A live Dash 2.x application driven by the same DataModel. Filters are
associative — selecting a value in any FilterPanel
automatically filters every panel whose dataset contains that column.
FilterPanel — dropdown filter (single or multi-select)MetricPanel — KPI card with aggregation (sum/count/mean)ChartPanel — bar, line, or pie chart via PlotlyTablePanel — paginated data tableEach panel accepts either:
dataset=DataSettable_name= string — loaded via DataModel.load()
on each render, always freshtransform_fn= applied after loadingfrom tracebi.dashboard import Dashboard, DashboardServer
from tracebi.dashboard import FilterPanel, MetricPanel, ChartPanel, TablePanel
dashboard = (
Dashboard("Q2 Sales Dashboard")
.columns(2)
.add_filter(FilterPanel("region-filter", label="Region",
column="region", table_name="orders"))
.add_panel(MetricPanel("total-revenue", title="Total Revenue",
table_name="orders", column="revenue",
aggregation="sum", prefix="$"))
.add_panel(ChartPanel("by-region", title="Revenue by Region",
table_name="orders", chart_type="bar",
x="region", y="revenue"))
.add_panel(TablePanel("orders-table", title="Orders",
table_name="orders",
columns=["order_id", "region", "revenue"],
width=2)) # span both columns
)
DashboardServer(dashboard, model=model).run(port=8050)
# Open http://localhost:8050/
A FastAPI + Jinja2 browser interface over your TraceBi registry. Connectors, models, reports, pipelines, and live Dash dashboards — all accessible from one server on a single port.
Each DashboardServer is mounted inside FastAPI at
/dashboards/<name>/ using Starlette's
WSGIMiddleware. One port, no second server.
The standalone DashboardServer.run() path is unaffected.
# Install web dependencies
pip install -r web/requirements.txt
# Start the dev server (hot-reload on)
python web/run.py
# → http://localhost:8000
# Point at your own app module
TRACEBI_APP=mypackage.tracebi_config python web/run.py
Everything flows through a single Registry singleton. Your app
module (default: web/demo_app.py) registers objects at startup;
every API route reads from the registry at request time.
from web.api.registry import registry
registry.add_connector(connector)
registry.add_model(model)
@registry.report("my_report", description="...")
def my_report():
ds = model.load("orders")
return Report("My Report").add(TableSection(dataset=ds, title="Orders"))
registry.add_dashboard("sales", DashboardServer(dashboard, model=model))
registry.add_pipeline("main", runner)
demo_app.py detects data/tracebi.db at startup and adapts:
orders_silver ⋈ customers_silvergold_revenue_by_region report registeredMemoryConnector data with a warning| Group | Command | Includes |
|---|---|---|
reports | pip install -e ".[reports]" | openpyxl, matplotlib |
dashboard | pip install -e ".[dashboard]" | dash, plotly |
pipeline | pip install -e ".[pipeline]" | apscheduler, sqlalchemy |
lineage | pip install -e ".[lineage]" | networkx, matplotlib |
sql | pip install -e ".[sql]" | sqlalchemy |