dashingest
DashIngest — ADF-style data ingestion for Databricks: pick a source kind (Volume, ADLS, S3, DBFS, Database, REST API), fill a few plain fields, run.
"""DashIngest — ADF-style data ingestion for Databricks: pick a source kind
(Volume, ADLS, S3, DBFS, Database, REST API), fill a few plain fields, run.

This module re-exports the public API of the connectors, ingestor, readers
and ui submodules.
"""
# Source/target descriptions and path/URL helpers.
from dashingest.connectors import (
    ADLSSource,
    DatabaseSource,
    DBFSSource,
    IngestTarget,
    RestApiSource,
    S3Source,
    VolumeSource,
    build_jdbc_url,
    infer_format_from_path,
    resolve_path,
)
# Running, previewing and probing a source.
from dashingest.ingestor import (
    ConnectionTestResult,
    IngestResult,
    preview,
    run_ingestion,
    test_connection,
)
# Per-format reader option objects.
from dashingest.readers import (
    AvroReaderOptions,
    CsvReaderOptions,
    ExcelReaderOptions,
    JsonReaderOptions,
    OrcReaderOptions,
    ParquetReaderOptions,
    TextReaderOptions,
)
# Notebook UI entry point.
from dashingest.ui import launch

__version__ = "0.1.1"

# Kept in plain ASCII-sorted order (uppercase names before lowercase).
__all__ = [
    "ADLSSource",
    "AvroReaderOptions",
    "ConnectionTestResult",
    "CsvReaderOptions",
    "DBFSSource",
    "DatabaseSource",
    "ExcelReaderOptions",
    "IngestResult",
    "IngestTarget",
    "JsonReaderOptions",
    "OrcReaderOptions",
    "ParquetReaderOptions",
    "RestApiSource",
    "S3Source",
    "TextReaderOptions",
    "VolumeSource",
    "build_jdbc_url",
    "infer_format_from_path",
    "launch",
    "preview",
    "resolve_path",
    "run_ingestion",
    "test_connection",
]
35@dataclass 36class ADLSSource: 37 """Azure Data Lake Storage Gen2 — abfss://<container>@<account>.dfs.core.windows.net/<path>.""" 38 storage_account: str 39 container: str 40 path: str = "" 41 file_format: str | None = None 42 reader_options: Any = None 43 options: dict = field(default_factory=dict)
Azure Data Lake Storage Gen2 — abfss://<container>@<account>.dfs.core.windows.net/<path>.
@dataclass()
class CsvReaderOptions:
    """Reader settings for CSV sources.

    Every field has a default, so ``CsvReaderOptions()`` is a usable
    configuration on its own.
    """

    # Header / typing
    header: bool = True
    infer_schema: bool = True
    # Tokenizing
    delimiter: str = ","
    quote_char: str = '"'
    escape_char: str = "\\"
    encoding: str = "UTF-8"
    null_value: str = ""
    # Temporal parsing patterns
    date_format: str = ""
    timestamp_format: str = ""
    # Record layout
    multiline: bool = False
    comment_char: str = ""
    # One of PERMISSIVE | DROPMALFORMED | FAILFAST. PERMISSIVE keeps malformed
    # rows (nulled out) instead of dropping them or failing the read.
    parse_mode: str = "PERMISSIVE"
