flowchart TD
    %% Column-fetch path of execute(plan) as drawn here: render each slot's SQL,
    %% stream it using the dialect's column_fetch shape, drain the stream into a
    %% DataFrame, and record the resulting handle on the measurement.
    A["execute(plan)"] --> B["render_plan / fetch main scan"]
    B --> C["fetch_column_slots(plan)"]
    %% Decision labels are quoted so punctuation such as "?" and "." is always
    %% treated as plain text.
    C --> D{"plan has ColumnFetchSlot?"}
    D -- no --> E["empty dicts"]
    D -- yes --> F["render_column_fetch(plan, slot, caps)"]
    F --> G{"caps.column_fetch shape"}
    G -- NATIVE_ITER --> H["stream_column_slot: SQLAlchemy cursor iter, yield rows"]
    G -- SERVER_SIDE_CURSOR --> I["stream_column_slot: stream_results=True yield_per=N, yield chunks"]
    G -- RESULT_SCAN_ARROW --> J["stream_column_slot: Snowflake fetch_arrow_batches, yield Arrow rows"]
    H --> K["drain Iterator into DataFrame via arrow_to_dataframe or pd.DataFrame"]
    I --> K
    J --> K
    K --> L["DataframeHandle(opaque=df)"]
    L --> M["DatasetMeasurement.column_fetches[slot.id]"]
    M --> N["build_measurement -> return DatasetMeasurement"]
    %% The no-slot branch previously stopped at E and never reached the return.
    %% NOTE(review): assumes the empty dicts feed build_measurement the same way
    %% the populated path does - confirm against the executor.
    E --> N
    %% Failure path: dotted and labelled so it is not read as an unconditional
    %% step that runs alongside the shape dispatch.
    F -. on error .-> OE["SQLAlchemyError / Exception"]
    OE --> P["SlotError(slot_id, reason)"]
    P --> N
sequenceDiagram
    %% Message flow for fetching one column slot. Aliases on the left are local
    %% to this diagram; the text after "as" is what gets rendered.
    participant Executor as BatchedDatasetExecutor
    participant Render as render_column_fetch
    participant Streamer as stream_column_slot
    participant DbConn as SQLAlchemy Connection
    participant DfHandle as DataframeHandle

    %% Step 1: turn the slot into SQL.
    Executor->>Render: render_column_fetch(plan, slot, caps)
    Render-->>Executor: sql string

    %% Step 2: stream the result set chunk by chunk.
    Executor->>Streamer: stream_column_slot(engine, sql, caps)
    Streamer->>DbConn: engine.connect() with shape-specific execution_options
    loop yield chunks
        Streamer->>DbConn: iterate cursor rows
        DbConn-->>Streamer: row tuple or Arrow batch
        Streamer-->>Executor: RecordBatch chunk
    end
    %% The connection is closed from a finally clause once streaming is over.
    Streamer->>DbConn: conn.close() in finally

    %% Step 3: rebuild a DataFrame and wrap it in a handle keyed by slot id.
    Executor->>Executor: reassemble chunks into DataFrame
    Executor->>DfHandle: DataframeHandle(opaque=df)
    DfHandle-->>Executor: stored in column_fetches[slot.id]
classDiagram
    %% Executor class with its attributes and public methods as drawn below.
    %% Optional~T~ marks a member that may be None; the tildes must come in
    %% balanced pairs so Mermaid renders them as Optional<T>. The previous
    %% single-tilde form (T~None) was an unbalanced generic delimiter.
    class BatchedDatasetExecutor {
        +engine : Engine
        +caps : DialectCapabilities
        +column_executor_factory : Optional~ColumnExecutorFactory~
        +execute(plan : DatasetPlan) DatasetMeasurement
        +fetch_pks(plan : DatasetPlan, slot : PkSlot) Iterator~PrimaryKey~
        +fetch_columns(plan : DatasetPlan) Iterator~RecordBatch~
        +fetch_column_slots(plan : DatasetPlan) tuple~dict, dict~
    }

    %% Capability record for one dialect: the dialect itself, two nullable
    %% integer limits, and the column fetch shape.
    %% Optional~int~ replaces the unbalanced int~None so the generic tildes pair
    %% up and render as Optional<int>.
    class DialectCapabilities {
        +dialect : DataSourceType
        +max_sql_len : Optional~int~
        +pk_cap : Optional~int~
        +column_fetch : ColumnFetchShape
    }

    %% String enum of the three column fetch shapes, written with the
    %% "Class : member" form instead of a braced body.
    class ColumnFetchShape
    <<StrEnum>> ColumnFetchShape
    ColumnFetchShape : NATIVE_ITER
    ColumnFetchShape : SERVER_SIDE_CURSOR
    ColumnFetchShape : RESULT_SCAN_ARROW

    %% Wrapper with a single public attribute, opaque, typed as object.
    %% NOTE(review): the other diagrams in this file construct it as
    %% DataframeHandle(opaque=df), so opaque presumably holds the DataFrame.
    class DataframeHandle {
        +opaque : object
    }

    %% Error record for one slot: which slot, a textual reason, and an
    %% exception that may be absent.
    %% Optional~Exception~ replaces the unbalanced Exception~None so the generic
    %% tildes pair up and render as Optional<Exception>.
    class SlotError {
        +slot_id : SlotId
        +reason : str
        +exception : Optional~Exception~
    }

    %% Protocol (structural interface) listing three methods: execute,
    %% fetch_pks and fetch_columns. The two fetch methods return iterators.
    class DatasetExecutor {
        <<Protocol>>
        +execute(plan : DatasetPlan) DatasetMeasurement
        +fetch_pks(plan : DatasetPlan, slot : PkSlot) Iterator~PrimaryKey~
        +fetch_columns(plan : DatasetPlan) Iterator~RecordBatch~
    }

    %% Relationships between the classes above.
    %% ..|> is realization: BatchedDatasetExecutor implements the protocol.
    BatchedDatasetExecutor ..|> DatasetExecutor
    %% --> is association: the source class holds a member of the target type.
    BatchedDatasetExecutor --> DialectCapabilities
    DialectCapabilities --> ColumnFetchShape
    %% ..> is dependency: the executor uses these types but no attribute of
    %% either type is drawn on it.
    BatchedDatasetExecutor ..> DataframeHandle
    BatchedDatasetExecutor ..> SlotError