dashobserve

DashObserve — Notebook-native data observability for Databricks: freshness, volume, and schema-change monitoring, plus next-update prediction and volume forecasting. Launch the UI with dashobserve.launch() inside a Databricks notebook.

 1"""
 2DashObserve — Notebook-native data observability for Databricks: freshness,
 3volume, and schema-change monitoring, plus next-update prediction and volume
 4forecasting. Launch the UI with dashobserve.launch() inside a Databricks notebook.
 5"""
 6from dashobserve.monitors import (
 7    MonitorResult, check_freshness, check_schema, check_volume, diff_schema,
 8    predict_next_update, predict_volume,
 9)
10from dashobserve.runner import MonitorConfig, MonitorReport, run_monitors, ForecastReport, run_forecast
11from dashobserve.ui import launch
12
13__version__ = "0.1.4"
14__all__ = [
15    "MonitorConfig",
16    "MonitorReport",
17    "MonitorResult",
18    "ForecastReport",
19    "run_monitors",
20    "run_forecast",
21    "check_freshness",
22    "check_volume",
23    "check_schema",
24    "diff_schema",
25    "predict_next_update",
26    "predict_volume",
27    "launch",
28]
@dataclass
class MonitorConfig:
    """Which table to inspect and which thresholds to apply to it.

    A monitor family only runs when its settings are present:

    * freshness: freshness_column and max_staleness_minutes are both set;
    * volume: any of min_rows, max_rows or volume_tolerance_pct is set;
    * schema: track_schema is True.

    Raises:
        ValueError: if a numeric threshold is negative or min_rows > max_rows.
            Such a config would make every run fail rather than detect anything.
    """

    # Table name handed to spark.table(), e.g. "catalog.schema.table".
    table: str
    # Column whose maximum value is compared against the current time.
    freshness_column: "str | None" = None
    # Maximum allowed age, in minutes, of the newest freshness_column value.
    max_staleness_minutes: "float | None" = None
    # Absolute inclusive bounds on the row count.
    min_rows: "int | None" = None
    max_rows: "int | None" = None
    # Allowed deviation (percent) from the historical average row count.
    volume_tolerance_pct: "float | None" = None
    # Record the schema and compare it with the previously recorded snapshot.
    track_schema: bool = False

    def __post_init__(self):
        for name in ("max_staleness_minutes", "min_rows", "max_rows", "volume_tolerance_pct"):
            value = getattr(self, name)
            if value is not None and value < 0:
                raise ValueError(f"{name} must be non-negative, got {value!r}")
        if self.min_rows is not None and self.max_rows is not None and self.min_rows > self.max_rows:
            raise ValueError(
                f"min_rows ({self.min_rows}) must not exceed max_rows ({self.max_rows})"
            )
class MonitorReport:
    """The MonitorResult objects produced by one run_monitors() call."""

    def __init__(self, results: list):
        # Items are expected to expose status, monitor_type, table_name,
        # message and to_dict() (as MonitorResult does).
        self.results = results

    def to_dict(self) -> list:
        """Return one plain dict per result."""
        return [r.to_dict() for r in self.results]

    def to_pandas(self):
        """Return the results as a pandas DataFrame with one row per monitor."""
        import pandas as pd  # imported lazily so pandas is only needed here
        return pd.DataFrame(self.to_dict())

    def summary(self) -> dict:
        """Return pass/fail counts. Any status other than "PASS" counts as failed."""
        total = len(self.results)
        passed = sum(1 for r in self.results if r.status == "PASS")
        return {
            "total_monitors": total,
            "passed": passed,
            "failed": total - passed,
            "pass_rate_pct": round(passed / total * 100, 1) if total else 0,
        }

    def display(self):
        """Print one line per result: status icon, monitor type, table and message."""
        for r in self.results:
            icon = "✅" if r.status == "PASS" else "❌"
            # Separator keeps the table name and the message from running together.
            print(f"{icon} [{r.monitor_type}] {r.table_name} — {r.message}")