56@dataclass 57class DBFSSource: 58 """Legacy DBFS mount or path — dbfs:/<path>.""" 59 path: str 60 file_format: str | None = None 61 reader_options: Any = None 62 options: dict = field(default_factory=dict)
Legacy DBFS mount or path — dbfs:/<path>.
74@dataclass 75class DatabaseSource: 76 """A relational database table or query. 77 78 Set `engine` + `host` + `database` for a known engine (builds the JDBC 79 URL/driver for you), or set `url`/`driver` directly for anything else. 80 Set exactly one of `table` or `query`. 81 82 Advanced (all optional): `fetch_size` batches network round-trips; 83 `num_partitions` + `partition_column` + `lower_bound` + `upper_bound` 84 split a large table into parallel reads (all four must be set together); 85 `ssl` sets the common per-engine SSL flag; `connection_properties` is a 86 raw escape hatch for anything else the JDBC driver accepts. 87 """ 88 host: str = "" 89 database: str = "" 90 engine: str = "postgresql" # postgresql | mysql | sqlserver | oracle | snowflake 91 port: int | None = None 92 table: str = "" 93 query: str = "" 94 user: str = "" 95 password: str = "" 96 url: str = "" # overrides the engine preset if set 97 driver: str = "" # overrides the engine preset if set 98 fetch_size: int | None = None 99 num_partitions: int | None = None 100 partition_column: str = "" 101 lower_bound: int | None = None 102 upper_bound: int | None = None 103 ssl: bool = False 104 connection_properties: dict = field(default_factory=dict) 105 106 @property 107 def is_partitioned(self) -> bool: 108 return bool(self.num_partitions and self.partition_column and self.lower_bound is not None and self.upper_bound is not None)
A relational database table or query.
Set engine + host + database for a known engine (builds the JDBC
URL/driver for you), or set url/driver directly for anything else.
Set exactly one of table or query.
Advanced (all optional): fetch_size batches network round-trips;
num_partitions + partition_column + lower_bound + upper_bound
split a large table into parallel reads (all four must be set together);
ssl sets the common per-engine SSL flag; connection_properties is a
raw escape hatch for anything else the JDBC driver accepts.
57@dataclass 58class ExcelReaderOptions: 59 """Comprehensive spark-excel option coverage — the format that actually 60 needs it, since a raw file path alone doesn't tell Spark which sheet, 61 where the header row is, or whether the workbook is password-protected.""" 62 sheet_name: str = "0" # sheet name, or 0-based index as a string 63 header: bool = True 64 header_row: int = 0 # 0-indexed row the header lives on (title rows above it are skipped) 65 data_address: str = "" # explicit cell range, e.g. "'Sheet1'!B2:F100" — overrides sheet_name/header_row 66 infer_schema: bool = True 67 treat_empty_as_null: bool = True 68 date_format: str = "" 69 timestamp_format: str = "" 70 max_rows_in_memory: int | None = None # set for large .xlsx files to stream instead of loading fully 71 workbook_password: str = "" 72 # Read and union multiple named sheets with matching schemas into one 73 # DataFrame, instead of a single sheet_name/data_address. 74 sheet_names: list[str] = field(default_factory=list)
Comprehensive spark-excel option coverage. Excel is the format that actually needs it: a raw file path alone doesn't tell Spark which sheet to read, where the header row is, or whether the workbook is password-protected.
@dataclass()
class IngestTarget:
    """Where `run_ingestion` writes the loaded data, and how."""

    # Destination table name (the UI suggests catalog.schema.table).
    table: str
    # One of append | overwrite | merge.
    write_mode: str = "append"
    schema_evolution: bool = True
    # Key columns; the UI only collects these for merge mode.
    merge_keys: list[str] = field(default_factory=list)
@dataclass()
class JsonReaderOptions:
    """Reader settings for JSON sources; every field has a default."""

    # Record layout: False reads JSON Lines (one object per line); True reads
    # pretty-printed / multi-line records, which Spark can't split on newlines.
    multiline: bool = False
    infer_schema: bool = True
    date_format: str = ""
    timestamp_format: str = ""
    parse_mode: str = "PERMISSIVE"
    primitives_as_string: bool = False
