From 15073bc568bd7392e5e22c45bc3d9703ca07c87d Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 24 Jan 2025 22:59:32 -0800 Subject: [PATCH] MINOR: cleanup KStream JavaDocs (8/N) - stream-stream-inner-join --- .../apache/kafka/streams/kstream/KStream.java | 354 ++++-------------- .../kstream/internals/KStreamImpl.java | 59 +-- 2 files changed, 116 insertions(+), 297 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 8d6c1e33ab764..e6fbc80be7a3a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -1005,23 +1005,25 @@ KGroupedStream groupBy(final KeyValueMapper grouped); /** - * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default - * serializers and deserializers. - * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. + * Join records of this (left) stream with another (right) {@code KStream}'s records using a windowed inner equi-join. + * The join is computed on using the records' key as join attribute, i.e., {@code leftRecord.key == rightRight.key}. * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. - *

- * For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute - * a value (with arbitrary type) for the result record. + * + *

For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to + * compute a value (with arbitrary type) for the result record. * The key of the result record is the same as for both joining input records. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no - * output record will be added to the resulting {@code KStream}. - *

- * Example (assuming all input records belong to the correct windows): + * If you need read access to the join key, use {@link #join(KStream, ValueJoinerWithKey, JoinWindows)}. + * If an input record's key or value is {@code null} the input record will be dropped, and no join computation + * is triggered. + * Similarly, so-call late records, i.e., records with a timestamp belonging to an already closed window (based + * on stream-time progress, window size, and grace period), will be dropped. + * + *

Example (assuming all input records belong to the correct windows): * * - * - * + * + * * * * @@ -1040,283 +1042,91 @@ KGroupedStream groupBy(final KeyValueMapper * *
thisotherleftrightresult
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * + * Both {@code KStreams} (or to be more precise, their underlying source topics) need to have the same number of * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before - * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * Repartitioning can happen for one or both of the joining {@code KStream}s. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - *

- * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. + * If this is not the case (and if not auto-repartitioning happens, see further below), + * you would need to call {@link #repartition(Repartitioned)} (for at least one of both + * {@code KStreams}) before doing the join and specify the "correct" number of partitions via {@link Repartitioned} + * parameter to align the partition count for both inputs to each other. + * Furthermore, both {@code KStreams} need to be co-partitioned on the join key (i.e., use the same partitioner). + * Note: Kafka Streams cannot verify the used partitioning strategy, so it is the user's responsibility to ensure + * that the same partitioner is used for both inputs for the join. + * + *

If a key changing operator was used before this operation on either input stream + * (e.g., {@link #selectKey(KeyValueMapper)}, {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or + * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data of the + * corresponding input stream, i.e., it will create an internal repartitioning topic in Kafka and write and re-read + * the data via this topic such that data is correctly partitioned by the join key. + * + *

The repartitioning topic(s) will be named "${applicationId}-<name>-repartition", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. + * The number of partitions for the repartition topic(s) is determined based on both upstream topics partition + * numbers, and Kafka Streams will automatically align the number of partitions if required for co-partitioning. + * Furthermore, the topic(s) will be created with infinite retention time and data will be automatically purged + * by Kafka Streams. + * + *

Both of the joining {@code KStream}s will be materialized in local state stores. * For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified - * in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an - * internally generated name, and "-changelog" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * The changelog topic will be named "${applicationId}-<storename>-changelog", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "storeName" is an internally generated name, and "-changelog" is a fixed suffix. + * + *

You can retrieve all generated internal topic names via {@link Topology#describe()}. + * To explicitly set key/value serdes, to customize the names of the repartition and changelog topic, or to + * customize the use state store, use {@link #join(KStream, ValueJoiner, JoinWindows, StreamJoined)}. + * For more control over the repartitioning, use {@link #repartition(Repartitioned)} on eiter input before {@code join()}. + * + * @param rightStream + * the {@code KStream} to be joined with this stream + * @param joiner + * a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param windows + * the specification of the {@link JoinWindows} + * + * @param the value type of the right stream + * @param the value type of the result stream + * + * @return A {@code KStream} that contains join-records, one for each matched record-pair, with the corresponding + * key and a value computed by the given {@link ValueJoiner}. * - * @param otherStream the {@code KStream} to be joined with this stream - * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param windows the specification of the {@link JoinWindows} - * @param the value type of the other stream - * @param the value type of the result stream - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals * @see #leftJoin(KStream, ValueJoiner, JoinWindows) * @see #outerJoin(KStream, ValueJoiner, JoinWindows) */ - KStream join(final KStream otherStream, - final ValueJoiner joiner, - final JoinWindows windows); + KStream join(final KStream rightStream, + final ValueJoiner joiner, + final JoinWindows windows); /** - * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default - * serializers and deserializers. - * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. - * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given - * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. - *

- * For each pair of records meeting both join predicates the provided {@link ValueJoinerWithKey} will be called to compute - * a value (with arbitrary type) for the result record. - * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * The key of the result record is the same as for both joining input records. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no - * output record will be added to the resulting {@code KStream}. - *

- * Example (assuming all input records belong to the correct windows): - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
thisotherresult
<K1:A>
<K2:B><K2:b><K2:ValueJoinerWithKey(K1,B,b)>
<K3:c>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before - * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * Repartitioning can happen for one or both of the joining {@code KStream}s. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - *

- * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. - * For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified - * in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an - * internally generated name, and "-changelog" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * See {@link #join(KStream, ValueJoiner, JoinWindows)}. * - * @param otherStream the {@code KStream} to be joined with this stream - * @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records - * @param windows the specification of the {@link JoinWindows} - * @param the value type of the other stream - * @param the value type of the result stream - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoinerWithKey}, one for each matched record-pair with the same key and within the joining window intervals - * @see #leftJoin(KStream, ValueJoinerWithKey, JoinWindows) - * @see #outerJoin(KStream, ValueJoinerWithKey, JoinWindows) + *

Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning. */ - KStream join(final KStream otherStream, - final ValueJoinerWithKey joiner, - final JoinWindows windows); + KStream join(final KStream rightStream, + final ValueJoinerWithKey joiner, + final JoinWindows windows); /** - * Join records of this stream with another {@code KStream}'s records using windowed inner equi join using the - * {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value - * serde}, {@link Serde the other stream's value serde}, and used state stores. - * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. - * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given - * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. - *

- * For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute - * a value (with arbitrary type) for the result record. - * The key of the result record is the same as for both joining input records. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no - * output record will be added to the resulting {@code KStream}. - *

- * Example (assuming all input records belong to the correct windows): - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
thisotherresult
<K1:A>
<K2:B><K2:b><K2:ValueJoiner(B,b)>
<K3:c>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before - * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * Repartitioning can happen for one or both of the joining {@code KStream}s. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - *

- * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names, - * unless a name is provided via a {@code Materialized} instance. - * For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified - * in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an - * internally generated name, and "-changelog" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - * - * @param the value type of the other stream - * @param the value type of the result stream - * @param otherStream the {@code KStream} to be joined with this stream - * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param windows the specification of the {@link JoinWindows} - * @param streamJoined a {@link StreamJoined} used to configure join stores - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals - * @see #leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) - * @see #outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) + * See {@link #join(KStream, ValueJoiner, JoinWindows)}. */ - KStream join(final KStream otherStream, - final ValueJoiner joiner, - final JoinWindows windows, - final StreamJoined streamJoined); + KStream join(final KStream rightStream, + final ValueJoiner joiner, + final JoinWindows windows, + final StreamJoined streamJoined); /** - * Join records of this stream with another {@code KStream}'s records using windowed inner equi join using the - * {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value - * serde}, {@link Serde the other stream's value serde}, and used state stores. - * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. - * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given - * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. - *

- * For each pair of records meeting both join predicates the provided {@link ValueJoinerWithKey} will be called to compute - * a value (with arbitrary type) for the result record. - * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * The key of the result record is the same as for both joining input records. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no - * output record will be added to the resulting {@code KStream}. - *

- * Example (assuming all input records belong to the correct windows): - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
thisotherresult
<K1:A>
<K2:B><K2:b><K2:ValueJoinerWithKey(K1,B,b)>
<K3:c>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before - * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * Repartitioning can happen for one or both of the joining {@code KStream}s. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - *

- * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names, - * unless a name is provided via a {@code Materialized} instance. - * For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified - * in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an - * internally generated name, and "-changelog" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * See {@link #join(KStream, ValueJoiner, JoinWindows)}. * - * @param the value type of the other stream - * @param the value type of the result stream - * @param otherStream the {@code KStream} to be joined with this stream - * @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records - * @param windows the specification of the {@link JoinWindows} - * @param streamJoined a {@link StreamJoined} used to configure join stores - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoinerWithKey}, one for each matched record-pair with the same key and within the joining window intervals - * @see #leftJoin(KStream, ValueJoinerWithKey, JoinWindows, StreamJoined) - * @see #outerJoin(KStream, ValueJoinerWithKey, JoinWindows, StreamJoined) + *

Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning. */ - KStream join(final KStream otherStream, - final ValueJoinerWithKey joiner, - final JoinWindows windows, - final StreamJoined streamJoined); + KStream join(final KStream rightStream, + final ValueJoinerWithKey joiner, + final JoinWindows windows, + final StreamJoined streamJoined); + /** * Join records of this stream with another {@code KStream}'s records using windowed left equi join with default * serializers and deserializers. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 35ef36bb81613..4fcf23211b3d9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -702,7 +702,7 @@ public KGroupedStream groupByKey(final Grouped grouped) { } @Override - public KGroupedStream groupBy(final KeyValueMapper keySelector) { + public KGroupedStream groupBy(final KeyValueMapper keySelector) { return groupBy(keySelector, Grouped.with(null, valueSerde)); } @@ -727,34 +727,33 @@ public KGroupedStream groupBy(final KeyValueMapper KStream join(final KStream otherStream, - final ValueJoiner joiner, - final JoinWindows windows) { + public KStream join(final KStream otherStream, + final ValueJoiner joiner, + final JoinWindows windows) { return join(otherStream, toValueJoinerWithKey(joiner), windows); } @Override - public KStream join(final KStream otherStream, - final ValueJoinerWithKey joiner, - final JoinWindows windows) { + public KStream join(final KStream otherStream, + final ValueJoinerWithKey joiner, + final JoinWindows windows) { return join(otherStream, joiner, windows, StreamJoined.with(null, null, null)); } @Override - public KStream join(final KStream otherStream, - final ValueJoiner joiner, - final JoinWindows windows, - final StreamJoined streamJoined) { + public KStream join(final KStream otherStream, + final ValueJoiner joiner, + final JoinWindows windows, + final StreamJoined streamJoined) { return join(otherStream, toValueJoinerWithKey(joiner), windows, streamJoined); } @Override - public KStream join(final KStream otherStream, - final ValueJoinerWithKey joiner, - final JoinWindows windows, - final StreamJoined streamJoined) { + public KStream join(final KStream otherStream, + final ValueJoinerWithKey joiner, + final JoinWindows windows, + final StreamJoined streamJoined) { return doJoin( otherStream, @@ -836,31 +835,41 @@ public KStream outerJoin(final KStream otherStream, return doJoin(otherStream, joiner, windows, streamJoined, new KStreamImplJoin(builder, true, true)); } - private KStream doJoin(final KStream otherStream, - final ValueJoinerWithKey joiner, - final JoinWindows windows, - final StreamJoined streamJoined, - final KStreamImplJoin join) { + private KStream doJoin( + final KStream otherStream, + final ValueJoinerWithKey joiner, + final JoinWindows windows, + final StreamJoined streamJoined, + final KStreamImplJoin join) + { Objects.requireNonNull(otherStream, "otherStream can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(windows, "windows can't be null"); Objects.requireNonNull(streamJoined, "streamJoined can't be null"); KStreamImpl joinThis = this; - KStreamImpl joinOther = (KStreamImpl) otherStream; + KStreamImpl joinOther = (KStreamImpl) otherStream; - final StreamJoinedInternal streamJoinedInternal = new StreamJoinedInternal<>(streamJoined, builder); + final StreamJoinedInternal streamJoinedInternal = new StreamJoinedInternal<>(streamJoined, builder); final NamedInternal name = new NamedInternal(streamJoinedInternal.name()); if (joinThis.repartitionRequired) { final String joinThisName = joinThis.name; final String leftJoinRepartitionTopicName = name.suffixWithOrElseGet("-left", joinThisName); - joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, streamJoinedInternal.keySerde(), streamJoinedInternal.valueSerde()); + joinThis = joinThis.repartitionForJoin( + leftJoinRepartitionTopicName, + streamJoinedInternal.keySerde(), + streamJoinedInternal.valueSerde() + ); } if (joinOther.repartitionRequired) { final String joinOtherName = joinOther.name; final String rightJoinRepartitionTopicName = name.suffixWithOrElseGet("-right", joinOtherName); - joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde()); + joinOther = joinOther.repartitionForJoin( + rightJoinRepartitionTopicName, + streamJoinedInternal.keySerde(), + streamJoinedInternal.otherValueSerde() + ); } joinThis.ensureCopartitionWith(Collections.singleton(joinOther));