@dataclass
class MonitorResult:
    """Outcome of a single monitor check on a single table."""

    table_name: str
    monitor_type: str       # freshness | volume | schema
    monitor_name: str
    status: str             # PASS | FAIL | ERROR
    message: str
    run_timestamp: str = ""
    details: str = "{}"     # JSON-encoded dict of monitor-specific details

    def to_dict(self) -> dict:
        """Return a shallow copy of this result's fields as a plain dict."""
        return vars(self).copy()
class ForecastReport:
    """Next-update prediction plus volume projections for one table.

    next_update is the dict returned by predict_next_update(); volume_projections
    is the list returned by predict_volume().
    """

    def __init__(self, table: str, next_update: dict, volume_projections: list):
        self.table = table
        self.next_update = next_update
        self.volume_projections = volume_projections

    def summary(self) -> dict:
        """Return a compact dict: table, update pattern, predicted time, forecast length."""
        return {
            "table": self.table,
            "next_update_pattern": self.next_update["pattern"],
            "predicted_next_update": self.next_update["predicted_next_update"],
            "volume_periods_forecast": len(self.volume_projections),
        }

    def display(self):
        """Print the next-update prediction followed by the volume forecast table."""
        nu = self.next_update
        print(f"📅 Next update prediction for {self.table}:")
        if nu["predicted_next_update"]:
            print(f"   Pattern:         {nu['pattern']}")
            print(f"   Avg interval:    {nu['avg_interval_hours']} hours")
            print(f"   Last update:     {nu['last_known_update']}")
            print(f"   Predicted next:  {nu['predicted_next_update']}")
            print(f"   Based on:        {nu['history_points']} observations")
        else:
            print("   Insufficient history — run monitors a few more times first")

        if self.volume_projections:
            print("\n📈 Volume forecast:")
            print(f"   {'Period':<14} {'Date':<14} {'Projected rows':>14}   Range")
            print(f"   {'-'*14} {'-'*14} {'-'*14}   {'-'*24}")
            for p in self.volume_projections:
                # Separator keeps the two bounds from fusing into one number.
                rng = f"{p['lower_bound']:,} – {p['upper_bound']:,}"
                print(f"   {p['period_label']:<14} {p['predicted_date']:<14} {p['predicted_row_count']:>14,}   {rng}")
        else:
            print("\n📈 Volume forecast: insufficient history")
def run_monitors(config: MonitorConfig, history_table: str = None, spark=None) -> MonitorReport:
    """Run every monitor configured on config.table and return a MonitorReport.

    Monitors run in this order, each only when configured:

    * freshness: compares the max of config.freshness_column with
      config.max_staleness_minutes;
    * volume: checks the row count against min_rows / max_rows and, when
      history_table is given, against the baseline read from it;
    * schema: compares the current schema with the last one stored in
      history_table. Without a history table every run is an initial snapshot.

    Args:
        config: Table and thresholds to check.
        history_table: Optional table used for baselines / previous schema.
            Results are appended to it when given.
        spark: SparkSession to use; defaults to the active session.

    Returns:
        MonitorReport with one MonitorResult per executed monitor.

    Raises:
        RuntimeError: if spark is not given and no SparkSession is active.
    """
    if spark is None:
        from pyspark.sql import SparkSession
        spark = SparkSession.getActiveSession()
        if spark is None:
            # getActiveSession() returns None when no session is running; fail
            # with a clear message instead of an AttributeError on spark.table.
            raise RuntimeError("No active SparkSession found — pass spark= explicitly")
    from pyspark.sql import functions as F

    df = spark.table(config.table)
    # NOTE(review): local wall-clock time, whereas check_freshness measures
    # against UTC. Left unchanged so existing history rows keep sorting consistently.
    run_ts = datetime.now().isoformat(timespec="seconds")
    results = []

    if config.freshness_column and config.max_staleness_minutes is not None:
        latest = df.agg(F.max(config.freshness_column)).collect()[0][0]
        ok, msg = check_freshness(latest, config.max_staleness_minutes)
        results.append(MonitorResult(
            config.table, "freshness", "data_freshness",
            "PASS" if ok else "FAIL", msg, run_ts,
            details=json.dumps({"latest_timestamp": latest.isoformat() if latest is not None else None}),
        ))

    if config.min_rows is not None or config.max_rows is not None or config.volume_tolerance_pct is not None:
        row_count = df.count()
        baseline_avg = None
        if config.volume_tolerance_pct is not None and history_table:
            baseline_avg = _read_volume_baseline(spark, history_table, config.table)
        ok, msg = check_volume(row_count, config.min_rows, config.max_rows, baseline_avg, config.volume_tolerance_pct)
        results.append(MonitorResult(
            config.table, "volume", "row_count",
            "PASS" if ok else "FAIL", msg, run_ts,
            details=json.dumps({"row_count": row_count}),
        ))

    if config.track_schema:
        new_schema = {f.name: str(f.dataType) for f in df.schema.fields}
        old_schema = _read_last_schema(spark, history_table, config.table) if history_table else None
        ok, msg, diff = check_schema(old_schema, new_schema)
        results.append(MonitorResult(
            config.table, "schema", "schema_change",
            "PASS" if ok else "FAIL", msg, run_ts,
            details=json.dumps({"schema": new_schema, "diff": diff}),
        ))

    # Persist only when something actually ran, so history has no empty runs.
    if history_table and results:
        _save_results(spark, history_table, results)

    return MonitorReport(results)

