dashgov
DashGov — Data lineage and governance for Databricks.
"""DashGov — Data lineage and governance for Databricks.

Re-exports the public API of the ``lineage``, ``parser``, ``classifier`` and
``ui`` submodules so callers can import everything from ``dashgov`` directly.
"""
from dashgov.lineage import (
    LineageGraph,
    build_lineage_graph,
    fetch_uc_lineage,
)
from dashgov.parser import (
    parse_column_lineage,
    parse_notebook_lineage,
    parse_table_lineage,
)
from dashgov.classifier import classify_all, classify_table
from dashgov.ui import launch

__version__ = "0.1.4"

# Names exported by ``from dashgov import *``.
__all__ = [
    # lineage graph
    "LineageGraph",
    "build_lineage_graph",
    "fetch_uc_lineage",
    # SQL parsing
    "parse_table_lineage",
    "parse_column_lineage",
    "parse_notebook_lineage",
    # role classification
    "classify_table",
    "classify_all",
    # notebook UI
    "launch",
]
class LineageGraph:
    """Directed graph of table- and column-level lineage.

    Table adjacency is precomputed in both directions from *table_edges*;
    column edges are kept as a flat list and scanned on demand. Every
    traversal tracks visited nodes, so it terminates even if the input
    contains a cycle. Neighbours are expanded in sorted order, so traversal
    results are reproducible across runs.
    """

    def __init__(
        self,
        tables: dict[str, TableNode],
        table_edges: list[TableEdge],
        column_edges: list[ColumnEdge],
    ):
        """
        Args:
            tables: Table nodes keyed by full name.
            table_edges: Objects exposing ``source`` / ``target`` table names.
            column_edges: Objects exposing ``source_table``, ``source_column``,
                ``target_table``, ``target_column`` and ``transformation``.
        """
        self.tables = tables
        self.table_edges = table_edges
        self.column_edges = column_edges

        # adjacency: source → {targets} and target → {sources}
        self._downstream: dict[str, set[str]] = {}
        self._upstream: dict[str, set[str]] = {}
        for e in table_edges:
            self._downstream.setdefault(e.source, set()).add(e.target)
            self._upstream.setdefault(e.target, set()).add(e.source)

    # ── Table-level traversal ────────────────────────────────────────────────

    def upstream_tables(self, table: str, depth: int = 1) -> list[str]:
        """All tables that feed into *table*, up to *depth* hops."""
        return self._bfs(table, self._upstream, depth)

    def downstream_tables(self, table: str, depth: int = 1) -> list[str]:
        """All tables that consume from *table*, up to *depth* hops."""
        return self._bfs(table, self._downstream, depth)

    def root_sources(self, table: str) -> list[str]:
        """Tables with no upstream that eventually feed into *table* (sorted).

        *table* itself is never reported, even when it has no upstream.
        """
        visited, result = set(), []
        stack = [table]
        while stack:
            t = stack.pop()
            if t in visited:
                continue
            visited.add(t)
            ups = list(self._upstream.get(t, []))
            if not ups and t != table:
                result.append(t)
            stack.extend(ups)
        return sorted(result)

    def impact_analysis(self, table: str) -> dict:
        """What breaks if *table* changes — full downstream tree.

        Returns a dict with the sorted direct dependents, every transitively
        downstream table (BFS order), the direct column edges leaving *table*
        grouped by source column, and the downstream table count.
        """
        direct = sorted(self._downstream.get(table, []))
        # No hop limit: the visited set in _bfs guarantees termination.
        all_downstream = self._bfs(table, self._downstream, depth=float("inf"))
        col_targets: dict[str, list[str]] = {}
        for ce in self.column_edges:
            if ce.source_table == table:
                col_targets.setdefault(ce.source_column, []).append(
                    f"{ce.target_table}.{ce.target_column}"
                )
        return {
            "table": table,
            "direct_dependents": direct,
            "all_downstream": all_downstream,
            "affected_column_paths": col_targets,
            "total_affected_tables": len(all_downstream),
        }

    # ── Column-level traversal ───────────────────────────────────────────────

    def column_sources(self, table: str, column: str) -> list[ColumnEdge]:
        """Edges that feed into *table.column*."""
        return [
            e for e in self.column_edges
            if e.target_table == table and e.target_column == column
        ]

    def column_targets(self, table: str, column: str) -> list[ColumnEdge]:
        """Edges that *table.column* feeds into."""
        return [
            e for e in self.column_edges
            if e.source_table == table and e.source_column == column
        ]

    def column_lineage_chain(self, table: str, column: str) -> dict:
        """Full upstream chain for a single column.

        Each upstream (table, column) pair is listed once, in the order it is
        first discovered, even when several paths reach it.
        """
        # Tuple keys: a string key like "a.b" + "c" would collide with
        # "a" + "b.c" because table names themselves contain dots.
        visited: set[tuple[str, str]] = set()
        recorded: set[tuple[str, str]] = set()
        upstream: list[dict] = []
        stack = [(table, column)]
        while stack:
            t, c = stack.pop()
            if (t, c) in visited:
                continue
            visited.add((t, c))
            for src in self.column_sources(t, c):
                key = (src.source_table, src.source_column)
                if key not in recorded:
                    recorded.add(key)
                    upstream.append({"table": src.source_table, "column": src.source_column})
                stack.append(key)
        return {
            "table": table,
            "column": column,
            "upstream_columns": upstream,
        }

    # ── Export ───────────────────────────────────────────────────────────────

    def to_dict(self) -> dict:
        """Serialise tables, table edges and column edges into plain dicts/lists."""
        return {
            "tables": {
                k: {
                    "full_name": v.full_name,
                    "catalog": v.catalog,
                    "schema_name": v.schema_name,
                    "table": v.table,
                    "columns": v.columns,
                    "role": v.role,
                }
                for k, v in self.tables.items()
            },
            "table_edges": [
                {"source": e.source, "target": e.target} for e in self.table_edges
            ],
            "column_edges": [
                {
                    "source_table": e.source_table,
                    "source_column": e.source_column,
                    "target_table": e.target_table,
                    "target_column": e.target_column,
                    "transformation": e.transformation,
                }
                for e in self.column_edges
            ],
        }

    def summary(self) -> dict:
        """Counts plus the known tables with no upstream / no downstream edges."""
        return {
            "total_tables": len(self.tables),
            "total_table_edges": len(self.table_edges),
            "total_column_edges": len(self.column_edges),
            "root_sources": [t for t in self.tables if t not in self._upstream],
            "leaf_sinks": [t for t in self.tables if t not in self._downstream],
        }

    # ── Internal ─────────────────────────────────────────────────────────────

    def _bfs(self, start: str, adj: dict, depth: float) -> list[str]:
        """Breadth-first walk of *adj* from *start*, excluding *start*.

        Returns nodes in discovery order, at most *depth* hops away (pass
        ``float("inf")`` for no limit). Neighbours are expanded in sorted
        order so the result does not depend on set iteration order.
        """
        visited, result = {start}, []
        queue = deque([(start, 0)])
        while queue:
            node, d = queue.popleft()
            if d >= depth:
                continue
            for neighbour in sorted(adj.get(node, ())):
                if neighbour not in visited:
                    visited.add(neighbour)
                    result.append(neighbour)
                    queue.append((neighbour, d + 1))
        return result
