dashgov

DashGov — Data lineage and governance for Databricks.

 1"""DashGov — Data lineage and governance for Databricks."""
 2from dashgov.lineage import LineageGraph, build_lineage_graph, fetch_uc_lineage
 3from dashgov.parser import parse_table_lineage, parse_column_lineage, parse_notebook_lineage
 4from dashgov.classifier import classify_table, classify_all
 5from dashgov.ui import launch
 6
 7__version__ = "0.1.4"
 8__all__ = [
 9    "LineageGraph",
10    "build_lineage_graph",
11    "fetch_uc_lineage",
12    "parse_table_lineage",
13    "parse_column_lineage",
14    "parse_notebook_lineage",
15    "classify_table",
16    "classify_all",
17    "launch",
18]
class LineageGraph:
 39class LineageGraph:
 40    """Directed acyclic graph of table and column lineage."""
 41
 42    def __init__(
 43        self,
 44        tables: dict[str, TableNode],
 45        table_edges: list[TableEdge],
 46        column_edges: list[ColumnEdge],
 47    ):
 48        self.tables = tables
 49        self.table_edges = table_edges
 50        self.column_edges = column_edges
 51
 52        # adjacency: source → {targets}
 53        self._downstream: dict[str, set[str]] = {}
 54        self._upstream: dict[str, set[str]] = {}
 55        for e in table_edges:
 56            self._downstream.setdefault(e.source, set()).add(e.target)
 57            self._upstream.setdefault(e.target, set()).add(e.source)
 58
 59    # ── Table-level traversal ────────────────────────────────────────────────
 60
 61    def upstream_tables(self, table: str, depth: int = 1) -> list[str]:
 62        """All tables that feed into *table*, up to *depth* hops."""
 63        return self._bfs(table, self._upstream, depth)
 64
 65    def downstream_tables(self, table: str, depth: int = 1) -> list[str]:
 66        """All tables that consume from *table*, up to *depth* hops."""
 67        return self._bfs(table, self._downstream, depth)
 68
 69    def root_sources(self, table: str) -> list[str]:
 70        """Tables with no upstream that eventually feed into *table*."""
 71        visited, result = set(), []
 72        stack = [table]
 73        while stack:
 74            t = stack.pop()
 75            if t in visited:
 76                continue
 77            visited.add(t)
 78            ups = list(self._upstream.get(t, []))
 79            if not ups and t != table:
 80                result.append(t)
 81            stack.extend(ups)
 82        return sorted(result)
 83
 84    def impact_analysis(self, table: str) -> dict:
 85        """What breaks if *table* changes — full downstream tree."""
 86        direct = sorted(self._downstream.get(table, []))
 87        all_downstream = self._bfs(table, self._downstream, depth=999)
 88        col_targets = {}
 89        for ce in self.column_edges:
 90            if ce.source_table == table:
 91                col_targets.setdefault(ce.source_column, []).append(
 92                    f"{ce.target_table}.{ce.target_column}"
 93                )
 94        return {
 95            "table": table,
 96            "direct_dependents": direct,
 97            "all_downstream": all_downstream,
 98            "affected_column_paths": col_targets,
 99            "total_affected_tables": len(all_downstream),
100        }
101
102    # ── Column-level traversal ───────────────────────────────────────────────
103
104    def column_sources(self, table: str, column: str) -> list[ColumnEdge]:
105        """Edges that feed into *table.column*."""
106        return [
107            e for e in self.column_edges
108            if e.target_table == table and e.target_column == column
109        ]
110
111    def column_targets(self, table: str, column: str) -> list[ColumnEdge]:
112        """Edges that *table.column* feeds into."""
113        return [
114            e for e in self.column_edges
115            if e.source_table == table and e.source_column == column
116        ]
117
118    def column_lineage_chain(self, table: str, column: str) -> dict:
119        """Full upstream chain for a single column."""
120        visited, upstream = set(), []
121        stack = [(table, column)]
122        while stack:
123            t, c = stack.pop()
124            key = f"{t}.{c}"
125            if key in visited:
126                continue
127            visited.add(key)
128            for src in self.column_sources(t, c):
129                upstream.append({"table": src.source_table, "column": src.source_column})
130                stack.append((src.source_table, src.source_column))
131        return {
132            "table": table,
133            "column": column,
134            "upstream_columns": upstream,
135        }
136
137    # ── Export ───────────────────────────────────────────────────────────────
138
139    def to_dict(self) -> dict:
140        return {
141            "tables": {
142                k: {
143                    "full_name": v.full_name,
144                    "catalog": v.catalog,
145                    "schema_name": v.schema_name,
146                    "table": v.table,
147                    "columns": v.columns,
148                    "role": v.role,
149                }
150                for k, v in self.tables.items()
151            },
152            "table_edges": [
153                {"source": e.source, "target": e.target} for e in self.table_edges
154            ],
155            "column_edges": [
156                {
157                    "source_table": e.source_table,
158                    "source_column": e.source_column,
159                    "target_table": e.target_table,
160                    "target_column": e.target_column,
161                    "transformation": e.transformation,
162                }
163                for e in self.column_edges
164            ],
165        }
166
167    def summary(self) -> dict:
168        return {
169            "total_tables": len(self.tables),
170            "total_table_edges": len(self.table_edges),
171            "total_column_edges": len(self.column_edges),
172            "root_sources": [t for t in self.tables if t not in self._upstream],
173            "leaf_sinks": [t for t in self.tables if t not in self._downstream],
174        }
175
176    # ── Internal ─────────────────────────────────────────────────────────────
177
178    def _bfs(self, start: str, adj: dict, depth: int) -> list[str]:
179        visited, result = {start}, []
180        queue = deque([(start, 0)])
181        while queue:
182            node, d = queue.popleft()
183            if d >= depth:
184                continue
185            for neighbour in adj.get(node, []):
186                if neighbour not in visited:
187                    visited.add(neighbour)
188                    result.append(neighbour)
189                    queue.append((neighbour, d + 1))
190        return result

