dashobserve
DashObserve — Notebook-native data observability for Databricks: freshness, volume, and schema-change monitoring, plus next-update prediction and volume forecasting. Launch the UI with dashobserve.launch() inside a Databricks notebook.
"""
DashObserve: notebook-native data observability for Databricks.

Monitors freshness, volume, and schema changes, predicts when a table will next
be updated, and forecasts row-count volume. Start the UI with
``dashobserve.launch()`` from inside a Databricks notebook.
"""
from dashobserve.monitors import (
    MonitorResult,
    check_freshness,
    check_schema,
    check_volume,
    diff_schema,
    predict_next_update,
    predict_volume,
)
from dashobserve.runner import (
    ForecastReport,
    MonitorConfig,
    MonitorReport,
    run_forecast,
    run_monitors,
)
from dashobserve.ui import launch

__version__ = "0.1.4"

# Public package API (what ``from dashobserve import *`` exposes).
__all__ = [
    # configuration and result containers
    "MonitorConfig",
    "MonitorReport",
    "MonitorResult",
    "ForecastReport",
    # orchestration
    "run_monitors",
    "run_forecast",
    # individual checks
    "check_freshness",
    "check_volume",
    "check_schema",
    "diff_schema",
    # forecasting
    "predict_next_update",
    "predict_volume",
    # notebook UI
    "launch",
]
@dataclass
class MonitorConfig:
    """Declarative description of the checks to run against one table.

    Every monitor family is opt-in; ``run_monitors`` only runs a check when the
    fields that drive it are set.
    """

    # Table name passed to ``spark.table`` (e.g. ``catalog.schema.table``).
    table: str

    # Freshness: runs only when BOTH the column and the staleness limit are set.
    freshness_column: str = None
    max_staleness_minutes: float = None

    # Volume: runs when any of these three is set. The tolerance is applied
    # against a baseline read from the history table, when one is supplied.
    min_rows: int = None
    max_rows: int = None
    volume_tolerance_pct: float = None

    # Schema: compare the current schema to the previous snapshot in history.
    track_schema: bool = False
class MonitorReport:
    """The list of MonitorResult objects produced by a single ``run_monitors`` call."""

    def __init__(self, results: list):
        self.results = results

    def to_dict(self) -> list:
        """Return every result converted with its own ``to_dict`` (one dict per monitor)."""
        return list(map(lambda res: res.to_dict(), self.results))

    def to_pandas(self):
        """Return the results as a pandas DataFrame; pandas is imported lazily."""
        import pandas as pd

        rows = self.to_dict()
        return pd.DataFrame(rows)

    def summary(self) -> dict:
        """Return pass/fail counts. Any status other than "PASS" counts as failed."""
        n_total = len(self.results)
        n_passed = len([res for res in self.results if res.status == "PASS"])
        if n_total:
            rate = round(n_passed / n_total * 100, 1)
        else:
            rate = 0
        return {
            "total_monitors": n_total,
            "passed": n_passed,
            "failed": n_total - n_passed,
            "pass_rate_pct": rate,
        }

    def display(self):
        """Print one line per result: ✅ for PASS, ❌ for anything else."""
        for res in self.results:
            marker = "✅" if res.status == "PASS" else "❌"
            print(f"{marker} [{res.monitor_type}] {res.table_name} — {res.message}")
@dataclass
class MonitorResult:
    """Outcome of one monitor check against one table."""

    table_name: str
    monitor_type: str  # "freshness" | "volume" | "schema"
    monitor_name: str
    status: str  # "PASS" | "FAIL" | "ERROR"
    message: str
    run_timestamp: str = ""
    details: str = "{}"  # JSON-encoded dict of monitor-specific details

    def to_dict(self) -> dict:
        """Return a shallow copy of the instance attributes as a plain dict."""
        return {key: value for key, value in vars(self).items()}
