dashdq

DashDQ — Data Quality for Databricks.

Workflow:

    # Cell 1: open wizard, configure checks, click Save Config
    config = dashdq.configure()

    # Cell 2: run checks and get a DQReport
    report = dashdq.run_checks(config)

    # Or all-in-one:
    dashdq.launch()
 1"""
 2DashDQ — Data Quality for Databricks.
 3
 4Workflow::
 5
 6    # Cell 1: open wizard, configure checks, click Save Config
 7    config = dashdq.configure()
 8
 9    # Cell 2: run checks and get a DQReport
10    report = dashdq.run_checks(config)
11
12    # Or all-in-one:
13    dashdq.launch()
14"""
15from dashdq.suite import run_checks, table_quality_ok, DQReport
16from dashdq.checks import CHECKS_REGISTRY, CheckResult
17
18__version__ = "0.1.15"
19__author__ = "Darshan Shah"
20__email__ = "darshan.innovation@gmail.com"
21__license__ = "Apache-2.0"
22__url__ = "https://github.com/dash-libs/dash-dq"
23
24__all__ = [
25    "env_setup", "configure", "run_checks", "table_quality_ok", "launch",
26    "DQReport", "CheckResult", "CHECKS_REGISTRY",
27]
28
29
def env_setup() -> None:
    """Open the DashDQ environment setup panel (configure paths and defaults).

    The UI function is imported at call time and invoked with no arguments;
    whatever it returns is handed back to the caller.
    """
    from dashdq.ui import env_setup as open_setup_panel
    panel_result = open_setup_panel()
    return panel_result
34
35
def configure(spark=None) -> dict:
    """Open the DashDQ wizard. Returns a config dict filled on Save Configuration.

    ``spark`` is forwarded unchanged, by keyword, to ``dashdq.ui.configure``,
    which is imported at call time.
    """
    from dashdq.ui import configure as open_wizard
    wizard_config = open_wizard(spark=spark)
    return wizard_config
40
41
def launch(spark=None) -> None:
    """All-in-one: open wizard + Run Checks button.

    ``spark`` is forwarded unchanged, by keyword, to ``dashdq.ui.launch``,
    which is imported at call time; its return value is passed back.
    """
    from dashdq.ui import launch as open_launcher
    launcher_result = open_launcher(spark=spark)
    return launcher_result
def env_setup() -> None:
def env_setup() -> None:
    """Open the DashDQ environment setup panel (configure paths and defaults).

    The UI function is imported at call time and invoked with no arguments;
    whatever it returns is handed back to the caller.
    """
    from dashdq.ui import env_setup as open_setup_panel
    panel_result = open_setup_panel()
    return panel_result

Open the DashDQ environment setup panel (configure paths and defaults).

def configure(spark=None) -> dict:
def configure(spark=None) -> dict:
    """Open the DashDQ wizard. Returns a config dict filled on Save Configuration.

    ``spark`` is forwarded unchanged, by keyword, to ``dashdq.ui.configure``,
    which is imported at call time.
    """
    from dashdq.ui import configure as open_wizard
    wizard_config = open_wizard(spark=spark)
    return wizard_config

Open the DashDQ wizard. Returns a config dict filled on Save Configuration.

