dashingest

DashIngest — Azure Data Factory (ADF)-style data ingestion for Databricks: pick a source kind (Volume, ADLS, S3, DBFS, Database, REST API), fill in a few plain fields, and run.

 1"""DashIngest — ADF-style data ingestion for Databricks: pick a source kind
 2(Volume, ADLS, S3, DBFS, Database, REST API), fill a few plain fields, run."""
 3from dashingest.connectors import (
 4    ADLSSource,
 5    DatabaseSource,
 6    DBFSSource,
 7    IngestTarget,
 8    RestApiSource,
 9    S3Source,
10    VolumeSource,
11    build_jdbc_url,
12    infer_format_from_path,
13    resolve_path,
14)
15from dashingest.ingestor import ConnectionTestResult, IngestResult, preview, run_ingestion, test_connection
16from dashingest.readers import (
17    AvroReaderOptions,
18    CsvReaderOptions,
19    ExcelReaderOptions,
20    JsonReaderOptions,
21    OrcReaderOptions,
22    ParquetReaderOptions,
23    TextReaderOptions,
24)
25from dashingest.ui import launch
26
27__version__ = "0.1.1"
28__all__ = [
29    "ADLSSource",
30    "AvroReaderOptions",
31    "ConnectionTestResult",
32    "CsvReaderOptions",
33    "DBFSSource",
34    "DatabaseSource",
35    "ExcelReaderOptions",
36    "IngestResult",
37    "IngestTarget",
38    "JsonReaderOptions",
39    "OrcReaderOptions",
40    "ParquetReaderOptions",
41    "RestApiSource",
42    "S3Source",
43    "TextReaderOptions",
44    "VolumeSource",
45    "build_jdbc_url",
46    "infer_format_from_path",
47    "launch",
48    "preview",
49    "resolve_path",
50    "run_ingestion",
51    "test_connection",
52]
@dataclass
class ADLSSource:
35@dataclass
36class ADLSSource:
37    """Azure Data Lake Storage Gen2 — abfss://<container>@<account>.dfs.core.windows.net/<path>."""
38    storage_account: str
39    container: str
40    path: str = ""
41    file_format: str | None = None
42    reader_options: Any = None
43    options: dict = field(default_factory=dict)

Azure Data Lake Storage Gen2 — abfss://<container>@<account>.dfs.core.windows.net/<path>.

ADLSSource( storage_account: str, container: str, path: str = '', file_format: str | None = None, reader_options: Any = None, options: dict = <factory>)
storage_account: str
container: str
path: str = ''
file_format: str | None = None
reader_options: Any = None
options: dict
@dataclass
class AvroReaderOptions:
82@dataclass
83class AvroReaderOptions:
84    pass
@dataclass
class ConnectionTestResult:
32@dataclass
33class ConnectionTestResult:
34    ok: bool
35    message: str
36
37    def display(self) -> None:
38        print(f"{'OK' if self.ok else 'Error'}{self.message}")
ConnectionTestResult(ok: bool, message: str)
ok: bool
message: str
def display(self) -> None:
37    def display(self) -> None:
38        print(f"{'OK' if self.ok else 'Error'}{self.message}")
@dataclass
class CsvReaderOptions:
28@dataclass
29class CsvReaderOptions:
30    header: bool = True
31    infer_schema: bool = True
32    delimiter: str = ","
33    quote_char: str = '"'
34    escape_char: str = "\\"
35    encoding: str = "UTF-8"
36    null_value: str = ""
37    date_format: str = ""
38    timestamp_format: str = ""
39    multiline: bool = False
40    comment_char: str = ""
41    # PERMISSIVE keeps malformed rows (nulled out) instead of dropping/failing.
42    parse_mode: str = "PERMISSIVE"  # PERMISSIVE | DROPMALFORMED | FAILFAST
CsvReaderOptions( header: bool = True, infer_schema: bool = True, delimiter: str = ',', quote_char: str = '"', escape_char: str = '\\', encoding: str = 'UTF-8', null_value: str = '', date_format: str = '', timestamp_format: str = '', multiline: bool = False, comment_char: str = '', parse_mode: str = 'PERMISSIVE')
header: bool = True
infer_schema: bool = True
delimiter: str = ','
quote_char: str = '"'
escape_char: str = '\\'
encoding: str = 'UTF-8'
null_value: str = ''
date_format: str = ''
timestamp_format: str = ''
multiline: bool = False
comment_char: str = ''
parse_mode: str = 'PERMISSIVE'
@dataclass
class DBFSSource:
56@dataclass
57class DBFSSource:
58    """Legacy DBFS mount or path — dbfs:/<path>."""
59    path: str
60    file_format: str | None = None
61    reader_options: Any = None
62    options: dict = field(default_factory=dict)

