dashobserve

DashObserve — Data observability for Databricks: freshness, volume, and schema-change monitoring. Launch the UI with dashobserve.launch() inside a Databricks notebook.

 1"""
 2DashObserve — Data observability for Databricks: freshness, volume, and
 3schema-change monitoring. Launch the UI with dashobserve.launch() inside
 4a Databricks notebook.
 5"""
 6from dashobserve.monitors import MonitorResult, check_freshness, check_schema, check_volume, diff_schema
 7from dashobserve.runner import MonitorConfig, MonitorReport, run_monitors
 8from dashobserve.ui import launch
 9
10__version__ = "0.1.2"
11__all__ = [
12    "MonitorConfig",
13    "MonitorReport",
14    "MonitorResult",
15    "run_monitors",
16    "check_freshness",
17    "check_volume",
18    "check_schema",
19    "diff_schema",
20    "launch",
21]
@dataclass
class MonitorConfig:
@dataclass
class MonitorConfig:
    """Declares which monitors ``run_monitors`` should execute for one table.

    A monitor only runs when its fields are set:

    * freshness — requires both ``freshness_column`` and
      ``max_staleness_minutes``.
    * volume — runs when any of ``min_rows``, ``max_rows`` or
      ``volume_tolerance_pct`` is not None.
    * schema — runs when ``track_schema`` is true.
    """
    table: str  # table name handed to spark.table()
    freshness_column: str = None  # column whose max value is checked for staleness
    max_staleness_minutes: float = None  # allowed age of the newest value, in minutes
    min_rows: int = None  # lower bound on the row count (None = no lower bound)
    max_rows: int = None  # upper bound on the row count (None = no upper bound)
    volume_tolerance_pct: float = None  # allowed % deviation from the historical baseline
    track_schema: bool = False  # compare the current schema with the last recorded one
MonitorConfig( table: str, freshness_column: str = None, max_staleness_minutes: float = None, min_rows: int = None, max_rows: int = None, volume_tolerance_pct: float = None, track_schema: bool = False)
table: str
freshness_column: str = None
max_staleness_minutes: float = None
min_rows: int = None
max_rows: int = None
volume_tolerance_pct: float = None
track_schema: bool = False
class MonitorReport:
class MonitorReport:
    """Holds the results of one monitor run and offers export/print helpers.

    Every element of ``results`` must expose ``status``, ``monitor_type``,
    ``table_name``, ``message`` and a ``to_dict()`` method, since the methods
    below read exactly those members.
    """

    def __init__(self, results: list):
        """Store the list of monitor results (kept by reference, not copied)."""
        self.results = results

    def to_dict(self) -> list:
        """Return one dict per result, in the same order as ``results``."""
        return [r.to_dict() for r in self.results]

    def to_pandas(self):
        """Return the results as a pandas DataFrame (one row per result)."""
        # Imported lazily so pandas is only required when this method is used.
        import pandas as pd
        return pd.DataFrame(self.to_dict())

    def summary(self) -> dict:
        """Return total / passed / failed counts and the pass rate in percent.

        Any status other than ``"PASS"`` counts as failed. The pass rate is 0
        when there are no results, which avoids a division by zero.
        """
        total = len(self.results)
        passed = sum(1 for r in self.results if r.status == "PASS")
        return {
            "total_monitors": total,
            "passed": passed,
            "failed": total - passed,
            "pass_rate_pct": round(passed / total * 100, 1) if total else 0,
        }

    def display(self):
        """Print one line per result: status icon, monitor type, table, message."""
        for r in self.results:
            icon = "✅" if r.status == "PASS" else "❌"
            # A separator keeps the table name from running into the message.
            print(f"{icon} [{r.monitor_type}] {r.table_name} — {r.message}")
MonitorReport(results: list)
    def __init__(self, results: list):
        """Store the list of monitor results (kept by reference, not copied)."""
        self.results = results
results
def to_dict(self) -> list:
30    def to_dict(self) -> list:
31        return [r.to_dict() for r in self.results]
def to_pandas(self):
33    def to_pandas(self):
34        import pandas as pd
35        return pd.DataFrame(self.to_dict())
def summary(self) -> dict:
37    def summary(self) -> dict:
38        total = len(self.results)
39        passed = sum(1 for r in self.results if r.status == "PASS")
40        return {
41            "total_monitors": total,
42            "passed": passed,
43            "failed": total - passed,
44            "pass_rate_pct": round(passed / total * 100, 1) if total else 0,
45        }
def display(self):
47    def display(self):
48        for r in self.results:
49            icon = "✅" if r.status == "PASS" else "❌"
50            print(f"{icon} [{r.monitor_type}] {r.table_name}{r.message}")
@dataclass
class MonitorResult:
@dataclass
class MonitorResult:
    """Outcome of a single monitor check against one table."""
    table_name: str
    monitor_type: str      # freshness | volume | schema
    monitor_name: str
    status: str             # PASS | FAIL | ERROR
    message: str
    run_timestamp: str = ""
    details: str = "{}"     # json-encoded dict of monitor-specific details

    def to_dict(self) -> dict:
        """Return a shallow copy of the instance attributes as a plain dict."""
        return dict(self.__dict__)
