Skip to content

Commit e0af1cc

Browse files
Backport 7804 into 2.7.1 (#7896)
use grpc for communicating with compactor for query time filtering of data requested for deletion (#7804) Manual backport of #7804 and #7814 **What this PR does / why we need it**: Add grpc support to compactor for getting delete requests and gen number for query time filtering. Since these requests are internal to Loki, it would be good to use grpc instead of HTTP same as all the internal requests we do in Loki. I have added a new config for accepting the grpc address of the compactor. I tried having just the existing config and detecting if it is a grpc server, but it was hard to do it reliably, considering the different deployment modes we support. I think it is safe to keep it the same and eventually deprecate the existing config. **Checklist** - [x] Documentation added - [x] Tests updated - [x] `CHANGELOG.md` updated (cherry picked from commit 1410808) Co-authored-by: Sandeep Sukhani <[email protected]>
1 parent 6f4c2ab commit e0af1cc

29 files changed

+2341
-326
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,16 @@ Check the history of the branch FIXME.
4545

4646
#### Loki
4747

48+
##### Enhancements
49+
50+
* [7804](https://github.com/grafana/loki/pull/7804) **sandeepsukhani**: Use grpc for communicating with compactor for query time filtering of data requested for deletion.
51+
4852
##### Fixes
53+
4954
* [7453](https://github.com/grafana/loki/pull/7453) **periklis**: Add single compactor http client for delete and gennumber clients
5055

5156
##### Changes
57+
5258
* [7877](https://github.com/grafana/loki/pull/7877)A **trevorwhitney**: Due to a known bug with experimental new delete mode feature, the default delete mode has been changed to `filter-only`.
5359

5460
## 2.7.0

clients/pkg/promtail/targets/cloudflare/fields.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ var (
3131
allFields = append(extendedFields, []string{
3232
"BotScore", "BotScoreSrc", "ClientRequestBytes", "ClientSrcPort", "ClientXRequestedWith", "CacheTieredFill", "EdgeResponseCompressionRatio", "EdgeServerIP", "FirewallMatchesSources",
3333
"FirewallMatchesActions", "FirewallMatchesRuleIDs", "OriginResponseBytes", "OriginResponseTime", "ClientDeviceType", "WAFFlags", "WAFMatchedVar", "EdgeColoID",
34-
"RequestHeaders", "ResponseHeaders",
34+
"RequestHeaders", "ResponseHeaders",
3535
}...)
3636
)
3737

clients/pkg/promtail/targets/lokipush/pushtargetmanager_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func Test_validateJobName(t *testing.T) {
6868
},
6969
},
7070
},
71-
wantErr: false,
71+
wantErr: false,
7272
expectedJob: "job_1_2_3_4_job",
7373
},
7474
}

docs/sources/configuration/_index.md

+4
Original file line numberDiff line numberDiff line change
@@ -2911,6 +2911,10 @@ This way, one doesn't have to replicate configuration in multiple places.
29112911
# CLI flag: -common.compactor-address
29122912
[compactor_address: <string> | default = ""]
29132913

2914+
# Address and port number where the compactor grpc requests are being served.
2915+
# CLI flag: -common.compactor-grpc-address
2916+
[compactor_grpc_address: <string> | default = ""]
2917+
29142918
## analytics
29152919

29162920
The `analytics` block configures the reporting of Loki analytics to grafana.com.

pkg/logqlmodel/logqlmodel.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package logqlmodel
22

33
import (
4-
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
54
"github.com/prometheus/prometheus/promql/parser"
65

6+
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
7+
78
"github.com/grafana/loki/pkg/logproto"
89
"github.com/grafana/loki/pkg/logqlmodel/stats"
910
)

pkg/logqlmodel/metadata/context_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ import (
55
"errors"
66
"testing"
77

8-
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
98
"github.com/stretchr/testify/require"
9+
10+
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
1011
)
1112

1213
func TestHeaders(t *testing.T) {

pkg/loki/common/common.go

+4
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ type Config struct {
4343

4444
// CompactorAddress is the http address of the compactor in the form http://host:port
4545
CompactorAddress string `yaml:"compactor_address"`
46+
47+
// CompactorAddress is the grpc address of the compactor in the form host:port
48+
CompactorGRPCAddress string `yaml:"compactor_grpc_address"`
4649
}
4750

4851
func (c *Config) RegisterFlags(f *flag.FlagSet) {
@@ -57,6 +60,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
5760
throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")
5861

5962
f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")
63+
f.StringVar(&c.CompactorGRPCAddress, "common.compactor-grpc-address", "", "the grpc address of the compactor in the form host:port")
6064
}
6165

6266
type Storage struct {

pkg/loki/loki.go

+28-25
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/grafana/loki/pkg/storage"
4646
"github.com/grafana/loki/pkg/storage/config"
4747
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
48+
compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client"
4849
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
4950
"github.com/grafana/loki/pkg/storage/stores/series/index"
5051
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
@@ -69,30 +70,31 @@ type Config struct {
6970
UseBufferedLogger bool `yaml:"use_buffered_logger"`
7071
UseSyncLogger bool `yaml:"use_sync_logger"`
7172

72-
Common common.Config `yaml:"common,omitempty"`
73-
Server server.Config `yaml:"server,omitempty"`
74-
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
75-
Distributor distributor.Config `yaml:"distributor,omitempty"`
76-
Querier querier.Config `yaml:"querier,omitempty"`
77-
CompactorClient compactor.ClientConfig `yaml:"delete_client,omitempty"`
78-
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
79-
Ingester ingester.Config `yaml:"ingester,omitempty"`
80-
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
81-
IndexGateway indexgateway.Config `yaml:"index_gateway"`
82-
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
83-
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
84-
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
85-
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
86-
Worker worker.Config `yaml:"frontend_worker,omitempty"`
87-
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
88-
Ruler ruler.Config `yaml:"ruler,omitempty"`
89-
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
90-
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
91-
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
92-
Tracing tracing.Config `yaml:"tracing"`
93-
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
94-
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
95-
UsageReport usagestats.Config `yaml:"analytics"`
73+
Common common.Config `yaml:"common,omitempty"`
74+
Server server.Config `yaml:"server,omitempty"`
75+
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
76+
Distributor distributor.Config `yaml:"distributor,omitempty"`
77+
Querier querier.Config `yaml:"querier,omitempty"`
78+
CompactorHTTPClient compactor_client.HTTPConfig `yaml:"compactor_client,omitempty"`
79+
CompactorGRPCClient compactor_client.GRPCConfig `yaml:"compactor_grpc_client,omitempty"`
80+
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
81+
Ingester ingester.Config `yaml:"ingester,omitempty"`
82+
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
83+
IndexGateway indexgateway.Config `yaml:"index_gateway"`
84+
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
85+
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
86+
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
87+
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
88+
Worker worker.Config `yaml:"frontend_worker,omitempty"`
89+
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
90+
Ruler ruler.Config `yaml:"ruler,omitempty"`
91+
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
92+
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
93+
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
94+
Tracing tracing.Config `yaml:"tracing"`
95+
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
96+
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
97+
UsageReport usagestats.Config `yaml:"analytics"`
9698
}
9799

98100
// RegisterFlags registers flag.
@@ -115,7 +117,8 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
115117
c.Common.RegisterFlags(f)
116118
c.Distributor.RegisterFlags(f)
117119
c.Querier.RegisterFlags(f)
118-
c.CompactorClient.RegisterFlags(f)
120+
c.CompactorHTTPClient.RegisterFlags(f)
121+
c.CompactorGRPCClient.RegisterFlags(f)
119122
c.IngesterClient.RegisterFlags(f)
120123
c.Ingester.RegisterFlags(f)
121124
c.StorageConfig.RegisterFlags(f)

pkg/loki/modules.go

+45-22
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import (
1212
"strings"
1313
"time"
1414

15-
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
16-
1715
"github.com/NYTimes/gziphandler"
1816
"github.com/go-kit/log"
1917
"github.com/go-kit/log/level"
@@ -43,6 +41,7 @@ import (
4341
"github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb"
4442
"github.com/grafana/loki/pkg/querier"
4543
"github.com/grafana/loki/pkg/querier/queryrange"
44+
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
4645
"github.com/grafana/loki/pkg/ruler"
4746
base_ruler "github.com/grafana/loki/pkg/ruler/base"
4847
"github.com/grafana/loki/pkg/runtime"
@@ -54,6 +53,8 @@ import (
5453
"github.com/grafana/loki/pkg/storage/config"
5554
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
5655
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
56+
compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client"
57+
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc"
5758
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
5859
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber"
5960
"github.com/grafana/loki/pkg/storage/stores/series/index"
@@ -676,41 +677,53 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
676677
func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) {
677678
var client generationnumber.CacheGenClient
678679
if t.supportIndexDeleteRequest() {
679-
compactorAddress, err := t.compactorAddress()
680-
if err != nil {
681-
return nil, err
682-
}
683-
684-
httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
680+
compactorAddress, isGRPCAddress, err := t.compactorAddress()
685681
if err != nil {
686682
return nil, err
687683
}
688684

689-
client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient)
690-
if err != nil {
691-
return nil, err
685+
reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "cache_gen", "client_type": t.Cfg.Target.String()}, prometheus.DefaultRegisterer)
686+
if isGRPCAddress {
687+
client, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg)
688+
if err != nil {
689+
return nil, err
690+
}
691+
} else {
692+
client, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient)
693+
if err != nil {
694+
return nil, err
695+
}
692696
}
693697
}
694698

695699
t.cacheGenerationLoader = generationnumber.NewGenNumberLoader(client, prometheus.DefaultRegisterer)
696-
return services.NewIdleService(nil, nil), nil
700+
return services.NewIdleService(nil, func(failureCase error) error {
701+
t.cacheGenerationLoader.Stop()
702+
return nil
703+
}), nil
697704
}
698705

699706
func (t *Loki) supportIndexDeleteRequest() bool {
700707
return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs)
701708
}
702709

