-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API #17190
Conversation
cb98260
to
37e99d8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Overall LGTM. I think we can remove more unused code though.
topology | ||
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) | ||
.addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2) | ||
.addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OldAPIStatefulProcessor
should be unused now and can be removed, too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, remove these deprecated API.
private Topology createStatefulTopology(final String storeName) { | ||
return topology | ||
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) | ||
.addProcessor("processor", define(new OldAPIStatefulProcessor(storeName)), "source") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
define()
should be unused now and can be removed, too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()); | ||
return topology | ||
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) | ||
.addProcessor("processor", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defineWithStoresOldAPI()
should be unused now and can be removed, too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -312,79 +312,6 @@ public void testDrivingSimpleTopologyWithDroppingPartitioner() { | |||
assertTrue(outputTopic1.isEmpty()); | |||
} | |||
|
|||
@Test | |||
public void testDrivingStatefulTopology() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we removing this test? Seems we should keep it. It's not marked as @Deprecated
and does not say "old API" in it's name either, nor is their a comment that it would test old API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pegasas -- i don't see any reply on this comment (same for the other 3 below)
} | ||
|
||
@Test | ||
public void testDrivingConnectedStateStoreTopology() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pegasas -- i don't see any reply on this comment (2ed one)
@@ -1776,27 +1229,6 @@ public void process(final Record<String, String> record) { | |||
} | |||
} | |||
|
|||
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we should not remove this (it's not @Deprecated
) and comment says Need to be migrated
, but rather rewrite to for the new processor API? Or is it unused now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pegasas -- i don't see any reply on this comment (3re one)
return () -> processor; | ||
} | ||
|
||
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pegasas -- i don't see any reply on this comment (last one)
@pegasas -- Any updates on this PR? |
@pegasas We are slowly approaching AK 4.0 release deadline. Would be great to push this over the finish line soon. |
Sorry for the delay due to busy last month. |
@pegasas -- I don't see any replies to the latest comment, nor an update to the PR? |
yes, I've replied to previous comments. |
@pegasas -- I am still somewhat confused. Did you see my latest in-line tagging?
These are still open questions that I think we need to resolve before we can merge this PR. And AK 4.0 release deadline is approaching, so I would like to finish this :) |
Hi, @mjsax , as line318 shows,
which createStatefulTopology would be deprecated. It seems my comments are disappeared several times. it is weird. |
I believe the annotation and comment is wrong. While it does test the old API, there is no corresponding test for the new API (or did I miss it?), and thus, it should have be We should not remove these 4 tests, but rather rewrite all 4 to use the new PAPI, to avoid reducing test coverage. For the other test you remove in this PR, eg, Does this make sense? |
@pegasas -- did you see my last reply? |
8466f4c
to
9359d01
Compare
Correct and fixed. |
Yes, these changes got merged very recently. Support for older Java version gets dropped with 4.0.0 release, and thus JDK17 is now required to compile Good find about |
9359d01
to
d1840f9
Compare
Your PR update does not seem to be right/complete? I am wondering if we have some communication gap and going somewhat in cycles? -- Would you mind if we close this PR and I work on a PR on my side directly? It's two months since this PR was opened and it seems we do not really make any progress on it. |
sorry. |
Thanks for your understanding. And thanks for contributing! |
New PR for this work item: #18154 In case you are interested. Maybe my comments and change requests make more sense now? |
Thanks mjsax! |
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)