Legacy DBFS mount or path — dbfs:/<path>.

DBFSSource( path: str, file_format: str | None = None, reader_options: Any = None, options: dict = <factory>)
path: str
file_format: str | None = None
reader_options: Any = None
options: dict
@dataclass
class DatabaseSource:
 74@dataclass
 75class DatabaseSource:
 76    """A relational database table or query.
 77
 78    Set `engine` + `host` + `database` for a known engine (builds the JDBC
 79    URL/driver for you), or set `url`/`driver` directly for anything else.
 80    Set exactly one of `table` or `query`.
 81
 82    Advanced (all optional): `fetch_size` batches network round-trips;
 83    `num_partitions` + `partition_column` + `lower_bound` + `upper_bound`
 84    split a large table into parallel reads (all four must be set together);
 85    `ssl` sets the common per-engine SSL flag; `connection_properties` is a
 86    raw escape hatch for anything else the JDBC driver accepts.
 87    """
 88    host: str = ""
 89    database: str = ""
 90    engine: str = "postgresql"  # postgresql | mysql | sqlserver | oracle | snowflake
 91    port: int | None = None
 92    table: str = ""
 93    query: str = ""
 94    user: str = ""
 95    password: str = ""
 96    url: str = ""       # overrides the engine preset if set
 97    driver: str = ""    # overrides the engine preset if set
 98    fetch_size: int | None = None
 99    num_partitions: int | None = None
100    partition_column: str = ""
101    lower_bound: int | None = None
102    upper_bound: int | None = None
103    ssl: bool = False
104    connection_properties: dict = field(default_factory=dict)
105
106    @property
107    def is_partitioned(self) -> bool:
108        return bool(self.num_partitions and self.partition_column and self.lower_bound is not None and self.upper_bound is not None)

A relational database table or query.

Set engine + host + database for a known engine (builds the JDBC URL/driver for you), or set url/driver directly for anything else. Set exactly one of table or query.

Advanced (all optional): fetch_size batches network round-trips; num_partitions + partition_column + lower_bound + upper_bound split a large table into parallel reads (all four must be set together); ssl sets the common per-engine SSL flag; connection_properties is a raw escape hatch for anything else the JDBC driver accepts.

DatabaseSource( host: str = '', database: str = '', engine: str = 'postgresql', port: int | None = None, table: str = '', query: str = '', user: str = '', password: str = '', url: str = '', driver: str = '', fetch_size: int | None = None, num_partitions: int | None = None, partition_column: str = '', lower_bound: int | None = None, upper_bound: int | None = None, ssl: bool = False, connection_properties: dict = <factory>)
host: str = ''
database: str = ''
engine: str = 'postgresql'
port: int | None = None
table: str = ''
query: str = ''
user: str = ''
password: str = ''
url: str = ''
driver: str = ''
fetch_size: int | None = None
num_partitions: int | None = None
partition_column: str = ''
lower_bound: int | None = None
upper_bound: int | None = None
ssl: bool = False
connection_properties: dict
is_partitioned: bool
106    @property
107    def is_partitioned(self) -> bool:
108        return bool(self.num_partitions and self.partition_column and self.lower_bound is not None and self.upper_bound is not None)
@dataclass
class ExcelReaderOptions:
57@dataclass
58class ExcelReaderOptions:
59    """Comprehensive spark-excel option coverage — the format that actually
60    needs it, since a raw file path alone doesn't tell Spark which sheet,
61    where the header row is, or whether the workbook is password-protected."""
62    sheet_name: str = "0"          # sheet name, or 0-based index as a string
63    header: bool = True
64    header_row: int = 0            # 0-indexed row the header lives on (title rows above it are skipped)
65    data_address: str = ""         # explicit cell range, e.g. "'Sheet1'!B2:F100" — overrides sheet_name/header_row
66    infer_schema: bool = True
67    treat_empty_as_null: bool = True
68    date_format: str = ""
69    timestamp_format: str = ""
70    max_rows_in_memory: int | None = None  # set for large .xlsx files to stream instead of loading fully
71    workbook_password: str = ""
72    # Read and union multiple named sheets with matching schemas into one
73    # DataFrame, instead of a single sheet_name/data_address.
74    sheet_names: list[str] = field(default_factory=list)

