diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 2f366b7404c..42e1fdfebea 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -132,12 +132,15 @@ public static final class StreamInfo { private final CallOptions callOptions; private final int previousAttempts; private final boolean isTransparentRetry; + private final boolean isHedging; StreamInfo( - CallOptions callOptions, int previousAttempts, boolean isTransparentRetry) { + CallOptions callOptions, int previousAttempts, boolean isTransparentRetry, + boolean isHedging) { this.callOptions = checkNotNull(callOptions, "callOptions"); this.previousAttempts = previousAttempts; this.isTransparentRetry = isTransparentRetry; + this.isHedging = isHedging; } /** @@ -165,6 +168,15 @@ public boolean isTransparentRetry() { return isTransparentRetry; } + /** + * Whether the stream is hedging. + * + * @since 1.74.0 + */ + public boolean isHedging() { + return isHedging; + } + /** * Converts this StreamInfo into a new Builder. * @@ -174,7 +186,9 @@ public Builder toBuilder() { return new Builder() .setCallOptions(callOptions) .setPreviousAttempts(previousAttempts) - .setIsTransparentRetry(isTransparentRetry); + .setIsTransparentRetry(isTransparentRetry) + .setIsHedging(isHedging); + } /** @@ -192,6 +206,7 @@ public String toString() { .add("callOptions", callOptions) .add("previousAttempts", previousAttempts) .add("isTransparentRetry", isTransparentRetry) + .add("isHedging", isHedging) .toString(); } @@ -204,6 +219,7 @@ public static final class Builder { private CallOptions callOptions = CallOptions.DEFAULT; private int previousAttempts; private boolean isTransparentRetry; + private boolean isHedging; Builder() { } @@ -236,11 +252,21 @@ public Builder setIsTransparentRetry(boolean isTransparentRetry) { return this; } + /** + * Sets whether the stream is hedging. + * + * @since 1.74.0 + */ + public Builder setIsHedging(boolean isHedging) { + this.isHedging = isHedging; + return this; + } + /** * Builds a new StreamInfo. */ public StreamInfo build() { - return new StreamInfo(callOptions, previousAttempts, isTransparentRetry); + return new StreamInfo(callOptions, previousAttempts, isTransparentRetry, isHedging); } } } diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index b9e8dd79f09..4b24b1eae3d 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -250,7 +250,8 @@ public void runInContext() { stream = clientStreamProvider.newStream(method, callOptions, headers, context); } else { ClientStreamTracer[] tracers = - GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false); + GrpcUtil.getClientStreamTracers(callOptions, headers, 0, + false, false); String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions"; Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED); String description = String.format( diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 1434afa884c..4bf69f6b48a 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -757,13 +757,15 @@ public ListenableFuture getStats() { /** Gets stream tracers based on CallOptions. */ public static ClientStreamTracer[] getClientStreamTracers( - CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry) { + CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry, + boolean isHedging) { List factories = callOptions.getStreamTracerFactories(); ClientStreamTracer[] tracers = new ClientStreamTracer[factories.size() + 1]; StreamInfo streamInfo = StreamInfo.newBuilder() .setCallOptions(callOptions) .setPreviousAttempts(previousAttempts) .setIsTransparentRetry(isTransparentRetry) + .setIsHedging(isHedging) .build(); for (int i = 0; i < factories.size(); i++) { tracers[i] = factories.get(i).newClientStreamTracer(streamInfo, headers); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index e8f106c7775..dc6d65750f6 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -479,7 +479,8 @@ public ClientStream newStream( // the delayed transport or a real transport will go in-use and cancel the idle timer. if (!retryEnabled) { ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( - callOptions, headers, 0, /* isTransparentRetry= */ false); + callOptions, headers, 0, /* isTransparentRetry= */ false, + /* isHedging= */false); Context origContext = context.attach(); try { return delayedTransport.newStream(method, headers, callOptions, tracers); @@ -519,10 +520,10 @@ void postCommit() { @Override ClientStream newSubstream( Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts, - boolean isTransparentRetry) { + boolean isTransparentRetry, boolean isHedging) { CallOptions newOptions = callOptions.withStreamTracerFactory(factory); ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( - newOptions, newHeaders, previousAttempts, isTransparentRetry); + newOptions, newHeaders, previousAttempts, isTransparentRetry, isHedging); Context origContext = context.attach(); try { return delayedTransport.newStream(method, newHeaders, newOptions, tracers); diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index 01ef457460f..30c9f55e796 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -88,7 +88,8 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented method, CallOptions callOptions, Metadata headers, Context context) { ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( - callOptions, headers, 0, /* isTransparentRetry= */ false); + callOptions, headers, 0, /* isTransparentRetry= */ false, + /* isHedging= */ false); Context origContext = context.attach(); // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't // matter here because OOB communication should be sparse, and it's not on application RPC's diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 6d33c2eb3e7..1fc8e8c0ba7 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -266,7 +266,8 @@ public ClientStreamTracer newClientStreamTracer( Metadata newHeaders = updateHeaders(headers, previousAttemptCount); // NOTICE: This set _must_ be done before stream.start() and it actually is. - sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry); + sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry, + isHedging); return sub; } @@ -276,7 +277,7 @@ public ClientStreamTracer newClientStreamTracer( */ abstract ClientStream newSubstream( Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts, - boolean isTransparentRetry); + boolean isTransparentRetry, boolean isHedging); /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */ @VisibleForTesting diff --git a/core/src/main/java/io/grpc/internal/SubchannelChannel.java b/core/src/main/java/io/grpc/internal/SubchannelChannel.java index 773dcb99dd7..ced4272afe3 100644 --- a/core/src/main/java/io/grpc/internal/SubchannelChannel.java +++ b/core/src/main/java/io/grpc/internal/SubchannelChannel.java @@ -59,7 +59,8 @@ public ClientStream newStream(MethodDescriptor method, transport = notReadyTransport; } ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( - callOptions, headers, 0, /* isTransparentRetry= */ false); + callOptions, headers, 0, /* isTransparentRetry= */ false, + /* isHedging= */ false); Context origContext = context.attach(); try { return transport.newStream(method, headers, callOptions, tracers); diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 9b1ec343bb7..213e9a704c5 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -186,7 +186,8 @@ ClientStream newSubstream( Metadata metadata, ClientStreamTracer.Factory tracerFactory, int previousAttempts, - boolean isTransparentRetry) { + boolean isTransparentRetry, + boolean isHedging) { bufferSizeTracer = tracerFactory.newClientStreamTracer(STREAM_INFO, metadata); int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 6b257a18a6c..e0af0f80ed3 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -18,8 +18,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.HEDGE_BUCKETS; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LATENCY_BUCKETS; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.RETRY_BUCKETS; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.SIZE_BUCKETS; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TRANSPARENT_RETRY_BUCKETS; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; @@ -64,8 +67,8 @@ public Stopwatch get() { }; @VisibleForTesting - static boolean ENABLE_OTEL_TRACING = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING", - false); + static boolean ENABLE_OTEL_TRACING = + GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING", false); private final OpenTelemetry openTelemetrySdk; private final MeterProvider meterProvider; @@ -241,6 +244,54 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter, .build()); } + if (isMetricEnabled("grpc.client.call.retries", enableMetrics, disableDefault)) { + builder.clientCallRetriesCounter( + meter.histogramBuilder( + "grpc.client.call.retries") + .setUnit("{retry}") + .setDescription("Number of retries during the client call. " + + "If there were no retries, 0 is not reported.") + .ofLongs() + .setExplicitBucketBoundariesAdvice(RETRY_BUCKETS) + .build()); + } + + if (isMetricEnabled("grpc.client.call.transparent_retries", enableMetrics, + disableDefault)) { + builder.clientCallTransparentRetriesCounter( + meter.histogramBuilder( + "grpc.client.call.transparent_retries") + .setUnit("{transparent_retry}") + .setDescription("Number of transparent retries during the client call. " + + "If there were no transparent retries, 0 is not reported.") + .ofLongs() + .setExplicitBucketBoundariesAdvice(TRANSPARENT_RETRY_BUCKETS) + .build()); + } + + if (isMetricEnabled("grpc.client.call.hedges", enableMetrics, disableDefault)) { + builder.clientCallHedgesCounter( + meter.histogramBuilder( + "grpc.client.call.hedges") + .setUnit("{hedge}") + .setDescription("Number of hedges during the client call. " + + "If there were no hedges, 0 is not reported.") + .ofLongs() + .setExplicitBucketBoundariesAdvice(HEDGE_BUCKETS) + .build()); + } + + if (isMetricEnabled("grpc.client.call.retry_delay", enableMetrics, disableDefault)) { + builder.clientCallRetryDelayCounter( + meter.histogramBuilder( + "grpc.client.call.retry_delay") + .setUnit("s") + .setDescription("Total time of delay while there is no active attempt during the " + + "client call") + .setExplicitBucketBoundariesAdvice(LATENCY_BUCKETS) + .build()); + } + if (isMetricEnabled("grpc.server.call.started", enableMetrics, disableDefault)) { builder.serverCallCountCounter( meter.counterBuilder("grpc.server.call.started") @@ -259,8 +310,8 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter, .build()); } - if (isMetricEnabled("grpc.server.call.sent_total_compressed_message_size", enableMetrics, - disableDefault)) { + if (isMetricEnabled("grpc.server.call.sent_total_compressed_message_size", + enableMetrics, disableDefault)) { builder.serverTotalSentCompressedMessageSizeCounter( meter.histogramBuilder( "grpc.server.call.sent_total_compressed_message_size") @@ -271,8 +322,8 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter, .build()); } - if (isMetricEnabled("grpc.server.call.rcvd_total_compressed_message_size", enableMetrics, - disableDefault)) { + if (isMetricEnabled("grpc.server.call.rcvd_total_compressed_message_size", + enableMetrics, disableDefault)) { builder.serverTotalReceivedCompressedMessageSizeCounter( meter.histogramBuilder( "grpc.server.call.rcvd_total_compressed_message_size") diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index 82ad41e7b20..1b72ec9fdcb 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -71,6 +71,7 @@ */ final class OpenTelemetryMetricsModule { private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName()); + private static final double NANOS_PER_SEC = 1_000_000_000.0; public static final ImmutableSet DEFAULT_PER_CALL_METRICS_SET = ImmutableSet.of( "grpc.client.attempt.started", @@ -292,9 +293,12 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory private final String fullMethodName; private final List callPlugins; private Status status; + private long retryDelayNanos; private long callLatencyNanos; private final Object lock = new Object(); private final AtomicLong attemptsPerCall = new AtomicLong(); + private final AtomicLong hedgedAttemptsPerCall = new AtomicLong(); + private final AtomicLong transparentRetriesPerCall = new AtomicLong(); @GuardedBy("lock") private int activeStreams; @GuardedBy("lock") @@ -331,6 +335,7 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada } if (++activeStreams == 1 && attemptStopwatch.isRunning()) { attemptStopwatch.stop(); + retryDelayNanos = attemptStopwatch.elapsed(TimeUnit.NANOSECONDS); } } // Skip recording for the first time, since it is already recorded in @@ -344,7 +349,11 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada module.resource.clientAttemptCountCounter().add(1, attribute); } } - if (!info.isTransparentRetry()) { + if (info.isTransparentRetry()) { + transparentRetriesPerCall.incrementAndGet(); + } else if (info.isHedging()) { + hedgedAttemptsPerCall.incrementAndGet(); + } else { attemptsPerCall.incrementAndGet(); } return newClientTracer(info); @@ -407,14 +416,65 @@ void recordFinishedCall() { tracer.recordFinishedAttempt(); } callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS); - io.opentelemetry.api.common.Attributes attribute = - io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName, - TARGET_KEY, target, - STATUS_KEY, status.getCode().toString()); + // Base attributes + io.opentelemetry.api.common.Attributes baseAttributes = + io.opentelemetry.api.common.Attributes.of( + METHOD_KEY, fullMethodName, + TARGET_KEY, target + ); + + // Duration if (module.resource.clientCallDurationCounter() != null) { - module.resource.clientCallDurationCounter() - .record(callLatencyNanos * SECONDS_PER_NANO, attribute); + module.resource.clientCallDurationCounter().record( + callLatencyNanos * SECONDS_PER_NANO, + baseAttributes.toBuilder() + .put(STATUS_KEY, status.getCode().toString()) + .build() + ); + } + + // Retry counts + if (module.resource.clientCallRetriesCounter() != null) { + + long retriesPerCall = 0; + long attempts = attemptsPerCall.get(); + if (attempts > 0) { + retriesPerCall = attempts - 1; + } + + if (retriesPerCall > 0) { + module.resource.clientCallRetriesCounter().record(retriesPerCall, baseAttributes); + } + } + + // Hedge counts + if (module.resource.clientCallHedgesCounter() != null) { + + long hedgesPerCall = 0; + long attempts = hedgedAttemptsPerCall.get(); + if (attempts > 0) { + hedgesPerCall = attempts - 1; + } + + if (hedgesPerCall > 0) { + module.resource.clientCallHedgesCounter().record(hedgesPerCall, baseAttributes); + } + } + + // Transparent Retry counts + if (module.resource.clientCallTransparentRetriesCounter() != null + && transparentRetriesPerCall.get() > 0) { + module.resource.clientCallTransparentRetriesCounter().record( + transparentRetriesPerCall.get(), baseAttributes); + } + + // Retry delay + if (module.resource.clientCallRetryDelayCounter() != null) { + module.resource.clientCallRetryDelayCounter().record( + retryDelayNanos / NANOS_PER_SEC, + baseAttributes + ); } } } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java index e519b7e1eb6..d32ae1e67f5 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java @@ -41,6 +41,17 @@ abstract class OpenTelemetryMetricsResource { @Nullable abstract LongHistogram clientTotalReceivedCompressedMessageSizeCounter(); + @Nullable + abstract LongHistogram clientCallRetriesCounter(); + + @Nullable + abstract LongHistogram clientCallTransparentRetriesCounter(); + + @Nullable + abstract LongHistogram clientCallHedgesCounter(); + + @Nullable + abstract DoubleHistogram clientCallRetryDelayCounter(); /* Server Metrics */ @Nullable @@ -73,6 +84,14 @@ abstract static class Builder { abstract Builder clientTotalReceivedCompressedMessageSizeCounter( LongHistogram counter); + abstract Builder clientCallRetriesCounter(LongHistogram counter); + + abstract Builder clientCallTransparentRetriesCounter(LongHistogram counter); + + abstract Builder clientCallHedgesCounter(LongHistogram counter); + + abstract Builder clientCallRetryDelayCounter(DoubleHistogram counter); + abstract Builder serverCallCountCounter(LongCounter counter); abstract Builder serverCallDurationCounter(DoubleHistogram counter); diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java index 5214804d369..5ebbbb788d9 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java @@ -49,6 +49,13 @@ public final class OpenTelemetryConstants { 0L, 1024L, 2048L, 4096L, 16384L, 65536L, 262144L, 1048576L, 4194304L, 16777216L, 67108864L, 268435456L, 1073741824L, 4294967296L); + public static final List RETRY_BUCKETS = ImmutableList.of(0L, 1L, 2L, 3L, 4L, 5L); + + public static final List TRANSPARENT_RETRY_BUCKETS = + ImmutableList.of(0L, 1L, 2L, 3L, 4L, 5L, 10L); + + public static final List HEDGE_BUCKETS = ImmutableList.of(0L, 1L, 2L, 3L, 4L, 5L); + private OpenTelemetryConstants() { } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 77f0268ec2f..c60a3d4f357 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -54,11 +54,14 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -94,6 +97,11 @@ public class OpenTelemetryMetricsModuleTest { private static final String CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE = "grpc.client.attempt.rcvd_total_compressed_message_size"; private static final String CLIENT_CALL_DURATION = "grpc.client.call.duration"; + private static final String CLIENT_CALL_RETRIES = "grpc.client.call.retries"; + private static final String CLIENT_CALL_TRANSPARENT_RETRIES = + "grpc.client.call.transparent_retries"; + private static final String CLIENT_CALL_HEDGES = "grpc.client.call.hedges"; + private static final String CLIENT_CALL_RETRY_DELAY = "grpc.client.call.retry_delay"; private static final String SERVER_CALL_COUNT = "grpc.server.call.started"; private static final String SERVER_CALL_DURATION = "grpc.server.call.duration"; private static final String SERVER_CALL_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE @@ -194,7 +202,7 @@ public ServerCall.Listener startCall( }).build()); final AtomicReference capturedCallOptions = new AtomicReference<>(); - ClientInterceptor callOptionsCatureInterceptor = new ClientInterceptor() { + ClientInterceptor callOptionsCaptureInterceptor = new ClientInterceptor() { @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { @@ -204,7 +212,7 @@ public ClientCall interceptCall( }; Channel interceptedChannel = ClientInterceptors.intercept( - grpcServerRule.getChannel(), callOptionsCatureInterceptor, + grpcServerRule.getChannel(), callOptionsCaptureInterceptor, module.getClientInterceptor("target:///")); ClientCall call; call = interceptedChannel.newCall(method, CALL_OPTIONS); @@ -378,6 +386,88 @@ public void clientBasicMetrics() { .hasBucketCounts(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)))); + + assertThat(openTelemetryTesting.getMetrics()) + .extracting("name") + .doesNotContain( + CLIENT_CALL_RETRIES, + CLIENT_CALL_TRANSPARENT_RETRIES, + CLIENT_CALL_HEDGES, + CLIENT_CALL_RETRY_DELAY); + } + + @Test + public void clientBasicMetrics_withRetryMetricsEnabled_shouldRecordZeroOrBeAbsent() { + // Explicitly enable the retry metrics + Map enabledMetrics = ImmutableMap.of( + CLIENT_CALL_RETRIES, true, + CLIENT_CALL_TRANSPARENT_RETRIES, true, + CLIENT_CALL_HEDGES, true, + CLIENT_CALL_RETRY_DELAY, true + ); + + String target = "target:///"; + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, + enabledMetrics, disableDefaultMetrics); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); + OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + + fakeClock.forwardTime(30, TimeUnit.MILLISECONDS); + tracer.outboundHeaders(); + fakeClock.forwardTime(100, TimeUnit.MILLISECONDS); + tracer.outboundMessage(0); + tracer.streamClosed(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK); + + io.opentelemetry.api.common.Attributes finalAttributes + = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName()); + + assertThat(openTelemetryTesting.getMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME), + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME), + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE), + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE), + metric -> assertThat(metric).hasName(CLIENT_CALL_DURATION), + metric -> assertThat(metric) + .hasName(CLIENT_CALL_RETRY_DELAY) + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSum(0) + .hasCount(1) + .hasAttributes(finalAttributes))) + + ); + + List optionalMetricNames = Arrays.asList( + CLIENT_CALL_RETRIES, + CLIENT_CALL_TRANSPARENT_RETRIES, + CLIENT_CALL_HEDGES); + + for (String metricName : optionalMetricNames) { + Optional metric = openTelemetryTesting.getMetrics().stream() + .filter(m -> m.getName().equals(metricName)) + .findFirst(); + if (metric.isPresent()) { + assertThat(metric.get()) + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSum(0) + .hasCount(1) + .hasAttributes(finalAttributes))); + } + } } // This test is only unit-testing the metrics recording logic. The retry behavior is faked. @@ -831,6 +921,182 @@ public void recordAttemptMetrics() { .hasBucketBoundaries(sizeBuckets)))); } + @Test + public void recordAttemptMetrics_withRetryMetricsEnabled() { + Map enabledMetrics = ImmutableMap.of( + CLIENT_CALL_RETRIES, true, + CLIENT_CALL_TRANSPARENT_RETRIES, true, + CLIENT_CALL_HEDGES, true, + CLIENT_CALL_RETRY_DELAY, true + ); + + String target = "dns:///example.com"; + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, + enabledMetrics, disableDefaultMetrics); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); + OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, + method.getFullMethodName(), emptyList()); + + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + fakeClock.forwardTime(154, TimeUnit.MILLISECONDS); + tracer.streamClosed(Status.UNAVAILABLE); + + fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS); + tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + fakeClock.forwardTime(100, TimeUnit.MILLISECONDS); + tracer.streamClosed(Status.NOT_FOUND); + + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); + tracer = callAttemptsTracerFactory.newClientStreamTracer( + STREAM_INFO.toBuilder().setIsTransparentRetry(true).build(), new Metadata()); + fakeClock.forwardTime(32, MILLISECONDS); + tracer.streamClosed(Status.UNAVAILABLE); + + fakeClock.forwardTime(10, MILLISECONDS); + tracer = callAttemptsTracerFactory.newClientStreamTracer( + STREAM_INFO.toBuilder().setIsTransparentRetry(true).build(), new Metadata()); + tracer.inboundWireSize(33); + fakeClock.forwardTime(24, MILLISECONDS); + tracer.streamClosed(Status.OK); // RPC succeeded + + // --- The overall call ends --- + callAttemptsTracerFactory.callEnded(Status.OK); + + // Define attributes for assertions + io.opentelemetry.api.common.Attributes finalAttributes + = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName()); + + // FINAL ASSERTION BLOCK + assertThat(openTelemetryTesting.getMetrics()) + .satisfiesExactlyInAnyOrder( + // Default metrics + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME), + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME), + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE), + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE), + metric -> assertThat(metric).hasName(CLIENT_CALL_DURATION), + + // --- Assertions for the retry metrics --- + metric -> assertThat(metric) + .hasName(CLIENT_CALL_RETRIES) + .hasUnit("{retry}") + .hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying( + point -> point + .hasCount(1) + .hasSum(1) // We faked one standard retry + .hasAttributes(finalAttributes))), + metric -> assertThat(metric) + .hasName(CLIENT_CALL_TRANSPARENT_RETRIES) + .hasUnit("{transparent_retry}") + .hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying( + point -> point + .hasCount(1) + .hasSum(2) // We faked two transparent retries + .hasAttributes(finalAttributes))), + metric -> assertThat(metric) + .hasName(CLIENT_CALL_RETRY_DELAY) + .hasUnit("s") + .hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying( + point -> point + .hasCount(1) + .hasSum(1.02) // 1000ms + 10ms + 10ms + .hasAttributes(finalAttributes))) + ); + } + + @Test + public void recordAttemptMetrics_withHedgedCalls() { + // Enable the retry metrics, including hedges + Map enabledMetrics = ImmutableMap.of( + CLIENT_CALL_RETRIES, true, + CLIENT_CALL_TRANSPARENT_RETRIES, true, + CLIENT_CALL_HEDGES, true, + CLIENT_CALL_RETRY_DELAY, true + ); + + String target = "dns:///example.com"; + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, + enabledMetrics, disableDefaultMetrics); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); + OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, + method.getFullMethodName(), emptyList()); + + // Create a StreamInfo specifically for hedged attempts + final ClientStreamTracer.StreamInfo hedgedStreamInfo = + STREAM_INFO.toBuilder().setIsHedging(true).build(); + + // --- First attempt starts --- + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + + // --- Faking a hedged attempt --- + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); // Hedging delay + ClientStreamTracer hedgeTracer1 = + callAttemptsTracerFactory.newClientStreamTracer(hedgedStreamInfo, new Metadata()); + + // --- Faking a second hedged attempt --- + fakeClock.forwardTime(20, TimeUnit.MILLISECONDS); // Another hedging delay + ClientStreamTracer hedgeTracer2 = + callAttemptsTracerFactory.newClientStreamTracer(hedgedStreamInfo, new Metadata()); + + // --- Let the attempts resolve --- + fakeClock.forwardTime(50, TimeUnit.MILLISECONDS); + // Initial attempt is cancelled because a hedge will succeed + tracer.streamClosed(Status.CANCELLED); + hedgeTracer1.streamClosed(Status.UNAVAILABLE); // First hedge fails + + fakeClock.forwardTime(30, TimeUnit.MILLISECONDS); + hedgeTracer2.streamClosed(Status.OK); // Second hedge succeeds + + // --- The overall call ends --- + callAttemptsTracerFactory.callEnded(Status.OK); + + // Define attributes for assertions + io.opentelemetry.api.common.Attributes finalAttributes + = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName()); + + // FINAL ASSERTION BLOCK + // We expect 7 metrics: 5 default + hedges + retry_delay. + // Retries and transparent_retries are 0 and will not be reported. + assertThat(openTelemetryTesting.getMetrics()) + .satisfiesExactlyInAnyOrder( + // Default metrics + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME), + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME), + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE), + metric -> assertThat(metric).hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE), + metric -> assertThat(metric).hasName(CLIENT_CALL_DURATION), + + // --- Assertions for the NEW metrics --- + metric -> assertThat(metric) + .hasName(CLIENT_CALL_HEDGES) + .hasUnit("{hedge}") + .hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying( + point -> point + .hasCount(1) + .hasSum(1) + .hasAttributes(finalAttributes))), + metric -> assertThat(metric) + .hasName(CLIENT_CALL_RETRY_DELAY) + .hasUnit("s") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasCount(1) + .hasSum(0) + .hasAttributes(finalAttributes))) + ); + } + @Test public void clientStreamNeverCreatedStillRecordMetrics() { String target = "dns:///foo.example.com";