Skip to content

Commit 0cfa7e6

Browse files
YifeiZhuangkannanjgithub
authored andcommitted
Otel server context interceptor (grpc#11500)
Add opentelemetry tracing API, guarded by environmental variable(disabled by default). Use server interceptor to explicitly propagate span to the application thread.
1 parent 27c5300 commit 0cfa7e6

File tree

6 files changed

+357
-8
lines changed

6 files changed

+357
-8
lines changed

opentelemetry/build.gradle

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ dependencies {
1414
libraries.opentelemetry.api,
1515
libraries.auto.value.annotations
1616

17-
testImplementation testFixtures(project(':grpc-core')),
18-
project(':grpc-testing'),
17+
testImplementation project(':grpc-testing'),
18+
project(':grpc-inprocess'),
19+
testFixtures(project(':grpc-core')),
20+
testFixtures(project(':grpc-api')),
1921
libraries.opentelemetry.sdk.testing,
2022
libraries.assertj.core // opentelemetry.sdk.testing uses compileOnly for assertj
2123

opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@
3333
import io.grpc.ManagedChannelBuilder;
3434
import io.grpc.MetricSink;
3535
import io.grpc.ServerBuilder;
36+
import io.grpc.internal.GrpcUtil;
3637
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
3738
import io.opentelemetry.api.OpenTelemetry;
3839
import io.opentelemetry.api.metrics.Meter;
3940
import io.opentelemetry.api.metrics.MeterProvider;
41+
import io.opentelemetry.api.trace.Tracer;
4042
import java.util.ArrayList;
4143
import java.util.Collection;
4244
import java.util.Collections;
@@ -61,13 +63,18 @@ public Stopwatch get() {
6163
}
6264
};
6365

66+
@VisibleForTesting
67+
static boolean ENABLE_OTEL_TRACING = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING",
68+
false);
69+
6470
private final OpenTelemetry openTelemetrySdk;
6571
private final MeterProvider meterProvider;
6672
private final Meter meter;
6773
private final Map<String, Boolean> enableMetrics;
6874
private final boolean disableDefault;
6975
private final OpenTelemetryMetricsResource resource;
7076
private final OpenTelemetryMetricsModule openTelemetryMetricsModule;
77+
private final OpenTelemetryTracingModule openTelemetryTracingModule;
7178
private final List<String> optionalLabels;
7279
private final MetricSink sink;
7380

@@ -88,6 +95,7 @@ private GrpcOpenTelemetry(Builder builder) {
8895
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
8996
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(
9097
STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins);
98+
this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk);
9199
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
92100
}
93101

@@ -125,6 +133,11 @@ MetricSink getSink() {
125133
return sink;
126134
}
127135

136+
@VisibleForTesting
137+
Tracer getTracer() {
138+
return this.openTelemetryTracingModule.getTracer();
139+
}
140+
128141
/**
129142
* Registers GrpcOpenTelemetry globally, applying its configuration to all subsequently created
130143
* gRPC channels and servers.
@@ -152,6 +165,9 @@ public void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
152165
InternalManagedChannelBuilder.addMetricSink(builder, sink);
153166
InternalManagedChannelBuilder.interceptWithTarget(
154167
builder, openTelemetryMetricsModule::getClientInterceptor);
168+
if (ENABLE_OTEL_TRACING) {
169+
builder.intercept(openTelemetryTracingModule.getClientInterceptor());
170+
}
155171
}
156172

157173
/**
@@ -161,6 +177,11 @@ public void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
161177
*/
162178
public void configureServerBuilder(ServerBuilder<?> serverBuilder) {
163179
serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory());
180+
if (ENABLE_OTEL_TRACING) {
181+
serverBuilder.addStreamTracerFactory(
182+
openTelemetryTracingModule.getServerTracerFactory());
183+
serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor());
184+
}
164185
}
165186

166187
@VisibleForTesting
@@ -342,6 +363,11 @@ public Builder disableAllMetrics() {
342363
return this;
343364
}
344365