Comprehensive spark-excel reader options. Excel is the format that actually needs this much configuration: a raw file path alone doesn't tell Spark which sheet to read, which row holds the header, or whether the workbook is password-protected.

ExcelReaderOptions( sheet_name: str = '0', header: bool = True, header_row: int = 0, data_address: str = '', infer_schema: bool = True, treat_empty_as_null: bool = True, date_format: str = '', timestamp_format: str = '', max_rows_in_memory: int | None = None, workbook_password: str = '', sheet_names: list[str] = <factory>)
sheet_name: str = '0'
header: bool = True
header_row: int = 0
data_address: str = ''
infer_schema: bool = True
treat_empty_as_null: bool = True
date_format: str = ''
timestamp_format: str = ''
max_rows_in_memory: int | None = None
workbook_password: str = ''
sheet_names: list[str]
@dataclass
class IngestResult:
22@dataclass
23class IngestResult:
24    table: str
25    row_count: int
26    write_mode: str
27
28    def display(self) -> None:
29        print(f"Ingested into {self.table} ({self.write_mode}): {self.row_count:,} total rows")
IngestResult(table: str, row_count: int, write_mode: str)
table: str
row_count: int
write_mode: str
def display(self) -> None:
28    def display(self) -> None:
29        print(f"Ingested into {self.table} ({self.write_mode}): {self.row_count:,} total rows")
@dataclass
class IngestTarget:
142@dataclass
143class IngestTarget:
144    table: str
145    write_mode: str = "append"  # append | overwrite | merge
146    schema_evolution: bool = True
147    merge_keys: list[str] = field(default_factory=list)
IngestTarget( table: str, write_mode: str = 'append', schema_evolution: bool = True, merge_keys: list[str] = <factory>)
table: str
write_mode: str = 'append'
schema_evolution: bool = True
merge_keys: list[str]
@dataclass
class JsonReaderOptions:
45@dataclass
46class JsonReaderOptions:
47    # False = one JSON object per line (JSON Lines); True = pretty-printed /
48    # multi-line records, which Spark can't split on newlines.
49    multiline: bool = False
50    infer_schema: bool = True
51    date_format: str = ""
52    timestamp_format: str = ""
53    parse_mode: str = "PERMISSIVE"
54    primitives_as_string: bool = False
JsonReaderOptions( multiline: bool = False, infer_schema: bool = True, date_format: str = '', timestamp_format: str = '', parse_mode: str = 'PERMISSIVE', primitives_as_string: bool = False)
multiline: bool = False
infer_schema: bool = True
date_format: str = ''
timestamp_format: str = ''
parse_mode: str = 'PERMISSIVE'
primitives_as_string: bool = False
@dataclass
class OrcReaderOptions:
87@dataclass
88class OrcReaderOptions:
89    merge_schema: bool = False
OrcReaderOptions(merge_schema: bool = False)
merge_schema: bool = False
@dataclass
class ParquetReaderOptions:
77@dataclass
78class ParquetReaderOptions:
79    merge_schema: bool = False
ParquetReaderOptions(merge_schema: bool = False)
merge_schema: bool = False
@dataclass
class RestApiSource:
111@dataclass
112class RestApiSource:
113    """A JSON REST API. `json_path` is a dot-path to the records array/dict
114    if the payload wraps it (e.g. "data.items"); leave empty for a bare array.
115
116    Advanced (all optional): `auth_type` in none|bearer|api_key|basic sets
117    how credentials are attached. `pagination` in none|page_param|cursor
118    follows multiple pages automatically, up to `max_pages`.
119    """
120    url: str
121    headers: dict = field(default_factory=dict)
122    params: dict = field(default_factory=dict)
123    json_path: str = ""
124    timeout_seconds: int = 30
125
126    auth_type: str = "none"  # none | bearer | api_key | basic
127    bearer_token: str = ""
128    api_key_header: str = "X-API-Key"
129    api_key: str = ""
130    basic_user: str = ""
131    basic_password: str = ""
132
133    pagination: str = "none"  # none | page_param | cursor
134    page_param: str = "page"
135    page_size_param: str = ""
136    page_size: int = 0
137    max_pages: int = 20
138    cursor_param: str = "cursor"
139    cursor_json_path: str = ""  # dot-path in the response to the next page's cursor value

