From 3336e7c5324784d2e039c7f6590a2501df8c3136 Mon Sep 17 00:00:00 2001
From: Tim Sawicki
Date: Tue, 6 May 2025 19:14:22 -0400
Subject: [PATCH 1/2] allow dynamic table selection based on record

---
 docs/connectors/sinks/postgresql-sink.md  | 30 +++++++++++
 quixstreams/sinks/community/postgresql.py | 65 ++++++++++++++---------
 2 files changed, 69 insertions(+), 26 deletions(-)

diff --git a/docs/connectors/sinks/postgresql-sink.md b/docs/connectors/sinks/postgresql-sink.md
index 306842833..9b9315fcd 100644
--- a/docs/connectors/sinks/postgresql-sink.md
+++ b/docs/connectors/sinks/postgresql-sink.md
@@ -82,3 +82,33 @@ PostgreSQLSink accepts the following configuration parameters:
 - `password`: The database user password.
 - `table_name`: The name of the PostgreSQL table where data will be written.
 - `schema_auto_update`: If True, the sink will automatically update the schema by adding new columns when new fields are detected. Default: True.
+
+
+## Testing Locally
+
+Rather than connecting to a hosted PostgreSQL instance, you can test your
+application against a local PostgreSQL instance using Docker:
+
+1. Execute in terminal:
+
+    ```bash
+    docker run --rm -d --name postgres \
+    -e POSTGRES_PASSWORD=local \
+    -e POSTGRES_USER=local \
+    -e POSTGRES_DB=local \
+    -p 5432:5432 \
+    postgres
+    ```
+
+2. Use the following settings for `PostgreSQLSink` to connect:
+
+    ```python
+    PostgreSQLSink(
+        host="localhost",
+        port=5432,
+        user="local",
+        password="local",
+        dbname="local",
+        table_name="",
+    )
+    ```
diff --git a/quixstreams/sinks/community/postgresql.py b/quixstreams/sinks/community/postgresql.py
index 5267d684d..0bf2b156f 100644
--- a/quixstreams/sinks/community/postgresql.py
+++ b/quixstreams/sinks/community/postgresql.py
@@ -1,7 +1,7 @@
 import logging
 from datetime import datetime
 from decimal import Decimal
-from typing import Any, Mapping, Optional
+from typing import Any, Callable, Mapping, Optional, Union
 
 try:
     import psycopg2
@@ -21,6 +21,7 @@
     ClientConnectSuccessCallback,
     SinkBatch,
 )
+from quixstreams.sinks.base.item import SinkItem
 
 __all__ = ("PostgreSQLSink", "PostgreSQLSinkException")
 
@@ -58,7 +59,7 @@ def __init__(
         dbname: str,
         user: str,
         password: str,
-        table_name: str,
+        table_name: Union[Callable[[SinkItem], str], str],
         schema_auto_update: bool = True,
         connection_timeout_seconds: int = 30,
         statement_timeout_seconds: int = 30,
@@ -72,9 +73,10 @@ def __init__(
         :param host: PostgreSQL server address.
         :param port: PostgreSQL server port.
         :param dbname: PostgreSQL database name.
-        :param user: Database user name.
+        :param user: Database username.
         :param password: Database user password.
-        :param table_name: PostgreSQL table name.
+        :param table_name: PostgreSQL table name as either a string or a callable which
+            receives a SinkItem and returns a string.
         :param schema_auto_update: Automatically update the schema when new columns are detected.
         :param connection_timeout_seconds: Timeout for connection.
         :param statement_timeout_seconds: Timeout for DDL operations such as table
@@ -91,9 +93,9 @@ def __init__(
             on_client_connect_success=on_client_connect_success,
             on_client_connect_failure=on_client_connect_failure,
         )
-
-        self.table_name = table_name
-        self.schema_auto_update = schema_auto_update
+        self._table_name = _table_name_setter(table_name)
+        self._tables = set()
+        self._schema_auto_update = schema_auto_update
         options = kwargs.pop("options", "")
         if "statement_timeout" not in options:
             options = f"{options} -c statement_timeout={statement_timeout_seconds}s"
@@ -112,37 +114,38 @@ def __init__(
     def setup(self):
         self._client = psycopg2.connect(**self._client_settings)
 
-        # Initialize table if schema_auto_update is enabled
-        if self.schema_auto_update:
-            self._init_table()
-
     def write(self, batch: SinkBatch):
-        rows = []
-        cols_types = {}
-
+        tables = {}
         for item in batch:
+            table = tables.setdefault(
+                self._table_name(item), {"rows": [], "cols_types": {}}
+            )
             row = {}
             if item.key is not None:
                 key_type = type(item.key)
-                cols_types.setdefault(_KEY_COLUMN_NAME, key_type)
+                table["cols_types"].setdefault(_KEY_COLUMN_NAME, key_type)
                 row[_KEY_COLUMN_NAME] = item.key
 
             for key, value in item.value.items():
                 if value is not None:
-                    cols_types.setdefault(key, type(value))
+                    table["cols_types"].setdefault(key, type(value))
                    row[key] = value
 
             row[_TIMESTAMP_COLUMN_NAME] = datetime.fromtimestamp(item.timestamp / 1000)
-            rows.append(row)
+            table["rows"].append(row)
 
         try:
             with self._client:
-                if self.schema_auto_update:
-                    self._add_new_columns(cols_types)
-                self._insert_rows(rows)
+                for name, values in tables.items():
+                    if self._schema_auto_update:
+                        self._init_table(name)
+                        self._add_new_columns(name, values["cols_types"])
+                    self._insert_rows(name, values["rows"])
         except psycopg2.Error as e:
             self._client.rollback()
             raise PostgreSQLSinkException(f"Failed to write batch: {str(e)}") from e
+        table_counts = {table: len(values["rows"]) for table, values in tables.items()}
+        logger.info(f"Successfully wrote records to tables; row counts: {table_counts}")
 
     def add(
         self,
@@ -169,7 +172,9 @@ def add(
             offset=offset,
         )
 
-    def _init_table(self):
+    def _init_table(self, table_name: str):
+        if table_name in self._tables:
+            return
         query = sql.SQL(
             """
             CREATE TABLE IF NOT EXISTS {table} (
@@ -178,7 +183,7 @@ def _init_table(self):
             )
             """
         ).format(
-            table=sql.Identifier(self.table_name),
+            table=sql.Identifier(table_name),
             timestamp_col=sql.Identifier(_TIMESTAMP_COLUMN_NAME),
             key_col=sql.Identifier(_KEY_COLUMN_NAME),
         )
@@ -186,7 +191,7 @@ def _init_table(self):
         with self._client.cursor() as cursor:
             cursor.execute(query)
 
-    def _add_new_columns(self, columns: dict[str, type]) -> None:
+    def _add_new_columns(self, table_name: str, columns: dict[str, type]) -> None:
         for col_name, py_type in columns.items():
             postgres_col_type = _POSTGRES_TYPES_MAP.get(py_type)
             if postgres_col_type is None:
@@ -200,7 +205,7 @@ def _add_new_columns(self, columns: dict[str, type]) -> None:
                 ADD COLUMN IF NOT EXISTS {column} {col_type}
                 """
             ).format(
-                table=sql.Identifier(self.table_name),
+                table=sql.Identifier(table_name),
                 column=sql.Identifier(col_name),
                 col_type=sql.SQL(postgres_col_type),
             )
@@ -208,7 +213,7 @@ def _add_new_columns(self, columns: dict[str, type]) -> None:
         with self._client.cursor() as cursor:
             cursor.execute(query)
 
-    def _insert_rows(self, rows: list[dict]) -> None:
+    def _insert_rows(self, table_name: str, rows: list[dict]) -> None:
         if not rows:
             return
 
@@ -218,9 +223,17 @@ def _insert_rows(self, rows: list[dict]) -> None:
         values = [[row.get(col, None) for col in columns] for row in rows]
 
         query = sql.SQL("INSERT INTO {table} ({columns}) VALUES %s").format(
-            table=sql.Identifier(self.table_name),
+            table=sql.Identifier(table_name),
             columns=sql.SQL(", ").join(map(sql.Identifier, columns)),
         )
 
         with self._client.cursor() as cursor:
             execute_values(cursor, query, values)
+
+
+def _table_name_setter(
+    table_name: Union[Callable[[SinkItem], str], str],
+) -> Callable[[SinkItem], str]:
+    if isinstance(table_name, str):
+        return lambda sink_item: table_name
+    return table_name

From 3183c5e27888d0bcbba4f247e9e284922c544f98 Mon Sep 17 00:00:00 2001
From: Tim Sawicki
Date: Wed, 7 May 2025 14:06:15 -0400
Subject: [PATCH 2/2] add schema option

---
 docs/connectors/sinks/postgresql-sink.md  | 12 +++++++-
 quixstreams/sinks/community/postgresql.py | 34 +++++++++++++++++++----
 2 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/docs/connectors/sinks/postgresql-sink.md b/docs/connectors/sinks/postgresql-sink.md
index 9b9315fcd..247b1571b 100644
--- a/docs/connectors/sinks/postgresql-sink.md
+++ b/docs/connectors/sinks/postgresql-sink.md
@@ -75,12 +75,22 @@ PostgreSQLSink provides at-least-once guarantees, meaning that the same records
 
 PostgreSQLSink accepts the following configuration parameters:
 
+### Required
+
 - `host`: The address of the PostgreSQL server.
 - `port`: The port of the PostgreSQL server.
 - `dbname`: The name of the PostgreSQL database.
 - `user`: The database user name.
 - `password`: The database user password.
-- `table_name`: The name of the PostgreSQL table where data will be written.
+- `table_name`: PostgreSQL table name as either a string or a callable which receives
+  a `SinkItem` (from `quixstreams.sinks.base.item`) and returns a string.
+
+
+### Optional
+
+- `schema_name`: The schema name. Schemas are a way of organizing tables and
+  are not related to the table data, referenced as `<schema_name>.<table_name>`.
+  PostgreSQL uses "public" by default under the hood.
 - `schema_auto_update`: If True, the sink will automatically update the schema by adding new columns when new fields are detected. Default: True.

diff --git a/quixstreams/sinks/community/postgresql.py b/quixstreams/sinks/community/postgresql.py
index 0bf2b156f..c4156e8b8 100644
--- a/quixstreams/sinks/community/postgresql.py
+++ b/quixstreams/sinks/community/postgresql.py
@@ -60,6 +60,7 @@ def __init__(
         user: str,
         password: str,
         table_name: Union[Callable[[SinkItem], str], str],
+        schema_name: str = "public",
         schema_auto_update: bool = True,
         connection_timeout_seconds: int = 30,
         statement_timeout_seconds: int = 30,
@@ -77,6 +78,9 @@ def __init__(
         :param password: Database user password.
         :param table_name: PostgreSQL table name as either a string or a callable which
             receives a SinkItem and returns a string.
+        :param schema_name: The schema name. Schemas are a way of organizing tables and
+            are not related to the table data, referenced as `<schema_name>.<table_name>`.
+            PostgreSQL uses "public" by default under the hood.
         :param schema_auto_update: Automatically update the schema when new columns are detected.
         :param connection_timeout_seconds: Timeout for connection.
         :param statement_timeout_seconds: Timeout for DDL operations such as table
@@ -95,6 +99,7 @@ def __init__(
         )
         self._table_name = _table_name_setter(table_name)
         self._tables = set()
+        self._schema_name = schema_name
         self._schema_auto_update = schema_auto_update
         options = kwargs.pop("options", "")
         if "statement_timeout" not in options:
@@ -113,6 +118,7 @@ def __init__(
 
     def setup(self):
         self._client = psycopg2.connect(**self._client_settings)
+        self._create_schema()
 
     def write(self, batch: SinkBatch):
         tables = {}
@@ -138,14 +144,22 @@ def write(self, batch: SinkBatch):
             with self._client:
                 for name, values in tables.items():
                     if self._schema_auto_update:
-                        self._init_table(name)
+                        self._create_table(name)
                         self._add_new_columns(name, values["cols_types"])
                     self._insert_rows(name, values["rows"])
         except psycopg2.Error as e:
             self._client.rollback()
             raise PostgreSQLSinkException(f"Failed to write batch: {str(e)}") from e
         table_counts = {table: len(values["rows"]) for table, values in tables.items()}
-        logger.info(f"Successfully wrote records to tables; row counts: {table_counts}")
+        schema_log = (
+            " "
+            if self._schema_name == "public"
+            else f" for schema '{self._schema_name}' "
+        )
+        logger.info(
+            f"Successfully wrote records{schema_log}to tables; "
+            f"table row counts: {table_counts}"
+        )
 
     def add(
         self,
@@ -172,7 +186,15 @@ def add(
             offset=offset,
         )
 
-    def _init_table(self, table_name: str):
+    def _create_schema(self):
+        query = sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(
+            sql.Identifier(self._schema_name)
+        )
+
+        with self._client.cursor() as cursor:
+            cursor.execute(query)
+
+    def _create_table(self, table_name: str):
         if table_name in self._tables:
             return
         query = sql.SQL(
@@ -183,7 +205,7 @@ def _init_table(self, table_name: str):
             )
             """
         ).format(
-            table=sql.Identifier(table_name),
+            table=sql.Identifier(self._schema_name, table_name),
             timestamp_col=sql.Identifier(_TIMESTAMP_COLUMN_NAME),
             key_col=sql.Identifier(_KEY_COLUMN_NAME),
         )
@@ -205,7 +227,7 @@ def _add_new_columns(self, table_name: str, columns: dict[str, type]) -> None:
                 ADD COLUMN IF NOT EXISTS {column} {col_type}
                 """
             ).format(
-                table=sql.Identifier(table_name),
+                table=sql.Identifier(self._schema_name, table_name),
                 column=sql.Identifier(col_name),
                 col_type=sql.SQL(postgres_col_type),
             )
@@ -223,7 +245,7 @@ def _insert_rows(self, table_name: str, rows: list[dict]) -> None:
         values = [[row.get(col, None) for col in columns] for row in rows]
 
         query = sql.SQL("INSERT INTO {table} ({columns}) VALUES %s").format(
-            table=sql.Identifier(table_name),
+            table=sql.Identifier(self._schema_name, table_name),
             columns=sql.SQL(", ").join(map(sql.Identifier, columns)),
         )
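Usage note (not part of the patch): a minimal sketch of how the two options introduced by this series — a callable `table_name` and `schema_name` — might be wired into an application. Only the `PostgreSQLSink` parameters come from the patch; the broker address, topic name, and the `device_type` routing field are hypothetical placeholders.

```python
# Sketch only: broker address, topic name, and the "device_type" field are
# assumptions; the PostgreSQLSink arguments mirror the parameters in this patch.
from quixstreams import Application
from quixstreams.sinks.base.item import SinkItem
from quixstreams.sinks.community.postgresql import PostgreSQLSink


def table_for(item: SinkItem) -> str:
    # Route each record to a table named after a field in its value,
    # e.g. {"device_type": "thermostat", ...} -> table "thermostat".
    return item.value["device_type"]


postgres_sink = PostgreSQLSink(
    host="localhost",
    port=5432,
    dbname="local",
    user="local",
    password="local",
    table_name=table_for,     # callable instead of a fixed table name
    schema_name="devices",    # tables are referenced as "devices"."<table>"
    schema_auto_update=True,  # create tables/columns on demand
)

app = Application(broker_address="localhost:9092", consumer_group="pg-sink-demo")
sdf = app.dataframe(app.topic("device-events"))
sdf.sink(postgres_sink)

if __name__ == "__main__":
    app.run()
```

Because `write()` groups each batch by the resolved table name before inserting, records destined for different tables can share a single topic and sink instance.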