Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add fdv2 store & update sources to use fdv2 protocol definitions #192

Merged
merged 40 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5f6af74
chore: add fdv2 store
cwaldren-ld Sep 17, 2024
941fee4
remove old comment
cwaldren-ld Sep 18, 2024
d11e039
introduce DataQuality enum
cwaldren-ld Sep 18, 2024
4d03305
add SetPersist
cwaldren-ld Sep 19, 2024
24f380e
use persist concept
cwaldren-ld Sep 19, 2024
097429e
make store implement FDv2 protocol
cwaldren-ld Sep 20, 2024
c72dd6d
remove old data destination interface
cwaldren-ld Sep 20, 2024
c530451
revert chnages to datasource/streaming_data_source_events.go
cwaldren-ld Sep 21, 2024
01aa57b
use fdv2 types in streaming data source event handler
cwaldren-ld Sep 21, 2024
aa67ce4
refactor concurrency tests
cwaldren-ld Sep 21, 2024
2a54265
optimize comments on store.go
cwaldren-ld Sep 21, 2024
76da39e
add dump method to memory store for atomically getting segments and f…
cwaldren-ld Sep 21, 2024
f4772f6
inMemoryStore tests for SetBasis/ApplyDelta
cwaldren-ld Sep 23, 2024
521c1d3
add new memorystorev2 package
cwaldren-ld Sep 23, 2024
0d2f8e9
update tests
cwaldren-ld Sep 23, 2024
d853f88
more unit tests
cwaldren-ld Sep 23, 2024
3e5a64d
chore: introduce memorystorev2
cwaldren-ld Sep 24, 2024
84b493c
benchmarks
cwaldren-ld Sep 24, 2024
6432563
lints
cwaldren-ld Sep 24, 2024
746c1a8
use defer pattern for locks
cwaldren-ld Sep 24, 2024
b0c8a7c
make element matcher consistent
cwaldren-ld Sep 24, 2024
eaab968
doc tweaks
cwaldren-ld Sep 24, 2024
fd652d1
Merge branch 'cw/sdk-709/memory-store' into cw/sdk-557/fdv2-store
cwaldren-ld Sep 24, 2024
b755ed9
add better collection matcher
cwaldren-ld Sep 24, 2024
c54fff0
Merge branch 'v7' into cw/sdk-557/fdv2-store
cwaldren-ld Sep 24, 2024
b4ec701
make Selector a pointer type
cwaldren-ld Sep 24, 2024
924722a
cleanup persistent store insertion logic
cwaldren-ld Sep 24, 2024
84c5ef0
lints
cwaldren-ld Sep 24, 2024
f75bc04
more lints
cwaldren-ld Sep 25, 2024
fa2b9e6
Merge branch 'v7' into cw/sdk-557/fdv2-store
cwaldren-ld Sep 25, 2024
024c0df
attempt to suppress TODO lint failures
cwaldren-ld Sep 25, 2024
6ac4d74
move store.init() into store.SetBasis
cwaldren-ld Sep 26, 2024
f72ebc0
no need to inspect error from ApplyDelta/SetBasis
cwaldren-ld Sep 26, 2024
ecfef44
remove old TODO
cwaldren-ld Sep 26, 2024
39478ba
refactor: more fdv2 protocol types + usage in data sources (#198)
cwaldren-ld Oct 1, 2024
e2a9ce3
Merge branch 'v7' into cw/sdk-557/fdv2-store
cwaldren-ld Oct 1, 2024
8809b4e
refactor: insert items into fdv2 store in dependency order (#197)
cwaldren-ld Oct 8, 2024
b1ce858
code review: fixing comments
cwaldren-ld Oct 8, 2024
e5bb240
lints
cwaldren-ld Oct 8, 2024
e6ddefe
remove circular dependency in toposort
cwaldren-ld Oct 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions internal/datasource/streaming_data_source_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (
deleteDataRequiredProperties = []string{"path", "version"} //nolint:gochecknoglobals
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a reversion of recent changes that made these public (for use in datasourcev2), but are no longer necessary.

)

// PutData is the logical representation of the data in the "put" event. In the JSON representation,
// putData is the logical representation of the data in the "put" event. In the JSON representation,
// the "data" property is actually a map of maps, but the schema we use internally is a list of
// lists instead.
//
Expand All @@ -37,12 +37,12 @@ var (
// }
// }
// }
type PutData struct {
type putData struct {
Path string // we don't currently do anything with this
Data []ldstoretypes.Collection
}

// PatchData is the logical representation of the data in the "patch" event. In the JSON representation,
// patchData is the logical representation of the data in the "patch" event. In the JSON representation,
// there is a "path" property in the format "/flags/key" or "/segments/key", which we convert into
// Kind and Key when we parse it. The "data" property is the JSON representation of the flag or
// segment, which we deserialize into an ItemDescriptor.
Expand All @@ -56,13 +56,13 @@ type PutData struct {
// "version": 2, ...etc.
// }
// }
type PatchData struct {
type patchData struct {
Kind ldstoretypes.DataKind
Key string
Data ldstoretypes.ItemDescriptor
}

// DeleteData is the logical representation of the data in the "delete" event. In the JSON representation,
// deleteData is the logical representation of the data in the "delete" event. In the JSON representation,
// there is a "path" property in the format "/flags/key" or "/segments/key", which we convert into
// Kind and Key when we parse it.
//
Expand All @@ -72,14 +72,14 @@ type PatchData struct {
// "path": "/flags/flagkey",
// "version": 3
// }
type DeleteData struct {
type deleteData struct {
Kind ldstoretypes.DataKind
Key string
Version int
}

func parsePutData(data []byte) (PutData, error) {
var ret PutData
func parsePutData(data []byte) (putData, error) {
var ret putData
r := jreader.NewReader(data)
for obj := r.Object().WithRequiredProperties(putDataRequiredProperties); obj.Next(); {
switch string(obj.Name()) {
Expand All @@ -92,15 +92,15 @@ func parsePutData(data []byte) (PutData, error) {
return ret, r.Error()
}

func parsePatchData(data []byte) (PatchData, error) {
var ret PatchData
func parsePatchData(data []byte) (patchData, error) {
var ret patchData
r := jreader.NewReader(data)
var kind datakinds.DataKindInternal
var key string
parseItem := func() (PatchData, error) {
parseItem := func() (patchData, error) {
item, err := kind.DeserializeFromJSONReader(&r)
if err != nil {
return PatchData{}, err
return patchData{}, err
}
ret.Data = item
return ret, nil
Expand All @@ -126,7 +126,7 @@ func parsePatchData(data []byte) (PatchData, error) {
}
}
if err := r.Error(); err != nil {
return PatchData{}, err
return patchData{}, err
}
// If we got here, it means we couldn't parse the data model object yet because we saw the
// "data" property first. But we definitely saw both properties (otherwise we would've got
Expand All @@ -138,13 +138,13 @@ func parsePatchData(data []byte) (PatchData, error) {
}
}
if r.Error() != nil {
return PatchData{}, r.Error()
return patchData{}, r.Error()
}
return PatchData{}, errors.New("patch event had no data property")
return patchData{}, errors.New("patch event had no data property")
}

func parseDeleteData(data []byte) (DeleteData, error) {
var ret DeleteData
func parseDeleteData(data []byte) (deleteData, error) {
var ret deleteData
r := jreader.NewReader(data)
for obj := r.Object().WithRequiredProperties(deleteDataRequiredProperties); obj.Next(); {
switch string(obj.Name()) {
Expand All @@ -161,7 +161,7 @@ func parseDeleteData(data []byte) (DeleteData, error) {
}
}
if r.Error() != nil {
return DeleteData{}, r.Error()
return deleteData{}, r.Error()
}
return ret, nil
}
Expand Down
29 changes: 18 additions & 11 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"sync"
"time"

"github.com/launchdarkly/go-server-sdk/v7/internal/datasource"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/internal"
"github.com/launchdarkly/go-server-sdk/v7/internal/datasource"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
)

Expand All @@ -18,10 +18,10 @@ const (
pollingWillRetryMessage = "will retry at next scheduled poll interval"
)

// Requester allows PollingProcessor to delegate fetching data to another component.
cwaldren-ld marked this conversation as resolved.
Show resolved Hide resolved
// PollingRequester allows PollingProcessor to delegate fetching data to another component.
// This is useful for testing the PollingProcessor without needing to set up a test HTTP server.
type Requester interface {
Request() (data []ldstoretypes.Collection, cached bool, err error)
type PollingRequester interface {
Request() (*PollingResponse, error)
BaseURI() string
FilterKey() string
}
Expand All @@ -34,7 +34,7 @@ type Requester interface {
type PollingProcessor struct {
dataDestination subsystems.DataDestination
statusReporter subsystems.DataSourceStatusReporter
requester Requester
requester PollingRequester
pollInterval time.Duration
loggers ldlog.Loggers
setInitializedOnce sync.Once
Expand All @@ -58,7 +58,7 @@ func newPollingProcessor(
context subsystems.ClientContext,
dataDestination subsystems.DataDestination,
statusReporter subsystems.DataSourceStatusReporter,
requester Requester,
requester PollingRequester,
pollInterval time.Duration,
) *PollingProcessor {
pp := &PollingProcessor{
Expand Down Expand Up @@ -142,16 +142,23 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
}

func (pp *PollingProcessor) poll() error {
allData, cached, err := pp.requester.Request()
response, err := pp.requester.Request()

if err != nil {
return err
}

// We initialize the store only if the request wasn't cached
if !cached {
pp.dataDestination.Init(allData, nil)
if response.Cached() {
return nil
}

cwaldren-ld marked this conversation as resolved.
Show resolved Hide resolved
switch response.Intent() {
case fdv2proto.IntentTransferFull:
pp.dataDestination.SetBasis(response.Events(), response.Selector(), true)
case fdv2proto.IntentTransferChanges:
pp.dataDestination.ApplyDelta(response.Events(), response.Selector(), true)
}

return nil
}

Expand Down
122 changes: 101 additions & 21 deletions internal/datasourcev2/polling_http_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"net/http"
"net/url"

es "github.com/launchdarkly/eventsource"
"github.com/launchdarkly/go-jsonstream/v3/jreader"
"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/internal/datasource"
"github.com/launchdarkly/go-server-sdk/v7/internal/endpoints"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
Expand Down Expand Up @@ -68,42 +69,121 @@ func (r *pollingRequester) BaseURI() string {
func (r *pollingRequester) FilterKey() string {
return r.filterKey
}
func (r *pollingRequester) Request() ([]ldstoretypes.Collection, bool, error) {

func (r *pollingRequester) Request() (*PollingResponse, error) {
if r.loggers.IsDebugEnabled() {
r.loggers.Debug("Polling LaunchDarkly for feature flag updates")
}

body, cached, err := r.makeRequest(endpoints.PollingRequestPath)
if err != nil {
return nil, false, err
return nil, err
}
if cached {
return nil, true, nil
return NewCachedPollingResponse(), nil
}

var payload pollingPayload
var payload fdv2proto.PollingPayload
if err = json.Unmarshal(body, &payload); err != nil {
return nil, false, malformedJSONError{err}
return nil, malformedJSONError{err}
}

esEvents := make([]es.Event, 0, len(payload.Events))
for _, event := range payload.Events {
esEvents = append(esEvents, event)
parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) {
dataKind, err := kind.ToFDV1()
if err != nil {
return ldstoretypes.ItemDescriptor{}, err
}
item, err := dataKind.DeserializeFromJSONReader(&r)
return item, err
}

data, err := convertChangesetEventsToPutData(esEvents)
if err != nil {
return nil, false, malformedJSONError{err}
} else if len(data) != 1 {
return nil, false, malformedJSONError{errors.New("missing expected put event")}
}
updates := make([]fdv2proto.Event, 0, len(payload.Events))

putData, ok := data[0].(datasource.PutData)
if !ok {
return nil, false, malformedJSONError{errors.New("payload is not a PutData")}
}
var intentCode fdv2proto.IntentCode

return putData.Data, cached, nil
for _, event := range payload.Events {
switch event.Name {
case fdv2proto.EventServerIntent:
{
var serverIntent fdv2proto.ServerIntent
err := json.Unmarshal(event.Data, &serverIntent)
if err != nil {
return nil, err
} else if len(serverIntent.Payloads) == 0 {
return nil, errors.New("server-intent event has no payloads")
}

intentCode = serverIntent.Payloads[0].Code
if intentCode == fdv2proto.IntentNone {
return NewCachedPollingResponse(), nil
}
}
case fdv2proto.EventPutObject:
{
r := jreader.NewReader(event.Data)

var (
key string
kind fdv2proto.ObjectKind
item ldstoretypes.ItemDescriptor
err error
version int
)

for obj := r.Object().WithRequiredProperties([]string{
versionField, kindField, keyField, objectField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case kindField:
kind = fdv2proto.ObjectKind(r.String())
case keyField:
key = r.String()
case objectField:
item, err = parseItem(r, kind)
if err != nil {
return nil, err
}
}
}
updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item, Version: version})
}
case fdv2proto.EventDeleteObject:
{
r := jreader.NewReader(event.Data)

var (
version int
kind fdv2proto.ObjectKind
key string
)

for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case kindField:
kind = fdv2proto.ObjectKind(r.String())
//nolint:godox
// TODO: An unrecognized kind should be ignored for forwards compat; the question is,
// do we throw out the DeleteObject here, or let the SDK's store handle it?
case keyField:
key = r.String()
}
}
updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version})
}
case fdv2proto.EventPayloadTransferred:
//nolint:godox
// TODO: deserialize the state and create a fdv2proto.Selector.
}
}

if intentCode == "" {
return nil, errors.New("no server-intent event found in polling response")
}

return NewPollingResponse(intentCode, updates, fdv2proto.NoSelector()), nil
}

func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) {
Expand Down
48 changes: 48 additions & 0 deletions internal/datasourcev2/polling_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package datasourcev2

import "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

// PollingResponse represents the result of a polling request.
type PollingResponse struct {
events []fdv2proto.Event
cached bool
intent fdv2proto.IntentCode
cwaldren-ld marked this conversation as resolved.
Show resolved Hide resolved
selector *fdv2proto.Selector
}

// NewCachedPollingResponse indicates that the response has not changed.
func NewCachedPollingResponse() *PollingResponse {
return &PollingResponse{
cached: true,
}
}

// NewPollingResponse indicates that data was received.
func NewPollingResponse(intent fdv2proto.IntentCode, events []fdv2proto.Event,
selector *fdv2proto.Selector) *PollingResponse {
return &PollingResponse{
events: events,
intent: intent,
selector: selector,
}
}

// Events returns the events in the response.
func (p *PollingResponse) Events() []fdv2proto.Event {
return p.events
}

// Cached returns true if the response was cached, meaning data has not changed.
func (p *PollingResponse) Cached() bool {
return p.cached
}

// Intent returns the server intent code of the response.
func (p *PollingResponse) Intent() fdv2proto.IntentCode {
return p.intent
}

// Selector returns the Selector of the response.
func (p *PollingResponse) Selector() *fdv2proto.Selector {
return p.selector
}
Loading