54
54
import io .opentelemetry .api .common .AttributeKey ;
55
55
import io .opentelemetry .api .metrics .Meter ;
56
56
import io .opentelemetry .sdk .common .InstrumentationScopeInfo ;
57
+ import io .opentelemetry .sdk .metrics .data .MetricData ;
57
58
import io .opentelemetry .sdk .testing .junit4 .OpenTelemetryRule ;
58
59
import java .io .IOException ;
59
60
import java .io .InputStream ;
60
61
import java .util .Arrays ;
62
+ import java .util .List ;
61
63
import java .util .Map ;
64
+ import java .util .Optional ;
62
65
import java .util .concurrent .TimeUnit ;
63
66
import java .util .concurrent .atomic .AtomicReference ;
64
67
import javax .annotation .Nullable ;
68
+
65
69
import org .junit .Before ;
66
70
import org .junit .Rule ;
67
71
import org .junit .Test ;
@@ -94,6 +98,11 @@ public class OpenTelemetryMetricsModuleTest {
94
98
private static final String CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE
95
99
= "grpc.client.attempt.rcvd_total_compressed_message_size" ;
96
100
private static final String CLIENT_CALL_DURATION = "grpc.client.call.duration" ;
101
+ private static final String CLIENT_CALL_RETRIES = "grpc.client.call.retries" ;
102
+ private static final String CLIENT_CALL_TRANSPARENT_RETRIES =
103
+ "grpc.client.call.transparent_retries" ;
104
+ private static final String CLIENT_CALL_HEDGES = "grpc.client.call.hedges" ;
105
+ private static final String CLIENT_CALL_RETRY_DELAY = "grpc.client.call.retry_delay" ;
97
106
private static final String SERVER_CALL_COUNT = "grpc.server.call.started" ;
98
107
private static final String SERVER_CALL_DURATION = "grpc.server.call.duration" ;
99
108
private static final String SERVER_CALL_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE
@@ -194,7 +203,7 @@ public ServerCall.Listener<String> startCall(
194
203
}).build ());
195
204
196
205
final AtomicReference <CallOptions > capturedCallOptions = new AtomicReference <>();
197
- ClientInterceptor callOptionsCatureInterceptor = new ClientInterceptor () {
206
+ ClientInterceptor callOptionsCaptureInterceptor = new ClientInterceptor () {
198
207
@ Override
199
208
public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
200
209
MethodDescriptor <ReqT , RespT > method , CallOptions callOptions , Channel next ) {
@@ -204,7 +213,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
204
213
};
205
214
Channel interceptedChannel =
206
215
ClientInterceptors .intercept (
207
- grpcServerRule .getChannel (), callOptionsCatureInterceptor ,
216
+ grpcServerRule .getChannel (), callOptionsCaptureInterceptor ,
208
217
module .getClientInterceptor ("target:///" ));
209
218
ClientCall <String , String > call ;
210
219
call = interceptedChannel .newCall (method , CALL_OPTIONS );
@@ -378,6 +387,88 @@ public void clientBasicMetrics() {
378
387
.hasBucketCounts (0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,
379
388
0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 1 , 0 , 0 , 0 , 0 , 0 ,
380
389
0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ))));
390
+
391
+ assertThat (openTelemetryTesting .getMetrics ())
392
+ .extracting ("name" )
393
+ .doesNotContain (
394
+ CLIENT_CALL_RETRIES ,
395
+ CLIENT_CALL_TRANSPARENT_RETRIES ,
396
+ CLIENT_CALL_HEDGES ,
397
+ CLIENT_CALL_RETRY_DELAY );
398
+ }
399
+
400
+ @ Test
401
+ public void clientBasicMetrics_withRetryMetricsEnabled_shouldRecordZeroOrBeAbsent () {
402
+ // Explicitly enable the retry metrics
403
+ Map <String , Boolean > enabledMetrics = ImmutableMap .of (
404
+ CLIENT_CALL_RETRIES , true ,
405
+ CLIENT_CALL_TRANSPARENT_RETRIES , true ,
406
+ CLIENT_CALL_HEDGES , true ,
407
+ CLIENT_CALL_RETRY_DELAY , true
408
+ );
409
+
410
+ String target = "target:///" ;
411
+ OpenTelemetryMetricsResource resource = GrpcOpenTelemetry .createMetricInstruments (testMeter ,
412
+ enabledMetrics , disableDefaultMetrics );
413
+ OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule (resource );
414
+ OpenTelemetryMetricsModule .CallAttemptsTracerFactory callAttemptsTracerFactory =
415
+ new CallAttemptsTracerFactory (module , target , method .getFullMethodName (), emptyList ());
416
+ ClientStreamTracer tracer =
417
+ callAttemptsTracerFactory .newClientStreamTracer (STREAM_INFO , new Metadata ());
418
+
419
+ fakeClock .forwardTime (30 , TimeUnit .MILLISECONDS );
420
+ tracer .outboundHeaders ();
421
+ fakeClock .forwardTime (100 , TimeUnit .MILLISECONDS );
422
+ tracer .outboundMessage (0 );
423
+ tracer .streamClosed (Status .OK );
424
+ callAttemptsTracerFactory .callEnded (Status .OK );
425
+
426
+ io .opentelemetry .api .common .Attributes finalAttributes
427
+ = io .opentelemetry .api .common .Attributes .of (
428
+ TARGET_KEY , target ,
429
+ METHOD_KEY , method .getFullMethodName ());
430
+
431
+ assertThat (openTelemetryTesting .getMetrics ())
432
+ .satisfiesExactlyInAnyOrder (
433
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME ),
434
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME ),
435
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE ),
436
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE ),
437
+ metric -> assertThat (metric ).hasName (CLIENT_CALL_DURATION ),
438
+ metric -> assertThat (metric )
439
+ .hasName (CLIENT_CALL_RETRY_DELAY )
440
+ .hasHistogramSatisfying (
441
+ histogram ->
442
+ histogram .hasPointsSatisfying (
443
+ point ->
444
+ point
445
+ .hasSum (0 )
446
+ .hasCount (1 )
447
+ .hasAttributes (finalAttributes )))
448
+
449
+ );
450
+
451
+ List <String > optionalMetricNames = Arrays .asList (
452
+ CLIENT_CALL_RETRIES ,
453
+ CLIENT_CALL_TRANSPARENT_RETRIES ,
454
+ CLIENT_CALL_HEDGES );
455
+
456
+ for (String metricName : optionalMetricNames ) {
457
+ Optional <MetricData > metric = openTelemetryTesting .getMetrics ().stream ()
458
+ .filter (m -> m .getName ().equals (metricName ))
459
+ .findFirst ();
460
+ if (metric .isPresent ()) {
461
+ assertThat (metric .get ())
462
+ .hasHistogramSatisfying (
463
+ histogram ->
464
+ histogram .hasPointsSatisfying (
465
+ point ->
466
+ point
467
+ .hasSum (0 )
468
+ .hasCount (1 )
469
+ .hasAttributes (finalAttributes )));
470
+ }
471
+ }
381
472
}
382
473
383
474
// This test is only unit-testing the metrics recording logic. The retry behavior is faked.
@@ -831,6 +922,182 @@ public void recordAttemptMetrics() {
831
922
.hasBucketBoundaries (sizeBuckets ))));
832
923
}
833
924
925
+ @ Test
926
+ public void recordAttemptMetrics_withRetryMetricsEnabled () {
927
+ Map <String , Boolean > enabledMetrics = ImmutableMap .of (
928
+ CLIENT_CALL_RETRIES , true ,
929
+ CLIENT_CALL_TRANSPARENT_RETRIES , true ,
930
+ CLIENT_CALL_HEDGES , true ,
931
+ CLIENT_CALL_RETRY_DELAY , true
932
+ );
933
+
934
+ String target = "dns:///example.com" ;
935
+ OpenTelemetryMetricsResource resource = GrpcOpenTelemetry .createMetricInstruments (testMeter ,
936
+ enabledMetrics , disableDefaultMetrics );
937
+ OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule (resource );
938
+ OpenTelemetryMetricsModule .CallAttemptsTracerFactory callAttemptsTracerFactory =
939
+ new OpenTelemetryMetricsModule .CallAttemptsTracerFactory (module , target ,
940
+ method .getFullMethodName (), emptyList ());
941
+
942
+ ClientStreamTracer tracer =
943
+ callAttemptsTracerFactory .newClientStreamTracer (STREAM_INFO , new Metadata ());
944
+ fakeClock .forwardTime (154 , TimeUnit .MILLISECONDS );
945
+ tracer .streamClosed (Status .UNAVAILABLE );
946
+
947
+ fakeClock .forwardTime (1000 , TimeUnit .MILLISECONDS );
948
+ tracer = callAttemptsTracerFactory .newClientStreamTracer (STREAM_INFO , new Metadata ());
949
+ fakeClock .forwardTime (100 , TimeUnit .MILLISECONDS );
950
+ tracer .streamClosed (Status .NOT_FOUND );
951
+
952
+ fakeClock .forwardTime (10 , TimeUnit .MILLISECONDS );
953
+ tracer = callAttemptsTracerFactory .newClientStreamTracer (
954
+ STREAM_INFO .toBuilder ().setIsTransparentRetry (true ).build (), new Metadata ());
955
+ fakeClock .forwardTime (32 , MILLISECONDS );
956
+ tracer .streamClosed (Status .UNAVAILABLE );
957
+
958
+ fakeClock .forwardTime (10 , MILLISECONDS );
959
+ tracer = callAttemptsTracerFactory .newClientStreamTracer (
960
+ STREAM_INFO .toBuilder ().setIsTransparentRetry (true ).build (), new Metadata ());
961
+ tracer .inboundWireSize (33 );
962
+ fakeClock .forwardTime (24 , MILLISECONDS );
963
+ tracer .streamClosed (Status .OK ); // RPC succeeded
964
+
965
+ // --- The overall call ends ---
966
+ callAttemptsTracerFactory .callEnded (Status .OK );
967
+
968
+ // Define attributes for assertions
969
+ io .opentelemetry .api .common .Attributes finalAttributes
970
+ = io .opentelemetry .api .common .Attributes .of (
971
+ TARGET_KEY , target ,
972
+ METHOD_KEY , method .getFullMethodName ());
973
+
974
+ // FINAL ASSERTION BLOCK
975
+ assertThat (openTelemetryTesting .getMetrics ())
976
+ .satisfiesExactlyInAnyOrder (
977
+ // Default metrics
978
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME ),
979
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME ),
980
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE ),
981
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE ),
982
+ metric -> assertThat (metric ).hasName (CLIENT_CALL_DURATION ),
983
+
984
+ // --- Assertions for the retry metrics ---
985
+ metric -> assertThat (metric )
986
+ .hasName (CLIENT_CALL_RETRIES )
987
+ .hasUnit ("{retry}" )
988
+ .hasHistogramSatisfying (histogram -> histogram .hasPointsSatisfying (
989
+ point -> point
990
+ .hasCount (1 )
991
+ .hasSum (1 ) // We faked one standard retry
992
+ .hasAttributes (finalAttributes ))),
993
+ metric -> assertThat (metric )
994
+ .hasName (CLIENT_CALL_TRANSPARENT_RETRIES )
995
+ .hasUnit ("{transparent_retry}" )
996
+ .hasHistogramSatisfying (histogram -> histogram .hasPointsSatisfying (
997
+ point -> point
998
+ .hasCount (1 )
999
+ .hasSum (2 ) // We faked two transparent retries
1000
+ .hasAttributes (finalAttributes ))),
1001
+ metric -> assertThat (metric )
1002
+ .hasName (CLIENT_CALL_RETRY_DELAY )
1003
+ .hasUnit ("s" )
1004
+ .hasHistogramSatisfying (histogram -> histogram .hasPointsSatisfying (
1005
+ point -> point
1006
+ .hasCount (1 )
1007
+ .hasSum (1.02 ) // 1000ms + 10ms + 10ms
1008
+ .hasAttributes (finalAttributes )))
1009
+ );
1010
+ }
1011
+
1012
+ @ Test
1013
+ public void recordAttemptMetrics_withHedgedCalls () {
1014
+ // Enable the retry metrics, including hedges
1015
+ Map <String , Boolean > enabledMetrics = ImmutableMap .of (
1016
+ CLIENT_CALL_RETRIES , true ,
1017
+ CLIENT_CALL_TRANSPARENT_RETRIES , true ,
1018
+ CLIENT_CALL_HEDGES , true ,
1019
+ CLIENT_CALL_RETRY_DELAY , true
1020
+ );
1021
+
1022
+ String target = "dns:///example.com" ;
1023
+ OpenTelemetryMetricsResource resource = GrpcOpenTelemetry .createMetricInstruments (testMeter ,
1024
+ enabledMetrics , disableDefaultMetrics );
1025
+ OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule (resource );
1026
+ OpenTelemetryMetricsModule .CallAttemptsTracerFactory callAttemptsTracerFactory =
1027
+ new OpenTelemetryMetricsModule .CallAttemptsTracerFactory (module , target ,
1028
+ method .getFullMethodName (), emptyList ());
1029
+
1030
+ // Create a StreamInfo specifically for hedged attempts
1031
+ final ClientStreamTracer .StreamInfo HEDGED_STREAM_INFO =
1032
+ STREAM_INFO .toBuilder ().setIsHedging (true ).build ();
1033
+
1034
+ // --- First attempt starts ---
1035
+ ClientStreamTracer tracer =
1036
+ callAttemptsTracerFactory .newClientStreamTracer (STREAM_INFO , new Metadata ());
1037
+
1038
+ // --- Faking a hedged attempt ---
1039
+ fakeClock .forwardTime (10 , TimeUnit .MILLISECONDS ); // Hedging delay
1040
+ ClientStreamTracer hedgeTracer1 =
1041
+ callAttemptsTracerFactory .newClientStreamTracer (HEDGED_STREAM_INFO , new Metadata ());
1042
+
1043
+ // --- Faking a second hedged attempt ---
1044
+ fakeClock .forwardTime (20 , TimeUnit .MILLISECONDS ); // Another hedging delay
1045
+ ClientStreamTracer hedgeTracer2 =
1046
+ callAttemptsTracerFactory .newClientStreamTracer (HEDGED_STREAM_INFO , new Metadata ());
1047
+
1048
+ // --- Let the attempts resolve ---
1049
+ fakeClock .forwardTime (50 , TimeUnit .MILLISECONDS );
1050
+ // Initial attempt is cancelled because a hedge will succeed
1051
+ tracer .streamClosed (Status .CANCELLED );
1052
+ hedgeTracer1 .streamClosed (Status .UNAVAILABLE ); // First hedge fails
1053
+
1054
+ fakeClock .forwardTime (30 , TimeUnit .MILLISECONDS );
1055
+ hedgeTracer2 .streamClosed (Status .OK ); // Second hedge succeeds
1056
+
1057
+ // --- The overall call ends ---
1058
+ callAttemptsTracerFactory .callEnded (Status .OK );
1059
+
1060
+ // Define attributes for assertions
1061
+ io .opentelemetry .api .common .Attributes finalAttributes
1062
+ = io .opentelemetry .api .common .Attributes .of (
1063
+ TARGET_KEY , target ,
1064
+ METHOD_KEY , method .getFullMethodName ());
1065
+
1066
+ // FINAL ASSERTION BLOCK
1067
+ // We expect 7 metrics: 5 default + hedges + retry_delay.
1068
+ // Retries and transparent_retries are 0 and will not be reported.
1069
+ assertThat (openTelemetryTesting .getMetrics ())
1070
+ .satisfiesExactlyInAnyOrder (
1071
+ // Default metrics
1072
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME ),
1073
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME ),
1074
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE ),
1075
+ metric -> assertThat (metric ).hasName (CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE ),
1076
+ metric -> assertThat (metric ).hasName (CLIENT_CALL_DURATION ),
1077
+
1078
+ // --- Assertions for the NEW metrics ---
1079
+ metric -> assertThat (metric )
1080
+ .hasName (CLIENT_CALL_HEDGES )
1081
+ .hasUnit ("{hedge}" )
1082
+ .hasHistogramSatisfying (histogram -> histogram .hasPointsSatisfying (
1083
+ point -> point
1084
+ .hasCount (1 )
1085
+ .hasSum (1 )
1086
+ .hasAttributes (finalAttributes ))),
1087
+ metric -> assertThat (metric )
1088
+ .hasName (CLIENT_CALL_RETRY_DELAY )
1089
+ .hasUnit ("s" )
1090
+ .hasHistogramSatisfying (
1091
+ histogram ->
1092
+ histogram .hasPointsSatisfying (
1093
+ point ->
1094
+ point
1095
+ .hasCount (1 )
1096
+ .hasSum (0 )
1097
+ .hasAttributes (finalAttributes )))
1098
+ );
1099
+ }
1100
+
834
1101
@ Test
835
1102
public void clientStreamNeverCreatedStillRecordMetrics () {
836
1103
String target = "dns:///foo.example.com" ;
0 commit comments