def run_checks(config, spark=None) -> DQReport:
def run_checks(config, spark=None) -> DQReport:
    """
    Execute all checks defined in config and return a DQReport.

    ``config`` can be:
    - a **dict** returned by ``dashdq.configure()``
    - a **file path** (str or ``os.PathLike``) to a JSON config saved by the wizard

    config shape::

        {
            "source":   {"table": "catalog.schema.table"},
            "metadata": {"data_owner": "", "data_steward": "", ...},  # optional
            "checks": [
                {"check_name": "expect_column_values_to_not_be_null",
                 "column": "customer_id",
                 "threshold_pct": 100.0,
                 "params": {}},
                ...
            ],
            "output":   {"types": ["delta"], "delta_table": "..."}    # optional
        }

    Args:
        config: Config dict, or path to a JSON file containing one.
        spark: SparkSession to use. When omitted, the active session is used.

    Returns:
        A DQReport with one CheckResult per check found in CHECKS_REGISTRY.
        A check function that raises is recorded with status
        ``"ERROR: <message>"`` and zero row counts rather than aborting the run.

    Raises:
        FileNotFoundError: ``config`` is a path that does not exist.
        ValueError: the config is empty.
        RuntimeError: no ``spark`` was given and no SparkSession is active.
        KeyError: the config has no ``source.table`` entry.
    """
    import os
    import warnings

    if isinstance(config, (str, os.PathLike)):
        path = str(config)
        if not os.path.exists(path):
            raise FileNotFoundError(f"DashDQ config file not found: {path}")
        # JSON is UTF-8 by specification; do not depend on the platform default.
        with open(path, encoding="utf-8") as f:
            config = json.load(f)

    if not config:
        raise ValueError("Config is empty — run dashdq.configure() first and click 'Save Config'.")

    if spark is None:
        from pyspark.sql import SparkSession
        spark = SparkSession.getActiveSession()
        if spark is None:
            # getActiveSession() returns None outside a running session; fail
            # here with a clear message instead of an AttributeError below.
            raise RuntimeError(
                "No active SparkSession found — pass one explicitly via spark=..."
            )

    table = config["source"]["table"]
    df = spark.table(table)
    total_cols = len(df.columns)
    # ``or`` (not a .get default) so an explicit JSON null is tolerated too.
    metadata = config.get("metadata") or {}
    run_ts = datetime.now().isoformat(timespec="seconds")
    tags_str = ",".join(metadata.get("tags") or [])

    checked_cols: set[str] = set()
    results: list[CheckResult] = []

    for chk in config.get("checks") or []:
        name = chk["check_name"]
        col = chk.get("column", "_TABLE_LEVEL_")
        threshold = float(chk.get("threshold_pct", 100.0))
        params = chk.get("params") or {}

        entry = CHECKS_REGISTRY.get(name)
        if not entry:
            # Still skipped (no result row), but no longer silently: a typo in
            # a check name would otherwise go unnoticed.
            warnings.warn(
                f"DashDQ: unknown check {name!r} is not in CHECKS_REGISTRY — skipped.",
                stacklevel=2,
            )
            continue

        try:
            if entry.get("cross_table"):
                # Cross-table checks additionally receive the session.
                total, passed, failed = entry["fn"](df, col, params, spark)
            else:
                total, passed, failed = entry["fn"](df, col, params)
            passed_pct = round(passed / total * 100, 2) if total > 0 else 0.0
            status = "PASS" if passed_pct >= threshold else "FAIL"
            # Only plain column checks count towards column coverage.
            if not entry.get("table_level") and not entry.get("compound"):
                checked_cols.add(col)
        except Exception as exc:
            # One failing check must not abort the run; record it as an error row.
            total = passed = failed = 0
            passed_pct = 0.0
            status = f"ERROR: {exc}"

        results.append(CheckResult(
            table_name=table,
            column_name=col,
            check_name=name,
            dq_dimension=entry["dimension"],
            total_rows=total,
            passed_rows=passed,
            failed_rows=failed,
            passed_pct=passed_pct,
            threshold_pct=threshold,
            status=status,
            check_params=json.dumps(params),
            run_timestamp=run_ts,
            data_owner=metadata.get("data_owner", ""),
            data_steward=metadata.get("data_steward", ""),
            business_domain=metadata.get("business_domain", ""),
            table_description=metadata.get("description", ""),
            tags=tags_str,
            columns_checked=0,       # back-filled below
            total_columns=total_cols,
            column_coverage_pct=0.0,
        ))

    # Back-fill coverage (same value for all rows — table-level metric)
    n_covered = len(checked_cols)
    coverage = round(n_covered / total_cols * 100, 2) if total_cols else 0.0
    for r in results:
        r.columns_checked = n_covered
        r.column_coverage_pct = coverage

    report = DQReport(results, config)

    # Auto-save check-level output if configured
    output_cfg = config.get("output") or {}
    types = output_cfg.get("types") or ([output_cfg.get("type", "dataframe")])
    if output_cfg and types != ["dataframe"]:
        report.save(output_cfg, spark)

    # Auto-save table-level summary output if configured (new separate key)
    tbl_output_cfg = config.get("table_output") or {}
    tbl_types = tbl_output_cfg.get("types") or ([tbl_output_cfg.get("type", "dataframe")])
    if tbl_output_cfg and tbl_types != ["dataframe"]:
        report.save_table_summary(tbl_output_cfg, spark)

    return report

Execute all checks defined in config and return a DQReport.

config can be:

  • a dict returned by dashdq.configure()
  • a file path (str) to a JSON config saved by the wizard

config shape:

    {
        "source":   {"table": "catalog.schema.table"},
        "metadata": {"data_owner": "", "data_steward": "", ...},  # optional
        "checks": [
            {"check_name": "expect_column_values_to_not_be_null",
             "column": "customer_id",
             "threshold_pct": 100.0,
             "params": {}},
            ...
        ],
        "output":   {"types": ["delta"], "delta_table": "..."}    # optional
    }
def table_quality_ok(config, spark=None) -> bool:
def table_quality_ok(config, spark=None) -> bool:
    """Run all configured checks and return True if every check passes.

    The checks are executed through ``run_checks`` and the verdict is read
    from the report's table summary: the result is True only when its
    ``overall_status`` equals ``"PASS"``.

    Useful as a gate before consuming a table::

        if dashdq.table_quality_ok(config, spark=spark):
            df = spark.table(config["source"]["table"])
            # safe to use
        else:
            raise RuntimeError("Table failed quality checks — aborting pipeline.")
    """
    table_level = run_checks(config, spark=spark).table_summary()
    verdict = table_level.get("overall_status")
    return verdict == "PASS"

Run all configured checks and return True if every check passes.

Useful as a gate before consuming a table:

    if dashdq.table_quality_ok(config, spark=spark):
        df = spark.table(config["source"]["table"])
        # safe to use
    else:
        raise RuntimeError("Table failed quality checks — aborting pipeline.")
def launch(spark=None) -> None:
def launch(spark=None) -> None:
    """All-in-one: open wizard + Run Checks button.

    ``spark`` is forwarded unchanged, by keyword, to ``dashdq.ui.launch``,
    which is imported at call time; its return value is passed back.
    """
    from dashdq.ui import launch as open_launcher
    launcher_result = open_launcher(spark=spark)
    return launcher_result

All-in-one: open wizard + Run Checks button.