Directed acyclic graph of table and column lineage.

LineageGraph( tables: dict[str, dashgov.lineage.TableNode], table_edges: list[dashgov.lineage.TableEdge], column_edges: list[dashgov.lineage.ColumnEdge])
42    def __init__(
43        self,
44        tables: dict[str, TableNode],
45        table_edges: list[TableEdge],
46        column_edges: list[ColumnEdge],
47    ):
48        self.tables = tables
49        self.table_edges = table_edges
50        self.column_edges = column_edges
51
52        # adjacency: source → {targets}
53        self._downstream: dict[str, set[str]] = {}
54        self._upstream: dict[str, set[str]] = {}
55        for e in table_edges:
56            self._downstream.setdefault(e.source, set()).add(e.target)
57            self._upstream.setdefault(e.target, set()).add(e.source)
tables
table_edges
column_edges
def upstream_tables(self, table: str, depth: int = 1) -> list[str]:
61    def upstream_tables(self, table: str, depth: int = 1) -> list[str]:
62        """All tables that feed into *table*, up to *depth* hops."""
63        return self._bfs(table, self._upstream, depth)

All tables that feed into table, up to depth hops.

def downstream_tables(self, table: str, depth: int = 1) -> list[str]:
65    def downstream_tables(self, table: str, depth: int = 1) -> list[str]:
66        """All tables that consume from *table*, up to *depth* hops."""
67        return self._bfs(table, self._downstream, depth)

All tables that consume from table, up to depth hops.

def root_sources(self, table: str) -> list[str]:
69    def root_sources(self, table: str) -> list[str]:
70        """Tables with no upstream that eventually feed into *table*."""
71        visited, result = set(), []
72        stack = [table]
73        while stack:
74            t = stack.pop()
75            if t in visited:
76                continue
77            visited.add(t)
78            ups = list(self._upstream.get(t, []))
79            if not ups and t != table:
80                result.append(t)
81            stack.extend(ups)
82        return sorted(result)

Tables with no upstream that eventually feed into table.