class ForecastReport:
    """Next-update prediction plus volume projections for one table."""

    def __init__(self, table: str, next_update: dict, volume_projections: list):
        self.table = table
        self.next_update = next_update
        self.volume_projections = volume_projections

    def summary(self) -> dict:
        """Return the headline forecast fields as a flat dict."""
        nu = self.next_update
        return {
            "table": self.table,
            "next_update_pattern": nu["pattern"],
            "predicted_next_update": nu["predicted_next_update"],
            "volume_periods_forecast": len(self.volume_projections),
        }

    def display(self):
        """Print the next-update prediction, then a table of volume projections."""
        nu = self.next_update
        print(f"📅 Next update prediction for {self.table}:")
        if not nu["predicted_next_update"]:
            print(" Insufficient history — run monitors a few more times first")
        else:
            # (label, key in next_update, trailing unit); keys are read lazily, line by line.
            for label, key, unit in (
                ("Pattern", "pattern", ""),
                ("Avg interval", "avg_interval_hours", " hours"),
                ("Last update", "last_known_update", ""),
                ("Predicted next", "predicted_next_update", ""),
                ("Based on", "history_points", " observations"),
            ):
                print(f" {label}: {nu[key]}{unit}")

        projections = self.volume_projections
        if not projections:
            print("\n📈 Volume forecast: insufficient history")
            return
        print("\n📈 Volume forecast:")
        print(f" {'Period':<14} {'Date':<14} {'Projected rows':>14} Range")
        print(" " + " ".join("-" * width for width in (14, 14, 14, 24)))
        for proj in projections:
            span = f"{proj['lower_bound']:,} – {proj['upper_bound']:,}"
            print(f" {proj['period_label']:<14} {proj['predicted_date']:<14} {proj['predicted_row_count']:>14,} {span}")
def display(self):
    """Print the next-update prediction, then a table of volume projections."""
    nu = self.next_update
    print(f"📅 Next update prediction for {self.table}:")
    if not nu["predicted_next_update"]:
        print(" Insufficient history — run monitors a few more times first")
    else:
        # (label, key in next_update, trailing unit); keys are read lazily, line by line.
        for label, key, unit in (
            ("Pattern", "pattern", ""),
            ("Avg interval", "avg_interval_hours", " hours"),
            ("Last update", "last_known_update", ""),
            ("Predicted next", "predicted_next_update", ""),
            ("Based on", "history_points", " observations"),
        ):
            print(f" {label}: {nu[key]}{unit}")

    projections = self.volume_projections
    if not projections:
        print("\n📈 Volume forecast: insufficient history")
        return
    print("\n📈 Volume forecast:")
    print(f" {'Period':<14} {'Date':<14} {'Projected rows':>14} Range")
    print(" " + " ".join("-" * width for width in (14, 14, 14, 24)))
    for proj in projections:
        span = f"{proj['lower_bound']:,} – {proj['upper_bound']:,}"
        print(f" {proj['period_label']:<14} {proj['predicted_date']:<14} {proj['predicted_row_count']:>14,} {span}")
def run_monitors(config: MonitorConfig, history_table: str = None, spark=None) -> MonitorReport:
    """
    Run every monitor enabled on ``config`` against ``config.table``.

    Checks run in this order:
      * freshness, when ``freshness_column`` and ``max_staleness_minutes`` are both set;
      * volume, when any of ``min_rows`` / ``max_rows`` / ``volume_tolerance_pct`` is set;
      * schema, when ``track_schema`` is true.

    When ``history_table`` is given, it supplies the volume baseline and the
    previous schema snapshot, and the new results are handed to ``_save_results``.
    ``spark`` defaults to the active SparkSession.

    Returns a MonitorReport whose results are each "PASS" or "FAIL".
    """
    if spark is None:
        from pyspark.sql import SparkSession

        spark = SparkSession.getActiveSession()
    from pyspark.sql import functions as F

    table = config.table
    df = spark.table(table)
    stamp = datetime.now().isoformat(timespec="seconds")
    results = []

    def _record(monitor_type, monitor_name, passed, message, details):
        """Append one MonitorResult stamped with this run's timestamp."""
        results.append(MonitorResult(
            table, monitor_type, monitor_name,
            "PASS" if passed else "FAIL", message, stamp,
            details=json.dumps(details),
        ))

    # Freshness: newest value of the freshness column vs. the staleness limit.
    if config.freshness_column and config.max_staleness_minutes is not None:
        newest = df.agg(F.max(config.freshness_column)).collect()[0][0]
        passed, message = check_freshness(newest, config.max_staleness_minutes)
        _record("freshness", "data_freshness", passed, message,
                {"latest_timestamp": newest.isoformat() if newest else None})

    # Volume: absolute bounds plus, optionally, drift from the historical baseline.
    volume_settings = (config.min_rows, config.max_rows, config.volume_tolerance_pct)
    if any(setting is not None for setting in volume_settings):
        row_count = df.count()
        use_baseline = config.volume_tolerance_pct is not None and history_table
        baseline = _read_volume_baseline(spark, history_table, table) if use_baseline else None
        passed, message = check_volume(row_count, config.min_rows, config.max_rows,
                                       baseline, config.volume_tolerance_pct)
        _record("volume", "row_count", passed, message, {"row_count": row_count})

    # Schema: compare with the last snapshot (there is none without a history table).
    if config.track_schema:
        current = {fld.name: str(fld.dataType) for fld in df.schema.fields}
        previous = _read_last_schema(spark, history_table, table) if history_table else None
        passed, message, diff = check_schema(previous, current)
        _record("schema", "schema_change", passed, message, {"schema": current, "diff": diff})

    if history_table and results:
        _save_results(spark, history_table, results)

    return MonitorReport(results)
