dashdq
DashDQ — Data Quality for Databricks.
Workflow::
# Cell 1: open wizard, configure checks, click Save Config
config = dashdq.configure()
# Cell 2: run checks and get a DQReport
report = dashdq.run_checks(config)
# Or all-in-one:
dashdq.launch()
"""
DashDQ — Data Quality for Databricks.

Workflow::

    # Cell 1: open wizard, configure checks, click Save Config
    config = dashdq.configure()

    # Cell 2: run checks and get a DQReport
    report = dashdq.run_checks(config)

    # Or all-in-one:
    dashdq.launch()
"""
from dashdq.suite import run_checks, table_quality_ok, DQReport
from dashdq.checks import CHECKS_REGISTRY, CheckResult

# ── Package metadata ──────────────────────────────────────────────────────────
__version__ = "0.1.15"
__author__ = "Darshan Shah"
__email__ = "darshan.innovation@gmail.com"
__license__ = "Apache-2.0"
__url__ = "https://github.com/dash-libs/dash-dq"

# Names exported by ``from dashdq import *``.
__all__ = [
    "env_setup",
    "configure",
    "run_checks",
    "table_quality_ok",
    "launch",
    "DQReport",
    "CheckResult",
    "CHECKS_REGISTRY",
]


# The UI entry points below import ``dashdq.ui`` inside the function body, so
# that module is only loaded when one of them is actually called.

def env_setup() -> None:
    """Show the DashDQ environment-setup panel (paths and defaults).

    Delegates to ``dashdq.ui.env_setup`` and passes back whatever it returns.
    """
    from dashdq.ui import env_setup as ui_env_setup
    result = ui_env_setup()
    return result


def configure(spark=None) -> dict:
    """Open the DashDQ configuration wizard.

    Delegates to ``dashdq.ui.configure``, forwarding *spark*. Returns the
    config dict that is filled in when Save Configuration is clicked.
    """
    from dashdq.ui import configure as ui_configure
    result = ui_configure(spark=spark)
    return result


def launch(spark=None) -> None:
    """All-in-one entry point: the wizard plus a Run Checks button.

    Delegates to ``dashdq.ui.launch``, forwarding *spark*, and passes back
    whatever it returns.
    """
    from dashdq.ui import launch as ui_launch
    result = ui_launch(spark=spark)
    return result
def env_setup() -> None:
    """Show the DashDQ environment-setup panel (paths and defaults).

    ``dashdq.ui`` is imported inside the function, so it is only loaded when
    this is called. Whatever ``dashdq.ui.env_setup`` returns is passed back.
    """
    from dashdq.ui import env_setup as ui_env_setup
    result = ui_env_setup()
    return result
Open the DashDQ environment setup panel (configure paths and defaults).
def configure(spark=None) -> dict:
    """Open the DashDQ configuration wizard.

    ``dashdq.ui`` is imported inside the function; *spark* is forwarded as a
    keyword argument. Returns the config dict that is filled in when Save
    Configuration is clicked.
    """
    from dashdq.ui import configure as ui_configure
    result = ui_configure(spark=spark)
    return result
