Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public Builder(Map<ConfigResource, Config> configs, boolean validateOnly) {
public AlterConfigsRequest build(short version) {
return new AlterConfigsRequest(data, version);
}

@Override
public String toString() {
return maskData(data);
}
}

private final AlterConfigsRequestData data;
Expand Down Expand Up @@ -135,4 +140,20 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
public static AlterConfigsRequest parse(ByteBuffer buffer, short version) {
return new AlterConfigsRequest(new AlterConfigsRequestData(new ByteBufferAccessor(buffer), version), version);
}

// It is not safe to print all config values
private static String maskData(AlterConfigsRequestData data) {
AlterConfigsRequestData tempData = data.duplicate();
tempData.resources().forEach(resource -> {
resource.configs().forEach(config -> {
config.setValue("REDACTED");
});
});
return tempData.toString();
}

@Override
public String toString() {
return maskData(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
Expand All @@ -48,7 +44,7 @@ public AlterUserScramCredentialsRequest build(short version) {

@Override
public String toString() {
return data.toString();
return maskData(data);
}
}

Expand Down Expand Up @@ -87,15 +83,18 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results));
}

private static String maskData(AlterUserScramCredentialsRequestData data) {
AlterUserScramCredentialsRequestData tempData = data.duplicate();
tempData.upsertions().forEach(upsertion -> {
upsertion.setSalt(new byte[0]);
upsertion.setSaltedPassword(new byte[0]);
});
return tempData.toString();
}