Run every monitor configured on config.table and return a MonitorReport.
def run_forecast(
    table: str,
    history_table: str,
    n_periods: int = 4,
    period: str = "weeks",
    spark=None,
) -> ForecastReport:
    """
    Read historical monitor results and produce next-update and volume forecasts.

    Requires that run_monitors() has been called at least a few times with the
    same history_table so there is enough data to fit a trend.

    Args:
        table: value matched against the history table's ``table_name`` column.
        history_table: the table passed as ``history_table`` to ``run_monitors``.
        n_periods, period: forwarded to ``predict_volume``.
        spark: SparkSession; defaults to the active session.

    Reading history is best-effort. If the history table cannot be read, a
    RuntimeWarning is issued and the forecast is built from no data ("insufficient
    history") rather than raising. Rows with malformed ``details`` are skipped.
    """
    import warnings
    from datetime import datetime

    if spark is None:
        from pyspark.sql import SparkSession

        spark = SparkSession.getActiveSession()
    from pyspark.sql import functions as F

    def _history_rows(monitor_type: str, *columns: str) -> list:
        """Collect ``columns`` for this table and monitor type, oldest run first."""
        try:
            return (
                spark.table(history_table)
                .filter((F.col("table_name") == table) & (F.col("monitor_type") == monitor_type))
                .orderBy("run_timestamp")
                .select(*columns)
                .collect()
            )
        except Exception as exc:  # deliberate best-effort, but no longer silent
            warnings.warn(
                f"Could not read {monitor_type} history for {table} from {history_table}: {exc}",
                RuntimeWarning,
                stacklevel=3,
            )
            return []

    # Each monitor run records the column's current max timestamp, so one table
    # update is observed once per run. Keep each distinct value once; repeats would
    # otherwise look like zero-length update intervals to predict_next_update.
    update_timestamps = []
    seen = set()
    for r in _history_rows("freshness", "details"):
        try:
            ts_str = json.loads(r["details"]).get("latest_timestamp")
            ts = datetime.fromisoformat(ts_str) if ts_str else None
        except (KeyError, TypeError, ValueError, AttributeError):
            continue  # malformed history row
        if ts is not None and ts not in seen:
            seen.add(ts)
            update_timestamps.append(ts)

    volume_history = []
    for r in _history_rows("volume", "run_timestamp", "details"):
        try:
            count = json.loads(r["details"]).get("row_count")
            run_ts = r["run_timestamp"]
        except (KeyError, TypeError, ValueError, AttributeError):
            continue  # malformed history row
        if count is not None:
            volume_history.append((run_ts, count))

    return ForecastReport(
        table=table,
        next_update=predict_next_update(update_timestamps),
        volume_projections=predict_volume(volume_history, n_periods=n_periods, period=period),
    )