Open the DashDQ wizard. Returns a config dict filled on Save Configuration.
def run_checks(config, spark=None) -> "DQReport":
    """
    Execute all checks defined in config and return a DQReport.

    ``config`` can be:

    - a **dict** returned by ``dashdq.configure()``
    - a **file path** (``str`` or ``os.PathLike``) to a JSON config saved by the wizard

    config shape::

        {
          "source": {"table": "catalog.schema.table"},
          "metadata": {"data_owner": "", "data_steward": "", ...},   # optional
          "checks": [
            {"check_name": "expect_column_values_to_not_be_null",
             "column": "customer_id",
             "threshold_pct": 100.0,
             "params": {}},
            ...
          ],
          "output": {"types": ["delta"], "delta_table": "..."}         # optional
        }

    Optional sections (``metadata``, ``checks``, ``output``, ``table_output``)
    may be omitted or set to ``null``.

    Behaviour notes:

    - A ``check_name`` missing from ``CHECKS_REGISTRY`` is skipped with a
      printed warning.
    - A check that raises is recorded with status ``"ERROR: <message>"``;
      the exception does not abort the run.
    - If ``output`` / ``table_output`` ask for anything other than just
      ``"dataframe"``, the report is also persisted via ``DQReport.save`` /
      ``DQReport.save_table_summary``.

    Raises:
        FileNotFoundError: *config* is a path that does not exist.
        ValueError: *config* is empty.
        RuntimeError: *spark* was not given and there is no active SparkSession.
        KeyError: ``config["source"]["table"]`` is missing.
    """
    import os

    if isinstance(config, (str, os.PathLike)):
        path = os.fspath(config)
        if not os.path.exists(path):
            raise FileNotFoundError(f"DashDQ config file not found: {path}")
        # JSON is UTF-8 by spec; don't depend on the platform's locale encoding.
        with open(path, encoding="utf-8") as f:
            config = json.load(f)

    if not config:
        raise ValueError("Config is empty — run dashdq.configure() first and click 'Save Config'.")

    if spark is None:
        from pyspark.sql import SparkSession
        spark = SparkSession.getActiveSession()
        if spark is None:
            # Otherwise the failure surfaces as an opaque AttributeError on None.
            raise RuntimeError(
                "No active SparkSession — pass spark=... to dashdq.run_checks()."
            )

    table = config["source"]["table"]
    df = spark.table(table)
    total_cols = len(df.columns)
    # ``or {}`` / ``or []`` also cover sections explicitly saved as JSON null.
    metadata = config.get("metadata") or {}
    run_ts = datetime.now().isoformat(timespec="seconds")
    tags_str = ",".join(metadata.get("tags") or [])

    checked_cols: set[str] = set()
    results: list[CheckResult] = []

    for chk in config.get("checks") or []:
        name = chk["check_name"]
        col = chk.get("column", "_TABLE_LEVEL_")
        threshold = float(chk.get("threshold_pct", 100.0))
        params = chk.get("params", {})

        entry = CHECKS_REGISTRY.get(name)
        if not entry:
            # Don't drop an unknown check silently — it would vanish from the report.
            print(f"⚠️ Unknown check '{name}' — skipping")
            continue

        try:
            if entry.get("cross_table"):
                total, passed, failed = entry["fn"](df, col, params, spark)
            else:
                total, passed, failed = entry["fn"](df, col, params)
            passed_pct = round(passed / total * 100, 2) if total > 0 else 0.0
            status = "PASS" if passed_pct >= threshold else "FAIL"
            # Only real, single-column checks count towards column coverage.
            if not entry.get("table_level") and not entry.get("compound"):
                checked_cols.add(col)
        except Exception as exc:
            # One broken check must not abort the whole run; record it as ERROR.
            total = passed = failed = 0
            passed_pct = 0.0
            status = f"ERROR: {exc}"

        results.append(CheckResult(
            table_name=table,
            column_name=col,
            check_name=name,
            dq_dimension=entry["dimension"],
            total_rows=total,
            passed_rows=passed,
            failed_rows=failed,
            passed_pct=passed_pct,
            threshold_pct=threshold,
            status=status,
            check_params=json.dumps(params),
            run_timestamp=run_ts,
            data_owner=metadata.get("data_owner", ""),
            data_steward=metadata.get("data_steward", ""),
            business_domain=metadata.get("business_domain", ""),
            table_description=metadata.get("description", ""),
            tags=tags_str,
            columns_checked=0,  # back-filled below
            total_columns=total_cols,
            column_coverage_pct=0.0,
        ))

    # Back-fill coverage (same value for all rows — table-level metric)
    n_covered = len(checked_cols)
    coverage = round(n_covered / total_cols * 100, 2) if total_cols else 0.0
    for r in results:
        r.columns_checked = n_covered
        r.column_coverage_pct = coverage

    report = DQReport(results, config)

    # Auto-save check-level output if configured
    output_cfg = config.get("output") or {}
    types = output_cfg.get("types") or [output_cfg.get("type", "dataframe")]
    if output_cfg and types != ["dataframe"]:
        report.save(output_cfg, spark)

    # Auto-save table-level summary output if configured (new separate key)
    tbl_output_cfg = config.get("table_output") or {}
    tbl_types = tbl_output_cfg.get("types") or [tbl_output_cfg.get("type", "dataframe")]
    if tbl_output_cfg and tbl_types != ["dataframe"]:
        report.save_table_summary(tbl_output_cfg, spark)

    return report