def run_forecast(
    table: str,
    history_table: str,
    n_periods: int = 4,
    period: str = "weeks",
    spark=None,
) -> ForecastReport:
    """
    Read historical monitor results and produce next-update and volume forecasts.

    Requires that run_monitors() has been called at least a few times with the
    same history_table so there is enough data to fit a trend.

    Args:
        table: Monitored table whose history should be forecast.
        history_table: Table that run_monitors() wrote its results to.
        n_periods: Number of future periods to project.
        period: 'days' | 'weeks' | 'months' (passed to predict_volume).
        spark: SparkSession to use; defaults to the active session.

    Returns:
        ForecastReport. If the history cannot be read, a warning is issued and
        the report shows insufficient history.

    Raises:
        RuntimeError: if spark is not given and no SparkSession is active.
        ValueError: from predict_volume for an unknown period.
    """
    if spark is None:
        from pyspark.sql import SparkSession
        spark = SparkSession.getActiveSession()
        if spark is None:
            raise RuntimeError("No active SparkSession found — pass spark= explicitly")
    from datetime import datetime

    update_timestamps = []
    for _, details in _read_history_details(spark, history_table, table, "freshness"):
        ts_str = details.get("latest_timestamp")
        if not ts_str:
            continue
        try:
            update_timestamps.append(datetime.fromisoformat(ts_str))
        except (TypeError, ValueError):
            continue  # malformed timestamp in one history row; skip it

    volume_history = [
        (run_ts, details["row_count"])
        for run_ts, details in _read_history_details(spark, history_table, table, "volume")
        if details.get("row_count") is not None
    ]

    return ForecastReport(
        table=table,
        next_update=predict_next_update(update_timestamps),
        volume_projections=predict_volume(volume_history, n_periods=n_periods, period=period),
    )


def _read_history_details(spark, history_table, table, monitor_type):
    """Return (run_timestamp, details_dict) pairs for one table/monitor type, oldest first.

    Reading is best-effort. If the history table cannot be queried, a warning
    is issued and [] is returned. Rows whose details column is not a JSON
    object are skipped.
    """
    import warnings
    from pyspark.sql import functions as F

    try:
        rows = (
            spark.table(history_table)
                 .filter((F.col("table_name") == table) & (F.col("monitor_type") == monitor_type))
                 .orderBy("run_timestamp")
                 .select("run_timestamp", "details")
                 .collect()
        )
    except Exception as err:  # keep forecasting best-effort, but don't fail silently
        warnings.warn(f"Could not read {monitor_type} history for {table} from {history_table}: {err}")
        return []

    parsed = []
    for r in rows:
        try:
            details = json.loads(r["details"])
        except (TypeError, ValueError):
            continue
        if isinstance(details, dict):
            parsed.append((r["run_timestamp"], details))
    return parsed