Directed acyclic graph of table and column lineage.
def __init__(
    self,
    tables: dict[str, TableNode],
    table_edges: list[TableEdge],
    column_edges: list[ColumnEdge],
):
    """Store the graph inputs and index the table edges in both directions.

    ``_downstream`` maps each source table to the set of tables it feeds;
    ``_upstream`` maps each target table to the set of tables feeding it.
    """
    self.tables = tables
    self.table_edges = table_edges
    self.column_edges = column_edges

    forward: dict[str, set[str]] = {}
    backward: dict[str, set[str]] = {}
    for edge in table_edges:
        src, dst = edge.source, edge.target
        if src not in forward:
            forward[src] = set()
        forward[src].add(dst)
        if dst not in backward:
            backward[dst] = set()
        backward[dst].add(src)
    self._downstream = forward
    self._upstream = backward
def upstream_tables(self, table: str, depth: int = 1) -> list[str]:
    """Return every table feeding into *table*, at most *depth* hops away."""
    feeders = self._upstream
    return self._bfs(table, feeders, depth)
All tables that feed into `table`, up to `depth` hops.
def downstream_tables(self, table: str, depth: int = 1) -> list[str]:
    """Return every table consuming from *table*, at most *depth* hops away."""
    consumers = self._downstream
    return self._bfs(table, consumers, depth)
All tables that consume from `table`, up to `depth` hops.
def root_sources(self, table: str) -> list[str]:
    """Return, sorted, the tables without upstream that eventually feed *table*.

    *table* itself is never included, even if nothing feeds it.
    """
    seen: set[str] = set()
    roots: list[str] = []
    pending = [table]
    while pending:
        current = pending.pop()
        if current in seen:
            continue
        seen.add(current)
        parents = list(self._upstream.get(current, []))
        if current != table and not parents:
            roots.append(current)
        pending.extend(parents)
    roots.sort()
    return roots