class DQReport:
  9class DQReport:
 10    def __init__(self, results: list[CheckResult], config: dict):
 11        self.results = results
 12        self.config = config
 13
 14    # ── Row-level outputs (one row per check × column) ────────────────────────
 15
 16    def to_dict(self) -> list[dict]:
 17        return [r.to_dict() for r in self.results]
 18
 19    def to_spark_df(self, spark=None):
 20        if spark is None:
 21            from pyspark.sql import SparkSession
 22            spark = SparkSession.getActiveSession()
 23        return spark.createDataFrame([r.to_dict() for r in self.results])
 24
 25    def to_pandas(self):
 26        import pandas as pd
 27        return pd.DataFrame(self.to_dict())
 28
 29    def display(self):
 30        try:
 31            from IPython.display import display as ipy_display
 32            ipy_display(self.to_pandas())
 33        except Exception:
 34            for r in self.results:
 35                print(r)
 36
 37    def summary(self) -> dict:
 38        total = len(self.results)
 39        passed = sum(1 for r in self.results if r.status == "PASS")
 40        return {
 41            "total_checks": total,
 42            "passed": passed,
 43            "failed": total - passed,
 44            "pass_rate_pct": round(passed / total * 100, 1) if total else 0,
 45        }
 46
 47    # ── Table-level summary (one row per table run) ───────────────────────────
 48
 49    def table_summary(self) -> dict:
 50        """Single-row summary at table level.
 51
 52        clean_records = rows that passed every column check applied to them.
 53        overall_status = PASS only if all checks passed.
 54        """
 55        if not self.results:
 56            return {}
 57
 58        r0 = self.results[0]
 59        metadata = self.config.get("metadata", {})
 60        total_rows = r0.total_rows
 61        total_checks = len(self.results)
 62        passed_checks = sum(1 for r in self.results if r.status == "PASS")
 63        failed_checks = total_checks - passed_checks
 64        overall_status = "PASS" if failed_checks == 0 else "FAIL"
 65
 66        # Clean records: rows not flagged as failed by ANY check.
 67        # Each check reports failed_rows independently; we sum them as a
 68        # conservative lower-bound on dirty rows (exact intersection needs a join).
 69        total_failed_rows = sum(r.failed_rows for r in self.results)
 70        # Cap at total_rows to avoid negative clean counts when checks overlap
 71        dirty_rows = min(total_failed_rows, total_rows) if total_rows else 0
 72        clean_rows = max(0, total_rows - dirty_rows)
 73        clean_pct = round(clean_rows / total_rows * 100, 2) if total_rows else 0.0
 74
 75        return {
 76            "table_name":          r0.table_name,
 77            "overall_status":      overall_status,
 78            "total_rows":          total_rows,
 79            "clean_rows":          clean_rows,
 80            "dirty_rows":          dirty_rows,
 81            "clean_pct":           clean_pct,
 82            "total_checks":        total_checks,
 83            "passed_checks":       passed_checks,
 84            "failed_checks":       failed_checks,
 85            "columns_checked":     r0.columns_checked,
 86            "total_columns":       r0.total_columns,
 87            "column_coverage_pct": r0.column_coverage_pct,
 88            "run_timestamp":       r0.run_timestamp,
 89            "data_owner":          metadata.get("data_owner", ""),
 90            "data_steward":        metadata.get("data_steward", ""),
 91            "business_domain":     metadata.get("business_domain", ""),
 92            "description":         metadata.get("description", ""),
 93            "tags":                ",".join(metadata.get("tags", [])),
 94        }
 95
 96    def to_table_summary_df(self, spark=None):
 97        """Spark DataFrame with one row summarising this table run."""
 98        if spark is None:
 99            from pyspark.sql import SparkSession