MonitorResult( table_name: str, monitor_type: str, monitor_name: str, status: str, message: str, run_timestamp: str = '', details: str = '{}')
table_name: str
monitor_type: str
monitor_name: str
status: str
message: str
run_timestamp: str = ''
details: str = '{}'
def to_dict(self) -> dict:
22    def to_dict(self) -> dict:
23        return dict(self.__dict__)
def run_monitors( config: MonitorConfig, history_table: str = None, spark=None) -> MonitorReport:
def run_monitors(config: MonitorConfig, history_table: str = None, spark=None) -> MonitorReport:
    """Run every monitor configured on config.table and return a MonitorReport.

    Args:
        config: Which table to inspect and which checks to run.
        history_table: Optional table name. When given, it is passed to the
            module helpers that read the volume baseline and the last schema,
            and the new results are saved to it.
        spark: SparkSession to use. When None, the active session is used.

    Returns:
        A MonitorReport with one MonitorResult per monitor that ran.

    Raises:
        RuntimeError: If ``spark`` is None and no active SparkSession exists.
    """
    if spark is None:
        from pyspark.sql import SparkSession
        spark = SparkSession.getActiveSession()
        if spark is None:
            # getActiveSession() returns None outside a running session; fail
            # with a clear message instead of an AttributeError on spark.table.
            raise RuntimeError(
                "No active SparkSession found — pass spark=... or run inside a Spark session"
            )
    from pyspark.sql import functions as F

    df = spark.table(config.table)
    # Naive local wall-clock time, shared by every result of this run.
    run_ts = datetime.now().isoformat(timespec="seconds")
    results = []

    # Freshness needs both the column and a threshold.
    if config.freshness_column and config.max_staleness_minutes is not None:
        latest = df.agg(F.max(config.freshness_column)).collect()[0][0]
        ok, msg = check_freshness(latest, config.max_staleness_minutes)
        results.append(MonitorResult(
            config.table, "freshness", "data_freshness",
            "PASS" if ok else "FAIL", msg, run_ts,
        ))

    # Volume runs when any bound or a baseline tolerance is configured.
    if config.min_rows is not None or config.max_rows is not None or config.volume_tolerance_pct is not None:
        row_count = df.count()
        baseline_avg = None
        # A baseline is only looked up when a tolerance and a history table exist.
        if config.volume_tolerance_pct is not None and history_table:
            baseline_avg = _read_volume_baseline(spark, history_table, config.table)
        ok, msg = check_volume(row_count, config.min_rows, config.max_rows, baseline_avg, config.volume_tolerance_pct)
        results.append(MonitorResult(
            config.table, "volume", "row_count",
            "PASS" if ok else "FAIL", msg, run_ts,
            details=json.dumps({"row_count": row_count}),
        ))

    if config.track_schema:
        new_schema = {f.name: str(f.dataType) for f in df.schema.fields}
        # Without a history table there is no previous snapshot; check_schema
        # treats old_schema=None as an initial baseline.
        old_schema = _read_last_schema(spark, history_table, config.table) if history_table else None
        ok, msg, diff = check_schema(old_schema, new_schema)
        results.append(MonitorResult(
            config.table, "schema", "schema_change",
            "PASS" if ok else "FAIL", msg, run_ts,
            details=json.dumps({"schema": new_schema, "diff": diff}),
        ))

    # Save only when there is somewhere to write and something to record.
    if history_table and results:
        _save_results(spark, history_table, results)

    return MonitorReport(results)

Run every monitor configured on config.table and return a MonitorReport.

def check_freshness( latest_timestamp, max_staleness_minutes: float, now: datetime.datetime = None) -> tuple[bool, str]:
def _as_naive_utc(value):
    """Return *value* as a naive UTC datetime; naive inputs pass through unchanged."""
    if getattr(value, "tzinfo", None) is None:
        return value
    if value.utcoffset() is None:
        # tzinfo is set but yields no offset: nothing to convert, just drop it.
        return value.replace(tzinfo=None)
    return value.astimezone(timezone.utc).replace(tzinfo=None)