Tables with no upstream that eventually feed into `table`.
def impact_analysis(self, table: str) -> dict:
    """What breaks if *table* changes — full downstream tree.

    Returns a dict with:
        table: the analysed table.
        direct_dependents: sorted tables one hop downstream.
        all_downstream: every transitively downstream table, in the order
            returned by ``self._bfs``.
        affected_column_paths: ``{source_column: ["target_table.target_column", ...]}``
            for column edges leaving *table* directly.
        total_affected_tables: ``len(all_downstream)``.
    """
    direct = sorted(self._downstream.get(table, []))
    # Unbounded depth: the previous hard-coded 999 silently truncated very
    # deep lineage. The BFS tracks visited nodes, so this still terminates.
    all_downstream = self._bfs(table, self._downstream, depth=float("inf"))
    col_targets: dict[str, list[str]] = {}
    for ce in self.column_edges:
        if ce.source_table == table:
            col_targets.setdefault(ce.source_column, []).append(
                f"{ce.target_table}.{ce.target_column}"
            )
    return {
        "table": table,
        "direct_dependents": direct,
        "all_downstream": all_downstream,
        "affected_column_paths": col_targets,
        "total_affected_tables": len(all_downstream),
    }
What breaks if `table` changes — full downstream tree.
def column_sources(self, table: str, column: str) -> list[ColumnEdge]:
    """Return the column edges whose target is *table*.*column*, in stored order."""

    def _feeds_target(edge) -> bool:
        return edge.target_table == table and edge.target_column == column

    return list(filter(_feeds_target, self.column_edges))
Edges that feed into `table.column`.
def column_targets(self, table: str, column: str) -> list[ColumnEdge]:
    """Return the column edges whose source is *table*.*column*, in stored order."""

    def _leaves_source(edge) -> bool:
        return edge.source_table == table and edge.source_column == column

    return list(filter(_leaves_source, self.column_edges))
Edges that `table.column` feeds into.
def column_lineage_chain(self, table: str, column: str) -> dict:
    """Full upstream chain for a single column.

    Walks ``self.column_sources`` depth-first from (*table*, *column*).
    Returns ``{"table", "column", "upstream_columns"}``, where each upstream
    ``{"table", "column"}`` pair appears once, in first-discovery order, even
    when several lineage paths reach it.
    """
    # Tuple keys: a string key like "a.b" + "c" would collide with
    # "a" + "b.c" because table names themselves contain dots.
    visited: set[tuple[str, str]] = set()
    recorded: set[tuple[str, str]] = set()
    upstream: list[dict] = []
    stack = [(table, column)]
    while stack:
        t, c = stack.pop()
        if (t, c) in visited:
            continue
        visited.add((t, c))
        for src in self.column_sources(t, c):
            key = (src.source_table, src.source_column)
            if key not in recorded:
                recorded.add(key)
                upstream.append({"table": src.source_table, "column": src.source_column})
            stack.append(key)
    return {
        "table": table,
        "column": column,
        "upstream_columns": upstream,
    }
Full upstream chain for a single column.
def to_dict(self) -> dict:
    """Serialise tables, table edges and column edges into plain dicts/lists."""

    def _node(node) -> dict:
        return {
            "full_name": node.full_name,
            "catalog": node.catalog,
            "schema_name": node.schema_name,
            "table": node.table,
            "columns": node.columns,
            "role": node.role,
        }

    def _column_edge(edge) -> dict:
        return {
            "source_table": edge.source_table,
            "source_column": edge.source_column,
            "target_table": edge.target_table,
            "target_column": edge.target_column,
            "transformation": edge.transformation,
        }

    tables_out = {name: _node(node) for name, node in self.tables.items()}
    table_edges_out = [{"source": edge.source, "target": edge.target} for edge in self.table_edges]
    column_edges_out = [_column_edge(edge) for edge in self.column_edges]
    return {
        "tables": tables_out,
        "table_edges": table_edges_out,
        "column_edges": column_edges_out,
    }
def summary(self) -> dict:
    """Return edge/table counts plus known tables lacking upstream or downstream edges."""
    upstream, downstream = self._upstream, self._downstream
    roots = list(filter(lambda name: name not in upstream, self.tables))
    sinks = list(filter(lambda name: name not in downstream, self.tables))
    return {
        "total_tables": len(self.tables),
        "total_table_edges": len(self.table_edges),
        "total_column_edges": len(self.column_edges),
        "root_sources": roots,
        "leaf_sinks": sinks,
    }