@dataclass
class RestApiSource:
    """A JSON REST API. `json_path` is a dot-path to the records array/dict
    if the payload wraps it (e.g. "data.items"); leave empty for a bare array.

    Advanced (all optional): `auth_type` in none|bearer|api_key|basic sets
    how credentials are attached. `pagination` in none|page_param|cursor
    follows multiple pages automatically, up to `max_pages`.

    The secret fields (`bearer_token`, `api_key`, `basic_password`) are
    excluded from ``repr()`` so printing a source does not expose them.
    """
    url: str
    # NOTE(review): headers are still shown by repr(); avoid putting raw
    # Authorization values here — use the auth_type fields instead.
    headers: dict = field(default_factory=dict)
    params: dict = field(default_factory=dict)
    json_path: str = ""
    timeout_seconds: int = 30

    auth_type: str = "none"  # none | bearer | api_key | basic
    bearer_token: str = field(default="", repr=False)
    api_key_header: str = "X-API-Key"
    api_key: str = field(default="", repr=False)
    basic_user: str = ""
    basic_password: str = field(default="", repr=False)

    pagination: str = "none"  # none | page_param | cursor
    page_param: str = "page"
    page_size_param: str = ""
    page_size: int = 0
    max_pages: int = 20
    cursor_param: str = "cursor"
    cursor_json_path: str = ""  # dot-path in the response to the next page's cursor value
A JSON REST API. json_path is a dot-path to the records array/dict
if the payload wraps it (e.g. "data.items"); leave empty for a bare array.
Advanced (all optional): auth_type in none|bearer|api_key|basic sets
how credentials are attached. pagination in none|page_param|cursor
follows multiple pages automatically, up to max_pages.
46@dataclass 47class S3Source: 48 """Amazon S3 — s3://<bucket>/<path>.""" 49 bucket: str 50 path: str = "" 51 file_format: str | None = None 52 reader_options: Any = None 53 options: dict = field(default_factory=dict)
Amazon S3 — s3://<bucket>/<path>.
23@dataclass 24class VolumeSource: 25 """A Unity Catalog Volume — /Volumes/<catalog>/<schema>/<volume>/<path>.""" 26 catalog: str 27 schema_name: str 28 volume: str 29 path: str = "" 30 file_format: str | None = None # inferred from path extension if omitted 31 reader_options: Any = None # a CsvReaderOptions/ExcelReaderOptions/... — defaults used if unset 32 options: dict = field(default_factory=dict) # raw Spark options, applied on top of reader_options
A Unity Catalog Volume — /Volumes/<catalog>/<schema>/<volume>/<path>.
def build_jdbc_url(source: DatabaseSource) -> str:
    """Return the JDBC URL to connect to *source*.

    An explicit ``source.url`` always wins. Otherwise the URL template of the
    ``JDBC_PRESETS`` entry for ``source.engine`` is filled in with the host,
    the database, and ``source.port`` (or the preset's default port when the
    port is unset or 0).

    Raises:
        ValueError: if no ``url`` is given and ``engine`` has no preset.
    """
    explicit_url = source.url
    if explicit_url:
        return explicit_url

    engine = source.engine
    if engine not in JDBC_PRESETS:
        raise ValueError(f"Unknown engine {source.engine!r}; set url/driver directly for custom engines.")

    preset = JDBC_PRESETS[engine]
    effective_port = source.port or preset["default_port"]
    template = preset["url"]
    return template.format(host=source.host, port=effective_port, database=source.database)