def check_freshness(latest_timestamp, max_staleness_minutes: float, now: datetime = None) -> tuple[bool, str]:
    """Most recent value in the freshness column must be within max_staleness_minutes of now.

    Args:
        latest_timestamp: Newest timestamp in the table, or None when there is
            none. Timezone-aware values are converted to UTC; naive values are
            compared as-is against a UTC "now".
        max_staleness_minutes: Maximum allowed age in minutes (inclusive).
        now: Reference time; defaults to the current UTC time. May be naive or
            timezone-aware.

    Returns:
        ``(ok, message)``, where ``ok`` is True when the data is fresh enough.
    """
    if latest_timestamp is None:
        return False, "No timestamp values found — table may be empty"
    if now is None:
        now = datetime.now(timezone.utc)
    # Normalise both sides to naive UTC. Dropping tzinfo without converting
    # would shift aware timestamps by their UTC offset.
    now = _as_naive_utc(now)
    latest_timestamp = _as_naive_utc(latest_timestamp)
    staleness_minutes = (now - latest_timestamp).total_seconds() / 60
    ok = staleness_minutes <= max_staleness_minutes
    msg = f"Data is {staleness_minutes:.1f} min old (threshold {max_staleness_minutes} min)"
    return ok, msg

Most recent value in the freshness column must be within max_staleness_minutes of now.

def check_volume( row_count: int, min_rows: int = None, max_rows: int = None, baseline_avg: float = None, tolerance_pct: float = None) -> tuple[bool, str]:
def check_volume(row_count: int, min_rows: int = None, max_rows: int = None,
                  baseline_avg: float = None, tolerance_pct: float = None) -> tuple[bool, str]:
    """Validate a row count against absolute bounds and an optional baseline.

    The checks run in order (lower bound, upper bound, deviation from
    ``baseline_avg``) and the first violation is reported. The baseline check
    is skipped unless both ``baseline_avg`` and ``tolerance_pct`` are given
    and the baseline is positive.

    Returns:
        ``(ok, message)``.
    """
    too_few = min_rows is not None and row_count < min_rows
    if too_few:
        return False, f"Row count {row_count} below min_rows {min_rows}"

    too_many = max_rows is not None and row_count > max_rows
    if too_many:
        return False, f"Row count {row_count} above max_rows {max_rows}"

    within_bounds = (True, f"Row count {row_count} within expected bounds")

    # No usable baseline means nothing further to compare against.
    if baseline_avg is None or tolerance_pct is None or not baseline_avg > 0:
        return within_bounds

    deviation_pct = abs(row_count - baseline_avg) / baseline_avg * 100
    if not deviation_pct > tolerance_pct:
        return within_bounds

    detail = (
        f"Row count {row_count} deviates {deviation_pct:.1f}% from baseline "
        f"{baseline_avg:.0f} (tolerance {tolerance_pct}%)"
    )
    return False, detail

Row count must satisfy absolute bounds and/or stay within tolerance_pct of a historical baseline.

def check_schema(old_schema: dict, new_schema: dict) -> tuple[bool, str, dict]:
def check_schema(old_schema: dict, new_schema: dict) -> tuple[bool, str, dict]:
    """Compare the previous and current schema and report any change.

    ``old_schema=None`` means no earlier snapshot exists; that case is
    reported as a passing baseline with an empty diff.

    Returns:
        ``(ok, message, diff)``, where ``diff`` has the keys ``added``,
        ``removed`` and ``type_changed``.
    """
    if old_schema is None:
        empty_diff = {"added": [], "removed": [], "type_changed": []}
        return True, "Initial schema snapshot recorded", empty_diff

    diff = diff_schema(old_schema, new_schema)

    # (label shown in the message, key in the diff), in reporting order.
    sections = (("added", "added"), ("removed", "removed"), ("type changed", "type_changed"))
    parts = [
        f"{label}: {', '.join(diff[key])}"
        for label, key in sections
        if diff[key]
    ]
    if not parts:
        return True, "No schema changes detected", diff
    return False, "Schema changed — " + "; ".join(parts), diff

old_schema=None means no prior snapshot exists yet — treated as a baseline, not a failure.

def diff_schema(old_schema: dict, new_schema: dict) -> dict:
def diff_schema(old_schema: dict, new_schema: dict) -> dict:
    """Diff two ``{column_name: dtype_string}`` mappings without any I/O.

    Returns:
        A dict with sorted column-name lists under ``added`` (only in the new
        schema), ``removed`` (only in the old schema) and ``type_changed``
        (in both, with different dtype strings).
    """
    before = set(old_schema)
    after = set(new_schema)

    shared = before.intersection(after)
    retyped = [name for name in shared if old_schema[name] != new_schema[name]]
    retyped.sort()

    return {
        "added": sorted(after.difference(before)),
        "removed": sorted(before.difference(after)),
        "type_changed": retyped,
    }

