dashobserve
DashObserve — Data observability for Databricks: freshness, volume, and schema-change monitoring. Launch the UI with dashobserve.launch() inside a Databricks notebook.
"""
DashObserve — Data observability for Databricks: freshness, volume, and
schema-change monitoring. Launch the UI with dashobserve.launch() inside
a Databricks notebook.
"""
from dashobserve.monitors import (
    MonitorResult,
    check_freshness,
    check_schema,
    check_volume,
    diff_schema,
)
from dashobserve.runner import MonitorConfig, MonitorReport, run_monitors
from dashobserve.ui import launch

__version__ = "0.1.2"

# Names re-exported as the package's public API.
__all__ = [
    "MonitorConfig",
    "MonitorReport",
    "MonitorResult",
    "run_monitors",
    "check_freshness",
    "check_volume",
    "check_schema",
    "diff_schema",
    "launch",
]
@dataclass
class
MonitorConfig:
@dataclass
class MonitorConfig:
    """Settings describing which monitors to run against a single table.

    Every field except ``table`` is optional. Annotations for the optional
    fields are quoted so that the ``X | None`` spelling is never evaluated
    at class-creation time and therefore works on any interpreter that
    supports dataclasses.
    """

    table: str  # name of the table to monitor
    freshness_column: "str | None" = None  # column holding the timestamp to check
    max_staleness_minutes: "float | None" = None  # freshness threshold, in minutes
    min_rows: "int | None" = None  # lower bound on row count
    max_rows: "int | None" = None  # upper bound on row count
    volume_tolerance_pct: "float | None" = None  # allowed % deviation from a baseline
    track_schema: bool = False  # whether to run the schema-change monitor
class
MonitorReport:
class MonitorReport:
    """Wraps the list of monitor results produced by one run."""

    def __init__(self, results: list):
        # The list is kept by reference, not copied.
        self.results = results

    def to_dict(self) -> list:
        """Return the ``to_dict()`` form of every result, in order."""
        return [item.to_dict() for item in self.results]

    def to_pandas(self):
        """Return the results as a pandas DataFrame (pandas is imported lazily)."""
        from pandas import DataFrame

        return DataFrame(self.to_dict())

    def summary(self) -> dict:
        """Return total / passed / failed counts and the pass rate in percent.

        Any status other than "PASS" counts as failed. With no results the
        pass rate is reported as 0.
        """
        n_total = len(self.results)
        n_passed = sum(item.status == "PASS" for item in self.results)
        if n_total:
            rate = round(n_passed / n_total * 100, 1)
        else:
            rate = 0
        return {
            "total_monitors": n_total,
            "passed": n_passed,
            "failed": n_total - n_passed,
            "pass_rate_pct": rate,
        }

    def display(self):
        """Print one line per result, prefixed with a pass/fail icon."""
        for item in self.results:
            if item.status == "PASS":
                marker = "✅"
            else:
                marker = "❌"
            print(f"{marker} [{item.monitor_type}] {item.table_name} — {item.message}")
@dataclass
class
MonitorResult:
@dataclass
class MonitorResult:
    """Outcome of one monitor check on one table."""

    table_name: str
    monitor_type: str  # one of: freshness, volume, schema
    monitor_name: str
    status: str  # one of: PASS, FAIL, ERROR
    message: str
    run_timestamp: str = ""
    details: str = "{}"  # monitor-specific details as a JSON-encoded dict

    def to_dict(self) -> dict:
        """Return a shallow copy of the instance attributes as a plain dict."""
        return {**vars(self)}