A JSON REST API. json_path is a dot-path to the records array/dict if the payload wraps it (e.g. "data.items"); leave empty for a bare array.

Advanced (all optional): auth_type in none|bearer|api_key|basic sets how credentials are attached. pagination in none|page_param|cursor follows multiple pages automatically, up to max_pages.

RestApiSource( url: str, headers: dict = <factory>, params: dict = <factory>, json_path: str = '', timeout_seconds: int = 30, auth_type: str = 'none', bearer_token: str = '', api_key_header: str = 'X-API-Key', api_key: str = '', basic_user: str = '', basic_password: str = '', pagination: str = 'none', page_param: str = 'page', page_size_param: str = '', page_size: int = 0, max_pages: int = 20, cursor_param: str = 'cursor', cursor_json_path: str = '')
url: str
headers: dict
params: dict
json_path: str = ''
timeout_seconds: int = 30
auth_type: str = 'none'
bearer_token: str = ''
api_key_header: str = 'X-API-Key'
api_key: str = ''
basic_user: str = ''
basic_password: str = ''
pagination: str = 'none'
page_param: str = 'page'
page_size_param: str = ''
page_size: int = 0
max_pages: int = 20
cursor_param: str = 'cursor'
cursor_json_path: str = ''
@dataclass
class S3Source:
46@dataclass
47class S3Source:
48    """Amazon S3 — s3://<bucket>/<path>."""
49    bucket: str
50    path: str = ""
51    file_format: str | None = None
52    reader_options: Any = None
53    options: dict = field(default_factory=dict)

Amazon S3 — s3://<bucket>/<path>.

S3Source( bucket: str, path: str = '', file_format: str | None = None, reader_options: Any = None, options: dict = <factory>)
bucket: str
path: str = ''
file_format: str | None = None
reader_options: Any = None
options: dict
@dataclass
class TextReaderOptions:
92@dataclass
93class TextReaderOptions:
94    line_sep: str = ""
95    whole_text: bool = False
96    encoding: str = "UTF-8"
TextReaderOptions( line_sep: str = '', whole_text: bool = False, encoding: str = 'UTF-8')
line_sep: str = ''
whole_text: bool = False
encoding: str = 'UTF-8'
@dataclass
class VolumeSource:
23@dataclass
24class VolumeSource:
25    """A Unity Catalog Volume — /Volumes/<catalog>/<schema>/<volume>/<path>."""
26    catalog: str
27    schema_name: str
28    volume: str
29    path: str = ""
30    file_format: str | None = None  # inferred from path extension if omitted
31    reader_options: Any = None      # a CsvReaderOptions/ExcelReaderOptions/... — defaults used if unset
32    options: dict = field(default_factory=dict)  # raw Spark options, applied on top of reader_options

A Unity Catalog Volume — /Volumes/<catalog>/<schema>/<volume>/<path>.

