Skip to content

Commit 786b656

Browse files
committed
added rbsproxystorage
1 parent 7f46c3d commit 786b656

File tree

8 files changed

+126
-15
lines changed

8 files changed

+126
-15
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/gin-gonic/gin v1.10.1
99
github.com/google/uuid v1.3.0
1010
github.com/splitio/gincache v1.0.1
11-
github.com/splitio/go-split-commons/v8 v8.0.0-20251022154508-1ea26a26874f
11+
github.com/splitio/go-split-commons/v8 v8.0.0-20251022200537-5573c2615caf
1212
github.com/splitio/go-toolkit/v5 v5.4.1
1313
github.com/stretchr/testify v1.11.1
1414
go.etcd.io/bbolt v1.3.6

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA
7474
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
7575
github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU=
7676
github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY=
77-
github.com/splitio/go-split-commons/v8 v8.0.0-20251022154508-1ea26a26874f h1:2o8Hu3G4jAoF6Y0Ceptr4Bwp3x9wFDenp494Cu/V5nU=
78-
github.com/splitio/go-split-commons/v8 v8.0.0-20251022154508-1ea26a26874f/go.mod h1:vgRGPn0s4RC9/zp1nIn4KeeIEj/K3iXE2fxYQbCk/WI=
77+
github.com/splitio/go-split-commons/v8 v8.0.0-20251022200537-5573c2615caf h1:Gvd5Nv1Jnt943JYAnTXS8XljfLKuAOXNBanr+j9Yyuk=
78+
github.com/splitio/go-split-commons/v8 v8.0.0-20251022200537-5573c2615caf/go.mod h1:vgRGPn0s4RC9/zp1nIn4KeeIEj/K3iXE2fxYQbCk/WI=
7979
github.com/splitio/go-toolkit/v5 v5.4.1 h1:srTyvDBJZMUcJ/KiiQDMyjCuELVgTBh2TGRVn0sOXEE=
8080
github.com/splitio/go-toolkit/v5 v5.4.1/go.mod h1:SifzysrOVDbzMcOE8zjX02+FG5az4FrR3Us/i5SeStw=
8181
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

splitio/commitversion.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ This file is created automatically, please do not edit
55
*/
66

77
// CommitVersion is the version of the last commit previous to release
8-
const CommitVersion = "5e4d9e1"
8+
const CommitVersion = "7f46c3d"

