Skip to content

PostgreSQLSink: allow dynamic table name selection based on record #867

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion docs/connectors/sinks/postgresql-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,50 @@ PostgreSQLSink provides at-least-once guarantees, meaning that the same records

PostgreSQLSink accepts the following configuration parameters:

## Required

- `host`: The address of the PostgreSQL server.
- `port`: The port of the PostgreSQL server.
- `dbname`: The name of the PostgreSQL database.
- `user`: The database user name.
- `password`: The database user password.
- `table_name`: The name of the PostgreSQL table where data will be written.
- `table_name`: PostgreSQL table name as either a string or a callable which receives
a `SinkItem` (from quixstreams.sinks.base.item) and returns a string.


### Optional

- `schema_name`: The schema name. Schemas are a way of organizing tables and
not related to the table data, referenced as `<schema_name>.<table_name>`.
  PostgreSQL uses "public" by default under the hood.
- `schema_auto_update`: If True, the sink will automatically update the schema by adding new columns when new fields are detected. Default: True.


## Testing Locally

Rather than connect to a hosted PostgreSQL instance, you can alternatively test your
application using a local instance of PostgreSQL using Docker:

1. Execute in terminal:

```bash
docker run --rm -d --name postgres \
-e POSTGRES_PASSWORD=local \
-e POSTGRES_USER=local \
-e POSTGRES_DB=local \
-p 5432:5432 \
postgres
```

2. Use the following settings for `PostgreSQLSink` to connect:

```python
PostgreSQLSink(
host="localhost",
port=5432,
user="local",
password="local",
dbname="local",
table_name="<YOUR TABLE NAME>",
)
```
87 changes: 61 additions & 26 deletions quixstreams/sinks/community/postgresql.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from decimal import Decimal
from typing import Any, Mapping, Optional
from typing import Any, Callable, Mapping, Optional, Union

try:
import psycopg2
Expand All @@ -21,6 +21,7 @@
ClientConnectSuccessCallback,
SinkBatch,
)
from quixstreams.sinks.base.item import SinkItem

__all__ = ("PostgreSQLSink", "PostgreSQLSinkException")