def impact_analysis(self, table: str) -> dict:
 84    def impact_analysis(self, table: str) -> dict:
 85        """What breaks if *table* changes — full downstream tree."""
 86        direct = sorted(self._downstream.get(table, []))
 87        all_downstream = self._bfs(table, self._downstream, depth=999)
 88        col_targets = {}
 89        for ce in self.column_edges:
 90            if ce.source_table == table:
 91                col_targets.setdefault(ce.source_column, []).append(
 92                    f"{ce.target_table}.{ce.target_column}"
 93                )
 94        return {
 95            "table": table,
 96            "direct_dependents": direct,
 97            "all_downstream": all_downstream,
 98            "affected_column_paths": col_targets,
 99            "total_affected_tables": len(all_downstream),
100        }

What breaks if table changes — full downstream tree.

def column_sources(self, table: str, column: str) -> list[dashgov.lineage.ColumnEdge]:
104    def column_sources(self, table: str, column: str) -> list[ColumnEdge]:
105        """Edges that feed into *table.column*."""
106        return [
107            e for e in self.column_edges
108            if e.target_table == table and e.target_column == column
109        ]

Edges that feed into table.column.

def column_targets(self, table: str, column: str) -> list[dashgov.lineage.ColumnEdge]:
111    def column_targets(self, table: str, column: str) -> list[ColumnEdge]:
112        """Edges that *table.column* feeds into."""
113        return [
114            e for e in self.column_edges
115            if e.source_table == table and e.source_column == column
116        ]

Edges that table.column feeds into.

def column_lineage_chain(self, table: str, column: str) -> dict:
118    def column_lineage_chain(self, table: str, column: str) -> dict:
119        """Full upstream chain for a single column."""
120        visited, upstream = set(), []
121        stack = [(table, column)]
122        while stack:
123            t, c = stack.pop()
124            key = f"{t}.{c}"
125            if key in visited:
126                continue
127            visited.add(key)
128            for src in self.column_sources(t, c):
129                upstream.append({"table": src.source_table, "column": src.source_column})
130                stack.append((src.source_table, src.source_column))
131        return {
132            "table": table,
133            "column": column,
134            "upstream_columns": upstream,
135        }

Full upstream chain for a single column.

def to_dict(self) -> dict:
139    def to_dict(self) -> dict:
140        return {
141            "tables": {
142                k: {
143                    "full_name": v.full_name,
144                    "catalog": v.catalog,
145                    "schema_name": v.schema_name,
146                    "table": v.table,
147                    "columns": v.columns,
148                    "role": v.role,
149                }
150                for k, v in self.tables.items()
151            },
152            "table_edges": [
153                {"source": e.source, "target": e.target} for e in self.table_edges
154            ],
155            "column_edges": [
156                {
157                    "source_table": e.source_table,
158                    "source_column": e.source_column,
159                    "target_table": e.target_table,
160                    "target_column": e.target_column,
161                    "transformation": e.transformation,
162                }
163                for e in self.column_edges
164            ],
165        }
def summary(self) -> dict:
167    def summary(self) -> dict:
168        return {
169            "total_tables": len(self.tables),
170            "total_table_edges": len(self.table_edges),
171            "total_column_edges": len(self.column_edges),
172            "root_sources": [t for t in self.tables if t not in self._upstream],
173            "leaf_sinks": [t for t in self.tables if t not in self._downstream],
174        }
def build_lineage_graph( tables: list[dict], table_edges: list[dict], column_edges: list[dict]) -> LineageGraph:
def build_lineage_graph(
    tables: list[dict],
    table_edges: list[dict],
    column_edges: list[dict],
) -> LineageGraph:
    """
    Assemble a LineageGraph out of plain dictionaries.

    tables       — [{"full_name": str, "columns": [...], "role": str}, ...];
                   "columns" defaults to [] and "role" to "unknown"
    table_edges  — [{"source": str, "target": str}]
    column_edges — [{"source_table", "source_column", "target_table",
                     "target_column"}], with an optional "transformation"

    The full name is split on "."; with three or more parts the first two
    are catalog and schema, with exactly two parts the first is the schema,
    and the last part is always the table name.
    """
    nodes: dict[str, TableNode] = {}
    for spec in tables:
        full_name = spec["full_name"]
        pieces = full_name.split(".")
        if len(pieces) >= 3:
            catalog, schema = pieces[0], pieces[1]
        elif len(pieces) == 2:
            catalog, schema = "", pieces[0]
        else:
            catalog, schema = "", ""
        nodes[full_name] = TableNode(
            full_name=full_name,
            catalog=catalog,
            schema_name=schema,
            table=pieces[-1],
            columns=spec.get("columns", []),
            role=spec.get("role", "unknown"),
        )

    edge_objects = []
    for raw in table_edges:
        edge_objects.append(TableEdge(raw["source"], raw["target"]))

    column_edge_objects = []
    for raw in column_edges:
        column_edge_objects.append(
            ColumnEdge(
                source_table=raw["source_table"],
                source_column=raw["source_column"],
                target_table=raw["target_table"],
                target_column=raw["target_column"],
                transformation=raw.get("transformation"),
            )
        )
    return LineageGraph(nodes, edge_objects, column_edge_objects)