Execute all checks defined in config and return a DQReport.
config can be:
- a dict returned by dashdq.configure()
- a file path (str) to a JSON config saved by the wizard
config shape::
{
"source": {"table": "catalog.schema.table"},
"metadata": {"data_owner": "", "data_steward": "", ...}, # optional
"checks": [
{"check_name": "expect_column_values_to_not_be_null",
"column": "customer_id",
"threshold_pct": 100.0,
"params": {}},
...
],
"output": {"types": ["delta"], "delta_table": "..."} # optional
}
def table_quality_ok(config, spark=None) -> bool:
    """Return True only when every configured check on the table passes.

    Runs :func:`run_checks`, which may also persist output if the config asks
    for it. The function then reads the table-level ``overall_status``. A run
    that produced no results has an empty summary and therefore yields False.

    Useful as a gate before consuming a table::

        if dashdq.table_quality_ok(config, spark=spark):
            df = spark.table(config["source"]["table"])
            # safe to use
        else:
            raise RuntimeError("Table failed quality checks — aborting pipeline.")
    """
    summary = run_checks(config, spark=spark).table_summary()
    overall = summary.get("overall_status")
    return overall == "PASS"
Run all configured checks and return True if every check passes.
Useful as a gate before consuming a table::
if dashdq.table_quality_ok(config, spark=spark):
df = spark.table(config["source"]["table"])
# safe to use
else:
raise RuntimeError("Table failed quality checks — aborting pipeline.")
def launch(spark=None) -> None:
    """All-in-one entry point: the configuration wizard plus a Run Checks button.

    ``dashdq.ui`` is imported inside the function; *spark* is forwarded as a
    keyword argument, and whatever ``dashdq.ui.launch`` returns is passed back.
    """
    from dashdq.ui import launch as ui_launch
    result = ui_launch(spark=spark)
    return result
