@@ -24,6 +24,7 @@ use log::info;
2424use tokio:: fs:: File ;
2525use tokio:: io:: BufReader ;
2626use tokio_stream:: Stream ;
27+ use tokio_stream:: StreamExt ;
2728
2829use databend_client:: APIClient ;
2930use databend_client:: Pages ;
@@ -64,11 +65,8 @@ impl IConnection for RestAPIConnection {
6465
6566 async fn exec ( & self , sql : & str ) -> Result < i64 > {
6667 info ! ( "exec: {}" , sql) ;
67- let page = self . client . query_all ( sql) . await ?;
68-
69- let affected_rows = page. affected_rows ( ) . map_err ( Error :: InvalidResponse ) ?;
70-
71- Ok ( affected_rows)
68+ // Use the new affected_rows method that internally uses query_iter
69+ self . calculate_affected_rows_from_iter ( sql) . await
7270 }
7371
7472 async fn kill_query ( & self , query_id : & str ) -> Result < ( ) > {
@@ -200,6 +198,60 @@ impl<'o> RestAPIConnection {
200198 fn default_copy_options ( ) -> BTreeMap < & ' o str , & ' o str > {
201199 vec ! [ ( "purge" , "true" ) ] . into_iter ( ) . collect ( )
202200 }
201+
202+ fn parse_row_count_string ( value_str : & str ) -> Result < i64 , String > {
203+ let trimmed = value_str. trim ( ) ;
204+
205+ if trimmed. is_empty ( ) {
206+ return Ok ( 0 ) ;
207+ }
208+
209+ if let Ok ( count) = trimmed. parse :: < i64 > ( ) {
210+ return Ok ( count) ;
211+ }
212+
213+ if let Ok ( count) = serde_json:: from_str :: < i64 > ( trimmed) {
214+ return Ok ( count) ;
215+ }
216+
217+ let unquoted = trimmed. trim_matches ( '"' ) ;
218+ if let Ok ( count) = unquoted. parse :: < i64 > ( ) {
219+ return Ok ( count) ;
220+ }
221+
222+ Err ( format ! (
223+ "failed to parse affected rows from: '{}'" ,
224+ value_str
225+ ) )
226+ }
227+
228+ async fn calculate_affected_rows_from_iter ( & self , sql : & str ) -> Result < i64 > {
229+ let mut rows = IConnection :: query_iter ( self , sql) . await ?;
230+ let mut count = 0i64 ;
231+
232+ // Get the first row to check if it has affected rows info
233+ if let Some ( first_row) = rows. next ( ) . await {
234+ let row = first_row?;
235+ let schema = row. schema ( ) ;
236+
237+ // Check if this is an affected rows response
238+ if !schema. fields ( ) . is_empty ( ) && schema. fields ( ) [ 0 ] . name . contains ( "number of rows" ) {
239+ let values = row. values ( ) ;
240+ if !values. is_empty ( ) {
241+ let value = & values[ 0 ] ;
242+ let s: String = value. clone ( ) . try_into ( ) . map_err ( |e| {
243+ Error :: InvalidResponse ( format ! ( "Failed to convert value to string: {}" , e) )
244+ } ) ?;
245+ count = Self :: parse_row_count_string ( & s) . map_err ( Error :: InvalidResponse ) ?;
246+ }
247+ } else {
248+ // If it's not affected rows info, count normally
249+ count = -1 ;
250+ }
251+ }
252+
253+ Ok ( count)
254+ }
203255}
204256
205257pub struct RestAPIRows < T > {
0 commit comments