Build a LineageGraph from plain dicts.

- tables — [{"full_name": str, "columns": [{name, type, nullable}], ...}]
- table_edges — [{"source": str, "target": str}]
- column_edges — [{"source_table", "source_column", "target_table", "target_column"}]

def fetch_uc_lineage(table: str, workspace_url: str, token: str, depth: int = 2) -> dict:
def fetch_uc_lineage(
    table: str,
    workspace_url: str,
    token: str,
    depth: int = 2,
) -> dict:
    """
    Fetch table-level and column-level lineage from Unity Catalog REST API.

    Starting at *table*, neighbours are expanded level by level for *depth*
    rounds, following both upstream and downstream links.  Column lineage
    is fetched for the root table only.

    Returns a dict compatible with build_lineage_graph():
    {"tables": [...], "table_edges": [...], "column_edges": [...]}.
    Requires workspace_url (https://...) and a Databricks PAT.

    Raises RuntimeError when the ``requests`` package is not installed.
    Non-200 responses are skipped silently (best effort).
    """
    try:
        import requests
    except ImportError as err:
        raise RuntimeError("requests is required: pip install requests") from err

    # NOTE(review): endpoint paths and response keys are kept as written;
    # confirm them against the Databricks data-lineage REST API reference.
    headers = {"Authorization": f"Bearer {token}"}
    base = workspace_url.rstrip("/")

    visited_tables: set[str] = {table}
    seen_edges: set[tuple[str, str]] = set()
    table_edges: list[dict] = []
    column_edges: list[dict] = []

    def record_edge(source: str, target: str) -> None:
        # Record every reported edge exactly once.  Edges between two tables
        # that were both already discovered (e.g. a diamond) must be kept,
        # so recording is independent of the visited check.
        if (source, target) not in seen_edges:
            seen_edges.add((source, target))
            table_edges.append({"source": source, "target": target})

    def discover(name: str, frontier: list[str]) -> None:
        # Queue a table for expansion the first time it is seen.
        if name not in visited_tables:
            visited_tables.add(name)
            frontier.append(name)

    frontier = [table]
    for _ in range(depth):
        if not frontier:
            break
        next_frontier: list[str] = []
        for current in frontier:
            resp = requests.get(
                f"{base}/api/2.0/lineage-tracking/table-lineages",
                headers=headers,
                params={"table_name": current},
                timeout=15,
            )
            if resp.status_code != 200:
                continue
            data = resp.json()
            for up in data.get("upstream_tables", []):
                src = up.get("name", "")
                if src:
                    record_edge(src, current)
                    discover(src, next_frontier)
            for down in data.get("downstream_tables", []):
                tgt = down.get("name", "")
                if tgt:
                    record_edge(current, tgt)
                    discover(tgt, next_frontier)
        frontier = next_frontier

    # Column lineage for the root table
    col_resp = requests.get(
        f"{base}/api/2.0/lineage-tracking/column-lineages",
        headers=headers,
        params={"table_name": table},
        timeout=15,
    )
    if col_resp.status_code == 200:
        for col_data in col_resp.json().get("column_lineage", []):
            tgt_col = col_data.get("name", "")
            for up in col_data.get("upstream_columns", []):
                column_edges.append({
                    "source_table": up.get("table_name", ""),
                    "source_column": up.get("name", ""),
                    "target_table": table,
                    "target_column": tgt_col,
                })

    # Sorted so the output does not depend on set iteration order.
    tables_list = [{"full_name": t, "columns": []} for t in sorted(visited_tables)]
    return {
        "tables": tables_list,
        "table_edges": table_edges,
        "column_edges": column_edges,
    }