703-
func (t *Loki) compactorAddress() (string, error) {
710+
// compactorAddress returns the configured address of the compactor.
711+
// It prefers grpc address over http. If the address is grpc then the bool would be true otherwise false
712+
func (t *Loki) compactorAddress() (string, bool, error) {
704713
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
705714
// In single binary or read modes, this module depends on Server
706-
return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil
715+
return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil
707716
}
708717

709-
if t.Cfg.Common.CompactorAddress == "" {
710-
return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured")
718+
if t.Cfg.Common.CompactorAddress == "" && t.Cfg.Common.CompactorGRPCAddress == "" {
719+
return "", false, errors.New("query filtering for deletes requires 'compactor_grpc_address' or 'compactor_address' to be configured")
711720
}
712721

713-
return t.Cfg.Common.CompactorAddress, nil
722+
if t.Cfg.Common.CompactorGRPCAddress != "" {
723+
return t.Cfg.Common.CompactorGRPCAddress, true, nil
724+
}
725+
726+
return t.Cfg.Common.CompactorAddress, false, nil
714727
}
715728

716729
func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
@@ -1007,6 +1020,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
10071020
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))
10081021
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))
10091022
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler))
1023+
grpc.RegisterCompactorServer(t.Server.GRPC, t.compactor.DeleteRequestsGRPCHandler)
10101024
}
10111025

