package datasystem

import (
	"sync"

	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
	"github.com/launchdarkly/go-server-sdk/v7/interfaces"
	"github.com/launchdarkly/go-server-sdk/v7/internal/datakinds"
	"github.com/launchdarkly/go-server-sdk/v7/internal/datastore"
	"github.com/launchdarkly/go-server-sdk/v7/subsystems"
	"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
)

// Store is a hybrid persistent/in-memory store that serves queries for data from the evaluation
// algorithm.
//
// At any given moment, one of two stores is active: in-memory or persistent. This doesn't preclude a caller
// from holding on to a reference to the persistent store even after we swap to the in-memory store.
//
// Once the in-memory store has data (either from initializers running, or from a synchronizer), the persistent
// store is no longer regarded as active. From that point forward, getActive() will return the in-memory store.
//
// The idea is that persistent stores can offer a way to immediately start evaluating flags before a connection
// is made to LD (or even in the very brief moment before an initializer has run). The persistent store has caching
// logic which can result in inconsistent/stale data being used. Therefore, once we have fresh data, we don't
// want to use the persistent store at all.
//
// A complication is that persistent stores have historically operated in multiple regimes. The first is "daemon mode",
// where the SDK is effectively using the store in read-only mode, with the store being populated by Relay or another SDK.
// The second is plain persistent store mode, where the store is both read from and written to. In the FDv2 system, we
// explicitly differentiate these cases using a read/read-write mode. In all cases, the in-memory store is used once it
// has data available. This contrasts with FDv1, where even if data from LD is available, that data may fall out of
// memory due to the persistent store's caching logic ("sparse mode", when the TTL is non-infinite).
//
// We have found this to almost always be undesirable for users.
type Store struct {
	// Represents a remote store, like Redis. This is optional; if present, it's only used
	// before the in-memory store is initialized.
	persistentStore subsystems.DataStore

	// The persistentStore is read-only, or read-write. In read-only mode, the store
	// is *never* written to, and only read before the in-memory store is initialized.
	// This is equivalent to the concept of "daemon mode".
	//
	// In read-write mode, data from initializers/synchronizers is written to the store
	// as it is received. This is equivalent to the normal "persistent store" configuration
	// that an SDK can use to collaborate with zero or more other SDKs with a (possibly shared) database.
	persistentStoreMode subsystems.DataStoreMode

	// This exists as a quirk of the DataSourceUpdateSink interface, which Store implements. The DataSourceUpdateSink
	// has a method to return a DataStoreStatusProvider so that a DataSource can monitor the state of the store. This
	// was originally used in FDv1 to know when the store went offline/online, so that data could be committed back
	// to the store when it came back online. In the FDv2 system, this is handled by the FDv2 struct itself, so the
	// data source doesn't need any knowledge of it. We can delete this piece of infrastructure when we no longer
	// need to support FDv1 (or we could refactor the FDv2 data sources to use a different set of interfaces that
	// don't require this.)
	persistentStoreStatusProvider interfaces.DataStoreStatusProvider

	// Represents the store that all flag/segment data queries are served from after data is received from
	// initializers/synchronizers. Before the in-memory store is initialized, queries are served from the
	// persistentStore (if configured).
	memoryStore subsystems.DataStore

	// The store that is currently serving queries: either memoryStore or persistentStore.
	active subsystems.DataStore

	// Whether the memoryStore's data should be considered authoritative, or fresh - that is, whether it is known
	// to be the latest data. Data from a baked-in file, for example, would not be considered refreshed. The purpose
	// of this is to know whether we should commit data to the persistentStore. For example, if we initialize with
	// "stale" data from a local file (availability = Cached), we may not want to pollute a connected Redis database
	// with it.
	// TODO: this could also be called "Authoritative". "It was the latest at some point.. that point being when we
	// asked if it was the latest".
	availability DataAvailability

	// Protects the availability, persistentStore, persistentStoreMode, persistentStoreStatusProvider, and
	// active fields.
	mu sync.RWMutex

	loggers ldlog.Loggers
}

// NewStore creates a new store. By default, queries are served from memory. To add a persistent store,
// call WithPersistence; this must happen at configuration time, only once, and before the store is ever accessed.
func NewStore(loggers ldlog.Loggers) *Store {
	s := &Store{
		persistentStore:     nil,
		persistentStoreMode: subsystems.DataStoreModeRead,
		memoryStore:         datastore.NewInMemoryDataStore(loggers),
		availability:        Defaults,
		loggers:             loggers,
	}
	s.active = s.memoryStore
	return s
}
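
// exampleLifecycle is an illustrative sketch, not part of the original source: it walks through the
// lifecycle described in the Store doc comment. The persistent store, status provider, and data
// collections are assumed to be supplied by the caller; all names here are placeholders.
func exampleLifecycle(loggers ldlog.Loggers, persistent subsystems.DataStore,
	statusProvider interfaces.DataStoreStatusProvider, collections []ldstoretypes.Collection) {
	store := NewStore(loggers)

	// Until data arrives, queries fall through to the persistent store (if one is attached).
	store.WithPersistence(persistent, subsystems.DataStoreModeRead, statusProvider)

	// An initializer or synchronizer delivers a full data set. A non-nil payload version marks the
	// data as Refreshed; nil would mark it Cached.
	version := 1
	store.Init(collections, &version)

	// From this point on, the in-memory store is active and serves all queries.
	_ = store.IsInitialized()
}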

// Close closes the store. If there is a persistent store configured, it will be closed.
func (s *Store) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.persistentStore != nil {
		return s.persistentStore.Close()
	}
	return nil
}

// getActive returns the active store, either persistent or in-memory. If there is no persistent store
// configured, the in-memory store is always active.
func (s *Store) getActive() subsystems.DataStore {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.active
}

// DataAvailability returns the status of the store's data. Defaults means there is no data, Cached means there is
// data, but it's not guaranteed to be recent, and Refreshed means the data has been refreshed from the server.
func (s *Store) DataAvailability() DataAvailability {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.availability
}
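
// availabilityExample is an illustrative sketch, not part of the original source, showing how a caller
// might interpret the three availability states documented above.
func availabilityExample(s *Store) string {
	switch s.DataAvailability() {
	case Refreshed:
		return "data is known-fresh from LaunchDarkly; safe to mirror to a persistent store"
	case Cached:
		return "data is present but possibly stale (e.g. from a persistent store or local file)"
	default: // Defaults
		return "no data yet; evaluations would fall back to application-defined defaults"
	}
}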

// mirroring returns true if data is being mirrored to a persistent store, that is, a persistent store is
// configured in read-write mode. The caller must hold s.mu.
func (s *Store) mirroring() bool {
	return s.persistentStore != nil && s.persistentStoreMode == subsystems.DataStoreModeReadWrite
}

// nolint:revive // Standard DataSourceUpdateSink method
func (s *Store) Init(allData []ldstoretypes.Collection, payloadVersion *int) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	// TXNS-PS: Requirement 1.3.3, must apply updates to in-memory before the persistent store.
	// TODO: handle errors from initializing the memory or persistent stores.
	if err := s.memoryStore.Init(allData); err == nil {
		s.active = s.memoryStore
		if payloadVersion != nil {
			s.availability = Refreshed
		} else {
			s.availability = Cached
		}
	}

	if s.mirroring() {
		_ = s.persistentStore.Init(allData) // TODO: insert in topo-sort order
	}
	return true
}
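
// initExample is an illustrative sketch, not part of the original source, showing how a data source
// might report a full data set. The collections value and the fromLD flag are assumed inputs; a real
// FDv2 payload would carry its own version number.
func initExample(s *Store, collections []ldstoretypes.Collection, fromLD bool) {
	if fromLD {
		version := 1                  // placeholder payload version for illustration
		s.Init(collections, &version) // data is authoritative: availability becomes Refreshed
	} else {
		s.Init(collections, nil) // best-effort data (e.g. a file): availability becomes Cached
	}
}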

// nolint:revive // Standard DataSourceUpdateSink method
func (s *Store) Upsert(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var (
		memErr  error
		persErr error
	)

	// TXNS-PS: Requirement 1.3.3, must apply updates to in-memory before the persistent store.
	_, memErr = s.memoryStore.Upsert(kind, key, item)

	if s.mirroring() {
		_, persErr = s.persistentStore.Upsert(kind, key, item)
	}
	return memErr == nil && persErr == nil
}

// nolint:revive // Standard DataSourceUpdateSink method
func (s *Store) GetDataStoreStatusProvider() interfaces.DataStoreStatusProvider {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.persistentStoreStatusProvider
}

// WithPersistence exists only because of the way the SDK's configuration builders work: we need a ClientContext
// before we can call Build to actually get the persistent store. That ClientContext requires the
// DataStoreUpdateSink, which is what this store struct implements.
func (s *Store) WithPersistence(persistent subsystems.DataStore, mode subsystems.DataStoreMode, statusProvider interfaces.DataStoreStatusProvider) *Store {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.persistentStore = persistent
	s.persistentStoreMode = mode
	s.persistentStoreStatusProvider = statusProvider
	s.active = s.persistentStore

	if s.persistentStore.IsInitialized() {
		s.availability = Cached
	} else {
		s.availability = Defaults
	}
	return s
}
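
// daemonModeExample is an illustrative sketch, not part of the original source: it configures the store
// the way "daemon mode" would, where Relay or another SDK populates the database and this SDK only reads
// from it until fresh data arrives. The redisStore and statusProvider values are assumed placeholders.
func daemonModeExample(s *Store, redisStore subsystems.DataStore, statusProvider interfaces.DataStoreStatusProvider) {
	s.WithPersistence(redisStore, subsystems.DataStoreModeRead, statusProvider)
	// If redisStore already holds data, DataAvailability() now reports Cached; otherwise it reports
	// Defaults until an initializer or synchronizer delivers data to the in-memory store.
}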

// Commit flushes the in-memory store's data to the persistent store, if the data is known-fresh
// (Refreshed) and a persistent store is configured in read-write mode. Otherwise it is a no-op.
func (s *Store) Commit() error {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// Note: we read s.availability directly instead of calling DataAvailability(), which would attempt
	// to re-acquire the read lock.
	if s.availability == Refreshed && s.mirroring() {
		flags, err := s.memoryStore.GetAll(datakinds.Features)
		if err != nil {
			return err
		}
		segments, err := s.memoryStore.GetAll(datakinds.Segments)
		if err != nil {
			return err
		}
		return s.persistentStore.Init([]ldstoretypes.Collection{
			{Kind: datakinds.Features, Items: flags},
			{Kind: datakinds.Segments, Items: segments},
		})
	}
	return nil
}
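
// commitOnRecovery is an illustrative sketch, not part of the original source, of how the surrounding
// FDv2 system might use Commit: when the persistent store comes back online and reports that it needs
// fresh data, flush the known-fresh in-memory contents back out to it.
func commitOnRecovery(s *Store, status interfaces.DataStoreStatus, loggers ldlog.Loggers) {
	if status.Available && status.NeedsRefresh {
		if err := s.Commit(); err != nil {
			loggers.Errorf("failed to commit in-memory data to persistent store: %s", err)
		}
	}
}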

// GetAll returns all items of the specified kind from the active store.
func (s *Store) GetAll(kind ldstoretypes.DataKind) ([]ldstoretypes.KeyedItemDescriptor, error) {
	return s.getActive().GetAll(kind)
}

// Get returns a single item of the specified kind from the active store.
func (s *Store) Get(kind ldstoretypes.DataKind, key string) (ldstoretypes.ItemDescriptor, error) {
	return s.getActive().Get(kind, key)
}

// IsInitialized returns true if the active store has been initialized with data.
func (s *Store) IsInitialized() bool {
	return s.getActive().IsInitialized()
}