// Do not print salt or saltedPassword
@Override
public String toString() {
JsonNode json = AlterUserScramCredentialsRequestDataJsonConverter.write(data, version()).deepCopy();

for (JsonNode upsertion : json.get("upsertions")) {
((ObjectNode) upsertion).put("salt", "");
((ObjectNode) upsertion).put("saltedPassword", "");
}
return AlterUserScramCredentialsRequestDataJsonConverter.read(json, version()).toString();
return maskData(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,13 @@ public boolean hasError() {
public boolean shouldClientThrottle(short version) {
return version >= 1;
}

// Do not print tokenId and Hmac, overwrite a temp copy of the data with empty content
@Override
public String toString() {
CreateDelegationTokenResponseData tempData = data.duplicate();
tempData.setTokenId("REDACTED");
tempData.setHmac(new byte[0]);
return tempData.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,15 @@ public boolean hasError() {
public boolean shouldClientThrottle(short version) {
return version >= 1;
}

// Do not print tokenId and Hmac, overwrite a temp copy of the data with empty content
@Override
public String toString() {
DescribeDelegationTokenResponseData tempData = data.duplicate();
tempData.tokens().forEach(token -> {
token.setTokenId("REDACTED");
token.setHmac(new byte[0]);
});
return tempData.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,19 @@ public ExpireDelegationTokenRequest build(short version) {

@Override
public String toString() {
return data.toString();
return maskData(data);
}
}

private static String maskData(ExpireDelegationTokenRequestData data) {
ExpireDelegationTokenRequestData tempData = data.duplicate();
tempData.setHmac(new byte[0]);
return tempData.toString();
}

// Do not print Hmac, overwrite a temp copy of the data with empty content
@Override
public String toString() {
return maskData(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,11 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -77,7 +73,7 @@ public IncrementalAlterConfigsRequest build(short version) {

@Override
public String toString() {
return data.toString();
return maskData(data);
}
}

Expand Down Expand Up @@ -113,14 +109,18 @@ public AbstractResponse getErrorResponse(final int throttleTimeMs, final Throwab
}

// It is not safe to print all config values
private static String maskData(IncrementalAlterConfigsRequestData data) {
IncrementalAlterConfigsRequestData tempData = data.duplicate();
tempData.resources().forEach(resource -> {
resource.configs().forEach(config -> {
config.setValue("REDACTED");
});
});
return tempData.toString();
}

@Override
public String toString() {
JsonNode json = IncrementalAlterConfigsRequestDataJsonConverter.write(data, version()).deepCopy();
for (JsonNode resource : json.get("resources")) {
for (JsonNode config : resource.get("configs")) {
((ObjectNode) config).put("value", "REDACTED");
}
}
return IncrementalAlterConfigsRequestDataJsonConverter.read(json, version()).toString();
return maskData(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,19 @@ public RenewDelegationTokenRequest build(short version) {

@Override
public String toString() {
return data.toString();
return maskData(data);
}
}

private static String maskData(RenewDelegationTokenRequestData data) {
RenewDelegationTokenRequestData tempData = data.duplicate();
tempData.setHmac(new byte[0]);
return tempData.toString();
}

// Do not print Hmac, overwrite a temp copy of the data with empty content
@Override
public String toString() {
return maskData(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,12 @@ public static SaslAuthenticateRequest parse(ByteBuffer buffer, short version) {
return new SaslAuthenticateRequest(new SaslAuthenticateRequestData(new ByteBufferAccessor(buffer), version),
version);
}

// Do not print authBytes, overwrite a temp copy of the data with empty bytes
@Override
public String toString() {
SaslAuthenticateRequestData tempData = data.duplicate();
tempData.setAuthBytes(new byte[0]);
return tempData.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,12 @@ public SaslAuthenticateResponseData data() {
public static SaslAuthenticateResponse parse(ByteBuffer buffer, short version) {
return new SaslAuthenticateResponse(new SaslAuthenticateResponseData(new ByteBufferAccessor(buffer), version));
}

// Do not print authBytes, overwrite a temp copy of the data with empty bytes
@Override
public String toString() {
SaslAuthenticateResponseData tempData = data.duplicate();
tempData.setAuthBytes(new byte[0]);
return tempData.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@
import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
import static org.apache.kafka.common.protocol.ApiKeys.SASL_AUTHENTICATE;
import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
import static org.apache.kafka.common.protocol.ApiKeys.WRITE_TXN_MARKERS;
import static org.apache.kafka.common.requests.EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -2989,15 +2988,27 @@ private DescribeConfigsResponse createDescribeConfigsResponse(short version) {
}

private AlterConfigsRequest createAlterConfigsRequest(short version) {
Map<ConfigResource, AlterConfigsRequest.Config> configs = new HashMap<>();
Map<ConfigResource, AlterConfigsRequest.Config> configs = new LinkedHashMap<>();
List<AlterConfigsRequest.ConfigEntry> configEntries = asList(
new AlterConfigsRequest.ConfigEntry("config_name", "config_value"),
new AlterConfigsRequest.ConfigEntry("another_name", "another value")
);
configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new AlterConfigsRequest.Config(configEntries));
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"),
new AlterConfigsRequest.Config(emptyList()));
return new AlterConfigsRequest.Builder(configs, false).build(version);
AlterConfigsRequest alterConfigsRequest = new AlterConfigsRequest.Builder(configs, false).build(version);
assertEquals(
"AlterConfigsRequestData(resources=[" +
"AlterConfigsResource(resourceType=" + ConfigResource.Type.BROKER.id() + ", " +
"resourceName='0', " +
"configs=[AlterableConfig(name='config_name', value='REDACTED'), " +
"AlterableConfig(name='another_name', value='REDACTED')]), " +
"AlterConfigsResource(resourceType=" + ConfigResource.Type.TOPIC.id() + ", " +
"resourceName='topic', configs=[])], " +
"validateOnly=false)",
alterConfigsRequest.toString()
);
return alterConfigsRequest;
}

private AlterConfigsResponse createAlterConfigsResponse() {
Expand Down Expand Up @@ -3100,7 +3111,12 @@ private CreateDelegationTokenResponse createCreateTokenResponse() {
.setMaxTimestampMs(System.currentTimeMillis())
.setTokenId("token1")
.setHmac("test".getBytes());
return new CreateDelegationTokenResponse(data);
var response = new CreateDelegationTokenResponse(data);

String responseStr = response.toString();
assertTrue(responseStr.contains("tokenId='REDACTED'"));
assertTrue(responseStr.contains("hmac=[]"));
return response;
}

private RenewDelegationTokenRequest createRenewTokenRequest(short version) {
Expand Down Expand Up @@ -3156,7 +3172,14 @@ private DescribeDelegationTokenResponse createDescribeTokenResponse(short versio
tokenList.add(new DelegationToken(tokenInfo1, "test".getBytes()));
tokenList.add(new DelegationToken(tokenInfo2, "test".getBytes()));

return new DescribeDelegationTokenResponse(version, 20, Errors.NONE, tokenList);
var response = new DescribeDelegationTokenResponse(version, 20, Errors.NONE, tokenList);

String responseStr = response.toString();
String[] parts = responseStr.split(",");
// The 2 token info should both be redacted
assertEquals(2, Arrays.stream(parts).filter(s -> s.trim().contains("tokenId='REDACTED'")).count());
assertEquals(2, Arrays.stream(parts).filter(s -> s.trim().contains("hmac=[]")).count());
return response;
}

private ElectLeadersRequest createElectLeadersRequestNullPartitions() {
Expand Down Expand Up @@ -3773,4 +3796,26 @@ public void testInvalidTaggedFieldsWithSaslAuthenticateRequest() {
parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor.buffer())).getMessage();
assertEquals("Error reading byte array of 32767 byte(s): only 3 byte(s) available", msg);
}

@Test
public void testSaslAuthenticateRequestResponseToStringMasksSensitiveData() {
byte[] sensitiveAuthBytes = "sensitive-auth-token-123".getBytes(StandardCharsets.UTF_8);
SaslAuthenticateRequestData requestData = new SaslAuthenticateRequestData().setAuthBytes(sensitiveAuthBytes);
SaslAuthenticateRequest request = new SaslAuthenticateRequest(requestData, (short) 2);

String requestString = request.toString();

// Verify that the authBytes field is present but empty in the output
assertTrue(requestString.contains("authBytes=[]"),
"authBytes field should be empty in toString() output");

SaslAuthenticateResponseData responseData = new SaslAuthenticateResponseData().setAuthBytes(sensitiveAuthBytes);
SaslAuthenticateResponse response = new SaslAuthenticateResponse(responseData);

String responseString = response.toString();

// Verify that the authBytes field is present but empty in the output
assertTrue(responseString.contains("authBytes=[]"),
"authBytes field should be empty in toString() output");
}
}
19 changes: 15 additions & 4 deletions core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.mockito.Mockito.mock
import java.io.IOException
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util
import java.util.Collections
import java.util.concurrent.atomic.AtomicReference
import scala.collection.{Map, Seq}
Expand All @@ -65,13 +66,23 @@ class RequestChannelTest {

val sensitiveValue = "secret"
def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry], expectedValues: Map[String, String]): Unit = {
val alterConfigs = request(new AlterConfigsRequest.Builder(
Collections.singletonMap(resource, new Config(entries.asJavaCollection)), true).build())
val alterConfigs = new AlterConfigsRequest.Builder(
util.Map.of(resource, new Config(entries.asJavaCollection)), true).build()

val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
val alterConfigsString = alterConfigs.toString
entries.foreach { entry =>
if (!alterConfigsString.contains(entry.name())) {
fail("Config names should be in the request string")
}
if (entry.value() != null && alterConfigsString.contains(entry.value())) {
fail("Config values should not be in the request string")
}
}
val alterConfigsReq = request(alterConfigs)
val loggableAlterConfigs = alterConfigsReq.loggableRequest.asInstanceOf[AlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.configs.get(resource)
assertEquals(expectedValues, toMap(loggedConfig))
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigsReq.header, alterConfigsReq.requestLog.toJava, alterConfigsReq.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
}

Expand Down
Loading