VolumeSource( catalog: str, schema_name: str, volume: str, path: str = '', file_format: str | None = None, reader_options: Any = None, options: dict = <factory>)
catalog: str
schema_name: str
volume: str
path: str = ''
file_format: str | None = None
reader_options: Any = None
options: dict
def build_jdbc_url(source: DatabaseSource) -> str:
190def build_jdbc_url(source: DatabaseSource) -> str:
191    if source.url:
192        return source.url
193    if source.engine not in JDBC_PRESETS:
194        raise ValueError(f"Unknown engine {source.engine!r}; set url/driver directly for custom engines.")
195    preset = JDBC_PRESETS[source.engine]
196    port = source.port or preset["default_port"]
197    return preset["url"].format(host=source.host, port=port, database=source.database)
def infer_format_from_path(path: str) -> str | None:
153def infer_format_from_path(path: str) -> str | None:
154    ext = path.rsplit(".", 1)[-1].lower() if "." in path else ""
155    return FILE_EXTENSIONS.get(ext)
def launch():
  7def launch():
  8    try:
  9        import ipywidgets as w
 10        from IPython.display import display
 11    except ImportError:
 12        raise RuntimeError("ipywidgets required. Run: %pip install ipywidgets") from None
 13
 14    import dashui
 15
 16    kind_toggle = w.ToggleButtons(
 17        options=["Databricks Volume", "ADLS Gen2", "Amazon S3", "DBFS", "Database", "REST API"],
 18        description="Source:",
 19    )
 20
 21    # ── Databricks Volume ───────────────────────────────────────────────────
 22    vol_catalog = w.Text(description="Catalog:")
 23    vol_schema = w.Text(description="Schema:")
 24    vol_volume = w.Text(description="Volume:")
 25    vol_path = w.Text(description="Path:", placeholder="folder/file.csv (optional)")
 26    vol_box = w.VBox([w.HBox([vol_catalog, vol_schema, vol_volume]), vol_path])
 27
 28    # ── ADLS Gen2 ────────────────────────────────────────────────────────────
 29    adls_account = w.Text(description="Storage account:")
 30    adls_container = w.Text(description="Container:")
 31    adls_path = w.Text(description="Path:", placeholder="folder/file.csv (optional)")
 32    adls_box = w.VBox([w.HBox([adls_account, adls_container]), adls_path])
 33
 34    # ── Amazon S3 ────────────────────────────────────────────────────────────
 35    s3_bucket = w.Text(description="Bucket:")
 36    s3_path = w.Text(description="Path:", placeholder="folder/file.csv (optional)")
 37    s3_box = w.VBox([s3_bucket, s3_path])
 38
 39    # ── DBFS ─────────────────────────────────────────────────────────────────
 40    dbfs_path = w.Text(description="Path:", placeholder="folder/file.csv")
 41    dbfs_box = w.VBox([dbfs_path])
 42
 43    # ── Database ─────────────────────────────────────────────────────────────
 44    db_engine = w.Dropdown(options=["postgresql", "mysql", "sqlserver", "oracle", "snowflake"], description="Engine:")
 45    db_host = w.Text(description="Host:")
 46    db_database = w.Text(description="Database:")
 47    db_port = w.Text(description="Port:", placeholder="default for engine")
 48    db_table = w.Text(description="Table:", placeholder="schema.table")
 49    db_query = w.Text(description="or Query:", placeholder="SELECT ... (instead of table)")
 50    db_user = w.Text(description="User:")
 51    db_password = w.Password(description="Password:")
 52
 53    db_ssl = w.Checkbox(value=False, description="Use SSL")
 54    db_fetch_size = w.IntText(description="Fetch size:", value=0, layout=w.Layout(width="180px"))
 55    db_num_partitions = w.IntText(description="Partitions:", value=0, layout=w.Layout(width="180px"))
 56    db_partition_col = w.Text(description="Partition col:", placeholder="id")
 57    db_lower_bound = w.IntText(description="Lower bound:", layout=w.Layout(width="180px"))
 58    db_upper_bound = w.IntText(description="Upper bound:", layout=w.Layout(width="180px"))
 59    db_props_table = dashui.editable_table(["Property", "Value"], placeholders={"Property": "sslmode", "Value": "require"})
 60    db_advanced = w.Accordion(children=[w.VBox([
 61        db_ssl,
 62        w.HTML("<div style='font-size:12px;color:#5A6872;margin:6px 0 2px'>Parallel read (set all four to split a large table across partitions)</div>"),
 63        w.HBox([db_num_partitions, db_partition_col]),
 64        w.HBox([db_lower_bound, db_upper_bound]),
 65        db_fetch_size,
 66        w.HTML("<div style='font-size:12px;color:#5A6872;margin:6px 0 2px'>Extra JDBC connection properties</div>"),
 67        db_props_table.widget,
 68    ])])
 69    db_advanced.set_title(0, "Advanced")
 70    db_advanced.selected_index = None
 71
 72    db_box = w.VBox([
 73        w.HBox([db_engine, db_host, db_port]),
 74        db_database, db_table, db_query,
 75        w.HBox([db_user, db_password]),
 76        db_advanced,
 77    ])
 78
 79    # ── REST API ─────────────────────────────────────────────────────────────
 80    api_url = w.Text(description="URL:", placeholder="https://api.example.com/records")
 81    api_json_path = w.Text(description="JSON path:", placeholder="data.items (optional)")
 82
 83    api_auth_type = w.Dropdown(options=["none", "bearer", "api_key", "basic"], description="Auth:")
 84    api_bearer_token = w.Password(description="Token:", disabled=True)
 85    api_key_header = w.Text(description="Header name:", value="X-API-Key", disabled=True)
 86    api_key_value = w.Password(description="Key:", disabled=True)
 87    api_basic_user = w.Text(description="User:", disabled=True)
 88    api_basic_password = w.Password(description="Password:", disabled=True)
 89    api_auth_box = w.VBox([api_bearer_token])
 90
 91    def on_auth_type_change(change):
 92        api_auth_box.children = {
 93            "bearer": [api_bearer_token],
 94            "api_key": [api_key_header, api_key_value],
 95            "basic": [api_basic_user, api_basic_password],
 96        }.get(change["new"], [])
 97
 98    api_auth_type.observe(on_auth_type_change, names="value")
 99
