Metadata-Version: 2.4
Name: confluent-pyflink
Version: 1.0.dev1
Summary: Confluent Apache Flink Table API Python
Author-email: Confluent <dev@confluent.io>
License-Expression: Apache-2.0
Project-URL: Homepage, https://confluent.io
Project-URL: Examples, https://github.com/confluentinc/flink-table-api-python-examples
Classifier: Development Status :: 4 - Beta
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: apache-flink~=2.1.0
Requires-Dist: pydantic-settings>=2.10.1
Requires-Dist: pyyaml
Requires-Dist: confluent-flink-jars==2.1.dev1

# Confluent Apache Flink Table API Python

This package contains the client library for running Apache Flink's Table API on Confluent Cloud.

The [Table API](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/table_api_tutorial/) enables a programmatic
way of developing, testing, and submitting Flink pipelines for processing data streams.
Streams can be finite or infinite, with insert-only or changelog data. The latter allows for dealing with *Change Data
Capture* (CDC) events.
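
To make the difference between insert-only and changelog data concrete, here is a minimal, library-free sketch of how a stream of change events can be folded into a table. It does not use this package; the `materialize` helper, the `(kind, key, value)` event layout, and the sample data are assumptions made for illustration. Only the four change kinds (insert, update-before, update-after, delete) mirror Flink's changelog model.

```python
from typing import Dict, Iterable, Tuple

# Change kinds, written with the short strings Flink uses when printing changelogs.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"

# Hypothetical event layout for this sketch: (change kind, key, value).
Event = Tuple[str, int, str]


def materialize(changelog: Iterable[Event]) -> Dict[int, str]:
    """Apply change events in order and return the resulting table keyed by id."""
    table: Dict[int, str] = {}
    for kind, key, value in changelog:
        if kind in (INSERT, UPDATE_AFTER):
            # Inserts and update-after events add or overwrite the row.
            table[key] = value
        elif kind in (UPDATE_BEFORE, DELETE):
            # Update-before and delete events retract the previous row.
            table.pop(key, None)
        else:
            raise ValueError(f"unknown change kind: {kind}")
    return table


# An insert-only stream: rows are only ever appended.
insert_only = [(INSERT, 1, "created"), (INSERT, 2, "created")]

# A changelog (CDC-style) stream: rows are updated and deleted over time.
cdc = [
    (INSERT, 1, "created"),
    (UPDATE_BEFORE, 1, "created"),
    (UPDATE_AFTER, 1, "shipped"),
    (INSERT, 2, "created"),
    (DELETE, 2, "created"),
]

if __name__ == "__main__":
    print(materialize(insert_only))
    print(materialize(cdc))
```

With an insert-only stream the table only grows, whereas the changelog stream ends with a single row for key `1` because the other row was retracted.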

Within the API, you conceptually work with tables that change over time, a concept inspired by relational databases. You write
a *Table Program* as a declarative and structured graph of data transformations. The Table API is inspired by SQL and complements
it with additional tools for working with real-time data. You can mix and match Flink SQL with the Table API at any time, as they
go hand in hand.

## Table API on Confluent Cloud

Table API on Confluent Cloud is a client-side library that delegates Flink API calls to Confluent’s public
REST API. It submits [Statements](https://docs.confluent.io/cloud/current/api.html#tag/Statements-(sqlv1)) and retrieves
[StatementResults](https://docs.confluent.io/cloud/current/api.html#tag/Statement-Results-(sqlv1)).

Table programs are implemented against [Flink's open source Table API for Python](https://github.com/apache/flink/tree/master/flink-python/pyflink/table).
This package repackages Flink's Python API and bundles the Confluent-specific components that power the `TableEnvironment` without the need
for a local Flink cluster. When you use this package, Flink internal components such as
`CatalogStore`, `Catalog`, `Planner`, `Executor`, and configuration are managed by the plugin and fully integrate with
Confluent Cloud, including access to Apache Kafka®, Schema Registry, and Flink Compute Pools.

Note: The Table API plugin is in the Open Preview stage.

### Motivating Example

The following code shows how a Table API program is structured. Subsequent sections go into more detail on how you
can use the examples in this repository to experiment with Flink on Confluent Cloud.

```python
from confluent_pyflink.table.utils import ConfluentSettings, ConfluentTools
from confluent_pyflink.table import TableEnvironment, Row
from confluent_pyflink.table.expressions import col, row


def run():
    # Setup connection properties to Confluent Cloud
    settings = ConfluentSettings.from_global_variables()
    env = TableEnvironment.create(settings)

    # Run your first Flink statement in Table API
    env.from_elements([row("Hello world!")]).execute().print()

    # Or use SQL
    env.sql_query("SELECT 'Hello world!'").execute().print()

    # Structure your code with Table objects - the main ingredient of Table API.
    table = (
        env.from_path("examples.marketplace.clicks")
        .filter(col("user_agent").like("Mozilla%"))
        .select(col("click_id"), col("user_id"))
    )

    table.print_schema()
    print(table.explain())

    # Use the provided tools to test on a subset of the streaming data
    actual = ConfluentTools.collect_materialized_limit(table, 50)
    expected = [Row(42, 500)]
    if actual != expected:
        print("Results don't match!")


if __name__ == "__main__":
    run()
```

### Further Examples

For further examples, please see Confluent's
[Apache Flink® Table API on Confluent Cloud Examples](https://github.com/confluentinc/flink-table-api-python-examples)
repository.