def launch():
    """Build and display the DashIngest notebook UI.

    The form has three steps: pick a source kind and fill in its fields (plus
    file-format options for path-based sources), describe the target table,
    then run. "Test Connection", "Preview" and "Run Ingestion" call
    ``test_connection``, ``preview`` and ``run_ingestion`` respectively and
    render their result, or the error message, in the output panel.

    Raises:
        RuntimeError: if ipywidgets / IPython cannot be imported.
    """
    try:
        import ipywidgets as w
        from IPython.display import display
    except ImportError:
        raise RuntimeError("ipywidgets required. Run: %pip install ipywidgets") from None

    import dashui

    kind_toggle = w.ToggleButtons(
        options=["Databricks Volume", "ADLS Gen2", "Amazon S3", "DBFS", "Database", "REST API"],
        description="Source:",
    )

    # ── Databricks Volume ───────────────────────────────────────────────────
    vol_catalog = w.Text(description="Catalog:")
    vol_schema = w.Text(description="Schema:")
    vol_volume = w.Text(description="Volume:")
    vol_path = w.Text(description="Path:", placeholder="folder/file.csv (optional)")
    vol_box = w.VBox([w.HBox([vol_catalog, vol_schema, vol_volume]), vol_path])

    # ── ADLS Gen2 ────────────────────────────────────────────────────────────
    adls_account = w.Text(description="Storage account:")
    adls_container = w.Text(description="Container:")
    adls_path = w.Text(description="Path:", placeholder="folder/file.csv (optional)")
    adls_box = w.VBox([w.HBox([adls_account, adls_container]), adls_path])

    # ── Amazon S3 ────────────────────────────────────────────────────────────
    s3_bucket = w.Text(description="Bucket:")
    s3_path = w.Text(description="Path:", placeholder="folder/file.csv (optional)")
    s3_box = w.VBox([s3_bucket, s3_path])

    # ── DBFS ─────────────────────────────────────────────────────────────────
    dbfs_path = w.Text(description="Path:", placeholder="folder/file.csv")
    dbfs_box = w.VBox([dbfs_path])

    # ── Database ─────────────────────────────────────────────────────────────
    db_engine = w.Dropdown(options=["postgresql", "mysql", "sqlserver", "oracle", "snowflake"], description="Engine:")
    db_host = w.Text(description="Host:")
    db_database = w.Text(description="Database:")
    db_port = w.Text(description="Port:", placeholder="default for engine")
    db_table = w.Text(description="Table:", placeholder="schema.table")
    db_query = w.Text(description="or Query:", placeholder="SELECT ... (instead of table)")
    db_user = w.Text(description="User:")
    db_password = w.Password(description="Password:")

    db_ssl = w.Checkbox(value=False, description="Use SSL")
    db_fetch_size = w.IntText(description="Fetch size:", value=0, layout=w.Layout(width="180px"))
    db_num_partitions = w.IntText(description="Partitions:", value=0, layout=w.Layout(width="180px"))
    db_partition_col = w.Text(description="Partition col:", placeholder="id")
    db_lower_bound = w.IntText(description="Lower bound:", layout=w.Layout(width="180px"))
    db_upper_bound = w.IntText(description="Upper bound:", layout=w.Layout(width="180px"))
    db_props_table = dashui.editable_table(["Property", "Value"], placeholders={"Property": "sslmode", "Value": "require"})
    db_advanced = w.Accordion(children=[w.VBox([
        db_ssl,
        w.HTML("<div style='font-size:12px;color:#5A6872;margin:6px 0 2px'>Parallel read (set all four to split a large table across partitions)</div>"),
        w.HBox([db_num_partitions, db_partition_col]),
        w.HBox([db_lower_bound, db_upper_bound]),
        db_fetch_size,
        w.HTML("<div style='font-size:12px;color:#5A6872;margin:6px 0 2px'>Extra JDBC connection properties</div>"),
        db_props_table.widget,
    ])])
    db_advanced.set_title(0, "Advanced")
    db_advanced.selected_index = None

    db_box = w.VBox([
        w.HBox([db_engine, db_host, db_port]),
        db_database, db_table, db_query,
        w.HBox([db_user, db_password]),
        db_advanced,
    ])

    # ── REST API ─────────────────────────────────────────────────────────────
    api_url = w.Text(description="URL:", placeholder="https://api.example.com/records")
    api_json_path = w.Text(description="JSON path:", placeholder="data.items (optional)")

    api_auth_type = w.Dropdown(options=["none", "bearer", "api_key", "basic"], description="Auth:")
    api_bearer_token = w.Password(description="Token:", disabled=True)
    api_key_header = w.Text(description="Header name:", value="X-API-Key", disabled=True)
    api_key_value = w.Password(description="Key:", disabled=True)
    api_basic_user = w.Text(description="User:", disabled=True)
    api_basic_password = w.Password(description="Password:", disabled=True)
    # Credential widgets belonging to each auth type.
    auth_fields = {
        "bearer": [api_bearer_token],
        "api_key": [api_key_header, api_key_value],
        "basic": [api_basic_user, api_basic_password],
    }
    api_auth_box = w.VBox([])

    def on_auth_type_change(change):
        # Show and enable only the selected type's fields. They start out
        # disabled, so without re-enabling them here no credential could ever
        # be typed in (mirrors on_pagination_change below).
        active = auth_fields.get(change["new"], [])
        for group in auth_fields.values():
            for widget in group:
                widget.disabled = widget not in active
        api_auth_box.children = active

    api_auth_type.observe(on_auth_type_change, names="value")
    # Sync the initial state: auth "none" shows no credential fields.
    on_auth_type_change({"new": api_auth_type.value})

    api_pagination = w.Dropdown(options=["none", "page_param", "cursor"], description="Pagination:")
    api_page_param = w.Text(description="Page param:", value="page", disabled=True)
    api_max_pages = w.IntText(description="Max pages:", value=20, disabled=True)
    api_cursor_param = w.Text(description="Cursor param:", value="cursor", disabled=True)
    api_cursor_json_path = w.Text(description="Cursor JSON path:", placeholder="meta.next_cursor", disabled=True)
    api_pagination_box = w.VBox([])

    def on_pagination_change(change):
        for widget in (api_page_param, api_max_pages, api_cursor_param, api_cursor_json_path):
            widget.disabled = True
        if change["new"] == "page_param":
            api_page_param.disabled = api_max_pages.disabled = False
            api_pagination_box.children = [api_page_param, api_max_pages]
        elif change["new"] == "cursor":
            api_cursor_param.disabled = api_cursor_json_path.disabled = api_max_pages.disabled = False
            api_pagination_box.children = [api_cursor_param, api_cursor_json_path, api_max_pages]
        else:
            api_pagination_box.children = []

    api_pagination.observe(on_pagination_change, names="value")

    api_headers_table = dashui.editable_table(["Header", "Value"], placeholders={"Header": "Accept", "Value": "application/json"})
    api_params_table = dashui.editable_table(["Param", "Value"])

    api_advanced = w.Accordion(children=[w.VBox([
        api_auth_type, api_auth_box,
        api_pagination, api_pagination_box,
        w.HTML("<div style='font-size:12px;color:#5A6872;margin:6px 0 2px'>Headers</div>"),
        api_headers_table.widget,
        w.HTML("<div style='font-size:12px;color:#5A6872;margin:6px 0 2px'>Query params</div>"),
        api_params_table.widget,
    ])])
    api_advanced.set_title(0, "Advanced")
    api_advanced.selected_index = None

    api_box = w.VBox([api_url, api_json_path, api_advanced])

    # ── File format (path-based sources only) ───────────────────────────────
    file_format = w.Dropdown(
        options=["(infer from path)", "csv", "json", "parquet", "excel", "avro", "orc", "text"],
        description="Format:",
    )

    csv_delimiter = w.Text(description="Delimiter:", value=",")
    csv_header = w.Checkbox(value=True, description="Has header row")
    csv_null_value = w.Text(description="Null marker:", placeholder="e.g. NA (optional)")
    csv_box = w.VBox([w.HBox([csv_delimiter, csv_header]), csv_null_value])

    # Excel options — the format that actually needs this much configuration
    xl_sheet = w.Text(description="Sheet:", value="0", placeholder="name or 0-based index")
    # IntText has no min/max traits, so a min= passed to it is silently
    # dropped. BoundedIntText enforces a non-negative index; the max is the
    # last row of an .xlsx sheet (1,048,576 rows, 0-indexed).
    xl_header_row = w.BoundedIntText(description="Header row:", value=0, min=0, max=1_048_575)
    xl_header = w.Checkbox(value=True, description="Has header row")
    xl_password = w.Password(description="Password:", placeholder="if protected (optional)")
    xl_sheets_to_union = w.Text(description="Union sheets:", placeholder="Jan, Feb, Mar (optional — reads+stacks multiple sheets)")
    excel_box = w.VBox([
        w.HBox([xl_sheet, xl_header_row, xl_header]),
        xl_password, xl_sheets_to_union,
    ])

    format_options_panel = w.VBox([])

    def on_format_change(change):
        format_options_panel.children = {"csv": [csv_box], "excel": [excel_box]}.get(change["new"], [])

    file_format.observe(on_format_change, names="value")

    source_panel = w.VBox([vol_box])
    format_row = w.VBox([file_format, format_options_panel])

    def on_kind_change(change):
        kind = change["new"]
        source_panel.children = {
            "Databricks Volume": [vol_box],
            "ADLS Gen2": [adls_box],
            "Amazon S3": [s3_box],
            "DBFS": [dbfs_box],
            "Database": [db_box],
            "REST API": [api_box],
        }[kind]
        # File-format options only apply to path-based sources.
        format_row.children = [] if kind in ("Database", "REST API") else [file_format, format_options_panel]

    kind_toggle.observe(on_kind_change, names="value")
    on_kind_change({"new": kind_toggle.value})

    # ── Target ───────────────────────────────────────────────────────────────
    target_table = w.Text(description="Target table:", placeholder="catalog.schema.table")
    write_mode = w.ToggleButtons(options=["append", "overwrite", "merge"], description="Write mode:")
    merge_keys = w.Text(description="Merge keys:", placeholder="id, updated_at (merge mode only)", disabled=True)
    schema_evo = w.Checkbox(value=True, description="Allow schema evolution")
    write_mode.observe(lambda c: setattr(merge_keys, "disabled", c["new"] != "merge"), names="value")

    test_btn = dashui.action_button("Test Connection", style="info")
    preview_btn = dashui.action_button("Preview", style="info")
    run_btn = dashui.action_button("Run Ingestion", style="success")
    output = dashui.output_panel()

    def _rows_to_dict(table, key_column):
        """Turn an editable key/value table into a dict, skipping blank keys.

        A blank trailing row would otherwise become an empty-named header,
        query param or JDBC property.
        """
        pairs = {}
        for row in table.values():
            key = (row[key_column] or "").strip()
            if key:
                pairs[key] = row["Value"]
        return pairs

    def _build_reader_options(fmt):
        from dashingest.readers import CsvReaderOptions, ExcelReaderOptions

        if fmt == "csv":
            return CsvReaderOptions(
                delimiter=csv_delimiter.value or ",",
                header=csv_header.value,
                null_value=csv_null_value.value.strip(),
            )
        if fmt == "excel":
            sheets = [s.strip() for s in xl_sheets_to_union.value.split(",") if s.strip()]
            return ExcelReaderOptions(
                sheet_name=xl_sheet.value.strip() or "0",
                header_row=xl_header_row.value,
                header=xl_header.value,
                workbook_password=xl_password.value,
                sheet_names=sheets,
            )
        return None

    def _build_source():
        from dashingest.connectors import ADLSSource, DatabaseSource, DBFSSource, RestApiSource, S3Source, VolumeSource

        kind = kind_toggle.value
        fmt = None if file_format.value == "(infer from path)" else file_format.value
        reader_opts = _build_reader_options(fmt)

        if kind == "Databricks Volume":
            return VolumeSource(vol_catalog.value.strip(), vol_schema.value.strip(), vol_volume.value.strip(),
                                vol_path.value.strip(), fmt, reader_opts)
        if kind == "ADLS Gen2":
            return ADLSSource(adls_account.value.strip(), adls_container.value.strip(), adls_path.value.strip(),
                              fmt, reader_opts)
        if kind == "Amazon S3":
            return S3Source(s3_bucket.value.strip(), s3_path.value.strip(), fmt, reader_opts)
        if kind == "DBFS":
            return DBFSSource(dbfs_path.value.strip(), fmt, reader_opts)
        if kind == "Database":
            port_text = db_port.value.strip()
            try:
                port = int(port_text) if port_text else None
            except ValueError:
                # Replace int()'s "invalid literal" message with one that names the field.
                raise ValueError(f"Port must be a whole number, got {port_text!r}.") from None
            return DatabaseSource(
                host=db_host.value.strip(), database=db_database.value.strip(), engine=db_engine.value,
                port=port, table=db_table.value.strip(), query=db_query.value.strip(),
                user=db_user.value.strip(), password=db_password.value,
                ssl=db_ssl.value,
                fetch_size=db_fetch_size.value or None,
                num_partitions=db_num_partitions.value or None,
                partition_column=db_partition_col.value.strip(),
                # Bounds only matter for a partitioned read; IntText defaults them to 0.
                lower_bound=db_lower_bound.value if db_num_partitions.value else None,
                upper_bound=db_upper_bound.value if db_num_partitions.value else None,
                connection_properties=_rows_to_dict(db_props_table, "Property"),
            )

        # Remaining kind: "REST API".
        return RestApiSource(
            api_url.value.strip(),
            headers=_rows_to_dict(api_headers_table, "Header"),
            params=_rows_to_dict(api_params_table, "Param"),
            json_path=api_json_path.value.strip(),
            auth_type=api_auth_type.value,
            bearer_token=api_bearer_token.value,
            api_key_header=api_key_header.value.strip() or "X-API-Key",
            api_key=api_key_value.value,
            basic_user=api_basic_user.value.strip(),
            basic_password=api_basic_password.value,
            pagination=api_pagination.value,
            page_param=api_page_param.value.strip() or "page",
            max_pages=api_max_pages.value or 20,
            cursor_param=api_cursor_param.value.strip() or "cursor",
            cursor_json_path=api_cursor_json_path.value.strip(),
        )

    def on_test(b):
        with output:
            output.clear_output()
            try:
                from dashingest.ingestor import test_connection
                test_connection(_build_source()).display()
            except Exception as e:  # UI boundary: show any failure to the user
                print(f"Error: {e}")

    def on_preview(b):
        with output:
            output.clear_output()
            try:
                from dashingest.ingestor import preview
                # display() renders the pandas DataFrame as a notebook table
                # instead of print()'s truncated plain-text dump.
                display(preview(_build_source(), limit=10))
            except Exception as e:  # UI boundary: show any failure to the user
                print(f"Error: {e}")

    def on_run(b):
        with output:
            output.clear_output()
            try:
                from dashingest.connectors import IngestTarget
                from dashingest.ingestor import run_ingestion

                table = target_table.value.strip()
                if not table:
                    raise ValueError("Target table is required (catalog.schema.table).")
                target = IngestTarget(
                    table=table,
                    write_mode=write_mode.value,
                    schema_evolution=schema_evo.value,
                    merge_keys=[k.strip() for k in merge_keys.value.split(",") if k.strip()],
                )
                result = run_ingestion(_build_source(), target)
                result.display()
            except Exception as e:  # UI boundary: show any failure to the user
                print(f"Error: {e}")

    test_btn.on_click(on_test)
    preview_btn.on_click(on_preview)
    run_btn.on_click(on_run)

    ui = dashui.card([
        dashui.header("DashIngest — Data Ingestion", library="dashingest"),
        dashui.section("Step 1: Source"),
        kind_toggle, source_panel, format_row,
        w.HBox([test_btn, preview_btn]),
        dashui.section("Step 2: Target"),
        target_table, write_mode, merge_keys, schema_evo,
        dashui.section("Step 3: Run"),
        run_btn, output,
    ])
    display(ui)