def check_freshness(latest_timestamp, max_staleness_minutes: float, now: datetime = None) -> tuple[bool, str]:
    """Most recent value in the freshness column must be within max_staleness_minutes of now.

    Args:
        latest_timestamp: Newest value of the freshness column, or None when the
            column has no values. May be a datetime or a date (a date counts as
            midnight). Naive values are treated as UTC. Aware values are converted
            to UTC using their own offset.
        max_staleness_minutes: Maximum allowed age, in minutes.
        now: Reference time (naive = UTC); defaults to the current UTC time.

    Returns:
        (ok, message): ok is True when the age is at most max_staleness_minutes.
    """
    from datetime import date, time

    if latest_timestamp is None:
        return False, "No timestamp values found — table may be empty"

    def _as_naive_utc(value):
        # A plain date (e.g. from a date-typed column) has no time part: use midnight.
        if isinstance(value, date) and not isinstance(value, datetime):
            value = datetime.combine(value, time.min)
        # Convert aware values to UTC before dropping tzinfo. Only stripping the
        # offset would shift the instant by that offset.
        if getattr(value, "tzinfo", None) is not None:
            value = value.astimezone(timezone.utc).replace(tzinfo=None)
        return value

    # NOTE(review): PySpark typically collects timestamps as naive datetimes in the
    # session time zone; staleness is only exact when that zone is UTC — confirm.
    now = _as_naive_utc(now or datetime.now(timezone.utc))
    latest_timestamp = _as_naive_utc(latest_timestamp)
    staleness_minutes = (now - latest_timestamp).total_seconds() / 60
    ok = staleness_minutes <= max_staleness_minutes
    msg = f"Data is {staleness_minutes:.1f} min old (threshold {max_staleness_minutes} min)"
    return ok, msg

def check_volume(row_count: int, min_rows: int = None, max_rows: int = None,
                 baseline_avg: float = None, tolerance_pct: float = None) -> tuple[bool, str]:
    """Validate a row count against absolute bounds and an optional historical baseline.

    Checks run in order and the first violation is reported: min_rows, then
    max_rows, then the percentage deviation from baseline_avg. The baseline
    check applies only when baseline_avg is positive and tolerance_pct is given.

    Returns:
        (ok, message) describing the first failed check, or success.
    """
    if min_rows is not None and row_count < min_rows:
        return False, f"Row count {row_count} below min_rows {min_rows}"
    if max_rows is not None and row_count > max_rows:
        return False, f"Row count {row_count} above max_rows {max_rows}"

    within_msg = f"Row count {row_count} within expected bounds"
    baseline_usable = baseline_avg is not None and tolerance_pct is not None and baseline_avg > 0
    if not baseline_usable:
        return True, within_msg

    deviation = abs(row_count - baseline_avg) / baseline_avg * 100
    if deviation > tolerance_pct:
        return False, (
            f"Row count {row_count} deviates {deviation:.1f}% from baseline "
            f"{baseline_avg:.0f} (tolerance {tolerance_pct}%)"
        )
    return True, within_msg

def check_schema(old_schema: dict, new_schema: dict) -> tuple[bool, str, dict]:
    """Compare new_schema against the previous snapshot and report any drift.

    old_schema=None means no prior snapshot exists yet. This is recorded as a
    baseline and passes. Otherwise any added, removed or retyped column fails.

    Returns:
        (ok, message, diff) where diff has "added", "removed", "type_changed".
    """
    if old_schema is None:
        return True, "Initial schema snapshot recorded", {"added": [], "removed": [], "type_changed": []}

    diff = diff_schema(old_schema, new_schema)
    # (diff key, human-readable label) in reporting order.
    sections = (("added", "added"), ("removed", "removed"), ("type_changed", "type changed"))
    parts = [f"{label}: {', '.join(diff[key])}" for key, label in sections if diff[key]]
    if not parts:
        return True, "No schema changes detected", diff
    return False, "Schema changed — " + "; ".join(parts), diff

