From 4e504eff740e11cef0814433b160b668f47dc306 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 24 Jan 2025 22:59:32 -0800 Subject: [PATCH] MINOR: cleanup KStream JavaDocs (5/N) - stream-globalTable-inner-join --- .../apache/kafka/streams/kstream/KStream.java | 189 ++++++++---------- .../kstream/internals/KStreamImpl.java | 154 +++++++------- 2 files changed, 158 insertions(+), 185 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 8d6c1e33ab764..84cbcb79b7939 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -2616,134 +2616,107 @@ KStream leftJoin(final KTable table, final Joined joined); /** - * Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join. + * Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi-join. * The join is a primary key table lookup join with join attribute - * {@code keyValueMapper.map(stream.keyValue) == table.key}. + * {@code keyValueMapper.map(streamRecord) == tableRecord.key}. * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. - * This is done by performing a lookup for matching records in the current internal {@link GlobalKTable} - * state. + * This is done by performing a lookup for matching records in the current (i.e., processing time) + * internal {@link GlobalKTable} state. * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable} * state and will not produce any result records. - *

- * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided + * + *

For each {@code KStream} record that finds a joining record in the {@link GlobalKTable} the provided * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. - * The key of the result record is the same as the key of this {@code KStream}. - * If a {@code KStream} input value is {@code null} the record will not be included in the join operation - * and thus no output record will be added to the resulting {@code KStream}. + * The key of the result record is the same as the stream record's key. + * If you need read access to the {@code KStream} key, use {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}. + * If a {@code KStream} input record's value is {@code null} or if the provided {@link KeyValueMapper keySelector} + * returns {@code null}, the input record will be dropped, and no join computation is triggered. + * If a {@link GlobalKTable} input record's key is {@code null} the input record will be dropped, and the table + * state won't be updated. + * {@link GlobalKTable} input records with {@code null} values are considered deletes (so-called tombstone) for + * the table. + * + *

Example, using the first value attribute as join key: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
| KStream | GlobalKTable | state | result |
| <K1:(GK1,A)> | | | |
| | <GK1:b> | <GK1:b> | |
| <K1:(GK1,C)> | | <GK1:b> | <K1:ValueJoiner((GK1,C),b)> |
+ * + * In contrast to {@link #join(KTable, ValueJoiner)}, there is no co-partitioning requirement between this + * {@code KStream} and the {@link GlobalKTable}. + * Also note, that the {@link GlobalKTable} is updated "asynchronously", and thus this operation is inherently + * non-deterministic. + * + * @param globalTable + * the {@link GlobalKTable} to be joined with this stream + * @param keySelector + * a {@link KeyValueMapper} that computes the join key for stream input records + * @param joiner + * a {@link ValueJoiner} that computes the join result for a pair of matching records + * + * @param the key type of the global table + * @param the value type of the global table + * @param the value type of the result stream + * + * @return A {@code KStream} that contains join-records, one for each matched stream record, with the corresponding + * key and a value computed by the given {@link ValueJoiner}. * - * @param globalTable the {@link GlobalKTable} to be joined with this stream - * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream - * to the key of the {@link GlobalKTable} - * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param the key type of {@link GlobalKTable} - * @param the value type of the {@link GlobalKTable} - * @param the value type of the resulting {@code KStream} - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoiner}, one output for each input {@code KStream} record * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner) */ - KStream join(final GlobalKTable globalTable, - final KeyValueMapper keySelector, - final ValueJoiner joiner); + KStream join(final GlobalKTable globalTable, + final KeyValueMapper keySelector, + final ValueJoiner joiner); /** - * Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join. 
- * The join is a primary key table lookup join with join attribute - * {@code keyValueMapper.map(stream.keyValue) == table.key}. - * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. - * This is done by performing a lookup for matching records in the current internal {@link GlobalKTable} - * state. - * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable} - * state and will not produce any result records. - *

- * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided - * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. - * The key of the result record is the same as the key of this {@code KStream}. - * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * If a {@code KStream} input value is {@code null} the record will not be included in the join operation - * and thus no output record will be added to the resulting {@code KStream}. + * See {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. * - * @param globalTable the {@link GlobalKTable} to be joined with this stream - * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream - * to the key of the {@link GlobalKTable} - * @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records - * @param the key type of {@link GlobalKTable} - * @param the value type of the {@link GlobalKTable} - * @param the value type of the resulting {@code KStream} - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoinerWithKey}, one output for each input {@code KStream} record - * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey) + *

Note that the {@link KStream} key is read-only and must not be modified, as this can lead to corrupt partitioning. */ - KStream join(final GlobalKTable globalTable, - final KeyValueMapper keySelector, - final ValueJoinerWithKey joiner); + KStream join(final GlobalKTable globalTable, + final KeyValueMapper keySelector, + final ValueJoinerWithKey joiner); /** - * Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join. - * The join is a primary key table lookup join with join attribute - * {@code keyValueMapper.map(stream.keyValue) == table.key}. - * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. - * This is done by performing a lookup for matching records in the current internal {@link GlobalKTable} - * state. - * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable} - * state and will not produce any result records. - *

- * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided - * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. - * The key of the result record is the same as the key of this {@code KStream}. - * If a {@code KStream} input value is {@code null} the record will not be included in the join operation - * and thus no output record will be added to the resulting {@code KStream}. + * See {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. * - * @param globalTable the {@link GlobalKTable} to be joined with this stream - * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream - * to the key of the {@link GlobalKTable} - * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param named a {@link Named} config used to name the processor in the topology - * @param the key type of {@link GlobalKTable} - * @param the value type of the {@link GlobalKTable} - * @param the value type of the resulting {@code KStream} - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoiner}, one output for each input {@code KStream} record - * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner) + *

Takes an additional {@link Named} parameter that is used to name the processor in the topology. */ - KStream join(final GlobalKTable globalTable, - final KeyValueMapper keySelector, - final ValueJoiner joiner, - final Named named); + KStream join(final GlobalKTable globalTable, + final KeyValueMapper keySelector, + final ValueJoiner joiner, + final Named named); /** - * Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join. - * The join is a primary key table lookup join with join attribute - * {@code keyValueMapper.map(stream.keyValue) == table.key}. - * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. - * This is done by performing a lookup for matching records in the current internal {@link GlobalKTable} - * state. - * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable} - * state and will not produce any result records. - *

- * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided - * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. - * The key of the result record is the same as the key of this {@code KStream}. - * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * If a {@code KStream} input value is {@code null} the record will not be included in the join operation - * and thus no output record will be added to the resulting {@code KStream}. + * See {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}. * - * @param globalTable the {@link GlobalKTable} to be joined with this stream - * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream - * to the key of the {@link GlobalKTable} - * @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records - * @param named a {@link Named} config used to name the processor in the topology - * @param the key type of {@link GlobalKTable} - * @param the value type of the {@link GlobalKTable} - * @param the value type of the resulting {@code KStream} - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoinerWithKey}, one output for each input {@code KStream} record - * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey) + *

Takes an additional {@link Named} parameter that is used to name the processor in the topology. */ - KStream join(final GlobalKTable globalTable, - final KeyValueMapper keySelector, - final ValueJoinerWithKey joiner, - final Named named); + KStream join(final GlobalKTable globalTable, + final KeyValueMapper keySelector, + final ValueJoinerWithKey joiner, + final Named named); /** * Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 35ef36bb81613..30255cfeee01a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1048,33 +1048,96 @@ public KStream leftJoin(final KTable table, } } + @SuppressWarnings({"unchecked", "resource"}) + private KStream doStreamTableJoin(final KTable table, + final ValueJoinerWithKey joiner, + final JoinedInternal joinedInternal, + final boolean leftJoin) { + Objects.requireNonNull(table, "table can't be null"); + Objects.requireNonNull(joiner, "joiner can't be null"); + + final Set allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream) table)); + + final NamedInternal renamed = new NamedInternal(joinedInternal.name()); + + final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? 
LEFTJOIN_NAME : JOIN_NAME); + + Optional> bufferStoreBuilder = Optional.empty(); + + if (joinedInternal.gracePeriod() != null) { + if (!((KTableImpl) table).graphNode.isOutputVersioned().orElse(true)) { + throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); + } + final String bufferName = name + "-Buffer"; + bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>( + bufferName, + joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde, + joinedInternal.leftValueSerde() != null ? joinedInternal.leftValueSerde() : valueSerde, + joinedInternal.gracePeriod(), + name) + ); + } + + final ProcessorSupplier processorSupplier = new KStreamKTableJoin<>( + ((KTableImpl) table).valueGetterSupplier(), + joiner, + leftJoin, + Optional.ofNullable(joinedInternal.gracePeriod()), + bufferStoreBuilder + ); + + final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name); + final StreamTableJoinNode streamTableJoinNode = new StreamTableJoinNode<>( + name, + processorParameters, + ((KTableImpl) table).valueGetterSupplier().storeNames(), + this.name, + joinedInternal.gracePeriod() + ); + + builder.addGraphNode(graphNode, streamTableJoinNode); + if (leftJoin) { + streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN); + } + + // do not have serde for joined result + return new KStreamImpl<>( + name, + joinedInternal.keySerde() != null ? 
joinedInternal.keySerde() : keySerde, + null, + allSourceNodes, + false, + streamTableJoinNode, + builder); + } + @Override - public KStream join(final GlobalKTable globalTable, - final KeyValueMapper keySelector, - final ValueJoiner joiner) { + public KStream join(final GlobalKTable globalTable, + final KeyValueMapper keySelector, + final ValueJoiner joiner) { return join(globalTable, keySelector, toValueJoinerWithKey(joiner)); } @Override - public KStream join(final GlobalKTable globalTable, - final KeyValueMapper keySelector, - final ValueJoinerWithKey joiner) { + public KStream join(final GlobalKTable globalTable, + final KeyValueMapper keySelector, + final ValueJoinerWithKey joiner) { return globalTableJoin(globalTable, keySelector, joiner, false, NamedInternal.empty()); } @Override - public KStream join(final GlobalKTable globalTable, - final KeyValueMapper keySelector, - final ValueJoiner joiner, - final Named named) { + public KStream join(final GlobalKTable globalTable, + final KeyValueMapper keySelector, + final ValueJoiner joiner, + final Named named) { return join(globalTable, keySelector, toValueJoinerWithKey(joiner), named); } @Override - public KStream join(final GlobalKTable globalTable, - final KeyValueMapper keySelector, - final ValueJoinerWithKey joiner, - final Named named) { + public KStream join(final GlobalKTable globalTable, + final KeyValueMapper keySelector, + final ValueJoinerWithKey joiner, + final Named named) { return globalTableJoin(globalTable, keySelector, joiner, false, named); } @@ -1148,69 +1211,6 @@ private KStream globalTableJoin( builder); } - @SuppressWarnings({"unchecked", "resource"}) - private KStream doStreamTableJoin(final KTable table, - final ValueJoinerWithKey joiner, - final JoinedInternal joinedInternal, - final boolean leftJoin) { - Objects.requireNonNull(table, "table can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - - final Set allSourceNodes = 
ensureCopartitionWith(Collections.singleton((AbstractStream) table)); - - final NamedInternal renamed = new NamedInternal(joinedInternal.name()); - - final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME); - - Optional> bufferStoreBuilder = Optional.empty(); - - if (joinedInternal.gracePeriod() != null) { - if (!((KTableImpl) table).graphNode.isOutputVersioned().orElse(true)) { - throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); - } - final String bufferName = name + "-Buffer"; - bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>( - bufferName, - joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde, - joinedInternal.leftValueSerde() != null ? joinedInternal.leftValueSerde() : valueSerde, - joinedInternal.gracePeriod(), - name) - ); - } - - final ProcessorSupplier processorSupplier = new KStreamKTableJoin<>( - ((KTableImpl) table).valueGetterSupplier(), - joiner, - leftJoin, - Optional.ofNullable(joinedInternal.gracePeriod()), - bufferStoreBuilder - ); - - final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name); - final StreamTableJoinNode streamTableJoinNode = new StreamTableJoinNode<>( - name, - processorParameters, - ((KTableImpl) table).valueGetterSupplier().storeNames(), - this.name, - joinedInternal.gracePeriod() - ); - - builder.addGraphNode(graphNode, streamTableJoinNode); - if (leftJoin) { - streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN); - } - - // do not have serde for joined result - return new KStreamImpl<>( - name, - joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde, - null, - allSourceNodes, - false, - streamTableJoinNode, - builder); - } - @Override public KStream process( final ProcessorSupplier processorSupplier,