100    api_pagination = w.Dropdown(options=["none", "page_param", "cursor"], description="Pagination:")
101    api_page_param = w.Text(description="Page param:", value="page", disabled=True)
102    api_max_pages = w.IntText(description="Max pages:", value=20, disabled=True)
103    api_cursor_param = w.Text(description="Cursor param:", value="cursor", disabled=True)
104    api_cursor_json_path = w.Text(description="Cursor JSON path:", placeholder="meta.next_cursor", disabled=True)
105    api_pagination_box = w.VBox([])
106
107    def on_pagination_change(change):
108        for field in (api_page_param, api_max_pages, api_cursor_param, api_cursor_json_path):
109            field.disabled = True
110        if change["new"] == "page_param":
111            api_page_param.disabled = api_max_pages.disabled = False
112            api_pagination_box.children = [api_page_param, api_max_pages]
113        elif change["new"] == "cursor":
114            api_cursor_param.disabled = api_cursor_json_path.disabled = api_max_pages.disabled = False
115            api_pagination_box.children = [api_cursor_param, api_cursor_json_path, api_max_pages]
116        else:
117            api_pagination_box.children = []
118
119    api_pagination.observe(on_pagination_change, names="value")
120
121    api_headers_table = dashui.editable_table(["Header", "Value"], placeholders={"Header": "Accept", "Value": "application/json"})
122    api_params_table = dashui.editable_table(["Param", "Value"])
123
124    api_advanced = w.Accordion(children=[w.VBox([
125        api_auth_type, api_auth_box,
126        api_pagination, api_pagination_box,
127        w.HTML("<div style='font-size:12px;color:#5A6872;margin:6px 0 2px'>Headers</div>"),
128        api_headers_table.widget,
129        w.HTML("<div style='font-size:12px;color:#5A6872;margin:6px 0 2px'>Query params</div>"),
130        api_params_table.widget,
131    ])])
132    api_advanced.set_title(0, "Advanced")
133    api_advanced.selected_index = None
134
135    api_box = w.VBox([api_url, api_json_path, api_advanced])
136
137    # ── File format (path-based sources only) ───────────────────────────────
138    file_format = w.Dropdown(
139        options=["(infer from path)", "csv", "json", "parquet", "excel", "avro", "orc", "text"],
140        description="Format:",
141    )
142
143    csv_delimiter = w.Text(description="Delimiter:", value=",")
144    csv_header = w.Checkbox(value=True, description="Has header row")
145    csv_null_value = w.Text(description="Null marker:", placeholder="e.g. NA (optional)")
146    csv_box = w.VBox([w.HBox([csv_delimiter, csv_header]), csv_null_value])
147
148    # Excel options — the format that actually needs this much configuration
149    xl_sheet = w.Text(description="Sheet:", value="0", placeholder="name or 0-based index")
150    xl_header_row = w.IntText(description="Header row:", value=0, min=0)
151    xl_header = w.Checkbox(value=True, description="Has header row")
152    xl_password = w.Password(description="Password:", placeholder="if protected (optional)")
153    xl_sheets_to_union = w.Text(description="Union sheets:", placeholder="Jan, Feb, Mar (optional — reads+stacks multiple sheets)")
154    excel_box = w.VBox([
155        w.HBox([xl_sheet, xl_header_row, xl_header]),
156        xl_password, xl_sheets_to_union,
157    ])
158
159    format_options_panel = w.VBox([])
160
161    def on_format_change(change):
162        format_options_panel.children = {"csv": [csv_box], "excel": [excel_box]}.get(change["new"], [])
163
164    file_format.observe(on_format_change, names="value")
165
166    source_panel = w.VBox([vol_box])
167    format_row = w.VBox([file_format, format_options_panel])
168
169    def on_kind_change(change):
170        kind = change["new"]
171        source_panel.children = {
172            "Databricks Volume": [vol_box],
173            "ADLS Gen2": [adls_box],
174            "Amazon S3": [s3_box],
175            "DBFS": [dbfs_box],
176            "Database": [db_box],
177            "REST API": [api_box],
178        }[kind]
179        format_row.children = [] if kind in ("Database", "REST API") else [file_format, format_options_panel]
180
181    kind_toggle.observe(on_kind_change, names="value")
182    on_kind_change({"new": kind_toggle.value})
183
184    # ── Target ───────────────────────────────────────────────────────────────
185    target_table = w.Text(description="Target table:", placeholder="catalog.schema.table")
186    write_mode = w.ToggleButtons(options=["append", "overwrite", "merge"], description="Write mode:")
187    merge_keys = w.Text(description="Merge keys:", placeholder="id, updated_at (merge mode only)", disabled=True)
188    schema_evo = w.Checkbox(value=True, description="Allow schema evolution")
189    write_mode.observe(lambda c: setattr(merge_keys, "disabled", c["new"] != "merge"), names="value")
190
191    test_btn = dashui.action_button("Test Connection", style="info")
192    preview_btn = dashui.action_button("Preview", style="info")
193    run_btn = dashui.action_button("Run Ingestion", style="success")
194    output = dashui.output_panel()
195
196    def _build_reader_options(fmt):
197        from dashingest.readers import CsvReaderOptions, ExcelReaderOptions
198
199        if fmt == "csv":
200            return CsvReaderOptions(
201                delimiter=csv_delimiter.value or ",",
202                header=csv_header.value,
203                null_value=csv_null_value.value.strip(),
204            )
205        if fmt == "excel":
206            sheets = [s.strip() for s in xl_sheets_to_union.value.split(",") if s.strip()]
207            return ExcelReaderOptions(
208                sheet_name=xl_sheet.value.strip() or "0",
209                header_row=xl_header_row.value,
210                header=xl_header.value,
211                workbook_password=xl_password.value,
212                sheet_names=sheets,
213            )
214        return None
215
216    def _build_source():
217        from dashingest.connectors import ADLSSource, DatabaseSource, DBFSSource, RestApiSource, S3Source, VolumeSource
218
219        kind = kind_toggle.value
220        fmt = None if file_format.value == "(infer from path)" else file_format.value
221        reader_opts = _build_reader_options(fmt)
222
223        if kind == "Databricks Volume":
224            return VolumeSource(vol_catalog.value.strip(), vol_schema.value.strip(), vol_volume.value.strip(),
225                                 vol_path.value.strip(), fmt, reader_opts)
226        if kind == "ADLS Gen2":
227            return ADLSSource(adls_account.value.strip(), adls_container.value.strip(), adls_path.value.strip(),
228                               fmt, reader_opts)
229        if kind == "Amazon S3":
230            return S3Source(s3_bucket.value.strip(), s3_path.value.strip(), fmt, reader_opts)
231        if kind == "DBFS":
232            return DBFSSource(dbfs_path.value.strip(), fmt, reader_opts)
233        if kind == "Database":
234            port = int(db_port.value.strip()) if db_port.value.strip() else None
235            props = {r["Property"]: r["Value"] for r in db_props_table.values()}
236            return DatabaseSource(
237                host=db_host.value.strip(), database=db_database.value.strip(), engine=db_engine.value,
238                port=port, table=db_table.value.strip(), query=db_query.value.strip(),
239                user=db_user.value.strip(), password=db_password.value,
240                ssl=db_ssl.value,
241                fetch_size=db_fetch_size.value or None,
242                num_partitions=db_num_partitions.value or None,
243                partition_column=db_partition_col.value.strip(),
244                lower_bound=db_lower_bound.value if db_num_partitions.value else None,
245                upper_bound=db_upper_bound.value if db_num_partitions.value else None,
246                connection_properties=props,
247            )
248
249        headers = {r["Header"]: r["Value"] for r in api_headers_table.values()}
250        params = {r["Param"]: r["Value"] for r in api_params_table.values()}
251        return RestApiSource(
252            api_url.value.strip(), headers=headers, params=params, json_path=api_json_path.value.strip(),
253            auth_type=api_auth_type.value,
254            bearer_token=api_bearer_token.value,
255            api_key_header=api_key_header.value.strip() or "X-API-Key",
256            api_key=api_key_value.value,
257            basic_user=api_basic_user.value.strip(),
258            basic_password=api_basic_password.value,
259            pagination=api_pagination.value,
260            page_param=api_page_param.value.strip() or "page",
261            max_pages=api_max_pages.value or 20,
262            cursor_param=api_cursor_param.value.strip() or "cursor",
263            cursor_json_path=api_cursor_json_path.value.strip(),
264        )
265
266    def on_test(b):
267        with output:
268            output.clear_output()
269            try:
270                from dashingest.ingestor import test_connection
271                test_connection(_build_source()).display()
272            except Exception as e:
273                print(f"Error: {e}")
274
275    def on_preview(b):
276        with output:
277            output.clear_output()
278            try:
279                from dashingest.ingestor import preview
280                print(preview(_build_source(), limit=10))
281            except Exception as e:
282                print(f"Error: {e}")
283
284    def on_run(b):
285        with output:
286            output.clear_output()
287            try:
288                from dashingest.connectors import IngestTarget
289                from dashingest.ingestor import run_ingestion
290
291                target = IngestTarget(
292                    table=target_table.value.strip(),
293                    write_mode=write_mode.value,
294                    schema_evolution=schema_evo.value,
295                    merge_keys=[k.strip() for k in merge_keys.value.split(",") if k.strip()],
296                )
297                result = run_ingestion(_build_source(), target)
298                result.display()
299            except Exception as e:
300                print(f"Error: {e}")
301
302    test_btn.on_click(on_test)
303    preview_btn.on_click(on_preview)
304    run_btn.on_click(on_run)
305
306    ui = dashui.card([
307        dashui.header("DashIngest — Data Ingestion", library="dashingest"),
308        dashui.section("Step 1: Source"),
309        kind_toggle, source_panel, format_row,
310        w.HBox([test_btn, preview_btn]),
311        dashui.section("Step 2: Target"),
312        target_table, write_mode, merge_keys, schema_evo,
313        dashui.section("Step 3: Run"),
314        run_btn, output,
315    ])
316    display(ui)
def preview(source, limit: int = 10):
51def preview(source, limit: int = 10):
52    """Load up to `limit` rows without writing anywhere — returns a pandas
53    DataFrame for display in a notebook cell or the UI's output panel."""
54    from pyspark.sql import SparkSession
55
56    spark = SparkSession.getActiveSession()
57    return _load(source, spark).limit(limit).toPandas()