366+
Builder enableTracing(boolean enable) {
367+
ENABLE_OTEL_TRACING = enable;
368+
return this;
369+
}
370+
345371
/**
346372
* Returns a new {@link GrpcOpenTelemetry} built with the configuration of this {@link
347373
* Builder}.

opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,8 @@ public static void builderPlugin(
2929
GrpcOpenTelemetry.Builder builder, InternalOpenTelemetryPlugin plugin) {
3030
builder.plugin(plugin);
3131
}
32+
33+
public static void enableTracing(GrpcOpenTelemetry.Builder builder, boolean enable) {
34+
builder.enableTracing(enable);
35+
}
3236
}

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
2020
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
21+
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
2122

2223
import com.google.common.annotations.VisibleForTesting;
2324
import io.grpc.Attributes;
@@ -28,15 +29,21 @@
2829
import io.grpc.ClientStreamTracer;
2930
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
3031
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
32+
import io.grpc.ForwardingServerCallListener;
3133
import io.grpc.Metadata;
3234
import io.grpc.MethodDescriptor;
35+
import io.grpc.ServerCall;
36+
import io.grpc.ServerCallHandler;
37+
import io.grpc.ServerInterceptor;
3338
import io.grpc.ServerStreamTracer;
39+
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
3440
import io.opentelemetry.api.OpenTelemetry;
3541
import io.opentelemetry.api.common.AttributesBuilder;
3642
import io.opentelemetry.api.trace.Span;
3743
import io.opentelemetry.api.trace.StatusCode;
3844
import io.opentelemetry.api.trace.Tracer;
3945
import io.opentelemetry.context.Context;
46+
import io.opentelemetry.context.Scope;
4047
import io.opentelemetry.context.propagation.ContextPropagators;
4148
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
4249
import java.util.logging.Level;
@@ -50,7 +57,7 @@ final class OpenTelemetryTracingModule {
5057
private static final Logger logger = Logger.getLogger(OpenTelemetryTracingModule.class.getName());
5158

5259
@VisibleForTesting
53-
static final String OTEL_TRACING_SCOPE_NAME = "grpc-java";
60+
final io.grpc.Context.Key<Span> otelSpan = io.grpc.Context.key("opentelemetry-span-key");
5461
@Nullable
5562
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
5663
@Nullable
@@ -83,13 +90,23 @@ final class OpenTelemetryTracingModule {
8390
private final MetadataGetter metadataGetter = MetadataGetter.getInstance();
8491
private final MetadataSetter metadataSetter = MetadataSetter.getInstance();
8592
private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
93+
private final ServerInterceptor serverSpanPropagationInterceptor =
94+
new TracingServerSpanPropagationInterceptor();
8695
private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
8796

8897
OpenTelemetryTracingModule(OpenTelemetry openTelemetry) {
89-
this.otelTracer = checkNotNull(openTelemetry.getTracer(OTEL_TRACING_SCOPE_NAME), "otelTracer");
98+
this.otelTracer = checkNotNull(openTelemetry.getTracerProvider(), "tracerProvider")
99+
.tracerBuilder(OpenTelemetryConstants.INSTRUMENTATION_SCOPE)
100+
.setInstrumentationVersion(IMPLEMENTATION_VERSION)
101+
.build();
90102
this.contextPropagators = checkNotNull(openTelemetry.getPropagators(), "contextPropagators");
91103
}
92104

105+
@VisibleForTesting
106+
Tracer getTracer() {
107+
return otelTracer;
108+
}
109+
93110
/**
94111
* Creates a {@link CallAttemptsTracerFactory} for a new call.
95112
*/
@@ -112,6 +129,10 @@ ClientInterceptor getClientInterceptor() {
112129
return clientInterceptor;
113130
}
114131

132+
ServerInterceptor getServerSpanPropagationInterceptor() {
133+
return serverSpanPropagationInterceptor;
134+
}
135+
115136
@VisibleForTesting
116137
final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
117138
volatile int callEnded;
@@ -252,6 +273,11 @@ public void streamClosed(io.grpc.Status status) {
252273
endSpanWithStatus(span, status);
253274
}
254275

276+
@Override
277+
public io.grpc.Context filterContext(io.grpc.Context context) {
278+
return context.withValue(otelSpan, span);
279+
}
280+
255281
@Override
256282
public void outboundMessageSent(
257283
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
@@ -293,6 +319,69 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
293319
}
294320
}
295321

