Skip to content

Commit e1a88d1

Browse files
feat: add grpc channel registry (#13)
1 parent b52c268 commit e1a88d1

File tree

3 files changed

+88
-0
lines changed

3 files changed

+88
-0
lines changed

grpc-client-utils/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dependencies {
2222

2323
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
2424
testImplementation("org.mockito:mockito-core:3.4.4")
25+
testRuntimeOnly("io.grpc:grpc-netty:1.36.0")
2526
}
2627

2728
tasks.test {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.hypertrace.core.grpcutils.client;
2+
3+
import io.grpc.ManagedChannel;
4+
import io.grpc.ManagedChannelBuilder;
5+
import java.util.Map;
6+
import java.util.concurrent.ConcurrentHashMap;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
public class GrpcChannelRegistry {
11+
private static final Logger LOG = LoggerFactory.getLogger(GrpcChannelRegistry.class);
12+
private final Map<String, ManagedChannel> channelMap = new ConcurrentHashMap<>();
13+
private volatile boolean isShutdown = false;
14+
15+
public ManagedChannel forAddress(String host, int port) {
16+
assert !this.isShutdown;
17+
String channelId = this.getChannelId(host, port);
18+
return this.channelMap.computeIfAbsent(channelId, unused -> this.buildNewChannel(host, port));
19+
}
20+
21+
private ManagedChannel buildNewChannel(String host, int port) {
22+
LOG.info("Creating new channel for {}:{}", host, port);
23+
return ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
24+
}
25+
26+
private String getChannelId(String host, int port) {
27+
return host + ":" + port;
28+
}
29+
30+
public void shutdown() {
31+
channelMap.values().forEach(ManagedChannel::shutdown);
32+
this.isShutdown = true;
33+
}
34+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package org.hypertrace.core.grpcutils.client;
2+
3+
import static org.junit.jupiter.api.Assertions.assertFalse;
4+
import static org.junit.jupiter.api.Assertions.assertNotNull;
5+
import static org.junit.jupiter.api.Assertions.assertNotSame;
6+
import static org.junit.jupiter.api.Assertions.assertSame;
7+
import static org.junit.jupiter.api.Assertions.assertThrows;
8+
import static org.junit.jupiter.api.Assertions.assertTrue;
9+
10+
import io.grpc.Channel;
11+
import io.grpc.ManagedChannel;
12+
import org.junit.jupiter.api.BeforeEach;
13+
import org.junit.jupiter.api.Test;
14+
15+
class GrpcChannelRegistryTest {
16+
17+
GrpcChannelRegistry channelRegistry;
18+
19+
@BeforeEach
20+
void beforeEach() {
21+
this.channelRegistry = new GrpcChannelRegistry();
22+
}
23+
24+
@Test
25+
void createsNewChannelsAsRequested() {
26+
assertNotNull(this.channelRegistry.forAddress("foo", 1000));
27+
}
28+
29+
@Test
30+
void reusesChannelsForDuplicateRequests() {
31+
Channel firstChannel = this.channelRegistry.forAddress("foo", 1000);
32+
assertSame(firstChannel, this.channelRegistry.forAddress("foo", 1000));
33+
assertNotSame(firstChannel, this.channelRegistry.forAddress("foo", 1001));
34+
assertNotSame(firstChannel, this.channelRegistry.forAddress("bar", 1000));
35+
}
36+
37+
@Test
38+
void shutdownAllChannelsOnShutdown() {
39+
ManagedChannel firstChannel = this.channelRegistry.forAddress("foo", 1000);
40+
ManagedChannel secondChannel = this.channelRegistry.forAddress("foo", 1002);
41+
assertFalse(firstChannel.isShutdown());
42+
assertFalse(secondChannel.isShutdown());
43+
this.channelRegistry.shutdown();
44+
assertTrue(firstChannel.isShutdown());
45+
assertTrue(secondChannel.isShutdown());
46+
}
47+
48+
@Test
49+
void throwsIfNewChannelRequestedAfterShutdown() {
50+
this.channelRegistry.shutdown();
51+
assertThrows(AssertionError.class, () -> this.channelRegistry.forAddress("foo", 1000));
52+
}
53+
}

0 commit comments

Comments
 (0)