def diff_schema(old_schema: dict, new_schema: dict) -> dict:
    """Diff two {column_name: dtype_string} mappings using pure set/dict logic (no I/O).

    Returns a dict of sorted column-name lists under "added", "removed" and
    "type_changed" (present in both with a different dtype string).
    """
    before, after = frozenset(old_schema), frozenset(new_schema)
    return {
        "added": sorted(after - before),
        "removed": sorted(before - after),
        "type_changed": sorted(col for col in before & after if old_schema[col] != new_schema[col]),
    }

def predict_next_update(update_timestamps: list) -> dict:
    """
    Predict when a table will next be updated based on historical update timestamps.

    update_timestamps: list of datetime objects, each a value of the freshness column
    as observed at some point. Repeated values (several observations between two
    table updates) are collapsed first, because only distinct values mark an update.

    Returns dict: predicted_next_update (ISO str, or None with fewer than two
    distinct updates), avg_interval_hours, pattern, last_known_update (ISO str or
    None), history_points (number of distinct updates used).
    """
    from datetime import timedelta

    # Without de-duplication, repeated observations add zero-length intervals that
    # pull the average toward 0 (a daily table would be classified as more frequent).
    distinct_ts = sorted(set(update_timestamps))

    if len(distinct_ts) < 2:
        return {
            "predicted_next_update": None,
            "avg_interval_hours": None,
            "pattern": "insufficient_data",
            "last_known_update": distinct_ts[-1].isoformat(timespec="seconds") if distinct_ts else None,
            "history_points": len(distinct_ts),
        }

    intervals_h = [
        (later - earlier).total_seconds() / 3600
        for earlier, later in zip(distinct_ts, distinct_ts[1:])
    ]
    avg_h = sum(intervals_h) / len(intervals_h)

    # (upper bound on the average interval in hours, label); first match wins.
    pattern = "irregular"
    for threshold_h, label in [
        (1.4,  "hourly"),
        (5.0,  "every few hours"),
        (18.0, "twice daily"),
        (30.0, "daily"),
        (55.0, "every few days"),
        (200.0, "weekly"),
        (400.0, "bi-weekly"),
        (800.0, "monthly"),
    ]:
        if avg_h <= threshold_h:
            pattern = label
            break

    predicted_next = distinct_ts[-1] + timedelta(hours=avg_h)
    return {
        "predicted_next_update": predicted_next.isoformat(timespec="seconds"),
        "avg_interval_hours": round(avg_h, 2),
        "pattern": pattern,
        "last_known_update": distinct_ts[-1].isoformat(timespec="seconds"),
        "history_points": len(distinct_ts),
    }