def run_monitors(config: MonitorConfig, history_table: str = None, spark=None) -> MonitorReport:
    """Run every monitor configured on config.table and return a MonitorReport.

    Args:
        config: Table name plus the settings that switch each monitor on.
        history_table: Optional table name. When given, it is passed to the
            ``_read_volume_baseline`` / ``_read_last_schema`` helpers and the
            results of this run are handed to ``_save_results``.
        spark: Spark session to use. Defaults to the active session.

    Returns:
        A MonitorReport with one MonitorResult per monitor that ran.

    Raises:
        RuntimeError: If ``spark`` is omitted and no Spark session is active.
    """
    if spark is None:
        from pyspark.sql import SparkSession
        spark = SparkSession.getActiveSession()
        if spark is None:
            # getActiveSession() yields None outside a running session; fail
            # here with a clear message instead of an AttributeError below.
            raise RuntimeError(
                "No active SparkSession found — pass spark=... explicitly "
                "or call run_monitors from within a Spark session"
            )
    from pyspark.sql import functions as F

    df = spark.table(config.table)
    run_ts = datetime.now().isoformat(timespec="seconds")
    results = []

    # Freshness runs only when both the column and the threshold are set.
    if config.freshness_column and config.max_staleness_minutes is not None:
        latest = df.agg(F.max(config.freshness_column)).collect()[0][0]
        ok, msg = check_freshness(latest, config.max_staleness_minutes)
        results.append(MonitorResult(
            config.table, "freshness", "data_freshness",
            "PASS" if ok else "FAIL", msg, run_ts,
        ))

    # Volume runs when any of the three volume settings is set.
    if config.min_rows is not None or config.max_rows is not None or config.volume_tolerance_pct is not None:
        row_count = df.count()
        baseline_avg = None
        # A baseline is only looked up when there is a history table to read.
        if config.volume_tolerance_pct is not None and history_table:
            baseline_avg = _read_volume_baseline(spark, history_table, config.table)
        ok, msg = check_volume(row_count, config.min_rows, config.max_rows, baseline_avg, config.volume_tolerance_pct)
        results.append(MonitorResult(
            config.table, "volume", "row_count",
            "PASS" if ok else "FAIL", msg, run_ts,
            details=json.dumps({"row_count": row_count}),
        ))

    if config.track_schema:
        new_schema = {f.name: str(f.dataType) for f in df.schema.fields}
        # Without a history table there is no previous snapshot to compare to.
        old_schema = _read_last_schema(spark, history_table, config.table) if history_table else None
        ok, msg, diff = check_schema(old_schema, new_schema)
        results.append(MonitorResult(
            config.table, "schema", "schema_change",
            "PASS" if ok else "FAIL", msg, run_ts,
            details=json.dumps({"schema": new_schema, "diff": diff}),
        ))

    if history_table and results:
        _save_results(spark, history_table, results)

    return MonitorReport(results)
Run every monitor configured on config.table and return a MonitorReport.
def
check_freshness( latest_timestamp, max_staleness_minutes: float, now: datetime.datetime = None) -> tuple[bool, str]:
def _naive_utc(moment):
    """Return *moment* as a naive UTC value; values without tzinfo pass through unchanged."""
    if getattr(moment, "tzinfo", None) is None:
        return moment
    # Convert to UTC before dropping tzinfo; dropping it directly would keep
    # the local wall-clock reading and skew the result by the UTC offset.
    return moment.astimezone(timezone.utc).replace(tzinfo=None)


def check_freshness(latest_timestamp, max_staleness_minutes: float, now: datetime = None) -> tuple[bool, str]:
    """Most recent value in the freshness column must be within max_staleness_minutes of now.

    Timezone-aware values (for either ``latest_timestamp`` or ``now``) are
    converted to UTC before comparing; naive values are taken to be UTC.

    Args:
        latest_timestamp: Newest timestamp found, or None if there is none.
        max_staleness_minutes: Maximum allowed age in minutes (inclusive).
        now: Reference time; defaults to the current UTC time.

    Returns:
        ``(ok, message)`` where ``ok`` is True when the data is fresh enough.
    """
    if latest_timestamp is None:
        return False, "No timestamp values found — table may be empty"
    if now is None:
        now = datetime.now(timezone.utc)
    age = _naive_utc(now) - _naive_utc(latest_timestamp)
    staleness_minutes = age.total_seconds() / 60
    ok = staleness_minutes <= max_staleness_minutes
    msg = f"Data is {staleness_minutes:.1f} min old (threshold {max_staleness_minutes} min)"
    return ok, msg
