diff --git a/.gitignore b/.gitignore index ed0a062a..5e98eea5 100644 --- a/.gitignore +++ b/.gitignore @@ -4,9 +4,11 @@ __pycache__ _build .coverage* .env +.gradle .DS_Store .meltano .pytest_cache +build coverage.xml mlruns/ archive/ diff --git a/framework/flink/tour/README.md b/framework/flink/tour/README.md new file mode 100644 index 00000000..03dbd1e8 --- /dev/null +++ b/framework/flink/tour/README.md @@ -0,0 +1,11 @@ +# A tour of Apache Flink and CrateDB + +This folder includes concise executable examples demonstrating +how to use Apache Flink with CrateDB, with both Java and Python. + +Usually, you will submit your Apache Flink job to a cluster for +execution The examples in this tour are self-contained, and will +use the [Flink MiniCluster] for execution. + + +[Flink MiniCluster]: https://speakerdeck.com/rmetzger/tiny-flink-minimizing-the-memory-footprint-of-apache-flink diff --git a/framework/flink/tour/java/README.md b/framework/flink/tour/java/README.md new file mode 100644 index 00000000..9ae86bbb --- /dev/null +++ b/framework/flink/tour/java/README.md @@ -0,0 +1,22 @@ +# Apache Flink and CrateDB with Java + +Basic examples demonstrating how to read and write from/to +CrateDB when using Apache Flink. + +The examples use both the [CrateDB JDBC] and the [PostgreSQL JDBC] +driver. CrateDB JDBC is needed for catalog operations, which are +required when reading from CrateDB using Flink. 
+```shell +uvx crash -c 'CREATE TABLE person (name STRING, age INT);' +```
+// Invoke: gradle run read +// Source: https://github.com/crate/cratedb-examples/blob/main/framework/flink/tour/java/read.java + +// https://tacnode.io/docs/guides/ecosystem/bigdata/flink#catalog-registration +// https://github.com/crate/cratedb-flink-jobs/blob/main/src/main/java/io/crate/flink/demo/SimpleTableApiJob.java +// https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/jdbc/#jdbc-catalog-for-postgresql +import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; +import org.apache.flink.table.api.*; + +import static org.apache.flink.table.api.Expressions.$; + +public class read { + + public static String CATALOG_NAME = "example_catalog"; + + public static void main(String[] args) throws Exception { + + // Create Flink Table API environment. + EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); + TableEnvironment env = TableEnvironment.create(settings); + + // Define catalog. + // CrateDB only knows a single database called `crate`, + // but you can separate concerns using schemata. The + // default schema is `doc`. + JdbcCatalog catalog = new JdbcCatalog( + CATALOG_NAME, + "crate", + "crate", + "crate", + "jdbc:crate://localhost:5432" + ); + + // Register catalog and set as default. + env.registerCatalog(CATALOG_NAME, catalog); + env.useCatalog(CATALOG_NAME); + + // Invoke query using plain SQL. + // FIXME: Currently does not work with `sys.summits`. + // SqlValidatorException: Object 'sys.summits' not found + env.executeSql("SELECT * FROM `doc.person` LIMIT 3").print(); + + // Invoke query using DSL. + env.from("`doc.person`") + .select($("name"), $("age")) + .execute() + .print(); + } + +} diff --git a/framework/flink/tour/java/write.java b/framework/flink/tour/java/write.java new file mode 100644 index 00000000..22e79a29 --- /dev/null +++ b/framework/flink/tour/java/write.java @@ -0,0 +1,57 @@ +// Write to CrateDB using Apache Flink. 
+// Invoke: uvx crash -c 'CREATE TABLE person (name STRING, age INT);' +// Invoke: gradle run write +// Source: https://github.com/crate/cratedb-examples/blob/main/framework/flink/tour/java/write.java + +// https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#full-example + +import org.apache.flink.connector.jdbc.*; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +public class write { + + public static void main(String[] args) throws Exception { + var env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Define source data. + env.fromElements( + new Person("Fred", 35), + new Person("Wilma", 35), + new Person("Pebbles", 2) + + // Define CrateDB as data sink. + ).addSink( + JdbcSink.sink( + "INSERT INTO person (name, age) VALUES (?, ?)", + (statement, person) -> { + statement.setString(1, person.name); + statement.setInt(2, person.age); + }, + JdbcExecutionOptions.builder() + .withBatchSize(1000) + .withBatchIntervalMs(200) + .withMaxRetries(5) + .build(), + new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl("jdbc:postgresql://localhost:5432/doc?sslmode=disable") + .withDriverName("org.postgresql.Driver") + .withUsername("crate") + .withPassword("crate") + .build() + )); + + // Execute pipeline. + env.execute(); + } + + public static class Person { + public String name; + public Integer age; + public Person() {} + public Person(String name, Integer age) { + this.name = name; + this.age = age; + } + } + +} diff --git a/framework/flink/tour/python/README.md b/framework/flink/tour/python/README.md new file mode 100644 index 00000000..f4645250 --- /dev/null +++ b/framework/flink/tour/python/README.md @@ -0,0 +1,28 @@ +# Apache Flink and CrateDB with Python + +Basic examples demonstrating how to read and write from/to +CrateDB when using Apache Flink (PyFlink). + +The examples use both the [CrateDB JDBC] and the [PostgreSQL JDBC] +driver. 
CrateDB JDBC is needed for catalog operations, which are +required when reading from CrateDB using Flink. + +```sql +uvx crash -c 'CREATE TABLE person (name STRING, age INT);' +``` +Flink >= 1.19 has problems with JDBC and PyFlink, +but previous versions need a Python of the same age. +```shell +uv venv --python 3.10 --seed .venv310 +uv pip install -r requirements.txt +``` +```shell +python write.py +``` +```shell +python ready.py +``` + + +[CrateDB JDBC]: https://cratedb.com/docs/guide/connect/java/cratedb-jdbc.html +[PostgreSQL JDBC]: https://cratedb.com/docs/guide/connect/java/postgresql-jdbc.html diff --git a/framework/flink/tour/python/read.py b/framework/flink/tour/python/read.py new file mode 100644 index 00000000..491f8ff6 --- /dev/null +++ b/framework/flink/tour/python/read.py @@ -0,0 +1,6 @@ +def main(): + print("Hello World") + + +if __name__ == "__main__": + main() diff --git a/framework/flink/tour/python/requirements.txt b/framework/flink/tour/python/requirements.txt new file mode 100644 index 00000000..98537ca8 --- /dev/null +++ b/framework/flink/tour/python/requirements.txt @@ -0,0 +1,2 @@ +apache-flink==1.18.1 +avro<1.13 diff --git a/framework/flink/tour/python/write.py b/framework/flink/tour/python/write.py new file mode 100644 index 00000000..a84eedd1 --- /dev/null +++ b/framework/flink/tour/python/write.py @@ -0,0 +1,80 @@ +import dataclasses +import logging + +from pathlib import Path + +from pyflink.common import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions + + +logger = logging.getLogger(__name__) + +JARS_PATH = Path(__file__).parent / 'jars' + + +@dataclasses.dataclass +class Person: + name: str + age: int + + +def main(): + + env = StreamExecutionEnvironment.get_execution_environment() + jars = list(map(lambda x: 'file://' + str(x), (JARS_PATH.glob('*.jar')))) + env.add_jars(*jars) + + # Define source data. 
+ ds = env.from_collection([ + Person("Fred", 35), + Person("Wilma", 35), + Person("Pebbles", 2), + ]) + + # Define CrateDB as data sink. + row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()]) + ds.add_sink( + JdbcSink.sink( + "INSERT INTO person (name, age) VALUES (?, ?)", + row_type_info, + + # FIXME (Flink >= 1.19): java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder + + # This is due to a bug in the python Flink library. In `flink-connector-jdbc` v3.1, + # the `JdbcOutputFormat` was renamed to `RowJdbcOutputFormat`. This change has up till + # now not been implemented in the python Flink library. + # https://stackoverflow.com/questions/78960829/java-lang-nosuchmethodexception-in-python-flink-jdbc + + # As you see, java `JdbcSink` connector class has different shape from Python `JdbcSink` connector. + # In Java code, `jdbcSink` object is generated from `JdbcSinkBuilder` class, but in Python it is not. + # I think these errors are due to API version mismatch. Any idea to solve these errors? + # https://stackoverflow.com/questions/79604252/issue-with-pyflink-api-code-for-inserting-data-into-sql + + JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .with_url('jdbc:postgresql://localhost:5432/crate') + .with_driver_name('org.postgresql.Driver') + .with_user_name("crate") + .with_password("crate") + .build(), + JdbcExecutionOptions.builder() + .with_batch_interval_ms(1000) + .with_batch_size(200) + .with_max_retries(5) + .build() + ) + ) + + # Execute pipeline. + env.execute() + + +if __name__ == '__main__': + logging.basicConfig( + format='[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s', + level=logging.DEBUG + ) + + logger.info("Start") + main() + logger.info("Ready")