flowchart TD
A["execute(plan)"] --> B["render_plan / fetch main scan"]
B --> C["fetch_column_slots(plan)"]
C --> D{plan has ColumnFetchSlot?}
D -- no --> E["empty dicts"]
D -- yes --> F["render_column_fetch(plan, slot, caps)"]
F --> G{caps.column_fetch shape}
G -- NATIVE_ITER --> H["stream_column_slot: SQLAlchemy cursor iter, yield rows"]
G -- SERVER_SIDE_CURSOR --> I["stream_column_slot: stream_results=True yield_per=N, yield chunks"]
G -- RESULT_SCAN_ARROW --> J["stream_column_slot: Snowflake fetch_arrow_batches, yield Arrow rows"]
H --> K["drain Iterator into DataFrame via arrow_to_dataframe or pd.DataFrame"]
I --> K
J --> K
K --> L["DataframeHandle(opaque=df)"]
L --> M["DatasetMeasurement.column_fetches[slot.id]"]
M --> N["build_measurement -> return DatasetMeasurement"]
F --> OE["SQLAlchemyError / Exception"]
OE --> P["SlotError(slot_id, reason)"]
P --> N

sequenceDiagram
participant Exec as BatchedDatasetExecutor
participant Renderer as render_column_fetch
participant StreamFn as stream_column_slot
participant Conn as SQLAlchemy Connection
participant Handle as DataframeHandle
Exec->>Renderer: render_column_fetch(plan, slot, caps)
Renderer-->>Exec: sql string
Exec->>StreamFn: stream_column_slot(engine, sql, caps)
StreamFn->>Conn: engine.connect() with shape-specific execution_options
loop yield chunks
StreamFn->>Conn: iterate cursor rows
Conn-->>StreamFn: row tuple or Arrow batch
StreamFn-->>Exec: RecordBatch chunk
end
StreamFn->>Conn: conn.close() in finally
Exec->>Exec: reassemble chunks into DataFrame
Exec->>Handle: DataframeHandle(opaque=df)
Handle-->>Exec: stored in column_fetches[slot.id]

classDiagram
class BatchedDatasetExecutor {
+engine : Engine
+caps : DialectCapabilities
+column_executor_factory : Optional~ColumnExecutorFactory~
+execute(plan : DatasetPlan) DatasetMeasurement
+fetch_pks(plan : DatasetPlan, slot : PkSlot) Iterator~PrimaryKey~
+fetch_columns(plan : DatasetPlan) Iterator~RecordBatch~
+fetch_column_slots(plan : DatasetPlan) tuple~dict, dict~
}
class DialectCapabilities {
+dialect : DataSourceType
+max_sql_len : Optional~int~
+pk_cap : Optional~int~
+column_fetch : ColumnFetchShape
}
class ColumnFetchShape {
<<StrEnum>>
NATIVE_ITER
SERVER_SIDE_CURSOR
RESULT_SCAN_ARROW
}
class DataframeHandle {
+opaque : object
}
class SlotError {
+slot_id : SlotId
+reason : str
+exception : Optional~Exception~
}
class DatasetExecutor {
<<Protocol>>
+execute(plan : DatasetPlan) DatasetMeasurement
+fetch_pks(plan : DatasetPlan, slot : PkSlot) Iterator~PrimaryKey~
+fetch_columns(plan : DatasetPlan) Iterator~RecordBatch~
}
BatchedDatasetExecutor ..|> DatasetExecutor
BatchedDatasetExecutor --> DialectCapabilities
DialectCapabilities --> ColumnFetchShape
BatchedDatasetExecutor ..> DataframeHandle
BatchedDatasetExecutor ..> SlotError