def predict_volume(history: list, n_periods: int = 4, period: str = "weeks") -> list[dict]:
    """
    Forecast future row counts using a linear trend fitted to historical observations.

    history: list of (datetime_or_isostr, row_count) pairs.
    period: 'days' | 'weeks' | 'months' (a month is approximated as 30 days).
    Returns list of dicts: period, period_label, predicted_date, predicted_row_count,
    lower_bound, upper_bound. The bounds are an approximate 95% prediction interval
    (normal z = 1.96) around the least-squares line. They widen the further the
    projection lies from the observed data. Returns [] with fewer than two observations.

    Raises ValueError for an unknown period.
    """
    from datetime import datetime, timedelta

    PERIOD_DAYS = {"days": 1, "weeks": 7, "months": 30}
    Z_95 = 1.96
    if period not in PERIOD_DAYS:
        raise ValueError(f"period must be one of {list(PERIOD_DAYS)}, got {period!r}")
    if len(history) < 2:
        return []

    parsed = []
    for ts, count in history:
        if isinstance(ts, str):
            ts = datetime.fromisoformat(ts)
        parsed.append((ts, int(count)))
    parsed.sort(key=lambda x: x[0])

    # Ordinary least squares on x = days since the first observation.
    t0 = parsed[0][0]
    xs = [(p[0] - t0).total_seconds() / 86400 for p in parsed]
    ys = [p[1] for p in parsed]
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / sxx if sxx else 0
    intercept = mean_y - slope * mean_x

    residuals = [y - (intercept + slope * x) for x, y in zip(xs, ys)]
    residual_se = (sum(r ** 2 for r in residuals) / max(n - 2, 1)) ** 0.5

    step_days = PERIOD_DAYS[period]
    last_x = xs[-1]
    last_ts = parsed[-1][0]

    projections = []
    for i in range(1, n_periods + 1):
        future_x = last_x + i * step_days
        future_ts = last_ts + timedelta(days=i * step_days)
        predicted = max(0, int(intercept + slope * future_x))
        # Prediction standard error: residual noise plus uncertainty of the fitted
        # line, which grows with squared distance from the mean observation time.
        leverage = (future_x - mean_x) ** 2 / sxx if sxx else 0.0
        pred_se = residual_se * (1 + 1 / n + leverage) ** 0.5
        ci = int(Z_95 * pred_se)
        projections.append({
            "period": i,
            "period_label": f"+{i} {period}",
            "predicted_date": future_ts.strftime("%Y-%m-%d"),
            "predicted_row_count": predicted,
            "lower_bound": max(0, predicted - ci),
            "upper_bound": predicted + ci,
        })
    return projections

