From c3d089a284f76469a2486032fb42f3c4f650c986 Mon Sep 17 00:00:00 2001 From: df-wg Date: Tue, 18 Feb 2025 19:20:15 +0200 Subject: [PATCH] fix(router): send graphql closing boundary to fit Apollo client (#1579) Co-authored-by: Ludwig Co-authored-by: Alessandro Pagnin --- router-tests/events/nats_events_test.go | 138 ++++++++++++++++++ router/core/errors.go | 2 +- router/core/flushwriter.go | 51 +++++-- router/core/graph_server.go | 4 + router/core/graphql_handler.go | 16 +- router/core/router.go | 1 + router/pkg/config/config.go | 19 ++- router/pkg/config/config.schema.json | 11 ++ .../pkg/config/testdata/config_defaults.json | 3 + router/pkg/config/testdata/config_full.json | 3 + 10 files changed, 220 insertions(+), 28 deletions(-) diff --git a/router-tests/events/nats_events_test.go b/router-tests/events/nats_events_test.go index aa7d8a9bc9..9f3f672243 100644 --- a/router-tests/events/nats_events_test.go +++ b/router-tests/events/nats_events_test.go @@ -501,6 +501,144 @@ func TestNatsEvents(t *testing.T) { } }) }) + t.Run("subscribe after message don't a boundary", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + RouterOptions: []core.Option{ + core.WithApolloCompatibilityFlagsConfig(config.ApolloCompatibilityFlags{ + SubscriptionMultipartPrintBoundary: config.ApolloCompatibilitySubscriptionMultipartPrintBoundary{ + Enabled: false, + }, + }), + }, + EnableNats: true, + }, func(t *testing.T, xEnv *testenv.Environment) { + + subscribePayload := []byte(`{"query":"subscription { countFor(count: 0) }"}`) + + var done atomic.Bool + + go func() { + defer done.Store(true) + + client := http.Client{} + req := xEnv.MakeGraphQLMultipartRequest(http.MethodPost, bytes.NewReader(subscribePayload)) + resp, err := client.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + defer resp.Body.Close() + + reader := bufio.NewReader(resp.Body) + + // Read the first part + + expected := "\r\n--graphql\nContent-Type: application/json\r\n\r\n{\"payload\":{\"data\":{\"countFor\":0}}}\n" + read := make([]byte, len(expected)) + _, err = reader.Read(read) + assert.NoError(t, err) + assert.Equal(t, expected, string(read)) + }() + + xEnv.WaitForSubscriptionCount(1, NatsWaitTimeout) + require.Eventually(t, done.Load, NatsWaitTimeout, time.Millisecond*100) + }) + }) + }) + + t.Run("multipart with apollo compatibility", func(t *testing.T) { + t.Parallel() + + t.Run("subscribe after message add a boundary", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + RouterOptions: []core.Option{ + core.WithApolloCompatibilityFlagsConfig(config.ApolloCompatibilityFlags{ + SubscriptionMultipartPrintBoundary: config.ApolloCompatibilitySubscriptionMultipartPrintBoundary{ + Enabled: true, + }, + }), + }, + EnableNats: true, + }, func(t *testing.T, xEnv *testenv.Environment) { + + subscribePayload := []byte(`{"query":"subscription { countFor(count: 0) }"}`) + + var done atomic.Bool + + go func() { + defer done.Store(true) + + client := http.Client{} + req := xEnv.MakeGraphQLMultipartRequest(http.MethodPost, bytes.NewReader(subscribePayload)) + resp, err := client.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + defer resp.Body.Close() + + reader := bufio.NewReader(resp.Body) + + // Read the first part + + expected := "\r\n--graphql\nContent-Type: application/json\r\n\r\n{\"payload\":{\"data\":{\"countFor\":0}}}\n\r\n--graphql" + read := make([]byte, len(expected)) + _, err = reader.Read(read) + assert.NoError(t, err) + assert.Equal(t, expected, string(read)) + }() + + xEnv.WaitForSubscriptionCount(1, NatsWaitTimeout) + require.Eventually(t, done.Load, NatsWaitTimeout, time.Millisecond*100) + }) + }) + + t.Run("subscribe with closing channel", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + RouterOptions: []core.Option{ + core.WithApolloCompatibilityFlagsConfig(config.ApolloCompatibilityFlags{ + SubscriptionMultipartPrintBoundary: config.ApolloCompatibilitySubscriptionMultipartPrintBoundary{ + Enabled: true, + }, + }), + }, + EnableNats: true, + }, func(t *testing.T, xEnv *testenv.Environment) { + + subscribePayload := []byte(`{"query":"subscription { countFor(count: 3) }"}`) + + var done atomic.Bool + + go func() { + defer done.Store(true) + + client := http.Client{} + req := xEnv.MakeGraphQLMultipartRequest(http.MethodPost, bytes.NewReader(subscribePayload)) + resp, err := client.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + defer resp.Body.Close() + + reader := bufio.NewReader(resp.Body) + + // Read the first part + assertMultipartValueEventually(t, reader, "{\"payload\":{\"data\":{\"countFor\":0}}}") + assertMultipartValueEventually(t, reader, "{\"payload\":{\"data\":{\"countFor\":1}}}") + assertMultipartValueEventually(t, reader, "{\"payload\":{\"data\":{\"countFor\":2}}}") + assertMultipartValueEventually(t, reader, "{\"payload\":{\"data\":{\"countFor\":3}}}") + assertLineEquals(t, reader, "") + assertLineEquals(t, reader, "--graphql--") + }() + + xEnv.WaitForSubscriptionCount(1, NatsWaitTimeout) + require.Eventually(t, done.Load, NatsWaitTimeout, time.Millisecond*100) + }) + }) }) t.Run("subscribe once", func(t *testing.T) { diff --git a/router/core/errors.go b/router/core/errors.go index 966aebaaa5..d933a2abc5 100644 --- a/router/core/errors.go +++ b/router/core/errors.go @@ -242,7 +242,7 @@ func writeRequestErrors(r *http.Request, w http.ResponseWriter, statusCode int, // writeMultipartError writes the error response in a multipart format with proper boundaries and headers. func writeMultipartError(w http.ResponseWriter, requestErrors graphqlerrors.RequestErrors, requestLogger *zap.Logger) error { // Start with the multipart boundary - prefix := GetWriterPrefix(false, true) + prefix := GetWriterPrefix(false, true, true) if _, err := w.Write([]byte(prefix)); err != nil { return err } diff --git a/router/core/flushwriter.go b/router/core/flushwriter.go index 432bdb2d08..1134bea79a 100644 --- a/router/core/flushwriter.go +++ b/router/core/flushwriter.go @@ -3,13 +3,14 @@ package core import ( "bytes" "context" - "github.com/wundergraph/astjson" - "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" "io" "mime" "net/http" "strconv" "strings" + + "github.com/wundergraph/astjson" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" ) const ( @@ -22,6 +23,7 @@ const ( sseMimeType = "text/event-stream" heartbeat = "{}" multipartContent = multipartMime + "; boundary=" + multipartBoundary + multipartStart = "\r\n--" + multipartBoundary ) type HttpFlushWriter struct { @@ -33,6 +35,10 @@ type HttpFlushWriter struct { sse bool multipart bool buf *bytes.Buffer + firstMessage bool + // apolloSubscriptionMultipartPrintBoundary if set to true will send the multipart boundary at the end of the message to allow + // misbehaving client (like apollo client) to read the message just sent before the next one or the heartbeat + apolloSubscriptionMultipartPrintBoundary bool } func (f *HttpFlushWriter) Complete() { @@ -43,7 +49,11 @@ func (f *HttpFlushWriter) Complete() { _, _ = f.writer.Write([]byte("event: complete")) } else if f.multipart { // Write the final boundary in the multipart response - _, _ = f.writer.Write([]byte("--" + multipartBoundary + "--\n")) + if f.apolloSubscriptionMultipartPrintBoundary { + _, _ = f.writer.Write([]byte("--\n")) + } else { + _, _ = f.writer.Write([]byte("--" + multipartBoundary + "--\n")) + } } f.Close() } @@ -72,7 +82,10 @@ func (f *HttpFlushWriter) Flush() (err error) { resp := f.buf.Bytes() f.buf.Reset() - flushBreak := GetWriterPrefix(f.sse, f.multipart) + flushBreak := GetWriterPrefix(f.sse, f.multipart, !f.apolloSubscriptionMultipartPrintBoundary || f.firstMessage) + if f.firstMessage { + f.firstMessage = false + } if f.multipart && len(resp) > 0 { var err error resp, err = wrapMultipartMessage(resp) @@ -83,7 +96,11 @@ func (f *HttpFlushWriter) Flush() (err error) { separation := "\n\n" if f.multipart { - separation = "\n" + if !f.apolloSubscriptionMultipartPrintBoundary { + separation = "\n" + } else { + separation = "\n" + multipartStart + } } else if f.subscribeOnce { separation = "" } @@ -100,7 +117,7 @@ func (f *HttpFlushWriter) Flush() (err error) { return nil } -func GetSubscriptionResponseWriter(ctx *resolve.Context, r *http.Request, w http.ResponseWriter) (*resolve.Context, resolve.SubscriptionResponseWriter, bool) { +func GetSubscriptionResponseWriter(ctx *resolve.Context, r *http.Request, w http.ResponseWriter, apolloSubscriptionMultipartPrintBoundary bool) (*resolve.Context, resolve.SubscriptionResponseWriter, bool) { type withFlushWriter interface { SubscriptionResponseWriter() resolve.SubscriptionResponseWriter } @@ -119,12 +136,14 @@ func GetSubscriptionResponseWriter(ctx *resolve.Context, r *http.Request, w http flusher.Flush() flushWriter := &HttpFlushWriter{ - writer: w, - flusher: flusher, - sse: wgParams.UseSse, - multipart: wgParams.UseMultipart, - subscribeOnce: wgParams.SubscribeOnce, - buf: &bytes.Buffer{}, + writer: w, + flusher: flusher, + sse: wgParams.UseSse, + multipart: wgParams.UseMultipart, + subscribeOnce: wgParams.SubscribeOnce, + buf: &bytes.Buffer{}, + firstMessage: true, + apolloSubscriptionMultipartPrintBoundary: apolloSubscriptionMultipartPrintBoundary, } flushWriter.ctx, flushWriter.cancel = context.WithCancel(ctx.Context()) @@ -231,12 +250,16 @@ type SubscriptionParams struct { UseMultipart bool } -func GetWriterPrefix(sse bool, multipart bool) string { +func GetWriterPrefix(sse bool, multipart bool, firstMessage bool) string { flushBreak := "" if sse { flushBreak = "event: next\ndata: " } else if multipart { - flushBreak = "\r\n--" + multipartBoundary + "\nContent-Type: " + jsonContent + "\r\n\r\n" + messageStart := "" + if firstMessage { + messageStart = multipartStart + } + flushBreak = messageStart + "\nContent-Type: " + jsonContent + "\r\n\r\n" } return flushBreak diff --git a/router/core/graph_server.go b/router/core/graph_server.go index e4a726a749..17fe507cda 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -1015,6 +1015,10 @@ func (s *graphServer) buildGraphMux(ctx context.Context, } } + if s.apolloCompatibilityFlags.SubscriptionMultipartPrintBoundary.Enabled { + handlerOpts.ApolloSubscriptionMultipartPrintBoundary = s.apolloCompatibilityFlags.SubscriptionMultipartPrintBoundary.Enabled + } + graphqlHandler := NewGraphQLHandler(handlerOpts) executor.Resolver.SetAsyncErrorWriter(graphqlHandler) diff --git a/router/core/graphql_handler.go b/router/core/graphql_handler.go index 8197c02f5d..266c955ca1 100644 --- a/router/core/graphql_handler.go +++ b/router/core/graphql_handler.go @@ -77,6 +77,7 @@ type HandlerOptions struct { RateLimitConfig *config.RateLimitConfiguration SubgraphErrorPropagation config.SubgraphErrorPropagationConfiguration EngineLoaderHooks resolve.LoaderHooks + ApolloSubscriptionMultipartPrintBoundary bool } func NewGraphQLHandler(opts HandlerOptions) *GraphQLHandler { @@ -92,11 +93,12 @@ func NewGraphQLHandler(opts HandlerOptions) *GraphQLHandler { "wundergraph/cosmo/router/graphql_handler", trace.WithInstrumentationVersion("0.0.1"), ), - authorizer: opts.Authorizer, - rateLimiter: opts.RateLimiter, - rateLimitConfig: opts.RateLimitConfig, - subgraphErrorPropagation: opts.SubgraphErrorPropagation, - engineLoaderHooks: opts.EngineLoaderHooks, + authorizer: opts.Authorizer, + rateLimiter: opts.RateLimiter, + rateLimitConfig: opts.RateLimitConfig, + subgraphErrorPropagation: opts.SubgraphErrorPropagation, + engineLoaderHooks: opts.EngineLoaderHooks, + apolloSubscriptionMultipartPrintBoundary: opts.ApolloSubscriptionMultipartPrintBoundary, } return graphQLHandler } @@ -127,6 +129,8 @@ type GraphQLHandler struct { enablePersistedOperationCacheResponseHeader bool enableNormalizationCacheResponseHeader bool enableResponseHeaderPropagation bool + + apolloSubscriptionMultipartPrintBoundary bool } func (h *GraphQLHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -191,7 +195,7 @@ func (h *GraphQLHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.setDebugCacheHeaders(w, requestContext.operation) defer propagateSubgraphErrors(ctx) - ctx, writer, ok = GetSubscriptionResponseWriter(ctx, r, w) + ctx, writer, ok = GetSubscriptionResponseWriter(ctx, r, w, h.apolloSubscriptionMultipartPrintBoundary) if !ok { requestContext.logger.Error("unable to get subscription response writer", zap.Error(errCouldNotFlushResponse)) trackFinalResponseError(r.Context(), errCouldNotFlushResponse) diff --git a/router/core/router.go b/router/core/router.go index 0dfafca68b..221b9752f8 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -1824,6 +1824,7 @@ func WithApolloCompatibilityFlagsConfig(cfg config.ApolloCompatibilityFlags) Opt cfg.ReplaceUndefinedOpFieldErrors.Enabled = true cfg.ReplaceInvalidVarErrors.Enabled = true cfg.ReplaceValidationErrorStatus.Enabled = true + cfg.SubscriptionMultipartPrintBoundary.Enabled = true } r.apolloCompatibilityFlags = cfg } diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index a35f08ae16..de7c728384 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -756,13 +756,14 @@ type AccessLogsSubgraphsConfig struct { } type ApolloCompatibilityFlags struct { - EnableAll bool `yaml:"enable_all" envDefault:"false" env:"APOLLO_COMPATIBILITY_ENABLE_ALL"` - ValueCompletion ApolloCompatibilityValueCompletion `yaml:"value_completion"` - TruncateFloats ApolloCompatibilityTruncateFloats `yaml:"truncate_floats"` - SuppressFetchErrors ApolloCompatibilitySuppressFetchErrors `yaml:"suppress_fetch_errors"` - ReplaceUndefinedOpFieldErrors ApolloCompatibilityReplaceUndefinedOpFieldErrors `yaml:"replace_undefined_op_field_errors"` - ReplaceInvalidVarErrors ApolloCompatibilityReplaceInvalidVarErrors `yaml:"replace_invalid_var_errors"` - ReplaceValidationErrorStatus ApolloCompatibilityReplaceValidationErrorStatus `yaml:"replace_validation_error_status"` + EnableAll bool `yaml:"enable_all" envDefault:"false" env:"APOLLO_COMPATIBILITY_ENABLE_ALL"` + ValueCompletion ApolloCompatibilityValueCompletion `yaml:"value_completion"` + TruncateFloats ApolloCompatibilityTruncateFloats `yaml:"truncate_floats"` + SuppressFetchErrors ApolloCompatibilitySuppressFetchErrors `yaml:"suppress_fetch_errors"` + ReplaceUndefinedOpFieldErrors ApolloCompatibilityReplaceUndefinedOpFieldErrors `yaml:"replace_undefined_op_field_errors"` + ReplaceInvalidVarErrors ApolloCompatibilityReplaceInvalidVarErrors `yaml:"replace_invalid_var_errors"` + ReplaceValidationErrorStatus ApolloCompatibilityReplaceValidationErrorStatus `yaml:"replace_validation_error_status"` + SubscriptionMultipartPrintBoundary ApolloCompatibilitySubscriptionMultipartPrintBoundary `yaml:"subscription_multipart_print_boundary"` } type ApolloCompatibilityValueCompletion struct { @@ -794,6 +795,10 @@ type ApolloCompatibilityReplaceValidationErrorStatus struct { Enabled bool `yaml:"enabled" envDefault:"false" env:"APOLLO_COMPATIBILITY_REPLACE_VALIDATION_ERROR_STATUS_ENABLED"` } +type ApolloCompatibilitySubscriptionMultipartPrintBoundary struct { + Enabled bool `yaml:"enabled" envDefault:"false" env:"APOLLO_COMPATIBILITY_SUBSCRIPTION_MULTIPART_PRINT_BOUNDARY_ENABLED"` +} + type ApolloRouterCompatibilityFlags struct { ReplaceInvalidVarErrors ApolloRouterCompatibilityReplaceInvalidVarErrors `yaml:"replace_invalid_var_errors"` SubrequestHTTPError ApolloRouterCompatibilitySubrequestHTTPError `yaml:"subrequest_http_error"` diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 0e943989b2..1e2bbf668f 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -2393,6 +2393,17 @@ "default": false } } + }, + "subscription_multipart_print_boundary": { + "type": "object", + "description": "Prints the multipart boundary right after the message in multipart subscriptions. Without this flag, the Apollo client wouldn’t parse a message until the next one is pushed.", + "additionalProperties": false, + "properties": { + "enabled": { + "type": "boolean", + "default": false + } + } } } }, diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index 0252f7cffb..187b110953 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -411,6 +411,9 @@ }, "ReplaceValidationErrorStatus": { "Enabled": false + }, + "SubscriptionMultipartPrintBoundary": { + "Enabled": false } }, "ApolloRouterCompatibilityFlags": { diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 47109b49f8..29b2fe2130 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -698,6 +698,9 @@ }, "ReplaceValidationErrorStatus": { "Enabled": false + }, + "SubscriptionMultipartPrintBoundary": { + "Enabled": false } }, "ApolloRouterCompatibilityFlags": {