def preview(source, limit: int = 10):
    """Load up to `limit` rows without writing anywhere — returns a pandas
    DataFrame for display in a notebook cell or the UI's output panel.

    Uses the Spark session active on the current thread, falling back to
    ``SparkSession.builder.getOrCreate()`` when none is active.
    """
    from pyspark.sql import SparkSession

    # getActiveSession() returns None when no session is active on this
    # thread; handing None to _load would fail with an opaque AttributeError.
    spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
    return _load(source, spark).limit(limit).toPandas()
Load up to limit rows without writing anywhere — returns a pandas
DataFrame for display in a notebook cell or the UI's output panel.
def resolve_path(source: PathSource) -> str:
    """Return the fully qualified storage path for a path-based source.

    - Volume: ``/Volumes/<catalog>/<schema>/<volume>[/<path>]``
    - ADLS:   ``abfss://<container>@<account>.dfs.core.windows.net[/<path>]``
    - S3:     ``s3://<bucket>[/<path>]``
    - DBFS:   ``dbfs:/<path>`` (``dbfs:/`` when the path is empty)

    Leading slashes on ``source.path`` are stripped so they are not doubled.

    Raises:
        TypeError: if *source* is not one of the path-based source types.
    """
    if isinstance(source, VolumeSource):
        base = f"/Volumes/{source.catalog}/{source.schema_name}/{source.volume}"
    elif isinstance(source, ADLSSource):
        base = f"abfss://{source.container}@{source.storage_account}.dfs.core.windows.net"
    elif isinstance(source, S3Source):
        base = f"s3://{source.bucket}"
    elif isinstance(source, DBFSSource):
        # A bare "dbfs:" has no scheme-specific part and is not a usable path.
        # The DBFS root is "dbfs:/", so always keep the slash, even for an
        # empty path.
        return f"dbfs:/{source.path.lstrip('/')}"
    else:
        raise TypeError(f"Not a path-based source: {type(source).__name__}")

    path = source.path.lstrip("/")
    return f"{base}/{path}" if path else base