def build_lineage_graph(
    tables: list[dict],
    table_edges: list[dict],
    column_edges: list[dict],
) -> LineageGraph:
    """
    Build a LineageGraph from plain dicts.

    tables       — [{"full_name": str, "columns": [...], "role": str}, ...]
                   ("columns" defaults to [], "role" to "unknown")
    table_edges  — [{"source": str, "target": str}, ...]
    column_edges — [{"source_table", "source_column", "target_table",
                     "target_column", "transformation"}, ...]
                   ("transformation" defaults to None)

    ``full_name`` is split on "." into catalog / schema / table: three or more
    parts give (parts[0], parts[1], parts[-1]); two give ("", parts[0],
    parts[1]); one gives ("", "", name).
    """

    def _split_name(full: str) -> tuple[str, str, str]:
        parts = full.split(".")
        if len(parts) >= 3:
            return parts[0], parts[1], parts[-1]
        if len(parts) == 2:
            return "", parts[0], parts[-1]
        return "", "", parts[-1]

    nodes: dict[str, TableNode] = {}
    for spec in tables:
        full = spec["full_name"]
        catalog, schema, name = _split_name(full)
        nodes[full] = TableNode(
            full_name=full,
            catalog=catalog,
            schema_name=schema,
            table=name,
            columns=spec.get("columns", []),
            role=spec.get("role", "unknown"),
        )

    t_edges = [TableEdge(raw["source"], raw["target"]) for raw in table_edges]
    c_edges = [
        ColumnEdge(
            source_table=raw["source_table"],
            source_column=raw["source_column"],
            target_table=raw["target_table"],
            target_column=raw["target_column"],
            transformation=raw.get("transformation"),
        )
        for raw in column_edges
    ]
    return LineageGraph(nodes, t_edges, c_edges)
Build a LineageGraph from plain dicts.
tables — [{"full_name": str, "columns": [{name, type, nullable}], ...}]
table_edges — [{"source": str, "target": str}]
column_edges — [{"source_table", "source_column", "target_table", "target_column"}]
def fetch_uc_lineage(
    table: str,
    workspace_url: str,
    token: str,
    depth: int = 2,
) -> dict:
    """
    Fetch table-level and column-level lineage from Unity Catalog REST API.

    Table lineage is walked breadth-first from *table* in both directions for
    *depth* rounds; each round queries every table discovered in the
    previous round. Column lineage is fetched for *table* only.

    Args:
        table: Name of the starting table (e.g. ``catalog.schema.table``).
        workspace_url: Workspace base URL (``https://...``); a trailing "/"
            is ignored.
        token: Databricks personal access token, sent as a Bearer token.
        depth: Number of BFS rounds.

    Returns:
        ``{"tables": [...], "table_edges": [...], "column_edges": [...]}``,
        compatible with ``build_lineage_graph()``. Tables are sorted by name
        and each table edge appears once.

    Raises:
        RuntimeError: if ``requests`` is not installed.

    Non-200 responses are skipped (best effort); exceptions raised by
    ``requests`` (e.g. timeouts) propagate to the caller.
    """
    try:
        import requests
    except ImportError as exc:
        raise RuntimeError("requests is required: pip install requests") from exc

    headers = {"Authorization": f"Bearer {token}"}
    base = workspace_url.rstrip("/")

    # NOTE(review): the endpoint paths and response keys used below
    # ("upstream_tables", "downstream_tables", "column_lineage", ...) should be
    # confirmed against the Databricks lineage-tracking API reference.
    def _get_json(path: str, table_name: str):
        resp = requests.get(
            f"{base}{path}",
            headers=headers,
            params={"table_name": table_name},
            timeout=15,
        )
        return resp.json() if resp.status_code == 200 else None

    visited_tables: set[str] = {table}
    table_edges: list[dict] = []
    seen_edges: set[tuple[str, str]] = set()

    def _add_edge(src: str, tgt: str) -> None:
        # Record every edge once, independently of whether its far endpoint
        # was already discovered: links between two known tables used to be
        # dropped by the visited check.
        if (src, tgt) not in seen_edges:
            seen_edges.add((src, tgt))
            table_edges.append({"source": src, "target": tgt})

    frontier = [table]
    for _ in range(depth):
        if not frontier:
            break
        next_frontier: list[str] = []
        for t in frontier:
            data = _get_json("/api/2.0/lineage-tracking/table-lineages", t)
            if data is None:
                continue
            for up in data.get("upstream_tables", []):
                src = up.get("name", "")
                if not src:
                    continue
                _add_edge(src, t)
                if src not in visited_tables:
                    visited_tables.add(src)
                    next_frontier.append(src)
            for down in data.get("downstream_tables", []):
                tgt = down.get("name", "")
                if not tgt:
                    continue
                _add_edge(t, tgt)
                if tgt not in visited_tables:
                    visited_tables.add(tgt)
                    next_frontier.append(tgt)
        frontier = next_frontier

    # Column lineage for the root table
    column_edges: list[dict] = []
    col_data = _get_json("/api/2.0/lineage-tracking/column-lineages", table)
    if col_data is not None:
        for col in col_data.get("column_lineage", []):
            tgt_col = col.get("name", "")
            for up in col.get("upstream_columns", []):
                column_edges.append({
                    "source_table": up.get("table_name", ""),
                    "source_column": up.get("name", ""),
                    "target_table": table,
                    "target_column": tgt_col,
                })

    # Sorted so the output does not depend on set iteration order.
    tables_list = [{"full_name": t, "columns": []} for t in sorted(visited_tables)]
    return {
        "tables": tables_list,
        "table_edges": table_edges,
        "column_edges": column_edges,
    }
