Skip to content

Commit

Permalink
Add unit tests for new gRPC APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Sep 13, 2024
1 parent ba04c17 commit 3458eb8
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,8 @@ public static ByteString toByteString(ByteBuf body) {
// Fallback to nioBuffer() to handle the conversion efficiently
return ByteString.copyFrom(body.nioBuffer());
}

public static ByteString toByteString(byte[] bytes) {
return ByteString.copyFrom(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static com.linkedin.venice.listener.StorageReadRequestHandler.VENICE_STORAGE_NODE_HARDWARE_IS_NOT_HEALTHY_MSG;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse;
import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse;
import com.linkedin.davinci.storage.DiskHealthCheckService;
Expand Down Expand Up @@ -45,8 +46,6 @@
import com.linkedin.venice.response.VeniceReadResponseStatus;
import com.linkedin.venice.utils.ObjectMapperFactory;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -293,16 +292,16 @@ public void handleAdminRequest(AdminRequest request, StreamObserver<AdminRespons
com.linkedin.davinci.listener.response.AdminResponse adminResponse = storageReadRequestHandler
.handleServerAdminRequest(com.linkedin.venice.listener.request.AdminRequest.parseAdminGrpcRequest(request));
if (!adminResponse.isError()) {
builder.setSchemaId(com.linkedin.davinci.listener.response.AdminResponse.getResponseSchemaIdHeader());
builder.setStatusCode(VeniceReadResponseStatus.OK.getCode());
builder.setValue(GrpcUtils.toByteString(adminResponse.getResponseBody()));
builder.setContentType(HttpConstants.AVRO_BINARY);
builder.setContentLength(adminResponse.getResponseBody().readableBytes());
builder.setSchemaId(com.linkedin.davinci.listener.response.AdminResponse.getResponseSchemaIdHeader())
.setStatusCode(VeniceReadResponseStatus.OK.getCode())
.setValue(GrpcUtils.toByteString(adminResponse.getResponseBody()))
.setContentType(HttpConstants.AVRO_BINARY)
.setContentLength(adminResponse.getResponseBody().readableBytes());
} else {
builder.setStatusCode(VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
String errorMessage = adminResponse.getMessage() != null ? adminResponse.getMessage() : "Unknown error";
builder.setErrorMessage(errorMessage);
builder.setContentType(HttpConstants.TEXT_PLAIN);
builder.setStatusCode(VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode())
.setErrorMessage(errorMessage)
.setContentType(HttpConstants.TEXT_PLAIN);
}
} catch (Exception e) {
LOGGER.error("Error while processing admin request", e);
Expand All @@ -323,16 +322,16 @@ public void getMetadata(MetadataRequest request, StreamObserver<MetadataResponse
com.linkedin.davinci.listener.response.MetadataResponse metadataResponse =
storageReadRequestHandler.handleMetadataFetchRequest(metadataFetchRequest);
if (!metadataResponse.isError()) {
builder.setStatusCode(VeniceReadResponseStatus.OK.getCode());
builder.setValue(GrpcUtils.toByteString(metadataResponse.getResponseBody()));
builder.setContentType(HttpConstants.AVRO_BINARY);
builder.setContentLength(metadataResponse.getResponseBody().readableBytes());
builder.setSchemaId(metadataResponse.getResponseSchemaIdHeader());
builder.setStatusCode(VeniceReadResponseStatus.OK.getCode())
.setValue(GrpcUtils.toByteString(metadataResponse.getResponseBody()))
.setContentType(HttpConstants.AVRO_BINARY)
.setContentLength(metadataResponse.getResponseBody().readableBytes())
.setSchemaId(metadataResponse.getResponseSchemaIdHeader());
} else {
String errorMessage = metadataResponse.getMessage() != null ? metadataResponse.getMessage() : "Unknown error";
builder.setStatusCode(VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
builder.setErrorMessage(errorMessage);
builder.setContentType(HttpConstants.TEXT_PLAIN);
builder.setStatusCode(VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode())
.setErrorMessage(errorMessage)
.setContentType(HttpConstants.TEXT_PLAIN);
}
} catch (Exception e) {
LOGGER.error("Error while processing metadata request", e);
Expand All @@ -355,13 +354,12 @@ public void getCurrentVersionInfo(
ServerCurrentVersionResponse currentVersionResponse =
storageReadRequestHandler.handleCurrentVersionRequest(currentVersionRequest);
if (!currentVersionResponse.isError()) {
builder.setStatusCode(VeniceReadResponseStatus.OK.getCode());
builder.setCurrentVersion(currentVersionResponse.getCurrentVersion());
builder.setStatusCode(VeniceReadResponseStatus.OK.getCode())
.setCurrentVersion(currentVersionResponse.getCurrentVersion());
} else {
String errorMessage =
currentVersionResponse.getMessage() != null ? currentVersionResponse.getMessage() : "Unknown error";
builder.setStatusCode(VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
builder.setErrorMessage(errorMessage);
builder.setStatusCode(VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode())
.setErrorMessage(
currentVersionResponse.getMessage() != null ? currentVersionResponse.getMessage() : "Unknown error");
}
} catch (Exception e) {
LOGGER.error("Error while processing current version info request", e);
Expand All @@ -385,16 +383,13 @@ public void getIngestionContext(
TopicPartitionIngestionContextResponse response =
storageReadRequestHandler.handleTopicPartitionIngestionContextRequest(ingestionContextRequest);
if (!response.isError()) {
builder.setStatusCode(VeniceReadResponseStatus.OK.getCode());
ByteBuf body = Unpooled.wrappedBuffer(OBJECT_MAPPER.writeValueAsBytes(response));
builder.setValue(GrpcUtils.toByteString(body));
builder.setContentType(HttpConstants.JSON);
builder.setContentLength(body.readableBytes());
ByteString body = GrpcUtils.toByteString(response.getTopicPartitionIngestionContext());
builder.setValue(body).setContentLength(body.size()).setStatusCode(VeniceReadResponseStatus.OK.getCode());
} else {
String errorMessage = response.getMessage() != null ? response.getMessage() : "Unknown error";
builder.setStatusCode(VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
builder.setErrorMessage(errorMessage);
builder.setContentType(HttpConstants.TEXT_PLAIN);
builder.setStatusCode(VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode())
.setErrorMessage(errorMessage)
.setContentType(HttpConstants.TEXT_PLAIN);
}
} catch (Exception e) {
LOGGER.error("Error while processing ingestion context request", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,27 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse;
import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse;
import com.linkedin.davinci.storage.DiskHealthCheckService;
import com.linkedin.venice.HttpConstants;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.listener.QuotaEnforcementHandler;
import com.linkedin.venice.listener.StorageReadRequestHandler;
import com.linkedin.venice.listener.response.BinaryResponse;
import com.linkedin.venice.meta.ServerAdminAction;
import com.linkedin.venice.protocols.AdminRequest;
import com.linkedin.venice.protocols.AdminResponse;
import com.linkedin.venice.protocols.CompressionDictionaryRequest;
import com.linkedin.venice.protocols.CompressionDictionaryResponse;
import com.linkedin.venice.protocols.CurrentVersionInfoRequest;
import com.linkedin.venice.protocols.CurrentVersionInfoResponse;
import com.linkedin.venice.protocols.HealthCheckRequest;
import com.linkedin.venice.protocols.HealthCheckResponse;
import com.linkedin.venice.protocols.IngestionContextRequest;
import com.linkedin.venice.protocols.IngestionContextResponse;
import com.linkedin.venice.protocols.MetadataRequest;
import com.linkedin.venice.protocols.MetadataResponse;
import com.linkedin.venice.protocols.VeniceReadServiceGrpc;
import com.linkedin.venice.protocols.VeniceReadServiceGrpc.VeniceReadServiceBlockingStub;
import com.linkedin.venice.response.VeniceReadResponseStatus;
Expand All @@ -26,6 +37,7 @@
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -76,7 +88,7 @@ public void setUp() throws IOException {
}

@AfterMethod
public void cleanup() {
public void cleanUp() {
// Shut down the channel and server after each test
if (grpcChannel != null) {
grpcChannel.shutdownNow();
Expand All @@ -103,13 +115,10 @@ public void testIsServerHealthy() {

@Test
public void testGetCompressionDictionary() {
String storeName = "testStore";
int storeVersion = 1;

// Case 1: Non-empty buffer response means dictionary found
VeniceReadServiceBlockingStub blockingStub = VeniceReadServiceGrpc.newBlockingStub(grpcChannel);
CompressionDictionaryRequest request =
CompressionDictionaryRequest.newBuilder().setStoreName(storeName).setStoreVersion(storeVersion).build();
CompressionDictionaryRequest.newBuilder().setStoreName("testStore").setStoreVersion(1).build();
BinaryResponse binaryResponse = new BinaryResponse(ByteBuffer.wrap(new byte[] { 4, 5, 6 }));
when(storageReadRequestHandler.handleDictionaryFetchRequest(any())).thenReturn(binaryResponse);

Expand All @@ -134,6 +143,135 @@ public void testGetCompressionDictionary() {

@Test
public void testHandleAdminRequest() {
// Case 1: Admin request is handled successfully with a response
AdminRequest adminRequest = AdminRequest.newBuilder()
.setResourceName("testStore_v1")
.setServerAdminAction(ServerAdminAction.DUMP_SERVER_CONFIGS.name())
.build();
VeniceReadServiceBlockingStub blockingStub = VeniceReadServiceGrpc.newBlockingStub(grpcChannel);
com.linkedin.davinci.listener.response.AdminResponse adminResponse =
mock(com.linkedin.davinci.listener.response.AdminResponse.class);
when(adminResponse.isError()).thenReturn(false);
when(storageReadRequestHandler.handleServerAdminRequest(any())).thenReturn(adminResponse);
when(adminResponse.getResponseBody()).thenReturn(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 }));

AdminResponse grpcAdminResponse = blockingStub.handleAdminRequest(adminRequest);
assertEquals(grpcAdminResponse.getStatusCode(), VeniceReadResponseStatus.OK.getCode());
assertEquals(grpcAdminResponse.getValue().asReadOnlyByteBuffer(), ByteBuffer.wrap(new byte[] { 1, 2, 3 }));
assertEquals(grpcAdminResponse.getContentType(), HttpConstants.AVRO_BINARY);
assertEquals(grpcAdminResponse.getContentLength(), 3);

// Case 2: Admin request is handled successfully with error response
when(adminResponse.isError()).thenReturn(true);
when(adminResponse.getMessage()).thenReturn("Test error message");
when(adminResponse.getResponseBody()).thenReturn(Unpooled.EMPTY_BUFFER);
AdminResponse grpcAdminErrorResponse = blockingStub.handleAdminRequest(adminRequest);
assertEquals(grpcAdminErrorResponse.getStatusCode(), VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
assertEquals(grpcAdminErrorResponse.getValue().asReadOnlyByteBuffer(), ByteBuffer.wrap(new byte[0]));
assertEquals(grpcAdminErrorResponse.getContentType(), HttpConstants.TEXT_PLAIN);
assertEquals(grpcAdminErrorResponse.getContentLength(), 0);

// Case 3: Exception was thrown when handling the request hence return INTERNAL_SERVER_ERROR
when(storageReadRequestHandler.handleServerAdminRequest(any())).thenThrow(new VeniceException("Test exception"));
AdminResponse exceptionActualResponse = blockingStub.handleAdminRequest(adminRequest);
assertEquals(exceptionActualResponse.getStatusCode(), VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
}

@Test
public void testGetMetadata() {
// Case 1: Metadata request is handled successfully with a response
MetadataRequest request = MetadataRequest.newBuilder().setStoreName("testStore").build();
VeniceReadServiceBlockingStub blockingStub = VeniceReadServiceGrpc.newBlockingStub(grpcChannel);

com.linkedin.davinci.listener.response.MetadataResponse metadataResponseMock =
mock(com.linkedin.davinci.listener.response.MetadataResponse.class);
when(metadataResponseMock.isError()).thenReturn(false);
when(metadataResponseMock.getResponseBody()).thenReturn(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 }));
when(metadataResponseMock.getResponseSchemaIdHeader()).thenReturn(-11);
when(storageReadRequestHandler.handleMetadataFetchRequest(any())).thenReturn(metadataResponseMock);

MetadataResponse metadataResponse = blockingStub.getMetadata(request);
assertEquals(metadataResponse.getStatusCode(), VeniceReadResponseStatus.OK.getCode());
assertEquals(metadataResponse.getValue().asReadOnlyByteBuffer(), ByteBuffer.wrap(new byte[] { 1, 2, 3 }));
assertEquals(metadataResponse.getContentType(), HttpConstants.AVRO_BINARY);
assertEquals(metadataResponse.getContentLength(), 3);
assertEquals(metadataResponse.getSchemaId(), -11);

// Case 2: Metadata request is handled successfully with error response
when(metadataResponseMock.isError()).thenReturn(true);
when(metadataResponseMock.getMessage()).thenReturn("Test error message");
when(metadataResponseMock.getResponseBody()).thenReturn(Unpooled.EMPTY_BUFFER);
MetadataResponse grpcMetadataErrorResponse = blockingStub.getMetadata(request);
assertEquals(grpcMetadataErrorResponse.getStatusCode(), VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
assertEquals(grpcMetadataErrorResponse.getValue().asReadOnlyByteBuffer(), ByteBuffer.wrap(new byte[0]));
assertEquals(grpcMetadataErrorResponse.getContentType(), HttpConstants.TEXT_PLAIN);
assertEquals(grpcMetadataErrorResponse.getContentLength(), 0);

// Case 3: Exception was thrown when handling the request hence return INTERNAL_SERVER_ERROR
when(storageReadRequestHandler.handleMetadataFetchRequest(any())).thenThrow(new VeniceException("Test exception"));
MetadataResponse exceptionActualResponse = blockingStub.getMetadata(request);
assertEquals(exceptionActualResponse.getStatusCode(), VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
}

@Test
public void testGetCurrentVersionInfo() {
// Case 1: Current version info request is handled successfully with a response
CurrentVersionInfoRequest request = CurrentVersionInfoRequest.newBuilder().setStoreName("testStore").build();
VeniceReadServiceBlockingStub blockingStub = VeniceReadServiceGrpc.newBlockingStub(grpcChannel);
ServerCurrentVersionResponse serverCurrentVersionResponseMock = new ServerCurrentVersionResponse();
serverCurrentVersionResponseMock.setCurrentVersion(2);
serverCurrentVersionResponseMock.setError(false);
when(storageReadRequestHandler.handleCurrentVersionRequest(any())).thenReturn(serverCurrentVersionResponseMock);
CurrentVersionInfoResponse response = blockingStub.getCurrentVersionInfo(request);
assertEquals(response.getStatusCode(), VeniceReadResponseStatus.OK.getCode());
assertEquals(response.getCurrentVersion(), 2);

// Case 2: Current version info request is handled successfully with error response
serverCurrentVersionResponseMock.setError(true);
serverCurrentVersionResponseMock.setCurrentVersion(-1);
serverCurrentVersionResponseMock.setMessage("Test error message");
CurrentVersionInfoResponse errorResponse = blockingStub.getCurrentVersionInfo(request);
assertEquals(errorResponse.getStatusCode(), VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
assertEquals(errorResponse.getErrorMessage(), "Test error message");

// Case 3: Exception was thrown when handling the request hence return INTERNAL_SERVER_ERROR
when(storageReadRequestHandler.handleCurrentVersionRequest(any())).thenThrow(new VeniceException("Test exception"));
CurrentVersionInfoResponse exceptionActualResponse = blockingStub.getCurrentVersionInfo(request);
assertEquals(exceptionActualResponse.getStatusCode(), VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
}

@Test
public void testGetIngestionContext() {
// Case 1: Ingestion context request is handled successfully with a response
IngestionContextRequest request =
IngestionContextRequest.newBuilder().setTopicName("testStore_v1").setPartition(34).build();
VeniceReadServiceBlockingStub blockingStub = VeniceReadServiceGrpc.newBlockingStub(grpcChannel);
String jsonStr = "{\n" + "\"kafkaUrl\" : {\n" + " TP(topic: \"testStore_v1\", partition: 34) " + ": {\n"
+ " \"latestOffset\" : 0,\n" + " \"offsetLag\" : 1,\n" + " \"msgRate\" : 2.0,\n"
+ " \"byteRate\" : 4.0,\n" + " \"consumerIdx\" : 6,\n"
+ " \"elapsedTimeSinceLastPollInMs\" : 7\n" + " }\n" + " }\n" + "}";
byte[] ingestionCtxBytes = jsonStr.getBytes();
TopicPartitionIngestionContextResponse responseMock = new TopicPartitionIngestionContextResponse();
responseMock.setTopicPartitionIngestionContext(ingestionCtxBytes);
responseMock.setError(false);
when(storageReadRequestHandler.handleTopicPartitionIngestionContextRequest(any())).thenReturn(responseMock);
IngestionContextResponse response = blockingStub.getIngestionContext(request);
assertEquals(response.getStatusCode(), VeniceReadResponseStatus.OK.getCode());
assertEquals(response.getValue().asReadOnlyByteBuffer(), ByteBuffer.wrap(ingestionCtxBytes));
assertEquals(response.getContentLength(), ingestionCtxBytes.length);

// Case 2: Ingestion context request is handled successfully with error response
responseMock.setError(true);
responseMock.setMessage("Test error message");
responseMock.setTopicPartitionIngestionContext(null);
IngestionContextResponse errorResponse = blockingStub.getIngestionContext(request);
assertEquals(errorResponse.getStatusCode(), VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
assertEquals(errorResponse.getErrorMessage(), "Test error message");

// Case 3: Exception was thrown when handling the request hence return INTERNAL_SERVER_ERROR
when(storageReadRequestHandler.handleTopicPartitionIngestionContextRequest(any()))
.thenThrow(new VeniceException("Test exception"));
IngestionContextResponse exceptionActualResponse = blockingStub.getIngestionContext(request);
assertEquals(exceptionActualResponse.getStatusCode(), VeniceReadResponseStatus.INTERNAL_SERVER_ERROR.getCode());
}
}

0 comments on commit 3458eb8

Please sign in to comment.