diff --git a/docs/generated/logsinks.md b/docs/generated/logsinks.md index dacacbaad196..220fe712d397 100644 --- a/docs/generated/logsinks.md +++ b/docs/generated/logsinks.md @@ -220,6 +220,8 @@ Type-specific configuration options: | `unsafe-tls` | enables certificate authentication to be bypassed. Defaults to false. Inherited from `http-defaults.unsafe-tls` if not specified. | | `timeout` | the HTTP timeout. Defaults to 0 for no timeout. Inherited from `http-defaults.timeout` if not specified. | | `disable-keep-alives` | causes the logging sink to re-establish a new connection for every outgoing log message. This option is intended for testing only and can cause excessive network overhead in production systems. Inherited from `http-defaults.disable-keep-alives` if not specified. | +| `headers` | a list of headers to attach to each HTTP request Inherited from `http-defaults.headers` if not specified. | +| `compression` | can be "none" or "gzip" to enable gzip compression. Set to "gzip" by default. Inherited from `http-defaults.compression` if not specified. | Configuration options shared across all sink types: diff --git a/pkg/bench/rttanalysis/testdata/benchmark_expectations b/pkg/bench/rttanalysis/testdata/benchmark_expectations index bb315356df15..42a9b1c2f2ff 100644 --- a/pkg/bench/rttanalysis/testdata/benchmark_expectations +++ b/pkg/bench/rttanalysis/testdata/benchmark_expectations @@ -91,7 +91,7 @@ exp,benchmark 17,Revoke/revoke_all_on_2_tables 21,Revoke/revoke_all_on_3_tables 14,RevokeRole/revoke_1_role -16,RevokeRole/revoke_2_roles +18,RevokeRole/revoke_2_roles 10,ShowGrants/grant_2_roles 11,ShowGrants/grant_3_roles 12,ShowGrants/grant_4_roles diff --git a/pkg/cli/log_flags_test.go b/pkg/cli/log_flags_test.go index ce0bf9b19bd4..8ee7b032ac77 100644 --- a/pkg/cli/log_flags_test.go +++ b/pkg/cli/log_flags_test.go @@ -53,6 +53,7 @@ func TestSetupLogging(t *testing.T) { `unsafe-tls: false, ` + `timeout: 0s, ` + `disable-keep-alives: false, ` + + `compression: gzip, ` + `filter: INFO, ` + `format: json-compact, ` + `redactable: true, ` + diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 6e2964b0971b..04cbc59fdd71 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -13,6 +13,7 @@ package kvcoord import ( "context" "fmt" + "reflect" "runtime" "runtime/pprof" "strings" @@ -63,6 +64,54 @@ var ( Measurement: "Partial Batches", Unit: metric.Unit_COUNT, } + metaDistSenderReplicaAddressedBatchRequestBytes = metric.Metadata{ + Name: "distsender.batch_requests.replica_addressed.bytes", + Help: `Total byte count of replica-addressed batch requests processed`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderReplicaAddressedBatchResponseBytes = metric.Metadata{ + Name: "distsender.batch_responses.replica_addressed.bytes", + Help: `Total byte count of replica-addressed batch responses received`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderCrossRegionBatchRequestBytes = metric.Metadata{ + Name: "distsender.batch_requests.cross_region.bytes", + Help: `Total byte count of replica-addressed batch requests processed cross + region when region tiers are configured`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderCrossRegionBatchResponseBytes = metric.Metadata{ + Name: "distsender.batch_responses.cross_region.bytes", + Help: `Total byte count of replica-addressed batch responses received cross + region when region tiers are configured`, + 
Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderCrossZoneBatchRequestBytes = metric.Metadata{ + Name: "distsender.batch_requests.cross_zone.bytes", + Help: `Total byte count of replica-addressed batch requests processed cross + zone within the same region when region and zone tiers are configured. + However, if the region tiers are not configured, this count may also include + batch data sent between different regions. Ensuring consistent configuration + of region and zone tiers across nodes helps to accurately monitor the data + transmitted.`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderCrossZoneBatchResponseBytes = metric.Metadata{ + Name: "distsender.batch_responses.cross_zone.bytes", + Help: `Total byte count of replica-addressed batch responses received cross + zone within the same region when region and zone tiers are configured. + However, if the region tiers are not configured, this count may also include + batch data received between different regions. Ensuring consistent + configuration of region and zone tiers across nodes helps to accurately + monitor the data transmitted.`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } metaDistSenderAsyncSentCount = metric.Metadata{ Name: "distsender.batches.async.sent", Help: "Number of partial batches sent asynchronously", @@ -241,44 +290,56 @@ func max(a, b int64) int64 { // DistSenderMetrics is the set of metrics for a given distributed sender. type DistSenderMetrics struct { - BatchCount *metric.Counter - PartialBatchCount *metric.Counter - AsyncSentCount *metric.Counter - AsyncThrottledCount *metric.Counter - SentCount *metric.Counter - LocalSentCount *metric.Counter - NextReplicaErrCount *metric.Counter - NotLeaseHolderErrCount *metric.Counter - InLeaseTransferBackoffs *metric.Counter - RangeLookups *metric.Counter - SlowRPCs *metric.Gauge - RangefeedRanges *metric.Gauge - RangefeedCatchupRanges *metric.Gauge - RangefeedErrorCatchup *metric.Counter - RangefeedRestartRanges *metric.Counter - RangefeedRestartStuck *metric.Counter - MethodCounts [kvpb.NumMethods]*metric.Counter - ErrCounts [kvpb.NumErrors]*metric.Counter + BatchCount *metric.Counter + PartialBatchCount *metric.Counter + ReplicaAddressedBatchRequestBytes *metric.Counter + ReplicaAddressedBatchResponseBytes *metric.Counter + CrossRegionBatchRequestBytes *metric.Counter + CrossRegionBatchResponseBytes *metric.Counter + CrossZoneBatchRequestBytes *metric.Counter + CrossZoneBatchResponseBytes *metric.Counter + AsyncSentCount *metric.Counter + AsyncThrottledCount *metric.Counter + SentCount *metric.Counter + LocalSentCount *metric.Counter + NextReplicaErrCount *metric.Counter + NotLeaseHolderErrCount *metric.Counter + InLeaseTransferBackoffs *metric.Counter + RangeLookups *metric.Counter + SlowRPCs *metric.Gauge + RangefeedRanges *metric.Gauge + RangefeedCatchupRanges *metric.Gauge + RangefeedErrorCatchup *metric.Counter + RangefeedRestartRanges *metric.Counter + RangefeedRestartStuck *metric.Counter + MethodCounts [kvpb.NumMethods]*metric.Counter + ErrCounts [kvpb.NumErrors]*metric.Counter } func makeDistSenderMetrics() DistSenderMetrics { m := DistSenderMetrics{ - BatchCount: metric.NewCounter(metaDistSenderBatchCount), - PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount), - AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount), - AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount), - SentCount: metric.NewCounter(metaTransportSentCount), - LocalSentCount: 
metric.NewCounter(metaTransportLocalSentCount), - NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount), - NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount), - InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), - RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), - SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs), - RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), - RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), - RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), - RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), - RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), + BatchCount: metric.NewCounter(metaDistSenderBatchCount), + PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount), + AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount), + AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount), + SentCount: metric.NewCounter(metaTransportSentCount), + LocalSentCount: metric.NewCounter(metaTransportLocalSentCount), + ReplicaAddressedBatchRequestBytes: metric.NewCounter(metaDistSenderReplicaAddressedBatchRequestBytes), + ReplicaAddressedBatchResponseBytes: metric.NewCounter(metaDistSenderReplicaAddressedBatchResponseBytes), + CrossRegionBatchRequestBytes: metric.NewCounter(metaDistSenderCrossRegionBatchRequestBytes), + CrossRegionBatchResponseBytes: metric.NewCounter(metaDistSenderCrossRegionBatchResponseBytes), + CrossZoneBatchRequestBytes: metric.NewCounter(metaDistSenderCrossZoneBatchRequestBytes), + CrossZoneBatchResponseBytes: metric.NewCounter(metaDistSenderCrossZoneBatchResponseBytes), + NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount), + NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount), + InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), + RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), + SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs), + RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), + RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), + RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), + RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), + RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), } for i := range m.MethodCounts { method := kvpb.Method(i).String() @@ -297,6 +358,43 @@ func makeDistSenderMetrics() DistSenderMetrics { return m } +// getDistSenderCounterMetrics fetches the count of each DistSender metric +// named in the `metricNames` parameter and returns the result as a map. The +// keys in the map are the metric metadata names, and the corresponding +// values are the counts of those metrics. If any of the specified metrics +// cannot be found or is not a counter, the function returns an error. +// +// Assumptions: 1. The metricNames parameter should consist of string literals +// that match the metadata names used for metric counters. 2. Each metric name +// provided in `metricNames` must exist, be unique, and be a counter type. 
+func (dm *DistSenderMetrics) getDistSenderCounterMetrics( + metricsName []string, +) (map[string]int64, error) { + metricCountMap := make(map[string]int64) + getFirstDistSenderMetric := func(metricName string) int64 { + metricsStruct := reflect.ValueOf(*dm) + for i := 0; i < metricsStruct.NumField(); i++ { + field := metricsStruct.Field(i) + switch t := field.Interface().(type) { + case *metric.Counter: + if t.Name == metricName { + return t.Count() + } + } + } + return -1 + } + + for _, metricName := range metricsName { + count := getFirstDistSenderMetric(metricName) + if count == -1 { + return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName) + } + metricCountMap[metricName] = count + } + return metricCountMap, nil +} + // FirstRangeProvider is capable of providing DistSender with the descriptor of // the first range in the cluster and notifying the DistSender when the first // range in the cluster has changed. @@ -367,6 +465,14 @@ type DistSender struct { onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error + // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual + // batch request byte count to the test. + BatchRequestInterceptor func(ba *kvpb.BatchRequest) + + // BatchResponseInterceptor intercepts DistSender.Send() to pass the actual + // batch response byte count to the test. + BatchResponseInterceptor func(br *kvpb.BatchResponse) + // locality is the description of the topography of the server on which the // DistSender is running. It is used to estimate the latency to other nodes // in the absence of a latency function. @@ -520,8 +626,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency } - if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil { - ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch + if cfg.TestingKnobs.BatchRequestInterceptor != nil { + ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor + } + + if cfg.TestingKnobs.BatchResponseInterceptor != nil { + ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor } return ds @@ -2178,7 +2288,16 @@ func (ds *DistSender) sendToReplicas( ExplicitlyRequested: ba.ClientRangeInfo.ExplicitlyRequested || (desc.Generation == 0 && routing.LeaseSeq() == 0), } + + if ds.BatchRequestInterceptor != nil { + ds.BatchRequestInterceptor(ba) + } + shouldIncCrossRegion, shouldIncCrossZone := ds.checkAndUpdateBatchRequestMetrics(ctx, ba) br, err = transport.SendNext(ctx, ba) + if ds.BatchResponseInterceptor != nil { + ds.BatchResponseInterceptor(br) + } + ds.checkAndUpdateBatchResponseMetrics(br, shouldIncCrossRegion, shouldIncCrossZone) ds.maybeIncrementErrCounters(br, err) if err != nil { @@ -2434,6 +2553,75 @@ func (ds *DistSender) sendToReplicas( } } +// isCrossRegionCrossZoneBatch returns two booleans indicating whether the given +// batch request is cross-region and cross-zone, respectively. 
+func (ds *DistSender) isCrossRegionCrossZoneBatch( + ctx context.Context, ba *kvpb.BatchRequest, +) (bool, bool) { + gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID) + if err != nil { + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) + return false, false + } + destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID) + if err != nil { + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) + return false, false + } + isCrossRegion, regionErr, isCrossZone, zoneErr := gatewayNodeDesc.Locality.IsCrossRegionCrossZone(destinationNodeDesc.Locality) + if regionErr != nil { + log.VEventf(ctx, 2, "%v", regionErr) + } + if zoneErr != nil { + log.VEventf(ctx, 2, "%v", zoneErr) + } + return isCrossRegion, isCrossZone +} + +// checkAndUpdateBatchRequestMetrics updates the batch requests metrics in a +// more meaningful way. Cross-region metrics monitor activities across different +// regions. Cross-zone metrics monitor cross-zone activities within the same +// region or in cases where region tiers are not configured. The check result is +// returned here to avoid redundant check for metrics updates after receiving +// batch responses. +func (ds *DistSender) checkAndUpdateBatchRequestMetrics( + ctx context.Context, ba *kvpb.BatchRequest, +) (shouldIncCrossRegion bool, shouldIncCrossZone bool) { + ds.metrics.ReplicaAddressedBatchRequestBytes.Inc(int64(ba.Size())) + isCrossRegion, isCrossZone := ds.isCrossRegionCrossZoneBatch(ctx, ba) + if isCrossRegion { + if !isCrossZone { + log.VEventf(ctx, 2, "unexpected: cross region but same zone") + } else { + ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) + shouldIncCrossRegion = true + } + } else { + if isCrossZone { + ds.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size())) + shouldIncCrossZone = true + } + } + return shouldIncCrossRegion, shouldIncCrossZone +} + +// checkAndUpdateBatchResponseMetrics updates the batch response metrics based +// on the shouldIncCrossRegion and shouldIncCrossZone parameters. These +// parameters are determined during the initial check for batch requests. The +// underlying assumption is that if requests were cross-region or cross-zone, +// the response should be as well. +func (ds *DistSender) checkAndUpdateBatchResponseMetrics( + br *kvpb.BatchResponse, shouldIncCrossRegion bool, shouldIncCrossZone bool, +) { + ds.metrics.ReplicaAddressedBatchResponseBytes.Inc(int64(br.Size())) + if shouldIncCrossRegion { + ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) + } + if shouldIncCrossZone { + ds.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size())) + } +} + // getCostControllerConfig returns the config for the tenant cost model. This // returns nil if no KV interceptors are associated with the DistSender, or the // KV interceptor is not a multitenant.TenantSideCostController. diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index b49f6bc18271..e8319fabc1a3 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -5651,6 +5651,203 @@ func TestDistSenderRPCMetrics(t *testing.T) { require.Equal(t, ds.metrics.ErrCounts[kvpb.ConditionFailedErrType].Count(), int64(1)) } +// getMapsDiff returns the difference between the values of corresponding +// metrics in two maps. Assumption: beforeMap and afterMap contain the same set +// of keys. 
+func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { + diffMap := make(map[string]int64) + for metricName, beforeValue := range beforeMap { + if v, ok := afterMap[metricName]; ok { + diffMap[metricName] = v - beforeValue + } + } + return diffMap +} + +// TestDistSenderBatchMetrics verifies that DistSender.Send() +// correctly updates the cross-region and cross-zone byte count metrics. +func TestDistSenderBatchMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + // The initial setup creates three nodes (with different localities) and a + // single range with three replicas (each on a different node). + clock := hlc.NewClockForTesting(nil) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + rangeDesc := testUserRangeDescriptor3Replicas + replicas := rangeDesc.InternalReplicas + + // The servers' localities are configured so that the first batch request sent + // from server0 to server0 is same-region, same-zone. The second batch request + // sent from server0 to server1 is cross-region. The third batch request sent + // from server0 to server2 is cross-zone within the same region. + const numNodes = 3 + serverLocality := [numNodes]roachpb.Locality{ + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, + } + + nodes := make([]roachpb.NodeDescriptor, 3) + for i := 0; i < numNodes; i++ { + nodes[i] = roachpb.NodeDescriptor{ + NodeID: roachpb.NodeID(i + 1 /* 0 is not a valid NodeID */), + Address: util.UnresolvedAddr{}, + Locality: serverLocality[i], + } + } + ns := &mockNodeStore{nodes: nodes} + + var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { + return ba.CreateReply(), nil + } + interceptedBatchRequestBytes, interceptedBatchResponseBytes := int64(-1), int64(-1) + cfg := DistSenderConfig{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Clock: clock, + NodeDescs: ns, + RPCContext: rpcContext, + RangeDescriptorDB: mockRangeDescriptorDBForDescs(rangeDesc), + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(transportFn), + BatchRequestInterceptor: func(ba *kvpb.BatchRequest) { + interceptedBatchRequestBytes = int64(ba.Size()) + }, + BatchResponseInterceptor: func(br *kvpb.BatchResponse) { + interceptedBatchResponseBytes = int64(br.Size()) + }, + }, + Settings: cluster.MakeTestingClusterSettings(), + } + + distSender := NewDistSender(cfg) + metricsNames := []string{ + "distsender.batch_requests.replica_addressed.bytes", + "distsender.batch_responses.replica_addressed.bytes", + "distsender.batch_requests.cross_region.bytes", + "distsender.batch_responses.cross_region.bytes", + "distsender.batch_requests.cross_zone.bytes", + "distsender.batch_responses.cross_zone.bytes"} + + getExpectedDelta := func( + isCrossRegion bool, isCrossZone bool, interceptedRequest int64, interceptedResponse int64, + ) map[string]int64 { + ternaryOp := func(b bool, num int64) (res int64) { + if b { + res = num + } + return res + } + + expectedDelta := make(map[string]int64) + expectedDelta[metricsNames[0]] = interceptedRequest + expectedDelta[metricsNames[1]] = interceptedResponse + expectedDelta[metricsNames[2]] = ternaryOp(isCrossRegion, interceptedRequest) 
+ expectedDelta[metricsNames[3]] = ternaryOp(isCrossRegion, interceptedResponse) + expectedDelta[metricsNames[4]] = ternaryOp(isCrossZone, interceptedRequest) + expectedDelta[metricsNames[5]] = ternaryOp(isCrossZone, interceptedResponse) + return expectedDelta + } + + sameRegionSameZoneRequest := int64(0) + sameRegionSameZoneResponse := int64(0) + + for _, tc := range []struct { + toReplica int + isCrossRegion bool + isCrossZone bool + }{ + // First test sets replica[0] as leaseholder, enforcing a within-region, + // within-zone batch request / response. + {toReplica: 0, isCrossRegion: false, isCrossZone: false}, + // Second test sets replica[1] as leaseholder, enforcing a cross-region, + // batch request / response. Note that although the request is cross-zone, + // the cross-zone metrics is not expected to increment. + {toReplica: 1, isCrossRegion: true, isCrossZone: false}, + // Third test sets replica[2] as leaseholder, enforcing a within-region, + // cross-zone batch request / response. Cross-zone metrics is only expected + // to increment when it is cross-zone, same-region activities. + {toReplica: 2, isCrossRegion: false, isCrossZone: true}, + } { + t.Run(fmt.Sprintf("isCrossRegion:%t-isCrossZone:%t", tc.isCrossRegion, tc.isCrossZone), func(t *testing.T) { + beforeMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) + if err != nil { + t.Fatal(err) + } + + ba := &kvpb.BatchRequest{} + if tc.toReplica == 0 { + // Send a different request type for the first request to avoid having + // the same byte count for three requests and coincidental correct + // results. + get := &kvpb.GetRequest{} + get.Key = rangeDesc.StartKey.AsRawKey() + ba.Add(get) + } else { + put := &kvpb.PutRequest{} + put.Key = rangeDesc.StartKey.AsRawKey() + ba.Add(put) + } + + ba.Header = kvpb.Header{ + // DistSender is set to be at the server0. + GatewayNodeID: 1, + } + distSender.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: rangeDesc, + Lease: roachpb.Lease{ + Replica: replicas[tc.toReplica], + }, + }) + + if _, err := distSender.Send(ctx, ba); err != nil { + t.Fatal(err) + } + + require.NotEqual(t, interceptedBatchRequestBytes, int64(-1), + "expected bytes not set correctly") + require.NotEqual(t, interceptedBatchResponseBytes, int64(-1), + "expected bytes not set correctly") + if tc.toReplica == 0 { + // Record the first batch request and response that was sent same + // region, same zone for future testing. + sameRegionSameZoneRequest = interceptedBatchRequestBytes + sameRegionSameZoneResponse = interceptedBatchResponseBytes + } + + expected := getExpectedDelta(tc.isCrossRegion, tc.isCrossZone, + interceptedBatchRequestBytes, interceptedBatchResponseBytes) + afterMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) + diffMetrics := getMapsDiff(beforeMetrics, afterMetrics) + if err != nil { + t.Error(err) + } + require.Equal(t, expected, diffMetrics) + }) + t.Run("SameRegionSameZone", func(t *testing.T) { + // Since the region and zone tiers are all configured in this test, we + // expect that the byte count of batch requests sent within the same + // region and same zone should equal to the total byte count of requests + // minus the combined byte count of cross-region and cross-zone requests + // metrics. Similar expectation for batch responses. 
+ metrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) + if err != nil { + t.Error(err) + } + totalRequest := metrics["distsender.batch_requests.replica_addressed.bytes"] + totalResponse := metrics["distsender.batch_responses.replica_addressed.bytes"] + crossRegionRequest := metrics["distsender.batch_requests.cross_region.bytes"] + crossRegionResponse := metrics["distsender.batch_responses.cross_region.bytes"] + crossZoneRequest := metrics["distsender.batch_requests.cross_zone.bytes"] + crossZoneResponse := metrics["distsender.batch_responses.cross_zone.bytes"] + require.Equal(t, sameRegionSameZoneRequest, totalRequest-crossRegionRequest-crossZoneRequest) + require.Equal(t, sameRegionSameZoneResponse, totalResponse-crossRegionResponse-crossZoneResponse) + }) + } +} + // TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff // ensures that a NLHE from an uninitialized replica, which points to a replica // that isn't part of the range, doesn't result in the dist sender getting diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index 58bb2efafe1d..dc7b88927ad7 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -61,6 +61,19 @@ type ClientTestingKnobs struct { // error which, if non-nil, becomes the result of the batch. Otherwise, execution // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error + + // Currently, BatchRequestInterceptor and BatchResponseInterceptor only + // intercepts DistSender.Send() to pass the actual batch request and response + // byte count to the test. However, it can be easily extended to validate + // other properties of batch requests / response if required. + + // BatchRequestInterceptor is designed to intercept calls to DistSender + // function calls to validate BatchRequest properties. + BatchRequestInterceptor func(ba *kvpb.BatchRequest) + + // BatchResponseInterceptor is designed to intercept calls to DistSender + // function calls to validate BatchResponse properties. + BatchResponseInterceptor func(br *kvpb.BatchResponse) } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 55c034eaa9c0..0ccb4bcd58ee 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -247,6 +247,7 @@ go_test( "batch_spanset_test.go", "below_raft_protos_test.go", "client_atomic_membership_change_test.go", + "client_invalidsplit_test.go", "client_lease_test.go", "client_merge_test.go", "client_metrics_test.go", diff --git a/pkg/kv/kvserver/client_invalidsplit_test.go b/pkg/kv/kvserver/client_invalidsplit_test.go new file mode 100644 index 000000000000..4355aec8835f --- /dev/null +++ b/pkg/kv/kvserver/client_invalidsplit_test.go @@ -0,0 +1,49 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvserver_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestSplitAtInvalidTenantPrefix(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // badKey is the tenant prefix followed by a "large" byte that indicates + // that it should be followed by a separate uvarint encoded key (which is + // not there). + // + // See: https://github.com/cockroachdb/cockroach/issues/104796 + var badKey = append([]byte{'\xfe'}, '\xfd') + _, _, err := keys.DecodeTenantPrefix(badKey) + t.Log(err) + require.Error(t, err) + + ctx := context.Background() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + + _, _, err = tc.SplitRange(badKey) + require.Error(t, err) + require.Contains(t, err.Error(), `checking for valid tenantID`) +} diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 4bd39652ce57..8ba1d7f34ff9 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -391,6 +391,9 @@ func (r *Replica) adminSplitWithDescriptor( if !storage.IsValidSplitKey(foundSplitKey) { return reply, errors.Errorf("cannot split range at key %s", splitKey) } + if _, _, err := keys.DecodeTenantPrefixE(splitKey.AsRawKey()); err != nil { + return reply, errors.Wrapf(err, "checking for valid tenantID") + } } // If the range starts at the splitKey, we treat the AdminSplit diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 34facc5c94c7..5af4a3a9f776 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -164,7 +164,10 @@ var retiredSettings = map[string]struct{}{ // renamed. "spanconfig.host_coalesce_adjacent.enabled": {}, "sql.defaults.experimental_stream_replication.enabled": {}, - "sql.log.unstructured_entries.enabled": {}, + + // removed as of 23.2. + "sql.log.unstructured_entries.enabled": {}, + "sql.auth.createrole_allows_grant_role_membership.enabled": {}, } // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults diff --git a/pkg/sql/grant_role.go b/pkg/sql/grant_role.go index 137ebb11d929..f76d1965f441 100644 --- a/pkg/sql/grant_role.go +++ b/pkg/sql/grant_role.go @@ -54,7 +54,7 @@ func (p *planner) GrantRoleNode(ctx context.Context, n *tree.GrantRole) (*GrantR ctx, span := tracing.ChildSpan(ctx, n.StatementTag()) defer span.Finish() - hasAdminRole, err := p.HasAdminRole(ctx) + hasCreateRolePriv, err := p.HasRoleOption(ctx, roleoption.CREATEROLE) if err != nil { return nil, err } @@ -63,6 +63,7 @@ func (p *planner) GrantRoleNode(ctx context.Context, n *tree.GrantRole) (*GrantR if err != nil { return nil, err } + grantingRoleHasAdminOptionOnAdmin := allRoles[username.AdminRoleName()] inputRoles, err := decodeusername.FromNameList(n.Roles) if err != nil { @@ -76,19 +77,24 @@ func (p *planner) GrantRoleNode(ctx context.Context, n *tree.GrantRole) (*GrantR } for _, r := range inputRoles { - // If the user is an admin, don't check if the user is allowed to add/drop - // roles in the role. However, if the role being modified is the admin role, then - // make sure the user is an admin with the admin option. 
- if hasAdminRole && !r.IsAdminRole() { + // If the current user has CREATEROLE, and the role being granted is not an + // admin there is no need to check if the user is allowed to grant/revoke + // membership in the role. However, if the role being granted is an admin, + // then make sure the current user also has the admin option for that role. + grantedRoleIsAdmin, err := p.UserHasAdminRole(ctx, r) + if err != nil { + return nil, err + } + if hasCreateRolePriv && !grantedRoleIsAdmin { continue } - if isAdmin, ok := allRoles[r]; !ok || !isAdmin { - if r.IsAdminRole() { + if hasAdminOption := allRoles[r]; !hasAdminOption && !grantingRoleHasAdminOptionOnAdmin { + if grantedRoleIsAdmin { return nil, pgerror.Newf(pgcode.InsufficientPrivilege, - "%s is not a role admin for role %s", p.User(), r) + "%s must have admin option on role %q", p.User(), r) } return nil, pgerror.Newf(pgcode.InsufficientPrivilege, - "%s is not a superuser or role admin for role %s", p.User(), r) + "%s must have CREATEROLE or have admin option on role %q", p.User(), r) } } diff --git a/pkg/sql/logictest/testdata/logic_test/grant_in_txn b/pkg/sql/logictest/testdata/logic_test/grant_in_txn index 2931fc27709b..559bfd49e030 100644 --- a/pkg/sql/logictest/testdata/logic_test/grant_in_txn +++ b/pkg/sql/logictest/testdata/logic_test/grant_in_txn @@ -100,7 +100,7 @@ SAVEPOINT before_invalid_grant # This grant should fail as testuser no longer has right to this grant # via role_foo. -statement error testuser is not a superuser or role admin for role role_bar +statement error testuser must have CREATEROLE or have admin option on role \"role_bar\" GRANT role_bar TO testuser; statement ok diff --git a/pkg/sql/logictest/testdata/logic_test/grant_role b/pkg/sql/logictest/testdata/logic_test/grant_role index 47b70bec8701..f6e1103237a0 100644 --- a/pkg/sql/logictest/testdata/logic_test/grant_role +++ b/pkg/sql/logictest/testdata/logic_test/grant_role @@ -32,3 +32,130 @@ GRANT testuser TO public statement error pgcode 42704 role/user \"public\" does not exist REVOKE testuser FROM public + +# CREATEROLE should allow a user to GRANT/REVOKE on a role even if they do not +# have the admin option for that role. +subtest grant_with_createrole + +statement ok +CREATE USER grantor WITH CREATEROLE; +CREATE ROLE transitiveadmin; +GRANT admin TO transitiveadmin + +statement ok +SET ROLE grantor + +statement ok +CREATE ROLE parent1; +CREATE ROLE child1; +GRANT parent1 TO child1 + +# Verify that CREATEROLE is not sufficient to give admin to other users. +statement error grantor must have admin option on role \"admin\" +GRANT admin TO child2 + +# It also shouldn't allow anyone to get admin transitively. +statement error grantor must have admin option on role \"transitiveadmin\" +GRANT transitiveadmin TO child2 + +statement ok +RESET ROLE + +query TTB colnames +SHOW GRANTS ON ROLE parent1 +---- +role_name member is_admin +parent1 child1 false + +statement ok +SET ROLE grantor; +REVOKE parent1 FROM child1; +RESET ROLE + +# Without CREATEROLE, the admin option is required to grant a role. 
+subtest grant_with_admin_option + +statement ok +CREATE ROLE parent2; +CREATE ROLE child2; +GRANT parent2 TO grantor WITH ADMIN OPTION; +ALTER USER grantor WITH NOCREATEROLE + +statement ok +SET ROLE grantor + +statement ok +GRANT parent2 TO child2 + +statement ok +RESET ROLE + +query TTB colnames,rowsort +SHOW GRANTS ON ROLE parent2 +---- +role_name member is_admin +parent2 child2 false +parent2 grantor true + +statement ok +SET ROLE grantor; +REVOKE parent2 FROM child2; +RESET ROLE + +statement ok +GRANT admin TO grantor; +SET ROLE grantor + +# Verify that testuser can only grant an admin role if it has the admin option +# on that role. +statement error grantor must have admin option on role \"transitiveadmin\" +GRANT transitiveadmin TO child2 + +statement ok +RESET ROLE; +GRANT transitiveadmin TO grantor; +SET ROLE grantor + +statement error grantor must have admin option on role \"transitiveadmin\" +GRANT transitiveadmin TO child2 + +statement ok +RESET ROLE; +GRANT transitiveadmin TO grantor WITH ADMIN OPTION; +SET ROLE grantor + +# Now that grantor has the admin option on transitiveadmin, it can grant the role. +statement ok +GRANT transitiveadmin TO child2 + +statement ok +RESET ROLE; +REVOKE transitiveadmin FROM grantor; +REVOKE transitiveadmin FROM child2; +GRANT admin TO grantor WITH ADMIN OPTION + +# If grantor has the admin option on admin, it also can grant transitiveadmin. +statement ok +GRANT transitiveadmin TO child2 + +statement ok +RESET ROLE; +REVOKE admin FROM grantor; +REVOKE transitiveadmin FROM child2 + +# Without CREATEROLE or the admin option, then an error should occur during +# granting. +subtest grant_no_privilege + +statement ok +CREATE ROLE parent3; +CREATE ROLE child3 + +statement ok +SET ROLE grantor + +statement error grantor must have CREATEROLE or have admin option on role \"parent3\" +GRANT parent3 TO child3 + +statement ok +RESET ROLE diff --git a/pkg/sql/logictest/testdata/logic_test/role b/pkg/sql/logictest/testdata/logic_test/role index 8ba1e2aa64d9..5d759bdfcd2b 100644 --- a/pkg/sql/logictest/testdata/logic_test/role +++ b/pkg/sql/logictest/testdata/logic_test/role @@ -149,7 +149,7 @@ GRANT unknownrole TO testuser # Test role "grant" and WITH ADMIN option. 
user testuser -statement error pq: testuser is not a superuser or role admin for role testrole +statement error testuser must have CREATEROLE or have admin option on role "testrole" GRANT testrole TO testuser2 user root @@ -173,7 +173,7 @@ testrole testuser false user testuser -statement error pq: testuser is not a superuser or role admin for role testrole +statement error testuser must have CREATEROLE or have admin option on role "testrole" GRANT testrole TO testuser2 user root @@ -428,13 +428,13 @@ rolec roled testuser -statement error pq: testuser is not a superuser or role admin for role roled +statement error testuser must have CREATEROLE or have admin option on role "roled" GRANT roled TO rolee -statement error pq: testuser is not a superuser or role admin for role rolec +statement error testuser must have CREATEROLE or have admin option on role "rolec" GRANT rolec TO rolee -statement error pq: testuser is not a superuser or role admin for role roleb +statement error testuser must have CREATEROLE or have admin option on role "roleb" GRANT roleb TO rolee statement ok @@ -617,7 +617,7 @@ testuser statement ok REVOKE ADMIN OPTION FOR rolea FROM testuser -statement error pq: testuser is not a superuser or role admin for role rolea +statement error testuser must have CREATEROLE or have admin option on role "rolea" REVOKE ADMIN OPTION FOR rolea FROM root statement ok @@ -678,7 +678,7 @@ CREATE DATABASE db2 statement error user testuser does not have DROP privilege on database db1 DROP DATABASE db1 -statement error testuser is not a role admin for role admin +statement error testuser must have admin option on role "admin" GRANT admin TO testuser user root @@ -762,13 +762,13 @@ statement ok SELECT * FROM db2.foo # We may be in the 'newgroup', but we don't have the admin option. -statement error testuser is not a superuser or role admin for role newgroup +statement error testuser must have CREATEROLE or have admin option on role "newgroup" GRANT newgroup TO testuser2 -statement error testuser is not a superuser or role admin for role newgroup +statement error testuser must have CREATEROLE or have admin option on role "newgroup" REVOKE newgroup FROM testuser -statement error testuser is not a superuser or role admin for role newgroup +statement error testuser must have CREATEROLE or have admin option on role "newgroup" GRANT newgroup TO testuser WITH ADMIN OPTION # Regression for #31784 @@ -781,10 +781,10 @@ GRANT admin TO testuser user testuser -statement error pq: testuser is not a role admin for role admin +statement error testuser must have admin option on role "admin" GRANT admin TO user1 -statement error pq: testuser is not a role admin for role admin +statement error testuser must have admin option on role "admin" REVOKE admin FROM user1 user root @@ -990,14 +990,14 @@ CREATE ROLE IF NOT EXISTS roleg statement ok CREATE ROLE IF NOT EXISTS roleg -# Need Admin option to GRANT role, CREATEROLE should not give GRANT role privilege for other roles statement ok CREATE USER testuser3 -statement error pq: testuser is not a role admin for role admin +statement error testuser must have admin option on role "admin" GRANT admin to testuser3 -statement error pq: testuser is not a superuser or role admin for role roleg +# CREATEROLE should give GRANT role privilege for other roles. 
+statement ok GRANT roleg to testuser3 user root @@ -1117,11 +1117,12 @@ CREATE ROLE thisshouldntwork LOGIN LOGIN statement ok DROP ROLE parentrole -query TTB colnames +query TTB colnames,rowsort SHOW GRANTS ON ROLE ---- -role_name member is_admin -admin root true +role_name member is_admin +admin root true +roleg testuser3 false query TTB colnames SHOW GRANTS ON ROLE admin diff --git a/pkg/sql/revoke_role.go b/pkg/sql/revoke_role.go index 0a6ba0a32765..47ac48c080d2 100644 --- a/pkg/sql/revoke_role.go +++ b/pkg/sql/revoke_role.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/decodeusername" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" @@ -43,7 +44,7 @@ func (p *planner) RevokeRoleNode(ctx context.Context, n *tree.RevokeRole) (*Revo ctx, span := tracing.ChildSpan(ctx, n.StatementTag()) defer span.Finish() - hasAdminRole, err := p.HasAdminRole(ctx) + hasCreateRolePriv, err := p.HasRoleOption(ctx, roleoption.CREATEROLE) if err != nil { return nil, err } @@ -52,6 +53,7 @@ func (p *planner) RevokeRoleNode(ctx context.Context, n *tree.RevokeRole) (*Revo if err != nil { return nil, err } + revokingRoleHasAdminOptionOnAdmin := allRoles[username.AdminRoleName()] inputRoles, err := decodeusername.FromNameList(n.Roles) if err != nil { @@ -65,19 +67,24 @@ func (p *planner) RevokeRoleNode(ctx context.Context, n *tree.RevokeRole) (*Revo } for _, r := range inputRoles { - // If the user is an admin, don't check if the user is allowed to add/drop - // roles in the role. However, if the role being modified is the admin role, then - // make sure the user is an admin with the admin option. - if hasAdminRole && !r.IsAdminRole() { + // If the current user has CREATEROLE, and the role being revoked is not an + // admin there is no need to check if the user is allowed to grant/revoke + // membership in the role. However, if the role being revoked is an admin, + // then make sure the current user also has the admin option for that role. 
+ revokedRoleIsAdmin, err := p.UserHasAdminRole(ctx, r) + if err != nil { + return nil, err + } + if hasCreateRolePriv && !revokedRoleIsAdmin { continue } - if isAdmin, ok := allRoles[r]; !ok || !isAdmin { - if r.IsAdminRole() { + if hasAdminOption := allRoles[r]; !hasAdminOption && !revokingRoleHasAdminOptionOnAdmin { + if revokedRoleIsAdmin { return nil, pgerror.Newf(pgcode.InsufficientPrivilege, - "%s is not a role admin for role %s", p.User(), r) + "%s must have admin option on role %q", p.User(), r) } return nil, pgerror.Newf(pgcode.InsufficientPrivilege, - "%s is not a superuser or role admin for role %s", p.User(), r) + "%s must have CREATEROLE or have admin option on role %q", p.User(), r) } } diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index 86645315379c..aa9cf04e6365 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -62,6 +62,7 @@ go_library( "//pkg/util/encoding/encodingtype", "//pkg/util/envutil", "//pkg/util/fileutil", + "//pkg/util/httputil", "//pkg/util/jsonbytes", "//pkg/util/log/channel", "//pkg/util/log/logconfig", diff --git a/pkg/util/log/http_sink.go b/pkg/util/log/http_sink.go index df906a0fb3fc..ac8b71e890a4 100644 --- a/pkg/util/log/http_sink.go +++ b/pkg/util/log/http_sink.go @@ -12,12 +12,14 @@ package log import ( "bytes" + "compress/gzip" "crypto/tls" "fmt" "net/http" "net/url" "github.com/cockroachdb/cockroach/pkg/cli/exit" + "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/log/logconfig" "github.com/cockroachdb/errors" ) @@ -94,7 +96,37 @@ func (hs *httpSink) output(b []byte, opt sinkOutputOptions) (err error) { } func doPost(hs *httpSink, b []byte) (*http.Response, error) { - resp, err := hs.client.Post(hs.address, hs.contentType, bytes.NewReader(b)) + var buf = bytes.Buffer{} + var req *http.Request + + if *hs.config.Compression == logconfig.GzipCompression { + g := gzip.NewWriter(&buf) + _, err := g.Write(b) + if err != nil { + return nil, err + } + err = g.Close() + if err != nil { + return nil, err + } + } else { + buf.Write(b) + } + + req, err := http.NewRequest(http.MethodPost, hs.address, &buf) + if err != nil { + return nil, err + } + + if *hs.config.Compression == logconfig.GzipCompression { + req.Header.Add(httputil.ContentEncodingHeader, httputil.GzipEncoding) + } + + for k, v := range hs.config.Headers { + req.Header.Add(k, v) + } + req.Header.Add(httputil.ContentTypeHeader, hs.contentType) + resp, err := hs.client.Do(req) if err != nil { return nil, err } diff --git a/pkg/util/log/http_sink_test.go b/pkg/util/log/http_sink_test.go index f9e473824e96..8debeac07128 100644 --- a/pkg/util/log/http_sink_test.go +++ b/pkg/util/log/http_sink_test.go @@ -11,6 +11,7 @@ package log import ( + "bytes" "context" "io" "net" @@ -179,8 +180,9 @@ func TestMessageReceived(t *testing.T) { timeout := 5 * time.Second tb := true defaults := logconfig.HTTPDefaults{ - Address: &address, - Timeout: &timeout, + Address: &address, + Timeout: &timeout, + Compression: &logconfig.NoneCompression, // We need to disable keepalives otherwise the HTTP server in the // test will let an async goroutine run waiting for more requests. 
@@ -293,3 +295,62 @@ func TestHTTPSinkContentTypePlainText(t *testing.T) { testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0)) } + +func TestHTTPSinkHeadersAndCompression(t *testing.T) { + defer leaktest.AfterTest(t)() + + address := "http://localhost" // testBase appends the port + timeout := 5 * time.Second + tb := true + format := "json" + expectedContentType := "application/json" + expectedContentEncoding := logconfig.GzipCompression + defaults := logconfig.HTTPDefaults{ + Address: &address, + Timeout: &timeout, + + // We need to disable keepalives otherwise the HTTP server in the + // test will let an async goroutine run waiting for more requests. + DisableKeepAlives: &tb, + CommonSinkConfig: logconfig.CommonSinkConfig{ + Format: &format, + Buffering: disabledBufferingCfg, + }, + + Compression: &logconfig.GzipCompression, + Headers: map[string]string{"X-CRDB-TEST": "secret-value"}, + } + + testFn := func(header http.Header, body string) error { + t.Log(body) + contentType := header.Get("Content-Type") + if contentType != expectedContentType { + return errors.Newf("mismatched content type: expected %s, got %s", expectedContentType, contentType) + } + contentEncoding := header.Get("Content-Encoding") + if contentEncoding != expectedContentEncoding { + return errors.Newf("mismatched content encoding: expected %s, got %s", expectedContentEncoding, contentEncoding) + } + + var isGzipped = func(dat []byte) bool { + gzipPrefix := []byte("\x1F\x8B\x08") + return bytes.HasPrefix(dat, gzipPrefix) + } + + if !isGzipped([]byte(body)) { + return errors.New("expected gzipped body") + } + for k, v := range header { + if k == "X-Crdb-Test" { + for _, vv := range v { + if vv == "secret-value" { + return nil + } + } + } + } + return errors.New("expected to find special header in request") + } + + testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0)) +} diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index c934b77bc6c4..92a2e4bc5a5a 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -476,6 +476,9 @@ type FileSinkConfig struct { prefix string } +var GzipCompression = "gzip" +var NoneCompression = "none" + // HTTPDefaults refresents the configuration defaults for HTTP sinks. type HTTPDefaults struct { // Address is the network address of the http server. The @@ -502,6 +505,13 @@ type HTTPDefaults struct { // overhead in production systems. DisableKeepAlives *bool `yaml:"disable-keep-alives,omitempty"` + // Headers is a list of headers to attach to each HTTP request + Headers map[string]string `yaml:",omitempty,flow"` + + // Compression can be "none" or "gzip" to enable gzip compression. + // Set to "gzip" by default. 
+ Compression *string `yaml:",omitempty"` + CommonSinkConfig `yaml:",inline"` } diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index 48581a2d9106..aa2724499016 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -543,3 +543,122 @@ capture-stray-errors: enable: true dir: /default-dir max-group-size: 100MiB + +# Check that each component of buffering struct propagates to http-sinks +# Ensure servers have gzip compression on by default and headers if set +yaml +http-defaults: + buffering: + max-staleness: 15s + flush-trigger-size: 10KiB + max-buffer-size: 2MiB +sinks: + http-servers: + a: + address: a + channels: STORAGE + headers: {X-CRDB-HEADER: header-value-a} + buffering: + max-staleness: 10s + b: + address: b + channels: OPS + headers: {X-CRDB-HEADER: header-value-b, X-ANOTHER-HEADER: zz-yy-bb} + buffering: + flush-trigger-size: 5.0KiB + c: + address: c + channels: HEALTH + buffering: + max-buffer-size: 3MiB + d: + address: d + channels: SESSIONS + buffering: NONE +---- +sinks: + file-groups: + default: + channels: {INFO: all} + filter: INFO + http-servers: + a: + channels: {INFO: [STORAGE]} + address: a + method: POST + unsafe-tls: false + timeout: 0s + disable-keep-alives: false + headers: {X-CRDB-HEADER: header-value-a} + compression: gzip + filter: INFO + format: json-compact + redact: false + redactable: true + exit-on-error: false + auditable: false + buffering: + max-staleness: 10s + flush-trigger-size: 10KiB + max-buffer-size: 2.0MiB + format: newline + b: + channels: {INFO: [OPS]} + address: b + method: POST + unsafe-tls: false + timeout: 0s + disable-keep-alives: false + headers: {X-ANOTHER-HEADER: zz-yy-bb, X-CRDB-HEADER: header-value-b} + compression: gzip + filter: INFO + format: json-compact + redact: false + redactable: true + exit-on-error: false + auditable: false + buffering: + max-staleness: 15s + flush-trigger-size: 5.0KiB + max-buffer-size: 2.0MiB + format: newline + c: + channels: {INFO: [HEALTH]} + address: c + method: POST + unsafe-tls: false + timeout: 0s + disable-keep-alives: false + compression: gzip + filter: INFO + format: json-compact + redact: false + redactable: true + exit-on-error: false + auditable: false + buffering: + max-staleness: 15s + flush-trigger-size: 10KiB + max-buffer-size: 3.0MiB + format: newline + d: + channels: {INFO: [SESSIONS]} + address: d + method: POST + unsafe-tls: false + timeout: 0s + disable-keep-alives: false + compression: gzip + filter: INFO + format: json-compact + redact: false + redactable: true + exit-on-error: false + auditable: false + buffering: NONE + stderr: + filter: NONE +capture-stray-errors: + enable: true + dir: /default-dir + max-group-size: 100MiB diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go index 150df6323cf7..71a205ec23bd 100644 --- a/pkg/util/log/logconfig/validate.go +++ b/pkg/util/log/logconfig/validate.go @@ -112,6 +112,7 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { DisableKeepAlives: &bf, Method: func() *HTTPSinkMethod { m := HTTPSinkMethod(http.MethodPost); return &m }(), Timeout: &zeroDuration, + Compression: &GzipCompression, } propagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig) @@ -454,6 +455,9 @@ func (c *Config) validateHTTPSinkConfig(hsc *HTTPSinkConfig) error { if hsc.Address == nil || len(*hsc.Address) == 0 { return errors.New("address cannot be empty") } + if *hsc.Compression != GzipCompression && 
*hsc.Compression != NoneCompression { + return errors.New("compression must be 'gzip' or 'none'") + } return c.ValidateCommonSinkConfig(hsc.CommonSinkConfig) } diff --git a/pkg/util/log/testdata/config b/pkg/util/log/testdata/config index 61405cb63cf6..0a27abb5d4e4 100644 --- a/pkg/util/log/testdata/config +++ b/pkg/util/log/testdata/config @@ -119,6 +119,7 @@ sinks: unsafe-tls: false timeout: 0s disable-keep-alives: false + compression: gzip filter: INFO format: json-compact redact: false diff --git a/pkg/util/metric/hdrhistogram.go b/pkg/util/metric/hdrhistogram.go index b6b562403e2a..65175eef0751 100644 --- a/pkg/util/metric/hdrhistogram.go +++ b/pkg/util/metric/hdrhistogram.go @@ -18,14 +18,9 @@ import ( prometheusgo "github.com/prometheus/client_model/go" ) -const ( - // HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher - // values will be recorded as this value instead. - HdrHistogramMaxLatency = 10 * time.Second - - // The number of histograms to keep in rolling window. - hdrHistogramHistWrapNum = 2 // TestSampleInterval is passed to histograms during tests which don't -) +// HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher +// values will be recorded as this value instead. +const HdrHistogramMaxLatency = 10 * time.Second // A HdrHistogram collects observed values by keeping bucketed counts. For // convenience, internally two sets of buckets are kept: A cumulative set (i.e. @@ -64,12 +59,12 @@ func NewHdrHistogram( Metadata: metadata, maxVal: maxVal, } - wHist := hdrhistogram.NewWindowed(hdrHistogramHistWrapNum, 0, maxVal, sigFigs) + wHist := hdrhistogram.NewWindowed(WindowedHistogramWrapNum, 0, maxVal, sigFigs) h.mu.cumulative = hdrhistogram.New(0, maxVal, sigFigs) h.mu.sliding = wHist h.mu.tickHelper = &tickHelper{ nextT: now(), - tickInterval: duration / hdrHistogramHistWrapNum, + tickInterval: duration / WindowedHistogramWrapNum, onTick: func() { wHist.Rotate() }, @@ -171,15 +166,19 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric { // TotalWindowed implements the WindowedHistogram interface. 
func (h *HdrHistogram) TotalWindowed() (int64, float64) { - pHist := h.ToPrometheusMetricWindowed().Histogram - return int64(pHist.GetSampleCount()), pHist.GetSampleSum() + h.mu.Lock() + defer h.mu.Unlock() + hist := h.mu.sliding.Merge() + totalSum := float64(hist.TotalCount()) * hist.Mean() + return hist.TotalCount(), totalSum } func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric { hist := &prometheusgo.Histogram{} maybeTick(h.mu.tickHelper) - bars := h.mu.sliding.Current.Distribution() + mergedHist := h.mu.sliding.Merge() + bars := mergedHist.Distribution() hist.Bucket = make([]*prometheusgo.Bucket, 0, len(bars)) var cumCount uint64 @@ -202,7 +201,6 @@ func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric { } hist.SampleCount = &cumCount hist.SampleSum = &sum // can do better here; we approximate in the loop - return &prometheusgo.Metric{ Histogram: hist, } @@ -233,13 +231,12 @@ func (h *HdrHistogram) ValueAtQuantileWindowed(q float64) float64 { func (h *HdrHistogram) Mean() float64 { h.mu.Lock() defer h.mu.Unlock() - return h.mu.cumulative.Mean() } func (h *HdrHistogram) MeanWindowed() float64 { h.mu.Lock() defer h.mu.Unlock() - - return h.mu.sliding.Current.Mean() + hist := h.mu.sliding.Merge() + return hist.Mean() } diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index 9674739c3109..e3508c2ebdb3 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -27,9 +27,14 @@ import ( "github.com/rcrowley/go-metrics" ) -// TestSampleInterval is passed to histograms during tests which don't -// want to concern themselves with supplying a "correct" interval. -const TestSampleInterval = time.Duration(math.MaxInt64) +const ( + // TestSampleInterval is passed to histograms during tests which don't + // want to concern themselves with supplying a "correct" interval. + TestSampleInterval = time.Duration(math.MaxInt64) + // WindowedHistogramWrapNum is the number of histograms to keep in the + // rolling window. + WindowedHistogramWrapNum = 2 +) // Iterable provides a method for synchronized access to interior objects. type Iterable interface { @@ -97,10 +102,12 @@ type WindowedHistogram interface { Total() (int64, float64) // MeanWindowed returns the average of the samples in the current window. MeanWindowed() float64 - // Mean returns the average of the sample in teh cumulative histogram. + // Mean returns the average of the samples in the cumulative histogram. Mean() float64 // ValueAtQuantileWindowed takes a quantile value [0,100] and returns the // interpolated value at that quantile for the windowed histogram. + // Methods implementing this interface should merge the buckets, sums, + // and counts of the previous and current windows. ValueAtQuantileWindowed(q float64) float64 } @@ -224,7 +231,10 @@ const ( type HistogramOptions struct { // Metadata is the metric Metadata associated with the histogram. Metadata Metadata - // Duration is the histogram's window duration. + // Duration is the total duration of all windows in the histogram. + // The individual window duration is Duration/WindowedHistogramWrapNum, + // where WindowedHistogramWrapNum is the number of windows in the + // histogram. Duration time.Duration // MaxVal is only relevant to the HdrHistogram, and represents the // highest trackable value in the resulting histogram buckets. @@ -256,7 +266,7 @@ func NewHistogram(opt HistogramOptions) IHistogram { // NewHistogram is a prometheus-backed histogram. 
Depending on the value of // opts.Buckets, this is suitable for recording any kind of quantity. Common // sensible choices are {IO,Network}LatencyBuckets. -func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64) *Histogram { +func newHistogram(meta Metadata, duration time.Duration, buckets []float64) *Histogram { // TODO(obs-inf): prometheus supports labeled histograms but they require more // plumbing and don't fit into the PrometheusObservable interface any more. opts := prometheus.HistogramOpts{ @@ -268,8 +278,11 @@ func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64 cum: cum, } h.windowed.tickHelper = &tickHelper{ - nextT: now(), - tickInterval: windowDuration, + nextT: now(), + // We want to divide the total window duration by the number of windows + // because we need to rotate the windows at uniformly distributed + // intervals within a histogram's total duration. + tickInterval: duration / WindowedHistogramWrapNum, onTick: func() { h.windowed.prev = h.windowed.cur h.windowed.cur = prometheus.NewHistogram(opts) @@ -293,16 +306,13 @@ type Histogram struct { Metadata cum prometheus.Histogram - // TODO(obs-inf): the way we implement windowed histograms is not great. If - // the windowed histogram is pulled right after a tick, it will be mostly - // empty. We could add a third bucket and represent the merged view of the two - // most recent buckets to avoid that. Or we could "just" double the rotation - // interval (so that the histogram really collects for 20s when we expect to - // persist the contents every 10s). Really it would make more sense to - // explicitly rotate the histogram atomically with collecting its contents, - // but that is now how we have set it up right now. It should be doable - // though, since there is only one consumer of windowed histograms - our - // internal timeseries system. + // TODO(obs-inf): the way we implement windowed histograms is not great. + // We could "just" double the rotation interval (so that the histogram really + // collects for 20s when we expect to persist the contents every 10s). + // Really it would make more sense to explicitly rotate the histogram + // atomically with collecting its contents, but that is now how we have set + // it up right now. It should be doable though, since there is only one + // consumer of windowed histograms - our internal timeseries system. windowed struct { // prometheus.Histogram is thread safe, so we only // need an RLock to record into it. But write lock @@ -368,15 +378,23 @@ func (h *Histogram) ToPrometheusMetric() *prometheusgo.Metric { return m } -// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the right type. +// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the +// right type. func (h *Histogram) ToPrometheusMetricWindowed() *prometheusgo.Metric { h.windowed.Lock() defer h.windowed.Unlock() - m := &prometheusgo.Metric{} - if err := h.windowed.cur.Write(m); err != nil { + cur := &prometheusgo.Metric{} + prev := &prometheusgo.Metric{} + if err := h.windowed.cur.Write(cur); err != nil { panic(err) } - return m + if h.windowed.prev != nil { + if err := h.windowed.prev.Write(prev); err != nil { + panic(err) + } + MergeWindowedHistogram(cur.Histogram, prev.Histogram) + } + return cur } // GetMetadata returns the metric's metadata including the Prometheus @@ -428,7 +446,8 @@ func (h *Histogram) MeanWindowed() float64 { // 2. 
Since the prometheus client library ensures buckets are in a strictly // increasing order at creation, we do not sort them. func (h *Histogram) ValueAtQuantileWindowed(q float64) float64 { - return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, q) + return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, + q) } var _ PrometheusExportable = (*ManualWindowHistogram)(nil) @@ -592,11 +611,25 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric { return m } +// ToPrometheusMetricWindowedLocked returns a filled-in prometheus metric of the +// right type. +func (mwh *ManualWindowHistogram) ToPrometheusMetricWindowedLocked() *prometheusgo.Metric { + cur := &prometheusgo.Metric{} + if err := mwh.mu.cum.Write(cur); err != nil { + panic(err) + } + if mwh.mu.prev != nil { + MergeWindowedHistogram(cur.Histogram, mwh.mu.prev) + } + return cur +} + // TotalWindowed implements the WindowedHistogram interface. func (mwh *ManualWindowHistogram) TotalWindowed() (int64, float64) { mwh.mu.RLock() defer mwh.mu.RUnlock() - return int64(mwh.mu.cur.GetSampleCount()), mwh.mu.cur.GetSampleSum() + pHist := mwh.ToPrometheusMetricWindowedLocked().Histogram + return int64(pHist.GetSampleCount()), pHist.GetSampleSum() } // Total implements the WindowedHistogram interface. @@ -608,7 +641,8 @@ func (mwh *ManualWindowHistogram) Total() (int64, float64) { func (mwh *ManualWindowHistogram) MeanWindowed() float64 { mwh.mu.RLock() defer mwh.mu.RUnlock() - return mwh.mu.cur.GetSampleSum() / float64(mwh.mu.cur.GetSampleCount()) + pHist := mwh.ToPrometheusMetricWindowedLocked().Histogram + return pHist.GetSampleSum() / float64(pHist.GetSampleCount()) } func (mwh *ManualWindowHistogram) Mean() float64 { @@ -626,7 +660,7 @@ func (mwh *ManualWindowHistogram) ValueAtQuantileWindowed(q float64) float64 { if mwh.mu.cur == nil { return 0 } - return ValueAtQuantileWindowed(mwh.mu.cur, q) + return ValueAtQuantileWindowed(mwh.ToPrometheusMetricWindowedLocked().Histogram, q) } // A Counter holds a single mutable atomic value. @@ -881,6 +915,21 @@ func (g *GaugeFloat64) GetMetadata() Metadata { return baseMetadata } +// MergeWindowedHistogram adds the bucket counts, sample count, and sample sum +// from the previous windowed histogram to those of the current windowed +// histogram. +// NB: Buckets on each histogram must be the same +func MergeWindowedHistogram(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) { + for i, bucket := range cur.Bucket { + count := *bucket.CumulativeCount + *prev.Bucket[i].CumulativeCount + *bucket.CumulativeCount = count + } + sampleCount := *cur.SampleCount + *prev.SampleCount + *cur.SampleCount = sampleCount + sampleSum := *cur.SampleSum + *prev.SampleSum + *cur.SampleSum = sampleSum +} + // ValueAtQuantileWindowed takes a quantile value [0,100] and returns the // interpolated value at that quantile for the given histogram. func ValueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float64 { diff --git a/pkg/util/metric/metric_test.go b/pkg/util/metric/metric_test.go index b69c87584e2d..a0f9cde324f3 100644 --- a/pkg/util/metric/metric_test.go +++ b/pkg/util/metric/metric_test.go @@ -15,6 +15,7 @@ import ( "encoding/json" "math" "reflect" + "sort" "sync" "testing" "time" @@ -285,11 +286,10 @@ func TestNewHistogramRotate(t *testing.T) { // But cumulative histogram has history (if i > 0). count, _ := h.Total() require.EqualValues(t, i, count) - // Add a measurement and verify it's there. 
{ h.RecordValue(12345) - f := float64(12345) + f := float64(12345) + sum _, wSum := h.TotalWindowed() require.Equal(t, wSum, f) } @@ -298,3 +298,150 @@ func TestNewHistogramRotate(t *testing.T) { // Go to beginning. } } + +func TestHistogramWindowed(t *testing.T) { + defer TestingSetNow(nil)() + setNow(0) + + duration := 10 * time.Second + + h := NewHistogram(HistogramOptions{ + Mode: HistogramModePrometheus, + Metadata: Metadata{}, + Duration: duration, + Buckets: IOLatencyBuckets, + }) + + measurements := []int64{200000000, 0, 4000000, 5000000, 10000000, 20000000, + 25000000, 30000000, 40000000, 90000000} + + // Sort the measurements so we can calculate the expected quantile values + // for the first windowed histogram after the measurements have been recorded. + sortedMeasurements := make([]int64, len(measurements)) + copy(sortedMeasurements, measurements) + sort.Slice(sortedMeasurements, func(i, j int) bool { + return sortedMeasurements[i] < sortedMeasurements[j] + }) + + // Calculate the expected quantile values as the lowest bucket values that are + // greater than each measurement. + count := 0 + j := 0 + var expQuantileValues []float64 + for i := range IOLatencyBuckets { + if j < len(sortedMeasurements) && IOLatencyBuckets[i] > float64( + sortedMeasurements[j]) { + count += 1 + j += 1 + expQuantileValues = append(expQuantileValues, IOLatencyBuckets[i]) + } + } + + w := 2 + var expHist []prometheusgo.Histogram + var expSum float64 + var expCount uint64 + for i := 0; i < w; i++ { + h.Inspect(func(interface{}) {}) // trigger ticking + if i == 0 { + // If there is no previous window, we should be unable to calculate mean + // or quantile without any observations. + require.Equal(t, 0.0, h.ValueAtQuantileWindowed(99.99)) + if !math.IsNaN(h.MeanWindowed()) { + t.Fatalf("mean should be undefined with no observations") + } + // Record all measurements on first iteration. + for _, m := range measurements { + h.RecordValue(m) + expCount += 1 + expSum += float64(m) + } + // Because we have 10 observations, we expect quantiles to correspond + // to observation indices (e.g., the 8th expected quantile value is equal + // to the value interpolated at the 80th percentile). + require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0)) + require.Equal(t, expQuantileValues[0], h.ValueAtQuantileWindowed(10)) + require.Equal(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50)) + require.Equal(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80)) + require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99)) + } else { + // The SampleSum and SampleCount values in the current window before any + // observations should be equal to those of the previous window, after all + // observations (the quantile values will also be the same). + expSum = *expHist[i-1].SampleSum + expCount = *expHist[i-1].SampleCount + + // After recording a few higher-value observations in the second window, + // the quantile values will shift in the direction of the observations. 
+ for _, m := range sortedMeasurements[len(sortedMeasurements)-3 : len( + sortedMeasurements)-1] { + h.RecordValue(m) + expCount += 1 + expSum += float64(m) + } + require.Less(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50)) + require.Less(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80)) + require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99)) + } + + // In all cases, the windowed mean should be equal to the expected sum/count + require.Equal(t, expSum/float64(expCount), h.MeanWindowed()) + + expHist = append(expHist, prometheusgo.Histogram{ + SampleCount: &expCount, + SampleSum: &expSum, + }) + + // Increment Now time to trigger tick on the following iteration. + setNow(time.Duration(i+1) * (duration / 2)) + } +} + +func TestMergeWindowedHistogram(t *testing.T) { + measurements := []int64{4000000, 90000000} + opts := prometheus.HistogramOpts{ + Buckets: IOLatencyBuckets, + } + + prevWindow := prometheus.NewHistogram(opts) + curWindow := prometheus.NewHistogram(opts) + + cur := &prometheusgo.Metric{} + prev := &prometheusgo.Metric{} + + prevWindow.Observe(float64(measurements[0])) + require.NoError(t, prevWindow.Write(prev)) + require.NoError(t, curWindow.Write(cur)) + + MergeWindowedHistogram(cur.Histogram, prev.Histogram) + // Merging a non-empty previous histogram into an empty current histogram + // should result in the current histogram containing the same sample sum, + // sample count, and per-bucket cumulative count values as the previous + // histogram. + require.Equal(t, uint64(1), *cur.Histogram.SampleCount) + require.Equal(t, float64(measurements[0]), *cur.Histogram.SampleSum) + for _, bucket := range cur.Histogram.Bucket { + if *bucket.UpperBound > float64(measurements[0]) { + require.Equal(t, uint64(1), *bucket.CumulativeCount) + } + } + + curWindow.Observe(float64(measurements[1])) + require.NoError(t, curWindow.Write(cur)) + + MergeWindowedHistogram(cur.Histogram, prev.Histogram) + // Merging a non-empty previous histogram with a non-empty current histogram + // should result in the current histogram containing its original sample sum, + // sample count, and per-bucket cumulative count values, + // plus those of the previous histogram. + require.Equal(t, uint64(2), *cur.Histogram.SampleCount) + require.Equal(t, float64(measurements[0]+measurements[1]), + *cur.Histogram.SampleSum) + for _, bucket := range cur.Histogram.Bucket { + if *bucket.UpperBound > float64(measurements[1]) { + require.Equal(t, uint64(2), *bucket.CumulativeCount) + } else if *bucket.UpperBound > float64(measurements[0]) { + require.Equal(t, uint64(1), *bucket.CumulativeCount) + } + } +}
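The mechanism this change introduces (keep WindowedHistogramWrapNum = 2 prometheus histograms, rotate them every Duration/WindowedHistogramWrapNum, and answer windowed reads from the merge of the previous and current windows) can be boiled down to a small self-contained sketch. The windowedLatency type, its method names, and the sample values below are illustrative only and assume nothing beyond the prometheus client packages; CockroachDB's Histogram wires the same idea through tickHelper, ToPrometheusMetricWindowed, and MergeWindowedHistogram.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// windowedLatency keeps two prometheus histograms and rotates them on every
// tick, so a read that merges prev and cur always reflects at least one full
// window of samples, even immediately after a rotation.
type windowedLatency struct {
	opts prometheus.HistogramOpts
	cur  prometheus.Histogram
	prev prometheus.Histogram
}

func newWindowedLatency(buckets []float64) *windowedLatency {
	opts := prometheus.HistogramOpts{Name: "latency_seconds", Buckets: buckets}
	return &windowedLatency{opts: opts, cur: prometheus.NewHistogram(opts)}
}

func (w *windowedLatency) record(v float64) { w.cur.Observe(v) }

// rotate is what onTick does above: the current window becomes the previous
// one and a fresh histogram starts collecting.
func (w *windowedLatency) rotate() {
	w.prev = w.cur
	w.cur = prometheus.NewHistogram(w.opts)
}

// windowed returns cur merged with prev, mirroring what
// ToPrometheusMetricWindowed + MergeWindowedHistogram do.
func (w *windowedLatency) windowed() *dto.Histogram {
	cur := &dto.Metric{}
	if err := w.cur.Write(cur); err != nil {
		panic(err)
	}
	if w.prev != nil {
		prev := &dto.Metric{}
		if err := w.prev.Write(prev); err != nil {
			panic(err)
		}
		// Both histograms were built from the same opts, so the bucket layouts
		// match and counts and sums simply add.
		for i, b := range cur.Histogram.Bucket {
			*b.CumulativeCount += *prev.Histogram.Bucket[i].CumulativeCount
		}
		*cur.Histogram.SampleCount += *prev.Histogram.SampleCount
		*cur.Histogram.SampleSum += *prev.Histogram.SampleSum
	}
	return cur.Histogram
}

func main() {
	w := newWindowedLatency(prometheus.DefBuckets)
	w.record(0.25)
	w.record(0.75)
	w.rotate() // right after a tick the current window is empty...
	h := w.windowed()
	// ...but the merged view still reports 2 samples with sum 1.0.
	fmt.Println(h.GetSampleCount(), h.GetSampleSum())
}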
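The body of ValueAtQuantileWindowed itself is unchanged and does not appear in this diff. For readers following the test expectations above, the following is a rough, self-contained sketch of interpolating a quantile from cumulative, already-sorted bucket bounds after the windows have been merged. The function and parameter names are hypothetical and the rounding details are an approximation, not the actual implementation.

// quantileFromCumulativeBuckets approximates a ValueAtQuantileWindowed-style
// helper: q is in [0,100], upperBounds are the sorted bucket upper bounds, and
// cumCounts are the matching cumulative counts of the merged windows.
func quantileFromCumulativeBuckets(
	upperBounds []float64, cumCounts []uint64, sampleCount uint64, q float64,
) float64 {
	if sampleCount == 0 {
		return 0 // no observations, nothing to interpolate
	}
	n := float64(sampleCount)
	rank := float64(uint64((q/100)*n + 0.5)) // round to the nearest sample rank
	if rank == 0 {
		return 0
	}
	prevCount, prevBound := 0.0, 0.0
	for i, bound := range upperBounds {
		count := float64(cumCounts[i])
		if count >= rank {
			if count == prevCount {
				return bound
			}
			// Linear interpolation inside the bucket that contains the rank.
			return prevBound + (bound-prevBound)*((rank-prevCount)/(count-prevCount))
		}
		prevCount, prevBound = count, bound
	}
	// Observations above the last finite bucket clamp to its upper bound.
	return prevBound
}

With ten samples landing in ten distinct buckets, as in TestHistogramWindowed, each relevant bucket holds exactly one sample, so the interpolation degenerates to returning that bucket's upper bound, which is why the test can precompute its expected quantile values directly from IOLatencyBuckets.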