Most recent value in the freshness column must be within max_staleness_minutes of now.
def
check_volume( row_count: int, min_rows: int = None, max_rows: int = None, baseline_avg: float = None, tolerance_pct: float = None) -> tuple[bool, str]:
def check_volume(row_count: int, min_rows: int = None, max_rows: int = None,
                 baseline_avg: float = None, tolerance_pct: float = None) -> tuple[bool, str]:
    """Validate a row count against absolute bounds and/or a historical baseline.

    Checks run in order — lower bound, upper bound, baseline deviation — and
    the first violation is reported. The baseline check is skipped unless
    both ``baseline_avg`` and ``tolerance_pct`` are given and the baseline is
    positive.

    Returns:
        ``(ok, message)``.
    """
    below_floor = min_rows is not None and row_count < min_rows
    if below_floor:
        return False, f"Row count {row_count} below min_rows {min_rows}"

    above_ceiling = max_rows is not None and row_count > max_rows
    if above_ceiling:
        return False, f"Row count {row_count} above max_rows {max_rows}"

    has_baseline = not (baseline_avg is None or tolerance_pct is None)
    if has_baseline and baseline_avg > 0:
        gap = abs(row_count - baseline_avg)
        deviation_pct = gap / baseline_avg * 100
        if deviation_pct > tolerance_pct:
            detail = (
                f"Row count {row_count} deviates {deviation_pct:.1f}% from baseline "
                f"{baseline_avg:.0f} (tolerance {tolerance_pct}%)"
            )
            return False, detail

    return True, f"Row count {row_count} within expected bounds"
Row count must satisfy absolute bounds and/or stay within tolerance_pct of a historical baseline.
def
check_schema(old_schema: dict, new_schema: dict) -> tuple[bool, str, dict]:
def check_schema(old_schema: dict, new_schema: dict) -> tuple[bool, str, dict]:
    """Compare the current schema with the previous snapshot.

    ``old_schema=None`` means no earlier snapshot exists; that is treated as
    establishing a baseline and passes with an empty diff.

    Returns:
        ``(ok, message, diff)`` where ``diff`` has the keys ``added``,
        ``removed`` and ``type_changed``.
    """
    if old_schema is None:
        empty_diff = {"added": [], "removed": [], "type_changed": []}
        return True, "Initial schema snapshot recorded", empty_diff

    diff = diff_schema(old_schema, new_schema)

    # Pair each human-readable label with its list of affected columns.
    labelled = (
        ("added", diff["added"]),
        ("removed", diff["removed"]),
        ("type changed", diff["type_changed"]),
    )
    parts = [f"{label}: {', '.join(columns)}" for label, columns in labelled if columns]
    if parts:
        return False, "Schema changed — " + "; ".join(parts), diff
    return True, "No schema changes detected", diff
old_schema=None means no prior snapshot exists yet — treated as a baseline, not a failure.
def
diff_schema(old_schema: dict, new_schema: dict) -> dict:
def diff_schema(old_schema: dict, new_schema: dict) -> dict:
    """Diff two ``{column_name: dtype_string}`` mappings without any I/O.

    Returns:
        A dict with three sorted lists of column names: ``added`` (only in
        the new schema), ``removed`` (only in the old schema) and
        ``type_changed`` (in both, with a different dtype string).
    """
    before, after = old_schema.keys(), new_schema.keys()
    retyped = [name for name in before & after if old_schema[name] != new_schema[name]]
    return {
        "added": sorted(after - before),
        "removed": sorted(before - after),
        "type_changed": sorted(retyped),
    }