10121026
return t.compactor, nil
@@ -1123,17 +1137,26 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri
11231137
return deletion.NewNoOpDeleteRequestsStore(), nil
11241138
}
11251139

1126-
compactorAddress, err := t.compactorAddress()
1140+
compactorAddress, isGRPCAddress, err := t.compactorAddress()
11271141
if err != nil {
11281142
return nil, err
11291143
}
11301144

1131-
httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
1132-
if err != nil {
1133-
return nil, err
1145+
reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "delete_requests", "client_type": clientType}, prometheus.DefaultRegisterer)
1146+
var compactorClient deletion.CompactorClient
1147+
if isGRPCAddress {
1148+
compactorClient, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg)
1149+
if err != nil {
1150+
return nil, err
1151+
}
1152+
} else {
1153+
compactorClient, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient)
1154+
if err != nil {
1155+
return nil, err
1156+
}
11341157
}
11351158

1136-
client, err := deletion.NewDeleteRequestsClient(compactorAddress, httpClient, t.deleteClientMetrics, clientType)
1159+
client, err := deletion.NewDeleteRequestsClient(compactorClient, t.deleteClientMetrics, clientType)
11371160
if err != nil {
11381161
return nil, err
11391162
}

pkg/querier/queryrange/queryrangebase/middleware_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@ type fakeGenNumberLoader struct {
3131
func (l *fakeGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string {
3232
return l.genNumber
3333
}
34+
35+
func (l *fakeGenNumberLoader) Stop() {}

pkg/querier/queryrange/queryrangebase/results_cache.go

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func NewResultsCacheMetrics(registerer prometheus.Registerer) *ResultsCacheMetri
6868

6969
type CacheGenNumberLoader interface {
7070
GetResultsCacheGenNumber(tenantIDs []string) string
71+
Stop()
7172
}
7273

7374
// ResultsCacheConfig is the config for the results cache.

pkg/querier/queryrange/queryrangebase/results_cache_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -1054,3 +1054,5 @@ func newMockCacheGenNumberLoader() CacheGenNumberLoader {
10541054
func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string {
10551055
return ""
10561056
}
1057+
1058+
func (l mockCacheGenNumberLoader) Stop() {}

0 commit comments

Comments
 (0)