-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17415: Avoid overflow of expired timestamp #17026
Conversation
@frankvicky could you please fix conflicts? |
60f9567
to
d3a91df
Compare
@@ -178,6 +178,12 @@ class DelegationTokenManagerZk(config: KafkaConfig, | |||
|
|||
val issueTimeStamp = time.milliseconds | |||
val maxLifeTime = if (maxLifeTimeMs <= 0) tokenMaxLifetime else Math.min(maxLifeTimeMs, tokenMaxLifetime) | |||
|
|||
if (checkTimestampOverflow(issueTimeStamp, maxLifeTime)) { | |||
responseCallback(CreateTokenResult(owner, tokenRequester, -1, -1, -1, "", Array[Byte](), Errors.INVALID_TIMESTAMP)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we set upper bound Long.MAX_VALUE
instead of throwing exception? That makes Long.MAX_VALUE
valid in request. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, this change will make api easier to use.
@@ -292,6 +298,8 @@ class DelegationTokenManagerZk(config: KafkaConfig, | |||
expireResponseCallback(Errors.NONE, now) | |||
} else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) { | |||
expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1) | |||
} else if (now > Long.MaxValue - expireLifeTimeMs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should align the behavior of handling the overflow timestamp, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I missed it.
Thanks for pointing out 😸
@frankvicky please rebase code to trigger CI again |
val maxLifeTimeStamp = issueTimeStamp + maxLifeTime | ||
val expiryTimeStamp = Math.min(maxLifeTimeStamp, issueTimeStamp + defaultTokenRenewTime) | ||
|
||
val isOverflowed = checkTimestampOverflow(issueTimeStamp, maxLifeTime) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is too complicated. Could you please add sum
method?
private static long sum(long now, long duration) {
return Long.MAX_VALUE - now <= duration ? Long.MAX_VALUE : now + duration;
}
long maxTimestamp = sum(now, maxLifeTime);
long expiryTimestamp = Math.min(maxTimestamp, sum(now, tokenDefaultRenewLifetimeMs));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky thanks for your patch
@@ -294,7 +299,8 @@ class DelegationTokenManagerZk(config: KafkaConfig, | |||
expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1) | |||
} else { | |||
//set expiry time stamp | |||
val expiryTimeStamp = Math.min(tokenInfo.maxTimestamp, now + expireLifeTimeMs) | |||
val expiryTimeStamp = if (now > Long.MaxValue - expireLifeTimeMs) Long.MaxValue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val expiryTimeStamp = Math.min(tokenInfo.maxTimestamp, sum(now, expireLifeTimeMs))
long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), | ||
now + requestData.expiryTimePeriodMs()); | ||
} else { | ||
long expiryTimestamp = now > Long.MAX_VALUE - requestData.expiryTimePeriodMs() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), sum(now, requestData.expiryTimePeriodMs()));
@@ -192,6 +193,10 @@ class DelegationTokenManagerZk(config: KafkaConfig, | |||
} | |||
} | |||
|
|||
private def sum(now: Long, duration: Long): Long = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the zk is dropping and we don't add tests for zk, so could you please revert those changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky thanks for this patch. one small question remains.
@@ -633,7 +633,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu | |||
@ValueSource(strings = Array("kraft")) | |||
def testExpireDelegationToken(quorum: String): Unit = { | |||
client = createAdminClient | |||
val createDelegationTokenOptions = new CreateDelegationTokenOptions() | |||
val createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(5000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we need this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change was made because the previous value of maxlifeTimeMs
was 5000
milliseconds. Later, I modified the config to Long.MaxValue
to test overflow scenarios. To avoid breaking the original test logic, I changed the CreateDelegationTokenOptions
that used the default value to explicitly set .maxlifeTimeMs(5000)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I have updated the description due to removing zk. |
Both ZK and KRaft modes do not handle overflow, so setting a large max lifetime results in a negative expired timestamp and negative max timestamp, which is unexpected behavior. In this PR, we are only fixing the KRaft code since ZK will be removed soon. Reviewers: Chia-Ping Tsai <[email protected]>
JIRA: KAFKA-17415
Committer Checklist (excluded from commit message)