Load up to limit rows without writing anywhere — returns a pandas DataFrame for display in a notebook cell or the UI's output panel.

def resolve_path( source: Union[VolumeSource, ADLSSource, S3Source, DBFSSource]) -> str:
158def resolve_path(source: PathSource) -> str:
159    if isinstance(source, VolumeSource):
160        base = f"/Volumes/{source.catalog}/{source.schema_name}/{source.volume}"
161    elif isinstance(source, ADLSSource):
162        base = f"abfss://{source.container}@{source.storage_account}.dfs.core.windows.net"
163    elif isinstance(source, S3Source):
164        base = f"s3://{source.bucket}"
165    elif isinstance(source, DBFSSource):
166        base = "dbfs:"
167    else:
168        raise TypeError(f"Not a path-based source: {type(source).__name__}")
169
170    path = source.path.lstrip("/")
171    return f"{base}/{path}" if path else base
def run_ingestion( source, target: IngestTarget) -> IngestResult:
41def run_ingestion(source, target: IngestTarget) -> IngestResult:
42    from pyspark.sql import SparkSession
43
44    spark = SparkSession.getActiveSession()
45    df = _load(source, spark)
46    _write(df, target, spark)
47    count = spark.table(target.table).count()
48    return IngestResult(target.table, count, target.write_mode)
def test_connection(source) -> ConnectionTestResult:
60def test_connection(source) -> ConnectionTestResult:
61    """Check reachability/credentials without loading real data."""
62    from pyspark.sql import SparkSession
63
64    spark = SparkSession.getActiveSession()
65    if isinstance(source, DatabaseSource):
66        return _test_database_connection(source, spark)
67    if isinstance(source, RestApiSource):
68        return _test_rest_api_connection(source)
69    return _test_path_connection(source, spark)

Check reachability/credentials without loading real data.