Read historical monitor results and produce next-update and volume forecasts.
Requires that run_monitors() has been called at least a few times with the same history_table so there is enough data to fit a trend.
def check_freshness(latest_timestamp, max_staleness_minutes: float, now: datetime = None) -> tuple[bool, str]:
    """
    Most recent value in the freshness column must be within max_staleness_minutes of now.

    All comparisons happen in naive UTC:
      * timezone-aware values (``latest_timestamp`` or ``now``) are converted to UTC
        before their tzinfo is dropped, so the offset is honoured rather than discarded;
      * a plain ``date`` (e.g. from a DATE column) is treated as midnight of that day;
      * naive datetimes are assumed to already be UTC. NOTE(review): Spark collects
        TIMESTAMP values as naive datetimes in the session/driver timezone; confirm
        that timezone is UTC where this runs.
    ``now`` defaults to the current UTC time.

    Returns (ok, message). A None timestamp fails with an "empty table" message.
    """
    from datetime import date, time as dt_time

    def _naive_utc(value):
        # Aware -> convert to UTC, then drop tzinfo (not just drop it, which keeps local wall time).
        if getattr(value, "tzinfo", None) is not None:
            return value.astimezone(timezone.utc).replace(tzinfo=None)
        # date - datetime arithmetic is unsupported; promote dates to midnight.
        if isinstance(value, date) and not isinstance(value, datetime):
            return datetime.combine(value, dt_time.min)
        return value

    if latest_timestamp is None:
        return False, "No timestamp values found — table may be empty"
    now = _naive_utc(now) if now is not None else datetime.now(timezone.utc).replace(tzinfo=None)
    latest_timestamp = _naive_utc(latest_timestamp)
    staleness_minutes = (now - latest_timestamp).total_seconds() / 60
    ok = staleness_minutes <= max_staleness_minutes
    msg = f"Data is {staleness_minutes:.1f} min old (threshold {max_staleness_minutes} min)"
    return ok, msg
Most recent value in the freshness column must be within max_staleness_minutes of now.
def check_volume(row_count: int, min_rows: int = None, max_rows: int = None,
                 baseline_avg: float = None, tolerance_pct: float = None) -> tuple[bool, str]:
    """
    Validate a row count against absolute bounds and/or a historical baseline.

    Checks run in order (min, max, baseline) and the first violation is returned.
    The baseline check applies only when both ``baseline_avg`` and ``tolerance_pct``
    are given and the baseline is positive.

    Returns (ok, message).
    """
    if min_rows is not None and row_count < min_rows:
        return False, f"Row count {row_count} below min_rows {min_rows}"
    if max_rows is not None and row_count > max_rows:
        return False, f"Row count {row_count} above max_rows {max_rows}"

    baseline_usable = (
        baseline_avg is not None
        and tolerance_pct is not None
        and baseline_avg > 0
    )
    if baseline_usable:
        drift_pct = abs(row_count - baseline_avg) / baseline_avg * 100
        if drift_pct > tolerance_pct:
            return False, (
                f"Row count {row_count} deviates {drift_pct:.1f}% from baseline "
                f"{baseline_avg:.0f} (tolerance {tolerance_pct}%)"
            )

    return True, f"Row count {row_count} within expected bounds"
Row count must satisfy absolute bounds and/or stay within tolerance_pct of a historical baseline.
def check_schema(old_schema: dict, new_schema: dict) -> tuple[bool, str, dict]:
    """
    Compare the current schema with the previous snapshot.

    old_schema=None means no prior snapshot exists yet; that is recorded as a
    baseline (PASS), not a failure. Otherwise any added, removed, or type-changed
    column is a FAIL.

    Returns (ok, message, diff), where diff has "added", "removed" and "type_changed" lists.
    """
    if old_schema is None:
        return True, "Initial schema snapshot recorded", {"added": [], "removed": [], "type_changed": []}

    diff = diff_schema(old_schema, new_schema)
    # (diff key, human label), in message order.
    labelled = (("added", "added"), ("removed", "removed"), ("type_changed", "type changed"))
    parts = [f"{label}: {', '.join(diff[key])}" for key, label in labelled if diff[key]]
    if not parts:
        return True, "No schema changes detected", diff
    return False, "Schema changed — " + "; ".join(parts), diff
old_schema=None means no prior snapshot exists yet — treated as a baseline, not a failure.
def diff_schema(old_schema: dict, new_schema: dict) -> dict:
    """
    Compare two {column_name: dtype_string} schemas. Pure set/dict logic, no I/O.

    Returns {"added": [...], "removed": [...], "type_changed": [...]}, each sorted.
    """
    before, after = set(old_schema), set(new_schema)
    return {
        "added": sorted(after.difference(before)),
        "removed": sorted(before.difference(after)),
        "type_changed": sorted(
            col for col in before.intersection(after) if old_schema[col] != new_schema[col]
        ),
    }
