11import os
2- import time
32import uuid
43
54import pytest
65from dbt .tests .util import relation_from_name , run_dbt
76
7+ from tests .integration .adapter .helpers import DEFAULT_RETRY_CONFIG , retry_until_assertion_passes
8+
89PEOPLE_SEED_CSV = """
910id,name,age,department
10111231,Dade,33,engineering
6667 - name: people
6768"""
6869
69- SLEEP_TIME = 30 if os .environ .get ('DBT_CH_TEST_CLOUD' , '' ).lower () in ('1' , 'true' , 'yes' ) else 10
70+ RETRY_CONFIG = (
71+ {'max_retries' : 30 , 'delay' : 1 }
72+ if os .environ .get ('DBT_CH_TEST_CLOUD' , '' ).lower () in ('1' , 'true' , 'yes' )
73+ else DEFAULT_RETRY_CONFIG
74+ )
7075
7176
7277class TestProjections :
@@ -87,6 +92,18 @@ def models(self):
8792 % "table" ,
8893 }
8994
95+ def _get_table_reference (self , table : str ) -> str :
96+ return (
97+ table
98+ if os .environ .get ('DBT_CH_TEST_CLUSTER' , '' ).strip () == ''
99+ else f"clusterAllReplicas({ os .environ .get ('DBT_CH_TEST_CLUSTER' )} , { table } )"
100+ )
101+
102+ def _flush_system_logs (self , project ) -> None :
103+ cluster = os .environ .get ('DBT_CH_TEST_CLUSTER' , '' ).strip ()
104+ cluster_clause = f'ON CLUSTER "{ cluster } "' if cluster else ''
105+ project .run_sql (f"SYSTEM FLUSH LOGS { cluster_clause } " , fetch = "all" )
106+
90107 def test_create_and_verify_projection (self , project ):
91108 run_dbt (["seed" ])
92109 run_dbt (["run" , "--select" , "people_with_projection" ])
@@ -102,20 +119,23 @@ def test_create_and_verify_projection(self, project):
102119 assert len (result ) == 3 # We expect 3 departments in the result
103120 assert result == [('engineering' , 43.666666666666664 ), ('malware' , 40.0 ), ('sales' , 25.0 )]
104121
105- # waiting for system.log table to be created/populated
106- time .sleep (SLEEP_TIME )
107-
108122 # check that the latest query used the projection
109- result = project .run_sql (
110- f"SELECT query, projections FROM clusterAllReplicas(default, 'system.query_log') "
111- f"WHERE query like '%{ unique_query_identifier } %' "
112- f"and query not like '%clusterAllReplicas%' and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC" ,
113- fetch = "all" ,
123+ def check_that_the_latest_query_used_the_projection ():
124+ self ._flush_system_logs (project )
125+ result = project .run_sql (
126+ f"SELECT query, projections FROM { self ._get_table_reference ('system.query_log' )} "
127+ f"WHERE query like '%{ unique_query_identifier } %' "
128+ f"and query not like '%clusterAllReplicas%' and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC" ,
129+ fetch = "all" ,
130+ )
131+ assert len (result ) > 0
132+ assert query in result [0 ][0 ]
133+
134+ assert result [0 ][1 ] == [f'{ project .test_schema } .{ relation .name } .projection_avg_age' ]
135+
136+ retry_until_assertion_passes (
137+ check_that_the_latest_query_used_the_projection , ** RETRY_CONFIG
114138 )
115- assert len (result ) > 0
116- assert query in result [0 ][0 ]
117-
118- assert result [0 ][1 ] == [f'{ project .test_schema } .{ relation .name } .projection_avg_age' ]
119139
120140 def test_create_and_verify_multiple_projections (self , project ):
121141 run_dbt (["seed" ])
@@ -134,20 +154,23 @@ def test_create_and_verify_multiple_projections(self, project):
134154 assert len (result ) == 3 # We expect 3 departments in the result
135155 assert result == [('engineering' , 43.666666666666664 ), ('malware' , 40.0 ), ('sales' , 25.0 )]
136156
137- # waiting for system.log table to be created/populated
138- time .sleep (SLEEP_TIME )
139-
140157 # check that the latest query used the projection
141- result = project .run_sql (
142- f"SELECT query, projections FROM clusterAllReplicas(default, 'system.query_log') "
143- f"WHERE query like '%{ unique_query_identifier } %' "
144- f"and query not like '%clusterAllReplicas%' and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC" ,
145- fetch = "all" ,
158+ def check_that_the_latest_query_used_the_projection ():
159+ self ._flush_system_logs (project )
160+ result = project .run_sql (
161+ f"SELECT query, projections FROM { self ._get_table_reference ('system.query_log' )} "
162+ f"WHERE query like '%{ unique_query_identifier } %' "
163+ f"and query not like '%clusterAllReplicas%' and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC" ,
164+ fetch = "all" ,
165+ )
166+ assert len (result ) > 0
167+ assert query in result [0 ][0 ]
168+
169+ assert result [0 ][1 ] == [f'{ project .test_schema } .{ relation .name } .projection_avg_age' ]
170+
171+ retry_until_assertion_passes (
172+ check_that_the_latest_query_used_the_projection , ** RETRY_CONFIG
146173 )
147- assert len (result ) > 0
148- assert query in result [0 ][0 ]
149-
150- assert result [0 ][1 ] == [f'{ project .test_schema } .{ relation .name } .projection_avg_age' ]
151174
152175 # test the second projection
153176 unique_query_identifier = str (uuid .uuid4 ())
@@ -160,20 +183,22 @@ def test_create_and_verify_multiple_projections(self, project):
160183 assert len (result ) == 3 # We expect 3 departments in the result
161184 assert result == [('engineering' , 131 ), ('malware' , 40 ), ('sales' , 25 )]
162185
163- # waiting for system.log table to be created/populated
164- time .sleep (SLEEP_TIME )
165-
166- # check that the latest query used the projection
167- result = project .run_sql (
168- f"SELECT query, projections FROM clusterAllReplicas(default, 'system.query_log') "
169- f"WHERE query like '%{ unique_query_identifier } %' "
170- f"and query not like '%clusterAllReplicas%' and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC" ,
171- fetch = "all" ,
186+ def check_that_the_latest_query_used_the_projection ():
187+ self ._flush_system_logs (project )
188+ result = project .run_sql (
189+ f"SELECT query, projections FROM { self ._get_table_reference ('system.query_log' )} "
190+ f"WHERE query like '%{ unique_query_identifier } %' "
191+ f"and query not like '%clusterAllReplicas%' and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC" ,
192+ fetch = "all" ,
193+ )
194+ assert len (result ) > 0
195+ assert query in result [0 ][0 ]
196+
197+ assert result [0 ][1 ] == [f'{ project .test_schema } .{ relation .name } .projection_sum_age' ]
198+
199+ retry_until_assertion_passes (
200+ check_that_the_latest_query_used_the_projection , ** RETRY_CONFIG
172201 )
173- assert len (result ) > 0
174- assert query in result [0 ][0 ]
175-
176- assert result [0 ][1 ] == [f'{ project .test_schema } .{ relation .name } .projection_sum_age' ]
177202
178203 @pytest .mark .xfail
179204 @pytest .mark .skipif (
@@ -193,17 +218,21 @@ def test_create_and_verify_distributed_projection(self, project):
193218 assert len (result ) == 3 # We expect 3 departments in the result
194219 assert result == [('engineering' , 43.666666666666664 ), ('malware' , 40.0 ), ('sales' , 25.0 )]
195220
196- # waiting for system.log table to be created/populated
197- time .sleep (SLEEP_TIME )
198-
199- # check that the latest query used the projection
200- result = project .run_sql (
201- f"SELECT query, projections FROM clusterAllReplicas(default, 'system.query_log') "
202- f"WHERE query like '%{ unique_query_identifier } %' "
203- f"and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC" ,
204- fetch = "all" ,
221+ def check_that_the_latest_query_used_the_projection ():
222+ self ._flush_system_logs (project )
223+ result = project .run_sql (
224+ f"SELECT query, projections FROM { self ._get_table_reference ('system.query_log' )} "
225+ f"WHERE query like '%{ unique_query_identifier } %' "
226+ f"and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC" ,
227+ fetch = "all" ,
228+ )
229+ assert len (result ) > 0
230+ assert query in result [0 ][0 ]
231+
232+ assert result [0 ][1 ] == [
233+ f'{ project .test_schema } .{ relation .name } _local.projection_avg_age'
234+ ]
235+
236+ retry_until_assertion_passes (
237+ check_that_the_latest_query_used_the_projection , ** RETRY_CONFIG
205238 )
206- assert len (result ) > 0
207- assert query in result [0 ][0 ]
208-
209- assert result [0 ][1 ] == [f'{ project .test_schema } .{ relation .name } _local.projection_avg_age' ]
0 commit comments