322+
@VisibleForTesting
323+
final class TracingServerSpanPropagationInterceptor implements ServerInterceptor {
324+
@Override
325+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
326+
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
327+
Span span = otelSpan.get(io.grpc.Context.current());
328+
if (span == null) {
329+
logger.log(Level.FINE, "Server span not found. ServerTracerFactory for server "
330+
+ "tracing must be set.");
331+
return next.startCall(call, headers);
332+
}
333+
Context serverCallContext = Context.current().with(span);
334+
try (Scope scope = serverCallContext.makeCurrent()) {
335+
return new ContextServerCallListener<>(next.startCall(call, headers), serverCallContext);
336+
}
337+
}
338+
}
339+
340+
private static class ContextServerCallListener<ReqT> extends
341+
ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
342+
private final Context context;
343+
344+
protected ContextServerCallListener(ServerCall.Listener<ReqT> delegate, Context context) {
345+
super(delegate);
346+
this.context = checkNotNull(context, "context");
347+
}
348+
349+
@Override
350+
public void onMessage(ReqT message) {
351+
try (Scope scope = context.makeCurrent()) {
352+
delegate().onMessage(message);
353+
}
354+
}
355+
356+
@Override
357+
public void onHalfClose() {
358+
try (Scope scope = context.makeCurrent()) {
359+
delegate().onHalfClose();
360+
}
361+
}
362+
363+
@Override
364+
public void onCancel() {
365+
try (Scope scope = context.makeCurrent()) {
366+
delegate().onCancel();
367+
}
368+
}
369+
370+
@Override
371+
public void onComplete() {
372+
try (Scope scope = context.makeCurrent()) {
373+
delegate().onComplete();
374+
}
375+
}
376+
377+
@Override
378+
public void onReady() {
379+
try (Scope scope = context.makeCurrent()) {
380+
delegate().onReady();
381+
}
382+
}
383+
}
384+
296385
@VisibleForTesting
297386
final class TracingClientInterceptor implements ClientInterceptor {
298387

opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,26 @@
1717
package io.grpc.opentelemetry;
1818

1919
import static com.google.common.truth.Truth.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
24+
import static org.mockito.Mockito.verifyNoMoreInteractions;
2025

2126
import com.google.common.collect.ImmutableList;
27+
import io.grpc.ClientInterceptor;
28+
import io.grpc.ManagedChannelBuilder;
2229
import io.grpc.MetricSink;
30+
import io.grpc.ServerBuilder;
2331
import io.grpc.internal.GrpcUtil;
2432
import io.opentelemetry.api.OpenTelemetry;
2533
import io.opentelemetry.sdk.OpenTelemetrySdk;
2634
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
2735
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
36+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
2837
import java.util.Arrays;
38+
import org.junit.After;
39+
import org.junit.Before;
2940
import org.junit.Test;
3041
import org.junit.runner.RunWith;
3142
import org.junit.runners.JUnit4;
@@ -35,7 +46,19 @@ public class GrpcOpenTelemetryTest {
3546
private final InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create();
3647
private final SdkMeterProvider meterProvider =
3748
SdkMeterProvider.builder().registerMetricReader(inMemoryMetricReader).build();
49+
private final SdkTracerProvider tracerProvider = SdkTracerProvider.builder().build();
3850
private final OpenTelemetry noopOpenTelemetry = OpenTelemetry.noop();
51+
private boolean originalEnableOtelTracing;
52+
53+
@Before
54+
public void setup() {
55+
originalEnableOtelTracing = GrpcOpenTelemetry.ENABLE_OTEL_TRACING;
56+
}
57+
58+
@After
59+
public void tearDown() {
60+
GrpcOpenTelemetry.ENABLE_OTEL_TRACING = originalEnableOtelTracing;
61+
}
3962

4063
@Test
4164
public void build() {
@@ -56,6 +79,31 @@ public void build() {
5679
assertThat(openTelemetryModule.getOptionalLabels()).isEqualTo(ImmutableList.of("version"));
5780
}
5881

82+
@Test
83+
public void buildTracer() {
84+
OpenTelemetrySdk sdk =
85+
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();
86+
87+
GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder()
88+
.enableTracing(true)
89+
.sdk(sdk).build();
90+
91+
assertThat(grpcOpenTelemetry.getOpenTelemetryInstance()).isSameInstanceAs(sdk);
92+
assertThat(grpcOpenTelemetry.getTracer()).isSameInstanceAs(
93+
tracerProvider.tracerBuilder("grpc-java")
94+
.setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION)
95+
.build());
96+
ServerBuilder<?> mockServerBuiler = mock(ServerBuilder.class);
97+
grpcOpenTelemetry.configureServerBuilder(mockServerBuiler);
98+
verify(mockServerBuiler, times(2)).addStreamTracerFactory(any());
99+
verify(mockServerBuiler).intercept(any());
100+
verifyNoMoreInteractions(mockServerBuiler);
101+
102+
ManagedChannelBuilder<?> mockChannelBuilder = mock(ManagedChannelBuilder.class);
103+
grpcOpenTelemetry.configureChannelBuilder(mockChannelBuilder);
104+
verify(mockChannelBuilder).intercept(any(ClientInterceptor.class));
105+
}
106+
59107
@Test
60108
public void builderDefaults() {
61109
GrpcOpenTelemetry module = GrpcOpenTelemetry.newBuilder().build();
@@ -73,6 +121,13 @@ public void builderDefaults() {
73121
assertThat(module.getEnableMetrics()).isEmpty();
74122
assertThat(module.getOptionalLabels()).isEmpty();
75123
assertThat(module.getSink()).isInstanceOf(MetricSink.class);
124+
125+
assertThat(module.getTracer()).isSameInstanceAs(noopOpenTelemetry
126+
.getTracerProvider()
127+
.tracerBuilder("grpc-java")
128+
.setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION)
129+
.build()
130+
);
76131
}
77132

78133
@Test

0 commit comments

Comments
 (0)