Skip to content

Commit c37485e

Browse files
committed
add SetPersist
1 parent d11e039 commit c37485e

File tree

3 files changed

+24
-44
lines changed

3 files changed

+24
-44
lines changed

internal/datasystem/store.go

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type Store struct {
4545
// Points to the active store. Swapped upon initialization.
4646
active subsystems.DataStore
4747

48-
quality DataQuality
48+
persist bool
4949

5050
// Protects the availability, persistentStore, quality, and active fields.
5151
mu sync.RWMutex
@@ -79,13 +79,19 @@ func NewStore(loggers ldlog.Loggers) *Store {
7979
s := &Store{
8080
persistentStore: nil,
8181
memoryStore: datastore.NewInMemoryDataStore(loggers),
82-
quality: QualityNone,
82+
persist: false,
8383
loggers: loggers,
8484
}
8585
s.active = s.memoryStore
8686
return s
8787
}
8888

89+
func (s *Store) SetPersist(persist bool) {
90+
s.mu.Lock()
91+
defer s.mu.Unlock()
92+
s.persist = persist
93+
}
94+
8995
// Close closes the store. If there is a persistent store configured, it will be closed.
9096
func (s *Store) Close() error {
9197
s.mu.Lock()
@@ -105,12 +111,11 @@ func (s *Store) getActive() subsystems.DataStore {
105111
}
106112

107113
// Mirroring returns true data is being mirrored to a persistent store.
108-
func (s *Store) mirroring() bool {
109-
return s.persistentStore != nil && s.persistentStore.mode == subsystems.DataStoreModeReadWrite &&
110-
s.quality == QualityTrusted
114+
func (s *Store) shouldPersist() bool {
115+
return s.persist && s.persistentStore != nil && s.persistentStore.mode == subsystems.DataStoreModeReadWrite
111116
}
112117

113-
// nolint:revive // Standard DataSourceUpdateSink method
118+
// nolint:revive // Standard DataDestination method
114119
func (s *Store) Init(allData []ldstoretypes.Collection, payloadVersion *int) bool {
115120
s.mu.Lock()
116121
defer s.mu.Unlock()
@@ -119,16 +124,15 @@ func (s *Store) Init(allData []ldstoretypes.Collection, payloadVersion *int) boo
119124
// TODO: handle errors from initializing the memory or persistent stores.
120125
if err := s.memoryStore.Init(allData); err == nil {
121126
s.active = s.memoryStore
122-
s.quality = QualityTrusted
123127
}
124128

125-
if s.mirroring() {
129+
if s.shouldPersist() {
126130
_ = s.persistentStore.impl.Init(allData) // TODO: insert in topo-sort order
127131
}
128132
return true
129133
}
130134

131-
// nolint:revive // Standard DataSourceUpdateSink method
135+
// nolint:revive // Standard DataDestination method
132136
func (s *Store) Upsert(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor) bool {
133137
s.mu.RLock()
134138
defer s.mu.RUnlock()
@@ -141,13 +145,14 @@ func (s *Store) Upsert(kind ldstoretypes.DataKind, key string, item ldstoretypes
141145
// TXNS-PS: Requirement 1.3.3, must apply updates to in-memory before the persistent store.
142146
_, memErr = s.memoryStore.Upsert(kind, key, item)
143147

144-
if s.mirroring() {
148+
if s.shouldPersist() {
145149
_, persErr = s.persistentStore.impl.Upsert(kind, key, item)
146150
}
147151
return memErr == nil && persErr == nil
148152
}
149153

150-
// nolint:revive // Standard DataSourceUpdateSink method
154+
// GetDataStoreStatusProvider returns the status provider for the persistent store, if one is configured, otherwise
155+
// nil.
151156
func (s *Store) GetDataStoreStatusProvider() interfaces.DataStoreStatusProvider {
152157
s.mu.RLock()
153158
defer s.mu.RUnlock()
@@ -171,15 +176,14 @@ func (s *Store) WithPersistence(persistent subsystems.DataStore, mode subsystems
171176
}
172177

173178
s.active = s.persistentStore.impl
174-
s.quality = QualityUntrusted
175179
return s
176180
}
177181

178182
func (s *Store) Commit() error {
179183
s.mu.RLock()
180184
defer s.mu.RUnlock()
181185

182-
if s.mirroring() {
186+
if s.shouldPersist() {
183187
flags, err := s.memoryStore.GetAll(datakinds.Features)
184188
if err != nil {
185189
return err
@@ -207,17 +211,3 @@ func (s *Store) Get(kind ldstoretypes.DataKind, key string) (ldstoretypes.ItemDe
207211
func (s *Store) IsInitialized() bool {
208212
return s.getActive().IsInitialized()
209213
}
210-
211-
type DataQuality int
212-
213-
const (
214-
QualityNone = DataQuality(0)
215-
QualityUntrusted = DataQuality(1)
216-
QualityTrusted = DataQuality(2)
217-
)
218-
219-
func (s *Store) DataQuality() DataQuality {
220-
s.mu.RLock()
221-
defer s.mu.RUnlock()
222-
return s.quality
223-
}

internal/datasystem/store_test.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,42 +22,29 @@ func TestStore_New(t *testing.T) {
2222
assert.NoError(t, store.Close())
2323
}
2424

25-
func TestStore_NoPersistence_NewStore_DataQuality(t *testing.T) {
26-
logCapture := ldlogtest.NewMockLog()
27-
store := NewStore(logCapture.Loggers)
28-
defer store.Close()
29-
assert.Equal(t, store.DataQuality(), QualityNone)
30-
}
31-
3225
func TestStore_NoPersistence_NewStore_IsInitialized(t *testing.T) {
3326
logCapture := ldlogtest.NewMockLog()
3427
store := NewStore(logCapture.Loggers)
3528
defer store.Close()
3629
assert.False(t, store.IsInitialized())
3730
}
3831

39-
func TestStore_NoPersistence_MemoryStoreInitialized_DataQualityIsTrusted(t *testing.T) {
40-
// It doesn't matter if the data has a payload version or not: the data quality should be
41-
// trusted if it came from an initializer or synchronizer.
42-
// This isn't necessarily what we want going forward, the quality should vary depending on the
43-
// initializer/synchronizer implementation.
32+
func TestStore_NoPersistence_MemoryStore_IsInitialized(t *testing.T) {
4433

4534
version1 := 1
4635
tests := []struct {
4736
name string
4837
payloadVersion *int
49-
quality DataQuality
5038
}{
51-
{"fresh data", &version1, QualityTrusted},
52-
{"stale data", nil, QualityTrusted},
39+
{"versioned data", &version1},
40+
{"unversioned data", nil},
5341
}
5442
for _, tt := range tests {
5543
t.Run(tt.name, func(t *testing.T) {
5644
logCapture := ldlogtest.NewMockLog()
5745
store := NewStore(logCapture.Loggers)
5846
defer store.Close()
5947
store.Init([]ldstoretypes.Collection{}, tt.payloadVersion)
60-
assert.Equal(t, store.DataQuality(), tt.quality)
6148
assert.True(t, store.IsInitialized())
6249
})
6350
}
@@ -95,6 +82,8 @@ func TestStore_Commit(t *testing.T) {
9582

9683
spy.isDown = false
9784

85+
store.SetPersist(true)
86+
9887
require.NoError(t, store.Commit())
9988

10089
assert.Equal(t, initPayload, spy.initPayload)
@@ -212,7 +201,7 @@ func TestStore_Concurrency(t *testing.T) {
212201
wg.Add(1)
213202
defer wg.Done()
214203
for i := 0; i < 100; i++ {
215-
_ = store.DataQuality()
204+
store.SetPersist(true)
216205
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
217206
}
218207
}()

subsystems/data_destination.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
// Do not use it.
1212
// You have been warned.
1313
type DataDestination interface {
14+
1415
// Init overwrites the current contents of the data store with a set of items for each collection.
1516
//
1617
// If the underlying data store returns an error during this operation, the SDK will log it,

0 commit comments

Comments
 (0)