Compare two {column_name: dtype_string} schemas. Pure set/dict logic, no I/O.

def launch():
  6def launch():
  7    try:
  8        import ipywidgets as w
  9        from IPython.display import display
 10    except ImportError:
 11        raise RuntimeError("ipywidgets required. Run: %pip install ipywidgets")
 12
 13    import dashui
 14
 15    monitors: list[dict] = []
 16
 17    # ── Add monitor ───────────────────────────────────────────────────────
 18    m_table = w.Text(description="UC Table:", placeholder="catalog.schema.table")
 19
 20    m_freshness_col = w.Text(description="Freshness col:", placeholder="updated_at (optional)")
 21    m_max_staleness = w.IntText(description="Max staleness (min):", value=1440)
 22
 23    m_min_rows = w.Text(description="Min rows:", placeholder="optional")
 24    m_max_rows = w.Text(description="Max rows:", placeholder="optional")
 25    m_tolerance = w.Text(description="Baseline tolerance %:", placeholder="optional, e.g. 20")
 26
 27    m_track_schema = w.Checkbox(value=True, description="Track schema changes")
 28
 29    add_btn = dashui.action_button("Add Monitor", style="info", emoji="+")
 30    monitors_output, render_monitors = dashui.running_list(
 31        lambda i, m: (
 32            f"  {i}. {m['table']} — "
 33            f"freshness:{'on' if m['freshness_column'] else 'off'}, "
 34            f"volume:{'on' if (m['min_rows'] or m['max_rows'] or m['volume_tolerance_pct']) else 'off'}, "
 35            f"schema:{'on' if m['track_schema'] else 'off'}"
 36        )
 37    )
 38
 39    def _parse_int(text):
 40        text = text.strip()
 41        return int(text) if text else None
 42
 43    def _parse_float(text):
 44        text = text.strip()
 45        return float(text) if text else None
 46
 47    def on_add(b):
 48        table = m_table.value.strip()
 49        if not table:
 50            return
 51        monitors.append({
 52            "table": table,
 53            "freshness_column": m_freshness_col.value.strip() or None,
 54            "max_staleness_minutes": m_max_staleness.value,
 55            "min_rows": _parse_int(m_min_rows.value),
 56            "max_rows": _parse_int(m_max_rows.value),
 57            "volume_tolerance_pct": _parse_float(m_tolerance.value),
 58            "track_schema": m_track_schema.value,
 59        })
 60        render_monitors(monitors)
 61        m_table.value = m_freshness_col.value = m_min_rows.value = m_max_rows.value = m_tolerance.value = ""
 62
 63    add_btn.on_click(on_add)
 64
 65    # ── Run ──────────────────────────────────────────────────────────────
 66    history_table = w.Text(description="History table:", placeholder="catalog.schema.observe_history (optional)")
 67    run_btn = dashui.action_button("Run All Monitors", style="success", emoji="▶")
 68    output = dashui.output_panel()
 69
 70    def on_run(b):
 71        with output:
 72            output.clear_output()
 73            if not monitors:
 74                print("⚠️  No monitors configured — add at least one above")
 75                return
 76            try:
 77                from dashobserve.runner import MonitorConfig, run_monitors
 78                hist = history_table.value.strip() or None
 79                for m in monitors:
 80                    cfg = MonitorConfig(
 81                        table=m["table"],
 82                        freshness_column=m["freshness_column"],
 83                        max_staleness_minutes=m["max_staleness_minutes"] if m["freshness_column"] else None,
 84                        min_rows=m["min_rows"],
 85                        max_rows=m["max_rows"],
 86                        volume_tolerance_pct=m["volume_tolerance_pct"],
 87                        track_schema=m["track_schema"],
 88                    )
 89                    report = run_monitors(cfg, history_table=hist)
 90                    report.display()
 91                    s = report.summary()
 92                    print(f"   → {s['passed']}/{s['total_monitors']} passed\n")
 93            except Exception as e:
 94                print(f"❌ {e}")
 95
 96    run_btn.on_click(on_run)
 97
 98    ui = dashui.card([
 99        dashui.header("DashObserve — Data Observability", library="dashobserve", emoji="👁️"),
100        dashui.section("Step 1: Configure a monitor"),
101        m_table,
102        w.HBox([m_freshness_col, m_max_staleness]),
103        w.HBox([m_min_rows, m_max_rows, m_tolerance]),
104        m_track_schema,
105        add_btn, monitors_output,
106        dashui.section("Step 2: Run"),
107        history_table,
108        run_btn,
109        output,
110    ])
111    display(ui)