def launch():
    """Build and display the DashObserve widget UI in the current notebook.

    The UI has three steps:

    1. configure monitors (held in memory by this UI instance);
    2. run them, optionally saving results to a history table;
    3. forecast the next update and volume trend from that history table.

    Raises:
        RuntimeError: if ipywidgets or IPython cannot be imported.
    """
    try:
        import ipywidgets as w
        from IPython.display import display
    except ImportError as err:
        raise RuntimeError("ipywidgets required. Run: %pip install ipywidgets") from err

    import dashui

    monitors: list[dict] = []

    # ── Add monitor ───────────────────────────────────────────────────────
    m_table = w.Text(description="UC Table:", placeholder="catalog.schema.table")

    m_freshness_col = w.Text(description="Freshness col:", placeholder="updated_at (optional)")
    m_max_staleness = w.IntText(description="Max staleness (min):", value=1440)

    m_min_rows = w.Text(description="Min rows:", placeholder="optional")
    m_max_rows = w.Text(description="Max rows:", placeholder="optional")
    m_tolerance = w.Text(description="Baseline tolerance %:", placeholder="optional, e.g. 20")

    m_track_schema = w.Checkbox(value=True, description="Track schema changes")

    add_btn = dashui.action_button("Add Monitor", style="info", emoji="+")
    monitors_output, render_monitors = dashui.running_list(
        lambda i, m: (
            f"  {i}. {m['table']} — "
            f"freshness:{'on' if m['freshness_column'] else 'off'}, "
            f"volume:{'on' if (m['min_rows'] or m['max_rows'] or m['volume_tolerance_pct']) else 'off'}, "
            f"schema:{'on' if m['track_schema'] else 'off'}"
        )
    )

    # Run-results panel; created before the callbacks so on_add can report
    # invalid input in it as well.
    output = dashui.output_panel()

    def _parse_int(text):
        text = text.strip()
        return int(text) if text else None

    def _parse_float(text):
        text = text.strip()
        return float(text) if text else None

    def on_add(b):
        table = m_table.value.strip()
        if not table:
            return
        try:
            min_rows = _parse_int(m_min_rows.value)
            max_rows = _parse_int(m_max_rows.value)
            tolerance = _parse_float(m_tolerance.value)
        except ValueError:
            # An exception raised in a button callback would not show up in the
            # cell output, so report the bad input explicitly instead.
            with output:
                output.clear_output()
                print("⚠️  Min/max rows must be whole numbers and tolerance % a number")
            return
        monitors.append({
            "table": table,
            "freshness_column": m_freshness_col.value.strip() or None,
            "max_staleness_minutes": m_max_staleness.value,
            "min_rows": min_rows,
            "max_rows": max_rows,
            "volume_tolerance_pct": tolerance,
            "track_schema": m_track_schema.value,
        })
        render_monitors(monitors)
        m_table.value = m_freshness_col.value = m_min_rows.value = m_max_rows.value = m_tolerance.value = ""

    add_btn.on_click(on_add)

    # ── Run ──────────────────────────────────────────────────────────────
    history_table = w.Text(description="History table:", placeholder="catalog.schema.observe_history (optional)")
    run_btn = dashui.action_button("Run All Monitors", style="success", emoji="▶")

    def on_run(b):
        with output:
            output.clear_output()
            if not monitors:
                print("⚠️  No monitors configured — add at least one above")
                return
            try:
                from dashobserve.runner import MonitorConfig, run_monitors
            except Exception as e:
                print(f"❌ {e}")
                return
            hist = history_table.value.strip() or None
            for m in monitors:
                # Isolate failures so one broken table does not skip the others.
                try:
                    cfg = MonitorConfig(
                        table=m["table"],
                        freshness_column=m["freshness_column"],
                        max_staleness_minutes=m["max_staleness_minutes"] if m["freshness_column"] else None,
                        min_rows=m["min_rows"],
                        max_rows=m["max_rows"],
                        volume_tolerance_pct=m["volume_tolerance_pct"],
                        track_schema=m["track_schema"],
                    )
                    report = run_monitors(cfg, history_table=hist)
                    report.display()
                    s = report.summary()
                    print(f"   → {s['passed']}/{s['total_monitors']} passed\n")
                except Exception as e:
                    print(f"❌ {m['table']}: {e}\n")

    run_btn.on_click(on_run)

    # ── Forecast ─────────────────────────────────────────────────────────
    f_table = w.Text(description="UC Table:", placeholder="catalog.schema.table")
    f_history = w.Text(description="History table:", placeholder="catalog.schema.observe_history")
    # BoundedIntText (not IntText) so the 1..52 range is actually enforced.
    f_n_periods = w.BoundedIntText(description="Periods ahead:", value=4, min=1, max=52)
    f_period = w.ToggleButtons(options=["days", "weeks", "months"], description="Period:")
    forecast_btn = dashui.action_button("Run Forecast", style="info", emoji="📈")
    forecast_output = dashui.output_panel()

    def on_forecast(b):
        with forecast_output:
            forecast_output.clear_output()
            table = f_table.value.strip()
            hist = f_history.value.strip()
            if not table or not hist:
                print("⚠️  Specify both a UC table and the history table")
                return
            try:
                from dashobserve.runner import run_forecast
                report = run_forecast(
                    table=table,
                    history_table=hist,
                    n_periods=f_n_periods.value,
                    period=f_period.value,
                )
                report.display()
            except Exception as e:
                print(f"❌ {e}")

    forecast_btn.on_click(on_forecast)

    ui = dashui.card([
        dashui.header("DashObserve — Data Observability", library="dashobserve", emoji="👁️"),
        dashui.section("Step 1: Configure a monitor"),
        m_table,
        w.HBox([m_freshness_col, m_max_staleness]),
        w.HBox([m_min_rows, m_max_rows, m_tolerance]),
        m_track_schema,
        add_btn, monitors_output,
        dashui.section("Step 2: Run monitors"),
        history_table,
        run_btn,
        output,
        dashui.section("Step 3: Forecast"),
        dashui.html(
            "<div style='font-size:12px;color:#666;margin-bottom:4px'>"
            "Predicts next table update and projects volume trend. "
            "Requires history built from prior monitor runs.</div>"
        ),
        w.HBox([f_table, f_history]),
        w.HBox([f_n_periods, f_period]),
        forecast_btn,
        forecast_output,
    ])
    display(ui)