def run_ingestion(source, target: IngestTarget) -> IngestResult:
    """Load *source* with Spark, write it to *target*, and report the result.

    Uses the Spark session active on the current thread, falling back to
    ``SparkSession.builder.getOrCreate()`` when none is active.

    Returns:
        An IngestResult built from the target table name, the row count of
        the target table read back after the write, and the write mode.
    """
    from pyspark.sql import SparkSession

    # getActiveSession() returns None when no session is active on this
    # thread; handing None to _load/_write would fail with an opaque error.
    spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
    df = _load(source, spark)
    _write(df, target, spark)
    # NOTE(review): this counts the whole target table after the write, not
    # only the rows ingested by this run; in append/merge mode the two differ.
    # Confirm which number IngestResult is meant to report.
    count = spark.table(target.table).count()
    return IngestResult(target.table, count, target.write_mode)
def test_connection(source) -> ConnectionTestResult:
    """Check reachability/credentials without loading real data.

    REST API sources are probed without Spark. Database and path-based
    sources are probed through the Spark session active on the current
    thread, falling back to ``SparkSession.builder.getOrCreate()`` when none
    is active.
    """
    # The REST check never uses Spark, so don't acquire (or create) a session for it.
    if isinstance(source, RestApiSource):
        return _test_rest_api_connection(source)

    from pyspark.sql import SparkSession

    # getActiveSession() returns None when no session is active on this thread.
    spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
    if isinstance(source, DatabaseSource):
        return _test_database_connection(source, spark)
    return _test_path_connection(source, spark)
Check reachability/credentials without loading real data.