All-in-one: open wizard + Run Checks button.
class DQReport:
    """Results of one ``run_checks`` call, with helpers to view and persist them.

    Attributes:
        results: one ``CheckResult`` per executed check.
        config: the config dict that drove the run; its ``metadata`` and
            ``source.table`` are used for summaries and output file names.
    """

    def __init__(self, results: "list[CheckResult]", config: dict):
        self.results = results
        self.config = config

    # ── Row-level outputs (one row per check × column) ────────────────────────

    def to_dict(self) -> list[dict]:
        """Return each check result as a plain dict."""
        return [r.to_dict() for r in self.results]

    def to_spark_df(self, spark=None):
        """Row-level results as a Spark DataFrame (active session if *spark* is None)."""
        if spark is None:
            from pyspark.sql import SparkSession
            spark = SparkSession.getActiveSession()
        return spark.createDataFrame([r.to_dict() for r in self.results])

    def to_pandas(self):
        """Row-level results as a pandas DataFrame."""
        import pandas as pd
        return pd.DataFrame(self.to_dict())

    def display(self):
        """Render the results in a notebook, falling back to printing each result."""
        try:
            from IPython.display import display as ipy_display
            ipy_display(self.to_pandas())
        except Exception:
            # Best effort: outside IPython (or without pandas) print rows instead.
            for r in self.results:
                print(r)

    def summary(self) -> dict:
        """Check-count summary: totals, passes, non-passes and pass rate (%)."""
        total = len(self.results)
        passed = sum(1 for r in self.results if r.status == "PASS")
        return {
            "total_checks": total,
            "passed": passed,
            "failed": total - passed,  # FAIL and ERROR results both count here
            "pass_rate_pct": round(passed / total * 100, 1) if total else 0,
        }

    # ── Table-level summary (one row per table run) ───────────────────────────

    def table_summary(self) -> dict:
        """Single-row summary at table level.

        clean_rows = estimated rows that passed every check (see the note on
        the estimate below). overall_status = PASS only if all checks passed;
        FAIL and ERROR results both make it FAIL.

        Returns an empty dict when there are no results.
        """
        if not self.results:
            return {}

        # Table-wide fields (name, coverage, timestamp) are back-filled
        # identically on every result, so the first one is representative.
        r0 = self.results[0]
        metadata = self.config.get("metadata") or {}

        # A check that raised is recorded with total_rows == 0. Take the row
        # count from the first check that actually ran, so one erroring check
        # listed first does not zero out the whole summary.
        total_rows = next(
            (r.total_rows for r in self.results if not str(r.status).startswith("ERROR")),
            0,
        )
        total_checks = len(self.results)
        passed_checks = sum(1 for r in self.results if r.status == "PASS")
        failed_checks = total_checks - passed_checks
        overall_status = "PASS" if failed_checks == 0 else "FAIL"

        # Each check reports failed_rows independently, and a row may fail
        # several checks. The sum therefore over-counts: it is an UPPER bound on
        # distinct dirty rows, which makes clean_rows a lower bound. Exact
        # counts would need a row-level join. Cap at total_rows so clean_rows
        # is never negative.
        total_failed_rows = sum(r.failed_rows for r in self.results)
        dirty_rows = min(total_failed_rows, total_rows) if total_rows else 0
        clean_rows = max(0, total_rows - dirty_rows)
        clean_pct = round(clean_rows / total_rows * 100, 2) if total_rows else 0.0

        return {
            "table_name": r0.table_name,
            "overall_status": overall_status,
            "total_rows": total_rows,
            "clean_rows": clean_rows,
            "dirty_rows": dirty_rows,
            "clean_pct": clean_pct,
            "total_checks": total_checks,
            "passed_checks": passed_checks,
            "failed_checks": failed_checks,
            "columns_checked": r0.columns_checked,
            "total_columns": r0.total_columns,
            "column_coverage_pct": r0.column_coverage_pct,
            "run_timestamp": r0.run_timestamp,
            "data_owner": metadata.get("data_owner", ""),
            "data_steward": metadata.get("data_steward", ""),
            "business_domain": metadata.get("business_domain", ""),
            "description": metadata.get("description", ""),
            "tags": ",".join(metadata.get("tags") or []),
        }

    def to_table_summary_df(self, spark=None):
        """Spark DataFrame with one row summarising this table run."""
        if spark is None:
            from pyspark.sql import SparkSession
            spark = SparkSession.getActiveSession()
        return spark.createDataFrame([self.table_summary()])

    def table_summary_pandas(self):
        """pandas DataFrame with one row summarising this table run."""
        import pandas as pd
        return pd.DataFrame([self.table_summary()])

    def save_table_summary(self, output_cfg: dict, spark=None):
        """Persist the table-level summary row to the configured destinations.

        ``output_cfg`` uses either ``"types"`` (list) or the legacy ``"type"``
        (single string). Supported types are ``"delta"`` (needs
        ``delta_table``) and ``"volume_json"`` / ``"volume_csv"`` (need
        ``volume_path``). A destination missing its target is skipped with a
        printed warning. Other types are ignored. Nothing is written when the
        report has no results.
        """
        import os

        types = output_cfg.get("types") or (
            [output_cfg["type"]] if "type" in output_cfg else ["dataframe"]
        )
        summary = self.table_summary()
        if not summary:
            return

        for otype in types:
            if otype == "delta":
                table = output_cfg.get("delta_table", "")
                if not table:
                    print("⚠️ delta_table not set — skipping Delta summary output")
                    continue
                # to_table_summary_df resolves the active session when spark is None.
                (self.to_table_summary_df(spark)
                    .write.format("delta")
                    .mode("append")
                    .option("mergeSchema", "true")
                    .saveAsTable(table))
                print(f"✅ Saved table summary to Delta: {table}")

            elif otype in ("volume_json", "volume_csv"):
                vol_path = (output_cfg.get("volume_path") or "").rstrip("/")
                if not vol_path:
                    # os.makedirs("") would raise; mirror the delta branch instead.
                    print("⚠️ volume_path not set — skipping volume summary output")
                    continue
                table_name = self.config.get("source", {}).get("table", "")
                tbl = table_name.split(".")[-1] if table_name else "table"
                filename = (
                    output_cfg.get("filename")
                    or f"dq_{tbl}_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                )
                ext = "json" if otype == "volume_json" else "csv"
                os.makedirs(vol_path, exist_ok=True)
                full = f"{vol_path}/{filename}.{ext}"
                import pandas as pd
                spdf = pd.DataFrame([summary])
                if ext == "json":
                    spdf.to_json(full, orient="records", indent=2)
                else:
                    spdf.to_csv(full, index=False)
                print(f"✅ Saved table summary to: {full}")

    def save(self, output_cfg: dict, spark=None):
        """Persist row-level results to one or more destinations in output_cfg.

        ``output_cfg`` uses either ``"types"`` (list) or the legacy ``"type"``
        (single string). Supported types:

        - ``"dataframe"``: build the Spark DataFrame only.
        - ``"delta"``: append to ``delta_table``; also append the table
          summary to ``summary_delta_table`` when set.
        - ``"volume_json"`` / ``"volume_csv"``: write a file under
          ``volume_path`` plus a ``_summary`` companion file.

        A destination missing its target is skipped with a printed warning.

        Returns the row-level Spark DataFrame if one was built (``"dataframe"``
        or ``"delta"`` output), otherwise None.
        """
        import os

        # Support both old single-type ("type") and new multi-type ("types") format
        types = output_cfg.get("types") or ([output_cfg["type"]] if "type" in output_cfg else ["dataframe"])

        sdf = None  # built lazily and shared by the dataframe/delta outputs

        for otype in types:
            if otype == "dataframe":
                if sdf is None:
                    sdf = self.to_spark_df(spark)

            elif otype == "delta":
                if sdf is None:
                    sdf = self.to_spark_df(spark)
                table = output_cfg.get("delta_table", "")
                if not table:
                    print("⚠️ delta_table not set — skipping Delta output")
                    continue
                (sdf.write.format("delta")
                    .mode("append")
                    .option("mergeSchema", "true")
                    .saveAsTable(table))
                print(f"✅ Saved to Delta table: {table}")
                # Also write table-level summary to <table>_summary if configured
                summary_table = output_cfg.get("summary_delta_table", "")
                if summary_table:
                    (self.to_table_summary_df(spark)
                        .write.format("delta")
                        .mode("append")
                        .option("mergeSchema", "true")
                        .saveAsTable(summary_table))
                    print(f"✅ Saved table summary to: {summary_table}")

            elif otype in ("volume_json", "volume_csv"):
                # vol_path already contains catalog/schema from the wizard
                vol_path = (output_cfg.get("volume_path") or "").rstrip("/")
                if not vol_path:
                    # os.makedirs("") would raise; mirror the delta branch instead.
                    print("⚠️ volume_path not set — skipping volume output")
                    continue
                table_name = self.config.get("source", {}).get("table", "")
                tbl = table_name.split(".")[-1] if table_name else "table"
                filename = (output_cfg.get("filename")
                            or f"dq_{tbl}_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
                ext = "json" if otype == "volume_json" else "csv"
                os.makedirs(vol_path, exist_ok=True)
                full = f"{vol_path}/{filename}.{ext}"
                pdf = self.to_pandas()
                if ext == "json":
                    pdf.to_json(full, orient="records", indent=2)
                else:
                    pdf.to_csv(full, index=False)
                print(f"✅ Saved to: {full}")
                # Summary file alongside: same dir, _summary suffix
                summary_file = f"{vol_path}/{filename}_summary.{ext}"
                spdf = self.table_summary_pandas()
                if ext == "json":
                    spdf.to_json(summary_file, orient="records", indent=2)
                else:
                    spdf.to_csv(summary_file, index=False)
                print(f"✅ Saved table summary to: {summary_file}")

        return sdf
def table_summary(self) -> dict:
    """Single-row summary at table level.

    clean_rows = estimated rows that passed every check (see the note on the
    estimate below). overall_status = PASS only if all checks passed; FAIL and
    ERROR results both make it FAIL.

    Returns an empty dict when there are no results.
    """
    if not self.results:
        return {}

    # Table-wide fields (name, coverage, timestamp) are back-filled identically
    # on every result, so the first one is representative.
    r0 = self.results[0]
    metadata = self.config.get("metadata") or {}

    # A check that raised is recorded with total_rows == 0. Take the row count
    # from the first check that actually ran, so one erroring check listed
    # first does not zero out the whole summary.
    total_rows = next(
        (r.total_rows for r in self.results if not str(r.status).startswith("ERROR")),
        0,
    )
    total_checks = len(self.results)
    passed_checks = sum(1 for r in self.results if r.status == "PASS")
    failed_checks = total_checks - passed_checks
    overall_status = "PASS" if failed_checks == 0 else "FAIL"

    # Each check reports failed_rows independently, and a row may fail several
    # checks. The sum therefore over-counts: it is an UPPER bound on distinct
    # dirty rows, which makes clean_rows a lower bound. Exact counts would need
    # a row-level join. Cap at total_rows so clean_rows is never negative.
    total_failed_rows = sum(r.failed_rows for r in self.results)
    dirty_rows = min(total_failed_rows, total_rows) if total_rows else 0
    clean_rows = max(0, total_rows - dirty_rows)
    clean_pct = round(clean_rows / total_rows * 100, 2) if total_rows else 0.0

    return {
        "table_name": r0.table_name,
        "overall_status": overall_status,
        "total_rows": total_rows,
        "clean_rows": clean_rows,
        "dirty_rows": dirty_rows,
        "clean_pct": clean_pct,
        "total_checks": total_checks,
        "passed_checks": passed_checks,
        "failed_checks": failed_checks,
        "columns_checked": r0.columns_checked,
        "total_columns": r0.total_columns,
        "column_coverage_pct": r0.column_coverage_pct,
        "run_timestamp": r0.run_timestamp,
        "data_owner": metadata.get("data_owner", ""),
        "data_steward": metadata.get("data_steward", ""),
        "business_domain": metadata.get("business_domain", ""),
        "description": metadata.get("description", ""),
        "tags": ",".join(metadata.get("tags") or []),
    }
Single-row summary at table level.
clean_records = rows that passed every column check applied to them.
overall_status = PASS only if all checks passed.
def to_table_summary_df(self, spark=None):
    """Build a one-row Spark DataFrame from this run's table-level summary.

    When *spark* is not supplied, the active SparkSession is used.
    """
    session = spark
    if session is None:
        from pyspark.sql import SparkSession
        session = SparkSession.getActiveSession()
    row = self.table_summary()
    return session.createDataFrame([row])
Spark DataFrame with one row summarising this table run.
def save_table_summary(self, output_cfg: dict, spark=None):
    """Persist the table-level summary row to the configured destinations.

    ``output_cfg`` uses either ``"types"`` (list) or the legacy ``"type"``
    (single string). Supported types are ``"delta"`` (needs ``delta_table``)
    and ``"volume_json"`` / ``"volume_csv"`` (need ``volume_path``). A
    destination missing its target is skipped with a printed warning. Other
    types are ignored. Nothing is written when the summary is empty.
    """
    import os

    types = output_cfg.get("types") or (
        [output_cfg["type"]] if "type" in output_cfg else ["dataframe"]
    )
    summary = self.table_summary()
    if not summary:
        return

    for otype in types:
        if otype == "delta":
            table = output_cfg.get("delta_table", "")
            if not table:
                print("⚠️ delta_table not set — skipping Delta summary output")
                continue
            # to_table_summary_df resolves the active session when spark is None.
            (self.to_table_summary_df(spark)
                .write.format("delta")
                .mode("append")
                .option("mergeSchema", "true")
                .saveAsTable(table))
            print(f"✅ Saved table summary to Delta: {table}")

        elif otype in ("volume_json", "volume_csv"):
            vol_path = (output_cfg.get("volume_path") or "").rstrip("/")
            if not vol_path:
                # os.makedirs("") would raise; mirror the delta branch instead.
                print("⚠️ volume_path not set — skipping volume summary output")
                continue
            table_name = self.config.get("source", {}).get("table", "")
            tbl = table_name.split(".")[-1] if table_name else "table"
            filename = (
                output_cfg.get("filename")
                or f"dq_{tbl}_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            )
            ext = "json" if otype == "volume_json" else "csv"
            os.makedirs(vol_path, exist_ok=True)
            full = f"{vol_path}/{filename}.{ext}"
            import pandas as pd
            spdf = pd.DataFrame([summary])
            if ext == "json":
                spdf.to_json(full, orient="records", indent=2)
            else:
                spdf.to_csv(full, index=False)
            print(f"✅ Saved table summary to: {full}")
Persist the table-level summary row to configured destinations.
def save(self, output_cfg: dict, spark=None):
    """Persist row-level results to one or more destinations in output_cfg.

    ``output_cfg`` uses either ``"types"`` (list) or the legacy ``"type"``
    (single string). Supported types:

    - ``"dataframe"``: build the Spark DataFrame only.
    - ``"delta"``: append to ``delta_table``; also append the table summary
      to ``summary_delta_table`` when set.
    - ``"volume_json"`` / ``"volume_csv"``: write a file under
      ``volume_path`` plus a ``_summary`` companion file.

    A destination missing its target is skipped with a printed warning.

    Returns the row-level Spark DataFrame if one was built (``"dataframe"`` or
    ``"delta"`` output), otherwise None.
    """
    import os

    # Support both old single-type ("type") and new multi-type ("types") format
    types = output_cfg.get("types") or ([output_cfg["type"]] if "type" in output_cfg else ["dataframe"])

    sdf = None  # built lazily and shared by the dataframe/delta outputs

    for otype in types:
        if otype == "dataframe":
            if sdf is None:
                sdf = self.to_spark_df(spark)

        elif otype == "delta":
            if sdf is None:
                sdf = self.to_spark_df(spark)
            table = output_cfg.get("delta_table", "")
            if not table:
                print("⚠️ delta_table not set — skipping Delta output")
                continue
            (sdf.write.format("delta")
                .mode("append")
                .option("mergeSchema", "true")
                .saveAsTable(table))
            print(f"✅ Saved to Delta table: {table}")
            # Also write table-level summary to <table>_summary if configured
            summary_table = output_cfg.get("summary_delta_table", "")
            if summary_table:
                (self.to_table_summary_df(spark)
                    .write.format("delta")
                    .mode("append")
                    .option("mergeSchema", "true")
                    .saveAsTable(summary_table))
                print(f"✅ Saved table summary to: {summary_table}")

        elif otype in ("volume_json", "volume_csv"):
            # vol_path already contains catalog/schema from the wizard
            vol_path = (output_cfg.get("volume_path") or "").rstrip("/")
            if not vol_path:
                # os.makedirs("") would raise; mirror the delta branch instead.
                print("⚠️ volume_path not set — skipping volume output")
                continue
            table_name = self.config.get("source", {}).get("table", "")
            tbl = table_name.split(".")[-1] if table_name else "table"
            filename = (output_cfg.get("filename")
                        or f"dq_{tbl}_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
            ext = "json" if otype == "volume_json" else "csv"
            os.makedirs(vol_path, exist_ok=True)
            full = f"{vol_path}/{filename}.{ext}"
            pdf = self.to_pandas()
            if ext == "json":
                pdf.to_json(full, orient="records", indent=2)
            else:
                pdf.to_csv(full, index=False)
            print(f"✅ Saved to: {full}")
            # Summary file alongside: same dir, _summary suffix
            summary_file = f"{vol_path}/{filename}_summary.{ext}"
            spdf = self.table_summary_pandas()
            if ext == "json":
                spdf.to_json(summary_file, orient="records", indent=2)
            else:
                spdf.to_csv(summary_file, index=False)
            print(f"✅ Saved table summary to: {summary_file}")

    return sdf
Persist results to one or more destinations defined in output_cfg.
@dataclass
class CheckResult:
    """Outcome of one DashDQ check on one column (or on the whole table).

    One instance becomes one output row. The table-wide fields
    (``columns_checked``, ``total_columns``, ``column_coverage_pct``) carry
    the same value on every row of a run.
    """

    # ── What was checked ──
    table_name: str
    column_name: str
    check_name: str
    dq_dimension: str
    # ── Row counts and outcome ──
    total_rows: int
    passed_rows: int
    failed_rows: int
    passed_pct: float
    threshold_pct: float
    status: str  # "PASS", "FAIL", or an "ERROR..." string when the check raised
    # ── Run context ──
    check_params: str = "{}"  # JSON-encoded check parameters
    run_timestamp: str = ""
    # ── Table metadata copied from the config ──
    data_owner: str = ""
    data_steward: str = ""
    business_domain: str = ""
    table_description: str = ""
    # ── Column coverage for the whole run ──
    columns_checked: int = 0
    total_columns: int = 0
    column_coverage_pct: float = 0.0
    tags: str = ""  # comma-joined tags

    def to_dict(self) -> dict:
        """Return a shallow copy of this result's attributes as a dict."""
        return dict(vars(self))