Fetch table-level and column-level lineage from Unity Catalog REST API.

Returns a dict compatible with build_lineage_graph(). Requires workspace_url (https://...) and a Databricks PAT.

def parse_table_lineage(sql: str, dialect: str = 'spark') -> dict:
def parse_table_lineage(sql: str, dialect: str = "spark") -> dict:
    """
    Extract table-level lineage from a SQL statement.

    Returns:
        {
          "target": str | None,       # the table being written to (short name)
          "sources": [str, ...],      # tables being read from (qualified names)
          "type": "ctas"|"insert"|"select"|"unknown"
        }

    Sources are de-duplicated in first-seen order.  Names that refer to a
    CTE defined in the statement are not reported as sources; the tables
    read inside the CTE body are.  SQL that fails to parse yields the
    "unknown" result instead of raising.
    """
    sg = _sqlglot()
    exp = sg.exp

    try:
        stmt = sg.parse_one(sql, dialect=dialect)
    except Exception:
        # Best effort: unparseable input is reported, not raised.
        return {"target": None, "sources": [], "type": "unknown"}

    def _full(tbl) -> str:
        return ".".join(p for p in (tbl.catalog, tbl.db, tbl.name) if p)

    # CTE aliases show up as Table nodes where they are referenced, but they
    # are not physical tables.
    cte_names = {cte.alias.lower() for cte in stmt.find_all(exp.CTE) if cte.alias}

    def _sources(exclude=None) -> list[str]:
        names = []
        for tbl in stmt.find_all(exp.Table):
            if not tbl.name:
                continue
            # Only an unqualified name can be a CTE reference.
            if not tbl.db and tbl.name.lower() in cte_names:
                continue
            full = _full(tbl)
            if full != exclude:
                names.append(full)
        return list(dict.fromkeys(names))

    if isinstance(stmt, (exp.Create, exp.Insert)):
        kind = "ctas" if isinstance(stmt, exp.Create) else "insert"
        # The written table is the first Table node found in the statement.
        target_tbl = stmt.find(exp.Table)
        target_full = _full(target_tbl) if target_tbl else None
        target_short = target_tbl.name if target_tbl else None
        return {"target": target_short, "sources": _sources(target_full), "type": kind}

    if isinstance(stmt, (exp.Select, exp.Subquery)):
        return {"target": None, "sources": _sources(), "type": "select"}

    return {"target": None, "sources": [], "type": "unknown"}

Extract table-level lineage from a SQL statement.

Returns:

{ "target": str | None, # the table being written to "sources": [str, ...], # tables being read from "type": "ctas"|"insert"|"select"|"unknown" }

def parse_column_lineage(sql: str, target_table: str, dialect: str = 'spark') -> list[dict]:
 74def parse_column_lineage(
 75    sql: str,
 76    target_table: str,
 77    dialect: str = "spark",
 78) -> list[dict]:
 79    """
 80    Extract column-level lineage from a SQL statement.
 81
 82    Returns list of:
 83        {
 84          "target_column": str,
 85          "source_table": str | None,
 86          "source_column": str | None,
 87          "expression": str | None,   # for computed columns
 88        }
 89
 90    Only handles direct column references. Complex expressions
 91    (aggregations, UDFs) are returned with expression set to the SQL text.
 92    """
 93    sg = _sqlglot()
 94    exp = sg.exp
 95
 96    try:
 97        stmt = sg.parse_one(sql, dialect=dialect)
 98    except Exception:
 99        return []
100
101    # Unwrap CREATE TABLE AS SELECT / INSERT INTO SELECT
102    select = stmt.find(exp.Select)
103    if select is None:
104        if isinstance(stmt, exp.Select):
105            select = stmt
106        else:
107            return []
108
109    # Build alias map: alias → real table name
110    alias_map: dict[str, str] = {}
111    for from_expr in select.find_all(exp.From):
112        tbl = from_expr.find(exp.Table)
113        if tbl:
114            alias_map[tbl.alias or tbl.name] = tbl.name
115
116    for join in select.find_all(exp.Join):
117        tbl = join.find(exp.Table)
118        if tbl:
119            alias_map[tbl.alias or tbl.name] = tbl.name
120
121    result = []
122    for sel in select.selects:
123        alias = sel.alias or (sel.name if hasattr(sel, "name") else None)
124        target_col = alias or str(sel)
125
126        if isinstance(sel, (exp.Column, exp.Alias)):
127            col_node = sel.find(exp.Column) if isinstance(sel, exp.Alias) else sel
128            if col_node:
129                tbl_alias = (
130                    col_node.table if hasattr(col_node, "table") else None
131                )
132                src_tbl = alias_map.get(tbl_alias, tbl_alias) if tbl_alias else None
133                src_col = col_node.name if hasattr(col_node, "name") else None
134                result.append({
135                    "target_table": target_table,
136                    "target_column": target_col,
137                    "source_table": src_tbl,
138                    "source_column": src_col,
139                    "expression": None,
140                })
141            else:
142                result.append({
143                    "target_table": target_table,
144                    "target_column": target_col,
145                    "source_table": None,
146                    "source_column": None,
147                    "expression": str(sel),
148                })
149        else:
150            result.append({
151                "target_table": target_table,
152                "target_column": target_col,
153                "source_table": None,
154                "source_column": None,
155                "expression": str(sel),
156            })
157
158    return result

Extract column-level lineage from a SQL statement.

Returns list of:

{ "target_table": str, "target_column": str, "source_table": str | None, "source_column": str | None, "expression": str | None, # for computed columns }

Only handles direct column references. Complex expressions (aggregations, UDFs) are returned with expression set to the SQL text.

def parse_notebook_lineage(sql_cells: list[str], dialect: str = 'spark') -> dict:
def parse_notebook_lineage(sql_cells: list[str], dialect: str = "spark") -> dict:
    """
    Combine the lineage of several notebook SQL cells.

    A cell contributes only when parse_table_lineage() reports both a
    target and at least one source for it.  Column edges are kept only when
    both the source table and the source column are known.

    Returns:
        {
          "table_edges": [{"source": str, "target": str}, ...],
          "column_edges": [{...}, ...],
          "statements": int,   # number of cells passed in, blank ones included
          "parsed": int,       # number of cells that contributed lineage
        }
    """
    all_table_edges: list[dict] = []
    all_column_edges: list[dict] = []
    contributing = 0

    for raw_cell in sql_cells:
        statement = raw_cell.strip()
        if not statement:
            continue

        table_info = parse_table_lineage(statement, dialect=dialect)
        target = table_info["target"]
        if not (target and table_info["sources"]):
            continue

        contributing += 1
        all_table_edges.extend(
            {"source": source, "target": target} for source in table_info["sources"]
        )

        for col in parse_column_lineage(statement, target, dialect=dialect):
            if not (col["source_table"] and col["source_column"]):
                continue
            all_column_edges.append({
                "source_table": col["source_table"],
                "source_column": col["source_column"],
                "target_table": col["target_table"],
                "target_column": col["target_column"],
                "transformation": col.get("expression"),
            })

    return {
        "table_edges": all_table_edges,
        "column_edges": all_column_edges,
        "statements": len(sql_cells),
        "parsed": contributing,
    }

Parse multiple SQL cells from a notebook and build combined lineage.

Returns:

{ "table_edges": [{"source": str, "target": str}, ...], "column_edges": [{...}, ...], "statements": int, "parsed": int, }

def classify_table( full_name: str, columns: list[dict], n_upstream: int = 0, n_downstream: int = 0) -> tuple[str, float]:
 64def classify_table(
 65    full_name: str,
 66    columns: list[dict],
 67    n_upstream: int = 0,
 68    n_downstream: int = 0,
 69) -> tuple[str, float]:
 70    """
 71    Classify a table's role.
 72
 73    Returns (role: str, confidence: float).
 74
 75    confidence is in [0.0, 1.0]:
 76      >= 0.85 → strong signal (name prefix, junction shape)
 77      0.60–0.84 → moderate signal (position in lineage + shape)
 78      < 0.60 → weak / unknown
 79    """
 80    name = _name_lower(full_name)
 81    n_cols = len(columns)
 82    n_fk = count_fk_columns(columns)
 83    has_pk = has_primary_key(columns)
 84
 85    # ── Staging ──
 86    if _starts_with_any(name, _STAGING_PREFIXES):
 87        return "staging", 0.90
 88
 89    # ── Aggregation ──
 90    if _ends_with_any(name, _AGG_SUFFIXES):
 91        return "aggregation", 0.90
 92    if _starts_with_any(name, _FACT_PREFIXES) and n_upstream > 0:
 93        return "aggregation", 0.75
 94
 95    # ── Dimension / Entity ──
 96    if _starts_with_any(name, _DIMENSION_PREFIXES):
 97        return "entity", 0.90
 98
 99    # ── Junction ──
100    if _ends_with_any(name, _JUNCTION_SUFFIXES):
101        return "junction", 0.88
102    if n_cols >= 2 and n_fk >= 2 and n_fk / max(n_cols, 1) >= 0.6:
103        # Mostly FK columns → junction/bridge table
104        return "junction", 0.80
105
106    # ── Entity ──
107    # Root source with a PK and meaningful columns
108    if n_upstream == 0 and has_pk and n_cols >= 3:
109        return "entity", 0.82
110    if n_upstream == 0 and n_cols >= 5:
111        return "entity", 0.65
112
113    # ── Fact ──
114    # Has upstream (transformed from somewhere) + FK columns
115    if n_upstream >= 1 and n_fk >= 1 and n_downstream >= 1:
116        return "fact", 0.70
117    if n_upstream >= 1 and n_fk >= 2:
118        return "fact", 0.65
119
120    # ── Aggregation by position ──
121    if n_upstream >= 2 and n_downstream == 0:
122        return "aggregation", 0.60
123
124    return "unknown", 0.40

Classify a table's role.

Returns (role: str, confidence: float).

confidence is in [0.0, 1.0]:

- >= 0.85 → strong signal (name prefix, junction shape)
- 0.60–0.84 → moderate signal (position in lineage + shape)
- < 0.60 → weak / unknown

def classify_all( tables: dict, upstream_counts: dict[str, int], downstream_counts: dict[str, int]) -> dict[str, tuple[str, float]]:
def classify_all(
    tables: dict,   # {full_name: {"columns": [...], "role": ...}}
    upstream_counts: dict[str, int],
    downstream_counts: dict[str, int],
) -> dict[str, tuple[str, float]]:
    """
    Run classify_table() over every entry of *tables*.

    Missing "columns" default to [] and missing counts default to 0.

    Returns {full_name: (role, confidence)}.
    """
    classified: dict[str, tuple[str, float]] = {}
    for full_name, info in tables.items():
        columns = info.get("columns", [])
        n_up = upstream_counts.get(full_name, 0)
        n_down = downstream_counts.get(full_name, 0)
        classified[full_name] = classify_table(full_name, columns, n_up, n_down)
    return classified

Classify every table in the graph.

Returns {full_name: (role, confidence)}.

def launch():
 54def launch():
 55    try:
 56        import ipywidgets as w
 57        from IPython.display import display
 58    except ImportError:
 59        raise RuntimeError("ipywidgets required. Run: %pip install ipywidgets")
 60
 61    import dashui
 62
 63    # ── SQL parser ────────────────────────────────────────────────────────────
 64    sql_input = w.Textarea(
 65        description="SQL:",
 66        placeholder="Paste CREATE TABLE AS SELECT or INSERT INTO SELECT ...",
 67        layout=w.Layout(width="100%", height="120px"),
 68    )
 69    dialect_toggle = w.ToggleButtons(
 70        options=["spark", "snowflake", "bigquery", "trino"],
 71        description="Dialect:",
 72        value="spark",
 73    )
 74    parse_btn = dashui.action_button("Parse Lineage from SQL", style="info")
 75    parse_output = dashui.output_panel()
 76
 77    def on_parse(b):
 78        with parse_output:
 79            parse_output.clear_output()
 80            sql = sql_input.value.strip()
 81            if not sql:
 82                print("Warning: paste a SQL statement above")
 83                return
 84            try:
 85                from dashgov.parser import parse_table_lineage, parse_column_lineage
 86                tl = parse_table_lineage(sql, dialect=dialect_toggle.value)
 87                print(f"Type    : {tl['type']}")
 88                print(f"Target  : {tl['target'] or '—'}")
 89                print(f"Sources : {', '.join(tl['sources']) or '—'}")
 90                if tl["target"]:
 91                    cl = parse_column_lineage(sql, tl["target"], dialect=dialect_toggle.value)
 92                    if cl:
 93                        print("\nColumn lineage:")
 94                        for c in cl:
 95                            src = (
 96                                f"{c['source_table']}.{c['source_column']}"
 97                                if c["source_table"] else c.get("expression", "?")
 98                            )
 99                            print(f"  {src:40s}{c['target_column']}")
100            except Exception as e:
101                print(f"Error: {e}")
102
103    parse_btn.on_click(on_parse)
104
105    # ── UC live lineage ───────────────────────────────────────────────────────
106    uc_workspace = w.Text(
107        description="Workspace URL:",
108        placeholder="https://adb-xxx.azuredatabricks.net",
109    )
110    uc_token = w.Password(description="Token:", placeholder="dapixxxxxxxx")
111    uc_table = w.Text(description="Table:", placeholder="catalog.schema.table")
112    uc_depth = w.IntSlider(description="Depth:", value=2, min=1, max=5)
113    uc_btn = dashui.action_button("Fetch UC Lineage", style="success")
114    uc_output = dashui.output_panel()
115    lineage_viz = w.HTML(value="")
116
117    def on_uc_fetch(b):
118        with uc_output:
119            uc_output.clear_output()
120            url = uc_workspace.value.strip()
121            tok = uc_token.value.strip()
122            tbl = uc_table.value.strip()
123            if not (url and tok and tbl):
124                print("Warning: fill in workspace URL, token, and table name")
125                return
126            try:
127                from dashgov.lineage import fetch_uc_lineage, build_lineage_graph
128                raw = fetch_uc_lineage(tbl, url, tok, depth=uc_depth.value)
129                graph = build_lineage_graph(
130                    raw["tables"], raw["table_edges"], raw["column_edges"]
131                )
132                s = graph.summary()
133                print(f"Tables : {s['total_tables']}")
134                print(f"Edges  : {s['total_table_edges']} table, {s['total_column_edges']} column")
135                print(f"Roots  : {', '.join(s['root_sources']) or '—'}")
136                print(f"Sinks  : {', '.join(s['leaf_sinks']) or '—'}")
137                lineage_viz.value = _lineage_html(graph.to_dict(), focus_table=tbl)
138                imp = graph.impact_analysis(tbl)
139                if imp["all_downstream"]:
140                    print(f"\nImpact if {tbl} changes:")
141                    for t in imp["all_downstream"]:
142                        print(f"  ↓ {t}")
143            except Exception as e:
144                print(f"Error: {e}")
145
146    uc_btn.on_click(on_uc_fetch)
147
148    ui = dashui.card([
149        dashui.header("DashGov — Data Lineage & Governance", library="dashgov"),
150
151        dashui.section("Step 1: Parse lineage from SQL"),
152        dashui.html(
153            "<div style='font-size:12px;color:#666;margin-bottom:4px'>"
154            "Paste a CREATE TABLE AS SELECT or INSERT INTO SELECT to extract "
155            "table and column lineage without a UC connection.</div>"
156        ),
157        sql_input, dialect_toggle, parse_btn, parse_output,
158
159        dashui.section("Step 2: Fetch live lineage from Unity Catalog"),
160        dashui.html(
161            "<div style='font-size:12px;color:#666;margin-bottom:4px'>"
162            "Requires a Databricks workspace URL and personal access token.</div>"
163        ),
164        w.HBox([uc_workspace, uc_token]),
165        w.HBox([uc_table, uc_depth]),
166        uc_btn, uc_output, lineage_viz,
167    ])
168    display(ui)