diff --git a/pkg/dyninst/module/symdb.go b/pkg/dyninst/module/symdb.go index 22eb53bc5fac..d6aca8035d2e 100644 --- a/pkg/dyninst/module/symdb.go +++ b/pkg/dyninst/module/symdb.go @@ -113,7 +113,7 @@ func newSymdbManager( opts ...option, ) *symdbManager { cfg := symdbManagerConfig{ - maxBufferFuncs: 10000, + flushThresholdBytes: uploader.DefaultFlushThresholdBytes, } for _, opt := range opts { opt(&cfg) @@ -154,8 +154,8 @@ func newSymdbManager( } type symdbManagerConfig struct { - maxBufferFuncs int - testingKnobs struct { + flushThresholdBytes int + testingKnobs struct { onDeferUpload func() onUploadRejectedByPersistentCache func() onUploadQueued func(queuedUploadInfo) @@ -168,9 +168,9 @@ type symdbManagerConfig struct { type option func(config *symdbManagerConfig) -func withMaxBufferFuncs(maxBufferFuncs int) option { +func withFlushThresholdBytes(flushThresholdBytes int) option { return func(c *symdbManagerConfig) { - c.maxBufferFuncs = maxBufferFuncs + c.flushThresholdBytes = flushThresholdBytes } } @@ -623,36 +623,24 @@ func (m *symdbManager) performUpload( m.uploadURL.String(), procID.service, procID.version, runtimeID, ) - uploadBuffer := make([]uploader.Scope, 0, 100) - bufferFuncs := 0 uploadID := uuid.New() - batchNum := 0 + enc := sender.NewBatchEncoder(uploadID, uploader.WithFlushThreshold(m.cfg.flushThresholdBytes)) var totalPackages, totalFuncs int - // Flush every so often in order to not store too many scopes in memory. + // Flush whenever the compressed payload reaches the threshold, or on the + // final package, to avoid keeping too much in memory at once. maybeFlush := func(final bool) error { if ctx.Err() != nil { return context.Cause(ctx) } - - if len(uploadBuffer) == 0 { + if !final && !enc.ShouldFlush() { return nil } - if final || bufferFuncs >= m.cfg.maxBufferFuncs { - log.Tracef("SymDB: uploading symbols chunk: %d packages, %d functions. Final chunk: %t", len(uploadBuffer), bufferFuncs, final) - batchNum++ - err := sender.UploadBatch(ctx, - uploader.UploadInfo{ - UploadID: uploadID, - BatchNum: batchNum, - Final: final, - }, - uploadBuffer, - ) - if err != nil { + log.Tracef("SymDB: uploading symbols chunk. Final chunk: %t", final) + if err := enc.Flush(ctx, final); err != nil { + if errors.Is(err, uploader.ErrUpload) { return uploadError{cause: err} } - uploadBuffer = uploadBuffer[:0] - bufferFuncs = 0 + return err } return nil } @@ -667,10 +655,11 @@ func (m *symdbManager) performUpload( } scope := uploader.ConvertPackageToScope(pkg.Package, version.AgentVersion) - uploadBuffer = append(uploadBuffer, scope) + if err := enc.AddScope(scope); err != nil { + return fmt.Errorf("failed to encode scope for process %v: %w", procID.pid, err) + } totalPackages++ totalFuncs += pkg.Stats().NumFunctions - bufferFuncs += pkg.Stats().NumFunctions if err := maybeFlush(pkg.Final); err != nil { return err } @@ -680,7 +669,7 @@ func (m *symdbManager) performUpload( "(service: %s, version: %s, executable: %s):"+ " %d packages, %d functions, %d chunks in %v", procID.pid, procID.service, procID.version, executablePath, - totalPackages, totalFuncs, batchNum, time.Since(startTime)) + totalPackages, totalFuncs, enc.BatchCount(), time.Since(startTime)) return nil } diff --git a/pkg/dyninst/module/symdb_test.go b/pkg/dyninst/module/symdb_test.go index 059857685c94..4c02762d0bfe 100644 --- a/pkg/dyninst/module/symdb_test.go +++ b/pkg/dyninst/module/symdb_test.go @@ -165,10 +165,10 @@ func testSymdbManagerCancellation(t *testing.T, useStop bool) { symdbURL, object.NewInMemoryLoader(), "", /* cacheDir - no cache */ - // Use a small buffer (which will force a flush after every package) - // in order to have an opportunity to cancel the uploads in between - // flushes. - withMaxBufferFuncs(1), + // Use a flush threshold of 0 (which will force a flush after every + // package) in order to have an opportunity to cancel the uploads in + // between flushes. + withFlushThresholdBytes(0), ) t.Cleanup(manager.stop) // Create a dummy runtime ID diff --git a/pkg/dyninst/symdb/uploader/symdb.go b/pkg/dyninst/symdb/uploader/symdb.go index 7fecd22b6e02..bd863d48dc77 100644 --- a/pkg/dyninst/symdb/uploader/symdb.go +++ b/pkg/dyninst/symdb/uploader/symdb.go @@ -14,6 +14,7 @@ import ( "compress/gzip" "context" "encoding/json" + "errors" "fmt" "mime/multipart" "net/http" @@ -26,6 +27,15 @@ import ( "github.com/google/uuid" ) +// DefaultFlushThresholdBytes is the default compressed-size threshold at which +// a BatchEncoder will flush an HTTP chunk. +const DefaultFlushThresholdBytes = 2 * 1024 * 1024 + +// ErrUpload is returned (wrapped) by BatchEncoder.Flush when the HTTP upload +// step fails. Callers can use errors.Is(err, ErrUpload) to distinguish +// upload-side failures (typically retryable) from local encoder errors. +var ErrUpload = errors.New("symdb upload failed") + // ScopeType represents the type of scope in the SymDB schema type ScopeType string @@ -119,61 +129,133 @@ func NewSymDBUploader( } } -// UploadInfo contains metadata about a batch of packages to be uploaded to -// SymDB. -type UploadInfo struct { - // UploadID identifies which logical upload this batch is part of: if packages - // are split into multiple batches because of size limits, they will all share - // the same uploadID. - UploadID uuid.UUID - // BatchNum is the number of the batch relative to the other batches in this - // upload. First batch has number 1. - BatchNum int - // Final is set if this is the final (or the only) batch of packages for - // this upload. - Final bool +// BatchEncoder streams Scope objects into a gzip-compressed SymDB JSON +// envelope and uploads chunks to the SymDB intake whenever the compressed +// payload reaches the configured threshold. A single BatchEncoder +// corresponds to one logical upload (one UploadID) and may emit multiple +// batches. +type BatchEncoder struct { + up *SymDBUploader + uploadID uuid.UUID + flushThreshold int + batchNum int + buf bytes.Buffer + gz *gzip.Writer + enc *json.Encoder + scopeCount int + prefixWritten bool } -// UploadBatch uploads a batch the symbols for a batch of packages to SymDB (via -// the trace-agent). -func (s *SymDBUploader) UploadBatch(ctx context.Context, info UploadInfo, packages []Scope) error { - // Wrap the data in an envelope expected by the debugger backend. - var buf bytes.Buffer - buf.WriteString(`{ -"service": "` + s.service + `", -"version": "` + s.version + `", -"language": "go", -"upload_id": "` + info.UploadID.String() + `", -"batch_num": ` + strconv.Itoa(info.BatchNum) + `, -"final": ` + strconv.FormatBool(info.Final) + `, -"scopes": `) +// BatchEncoderOption configures a BatchEncoder. +type BatchEncoderOption func(*BatchEncoder) - jsonBytes, err := json.Marshal(packages) - if err != nil { - return fmt.Errorf("failed to marshal scope: %w", err) +// WithFlushThreshold sets the compressed-size threshold (in bytes) at which +// ShouldFlush will return true. The threshold is soft: in-flight bytes inside +// the gzip writer's internal window may not yet be reflected in the buffer +// length, so the actual flushed payload may overshoot by up to the gzip +// window size (~32 KiB). +func WithFlushThreshold(bytes int) BatchEncoderOption { + return func(b *BatchEncoder) { + b.flushThreshold = bytes } - buf.Write(jsonBytes) - buf.WriteString("}") +} - if err := s.uploadInner(ctx, buf.Bytes()); err != nil { - return fmt.Errorf("failed to send individual SymDB: %w", err) +// NewBatchEncoder creates a BatchEncoder for a single logical upload. +func (s *SymDBUploader) NewBatchEncoder(uploadID uuid.UUID, opts ...BatchEncoderOption) *BatchEncoder { + b := &BatchEncoder{ + up: s, + uploadID: uploadID, + flushThreshold: DefaultFlushThresholdBytes, } + for _, opt := range opts { + opt(b) + } + b.gz = gzip.NewWriter(&b.buf) + b.enc = json.NewEncoder(b.gz) + b.enc.SetEscapeHTML(false) + return b +} +// AddScope writes a single Scope into the current batch's gzip stream. On the +// first call of a new batch, it also writes the JSON envelope prefix. +func (b *BatchEncoder) AddScope(scope Scope) error { + if !b.prefixWritten { + b.batchNum++ + prefix := `{"service":"` + b.up.service + + `","version":"` + b.up.version + + `","language":"go","upload_id":"` + b.uploadID.String() + + `","batch_num":` + strconv.Itoa(b.batchNum) + + `,"scopes":[` + if _, err := b.gz.Write([]byte(prefix)); err != nil { + return fmt.Errorf("failed to write envelope prefix: %w", err) + } + b.prefixWritten = true + } + if b.scopeCount > 0 { + if _, err := b.gz.Write([]byte{','}); err != nil { + return fmt.Errorf("failed to write scope separator: %w", err) + } + } + if err := b.enc.Encode(scope); err != nil { + return fmt.Errorf("failed to encode scope: %w", err) + } + b.scopeCount++ return nil } -func (s *SymDBUploader) uploadInner(ctx context.Context, symdbData []byte) error { +// ShouldFlush reports whether the compressed buffer has reached the flush +// threshold. As a special case, a threshold <= 0 means "flush after every +// scope" — useful for tests that need to observe one HTTP request per scope. +func (b *BatchEncoder) ShouldFlush() bool { + if b.flushThreshold <= 0 { + return b.scopeCount > 0 + } + return b.buf.Len() >= b.flushThreshold +} + +// BatchCount returns the number of batches that have been started (each +// successful Flush increments this). Useful for logging. +func (b *BatchEncoder) BatchCount() int { + return b.batchNum +} + +// Flush finalises the current batch (if any scopes have been added) and +// uploads it. If no scopes have been added since the last flush, this is a +// no-op even when final is true. After Flush returns, the encoder is ready +// to accept scopes for the next batch. +// +// Errors from the HTTP upload step are wrapped with ErrUpload so callers can +// distinguish them with errors.Is. +func (b *BatchEncoder) Flush(ctx context.Context, final bool) error { + if !b.prefixWritten { + return nil + } + suffix := `],"final":` + strconv.FormatBool(final) + `}` + if _, err := b.gz.Write([]byte(suffix)); err != nil { + return fmt.Errorf("failed to write envelope suffix: %w", err) + } + if err := b.gz.Close(); err != nil { + return fmt.Errorf("failed to close gzip writer: %w", err) + } + if err := b.up.uploadInner(ctx, b.buf.Bytes(), b.uploadID, b.batchNum, final); err != nil { + return fmt.Errorf("%w: %w", ErrUpload, err) + } + b.buf.Reset() + b.gz.Reset(&b.buf) + b.enc = json.NewEncoder(b.gz) + b.enc.SetEscapeHTML(false) + b.scopeCount = 0 + b.prefixWritten = false + return nil +} + +func (s *SymDBUploader) uploadInner(ctx context.Context, compressedData []byte, uploadID uuid.UUID, batchNum int, final bool) error { // The upload is a multipart containing metadata expected by the event platform // and the gzipped SymDB data. var buf bytes.Buffer writer := multipart.NewWriter(&buf) - compressedData, err := compressSymDBData(symdbData) - if err != nil { - return fmt.Errorf("failed to compress SymDB data: %w", err) - } - fileHeader := make(textproto.MIMEHeader) fileHeader.Set("Content-Disposition", `form-data; name="file"; filename="file.gz"`) fileHeader.Set("Content-Type", "application/gzip") @@ -187,6 +269,10 @@ func (s *SymDBUploader) uploadInner(ctx context.Context, symdbData []byte) error return fmt.Errorf("failed to write compressed SymDB data: %w", err) } + // Add a part containing the JSON that will eventually become the EvP event + // (as opposed to the part above, which will become an EvP attachment). The + // name="event" is recognized by EvP intake: + // https://github.com/DataDog/logs-backend/blob/038d9b05a3904b60b06c32e03db1aebb757ba41d/domains/intake/libs/intake-backend/edge/src/main/java/com/dd/evp/intake_backend/edge/http/multipart/V2MultiPartMessageDecoder.java#L40 eventHeader := make(textproto.MIMEHeader) eventHeader.Set("Content-Disposition", `form-data; name="event"; filename="event.json"`) eventHeader.Set("Content-Type", "application/json") @@ -196,12 +282,22 @@ func (s *SymDBUploader) uploadInner(ctx context.Context, symdbData []byte) error return fmt.Errorf("failed to create event part: %w", err) } + // Some of the fields in here are duplicated inside the envelope in the + // attachment file; that's on purpose: having them in this message allows + // the debugger backend to have enough information for its bookkeeping + // without downloading the attachment. Having the info in the attachment is + // useful in order to make attachments self-contained. meta := []byte(`{ "ddsource": "dd_debugger", "service": "` + s.service + `", +"version": "` + s.version + `", "runtimeId": "` + s.runtimeID + `", "debugger": { - "type": "symdb" + "type": "symdb", + "upload_id": "` + uploadID.String() + `", + "batch_num": ` + strconv.Itoa(batchNum) + `, + "final": ` + strconv.FormatBool(final) + `, + "attachment_size": ` + strconv.Itoa(len(compressedData)) + ` } }`) if _, err := eventPart.Write(meta); err != nil { @@ -234,21 +330,6 @@ func (s *SymDBUploader) uploadInner(ctx context.Context, symdbData []byte) error return nil } -func compressSymDBData(data []byte) ([]byte, error) { - var buf bytes.Buffer - gzWriter := gzip.NewWriter(&buf) - - if _, err := gzWriter.Write(data); err != nil { - return nil, fmt.Errorf("failed to write to gzip writer: %w", err) - } - - if err := gzWriter.Close(); err != nil { - return nil, fmt.Errorf("failed to close gzip writer: %w", err) - } - - return buf.Bytes(), nil -} - func cleanString(s string) string { return strings.ReplaceAll(s, " ", "") } diff --git a/pkg/dyninst/symdb/uploader/uploader_test.go b/pkg/dyninst/symdb/uploader/uploader_test.go index 312692e45681..03c5bc72b045 100644 --- a/pkg/dyninst/symdb/uploader/uploader_test.go +++ b/pkg/dyninst/symdb/uploader/uploader_test.go @@ -20,6 +20,7 @@ import ( "net/url" "sync" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -172,79 +173,79 @@ func validateSymDBRequest( require.Equal(t, "main", symdbRoot.Scopes[0].Name) } -func TestSymDBUploader(t *testing.T) { - createPackageScopes := func() []Scope { - return []Scope{ - { - ScopeType: ScopeTypePackage, - Name: "main", - StartLine: 0, - EndLine: 0, - LanguageSpecifics: &LanguageSpecifics{ - AgentVersion: "7.72.0-test", - }, - Scopes: []Scope{ - { - ScopeType: ScopeTypeMethod, - Name: "testFunction", - SourceFile: "/test/main.go", - StartLine: 10, - EndLine: 20, - Symbols: []Symbol{ - { - Name: "testVar", - Type: "string", - SymbolType: SymbolTypeLocal, - Line: &[]int{12}[0], - }, - { - Name: "arg1", - Type: "int", - SymbolType: SymbolTypeArg, - Line: &[]int{10}[0], - }, +func createPackageScopes() []Scope { + return []Scope{ + { + ScopeType: ScopeTypePackage, + Name: "main", + StartLine: 0, + EndLine: 0, + LanguageSpecifics: &LanguageSpecifics{ + AgentVersion: "7.72.0-test", + }, + Scopes: []Scope{ + { + ScopeType: ScopeTypeMethod, + Name: "testFunction", + SourceFile: "/test/main.go", + StartLine: 10, + EndLine: 20, + Symbols: []Symbol{ + { + Name: "testVar", + Type: "string", + SymbolType: SymbolTypeLocal, + Line: &[]int{12}[0], + }, + { + Name: "arg1", + Type: "int", + SymbolType: SymbolTypeArg, + Line: &[]int{10}[0], }, }, - { - ScopeType: ScopeTypeStruct, - Name: "main.TestStruct", - StartLine: 0, - EndLine: 0, - Symbols: []Symbol{ - { - Name: "field1", - Type: "string", - SymbolType: SymbolTypeField, - }, - { - Name: "field2", - Type: "int", - SymbolType: SymbolTypeField, - }, + }, + { + ScopeType: ScopeTypeStruct, + Name: "main.TestStruct", + StartLine: 0, + EndLine: 0, + Symbols: []Symbol{ + { + Name: "field1", + Type: "string", + SymbolType: SymbolTypeField, + }, + { + Name: "field2", + Type: "int", + SymbolType: SymbolTypeField, }, - Scopes: []Scope{ - { - ScopeType: ScopeTypeMethod, - Name: "method1", - SourceFile: "/test/main.go", - StartLine: 25, - EndLine: 30, - Symbols: []Symbol{ - { - Name: "receiver", - Type: "*main.TestStruct", - SymbolType: SymbolTypeArg, - Line: &[]int{25}[0], - }, + }, + Scopes: []Scope{ + { + ScopeType: ScopeTypeMethod, + Name: "method1", + SourceFile: "/test/main.go", + StartLine: 25, + EndLine: 30, + Symbols: []Symbol{ + { + Name: "receiver", + Type: "*main.TestStruct", + SymbolType: SymbolTypeArg, + Line: &[]int{25}[0], }, }, }, }, }, }, - } + }, } +} +func TestBatchEncoder(t *testing.T) { for _, injectError := range []bool{false, true} { testName := "success" if injectError { @@ -255,30 +256,28 @@ func TestSymDBUploader(t *testing.T) { ts := newTestServer() defer ts.Close() - uploader := NewSymDBUploader( + up := NewSymDBUploader( ts.serverURL.String(), "service1", "1.0.0", "dummy-runtime-id", ) - // Do a (blocking) upload in a goroutine so that the test goroutine can - // intercept the request. + // Do a (blocking) flush in a goroutine so that the test goroutine + // can intercept the request. var wg sync.WaitGroup wg.Add(1) uploadID := uuid.New() go func() { defer wg.Done() - scopes := createPackageScopes() - err := uploader.UploadBatch(context.Background(), - UploadInfo{ - UploadID: uploadID, - BatchNum: 1, - Final: true, - }, - scopes) + enc := up.NewBatchEncoder(uploadID) + for _, s := range createPackageScopes() { + assert.NoError(t, enc.AddScope(s)) + } + err := enc.Flush(context.Background(), true /* final */) if injectError { assert.Error(t, err) + assert.ErrorIs(t, err, ErrUpload) } else { assert.NoError(t, err) } @@ -296,3 +295,152 @@ func TestSymDBUploader(t *testing.T) { }) } } + +// TestBatchEncoder_MultiBatch drives the encoder with a tiny flush threshold, +// forcing one HTTP request per scope, and verifies that batch_num increments +// across requests and final=true is set only on the last one. +func TestBatchEncoder_MultiBatch(t *testing.T) { + ts := newTestServer() + defer ts.Close() + + up := NewSymDBUploader( + ts.serverURL.String(), + "service1", + "1.0.0", + "dummy-runtime-id", + ) + + scopes := createPackageScopes() + // Duplicate the package a few times to get multiple batches. + scopes = append(scopes, scopes[0], scopes[0]) + uploadID := uuid.New() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + enc := up.NewBatchEncoder(uploadID, WithFlushThreshold(0)) + for i, s := range scopes { + assert.NoError(t, enc.AddScope(s)) + final := i == len(scopes)-1 + // With threshold<=0, ShouldFlush is true after every encoded scope. + if enc.ShouldFlush() || final { + assert.NoError(t, enc.Flush(context.Background(), final)) + } + } + assert.Equal(t, len(scopes), enc.BatchCount()) + }() + + for i := 0; i < len(scopes); i++ { + req := <-ts.requests + root := readSymDBRoot(t, req.r) + assert.Equal(t, i+1, root.BatchNum) + assert.Equal(t, i == len(scopes)-1, root.Final) + assert.Equal(t, uploadID.String(), root.UploadID) + req.w.WriteHeader(http.StatusOK) + close(req.done) + } + wg.Wait() +} + +// TestBatchEncoder_ShouldFlush exercises the threshold check directly without +// an HTTP server, using a no-network upload via a server that always 200s. +func TestBatchEncoder_ShouldFlush(t *testing.T) { + ts := newTestServer() + defer ts.Close() + + up := NewSymDBUploader( + ts.serverURL.String(), + "service1", + "1.0.0", + "dummy-runtime-id", + ) + + const threshold = 64 + enc := up.NewBatchEncoder(uuid.New(), WithFlushThreshold(threshold)) + require.False(t, enc.ShouldFlush(), "should not flush before any scopes") + + // Add scopes until ShouldFlush flips. The gzip writer buffers internally + // (deflate window is 32 KiB), so it can take many small scopes before any + // bytes reach the underlying buffer. Bound the loop generously. + scope := createPackageScopes()[0] + flipped := false + for i := 0; i < 100000; i++ { + require.NoError(t, enc.AddScope(scope)) + if enc.ShouldFlush() { + flipped = true + break + } + } + require.True(t, flipped, "ShouldFlush never flipped") + + // Drain the upload that Flush will send. + doneC := make(chan error, 1) + go func() { + doneC <- enc.Flush(context.Background(), false /* final */) + }() + req := <-ts.requests + req.w.WriteHeader(http.StatusOK) + close(req.done) + require.NoError(t, <-doneC) + + // After flush, the buffer is reset and ShouldFlush reports false again. + require.False(t, enc.ShouldFlush(), "ShouldFlush should reset after Flush") +} + +// TestBatchEncoder_EmptyFinal verifies that Flush(ctx, true) with no scopes +// added is a no-op (no HTTP request). +func TestBatchEncoder_EmptyFinal(t *testing.T) { + ts := newTestServer() + defer ts.Close() + + up := NewSymDBUploader( + ts.serverURL.String(), + "service1", + "1.0.0", + "dummy-runtime-id", + ) + + enc := up.NewBatchEncoder(uuid.New()) + // No AddScope calls. + require.NoError(t, enc.Flush(context.Background(), true)) + require.Equal(t, 0, enc.BatchCount()) + + select { + case req := <-ts.requests: + t.Fatalf("unexpected HTTP request from empty Flush: %v", req.r.URL) + case <-time.After(50 * time.Millisecond): + // expected — no HTTP request issued + } +} + +// readSymDBRoot decompresses and parses the file part of a multipart SymDB +// upload request, returning the unmarshalled SymDBRoot. +func readSymDBRoot(t *testing.T, req *http.Request) SymDBRoot { + t.Helper() + _, params, err := mime.ParseMediaType(req.Header.Get("Content-Type")) + require.NoError(t, err) + reader := multipart.NewReader(req.Body, params["boundary"]) + for { + part, err := reader.NextPart() + if err == io.EOF { + break + } + require.NoError(t, err) + data, err := io.ReadAll(part) + require.NoError(t, err) + if part.FormName() != "file" { + continue + } + gzReader, err := gzip.NewReader(bytes.NewReader(data)) + require.NoError(t, err) + defer gzReader.Close() + raw, err := io.ReadAll(gzReader) + require.NoError(t, err) + var root SymDBRoot + require.NoError(t, json.Unmarshal(raw, &root)) + return root + } + t.Fatal("no file part in multipart request") + return SymDBRoot{} +}