diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 11da0c184b8..16a2bd13660 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2930,6 +2930,8 @@ const ( ShardDistributorAssignLoopSuccess ShardDistributorAssignLoopFail + ShardDistributorActiveShards + ShardDistributorStoreExecutorNotFound ShardDistributorStoreFailuresPerNamespace ShardDistributorStoreRequestsPerNamespace @@ -3702,6 +3704,8 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ ShardDistributorAssignLoopSuccess: {metricName: "shard_distrubutor_shard_assign_success", metricType: Counter}, ShardDistributorAssignLoopFail: {metricName: "shard_distrubutor_shard_assign_fail", metricType: Counter}, + ShardDistributorActiveShards: {metricName: "shard_distributor_active_shards", metricType: Gauge}, + ShardDistributorStoreExecutorNotFound: {metricName: "shard_distributor_store_executor_not_found", metricType: Counter}, ShardDistributorStoreFailuresPerNamespace: {metricName: "shard_distributor_store_failures_per_namespace", metricType: Counter}, ShardDistributorStoreRequestsPerNamespace: {metricName: "shard_distributor_store_requests_per_namespace", metricType: Counter}, diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 9ee9e0aecb5..071e5680dd7 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -347,6 +347,10 @@ func NamespaceTag(namespace string) Tag { return metricWithUnknown("namespace", namespace) } +func NamespaceTypeTag(namespaceType string) Tag { + return metricWithUnknown("namespace_type", namespaceType) +} + func TaskCategoryTag(category string) Tag { return metricWithUnknown("task_category", category) } diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go index 60421952ec0..f6befdfa0f1 100644 --- a/service/sharddistributor/leader/process/processor.go +++ b/service/sharddistributor/leader/process/processor.go @@ -325,6 +325,15 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo return fmt.Errorf("assign shards: %w", err) } + totalActiveShards := 0 + for _, assignedState := range namespaceState.ShardAssignments { + totalActiveShards += len(assignedState.AssignedShards) + } + metricsLoopScope.Tagged( + metrics.NamespaceTag(p.namespaceCfg.Name), + metrics.NamespaceTypeTag(p.namespaceCfg.Type), + ).UpdateGauge(metrics.ShardDistributorActiveShards, float64(totalActiveShards)) + return nil }