Skip to content

Commit 87bbe8e

Browse files
authored
Fix cloud tests and problems with snapshots not using a consistent now() (#509)
* skip replicated related tests when working on ClickHouse cloud * increase system.query_log waiting time when working with cloud * disable enable_parallel_replicas for cloud tests * wait for dictionary reload on cloud tests * override snapshot_staging_table due to timestamp issues * adjust refreshable mv test with cloud because of RunningOnAnotherReplica status * Fix UNKNOWN_USER error as the user was concurrently removed by other worker: now new users have random names. Fix Black lint issues. * TestBasicRefreshableMV.test_create was flaky: with only 2 minutes between runs that means it could get into "running" state, breaking the test.
1 parent 1fb1041 commit 87bbe8e

File tree

7 files changed

+325
-22
lines changed

7 files changed

+325
-22
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
### Release [1.9.3], 2025-09-08
22

33
#### Bugs
4-
* Fix `query_settings` not being correctly read when values are strings ([#240](https://github.com/ClickHouse/dbt-clickhouse/pull/497)).
4+
* Fix `query_settings` not being correctly read when values are strings ([#497](https://github.com/ClickHouse/dbt-clickhouse/pull/497)).
55
* Ensure that the default `replicated_deduplication_window` is only applied for `*MergeTree` engines ([#504](https://github.com/ClickHouse/dbt-clickhouse/pull/504)).
66
* Avoid full model recalculation if database is `shared` ([#498](https://github.com/ClickHouse/dbt-clickhouse/pull/498)).
7+
* Override snapshot macro when working with timestamp strategy to always get a consistent now() value ([#509](https://github.com/ClickHouse/dbt-clickhouse/pull/509)).
78
* Use importlib instead of pkg_resources as it's now deprecated ([#471](https://github.com/ClickHouse/dbt-clickhouse/pull/471)).
89
* Several fixes made to improve test execution. Most relevant ones:
910
* Restore testing against different CH versions - all versions are now LTS ones, skip 25.8 until pending issue is fixed, fix tests on older versions ([c86a0889](https://github.com/ClickHouse/dbt-clickhouse/commit/c86a0889ad323ce0b02c7409275360e6f2202723)).

dbt/include/clickhouse/macros/materializations/snapshot.sql

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,246 @@
8686

8787
{% do return ('select 1') %}
8888
{% endmacro %}
89+
90+
91+
{% macro clickhouse__snapshot_staging_table(strategy, source_sql, target_relation) -%}
92+
{# Detect strategy type and delegate to specific macro #}
93+
{% if strategy.updated_at == 'now()' or 'now()' in strategy.updated_at %}
94+
{{ clickhouse__snapshot_staging_table_check_strategy(strategy, source_sql, target_relation) }}
95+
{% else %}
96+
{{ clickhouse__snapshot_staging_table_timestamp_strategy(strategy, source_sql, target_relation) }}
97+
{% endif %}
98+
{%- endmacro %}
99+
100+
{% macro clickhouse__snapshot_staging_table_check_strategy(strategy, source_sql, target_relation) -%}
101+
102+
with snapshot_time as (
103+
select {{ strategy.updated_at }} as ts -- Single timestamp
104+
),
105+
snapshot_query as (
106+
107+
{{ source_sql }}
108+
109+
),
110+
111+
snapshotted_data as (
112+
113+
select *,
114+
{{ strategy.unique_key }} as dbt_unique_key
115+
116+
from {{ target_relation }}
117+
where dbt_valid_to is null
118+
119+
),
120+
121+
insertions_source_data as (
122+
123+
select
124+
*,
125+
{{ strategy.unique_key }} as dbt_unique_key,
126+
snapshot_time.ts as dbt_updated_at,
127+
snapshot_time.ts as dbt_valid_from,
128+
nullif(snapshot_time.ts, snapshot_time.ts) as dbt_valid_to,
129+
{{ strategy.scd_id }} as dbt_scd_id
130+
131+
from snapshot_query, snapshot_time
132+
),
133+
134+
updates_source_data as (
135+
136+
select
137+
*,
138+
{{ strategy.unique_key }} as dbt_unique_key,
139+
snapshot_time.ts as dbt_updated_at,
140+
snapshot_time.ts as dbt_valid_from,
141+
snapshot_time.ts as dbt_valid_to
142+
143+
from snapshot_query, snapshot_time
144+
),
145+
146+
{%- if strategy.invalidate_hard_deletes %}
147+
148+
deletes_source_data as (
149+
150+
select
151+
*,
152+
{{ strategy.unique_key }} as dbt_unique_key
153+
from snapshot_query
154+
),
155+
{% endif %}
156+
157+
insertions as (
158+
159+
select
160+
'insert' as dbt_change_type,
161+
source_data.*
162+
163+
from insertions_source_data as source_data
164+
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
165+
where snapshotted_data.dbt_unique_key is null
166+
or (
167+
snapshotted_data.dbt_unique_key is not null
168+
and (
169+
{{ strategy.row_changed }}
170+
)
171+
)
172+
173+
),
174+
175+
updates as (
176+
177+
select
178+
'update' as dbt_change_type,
179+
source_data.*,
180+
snapshotted_data.dbt_scd_id
181+
182+
from updates_source_data as source_data
183+
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
184+
where (
185+
{{ strategy.row_changed }}
186+
)
187+
)
188+
189+
{%- if strategy.invalidate_hard_deletes -%}
190+
,
191+
192+
deletes as (
193+
194+
select
195+
'delete' as dbt_change_type,
196+
source_data.*,
197+
{{ snapshot_get_time() }} as dbt_valid_from,
198+
{{ snapshot_get_time() }} as dbt_updated_at,
199+
{{ snapshot_get_time() }} as dbt_valid_to,
200+
snapshotted_data.dbt_scd_id
201+
202+
from snapshotted_data
203+
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
204+
where source_data.dbt_unique_key is null
205+
)
206+
{%- endif %}
207+
208+
select * EXCEPT (ts) from insertions
209+
union all
210+
select * EXCEPT (ts) from updates
211+
{%- if strategy.invalidate_hard_deletes %}
212+
union all
213+
select * EXCEPT (ts) from deletes
214+
{%- endif %}
215+
216+
{%- endmacro %}
217+
218+
{% macro clickhouse__snapshot_staging_table_timestamp_strategy(strategy, source_sql, target_relation) -%}
219+
220+
with snapshot_query as (
221+
222+
{{ source_sql }}
223+
224+
),
225+
226+
snapshotted_data as (
227+
228+
select *,
229+
{{ strategy.unique_key }} as dbt_unique_key
230+
231+
from {{ target_relation }}
232+
where dbt_valid_to is null
233+
234+
),
235+
236+
insertions_source_data as (
237+
238+
select
239+
*,
240+
{{ strategy.unique_key }} as dbt_unique_key,
241+
{{ strategy.updated_at }} as dbt_updated_at,
242+
{{ strategy.updated_at }} as dbt_valid_from,
243+
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
244+
{{ strategy.scd_id }} as dbt_scd_id
245+
246+
from snapshot_query
247+
),
248+
249+
updates_source_data as (
250+
251+
select
252+
*,
253+
{{ strategy.unique_key }} as dbt_unique_key,
254+
{{ strategy.updated_at }} as dbt_updated_at,
255+
{{ strategy.updated_at }} as dbt_valid_from,
256+
{{ strategy.updated_at }} as dbt_valid_to
257+
258+
from snapshot_query
259+
),
260+
261+
{%- if strategy.invalidate_hard_deletes %}
262+
263+
deletes_source_data as (
264+
265+
select
266+
*,
267+
{{ strategy.unique_key }} as dbt_unique_key
268+
from snapshot_query
269+
),
270+
{% endif %}
271+
272+
insertions as (
273+
274+
select
275+
'insert' as dbt_change_type,
276+
source_data.*
277+
278+
from insertions_source_data as source_data
279+
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
280+
where snapshotted_data.dbt_unique_key is null
281+
or (
282+
snapshotted_data.dbt_unique_key is not null
283+
and (
284+
{{ strategy.row_changed }}
285+
)
286+
)
287+
288+
),
289+
290+
updates as (
291+
292+
select
293+
'update' as dbt_change_type,
294+
source_data.*,
295+
snapshotted_data.dbt_scd_id
296+
297+
from updates_source_data as source_data
298+
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
299+
where (
300+
{{ strategy.row_changed }}
301+
)
302+
)
303+
304+
{%- if strategy.invalidate_hard_deletes -%}
305+
,
306+
307+
deletes as (
308+
309+
select
310+
'delete' as dbt_change_type,
311+
source_data.*,
312+
{{ snapshot_get_time() }} as dbt_valid_from,
313+
{{ snapshot_get_time() }} as dbt_updated_at,
314+
{{ snapshot_get_time() }} as dbt_valid_to,
315+
snapshotted_data.dbt_scd_id
316+
317+
from snapshotted_data
318+
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
319+
where source_data.dbt_unique_key is null
320+
)
321+
{%- endif %}
322+
323+
select * from insertions
324+
union all
325+
select * from updates
326+
{%- if strategy.invalidate_hard_deletes %}
327+
union all
328+
select * from deletes
329+
{%- endif %}
330+
331+
{%- endmacro %}

tests/integration/adapter/dictionary/test_dictionary.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import json
66
import os
7+
import time
78

89
import pytest
910
from dbt.tests.util import run_dbt
@@ -175,6 +176,9 @@ def test_create_and_update(self, project):
175176
)
176177
# force the dictionary to be rebuilt to include the new records in `people`
177178
project.run_sql("system reload dictionary hackers")
179+
180+
if os.environ.get('DBT_CH_TEST_CLOUD', '').lower() in ('1', 'true', 'yes'):
181+
time.sleep(30)
178182
result = project.run_sql("select count(distinct id) from hackers", fetch="all")
179183
assert result[0][0] == 5
180184

tests/integration/adapter/materialized_view/test_refreshable_materialized_view.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""
55

66
import json
7+
import os
78

89
import pytest
910
from dbt.tests.util import check_relation_types, run_dbt
@@ -107,11 +108,28 @@ def test_create(self, project):
107108
},
108109
)
109110

110-
result = project.run_sql(
111-
f"select database, view, status from system.view_refreshes where database= '{project.test_schema}' and view='hackers_mv'",
112-
fetch="all",
113-
)
114-
assert result[0][2] == 'Scheduled'
111+
if os.environ.get('DBT_CH_TEST_CLOUD', '').lower() in ('1', 'true', 'yes'):
112+
result = project.run_sql(
113+
f"""
114+
SELECT
115+
hostName() as replica,
116+
status,
117+
last_refresh_time
118+
FROM clusterAllReplicas('default', 'system', 'view_refreshes')
119+
WHERE database = '{project.test_schema}'
120+
AND view = 'hackers_mv'
121+
""",
122+
fetch="all",
123+
)
124+
statuses = [row[1] for row in result]
125+
assert 'Scheduled' in statuses or 'Running' in statuses
126+
else:
127+
result = project.run_sql(
128+
f"select database, view, status from system.view_refreshes where database= '{project.test_schema}' and view='hackers_mv'",
129+
fetch="all",
130+
)
131+
mv_status = result[0][2]
132+
assert mv_status in ('Scheduled', 'Running')
115133

116134
def test_validate_dependency(self, project):
117135
"""

tests/integration/adapter/projections/test_projections.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
- name: people
6767
"""
6868

69+
SLEEP_TIME = 30 if os.environ.get('DBT_CH_TEST_CLOUD', '').lower() in ('1', 'true', 'yes') else 10
70+
6971

7072
class TestProjections:
7173
@pytest.fixture(scope="class")
@@ -100,14 +102,14 @@ def test_create_and_verify_projection(self, project):
100102
assert len(result) == 3 # We expect 3 departments in the result
101103
assert result == [('engineering', 43.666666666666664), ('malware', 40.0), ('sales', 25.0)]
102104

103-
# waiting for system.log table to be created
104-
time.sleep(10)
105+
# waiting for system.log table to be created/populated
106+
time.sleep(SLEEP_TIME)
105107

106108
# check that the latest query used the projection
107109
result = project.run_sql(
108110
f"SELECT query, projections FROM clusterAllReplicas(default, 'system.query_log') "
109111
f"WHERE query like '%{unique_query_identifier}%' "
110-
f"and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC",
112+
f"and query not like '%clusterAllReplicas%' and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC",
111113
fetch="all",
112114
)
113115
assert len(result) > 0
@@ -132,14 +134,14 @@ def test_create_and_verify_multiple_projections(self, project):
132134
assert len(result) == 3 # We expect 3 departments in the result
133135
assert result == [('engineering', 43.666666666666664), ('malware', 40.0), ('sales', 25.0)]
134136

135-
# waiting for system.log table to be created
136-
time.sleep(10)
137+
# waiting for system.log table to be created/populated
138+
time.sleep(SLEEP_TIME)
137139

138140
# check that the latest query used the projection
139141
result = project.run_sql(
140142
f"SELECT query, projections FROM clusterAllReplicas(default, 'system.query_log') "
141143
f"WHERE query like '%{unique_query_identifier}%' "
142-
f"and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC",
144+
f"and query not like '%clusterAllReplicas%' and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC",
143145
fetch="all",
144146
)
145147
assert len(result) > 0
@@ -158,14 +160,14 @@ def test_create_and_verify_multiple_projections(self, project):
158160
assert len(result) == 3 # We expect 3 departments in the result
159161
assert result == [('engineering', 131), ('malware', 40), ('sales', 25)]
160162

161-
# waiting for system.log table to be created
162-
time.sleep(10)
163+
# waiting for system.log table to be created/populated
164+
time.sleep(SLEEP_TIME)
163165

164166
# check that the latest query used the projection
165167
result = project.run_sql(
166168
f"SELECT query, projections FROM clusterAllReplicas(default, 'system.query_log') "
167169
f"WHERE query like '%{unique_query_identifier}%' "
168-
f"and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC",
170+
f"and query not like '%clusterAllReplicas%' and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC",
169171
fetch="all",
170172
)
171173
assert len(result) > 0
@@ -191,8 +193,8 @@ def test_create_and_verify_distributed_projection(self, project):
191193
assert len(result) == 3 # We expect 3 departments in the result
192194
assert result == [('engineering', 43.666666666666664), ('malware', 40.0), ('sales', 25.0)]
193195

194-
# waiting for system.log table to be created
195-
time.sleep(10)
196+
# waiting for system.log table to be created/populated
197+
time.sleep(SLEEP_TIME)
196198

197199
# check that the latest query used the projection
198200
result = project.run_sql(

0 commit comments

Comments
 (0)