@@ -128,51 +128,109 @@ public void process(final Record<KLeft, Change<VLeft>> record) {
128128        }
129129
130130        private  void  leftJoinInstructions (final  Record <KLeft , Change <VLeft >> record ) {
131-             if  (record .value ().oldValue  != null ) {
132-                 final  KRight  oldForeignKey  = foreignKeyExtractor .extract (record .key (), record .value ().oldValue );
133-                 final  KRight  newForeignKey  = record .value ().newValue  == null  ? null  : foreignKeyExtractor .extract (record .key (), record .value ().newValue );
134-                 if  (oldForeignKey  != null  && !Arrays .equals (serialize (newForeignKey ), serialize (oldForeignKey ))) {
131+             final  VLeft  oldValue  = record .value ().oldValue ;
132+             final  VLeft  newValue  = record .value ().newValue ;
133+ 
134+             if  (oldValue  == null  && newValue  == null ) {
135+                 // no output for idempotent left hand side deletes 
136+                 return ;
137+             }
138+ 
139+             final  KRight  oldForeignKey  = oldValue  == null  ? null  : foreignKeyExtractor .extract (record .key (), oldValue );
140+             final  KRight  newForeignKey  = newValue  == null  ? null  : foreignKeyExtractor .extract (record .key (), newValue );
141+ 
142+             final  boolean  maybeUnsubscribe  = oldForeignKey  != null ;
143+             if  (maybeUnsubscribe ) {
144+                 // delete old subscription only if FK changed 
145+                 // 
146+                 // if FK did change, we need to explicitly delete the old subscription, 
147+                 // because the new subscription goes to a different partition 
148+                 if  (foreignKeyChanged (newForeignKey , oldForeignKey )) {
149+                     // this may lead to unnecessary tombstones if the old FK did not join; 
150+                     // however, we cannot avoid it as we have no means to know if the old FK joined or not 
135151                    forward (record , oldForeignKey , DELETE_KEY_NO_PROPAGATE );
136152                }
137-                 forward (record , newForeignKey , PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE );
138-             } else  if  (record .value ().newValue  != null ) {
139-                 final  KRight  newForeignKey  = foreignKeyExtractor .extract (record .key (), record .value ().newValue );
140-                 forward (record , newForeignKey , PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE );
141153            }
154+ 
155+             // for all cases (insert, update, and delete), we send a new subscription; 
156+             // we need to get a response back for all cases to always produce a left-join result 
157+             // 
158+             // note: for delete, `newForeignKey` is null, what is a "hack" 
159+             // no actual subscription will be added for null-FK on the right hand side, but we still get the response back we need 
160+             // 
161+             // this may lead to unnecessary tombstones if the old FK did not join; 
162+             // however, we cannot avoid it as we have no means to know if the old FK joined or not 
163+             forward (record , newForeignKey , PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE );
142164        }
143165
144166        private  void  defaultJoinInstructions (final  Record <KLeft , Change <VLeft >> record ) {
145-             if  (record .value ().oldValue  != null ) {
146-                 final  KRight  oldForeignKey  = foreignKeyExtractor .extract (record .key (), record .value ().oldValue );
147-                 final  KRight  newForeignKey  = record .value ().newValue  == null  ? null  : foreignKeyExtractor .extract (record .key (), record .value ().newValue );
167+             final  VLeft  oldValue  = record .value ().oldValue ;
168+             final  VLeft  newValue  = record .value ().newValue ;
169+ 
170+             final  KRight  oldForeignKey  = oldValue  == null  ? null  : foreignKeyExtractor .extract (record .key (), oldValue );
171+             final  boolean  needToUnsubscribe  = oldForeignKey  != null ;
172+ 
173+             // if left row is inserted or updated, subscribe to new FK (if new FK is valid) 
174+             if  (newValue  != null ) {
175+                 final  KRight  newForeignKey  = foreignKeyExtractor .extract (record .key (), newValue );
148176
149-                 if  (oldForeignKey  ==  null  &&  newForeignKey  == null ) {
177+                 if  (newForeignKey  == null ) {  // invalid FK 
150178                    logSkippedRecordDueToNullForeignKey ();
151-                 } else  if  (oldForeignKey  == null ) {
152-                     forward (record , newForeignKey , PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE );
153-                 } else  if  (newForeignKey  == null ) {
154-                     forward (record , oldForeignKey , DELETE_KEY_AND_PROPAGATE );
155-                 } else  if  (!Arrays .equals (serialize (newForeignKey ), serialize (oldForeignKey ))) {
156-                     //Different Foreign Key - delete the old key value and propagate the new one. 
157-                     //Delete it from the oldKey's state store 
158-                     forward (record , oldForeignKey , DELETE_KEY_NO_PROPAGATE );
159-                     //Add to the newKey's state store. Additionally, propagate null if no FK is found there, 
160-                     //since we must "unset" any output set by the previous FK-join. This is true for both INNER 
161-                     //and LEFT join. 
162-                     forward (record , newForeignKey , PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE );
163-                 } else  { // unchanged FK 
164-                     forward (record , newForeignKey , PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE );
179+                     if  (needToUnsubscribe ) {
180+                         // delete old subscription 
181+                         // 
182+                         // this may lead to unnecessary tombstones if the old FK did not join; 
183+                         // however, we cannot avoid it as we have no means to know if the old FK joined or not 
184+                         forward (record , oldForeignKey , DELETE_KEY_AND_PROPAGATE );
185+                     }
186+                 } else  { // valid FK 
187+                     // regular insert/update 
188+ 
189+                     if  (needToUnsubscribe ) {
190+                         // update case 
191+ 
192+                         if  (foreignKeyChanged (newForeignKey , oldForeignKey )) {
193+                             // if FK did change, we need to explicitly delete the old subscription, 
194+                             // because the new subscription goes to a different partition 
195+                             // 
196+                             // we don't need any response, as we only want a response from the new subscription 
197+                             forward (record , oldForeignKey , DELETE_KEY_NO_PROPAGATE );
198+ 
199+                             // subscribe for new FK (note, could be on a different task/node than the old FK) 
200+                             // additionally, propagate null if no FK is found so we can delete the previous result (if any) 
201+                             // 
202+                             // this may lead to unnecessary tombstones if the old FK did not join and the new FK key does not join either; 
203+                             // however, we cannot avoid it as we have no means to know if the old FK joined or not 
204+                             forward (record , newForeignKey , PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE );
205+                         } else  {
206+                             // if FK did not change, we only need a response from the new FK subscription, if there is a join 
207+                             // if there is no join, we know that the old row did not join either (as it used the same FK) 
208+                             // and thus we don't need to propagate an idempotent null result 
209+                             forward (record , newForeignKey , PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE );
210+                         }
211+                     } else  {
212+                         // insert case 
213+ 
214+                         // subscribe to new key 
215+                         // don't propagate null if no FK is found: 
216+                         // for inserts, we know that there is no need to delete any previous result 
217+                         forward (record , newForeignKey , PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE );
218+                     }
165219                }
166-             } else  if  ( record . value (). newValue  !=  null )  {
167-                 final   KRight   newForeignKey  =  foreignKeyExtractor . extract ( record . key (),  record . value (). newValue ); 
168-                 if  (newForeignKey  ==  null ) {
169-                     logSkippedRecordDueToNullForeignKey () ;
170-                 }  else  { 
171-                     forward (record , newForeignKey ,  PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE );
220+             } else  {
221+                 // left row is deleted 
222+                 if  (needToUnsubscribe ) {
223+                     // this may lead to unnecessary tombstones if the old FK did not join ;
224+                      // however, we cannot avoid it as we have no means to know if the old FK joined or not 
225+                     forward (record , oldForeignKey ,  DELETE_KEY_AND_PROPAGATE );
172226                }
173227            }
174228        }
175229
230+         private  boolean  foreignKeyChanged (final  KRight  newForeignKey , final  KRight  oldForeignKey ) {
231+             return  !Arrays .equals (serialize (newForeignKey ), serialize (oldForeignKey ));
232+         }
233+ 
176234        private  byte [] serialize (final  KRight  key ) {
177235            return  foreignKeySerializer .serialize (foreignKeySerdeTopic , key );
178236        }
0 commit comments