Compare two {column_name: dtype_string} schemas. Pure set/dict logic, no I/O.
def predict_next_update(update_timestamps: list) -> dict:
    """
    Predict when a table will next be updated from its historical update timestamps.

    update_timestamps: datetime objects at which the freshness column was observed
        to have changed. Repeated observations of the same timestamp (e.g. several
        monitor runs between two table updates) are collapsed to one point.
        Otherwise they would add zero-length intervals that drag the average down
        and misclassify the update pattern.

    Returns a dict with:
        predicted_next_update: ISO str, or None with fewer than 2 distinct timestamps.
        avg_interval_hours: mean gap between consecutive distinct updates (or None).
        pattern: a coarse label such as "daily", or "insufficient_data".
        last_known_update: ISO str of the newest timestamp (or None).
        history_points: number of distinct timestamps used.
    """
    from datetime import timedelta

    sorted_ts = sorted(set(update_timestamps))

    if len(sorted_ts) < 2:
        return {
            "predicted_next_update": None,
            "avg_interval_hours": None,
            "pattern": "insufficient_data",
            "last_known_update": sorted_ts[-1].isoformat(timespec="seconds") if sorted_ts else None,
            "history_points": len(sorted_ts),
        }

    intervals_h = [
        (later - earlier).total_seconds() / 3600
        for earlier, later in zip(sorted_ts, sorted_ts[1:])
    ]
    avg_h = sum(intervals_h) / len(intervals_h)

    # First threshold (in hours) that the average interval fits under wins.
    pattern = "irregular"
    for threshold_h, label in [
        (1.4, "hourly"),
        (5.0, "every few hours"),
        (18.0, "twice daily"),
        (30.0, "daily"),
        (55.0, "every few days"),
        (200.0, "weekly"),
        (400.0, "bi-weekly"),
        (800.0, "monthly"),
    ]:
        if avg_h <= threshold_h:
            pattern = label
            break

    predicted_next = sorted_ts[-1] + timedelta(hours=avg_h)
    return {
        "predicted_next_update": predicted_next.isoformat(timespec="seconds"),
        "avg_interval_hours": round(avg_h, 2),
        "pattern": pattern,
        "last_known_update": sorted_ts[-1].isoformat(timespec="seconds"),
        "history_points": len(sorted_ts),
    }
Predict when a table will next be updated based on historical update timestamps.
update_timestamps: list of datetime objects (when the freshness column last changed). Returns dict: predicted_next_update (ISO str, or None when there is too little history), avg_interval_hours, pattern, last_known_update, history_points.
def predict_volume(history: list, n_periods: int = 4, period: str = "weeks") -> list[dict]:
    """
    Forecast future row counts using a least-squares linear trend over historical observations.

    history: list of (datetime_or_isostr, row_count) pairs.
    n_periods: number of future periods to project.
    period: 'days' | 'weeks' | 'months' (a month is approximated as 30 days).

    Returns list of dicts: period, period_label, predicted_date, predicted_row_count,
    lower_bound, upper_bound. The bounds are an approximate 95% prediction interval,
    ±1.96 * s * sqrt(1 + 1/n + (x0 - mean_x)**2 / Sxx), so they widen as the
    projection moves further beyond the observed data. 1.96 is the normal quantile;
    with very few observations the exact Student-t interval would be wider.

    Returns [] with fewer than 2 observations. Raises ValueError for an unknown period.
    """
    from datetime import datetime, timedelta

    PERIOD_DAYS = {"days": 1, "weeks": 7, "months": 30}
    if period not in PERIOD_DAYS:
        raise ValueError(f"period must be one of {list(PERIOD_DAYS)}, got {period!r}")
    if len(history) < 2:
        return []

    parsed = []
    for ts, count in history:
        if isinstance(ts, str):
            ts = datetime.fromisoformat(ts)
        parsed.append((ts, int(count)))
    parsed.sort(key=lambda x: x[0])

    # x = days since the first observation, y = row count.
    t0 = parsed[0][0]
    xs = [(ts - t0).total_seconds() / 86400 for ts, _ in parsed]
    ys = [count for _, count in parsed]
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / sxx if sxx else 0
    intercept = mean_y - slope * mean_x

    # Residual standard error; max(..., 1) avoids dividing by zero when n == 2.
    residual_ss = sum((y - (intercept + slope * x)) ** 2 for x, y in zip(xs, ys))
    std_err = (residual_ss / max(n - 2, 1)) ** 0.5

    step_days = PERIOD_DAYS[period]
    last_x = xs[-1]
    last_ts = parsed[-1][0]

    projections = []
    for i in range(1, n_periods + 1):
        future_x = last_x + i * step_days
        future_ts = last_ts + timedelta(days=i * step_days)
        predicted = max(0, int(intercept + slope * future_x))
        # Prediction-interval scale: residual noise + uncertainty in the fitted line,
        # the latter growing with distance from the centre of the observed x range.
        leverage = 1 + 1 / n + ((future_x - mean_x) ** 2 / sxx if sxx else 0)
        ci = int(1.96 * std_err * leverage ** 0.5)
        projections.append({
            "period": i,
            "period_label": f"+{i} {period}",
            "predicted_date": future_ts.strftime("%Y-%m-%d"),
            "predicted_row_count": predicted,
            "lower_bound": max(0, predicted - ci),
            "upper_bound": predicted + ci,
        })
    return projections
