@@ -365,31 +365,26 @@ impl IncrementalJoin {
365
365
366
366
for update in updates {
367
367
if update. table_id == self . lhs . table_id {
368
- lhs_ops. extend ( update. ops . iter ( ) . cloned ( ) ) ;
368
+ lhs_ops. extend ( update. ops . iter ( ) ) ;
369
369
} else if update. table_id == self . rhs . table_id {
370
- rhs_ops. extend ( update. ops . iter ( ) . cloned ( ) ) ;
370
+ rhs_ops. extend ( update. ops . iter ( ) ) ;
371
371
}
372
372
}
373
373
374
374
if lhs_ops. is_empty ( ) && rhs_ops. is_empty ( ) {
375
375
return None ;
376
376
}
377
377
378
- let lhs = JoinSide {
379
- table_id : self . lhs . table_id ,
380
- table_name : self . lhs . head . table_name . clone ( ) ,
381
- inserts : lhs_ops. iter ( ) . filter ( |op| op. op_type == 1 ) . cloned ( ) . collect ( ) ,
382
- deletes : lhs_ops. iter ( ) . filter ( |op| op. op_type == 0 ) . cloned ( ) . collect ( ) ,
383
- } ;
384
-
385
- let rhs = JoinSide {
386
- table_id : self . rhs . table_id ,
387
- table_name : self . rhs . head . table_name . clone ( ) ,
388
- inserts : rhs_ops. iter ( ) . filter ( |op| op. op_type == 1 ) . cloned ( ) . collect ( ) ,
389
- deletes : rhs_ops. iter ( ) . filter ( |op| op. op_type == 0 ) . cloned ( ) . collect ( ) ,
378
+ let join_side = |table : & DbTable , ops : Vec < & TableOp > | {
379
+ let ( deletes, inserts) = ops. into_iter ( ) . cloned ( ) . partition ( |op| op. op_type == 0 ) ;
380
+ JoinSide {
381
+ table_id : table. table_id ,
382
+ table_name : table. head . table_name . clone ( ) ,
383
+ deletes,
384
+ inserts,
385
+ }
390
386
} ;
391
-
392
- Some ( ( lhs, rhs) )
387
+ Some ( ( join_side ( & self . lhs , lhs_ops) , join_side ( & self . rhs , rhs_ops) ) )
393
388
}
394
389
395
390
/// Evaluate join plan for lhs updates.
0 commit comments