Expand Down Expand Up @@ -58,7 +59,8 @@ def __init__(
dbname: str,
user: str,
password: str,
table_name: str,
table_name: Union[Callable[[SinkItem], str], str],
schema_name: str = "public",
schema_auto_update: bool = True,
connection_timeout_seconds: int = 30,
statement_timeout_seconds: int = 30,
Expand All @@ -72,9 +74,13 @@ def __init__(
:param host: PostgreSQL server address.
:param port: PostgreSQL server port.
:param dbname: PostgreSQL database name.
:param user: Database user name.
:param user: Database username.
:param password: Database user password.
:param table_name: PostgreSQL table name.
:param table_name: PostgreSQL table name as either a string or a callable which
receives a SinkItem and returns a string.
:param schema_name: The schema name. Schemas are a way of organizing tables and
not related to the table data, referenced as `<schema_name>.<table_name>`.
PostgreSQL uses "public" by default under the hood.
:param schema_auto_update: Automatically update the schema when new columns are detected.
:param connection_timeout_seconds: Timeout for connection.
:param statement_timeout_seconds: Timeout for DDL operations such as table
Expand All @@ -91,9 +97,10 @@ def __init__(
on_client_connect_success=on_client_connect_success,
on_client_connect_failure=on_client_connect_failure,
)

self.table_name = table_name
self.schema_auto_update = schema_auto_update
self._table_name = _table_name_setter(table_name)
self._tables = set()
self._schema_name = schema_name
self._schema_auto_update = schema_auto_update
options = kwargs.pop("options", "")
if "statement_timeout" not in options:
options = f"{options} -c statement_timeout={statement_timeout_seconds}s"
Expand All @@ -111,38 +118,48 @@ def __init__(

def setup(self):
self._client = psycopg2.connect(**self._client_settings)

# Initialize table if schema_auto_update is enabled
if self.schema_auto_update:
self._init_table()
self._create_schema()

def write(self, batch: SinkBatch):
rows = []
cols_types = {}

tables = {}
for item in batch:
table = tables.setdefault(
self._table_name(item), {"rows": [], "cols_types": {}}
)
row = {}
if item.key is not None:
key_type = type(item.key)
cols_types.setdefault(_KEY_COLUMN_NAME, key_type)
table["cols_types"].setdefault(_KEY_COLUMN_NAME, key_type)
row[_KEY_COLUMN_NAME] = item.key

for key, value in item.value.items():
if value is not None:
cols_types.setdefault(key, type(value))
table["cols_types"].setdefault(key, type(value))
row[key] = value

row[_TIMESTAMP_COLUMN_NAME] = datetime.fromtimestamp(item.timestamp / 1000)
rows.append(row)
table["rows"].append(row)

try:
with self._client:
if self.schema_auto_update:
self._add_new_columns(cols_types)
self._insert_rows(rows)
for name, values in tables.items():
if self._schema_auto_update:
self._create_table(name)
self._add_new_columns(name, values["cols_types"])
self._insert_rows(name, values["rows"])
except psycopg2.Error as e:
self._client.rollback()
raise PostgreSQLSinkException(f"Failed to write batch: {str(e)}") from e
table_counts = {table: len(values["rows"]) for table, values in tables.items()}
schema_log = (
" "
if self._schema_name == "public"
else f" for schema '{self._schema_name}' "
)
logger.info(
f"Successfully wrote records{schema_log}to tables; "
f"table row counts: {table_counts}"
)

def add(
self,
Expand All @@ -169,7 +186,17 @@ def add(
offset=offset,
)

def _init_table(self):
def _create_schema(self):
    """Ensure the configured schema exists in the database.

    Issues ``CREATE SCHEMA IF NOT EXISTS`` so the call is a no-op when
    the schema is already present. The schema name is quoted via
    psycopg2's SQL composition to avoid injection/quoting issues.
    """
    statement = sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(
        sql.Identifier(self._schema_name)
    )
    with self._client.cursor() as cursor:
        cursor.execute(statement)

def _create_table(self, table_name: str):
if table_name in self._tables:
return
query = sql.SQL(
"""
CREATE TABLE IF NOT EXISTS {table} (
Expand All @@ -178,15 +205,15 @@ def _init_table(self):
)
"""
).format(
table=sql.Identifier(self.table_name),
table=sql.Identifier(self._schema_name, table_name),
timestamp_col=sql.Identifier(_TIMESTAMP_COLUMN_NAME),
key_col=sql.Identifier(_KEY_COLUMN_NAME),
)

with self._client.cursor() as cursor:
cursor.execute(query)

def _add_new_columns(self, columns: dict[str, type]) -> None:
def _add_new_columns(self, table_name: str, columns: dict[str, type]) -> None:
for col_name, py_type in columns.items():
postgres_col_type = _POSTGRES_TYPES_MAP.get(py_type)
if postgres_col_type is None:
Expand All @@ -200,15 +227,15 @@ def _add_new_columns(self, columns: dict[str, type]) -> None:
ADD COLUMN IF NOT EXISTS {column} {col_type}
"""
).format(
table=sql.Identifier(self.table_name),
table=sql.Identifier(self._schema_name, table_name),
column=sql.Identifier(col_name),
col_type=sql.SQL(postgres_col_type),
)

with self._client.cursor() as cursor:
cursor.execute(query)

def _insert_rows(self, rows: list[dict]) -> None:
def _insert_rows(self, table_name: str, rows: list[dict]) -> None:
if not rows:
return

Expand All @@ -218,9 +245,17 @@ def _insert_rows(self, rows: list[dict]) -> None:
values = [[row.get(col, None) for col in columns] for row in rows]

query = sql.SQL("INSERT INTO {table} ({columns}) VALUES %s").format(
table=sql.Identifier(self.table_name),
table=sql.Identifier(self._schema_name, table_name),
columns=sql.SQL(", ").join(map(sql.Identifier, columns)),
)

with self._client.cursor() as cursor:
execute_values(cursor, query, values)


def _table_name_setter(
    table_name: Union[Callable[[SinkItem], str], str],
) -> Callable[[SinkItem], str]:
    """Normalize the ``table_name`` argument into a callable.

    A plain string is wrapped in a function that ignores the incoming
    record and always returns that string; any other value is returned
    unchanged (it is assumed to already be a ``SinkItem -> str`` callable).
    """
    if not isinstance(table_name, str):
        return table_name
    return lambda _item: table_name