Forecast future row counts using a linear trend fitted to historical observations.
history: list of (datetime_or_isostr, row_count) pairs. period: 'days' | 'weeks' | 'months' Returns list of dicts: period_label, predicted_date, predicted_row_count, lower_bound, upper_bound (approximate 95% prediction interval).
def launch():
    """
    Build and display the DashObserve notebook UI.

    The UI has three steps:
      1. configure monitors (kept in an in-memory list owned by this UI instance);
      2. run them all with ``run_monitors``, optionally against a history table;
      3. forecast the next update and row volume from that history with ``run_forecast``.

    Raises:
        RuntimeError: if ipywidgets / IPython cannot be imported.
    """
    try:
        import ipywidgets as w
        from IPython.display import display
    except ImportError as err:
        raise RuntimeError("ipywidgets required. Run: %pip install ipywidgets") from err

    import dashui

    monitors: list[dict] = []

    # ── Add monitor ───────────────────────────────────────────────────────
    m_table = w.Text(description="UC Table:", placeholder="catalog.schema.table")

    m_freshness_col = w.Text(description="Freshness col:", placeholder="updated_at (optional)")
    m_max_staleness = w.IntText(description="Max staleness (min):", value=1440)

    m_min_rows = w.Text(description="Min rows:", placeholder="optional")
    m_max_rows = w.Text(description="Max rows:", placeholder="optional")
    m_tolerance = w.Text(description="Baseline tolerance %:", placeholder="optional, e.g. 20")

    m_track_schema = w.Checkbox(value=True, description="Track schema changes")

    add_btn = dashui.action_button("Add Monitor", style="info", emoji="+")
    # Inline feedback for the add step. NOTE: exceptions raised inside ipywidgets
    # button callbacks generally do not appear in the cell output, so invalid input
    # is reported here instead of raising.
    add_status = w.HTML(value="")
    monitors_output, render_monitors = dashui.running_list(
        lambda i, m: (
            f" {i}. {m['table']} — "
            f"freshness:{'on' if m['freshness_column'] else 'off'}, "
            f"volume:{'on' if (m['min_rows'] or m['max_rows'] or m['volume_tolerance_pct']) else 'off'}, "
            f"schema:{'on' if m['track_schema'] else 'off'}"
        )
    )

    def _parse_int(text):
        """Parse an optional integer field; blank means "not set". Raises ValueError."""
        text = text.strip()
        return int(text) if text else None

    def _parse_float(text):
        """Parse an optional float field; blank means "not set". Raises ValueError."""
        text = text.strip()
        return float(text) if text else None

    def on_add(b):
        table = m_table.value.strip()
        if not table:
            add_status.value = "⚠️ Enter a UC table name first"
            return
        try:
            min_rows = _parse_int(m_min_rows.value)
            max_rows = _parse_int(m_max_rows.value)
            tolerance = _parse_float(m_tolerance.value)
        except ValueError:
            add_status.value = "⚠️ Min rows / Max rows must be whole numbers and the tolerance a number"
            return
        add_status.value = ""
        monitors.append({
            "table": table,
            "freshness_column": m_freshness_col.value.strip() or None,
            "max_staleness_minutes": m_max_staleness.value,
            "min_rows": min_rows,
            "max_rows": max_rows,
            "volume_tolerance_pct": tolerance,
            "track_schema": m_track_schema.value,
        })
        render_monitors(monitors)
        m_table.value = m_freshness_col.value = m_min_rows.value = m_max_rows.value = m_tolerance.value = ""

    add_btn.on_click(on_add)

    # ── Run ───────────────────────────────────────────────────────────────
    history_table = w.Text(description="History table:", placeholder="catalog.schema.observe_history (optional)")
    run_btn = dashui.action_button("Run All Monitors", style="success", emoji="▶")
    output = dashui.output_panel()

    def on_run(b):
        with output:
            output.clear_output()
            if not monitors:
                print("⚠️ No monitors configured — add at least one above")
                return
            try:
                from dashobserve.runner import MonitorConfig, run_monitors
            except Exception as e:
                print(f"❌ {e}")
                return
            hist = history_table.value.strip() or None
            for m in monitors:
                # Isolate failures per monitor so one bad table does not stop the rest.
                try:
                    cfg = MonitorConfig(
                        table=m["table"],
                        freshness_column=m["freshness_column"],
                        max_staleness_minutes=m["max_staleness_minutes"] if m["freshness_column"] else None,
                        min_rows=m["min_rows"],
                        max_rows=m["max_rows"],
                        volume_tolerance_pct=m["volume_tolerance_pct"],
                        track_schema=m["track_schema"],
                    )
                    report = run_monitors(cfg, history_table=hist)
                    report.display()
                    s = report.summary()
                    print(f" → {s['passed']}/{s['total_monitors']} passed\n")
                except Exception as e:
                    print(f"❌ {m['table']}: {e}\n")

    run_btn.on_click(on_run)

    # ── Forecast ──────────────────────────────────────────────────────────
    f_table = w.Text(description="UC Table:", placeholder="catalog.schema.table")
    f_history = w.Text(description="History table:", placeholder="catalog.schema.observe_history")
    # BoundedIntText enforces min/max; plain IntText has no such traits and ignores them.
    f_n_periods = w.BoundedIntText(description="Periods ahead:", value=4, min=1, max=52)
    f_period = w.ToggleButtons(options=["days", "weeks", "months"], description="Period:")
    forecast_btn = dashui.action_button("Run Forecast", style="info", emoji="📈")
    forecast_output = dashui.output_panel()

    def on_forecast(b):
        with forecast_output:
            forecast_output.clear_output()
            table = f_table.value.strip()
            hist = f_history.value.strip()
            if not table or not hist:
                print("⚠️ Specify both a UC table and the history table")
                return
            try:
                from dashobserve.runner import run_forecast
                report = run_forecast(
                    table=table,
                    history_table=hist,
                    n_periods=f_n_periods.value,
                    period=f_period.value,
                )
                report.display()
            except Exception as e:
                print(f"❌ {e}")

    forecast_btn.on_click(on_forecast)

    ui = dashui.card([
        dashui.header("DashObserve — Data Observability", library="dashobserve", emoji="👁️"),
        dashui.section("Step 1: Configure a monitor"),
        m_table,
        w.HBox([m_freshness_col, m_max_staleness]),
        w.HBox([m_min_rows, m_max_rows, m_tolerance]),
        m_track_schema,
        add_btn, add_status, monitors_output,
        dashui.section("Step 2: Run monitors"),
        history_table,
        run_btn,
        output,
        dashui.section("Step 3: Forecast"),
        dashui.html(
            "<div style='font-size:12px;color:#666;margin-bottom:4px'>"
            "Predicts next table update and projects volume trend. "
            "Requires history built from prior monitor runs.</div>"
        ),
        w.HBox([f_table, f_history]),
        w.HBox([f_n_periods, f_period]),
        forecast_btn,
        forecast_output,
    ])
    display(ui)