Fetch table-level and column-level lineage from Unity Catalog REST API.
Returns a dict compatible with build_lineage_graph(). Requires workspace_url (https://...) and a Databricks PAT.
def parse_table_lineage(sql: str, dialect: str = "spark") -> dict:
    """
    Extract table-level lineage from a SQL statement.

    Args:
        sql: A single SQL statement.
        dialect: sqlglot dialect name used for parsing.

    Returns:
        {
            "target": str | None,   # unqualified name of the table written to
            "sources": [str, ...],  # tables read from, dotted as written, de-duplicated
            "type": "ctas" | "insert" | "select" | "unknown",
        }

    Names defined in a WITH clause (CTEs) are not reported as sources, and
    every branch of a UNION / INTERSECT / EXCEPT contributes sources. A
    statement that fails to parse yields type "unknown".
    """
    sg = _sqlglot()
    exp = sg.exp

    try:
        stmt = sg.parse_one(sql, dialect=dialect)
    except Exception:
        return {"target": None, "sources": [], "type": "unknown"}
    if stmt is None:
        return {"target": None, "sources": [], "type": "unknown"}

    def _full(tbl) -> str:
        parts = [p for p in (tbl.catalog, tbl.db, tbl.name) if p]
        return ".".join(parts) if parts else (tbl.name or "")

    # sqlglot represents references to a CTE as exp.Table nodes; remember the
    # CTE names so those references are not mistaken for physical tables.
    cte_names = {cte.alias for cte in stmt.find_all(exp.CTE) if cte.alias}

    def _is_cte_ref(tbl) -> bool:
        return not tbl.catalog and not tbl.db and tbl.name in cte_names

    def _table_names(node) -> list[str]:
        return [
            _full(t)
            for t in node.find_all(exp.Table)
            if t.name and not _is_cte_ref(t)
        ]

    def _write_lineage(kind: str) -> dict:
        # The first table found breadth-first is the one being written to.
        tbl = stmt.find(exp.Table)
        target_full = _full(tbl) if tbl else None
        # Scan the whole statement rather than only the first SELECT, so every
        # UNION branch and any top-level WITH clause contributes sources.
        sources = [n for n in _table_names(stmt) if n != target_full]
        return {
            "target": tbl.name if tbl else None,
            "sources": list(dict.fromkeys(sources)),
            "type": kind,
        }

    if isinstance(stmt, exp.Create):
        return _write_lineage("ctas")

    if isinstance(stmt, exp.Insert):
        return _write_lineage("insert")

    if isinstance(stmt, (exp.Select, exp.Subquery, exp.Union, exp.Intersect, exp.Except)):
        sources = list(dict.fromkeys(_table_names(stmt)))
        return {"target": None, "sources": sources, "type": "select"}

    return {"target": None, "sources": [], "type": "unknown"}
Extract table-level lineage from a SQL statement.
Returns:
{
    "target": str | None,   # unqualified name of the table being written to
    "sources": [str, ...],  # tables being read from
    "type": "ctas"|"insert"|"select"|"unknown"
}
def parse_column_lineage(
    sql: str,
    target_table: str,
    dialect: str = "spark",
) -> list[dict]:
    """
    Extract column-level lineage from a SQL statement.

    Args:
        sql: A SELECT, CREATE TABLE AS SELECT or INSERT INTO ... SELECT.
        target_table: Label copied into every entry's ``target_table``.
        dialect: sqlglot dialect name used for parsing.

    Returns one entry per projection of the first SELECT found breadth-first:
        {
            "target_table": str,
            "target_column": str,
            "source_table": str | None,   # table alias resolved to its name
            "source_column": str | None,
            "expression": str | None,     # SQL text for computed columns
        }

    A bare or aliased plain column gives its source and ``expression`` None.
    An aliased expression that references columns (e.g. ``SUM(o.amount) AS
    total``) gives the first column found as its source plus the
    expression's SQL text. Any other projection gives only its SQL text.
    Unqualified columns get ``source_table`` None.
    """
    sg = _sqlglot()
    exp = sg.exp

    try:
        stmt = sg.parse_one(sql, dialect=dialect)
    except Exception:
        return []

    # find() also matches stmt itself, so this covers a plain SELECT as well
    # as the SELECT inside CREATE TABLE AS SELECT / INSERT INTO ... SELECT.
    select = stmt.find(exp.Select) if stmt is not None else None
    if select is None:
        return []

    # Build alias map: alias (or bare name) → real table name
    alias_map: dict[str, str] = {}
    for clause in (*select.find_all(exp.From), *select.find_all(exp.Join)):
        tbl = clause.find(exp.Table)
        if tbl:
            alias_map[tbl.alias or tbl.name] = tbl.name

    def _entry(target_col, src_tbl, src_col, expression) -> dict:
        return {
            "target_table": target_table,
            "target_column": target_col,
            "source_table": src_tbl,
            "source_column": src_col,
            "expression": expression,
        }

    result = []
    for sel in select.selects:
        alias = sel.alias or (sel.name if hasattr(sel, "name") else None)
        target_col = alias or str(sel)

        if isinstance(sel, exp.Column):
            col_node, expression = sel, None
        elif isinstance(sel, exp.Alias):
            col_node = sel.find(exp.Column)
            # Only a plain column under the alias is a direct reference;
            # anything else is computed, so keep its SQL as the transformation.
            expression = None if isinstance(sel.this, exp.Column) else str(sel.this)
        else:
            col_node, expression = None, None

        if not col_node:
            result.append(_entry(target_col, None, None, str(sel)))
            continue

        tbl_alias = col_node.table if hasattr(col_node, "table") else None
        src_tbl = alias_map.get(tbl_alias, tbl_alias) if tbl_alias else None
        src_col = col_node.name if hasattr(col_node, "name") else None
        result.append(_entry(target_col, src_tbl, src_col, expression))

    return result
Extract column-level lineage from a SQL statement.
Returns list of:
{
    "target_table": str,
    "target_column": str,
    "source_table": str | None,
    "source_column": str | None,
    "expression": str | None,  # for computed columns
}
Only handles direct column references. Complex expressions (aggregations, UDFs) are returned with expression set to the SQL text.
def parse_notebook_lineage(sql_cells: list[str], dialect: str = "spark") -> dict:
    """
    Combine table and column lineage parsed from a notebook's SQL cells.

    Blank cells are skipped. A cell counts as "parsed" only when it yields a
    target table and at least one source; only such cells contribute edges.
    Column edges without both a source table and a source column are dropped.

    Returns:
        {
            "table_edges": [{"source": str, "target": str}, ...],
            "column_edges": [{...}, ...],
            "statements": int,   # len(sql_cells), blank cells included
            "parsed": int,
        }
    """
    table_edges: list[dict] = []
    column_edges: list[dict] = []
    parsed = 0

    stripped = (cell.strip() for cell in sql_cells)
    for statement in filter(None, stripped):
        lineage = parse_table_lineage(statement, dialect=dialect)
        target, sources = lineage["target"], lineage["sources"]
        if not (target and sources):
            continue

        parsed += 1
        table_edges.extend({"source": src, "target": target} for src in sources)

        for col in parse_column_lineage(statement, target, dialect=dialect):
            if not (col["source_table"] and col["source_column"]):
                continue
            column_edges.append({
                "source_table": col["source_table"],
                "source_column": col["source_column"],
                "target_table": col["target_table"],
                "target_column": col["target_column"],
                "transformation": col.get("expression"),
            })

    return {
        "table_edges": table_edges,
        "column_edges": column_edges,
        "statements": len(sql_cells),
        "parsed": parsed,
    }
Parse multiple SQL cells from a notebook and build combined lineage.
Returns:
{ "table_edges": [{"source": str, "target": str}, ...], "column_edges": [{...}, ...], "statements": int, "parsed": int, }
def classify_table(
    full_name: str,
    columns: list[dict],
    n_upstream: int = 0,
    n_downstream: int = 0,
) -> tuple[str, float]:
    """
    Classify a table's role from its name, column shape and lineage position.

    Returns (role: str, confidence: float). Rules are tried in the order
    listed below and the first one that matches wins; if none match the
    result is ("unknown", 0.40).

    confidence is in [0.0, 1.0]:
        ≥ 0.85     → strong signal (name prefix / suffix)
        0.60–0.84  → moderate signal (position in lineage + shape)
        < 0.60     → weak / unknown
    """
    name = _name_lower(full_name)
    n_cols = len(columns)
    n_fk = count_fk_columns(columns)
    has_pk = has_primary_key(columns)

    # (predicate, role, confidence) in priority order. Predicates are lambdas
    # so each is only evaluated if every earlier rule failed.
    rules = (
        # ── Staging ──
        (lambda: _starts_with_any(name, _STAGING_PREFIXES), "staging", 0.90),
        # ── Aggregation ──
        (lambda: _ends_with_any(name, _AGG_SUFFIXES), "aggregation", 0.90),
        # NOTE(review): a fact-prefixed table with upstream is labelled
        # "aggregation", not "fact" — confirm this is intended.
        (lambda: _starts_with_any(name, _FACT_PREFIXES) and n_upstream > 0, "aggregation", 0.75),
        # ── Dimension / Entity ──
        (lambda: _starts_with_any(name, _DIMENSION_PREFIXES), "entity", 0.90),
        # ── Junction ──
        (lambda: _ends_with_any(name, _JUNCTION_SUFFIXES), "junction", 0.88),
        # Mostly FK columns → junction/bridge table
        (lambda: n_cols >= 2 and n_fk >= 2 and n_fk / max(n_cols, 1) >= 0.6, "junction", 0.80),
        # ── Entity: root source with a PK and meaningful columns ──
        (lambda: n_upstream == 0 and has_pk and n_cols >= 3, "entity", 0.82),
        (lambda: n_upstream == 0 and n_cols >= 5, "entity", 0.65),
        # ── Fact: transformed from somewhere + FK columns ──
        (lambda: n_upstream >= 1 and n_fk >= 1 and n_downstream >= 1, "fact", 0.70),
        (lambda: n_upstream >= 1 and n_fk >= 2, "fact", 0.65),
        # ── Aggregation by position ──
        (lambda: n_upstream >= 2 and n_downstream == 0, "aggregation", 0.60),
    )

    for matches, role, confidence in rules:
        if matches():
            return role, confidence
    return "unknown", 0.40
Classify a table's role.
Returns (role: str, confidence: float).
confidence is in [0.0, 1.0]:
≥ 0.85 → strong signal (name prefix or suffix)
0.60–0.84 → moderate signal (position in lineage + shape, including junction shape at 0.80)
< 0.60 → weak / unknown
def classify_all(
    tables: dict,  # {full_name: {"columns": [...], "role": ...}}
    upstream_counts: dict[str, int],
    downstream_counts: dict[str, int],
) -> dict[str, tuple[str, float]]:
    """
    Run classify_table() over every table in *tables*.

    Missing "columns" default to []; tables absent from the count maps are
    treated as having 0 upstream / downstream edges.

    Returns {full_name: (role, confidence)}, in the iteration order of *tables*.
    """
    classified: dict[str, tuple[str, float]] = {}
    for table_name, info in tables.items():
        classified[table_name] = classify_table(
            table_name,
            info.get("columns", []),
            upstream_counts.get(table_name, 0),
            downstream_counts.get(table_name, 0),
        )
    return classified
Classify every table in the graph.
Returns {full_name: (role, confidence)}.
def launch():
    """
    Build and display the DashGov notebook UI.

    Step 1 parses table and column lineage from a pasted SQL statement
    (no Unity Catalog connection needed). Step 2 fetches live lineage for a
    table from Unity Catalog and shows a summary, an HTML lineage view and
    the downstream impact of changing that table.

    Raises:
        RuntimeError: if ipywidgets / IPython are not installed.
    """
    try:
        import ipywidgets as w
        from IPython.display import display
    except ImportError as exc:
        raise RuntimeError("ipywidgets required. Run: %pip install ipywidgets") from exc

    import dashui

    def _describe_source(c: dict) -> str:
        """Readable source for one parse_column_lineage() entry; never None."""
        table, column = c.get("source_table"), c.get("source_column")
        if table:
            return f"{table}.{column}"
        # Unqualified columns carry a column name but no table, and computed
        # columns may carry only an expression. Returning None here would make
        # the fixed-width f-string below raise TypeError.
        return c.get("expression") or column or "?"

    # ── SQL parser ────────────────────────────────────────────────────────────
    sql_input = w.Textarea(
        description="SQL:",
        placeholder="Paste CREATE TABLE AS SELECT or INSERT INTO SELECT ...",
        layout=w.Layout(width="100%", height="120px"),
    )
    dialect_toggle = w.ToggleButtons(
        options=["spark", "snowflake", "bigquery", "trino"],
        description="Dialect:",
        value="spark",
    )
    parse_btn = dashui.action_button("Parse Lineage from SQL", style="info")
    parse_output = dashui.output_panel()

    def on_parse(b):
        with parse_output:
            parse_output.clear_output()
            sql = sql_input.value.strip()
            if not sql:
                print("Warning: paste a SQL statement above")
                return
            try:
                from dashgov.parser import parse_table_lineage, parse_column_lineage
                tl = parse_table_lineage(sql, dialect=dialect_toggle.value)
                print(f"Type : {tl['type']}")
                print(f"Target : {tl['target'] or '—'}")
                print(f"Sources : {', '.join(tl['sources']) or '—'}")
                if tl["target"]:
                    cl = parse_column_lineage(sql, tl["target"], dialect=dialect_toggle.value)
                    if cl:
                        print("\nColumn lineage:")
                        for c in cl:
                            src = _describe_source(c)
                            print(f" {src:40s} → {c['target_column']}")
            except Exception as e:
                # UI boundary: show the error in the panel instead of raising
                # inside the widget callback.
                print(f"Error: {e}")

    parse_btn.on_click(on_parse)

    # ── UC live lineage ───────────────────────────────────────────────────────
    uc_workspace = w.Text(
        description="Workspace URL:",
        placeholder="https://adb-xxx.azuredatabricks.net",
    )
    uc_token = w.Password(description="Token:", placeholder="dapixxxxxxxx")
    uc_table = w.Text(description="Table:", placeholder="catalog.schema.table")
    uc_depth = w.IntSlider(description="Depth:", value=2, min=1, max=5)
    uc_btn = dashui.action_button("Fetch UC Lineage", style="success")
    uc_output = dashui.output_panel()
    lineage_viz = w.HTML(value="")

    def on_uc_fetch(b):
        with uc_output:
            uc_output.clear_output()
            url = uc_workspace.value.strip()
            tok = uc_token.value.strip()
            tbl = uc_table.value.strip()
            if not (url and tok and tbl):
                print("Warning: fill in workspace URL, token, and table name")
                return
            try:
                from dashgov.lineage import fetch_uc_lineage, build_lineage_graph
                raw = fetch_uc_lineage(tbl, url, tok, depth=uc_depth.value)
                graph = build_lineage_graph(
                    raw["tables"], raw["table_edges"], raw["column_edges"]
                )
                s = graph.summary()
                print(f"Tables : {s['total_tables']}")
                print(f"Edges : {s['total_table_edges']} table, {s['total_column_edges']} column")
                print(f"Roots : {', '.join(s['root_sources']) or '—'}")
                print(f"Sinks : {', '.join(s['leaf_sinks']) or '—'}")
                lineage_viz.value = _lineage_html(graph.to_dict(), focus_table=tbl)
                imp = graph.impact_analysis(tbl)
                if imp["all_downstream"]:
                    print(f"\nImpact if {tbl} changes:")
                    for t in imp["all_downstream"]:
                        print(f" ↓ {t}")
            except Exception as e:
                # UI boundary: network/API errors are reported in the panel.
                print(f"Error: {e}")

    uc_btn.on_click(on_uc_fetch)

    ui = dashui.card([
        dashui.header("DashGov — Data Lineage & Governance", library="dashgov"),

        dashui.section("Step 1: Parse lineage from SQL"),
        dashui.html(
            "<div style='font-size:12px;color:#666;margin-bottom:4px'>"
            "Paste a CREATE TABLE AS SELECT or INSERT INTO SELECT to extract "
            "table and column lineage without a UC connection.</div>"
        ),
        sql_input, dialect_toggle, parse_btn, parse_output,

        dashui.section("Step 2: Fetch live lineage from Unity Catalog"),
        dashui.html(
            "<div style='font-size:12px;color:#666;margin-bottom:4px'>"
            "Requires a Databricks workspace URL and personal access token.</div>"
        ),
        w.HBox([uc_workspace, uc_token]),
        w.HBox([uc_table, uc_depth]),
        uc_btn, uc_output, lineage_viz,
    ])
    display(ui)