100            spark = SparkSession.getActiveSession()
101        return spark.createDataFrame([self.table_summary()])
102
103    def table_summary_pandas(self):
104        import pandas as pd
105        return pd.DataFrame([self.table_summary()])
106
107    def save_table_summary(self, output_cfg: dict, spark=None):
108        """Persist the table-level summary row to configured destinations."""
109        import os
110
111        types = output_cfg.get("types") or (
112            [output_cfg["type"]] if "type" in output_cfg else ["dataframe"]
113        )
114        summary = self.table_summary()
115        if not summary:
116            return
117
118        for otype in types:
119            if otype == "delta":
120                if spark is None:
121                    from pyspark.sql import SparkSession
122                    spark = SparkSession.getActiveSession()
123                table = output_cfg.get("delta_table", "")
124                if not table:
125                    print("⚠️  delta_table not set — skipping Delta summary output")
126                    continue
127                (self.to_table_summary_df(spark)
128                    .write.format("delta")
129                    .mode("append")
130                    .option("mergeSchema", "true")
131                    .saveAsTable(table))
132                print(f"✅ Saved table summary to Delta: {table}")
133
134            elif otype in ("volume_json", "volume_csv"):
135                vol_path = output_cfg.get("volume_path", "").rstrip("/")
136                table_name = self.config.get("source", {}).get("table", "")
137                tbl = table_name.split(".")[-1] if table_name else "table"
138                filename = (
139                    output_cfg.get("filename")
140                    or f"dq_{tbl}_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
141                )
142                ext = "json" if otype == "volume_json" else "csv"
143                os.makedirs(vol_path, exist_ok=True)
144                full = f"{vol_path}/{filename}.{ext}"
145                import pandas as pd
146                spdf = pd.DataFrame([summary])
147                if ext == "json":
148                    spdf.to_json(full, orient="records", indent=2)
149                else:
150                    spdf.to_csv(full, index=False)
151                print(f"✅ Saved table summary to: {full}")
152
153    def save(self, output_cfg: dict, spark=None):
154        """Persist results to one or more destinations defined in output_cfg."""
155        import os
156
157        # Support both old single-type ("type") and new multi-type ("types") format
158        types = output_cfg.get("types") or ([output_cfg["type"]] if "type" in output_cfg else ["dataframe"])
159
160        results = {}
161        sdf = None
162
163        for otype in types:
164            if otype == "dataframe":
165                if sdf is None:
166                    sdf = self.to_spark_df(spark)
167                results["dataframe"] = sdf
168
169            elif otype == "delta":
170                if sdf is None:
171                    sdf = self.to_spark_df(spark)
172                table = output_cfg.get("delta_table", "")
173                if not table:
174                    print("⚠️  delta_table not set — skipping Delta output")
175                    continue
176                (sdf.write.format("delta")
177                    .mode("append")
178                    .option("mergeSchema", "true")
179                    .saveAsTable(table))
180                print(f"✅ Saved to Delta table: {table}")
181                # Also write table-level summary to <table>_summary if configured
182                summary_table = output_cfg.get("summary_delta_table", "")
183                if summary_table:
184                    (self.to_table_summary_df(spark)
185                        .write.format("delta")
186                        .mode("append")
187                        .option("mergeSchema", "true")
188                        .saveAsTable(summary_table))
189                    print(f"✅ Saved table summary to: {summary_table}")
190                results["delta"] = sdf
191
192            elif otype in ("volume_json", "volume_csv"):
193                # vol_path already contains catalog/schema from the wizard
194                vol_path = output_cfg.get("volume_path", "").rstrip("/")
195                table_name = self.config.get("source", {}).get("table", "")
196                tbl = table_name.split(".")[-1] if table_name else "table"
197                filename = (output_cfg.get("filename")
198                            or f"dq_{tbl}_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
199                ext = "json" if otype == "volume_json" else "csv"
200                os.makedirs(vol_path, exist_ok=True)
201                full = f"{vol_path}/{filename}.{ext}"
202                pdf = self.to_pandas()
203                if ext == "json":
204                    pdf.to_json(full, orient="records", indent=2)
205                else:
206                    pdf.to_csv(full, index=False)
207                print(f"✅ Saved to: {full}")
208                # Summary file alongside: same dir, _summary suffix
209                summary_file = f"{vol_path}/{filename}_summary.{ext}"
210                spdf = self.table_summary_pandas()
211                if ext == "json":
212                    spdf.to_json(summary_file, orient="records", indent=2)
213                else:
214                    spdf.to_csv(summary_file, index=False)
215                print(f"✅ Saved table summary to: {summary_file}")
216                results[otype] = full
217
218        return results.get("dataframe") or (sdf if sdf is not None else None)
DQReport(results: list[CheckResult], config: dict)
10    def __init__(self, results: list[CheckResult], config: dict):
11        self.results = results
12        self.config = config
results
config
def to_dict(self) -> list[dict]:
16    def to_dict(self) -> list[dict]:
17        return [r.to_dict() for r in self.results]
def to_spark_df(self, spark=None):
19    def to_spark_df(self, spark=None):
20        if spark is None:
21            from pyspark.sql import SparkSession
22            spark = SparkSession.getActiveSession()
23        return spark.createDataFrame([r.to_dict() for r in self.results])
def to_pandas(self):
25    def to_pandas(self):
26        import pandas as pd
27        return pd.DataFrame(self.to_dict())
def display(self):
29    def display(self):
30        try:
31            from IPython.display import display as ipy_display
32            ipy_display(self.to_pandas())
33        except Exception:
34            for r in self.results:
35                print(r)
def summary(self) -> dict:
37    def summary(self) -> dict:
38        total = len(self.results)
39        passed = sum(1 for r in self.results if r.status == "PASS")
40        return {
41            "total_checks": total,
42            "passed": passed,
43            "failed": total - passed,
44            "pass_rate_pct": round(passed / total * 100, 1) if total else 0,
45        }
def table_summary(self) -> dict:
49    def table_summary(self) -> dict:
50        """Single-row summary at table level.
51
52        clean_records = rows that passed every column check applied to them.
53        overall_status = PASS only if all checks passed.
54        """
55        if not self.results:
56            return {}
57
58        r0 = self.results[0]
59        metadata = self.config.get("metadata", {})
60        total_rows = r0.total_rows
61        total_checks = len(self.results)
62        passed_checks = sum(1 for r in self.results if r.status == "PASS")
63        failed_checks = total_checks - passed_checks
64        overall_status = "PASS" if failed_checks == 0 else "FAIL"
65
66        # Clean records: rows not flagged as failed by ANY check.
67        # Each check reports failed_rows independently; we sum them as a
68        # conservative lower-bound on dirty rows (exact intersection needs a join).
69        total_failed_rows = sum(r.failed_rows for r in self.results)
70        # Cap at total_rows to avoid negative clean counts when checks overlap
71        dirty_rows = min(total_failed_rows, total_rows) if total_rows else 0
72        clean_rows = max(0, total_rows - dirty_rows)
73        clean_pct = round(clean_rows / total_rows * 100, 2) if total_rows else 0.0
74
75        return {
76            "table_name":          r0.table_name,
77            "overall_status":      overall_status,
78            "total_rows":          total_rows,
79            "clean_rows":          clean_rows,
80            "dirty_rows":          dirty_rows,
81            "clean_pct":           clean_pct,
82            "total_checks":        total_checks,
83            "passed_checks":       passed_checks,
84            "failed_checks":       failed_checks,
85            "columns_checked":     r0.columns_checked,
86            "total_columns":       r0.total_columns,
87            "column_coverage_pct": r0.column_coverage_pct,
88            "run_timestamp":       r0.run_timestamp,
89            "data_owner":          metadata.get("data_owner", ""),
90            "data_steward":        metadata.get("data_steward", ""),
91            "business_domain":     metadata.get("business_domain", ""),
92            "description":         metadata.get("description", ""),
93            "tags":                ",".join(metadata.get("tags", [])),
94        }

Single-row summary at table level.

  • clean_rows ("clean records") = rows that passed every column check applied to them.
  • overall_status = PASS only if all checks passed.

def to_table_summary_df(self, spark=None):
 96    def to_table_summary_df(self, spark=None):
 97        """Spark DataFrame with one row summarising this table run."""
 98        if spark is None:
 99            from pyspark.sql import SparkSession
100            spark = SparkSession.getActiveSession()
101        return spark.createDataFrame([self.table_summary()])

Spark DataFrame with one row summarising this table run.

def table_summary_pandas(self):
103    def table_summary_pandas(self):
104        import pandas as pd
105        return pd.DataFrame([self.table_summary()])
def save_table_summary(self, output_cfg: dict, spark=None):
107    def save_table_summary(self, output_cfg: dict, spark=None):
108        """Persist the table-level summary row to configured destinations."""
109        import os
110
111        types = output_cfg.get("types") or (
112            [output_cfg["type"]] if "type" in output_cfg else ["dataframe"]
113        )
114        summary = self.table_summary()
115        if not summary:
116            return
117
118        for otype in types:
119            if otype == "delta":
120                if spark is None:
121                    from pyspark.sql import SparkSession
122                    spark = SparkSession.getActiveSession()
123                table = output_cfg.get("delta_table", "")
124                if not table:
125                    print("⚠️  delta_table not set — skipping Delta summary output")
126                    continue
127                (self.to_table_summary_df(spark)
128                    .write.format("delta")
129                    .mode("append")
130                    .option("mergeSchema", "true")
131                    .saveAsTable(table))
132                print(f"✅ Saved table summary to Delta: {table}")
133
134            elif otype in ("volume_json", "volume_csv"):
135                vol_path = output_cfg.get("volume_path", "").rstrip("/")
136                table_name = self.config.get("source", {}).get("table", "")
137                tbl = table_name.split(".")[-1] if table_name else "table"
138                filename = (
139                    output_cfg.get("filename")
140                    or f"dq_{tbl}_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
141                )
142                ext = "json" if otype == "volume_json" else "csv"
143                os.makedirs(vol_path, exist_ok=True)
144                full = f"{vol_path}/{filename}.{ext}"
145                import pandas as pd
146                spdf = pd.DataFrame([summary])
147                if ext == "json":
148                    spdf.to_json(full, orient="records", indent=2)
149                else:
150                    spdf.to_csv(full, index=False)
151                print(f"✅ Saved table summary to: {full}")

Persist the table-level summary row to configured destinations.

def save(self, output_cfg: dict, spark=None):
153    def save(self, output_cfg: dict, spark=None):
154        """Persist results to one or more destinations defined in output_cfg."""
155        import os
156
157        # Support both old single-type ("type") and new multi-type ("types") format
158        types = output_cfg.get("types") or ([output_cfg["type"]] if "type" in output_cfg else ["dataframe"])
159
160        results = {}
161        sdf = None
162
163        for otype in types:
164            if otype == "dataframe":
165                if sdf is None:
166                    sdf = self.to_spark_df(spark)
167                results["dataframe"] = sdf
168
169            elif otype == "delta":
170                if sdf is None:
171                    sdf = self.to_spark_df(spark)
172                table = output_cfg.get("delta_table", "")
173                if not table:
174                    print("⚠️  delta_table not set — skipping Delta output")
175                    continue
176                (sdf.write.format("delta")
177                    .mode("append")
178                    .option("mergeSchema", "true")
179                    .saveAsTable(table))
180                print(f"✅ Saved to Delta table: {table}")
181                # Also write table-level summary to <table>_summary if configured
182                summary_table = output_cfg.get("summary_delta_table", "")
183                if summary_table:
184                    (self.to_table_summary_df(spark)
185                        .write.format("delta")
186                        .mode("append")
187                        .option("mergeSchema", "true")
188                        .saveAsTable(summary_table))
189                    print(f"✅ Saved table summary to: {summary_table}")
190                results["delta"] = sdf
191
192            elif otype in ("volume_json", "volume_csv"):
193                # vol_path already contains catalog/schema from the wizard
194                vol_path = output_cfg.get("volume_path", "").rstrip("/")
195                table_name = self.config.get("source", {}).get("table", "")
196                tbl = table_name.split(".")[-1] if table_name else "table"
197                filename = (output_cfg.get("filename")
198                            or f"dq_{tbl}_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
199                ext = "json" if otype == "volume_json" else "csv"
200                os.makedirs(vol_path, exist_ok=True)
201                full = f"{vol_path}/{filename}.{ext}"
202                pdf = self.to_pandas()
203                if ext == "json":
204                    pdf.to_json(full, orient="records", indent=2)
205                else:
206                    pdf.to_csv(full, index=False)
207                print(f"✅ Saved to: {full}")
208                # Summary file alongside: same dir, _summary suffix
209                summary_file = f"{vol_path}/{filename}_summary.{ext}"
210                spdf = self.table_summary_pandas()
211                if ext == "json":
212                    spdf.to_json(summary_file, orient="records", indent=2)
213                else:
214                    spdf.to_csv(summary_file, index=False)
215                print(f"✅ Saved table summary to: {summary_file}")
216                results[otype] = full
217
218        return results.get("dataframe") or (sdf if sdf is not None else None)

Persist results to one or more destinations defined in output_cfg.

@dataclass
class CheckResult:
@dataclass
class CheckResult:
    """Outcome of one check, together with the run and table metadata stored with it."""

    # What was checked
    table_name: str
    column_name: str
    check_name: str
    dq_dimension: str
    # Row counts reported by the check and the derived pass percentage
    total_rows: int
    passed_rows: int
    failed_rows: int
    passed_pct: float
    threshold_pct: float
    status: str                    # PASS | FAIL | ERROR
    # Optional context; every field from here on has a default
    check_params: str = "{}"
    run_timestamp: str = ""
    data_owner: str = ""
    data_steward: str = ""
    business_domain: str = ""
    table_description: str = ""
    columns_checked: int = 0
    total_columns: int = 0
    column_coverage_pct: float = 0.0
    tags: str = ""

    def to_dict(self) -> dict:
        """Return a shallow copy of the instance attributes as a dict."""
        return dict(vars(self))
CheckResult( table_name: str, column_name: str, check_name: str, dq_dimension: str, total_rows: int, passed_rows: int, failed_rows: int, passed_pct: float, threshold_pct: float, status: str, check_params: str = '{}', run_timestamp: str = '', data_owner: str = '', data_steward: str = '', business_domain: str = '', table_description: str = '', columns_checked: int = 0, total_columns: int = 0, column_coverage_pct: float = 0.0, tags: str = '')
table_name: str
column_name: str
check_name: str
dq_dimension: str
total_rows: int
passed_rows: int
failed_rows: int
passed_pct: float
threshold_pct: float
status: str
check_params: str = '{}'
run_timestamp: str = ''
data_owner: str = ''
data_steward: str = ''
business_domain: str = ''
table_description: str = ''
columns_checked: int = 0
total_columns: int = 0
column_coverage_pct: float = 0.0
tags: str = ''
def to_dict(self) -> dict:
47    def to_dict(self) -> dict:
48        return {k: v for k, v in self.__dict__.items()}
CHECKS_REGISTRY = {'expect_column_values_to_not_be_null': {'dimension': 'Completeness', 'description': 'Values must not be null', 'params': [], 'fn': <function _not_null>}, 'expect_column_values_to_be_null': {'dimension': 'Completeness', 'description': 'Values must be null', 'params': [], 'fn': <function _is_null>}, 'expect_column_values_to_not_be_null_or_empty': {'dimension': 'Completeness', 'description': 'Values must not be null and must not be an empty/whitespace string', 'params': [], 'fn': <function _not_null_or_empty>}, 'expect_column_null_count_to_be_between': {'dimension': 'Completeness', 'description': 'Number of null values must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _null_count_between>, 'aggregate': True}, 'expect_column_null_proportion_to_be_between': {'dimension': 'Completeness', 'description': 'Proportion of null values (0–1) must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _null_proportion_between>, 'aggregate': True}, 'expect_column_values_to_be_between': {'dimension': 'Accuracy', 'description': 'Values must fall within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _between>}, 'expect_column_values_to_not_be_between': {'dimension': 'Accuracy', 'description': 'Values must fall outside [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _not_between>}, 'expect_column_values_to_be_in_set': {'dimension': 'Accuracy', 'description': 'Values must belong to the allowed set', 'params': ['value_set'], 'fn': <function _in_set>}, 'expect_column_values_to_not_be_in_set': {'dimension': 'Accuracy', 'description': 'Values must not belong to the disallowed set', 'params': ['value_set'], 'fn': <function _not_in_set>}, 'expect_column_values_to_equal': {'dimension': 'Accuracy', 'description': 'All values must equal the specified constant', 'params': ['value'], 'fn': <function _equal_to>}, 
'expect_column_values_to_not_equal': {'dimension': 'Accuracy', 'description': 'No values may equal the specified constant', 'params': ['value'], 'fn': <function _not_equal_to>}, 'expect_column_values_to_be_not_less_than': {'dimension': 'Accuracy', 'description': 'Values must be >= min_value', 'params': ['min_value'], 'fn': <function _not_less_than>}, 'expect_column_values_to_be_not_greater_than': {'dimension': 'Accuracy', 'description': 'Values must be <= max_value', 'params': ['max_value'], 'fn': <function _not_greater_than>}, 'expect_column_values_to_be_positive': {'dimension': 'Accuracy', 'description': 'Values must be strictly positive (> 0)', 'params': [], 'fn': <function _positive>}, 'expect_column_values_to_be_negative': {'dimension': 'Accuracy', 'description': 'Values must be strictly negative (< 0)', 'params': [], 'fn': <function _negative>}, 'expect_column_values_to_be_non_negative': {'dimension': 'Accuracy', 'description': 'Values must be zero or positive (>= 0)', 'params': [], 'fn': <function _non_negative>}, 'expect_column_values_to_be_increasing': {'dimension': 'Accuracy', 'description': 'Values must be non-decreasing in row order', 'params': [], 'fn': <function _increasing>}, 'expect_column_values_to_be_decreasing': {'dimension': 'Accuracy', 'description': 'Values must be non-increasing in row order', 'params': [], 'fn': <function _decreasing>}, 'expect_column_values_to_not_be_empty_string': {'dimension': 'Accuracy', 'description': 'Non-null values must not be empty or whitespace-only', 'params': [], 'fn': <function _not_empty_string>}, 'expect_column_values_to_match_regex': {'dimension': 'Accuracy', 'description': 'Values must match the regular expression', 'params': ['regex'], 'fn': <function _regex>}, 'expect_column_values_to_not_match_regex': {'dimension': 'Accuracy', 'description': 'Values must not match the regular expression', 'params': ['regex'], 'fn': <function _not_regex>}, 'expect_column_values_to_match_regex_list': {'dimension': 
'Accuracy', 'description': 'Values must match at least one regex in the list', 'params': ['regex_list'], 'fn': <function _regex_list>}, 'expect_column_values_to_match_like_pattern': {'dimension': 'Accuracy', 'description': 'Values must match the SQL LIKE pattern (% and _ wildcards)', 'params': ['like_pattern'], 'fn': <function _like_pattern>}, 'expect_column_values_to_not_match_like_pattern': {'dimension': 'Accuracy', 'description': 'Values must not match the SQL LIKE pattern', 'params': ['like_pattern'], 'fn': <function _not_like_pattern>}, 'expect_column_value_lengths_to_be_between': {'dimension': 'Accuracy', 'description': 'String length must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _length_between>}, 'expect_column_value_lengths_to_equal': {'dimension': 'Accuracy', 'description': 'String length must equal the specified value', 'params': ['value'], 'fn': <function _length_equal>}, 'expect_column_values_to_be_of_type': {'dimension': 'Accuracy', 'description': "Column dtype must contain the specified type string (e.g. 'int', 'string')", 'params': ['type_'], 'fn': <function _of_type>}, 'expect_column_values_to_be_in_type_list': {'dimension': 'Accuracy', 'description': 'Column dtype must match one of the types in type_list', 'params': ['type_list'], 'fn': <function _in_type_list>}, 'expect_column_values_to_be_valid_email': {'dimension': 'Accuracy', 'description': 'Values must be valid email addresses', 'params': [], 'fn': <function _valid_email>}, 'expect_column_values_to_be_valid_url': {'dimension': 'Accuracy', 'description': 'Values must be valid HTTP/HTTPS URLs', 'params': [], 'fn': <function _valid_url>}, 'expect_column_values_to_be_valid_ipv4': {'dimension': 'Accuracy', 'description': 'Values must be valid IPv4 addresses (e.g. 
192.168.1.1)', 'params': [], 'fn': <function _valid_ipv4>}, 'expect_column_values_to_be_valid_uuid': {'dimension': 'Accuracy', 'description': 'Values must be valid UUIDs (8-4-4-4-12 hex format)', 'params': [], 'fn': <function _valid_uuid>}, 'expect_column_values_to_be_json_parseable': {'dimension': 'Accuracy', 'description': 'Values must be valid JSON strings', 'params': [], 'fn': <function _json_parseable>}, 'expect_column_values_to_match_strftime_format': {'dimension': 'Accuracy', 'description': 'Values must match the strftime date format (e.g. %Y-%m-%d)', 'params': ['strftime_format'], 'fn': <function _strftime>}, 'expect_column_values_to_be_dateutil_parseable': {'dimension': 'Accuracy', 'description': 'Values must be parseable as a date in any common format', 'params': [], 'fn': <function _date_parseable>}, 'expect_column_values_to_not_be_in_future': {'dimension': 'Accuracy', 'description': 'Date/timestamp values must not be in the future', 'params': [], 'fn': <function _not_in_future>}, 'expect_column_values_to_be_not_older_than_n_days': {'dimension': 'Accuracy', 'description': 'Date values must be within the last n_days', 'params': ['n_days'], 'fn': <function _not_older_than_n_days>}, 'expect_column_values_to_not_be_in_near_future': {'dimension': 'Accuracy', 'description': 'Values must not fall within the next n_days', 'params': ['n_days'], 'fn': <function _not_near_future>}, 'expect_column_data_to_be_fresh': {'dimension': 'Accuracy', 'description': 'Most recent value in column must be within n_minutes of now', 'params': ['n_minutes'], 'fn': <function _data_fresh>, 'aggregate': True}, 'expect_column_values_to_pass_custom_sql_filter': {'dimension': 'Accuracy', 'description': 'Rows matching sql_filter are FAILED rows (write a WHERE clause for bad data)', 'params': ['sql_filter'], 'fn': <function _custom_sql_filter>}, 'expect_column_mean_to_be_between': {'dimension': 'Accuracy', 'description': 'Column mean must be within [min_value, max_value]', 'params': 
['min_value', 'max_value'], 'fn': <function _mean_between>, 'aggregate': True}, 'expect_column_median_to_be_between': {'dimension': 'Accuracy', 'description': 'Column median must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _median_between>, 'aggregate': True}, 'expect_column_stdev_to_be_between': {'dimension': 'Accuracy', 'description': 'Standard deviation must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _stdev_between>, 'aggregate': True}, 'expect_column_max_to_be_between': {'dimension': 'Accuracy', 'description': 'Column maximum must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _max_between>, 'aggregate': True}, 'expect_column_min_to_be_between': {'dimension': 'Accuracy', 'description': 'Column minimum must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _min_between>, 'aggregate': True}, 'expect_column_sum_to_be_between': {'dimension': 'Accuracy', 'description': 'Column sum must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _sum_between>, 'aggregate': True}, 'expect_column_most_common_value_to_be_in_set': {'dimension': 'Accuracy', 'description': 'Most frequent value must be in the allowed set', 'params': ['value_set'], 'fn': <function _most_common_in_set>, 'aggregate': True}, 'expect_column_quantile_value_to_be_between': {'dimension': 'Accuracy', 'description': 'Column quantile (0–1) must be within [min_value, max_value]', 'params': ['quantile', 'min_value', 'max_value'], 'fn': <function _quantile_between>, 'aggregate': True}, 'expect_column_distinct_values_to_be_in_set': {'dimension': 'Accuracy', 'description': 'All distinct values must be in value_set (no unlisted values allowed)', 'params': ['value_set'], 'fn': <function _distinct_in_set>, 'aggregate': True}, 'expect_column_distinct_values_to_contain_set': {'dimension': 'Accuracy', 
'description': 'Distinct values must include all items in value_set', 'params': ['value_set'], 'fn': <function _distinct_contains_set>, 'aggregate': True}, 'expect_column_distinct_values_to_equal_set': {'dimension': 'Accuracy', 'description': 'Distinct values must exactly match value_set (no extras, no missing)', 'params': ['value_set'], 'fn': <function _distinct_equal_set>, 'aggregate': True}, 'expect_column_values_to_be_unique': {'dimension': 'Integrity', 'description': 'All values must be unique — no duplicates', 'params': [], 'fn': <function _unique>}, 'expect_column_unique_value_count_to_be_between': {'dimension': 'Integrity', 'description': 'Distinct value count must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _unique_count_between>, 'aggregate': True}, 'expect_column_proportion_of_unique_values_to_be_between': {'dimension': 'Integrity', 'description': 'Proportion of unique values (0–1) must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _unique_proportion_between>, 'aggregate': True}, 'expect_column_pair_values_to_be_equal': {'dimension': 'Integrity', 'description': 'Values in this column must equal values in column_b (row-wise)', 'params': ['column_b'], 'fn': <function _pair_equal>}, 'expect_column_pair_values_a_to_be_greater_than_b': {'dimension': 'Integrity', 'description': 'Values in this column must be greater than values in column_b', 'params': ['column_b'], 'fn': <function _pair_greater>}, 'expect_column_pair_values_to_be_in_set': {'dimension': 'Integrity', 'description': 'Row-wise (colA, colB) value pairs must be in valid_pairs', 'params': ['column_b', 'valid_pairs'], 'fn': <function _pair_in_set>}, 'expect_compound_columns_to_be_unique': {'dimension': 'Integrity', 'description': 'Combination of columns must be unique across all rows', 'params': ['columns'], 'fn': <function _compound_unique>, 'compound': True}, 'expect_primary_key_to_be_valid': {'dimension': 
'Integrity', 'description': 'PK columns must all be non-null AND the combination must be unique', 'params': ['columns'], 'fn': <function _primary_key_valid>, 'compound': True}, 'expect_column_values_to_exist_in_reference_table': {'dimension': 'Integrity', 'description': 'Values must exist in reference_table.reference_column (foreign key check)', 'params': ['reference_table', 'reference_column'], 'fn': <function _foreign_key>, 'cross_table': True}, 'expect_referential_integrity': {'dimension': 'Integrity', 'description': 'Full referential integrity check between tables; optionally checks orphans too', 'params': ['reference_table', 'reference_column', 'check_orphans'], 'fn': <function _referential_integrity>, 'cross_table': True}, 'expect_table_row_count_to_be_between': {'dimension': 'Consistency', 'description': 'Table row count must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _row_count_between>, 'table_level': True, 'aggregate': True}, 'expect_table_row_count_to_equal': {'dimension': 'Consistency', 'description': 'Table row count must equal value exactly', 'params': ['value'], 'fn': <function _row_count_equal>, 'table_level': True, 'aggregate': True}, 'expect_table_column_count_to_be_between': {'dimension': 'Consistency', 'description': 'Number of columns must be within [min_value, max_value]', 'params': ['min_value', 'max_value'], 'fn': <function _col_count_between>, 'table_level': True, 'aggregate': True}, 'expect_table_column_count_to_equal': {'dimension': 'Consistency', 'description': 'Number of columns must equal value exactly', 'params': ['value'], 'fn': <function _col_count_equal>, 'table_level': True, 'aggregate': True}, 'expect_column_to_exist': {'dimension': 'Consistency', 'description': 'The specified column must exist in the table', 'params': [], 'fn': <function _col_to_exist>, 'table_level': True, 'aggregate': True}, 'expect_table_columns_to_match_set': {'dimension': 'Consistency', 'description': 'Table 
column names must exactly match column_set (order-independent)', 'params': ['column_set'], 'fn': <function _columns_match_set>, 'table_level': True, 'aggregate': True}, 'expect_table_columns_to_match_ordered_list': {'dimension': 'Consistency', 'description': 'Table columns must match column_list in exact order', 'params': ['column_list'], 'fn': <function _columns_match_ordered_list>, 'table_level': True, 'aggregate': True}, 'expect_multicolumn_sum_to_equal': {'dimension': 'Consistency', 'description': 'Sum of all values across columns must equal sum_value', 'params': ['columns', 'sum_value'], 'fn': <function _multicolumn_sum_equal>, 'compound': True, 'aggregate': True}, 'expect_table_row_count_to_equal_other_table': {'dimension': 'Consistency', 'description': "This table's row count must equal reference_table's row count", 'params': ['reference_table'], 'fn': <function _row_count_equal_other_table>, 'table_level': True, 'cross_table': True, 'aggregate': True}, 'expect_table_schema_to_match': {'dimension': 'Consistency', 'description': 'Table schema must match expected_schema dict {col_name: dtype_string}', 'params': ['expected_schema'], 'fn': <function _schema_valid>, 'table_level': True, 'aggregate': True}}