Compare two {column_name: dtype_string} schemas. Pure set/dict logic, no I/O.
def
launch():
def launch():
    """Build and display the DashObserve notebook UI.

    The UI collects monitor definitions in an in-memory list ("Add Monitor")
    and runs them one by one through ``run_monitors`` ("Run All Monitors"),
    printing each report into the output panel.

    Raises:
        RuntimeError: If ipywidgets / IPython cannot be imported.
    """
    try:
        import ipywidgets as w
        from IPython.display import display
    except ImportError as err:
        # Chain the cause so the underlying import failure stays visible.
        raise RuntimeError("ipywidgets required. Run: %pip install ipywidgets") from err

    import dashui

    monitors: list[dict] = []

    # ── Add monitor ───────────────────────────────────────────────────────
    m_table = w.Text(description="UC Table:", placeholder="catalog.schema.table")

    m_freshness_col = w.Text(description="Freshness col:", placeholder="updated_at (optional)")
    m_max_staleness = w.IntText(description="Max staleness (min):", value=1440)

    m_min_rows = w.Text(description="Min rows:", placeholder="optional")
    m_max_rows = w.Text(description="Max rows:", placeholder="optional")
    m_tolerance = w.Text(description="Baseline tolerance %:", placeholder="optional, e.g. 20")

    m_track_schema = w.Checkbox(value=True, description="Track schema changes")

    def _volume_enabled(m):
        # Mirror run_monitors, which tests "is not None": a bound of 0 is a
        # real setting and must not be shown as "off".
        return any(m[key] is not None for key in ("min_rows", "max_rows", "volume_tolerance_pct"))

    add_btn = dashui.action_button("Add Monitor", style="info", emoji="+")
    # NOTE(review): dashui.running_list presumably returns (widget, render_fn)
    # and calls the formatter with (index, item) — confirm against dashui.
    monitors_output, render_monitors = dashui.running_list(
        lambda i, m: (
            f" {i}. {m['table']} — "
            f"freshness:{'on' if m['freshness_column'] else 'off'}, "
            f"volume:{'on' if _volume_enabled(m) else 'off'}, "
            f"schema:{'on' if m['track_schema'] else 'off'}"
        )
    )

    def _parse_int(text):
        text = text.strip()
        return int(text) if text else None

    def _parse_float(text):
        text = text.strip()
        return float(text) if text else None

    def on_add(b):
        table = m_table.value.strip()
        if not table:
            return
        try:
            min_rows = _parse_int(m_min_rows.value)
            max_rows = _parse_int(m_max_rows.value)
            tolerance = _parse_float(m_tolerance.value)
        except ValueError as err:
            # Report bad numeric input instead of letting the exception
            # escape the click callback. `output` is created further down;
            # it is looked up only when the button is clicked.
            with output:
                output.clear_output()
                print(f"⚠️ Invalid number in volume settings: {err}")
            return
        monitors.append({
            "table": table,
            "freshness_column": m_freshness_col.value.strip() or None,
            "max_staleness_minutes": m_max_staleness.value,
            "min_rows": min_rows,
            "max_rows": max_rows,
            "volume_tolerance_pct": tolerance,
            "track_schema": m_track_schema.value,
        })
        render_monitors(monitors)
        # Reset the text inputs for the next monitor.
        m_table.value = m_freshness_col.value = m_min_rows.value = m_max_rows.value = m_tolerance.value = ""

    add_btn.on_click(on_add)

    # ── Run ──────────────────────────────────────────────────────────────
    history_table = w.Text(description="History table:", placeholder="catalog.schema.observe_history (optional)")
    run_btn = dashui.action_button("Run All Monitors", style="success", emoji="▶")
    output = dashui.output_panel()

    def on_run(b):
        with output:
            output.clear_output()
            if not monitors:
                print("⚠️ No monitors configured — add at least one above")
                return
            try:
                from dashobserve.runner import MonitorConfig, run_monitors
            except Exception as e:
                print(f"❌ {e}")
                return
            hist = history_table.value.strip() or None
            for m in monitors:
                # Catch per monitor so one failing table does not prevent
                # the remaining monitors from running.
                try:
                    cfg = MonitorConfig(
                        table=m["table"],
                        freshness_column=m["freshness_column"],
                        # The staleness widget always has a value; only pass
                        # it on when a freshness column was actually given.
                        max_staleness_minutes=m["max_staleness_minutes"] if m["freshness_column"] else None,
                        min_rows=m["min_rows"],
                        max_rows=m["max_rows"],
                        volume_tolerance_pct=m["volume_tolerance_pct"],
                        track_schema=m["track_schema"],
                    )
                    report = run_monitors(cfg, history_table=hist)
                    report.display()
                    s = report.summary()
                    print(f" → {s['passed']}/{s['total_monitors']} passed\n")
                except Exception as e:
                    # UI boundary: show the error and move on.
                    print(f"❌ {m['table']}: {e}\n")

    run_btn.on_click(on_run)

    ui = dashui.card([
        dashui.header("DashObserve — Data Observability", library="dashobserve", emoji="👁️"),
        dashui.section("Step 1: Configure a monitor"),
        m_table,
        w.HBox([m_freshness_col, m_max_staleness]),
        w.HBox([m_min_rows, m_max_rows, m_tolerance]),
        m_track_schema,
        add_btn, monitors_output,
        dashui.section("Step 2: Run"),
        history_table,
        run_btn,
        output,
    ])
    display(ui)