-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17525: Convert the UnknownServerException to InvalidRequestException when altering client-metrics config at runtime #17168
Conversation
All fail tests pass on my local. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@TaiJuWu thanks for this quick fix.
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
Outdated
Show resolved
Hide resolved
@@ -131,9 +131,11 @@ private static void validateProperties(Properties properties) { | |||
} | |||
}); | |||
|
|||
ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if there are configs having no default value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @chia7712 , thanks for review.
The situation won't happen because Line 120
checks the properties which are defined at Line 102
and they have default value except PUSH_INTERVAL_MS
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not convinced instantiating ClientMetricsConfigs(properties)
in the validate method. The idea with validate is to create ClientMetricsConfigs
instance only after post validating the configs are correct.
I understand we want to thrown invalid request exception
when configs are not correct and as per the changes I am assuming that the unknown server occurs when the int config is specified as non-int, correct? If that's the only scenario then we can handle directly. Am I reading something wrong here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not convinced instantiating
ClientMetricsConfigs(properties)
in the validate method. The idea with validate is to createClientMetricsConfigs
instance only after post validating the configs are correct.I understand we want to thrown
invalid request exception
when configs are not correct and as per the changes I am assuming that the unknown server occurs when the int config is specified as non-int, correct? If that's the only scenario then we can handle directly. Am I reading something wrong here?
Yes, you are right and I updated this part by utilize ConfigDef.parseType
.
PTAL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @TaiJuWu
I have a minor comment, PTAL
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
Outdated
Show resolved
Hide resolved
8f39965
to
1dd26e0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the PR. Left some comments.
int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS)); | ||
if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) { | ||
Integer pushIntervalMs = (Integer) ConfigDef.parseType(PUSH_INTERVAL_MS, properties.getProperty(PUSH_INTERVAL_MS), Type.INT); | ||
if (pushIntervalMs == null || pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm, what happens when we use kafka-client-metrics.sh
tool to just update pattern or metrics i.e. pushIntervalMs is not updated? Then isn't pushIntervalMs not null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @apoorvmittal10, thanks for the review and sorry for late response.
If kafka-client-metrics.sh doesn't have pushIntervalMs, it should not enter this block (check in 136).
I added a null check just to ensure that null values are handled properly and suppress compiler warning.
Ideally, the check should always pass.
@@ -133,8 +134,8 @@ private static void validateProperties(Properties properties) { | |||
|
|||
// Make sure that push interval is between 100ms and 1 hour. | |||
if (properties.containsKey(PUSH_INTERVAL_MS)) { | |||
int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS)); | |||
if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) { | |||
Integer pushIntervalMs = (Integer) ConfigDef.parseType(PUSH_INTERVAL_MS, properties.getProperty(PUSH_INTERVAL_MS), Type.INT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I get it right, the issue we are trying to solve is regarding the correct exception when config is not as INT or expected? If that's the case then either of the 3 approach I think would be better:
- Make this method non-static, instance method. And remove
properties
as input. Then instantiate ClientMetricsConfig where they are used and call validate. The downside of this is that validation also happens inControllerConfigurationValidator
where unnecessaryClientMetricsConfig
will be created. - Enclose a try/catch in validate method and return what received in exception but can return config exception where exception translates to UnknownServer Exception. This is not very clean but a good way to handle.
- Use
CONFIG.parse(properties)
in validate method to getMap<?,?> valueMaps = CONFIG.parse(properties);
and then iterate on map as like we iterate on properties. That shall solve your problem.
I would have chosen option 3. Also if you detail on the prolem satetment in PR description or jira then I can help better. But as far as I think, option 3 might solve your issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw the updated jira, thank you. Let me know if option 3 works for you, ideally should.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @apoorvmittal10 , sorry for late response.
I have updated this PR according your great suggestion.
Please take a look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing comments, some additional changes we can do.
server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
Show resolved
Hide resolved
Map<String, String> propertiesMap = new HashMap<>(properties.size()); | ||
properties.forEach((key, value) -> propertiesMap.put((String) key, (String) value)); | ||
Map<String, Object> parsed = CONFIG.parse(propertiesMap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to create a map or should just write below:
Map<String, String> propertiesMap = new HashMap<>(properties.size()); | |
properties.forEach((key, value) -> propertiesMap.put((String) key, (String) value)); | |
Map<String, Object> parsed = CONFIG.parse(propertiesMap); | |
Map<String, Object> parsed = CONFIG.parse(properties); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can also write:
Map<?, ?> parsed = CONFIG.parse(properties);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion. I hadn’t noticed this before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing the comments, LGTM!. Minor nit test comments.
@@ -124,6 +128,20 @@ public void testClientInstanceId(ClusterInstance clusterInstance) throws Interru | |||
} | |||
} | |||
|
|||
private static String[] toArray(List<String>... lists) { | |||
return Stream.of(lists).flatMap(List::stream).toArray(String[]::new); | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: a line break
private static String[] toArray(List<String>... lists) { | ||
return Stream.of(lists).flatMap(List::stream).toArray(String[]::new); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: would be better to place the private
method after public
methods and before static inner class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I will notice in future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for the PR and addressing the comments.
…tion when altering client-metrics config at runtime (apache#17168) Reviewers: Apoorv Mittal <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
As title
jira: https://issues.apache.org/jira/browse/KAFKA-17525
Committer Checklist (excluded from commit message)