From 6c69cebef7bc9cb74416b9d2f2713ddf4b43c8b4 Mon Sep 17 00:00:00 2001 From: Xiatong Zheng Date: Tue, 22 Oct 2024 11:03:28 +0200 Subject: [PATCH 1/4] feat: init dap-main --- CHANGELOG.md | 5 - dbt/adapters/clickhouse/__version__.py | 2 +- dbt/adapters/clickhouse/connections.py | 2 + dbt/adapters/clickhouse/impl.py | 5 +- .../incremental/distributed_incremental.sql | 14 ++- .../incremental/incremental.sql | 56 ++++++++---- .../test_distributed_incremental.py | 91 +++++++++++++++---- tests/integration/conftest.py | 10 +- 8 files changed, 136 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe3d4536..1d9e77ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,3 @@ -### Release [1.8.4], 2024-09-17 -### Improvement -* The S3 help macro now support a `role_arn` parameter as an alternative way to provide authentication for S3 based models. Thanks to -[Mitchell Bregman](https://github.com/mitchbregs) for the contribution! - ### Release [1.8.3], 2024-09-01 ### Bug Fixes * An [issue](https://github.com/ClickHouse/dbt-clickhouse/issues/348) was detected when using multiple projections. We solved it and added a test to cover that use case. ([#349](https://github.com/ClickHouse/dbt-clickhouse/pull/349)) diff --git a/dbt/adapters/clickhouse/__version__.py b/dbt/adapters/clickhouse/__version__.py index be6c9703..cf085ece 100644 --- a/dbt/adapters/clickhouse/__version__.py +++ b/dbt/adapters/clickhouse/__version__.py @@ -1 +1 @@ -version = '1.8.4' +version = '1.8.3' diff --git a/dbt/adapters/clickhouse/connections.py b/dbt/adapters/clickhouse/connections.py index db8d9a13..51d1e8dd 100644 --- a/dbt/adapters/clickhouse/connections.py +++ b/dbt/adapters/clickhouse/connections.py @@ -96,6 +96,8 @@ def execute( query_result = client.command(sql) status = self.get_status(client) logger.debug(f'SQL status: {status} in {(time.time() - pre):.2f} seconds') + if hasattr(query_result, 'summary'): + logger.info(f'ClickHouse query summary: {query_result.summary}') if fetch: table = self.get_table_from_response( query_result.result_set, query_result.column_names diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index c40a01f6..2f655465 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -470,8 +470,11 @@ def get_model_settings(self, model): """ @available - def get_model_query_settings(self, model): + def get_model_query_settings(self, model, additional_settings: dict = None): settings = model['config'].get('query_settings', {}) + if additional_settings: + settings = {**settings, **additional_settings} + res = [] for key in settings: res.append(f' {key}={settings[key]}') diff --git a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql index ef31a76c..8dfdd5c6 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql @@ -15,7 +15,7 @@ {% do exceptions.raise_compiler_error('To use distributed materializations cluster setting in dbt profile must be set') %} {% endif %} - {% set existing_relation_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} + {% set existing_relation_local = load_cached_relation(this.incorporate(path={"identifier": 
this.identifier + local_suffix, "schema": local_db_prefix + this.schema})) %} {% set target_relation_local = target_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if target_relation is not none else none %} {%- set unique_key = config.get('unique_key') -%} @@ -39,7 +39,7 @@ {%- set distributed_backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%} {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} - {%- set view_relation = default__make_temp_relation(target_relation, '__dbt_view_tmp') -%} + {%- set view_relation = default__make_temp_relation(target_relation, '__dbt_view_tmp_' + invocation_id.replace('-', '_')) -%} {{ drop_relation_if_exists(preexisting_intermediate_relation) }} {{ drop_relation_if_exists(preexisting_backup_relation) }} @@ -55,7 +55,7 @@ {{ create_view_as(view_relation, sql) }} {% endcall %} - {% if existing_relation is none %} + {% if existing_relation_local is none %} -- No existing table, simply create a new one {{ create_distributed_local_table(target_relation, target_relation_local, view_relation, sql) }} @@ -74,6 +74,12 @@ {% endcall %} {% else %} + {% if existing_relation is none %} + {{ drop_relation_if_exists(existing_relation) }} + {% do run_query(create_distributed_table(target_relation, target_relation_local)) %} + {% set existing_relation = target_relation %} + {% endif %} + {% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %} {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} {%- if on_schema_change != 'ignore' %} @@ -124,6 +130,8 @@ {% do to_drop.append(distributed_backup_relation) %} {% endif %} + {{ drop_relation_if_exists(view_relation) }} + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} {% do apply_grants(target_relation_local, grant_config, should_revoke=should_revoke) %} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index c4dcb1b8..5c4392cf 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -203,7 +203,8 @@ {% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_new_data_' + invocation_id.replace('-', '_')}) %} {{ drop_relation_if_exists(new_data_relation) }} - {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%} + {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + + '__dbt_distributed_new_data_' + invocation_id.replace('-', '_')}) -%} {%- set inserting_relation = new_data_relation -%} @@ -220,23 +221,44 @@ {% endcall %} {% endif %} - {% call statement('delete_existing_data') %} - {% if is_distributed %} - {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} - delete from {{ existing_local }} {{ 
on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }} - from {{ inserting_relation }}) - {% else %} - delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }} - from {{ inserting_relation }}) + {% set delete_filter %} + select distinct {{ unique_key }} from {{ inserting_relation }} + {% endset %} + {% set data_to_delete_count_query %} + select count(*) from {{ existing_relation }} where ({{ unique_key }}) global in ({{ delete_filter }}) + {% endset %} + {% set data_to_delete_count = run_query(data_to_delete_count_query).rows[0].values()[0] %} + {% if data_to_delete_count > 0 %} + {{ log(data_to_delete_count ~ " rows to be deleted.", info=True) }} + {% set unique_key_query %} + -- https://github.com/ClickHouse/ClickHouse/issues/69559 + select count(distinct {{ unique_key }}) from {{ inserting_relation }} + {% endset %} + {% set unique_key_count = run_query(unique_key_query).rows[0].values()[0] %} + {% if unique_key_count == 1 %} + {% set query %} + select toString(tuple({{ unique_key }})) from {{ inserting_relation }} + {% endset %} + {% set delete_filter = run_query(query).rows[0].values()[0] %} + {{ log('Delete filter: ' ~ delete_filter) }} {% endif %} - {%- if incremental_predicates %} - {% for predicate in incremental_predicates %} - and {{ predicate }} - {% endfor %} - {%- endif -%} - {{ adapter.get_model_query_settings(model) }} - {% endcall %} - + {% call statement('delete_existing_data') %} + {% if is_distributed %} + {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} + delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in ({{ delete_filter }}) + {% else %} + delete from {{ existing_relation }} where ({{ unique_key }}) in ({{ delete_filter }}) + {% endif %} + {%- if incremental_predicates %} + {% for predicate in incremental_predicates %} + and {{ predicate }} + {% endfor %} + {%- endif -%} + {{ adapter.get_model_query_settings(model, {'allow_nondeterministic_mutations': 1}) }} + {% endcall %} + {% else %} + {{ log("No data to be deleted, skip lightweight delete.", info=True) }} + {% endif %} {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} {% call statement('insert_new_data') %} diff --git a/tests/integration/adapter/incremental/test_distributed_incremental.py b/tests/integration/adapter/incremental/test_distributed_incremental.py index f132933d..588a1023 100644 --- a/tests/integration/adapter/incremental/test_distributed_incremental.py +++ b/tests/integration/adapter/incremental/test_distributed_incremental.py @@ -8,7 +8,7 @@ seeds_base_csv, ) from dbt.tests.adapter.basic.test_incremental import BaseIncremental, BaseIncrementalNotSchemaChange -from dbt.tests.util import run_dbt +from dbt.tests.util import run_dbt, run_dbt_and_capture from tests.integration.adapter.incremental.test_base_incremental import uniq_schema @@ -56,15 +56,6 @@ def test_simple_incremental(self, project): run_dbt(["run", "--select", "unique_source_one"]) run_dbt(["run", "--select", "unique_incremental_one"]) - -lw_delete_schema = """ -version: 2 - -models: - - name: "lw_delete_inc" - description: "Incremental table" -""" - lw_delete_inc = """ {{ config( materialized='distributed_incremental', @@ -81,23 +72,89 @@ def 
test_simple_incremental(self, project): {% endif %} """ +lw_delete_no_op = """ +{{ config( + materialized='distributed_incremental', + order_by=['key'], + unique_key='key', + incremental_strategy='delete+insert' + ) +}} +{% if is_incremental() %} + SELECT toUInt64(number) as key FROM numbers(50, 10) +{% else %} + SELECT toUInt64(number) as key FROM numbers(10) +{% endif %} +""" + +LW_DELETE_UNIQUE_KEY_COMPILATION = """ +{{ config( + materialized='distributed_incremental', + order_by=['key'], + unique_key='key', + incremental_strategy='delete+insert' + ) +}} +SELECT 1 as key +""" + +LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION = """ +{{ config( + materialized='distributed_incremental', + order_by=['key'], + unique_key=['key', 'date'], + incremental_strategy='delete+insert' + ) +}} +SELECT 1 as key, toDate('2024-10-21') as date +""" + +@pytest.mark.skipif(os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster') class TestLWDeleteDistributedIncremental: @pytest.fixture(scope="class") def models(self): - return {"lw_delete_inc.sql": lw_delete_inc} + return { + "lw_delete_inc.sql": lw_delete_inc, + 'lw_delete_no_op.sql': lw_delete_no_op, + 'lw_delete_unique_key_compilation.sql': LW_DELETE_UNIQUE_KEY_COMPILATION, + 'lw_delete_composite_unique_key_compilation.sql': LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION, + } - @pytest.mark.skipif( - os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster' - ) - def test_lw_delete(self, project): - run_dbt() + @pytest.mark.parametrize("model", ["lw_delete_inc"]) + def test_lw_delete(self, project, model): + run_dbt(["run", "--select", model]) result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one") assert result[0] == 100 - run_dbt() + _, log = run_dbt_and_capture(["run", "--select", model]) result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one") + assert '20 rows to be deleted.' in log assert result[0] == 180 + @pytest.mark.parametrize("model", ["lw_delete_no_op"]) + def test_lw_delete_no_op(self, project, model): + run_dbt(["run", "--select", model]) + _, log = run_dbt_and_capture(["run", "--select", model]) + # assert that no delete query is issued against table lw_delete_no_op + assert 'rows to be deleted.' not in log + assert 'No data to be deleted, skip lightweight delete.' 
in log + + @pytest.mark.parametrize( + "model,delete_filter_log", + [ + ("lw_delete_unique_key_compilation", "Delete filter: (1)"), + ("lw_delete_composite_unique_key_compilation", "Delete filter: (1,'2024-10-21')"), + ], + ) + def test_lw_delete_unique_key(self, project, model, delete_filter_log): + """Assure that the delete_filter in `DELETE FROM WHERE IN ()` is templated + by a string of unique key value(s) when there is only one value (combination) for the unique key(s).""" + run_dbt(["run", "--select", model]) + _, log = run_dbt_and_capture(["run", "--select", model, "--log-level", "debug"]) + result = project.run_sql(f"select count(*) as num_rows from {model}", fetch="one") + assert delete_filter_log in log + assert result[0] == 1 + compound_key_schema = """ version: 2 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 50b1af6a..5fdfc914 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -55,16 +55,16 @@ def test_config(ch_test_users, ch_test_version): client_port = client_port or 10723 test_port = 10900 if test_driver == 'native' else client_port try: - run_cmd(['docker-compose', '-f', compose_file, 'down', '-v']) + run_cmd(['docker', 'compose', '-f', compose_file, 'down', '-v']) sys.stderr.write('Starting docker compose') os.environ['PROJECT_ROOT'] = '.' - up_result = run_cmd(['docker-compose', '-f', compose_file, 'up', '-d']) + up_result = run_cmd(['docker', 'compose', '-f', compose_file, 'up', '-d']) if up_result[0]: raise Exception(f'Failed to start docker: {up_result[2]}') url = f"http://{test_host}:{client_port}" wait_until_responsive(timeout=30.0, pause=0.5, check=lambda: is_responsive(url)) except Exception as e: - raise Exception('Failed to run docker-compose: {}', str(e)) + raise Exception('Failed to run docker compose: {}', str(e)) elif not client_port: if test_driver == 'native': client_port = 8443 if test_port == 9440 else 8123 @@ -102,9 +102,9 @@ def test_config(ch_test_users, ch_test_version): if docker: try: - run_cmd(['docker-compose', '-f', compose_file, 'down', '-v']) + run_cmd(['docker', 'compose', '-f', compose_file, 'down', '-v']) except Exception as e: - raise Exception('Failed to run docker-compose while cleaning up: {}', str(e)) + raise Exception('Failed to run docker compose while cleaning up: {}', str(e)) else: for test_user in ch_test_users: test_client.command('DROP USER %s', (test_user,)) From 06a02c7a085ba0d985afc47335042ed991d716de Mon Sep 17 00:00:00 2001 From: Xiatong Zheng Date: Tue, 29 Oct 2024 09:11:08 +0100 Subject: [PATCH 2/4] feat: sync --- dbt/include/clickhouse/macros/adapters.sql | 25 +++++++++++++++---- .../materializations/distributed_table.sql | 2 +- .../incremental/distributed_incremental.sql | 18 ++++++------- .../incremental/incremental.sql | 13 ++++------ .../macros/materializations/table.sql | 2 +- .../test_distributed_incremental.py | 6 ++++- 6 files changed, 41 insertions(+), 25 deletions(-) diff --git a/dbt/include/clickhouse/macros/adapters.sql b/dbt/include/clickhouse/macros/adapters.sql index 6ae897d7..2c2c4a0f 100644 --- a/dbt/include/clickhouse/macros/adapters.sql +++ b/dbt/include/clickhouse/macros/adapters.sql @@ -79,12 +79,27 @@ {%- endcall %} {% endmacro %} -{% macro clickhouse__make_temp_relation(base_relation, suffix) %} - {% set tmp_identifier = base_relation.identifier ~ suffix %} - {% set tmp_relation = base_relation.incorporate( +{% macro clickhouse__make_intermediate_relation(base_relation, suffix='__dbt_tmp') %} + {%- set intermediate_identifier = 
base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%} + {%- set intermediate_relation = base_relation.incorporate(path={"identifier": intermediate_identifier}) -%} + {{ return(intermediate_relation) }} +{%- endmacro %} + +{% macro clickhouse__make_backup_relation(base_relation, backup_relation_type, suffix='__dbt_backup') %} + {%- set backup_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%} + {%- set backup_relation = base_relation.incorporate( + path={"identifier": backup_identifier}, + type=backup_relation_type + ) -%} + {{ return(backup_relation) }} +{%- endmacro %} + +{% macro clickhouse__make_temp_relation(base_relation, suffix='__dbt_tmp') %} + {%- set tmp_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%} + {%- set tmp_relation = base_relation.incorporate( path={"identifier": tmp_identifier, "schema": None}) -%} - {% do return(tmp_relation) %} -{% endmacro %} + {{ return(tmp_relation) }} +{%- endmacro %} {% macro clickhouse__generate_database_name(custom_database_name=none, node=none) -%} diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql index e84e8396..a9ee17fb 100644 --- a/dbt/include/clickhouse/macros/materializations/distributed_table.sql +++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql @@ -31,7 +31,7 @@ {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%} {% endif %} {% endif %} - {% set view_relation = default__make_temp_relation(target_relation, '__dbt_tmp') %} + {%- set view_relation = make_intermediate_relation(target_relation, '__dbt_view_tmp') -%} -- drop the temp relations if they exist already in the database {{ drop_relation_if_exists(preexisting_intermediate_relation) }} {{ drop_relation_if_exists(preexisting_backup_relation) }} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql index 8dfdd5c6..7948cc44 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql @@ -34,17 +34,12 @@ {{ create_schema(target_relation_local) }} {%- set intermediate_relation = make_intermediate_relation(target_relation_local)-%} {%- set distributed_intermediate_relation = make_intermediate_relation(target_relation)-%} - {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} - {%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%} - {%- set distributed_backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%} - {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} - {%- set view_relation = default__make_temp_relation(target_relation, '__dbt_view_tmp_' + invocation_id.replace('-', '_')) -%} - {{ drop_relation_if_exists(preexisting_intermediate_relation) }} - {{ drop_relation_if_exists(preexisting_backup_relation) }} - {{ drop_relation_if_exists(view_relation) }} {{ drop_relation_if_exists(distributed_intermediate_relation) }} + + {%- set view_relation = make_intermediate_relation(target_relation, '__dbt_view_tmp') -%} + {{ 
drop_relation_if_exists(view_relation) }} {{ run_hooks(pre_hooks, inside_transaction=False) }} {{ run_hooks(pre_hooks, inside_transaction=True) }} @@ -105,6 +100,11 @@ {% endif %} {% if need_swap %} + {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} + {%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%} + {%- set distributed_backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} + {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} + {{ drop_relation_if_exists(preexisting_backup_relation) }} {% if False %} {% do adapter.rename_relation(intermediate_relation, backup_relation) %} {% do exchange_tables_atomic(backup_relation, target_relation_local) %} @@ -114,7 +114,7 @@ {% endif %} -- Structure could have changed, need to update distributed table from replaced local table - {% set target_relation_new = target_relation.incorporate(path={"identifier": target_relation.identifier + '_temp'}) %} + {%- set target_relation_new = make_intermediate_relation(target_relation) -%} {{ drop_relation_if_exists(target_relation_new) }} {% do run_query(create_distributed_table(target_relation_new, target_relation_local)) %} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index 5c4392cf..3427b669 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -200,11 +200,9 @@ {% macro clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, is_distributed=False) %} - {% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier - + '__dbt_new_data_' + invocation_id.replace('-', '_')}) %} + {%- set new_data_relation = make_intermediate_relation(existing_relation, '__dbt_new_data') -%} {{ drop_relation_if_exists(new_data_relation) }} - {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier - + '__dbt_distributed_new_data_' + invocation_id.replace('-', '_')}) -%} + {%- set distributed_new_data_relation = make_intermediate_relation(existing_relation, '__dbt_distributed_new_data') -%} {%- set inserting_relation = new_data_relation -%} @@ -231,13 +229,13 @@ {% if data_to_delete_count > 0 %} {{ log(data_to_delete_count ~ " rows to be deleted.", info=True) }} {% set unique_key_query %} - -- https://github.com/ClickHouse/ClickHouse/issues/69559 + {# https://github.com/ClickHouse/ClickHouse/issues/69559 #} select count(distinct {{ unique_key }}) from {{ inserting_relation }} {% endset %} {% set unique_key_count = run_query(unique_key_query).rows[0].values()[0] %} {% if unique_key_count == 1 %} {% set query %} - select toString(tuple({{ unique_key }})) from {{ inserting_relation }} + select any(toString(tuple({{ unique_key }}))) from {{ inserting_relation }} {% endset %} {% set delete_filter = run_query(query).rows[0].values()[0] %} {{ log('Delete filter: ' ~ delete_filter) }} @@ -269,8 +267,7 @@ {% endmacro %} {% macro clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %} - {% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name'] - + '__dbt_new_data_' + invocation_id.replace('-', '_')}) %} + {%- set new_data_relation = 
make_intermediate_relation(existing_relation, '__dbt_new_data') -%} {{ drop_relation_if_exists(new_data_relation) }} {% call statement('create_new_data_temp') -%} {{ get_create_table_as_sql(False, new_data_relation, sql) }} diff --git a/dbt/include/clickhouse/macros/materializations/table.sql b/dbt/include/clickhouse/macros/materializations/table.sql index 1ef0e8b5..7aad977c 100644 --- a/dbt/include/clickhouse/macros/materializations/table.sql +++ b/dbt/include/clickhouse/macros/materializations/table.sql @@ -208,7 +208,7 @@ {%- endmacro %} {% macro clickhouse__insert_into(target_relation, sql, has_contract) %} - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_columns = adapter.get_column_schema_from_query(sql) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} insert into {{ target_relation }} diff --git a/tests/integration/adapter/incremental/test_distributed_incremental.py b/tests/integration/adapter/incremental/test_distributed_incremental.py index 588a1023..2a4e73f2 100644 --- a/tests/integration/adapter/incremental/test_distributed_incremental.py +++ b/tests/integration/adapter/incremental/test_distributed_incremental.py @@ -96,6 +96,8 @@ def test_simple_incremental(self, project): ) }} SELECT 1 as key +UNION ALL +SELECT 1 as key """ LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION = """ @@ -107,6 +109,8 @@ def test_simple_incremental(self, project): ) }} SELECT 1 as key, toDate('2024-10-21') as date +UNION ALL +SELECT 1 as key, toDate('2024-10-21') as date """ @@ -153,7 +157,7 @@ def test_lw_delete_unique_key(self, project, model, delete_filter_log): _, log = run_dbt_and_capture(["run", "--select", model, "--log-level", "debug"]) result = project.run_sql(f"select count(*) as num_rows from {model}", fetch="one") assert delete_filter_log in log - assert result[0] == 1 + assert result[0] == 2 compound_key_schema = """ From f1fa5a515347e06b3189fd097926e2d3d8bee0ea Mon Sep 17 00:00:00 2001 From: Xiatong Zheng Date: Tue, 29 Oct 2024 12:13:25 +0100 Subject: [PATCH 3/4] feat: add uuid to temp table names --- CHANGELOG.md | 5 + dbt/adapters/clickhouse/__version__.py | 2 +- dbt/adapters/clickhouse/connections.py | 2 - dbt/adapters/clickhouse/impl.py | 5 +- .../incremental/distributed_incremental.sql | 4 +- .../incremental/incremental.sql | 33 ++----- .../test_distributed_incremental.py | 95 ++++--------------- tests/integration/conftest.py | 10 +- 8 files changed, 37 insertions(+), 119 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d9e77ef..fe3d4536 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +### Release [1.8.4], 2024-09-17 +### Improvement +* The S3 help macro now support a `role_arn` parameter as an alternative way to provide authentication for S3 based models. Thanks to +[Mitchell Bregman](https://github.com/mitchbregs) for the contribution! + ### Release [1.8.3], 2024-09-01 ### Bug Fixes * An [issue](https://github.com/ClickHouse/dbt-clickhouse/issues/348) was detected when using multiple projections. We solved it and added a test to cover that use case. 
([#349](https://github.com/ClickHouse/dbt-clickhouse/pull/349)) diff --git a/dbt/adapters/clickhouse/__version__.py b/dbt/adapters/clickhouse/__version__.py index cf085ece..be6c9703 100644 --- a/dbt/adapters/clickhouse/__version__.py +++ b/dbt/adapters/clickhouse/__version__.py @@ -1 +1 @@ -version = '1.8.3' +version = '1.8.4' diff --git a/dbt/adapters/clickhouse/connections.py b/dbt/adapters/clickhouse/connections.py index 51d1e8dd..db8d9a13 100644 --- a/dbt/adapters/clickhouse/connections.py +++ b/dbt/adapters/clickhouse/connections.py @@ -96,8 +96,6 @@ def execute( query_result = client.command(sql) status = self.get_status(client) logger.debug(f'SQL status: {status} in {(time.time() - pre):.2f} seconds') - if hasattr(query_result, 'summary'): - logger.info(f'ClickHouse query summary: {query_result.summary}') if fetch: table = self.get_table_from_response( query_result.result_set, query_result.column_names diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index 2f655465..c40a01f6 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -470,11 +470,8 @@ def get_model_settings(self, model): """ @available - def get_model_query_settings(self, model, additional_settings: dict = None): + def get_model_query_settings(self, model): settings = model['config'].get('query_settings', {}) - if additional_settings: - settings = {**settings, **additional_settings} - res = [] for key in settings: res.append(f' {key}={settings[key]}') diff --git a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql index 7948cc44..fb973e68 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql @@ -15,7 +15,7 @@ {% do exceptions.raise_compiler_error('To use distributed materializations cluster setting in dbt profile must be set') %} {% endif %} - {% set existing_relation_local = load_cached_relation(this.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema})) %} + {% set existing_relation_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} {% set target_relation_local = target_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if target_relation is not none else none %} {%- set unique_key = config.get('unique_key') -%} @@ -50,7 +50,7 @@ {{ create_view_as(view_relation, sql) }} {% endcall %} - {% if existing_relation_local is none %} + {% if existing_relation is none %} -- No existing table, simply create a new one {{ create_distributed_local_table(target_relation, target_relation_local, view_relation, sql) }} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index 3427b669..b5d96366 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -219,44 +219,23 @@ {% endcall %} {% endif %} - {% set delete_filter %} - select distinct {{ unique_key }} from {{ inserting_relation }} - {% endset %} - {% set data_to_delete_count_query %} - select count(*) 
from {{ existing_relation }} where ({{ unique_key }}) global in ({{ delete_filter }}) - {% endset %} - {% set data_to_delete_count = run_query(data_to_delete_count_query).rows[0].values()[0] %} - {% if data_to_delete_count > 0 %} - {{ log(data_to_delete_count ~ " rows to be deleted.", info=True) }} - {% set unique_key_query %} - {# https://github.com/ClickHouse/ClickHouse/issues/69559 #} - select count(distinct {{ unique_key }}) from {{ inserting_relation }} - {% endset %} - {% set unique_key_count = run_query(unique_key_query).rows[0].values()[0] %} - {% if unique_key_count == 1 %} - {% set query %} - select any(toString(tuple({{ unique_key }}))) from {{ inserting_relation }} - {% endset %} - {% set delete_filter = run_query(query).rows[0].values()[0] %} - {{ log('Delete filter: ' ~ delete_filter) }} - {% endif %} {% call statement('delete_existing_data') %} {% if is_distributed %} {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} - delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in ({{ delete_filter }}) + delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }} + from {{ inserting_relation }}) {% else %} - delete from {{ existing_relation }} where ({{ unique_key }}) in ({{ delete_filter }}) + delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }} + from {{ inserting_relation }}) {% endif %} {%- if incremental_predicates %} {% for predicate in incremental_predicates %} and {{ predicate }} {% endfor %} {%- endif -%} - {{ adapter.get_model_query_settings(model, {'allow_nondeterministic_mutations': 1}) }} + {{ adapter.get_model_query_settings(model) }} {% endcall %} - {% else %} - {{ log("No data to be deleted, skip lightweight delete.", info=True) }} - {% endif %} + {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} {% call statement('insert_new_data') %} diff --git a/tests/integration/adapter/incremental/test_distributed_incremental.py b/tests/integration/adapter/incremental/test_distributed_incremental.py index 2a4e73f2..f132933d 100644 --- a/tests/integration/adapter/incremental/test_distributed_incremental.py +++ b/tests/integration/adapter/incremental/test_distributed_incremental.py @@ -8,7 +8,7 @@ seeds_base_csv, ) from dbt.tests.adapter.basic.test_incremental import BaseIncremental, BaseIncrementalNotSchemaChange -from dbt.tests.util import run_dbt, run_dbt_and_capture +from dbt.tests.util import run_dbt from tests.integration.adapter.incremental.test_base_incremental import uniq_schema @@ -56,6 +56,15 @@ def test_simple_incremental(self, project): run_dbt(["run", "--select", "unique_source_one"]) run_dbt(["run", "--select", "unique_incremental_one"]) + +lw_delete_schema = """ +version: 2 + +models: + - name: "lw_delete_inc" + description: "Incremental table" +""" + lw_delete_inc = """ {{ config( materialized='distributed_incremental', @@ -72,93 +81,23 @@ def test_simple_incremental(self, project): {% endif %} """ -lw_delete_no_op = """ -{{ config( - materialized='distributed_incremental', - order_by=['key'], - unique_key='key', - incremental_strategy='delete+insert' - ) -}} -{% if is_incremental() %} - SELECT toUInt64(number) as key FROM numbers(50, 10) -{% else %} - SELECT 
toUInt64(number) as key FROM numbers(10) -{% endif %} -""" - -LW_DELETE_UNIQUE_KEY_COMPILATION = """ -{{ config( - materialized='distributed_incremental', - order_by=['key'], - unique_key='key', - incremental_strategy='delete+insert' - ) -}} -SELECT 1 as key -UNION ALL -SELECT 1 as key -""" - -LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION = """ -{{ config( - materialized='distributed_incremental', - order_by=['key'], - unique_key=['key', 'date'], - incremental_strategy='delete+insert' - ) -}} -SELECT 1 as key, toDate('2024-10-21') as date -UNION ALL -SELECT 1 as key, toDate('2024-10-21') as date -""" - -@pytest.mark.skipif(os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster') class TestLWDeleteDistributedIncremental: @pytest.fixture(scope="class") def models(self): - return { - "lw_delete_inc.sql": lw_delete_inc, - 'lw_delete_no_op.sql': lw_delete_no_op, - 'lw_delete_unique_key_compilation.sql': LW_DELETE_UNIQUE_KEY_COMPILATION, - 'lw_delete_composite_unique_key_compilation.sql': LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION, - } + return {"lw_delete_inc.sql": lw_delete_inc} - @pytest.mark.parametrize("model", ["lw_delete_inc"]) - def test_lw_delete(self, project, model): - run_dbt(["run", "--select", model]) + @pytest.mark.skipif( + os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster' + ) + def test_lw_delete(self, project): + run_dbt() result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one") assert result[0] == 100 - _, log = run_dbt_and_capture(["run", "--select", model]) + run_dbt() result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one") - assert '20 rows to be deleted.' in log assert result[0] == 180 - @pytest.mark.parametrize("model", ["lw_delete_no_op"]) - def test_lw_delete_no_op(self, project, model): - run_dbt(["run", "--select", model]) - _, log = run_dbt_and_capture(["run", "--select", model]) - # assert that no delete query is issued against table lw_delete_no_op - assert 'rows to be deleted.' not in log - assert 'No data to be deleted, skip lightweight delete.' in log - - @pytest.mark.parametrize( - "model,delete_filter_log", - [ - ("lw_delete_unique_key_compilation", "Delete filter: (1)"), - ("lw_delete_composite_unique_key_compilation", "Delete filter: (1,'2024-10-21')"), - ], - ) - def test_lw_delete_unique_key(self, project, model, delete_filter_log): - """Assure that the delete_filter in `DELETE FROM
WHERE IN ()` is templated - by a string of unique key value(s) when there is only one value (combination) for the unique key(s).""" - run_dbt(["run", "--select", model]) - _, log = run_dbt_and_capture(["run", "--select", model, "--log-level", "debug"]) - result = project.run_sql(f"select count(*) as num_rows from {model}", fetch="one") - assert delete_filter_log in log - assert result[0] == 2 - compound_key_schema = """ version: 2 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 5fdfc914..50b1af6a 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -55,16 +55,16 @@ def test_config(ch_test_users, ch_test_version): client_port = client_port or 10723 test_port = 10900 if test_driver == 'native' else client_port try: - run_cmd(['docker', 'compose', '-f', compose_file, 'down', '-v']) + run_cmd(['docker-compose', '-f', compose_file, 'down', '-v']) sys.stderr.write('Starting docker compose') os.environ['PROJECT_ROOT'] = '.' - up_result = run_cmd(['docker', 'compose', '-f', compose_file, 'up', '-d']) + up_result = run_cmd(['docker-compose', '-f', compose_file, 'up', '-d']) if up_result[0]: raise Exception(f'Failed to start docker: {up_result[2]}') url = f"http://{test_host}:{client_port}" wait_until_responsive(timeout=30.0, pause=0.5, check=lambda: is_responsive(url)) except Exception as e: - raise Exception('Failed to run docker compose: {}', str(e)) + raise Exception('Failed to run docker-compose: {}', str(e)) elif not client_port: if test_driver == 'native': client_port = 8443 if test_port == 9440 else 8123 @@ -102,9 +102,9 @@ def test_config(ch_test_users, ch_test_version): if docker: try: - run_cmd(['docker', 'compose', '-f', compose_file, 'down', '-v']) + run_cmd(['docker-compose', '-f', compose_file, 'down', '-v']) except Exception as e: - raise Exception('Failed to run docker compose while cleaning up: {}', str(e)) + raise Exception('Failed to run docker-compose while cleaning up: {}', str(e)) else: for test_user in ch_test_users: test_client.command('DROP USER %s', (test_user,)) From 39d5e3483973862eff9f1a75132d3bf1a2092be2 Mon Sep 17 00:00:00 2001 From: Xiatong Zheng Date: Tue, 29 Oct 2024 12:15:47 +0100 Subject: [PATCH 4/4] fix: indentation --- .../incremental/incremental.sql | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index b5d96366..80530e4e 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -219,22 +219,22 @@ {% endcall %} {% endif %} - {% call statement('delete_existing_data') %} - {% if is_distributed %} - {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} - delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }} - from {{ inserting_relation }}) - {% else %} - delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }} - from {{ inserting_relation }}) - {% endif %} - {%- if incremental_predicates %} - {% for predicate in incremental_predicates %} - and {{ predicate }} - {% endfor %} - {%- endif -%} - {{ adapter.get_model_query_settings(model) }} - {% endcall %} + {% 
call statement('delete_existing_data') %} + {% if is_distributed %} + {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} + delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }} + from {{ inserting_relation }}) + {% else %} + delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }} + from {{ inserting_relation }}) + {% endif %} + {%- if incremental_predicates %} + {% for predicate in incremental_predicates %} + and {{ predicate }} + {% endfor %} + {%- endif -%} + {{ adapter.get_model_query_settings(model) }} + {% endcall %} {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
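
A minimal sketch of the naming scheme this series settles on after the partial revert
in PATCH 3/4 ("add uuid to temp table names"), added here as a closing note. The macro
name example_unique_suffix is illustrative only; invocation_id,
base_relation.incorporate, and the '__dbt_tmp' suffix are taken directly from the
patched clickhouse__make_temp_relation macro. Because invocation_id is unique per dbt
run, two concurrent runs against the same target schema can no longer collide on
temp, intermediate, or backup helper tables.

    {# Sketch only: mirrors the patched clickhouse__make_temp_relation. #}
    {% macro example_unique_suffix(base_relation, suffix='__dbt_tmp') %}
      {#- invocation_id is a UUID; its dashes are not valid in unquoted
          ClickHouse identifiers, hence the replace() below. -#}
      {%- set identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%}
      {{ return(base_relation.incorporate(path={"identifier": identifier})) }}
    {% endmacro %}

    {# For a model named events this yields events__dbt_tmp_<invocation_id with
       dashes replaced>, where the plain events__dbt_tmp of the old scheme could
       be dropped or reused mid-run by a second concurrent invocation. #}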