splitio/proxy/caching/workers.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type CacheAwareSplitSynchronizer struct {
2424
// NewCacheAwareSplitSync constructs a split-sync wrapper that evicts cache on updates
2525
func NewCacheAwareSplitSync(
2626
splitStorage storage.SplitStorage,
27+
ruleBasedStorage storage.RuleBasedSegmentsStorage,
2728
splitFetcher service.SplitFetcher,
2829
logger logging.LoggerInterface,
2930
runtimeTelemetry storage.TelemetryRuntimeProducer,
@@ -34,7 +35,7 @@ func NewCacheAwareSplitSync(
3435
) *CacheAwareSplitSynchronizer {
3536
return &CacheAwareSplitSynchronizer{
3637
// TODO add ruleBasedSegmentStorage, ruleBuilder, increase FLAG SPEC when we support RUleBased
37-
wrapped: split.NewSplitUpdater(splitStorage, nil, splitFetcher, logger, runtimeTelemetry, appMonitor, flagSetsFilter, grammar.RuleBuilder{}, false, specVersion),
38+
wrapped: split.NewSplitUpdater(splitStorage, ruleBasedStorage, splitFetcher, logger, runtimeTelemetry, appMonitor, flagSetsFilter, grammar.RuleBuilder{}, false, specVersion),
3839
splitStorage: splitStorage,
3940
cacheFlusher: cacheFlusher,
4041
}
@@ -81,15 +82,15 @@ type CacheAwareSegmentSynchronizer struct {
8182
func NewCacheAwareSegmentSync(
8283
splitStorage storage.SplitStorage,
8384
segmentStorage storage.SegmentStorage,
85+
ruleBasedStorage storage.RuleBasedSegmentsStorage,
8486
segmentFetcher service.SegmentFetcher,
8587
logger logging.LoggerInterface,
8688
runtimeTelemetry storage.TelemetryRuntimeProducer,
8789
cacheFlusher gincache.CacheFlusher,
8890
appMonitor application.MonitorProducerInterface,
8991
) *CacheAwareSegmentSynchronizer {
9092
return &CacheAwareSegmentSynchronizer{
91-
// TODO add ruleBasedSegmentStorage
92-
wrapped: segment.NewSegmentUpdater(splitStorage, segmentStorage, nil, segmentFetcher, logger, runtimeTelemetry, appMonitor),
93+
wrapped: segment.NewSegmentUpdater(splitStorage, segmentStorage, ruleBasedStorage, segmentFetcher, logger, runtimeTelemetry, appMonitor),
9394
cacheFlusher: cacheFlusher,
9495
splitStorage: splitStorage,
9596
segmentStorage: segmentStorage,

splitio/proxy/controllers/sdk.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ func (c *SdkServerController) SplitChanges(ctx *gin.Context) {
127127

128128
sParam, _ := ctx.GetQuery("s")
129129
spec, err := specs.ParseAndValidate(sParam)
130+
if spec == "1.3" {
131+
ctx.JSON(http.StatusBadRequest, gin.H{"code": 400, "message": "lala"})
132+
return
133+
}
130134
if err != nil {
131135
c.logger.Error(fmt.Sprintf("error parsing spec version: %s.", err))
132136
ctx.JSON(http.StatusBadRequest, gin.H{"code": 400, "message": err.Error()})

splitio/proxy/initialization.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
8383

8484
// Proxy storages already implement the observable interface, so no need to wrap them
8585
splitStorage := storage.NewProxySplitStorage(dbInstance, logger, flagsets.NewFlagSetFilter(cfg.FlagSetsFilter), cfg.Initialization.Snapshot != "")
86+
ruleBasedStorage := storage.NewProxyRuleBasedSegmentsStorage(logger)
8687
segmentStorage := storage.NewProxySegmentStorage(dbInstance, logger, cfg.Initialization.Snapshot != "")
8788
largeSegmentStorage := inmemory.NewLargeSegmentsStorage()
8889

@@ -119,8 +120,8 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
119120

120121
// setup feature flags, segments & local telemetry API interactions
121122
workers := synchronizer.Workers{
122-
SplitUpdater: caching.NewCacheAwareSplitSync(splitStorage, splitAPI.SplitFetcher, logger, localTelemetryStorage, httpCache, appMonitor, flagSetsFilter, advanced.FlagsSpecVersion),
123-
SegmentUpdater: caching.NewCacheAwareSegmentSync(splitStorage, segmentStorage, splitAPI.SegmentFetcher, logger, localTelemetryStorage, httpCache,
123+
SplitUpdater: caching.NewCacheAwareSplitSync(splitStorage, ruleBasedStorage, splitAPI.SplitFetcher, logger, localTelemetryStorage, httpCache, appMonitor, flagSetsFilter, advanced.FlagsSpecVersion),
124+
SegmentUpdater: caching.NewCacheAwareSegmentSync(splitStorage, segmentStorage, ruleBasedStorage, splitAPI.SegmentFetcher, logger, localTelemetryStorage, httpCache,
124125
appMonitor),
125126
TelemetryRecorder: telemetry.NewTelemetrySynchronizer(localTelemetryStorage, telemetryRecorder, splitStorage, segmentStorage, logger,
126127
metadata, localTelemetryStorage),
@@ -165,7 +166,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
165166
// health monitors are only started after successful init (otherwise they'll fail if the app doesn't sync correctly within the
166167
/// specified refresh period)
167168
before := time.Now()
168-
err = startBGSyng(syncManager, mstatus, cfg.Initialization.Snapshot != "", func() {
169+
err = startBGSync(syncManager, mstatus, cfg.Initialization.Snapshot != "", func() {
169170
logger.Info("Synchronizer tasks started")
170171
appMonitor.Start()
171172
servicesMonitor.Start()
@@ -262,6 +263,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
262263
FlagSets: cfg.FlagSetsFilter,
263264
FlagSetsStrictMatching: cfg.FlagSetStrictMatching,
264265
ProxyLargeSegmentStorage: largeSegmentStorage,
266+
SpecVersion: cfg.FlagSpecVersion,
265267
}
266268

267269
if ilcfg := cfg.Integrations.ImpressionListener; ilcfg.Endpoint != "" {
@@ -286,8 +288,7 @@ var (
286288
errUnrecoverable = errors.New("error and no snapshot available")
287289
)
288290

289-
func startBGSyng(m synchronizer.Manager, mstatus chan int, haveSnapshot bool, onReady func()) error {
290-
291+
func startBGSync(m synchronizer.Manager, mstatus chan int, haveSnapshot bool, onReady func()) error {
291292
attemptInit := func() bool {
292293
go m.Start()
293294
status := <-mstatus

splitio/proxy/initialization_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func (m *syncManagerMock) Start() {
2525
}
2626
func (m *syncManagerMock) Stop() { panic("unimplemented") }
2727

28-
func (m *syncManagerMock) StartBGSyng(mstatus chan int, shouldRetry bool, onReady func()) error {
28+
func (m *syncManagerMock) StartBGSync(mstatus chan int, shouldRetry bool, onReady func()) error {
2929
panic("unimplemented")
3030
}
3131

@@ -37,7 +37,7 @@ func TestSyncManagerInitializationRetriesWithSnapshot(t *testing.T) {
3737

3838
// No snapshot and error
3939
complete := make(chan struct{}, 1)
40-
err := startBGSyng(sm, sm.c, false, func() { complete <- struct{}{} })
40+
err := startBGSync(sm, sm.c, false, func() { complete <- struct{}{} })
4141
if err != errUnrecoverable {
4242
t.Error("should be an unrecoverable error. Got: ", err)
4343
}
@@ -51,7 +51,7 @@ func TestSyncManagerInitializationRetriesWithSnapshot(t *testing.T) {
5151

5252
// Snapshot and error
5353
atomic.StoreInt64(&sm.execCount, 0)
54-
err = startBGSyng(sm, sm.c, true, func() { complete <- struct{}{} })
54+
err = startBGSync(sm, sm.c, true, func() { complete <- struct{}{} })
5555
if err != errRetrying {
5656
t.Error("should be a retrying error. Got: ", err)
5757
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package storage
2+
3+
import (
4+
"github.com/splitio/go-split-commons/v8/dtos"
5+
"github.com/splitio/go-split-commons/v8/storage"
6+
"github.com/splitio/go-split-commons/v8/storage/inmemory/mutexmap"
7+
"github.com/splitio/go-toolkit/v5/datastructures/set"
8+
"github.com/splitio/go-toolkit/v5/logging"
9+
)
10+
11+
// ProxyRuleBasedSegmentsStorage defines the interface of a storage that can be used for serving payloads
12+
// for different requested `since` parameters
13+
type ProxyRuleBasedSegmentsStorage interface {
14+
ChangesSince(since int64) (*dtos.RuleBasedSegmentsDTO, error)
15+
}
16+
17+
// ProxyRuleBasedSegmentsStorageImpl implements the ProxyRuleBasedSegmentsStorage interface and the SplitProducer interface
18+
type ProxyRuleBasedSegmentsStorageImpl struct {
19+
snapshot mutexmap.RuleBasedSegmentsStorageImpl
20+
logger logging.LoggerInterface
21+
// mtx sync.Mutex
22+
}
23+
24+
// NewProxyRuleBasedSegmentsStorage instantiates a new proxy storage that wraps an in-memory snapshot of the last known
25+
// flag configuration
26+
func NewProxyRuleBasedSegmentsStorage(logger logging.LoggerInterface) *ProxyRuleBasedSegmentsStorageImpl {
27+
snapshot := mutexmap.NewRuleBasedSegmentsStorage()
28+
29+
return &ProxyRuleBasedSegmentsStorageImpl{
30+
snapshot: *snapshot,
31+
logger: logger,
32+
}
33+
}
34+
35+
// ChangesSince retrieves the rule-based segments changes since the given change number
36+
func (p *ProxyRuleBasedSegmentsStorageImpl) ChangesSince(since int64) (*dtos.RuleBasedSegmentsDTO, error) {
37+
cn, _ := p.snapshot.ChangeNumber()
38+
return &dtos.RuleBasedSegmentsDTO{Since: since, Till: cn, RuleBasedSegments: p.snapshot.All()}, nil
39+
}
40+
41+
// All call is forwarded to the snapshot
42+
func (p *ProxyRuleBasedSegmentsStorageImpl) All() []dtos.RuleBasedSegmentDTO {
43+
return p.snapshot.All()
44+
}
45+
46+
// ChangeNumber returns the current change number
47+
func (p *ProxyRuleBasedSegmentsStorageImpl) ChangeNumber() (int64, error) {
48+
return p.snapshot.ChangeNumber()
49+
}
50+
51+
// Contains checks if the given rule-based segments are present in storage
52+
func (p *ProxyRuleBasedSegmentsStorageImpl) Contains(rbs []string) bool {
53+
return p.snapshot.Contains(rbs)
54+
}
55+
56+
// GetRuleBasedSegmentByName retrieves a rule-based segment by name
57+
func (p *ProxyRuleBasedSegmentsStorageImpl) GetRuleBasedSegmentByName(name string) (*dtos.RuleBasedSegmentDTO, error) {
58+
return p.snapshot.GetRuleBasedSegmentByName(name)
59+
}
60+
61+
// LargeSegments call is forwarded to the snapshot
62+
func (p *ProxyRuleBasedSegmentsStorageImpl) LargeSegments() *set.ThreadUnsafeSet {
63+
return p.snapshot.LargeSegments()
64+
}
65+
66+
// ReplaceAll replaces all rule-based segments in storage
67+
func (p *ProxyRuleBasedSegmentsStorageImpl) ReplaceAll(rbs []dtos.RuleBasedSegmentDTO, cn int64) error {
68+
return p.snapshot.ReplaceAll(rbs, cn)
69+
}
70+
71+
// RuleBasedSegmentNames retrieves the names of all rule-based segments
72+
func (p *ProxyRuleBasedSegmentsStorageImpl) RuleBasedSegmentNames() []string {
73+
return p.snapshot.RuleBasedSegmentNames()
74+
}
75+
76+
// Segments retrieves the names of all segments used in rule-based segments
77+
func (p *ProxyRuleBasedSegmentsStorageImpl) Segments() *set.ThreadUnsafeSet {
78+
return p.snapshot.Segments()
79+
}
80+
81+
// SetChangeNumber sets the change number
82+
func (p *ProxyRuleBasedSegmentsStorageImpl) SetChangeNumber(cn int64) error {
83+
return p.snapshot.SetChangeNumber(cn)
84+
}
85+
86+
// Update
87+
func (p *ProxyRuleBasedSegmentsStorageImpl) Update(toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO, cn int64) {
88+
// TODO Add the other logic
89+
// p.setStartingPoint(changeNumber) // will be executed only the first time this method is called
90+
91+
// if len(toAdd) == 0 && len(toRemove) == 0 {
92+
// return
93+
// }
94+
95+
// p.mtx.Lock()
96+
// p.snapshot.Update(toAdd, toRemove, changeNumber)
97+
// p.historic.Update(toAdd, toRemove, changeNumber)
98+
// p.db.Update(toAdd, toRemove, changeNumber)
99+
// p.mtx.Unlock()
100+
101+
p.snapshot.Update(toAdd, toRemove, cn)
102+
}
103+
104+
var _ ProxyRuleBasedSegmentsStorage = (*ProxyRuleBasedSegmentsStorageImpl)(nil)
105+
var _ storage.RuleBasedSegmentsStorage = (*ProxyRuleBasedSegmentsStorageImpl)(nil)

0 commit comments

Comments
 (0)