Skip to content

Commit

Permalink
MINOR: cleanup KStream JavaDocs (5/N) - stream-globalTable-inner-join
Browse files Browse the repository at this point in the history
  • Loading branch information
mjsax committed Jan 31, 2025
1 parent 281a3c6 commit 4e504ef
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 185 deletions.
189 changes: 81 additions & 108 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -2616,134 +2616,107 @@ <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
final Joined<K, V, VT> joined);

/**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi-join.
* The join is a primary key table lookup join with join attribute
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
* {@code keyValueMapper.map(streamRecord) == tableRecord.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
* state.
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time)
* internal {@link GlobalKTable} state.
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
* state and will not produce any result records.
* <p>
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
*
* <p>For each {@code KStream} record that finds a joining record in the {@link GlobalKTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code KStream}.
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* and thus no output record will be added to the resulting {@code KStream}.
* The key of the result record is the same as the stream record's key.
* If you need read access to the {@code KStream} key, use {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
* If a {@code KStream} input record's value is {@code null} or if the provided {@link KeyValueMapper keySelector}
* returns {@code null}, the input record will be dropped, and no join computation is triggered.
* If a {@link GlobalKTable} input record's key is {@code null} the input record will be dropped, and the table
* state won't be updated.
* {@link GlobalKTable} input records with {@code null} values are considered deletes (so-called tombstone) for
* the table.
*
* <p>Example, using the first value attribute as join key:
* <table border='1'>
* <tr>
* <th>KStream</th>
* <th>GlobalKTable</th>
* <th>state</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:(GK1,A)&gt;</td>
* <td></td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;GK1:b&gt;</td>
* <td>&lt;GK1:b&gt;</td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K1:(GK1,C)&gt;</td>
* <td></td>
* <td>&lt;GK1:b&gt;</td>
* <td>&lt;K1:ValueJoiner((GK1,C),b)&gt;</td>
* </tr>
* </table>
*
* In contrast to {@link #join(KTable, ValueJoiner)}, there is no co-partitioning requirement between this
* {@code KStream} and the {@link GlobalKTable}.
* Also note, that the {@link GlobalKTable} is updated "asynchronously", and thus this operation is inherently
* non-deterministic.
*
* @param globalTable
* the {@link GlobalKTable} to be joined with this stream
* @param keySelector
* a {@link KeyValueMapper} that computes the join key for stream input records
* @param joiner
* a {@link ValueJoiner} that computes the join result for a pair of matching records
*
* @param <GlobalKey> the key type of the global table
* @param <GlobalValue> the value type of the global table
* @param <VOut> the value type of the result stream
*
* @return A {@code KStream} that contains join-records, one for each matched stream record, with the corresponding
* key and a value computed by the given {@link ValueJoiner}.
*
* @param globalTable the {@link GlobalKTable} to be joined with this stream
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
* to the key of the {@link GlobalKTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param <GK> the key type of {@link GlobalKTable}
* @param <GV> the value type of the {@link GlobalKTable}
* @param <RV> the value type of the resulting {@code KStream}
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one output for each input {@code KStream} record
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
*/
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
final ValueJoiner<? super V, ? super GV, ? extends RV> joiner);
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner);

/**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
* The join is a primary key table lookup join with join attribute
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
* state.
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
* state and will not produce any result records.
* <p>
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code KStream}.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* and thus no output record will be added to the resulting {@code KStream}.
* See {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
*
* @param globalTable the {@link GlobalKTable} to be joined with this stream
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
* to the key of the {@link GlobalKTable}
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param <GK> the key type of {@link GlobalKTable}
* @param <GV> the value type of the {@link GlobalKTable}
* @param <RV> the value type of the resulting {@code KStream}
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one output for each input {@code KStream} record
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
* <p>Note that the {@link KStream} key is read-only and must not be modified, as this can lead to corrupt partitioning.
*/
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> joiner);
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner);

/**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
* The join is a primary key table lookup join with join attribute
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
* state.
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
* state and will not produce any result records.
* <p>
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code KStream}.
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* and thus no output record will be added to the resulting {@code KStream}.
* See {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
*
* @param globalTable the {@link GlobalKTable} to be joined with this stream
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
* to the key of the {@link GlobalKTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param <GK> the key type of {@link GlobalKTable}
* @param <GV> the value type of the {@link GlobalKTable}
* @param <RV> the value type of the resulting {@code KStream}
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one output for each input {@code KStream} record
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
final Named named);
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner,
final Named named);

/**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
* The join is a primary key table lookup join with join attribute
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
* state.
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
* state and will not produce any result records.
* <p>
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code KStream}.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* and thus no output record will be added to the resulting {@code KStream}.
* See {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
*
* @param globalTable the {@link GlobalKTable} to be joined with this stream
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
* to the key of the {@link GlobalKTable}
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param <GK> the key type of {@link GlobalKTable}
* @param <GV> the value type of the {@link GlobalKTable}
* @param <RV> the value type of the resulting {@code KStream}
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one output for each input {@code KStream} record
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> joiner,
final Named named);
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner,
final Named named);

/**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join.
Expand Down
Loading

0 comments on commit 4e504ef

Please sign in to comment.