From 73310b4273aed94cb328315ea5971bd0ba3c8da8 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 5 May 2026 13:22:59 -0400 Subject: [PATCH 1/2] dyninst/symdb: stream JSON into gzip, chunk by compressed size The SymDB upload pipeline previously held each batch in memory three times over: as a []Scope slice, as a marshalled JSON []byte, and as a gzipped []byte. Batches were flushed by buffered function count (default 10000), which let the in-memory []Scope grow large before any compression. Replace UploadBatch([]Scope) with a streaming BatchEncoder that owns the gzip writer and a json.Encoder wrapping it. Scopes are encoded straight into the gzip stream as they arrive, the caller no longer accumulates a slice, and flushes are triggered when the compressed buffer reaches a threshold (DefaultFlushThresholdBytes = 2 MiB). The envelope is written inside the gzip stream as {service,version,language,upload_id,batch_num, scopes:[...],final}, with final written at flush time. Threshold is soft: gzip's internal window means the flushed payload may overshoot by up to ~32 KiB. A threshold <= 0 forces per-scope flushing, preserving the cancel-between-flushes test behaviour previously achieved with maxBufferFuncs=1. ErrUpload is exposed as a sentinel so callers can distinguish HTTP-side failures (retryable) from local encoder errors via errors.Is. 
https://datadoghq.atlassian.net/browse/DEBUG-5553 --- pkg/dyninst/module/symdb.go | 45 ++- pkg/dyninst/module/symdb_test.go | 8 +- pkg/dyninst/symdb/uploader/symdb.go | 177 ++++++++---- pkg/dyninst/symdb/uploader/uploader_test.go | 294 +++++++++++++++----- 4 files changed, 364 insertions(+), 160 deletions(-) diff --git a/pkg/dyninst/module/symdb.go b/pkg/dyninst/module/symdb.go index 22eb53bc5fac..d6aca8035d2e 100644 --- a/pkg/dyninst/module/symdb.go +++ b/pkg/dyninst/module/symdb.go @@ -113,7 +113,7 @@ func newSymdbManager( opts ...option, ) *symdbManager { cfg := symdbManagerConfig{ - maxBufferFuncs: 10000, + flushThresholdBytes: uploader.DefaultFlushThresholdBytes, } for _, opt := range opts { opt(&cfg) @@ -154,8 +154,8 @@ func newSymdbManager( } type symdbManagerConfig struct { - maxBufferFuncs int - testingKnobs struct { + flushThresholdBytes int + testingKnobs struct { onDeferUpload func() onUploadRejectedByPersistentCache func() onUploadQueued func(queuedUploadInfo) @@ -168,9 +168,9 @@ type symdbManagerConfig struct { type option func(config *symdbManagerConfig) -func withMaxBufferFuncs(maxBufferFuncs int) option { +func withFlushThresholdBytes(flushThresholdBytes int) option { return func(c *symdbManagerConfig) { - c.maxBufferFuncs = maxBufferFuncs + c.flushThresholdBytes = flushThresholdBytes } } @@ -623,36 +623,24 @@ func (m *symdbManager) performUpload( m.uploadURL.String(), procID.service, procID.version, runtimeID, ) - uploadBuffer := make([]uploader.Scope, 0, 100) - bufferFuncs := 0 uploadID := uuid.New() - batchNum := 0 + enc := sender.NewBatchEncoder(uploadID, uploader.WithFlushThreshold(m.cfg.flushThresholdBytes)) var totalPackages, totalFuncs int - // Flush every so often in order to not store too many scopes in memory. + // Flush whenever the compressed payload reaches the threshold, or on the + // final package, to avoid keeping too much in memory at once. 
maybeFlush := func(final bool) error { if ctx.Err() != nil { return context.Cause(ctx) } - - if len(uploadBuffer) == 0 { + if !final && !enc.ShouldFlush() { return nil } - if final || bufferFuncs >= m.cfg.maxBufferFuncs { - log.Tracef("SymDB: uploading symbols chunk: %d packages, %d functions. Final chunk: %t", len(uploadBuffer), bufferFuncs, final) - batchNum++ - err := sender.UploadBatch(ctx, - uploader.UploadInfo{ - UploadID: uploadID, - BatchNum: batchNum, - Final: final, - }, - uploadBuffer, - ) - if err != nil { + log.Tracef("SymDB: uploading symbols chunk. Final chunk: %t", final) + if err := enc.Flush(ctx, final); err != nil { + if errors.Is(err, uploader.ErrUpload) { return uploadError{cause: err} } - uploadBuffer = uploadBuffer[:0] - bufferFuncs = 0 + return err } return nil } @@ -667,10 +655,11 @@ func (m *symdbManager) performUpload( } scope := uploader.ConvertPackageToScope(pkg.Package, version.AgentVersion) - uploadBuffer = append(uploadBuffer, scope) + if err := enc.AddScope(scope); err != nil { + return fmt.Errorf("failed to encode scope for process %v: %w", procID.pid, err) + } totalPackages++ totalFuncs += pkg.Stats().NumFunctions - bufferFuncs += pkg.Stats().NumFunctions if err := maybeFlush(pkg.Final); err != nil { return err } @@ -680,7 +669,7 @@ func (m *symdbManager) performUpload( "(service: %s, version: %s, executable: %s):"+ " %d packages, %d functions, %d chunks in %v", procID.pid, procID.service, procID.version, executablePath, - totalPackages, totalFuncs, batchNum, time.Since(startTime)) + totalPackages, totalFuncs, enc.BatchCount(), time.Since(startTime)) return nil } diff --git a/pkg/dyninst/module/symdb_test.go b/pkg/dyninst/module/symdb_test.go index 059857685c94..4c02762d0bfe 100644 --- a/pkg/dyninst/module/symdb_test.go +++ b/pkg/dyninst/module/symdb_test.go @@ -165,10 +165,10 @@ func testSymdbManagerCancellation(t *testing.T, useStop bool) { symdbURL, object.NewInMemoryLoader(), "", /* cacheDir - no cache */ - // Use a small 
buffer (which will force a flush after every package) - // in order to have an opportunity to cancel the uploads in between - // flushes. - withMaxBufferFuncs(1), + // Use a flush threshold of 0 (which will force a flush after every + // package) in order to have an opportunity to cancel the uploads in + // between flushes. + withFlushThresholdBytes(0), ) t.Cleanup(manager.stop) // Create a dummy runtime ID diff --git a/pkg/dyninst/symdb/uploader/symdb.go b/pkg/dyninst/symdb/uploader/symdb.go index 7fecd22b6e02..1f896a968c9e 100644 --- a/pkg/dyninst/symdb/uploader/symdb.go +++ b/pkg/dyninst/symdb/uploader/symdb.go @@ -14,6 +14,7 @@ import ( "compress/gzip" "context" "encoding/json" + "errors" "fmt" "mime/multipart" "net/http" @@ -26,6 +27,15 @@ import ( "github.com/google/uuid" ) +// DefaultFlushThresholdBytes is the default compressed-size threshold at which +// a BatchEncoder will flush an HTTP chunk. +const DefaultFlushThresholdBytes = 2 * 1024 * 1024 + +// ErrUpload is returned (wrapped) by BatchEncoder.Flush when the HTTP upload +// step fails. Callers can use errors.Is(err, ErrUpload) to distinguish +// upload-side failures (typically retryable) from local encoder errors. +var ErrUpload = errors.New("symdb upload failed") + // ScopeType represents the type of scope in the SymDB schema type ScopeType string @@ -119,61 +129,133 @@ func NewSymDBUploader( } } -// UploadInfo contains metadata about a batch of packages to be uploaded to -// SymDB. -type UploadInfo struct { - // UploadID identifies which logical upload this batch is part of: if packages - // are split into multiple batches because of size limits, they will all share - // the same uploadID. - UploadID uuid.UUID - // BatchNum is the number of the batch relative to the other batches in this - // upload. First batch has number 1. - BatchNum int - // Final is set if this is the final (or the only) batch of packages for - // this upload. 
- Final bool +// BatchEncoder streams Scope objects into a gzip-compressed SymDB JSON +// envelope and uploads chunks to the SymDB intake whenever the compressed +// payload reaches the configured threshold. A single BatchEncoder +// corresponds to one logical upload (one UploadID) and may emit multiple +// batches. +type BatchEncoder struct { + up *SymDBUploader + uploadID uuid.UUID + flushThreshold int + batchNum int + buf bytes.Buffer + gz *gzip.Writer + enc *json.Encoder + scopeCount int + prefixWritten bool } -// UploadBatch uploads a batch the symbols for a batch of packages to SymDB (via -// the trace-agent). -func (s *SymDBUploader) UploadBatch(ctx context.Context, info UploadInfo, packages []Scope) error { - // Wrap the data in an envelope expected by the debugger backend. - var buf bytes.Buffer - buf.WriteString(`{ -"service": "` + s.service + `", -"version": "` + s.version + `", -"language": "go", -"upload_id": "` + info.UploadID.String() + `", -"batch_num": ` + strconv.Itoa(info.BatchNum) + `, -"final": ` + strconv.FormatBool(info.Final) + `, -"scopes": `) - - jsonBytes, err := json.Marshal(packages) - if err != nil { - return fmt.Errorf("failed to marshal scope: %w", err) +// BatchEncoderOption configures a BatchEncoder. +type BatchEncoderOption func(*BatchEncoder) + +// WithFlushThreshold sets the compressed-size threshold (in bytes) at which +// ShouldFlush will return true. The threshold is soft: in-flight bytes inside +// the gzip writer's internal window may not yet be reflected in the buffer +// length, so the actual flushed payload may overshoot by up to the gzip +// window size (~32 KiB). +func WithFlushThreshold(bytes int) BatchEncoderOption { + return func(b *BatchEncoder) { + b.flushThreshold = bytes + } +} + +// NewBatchEncoder creates a BatchEncoder for a single logical upload. 
+func (s *SymDBUploader) NewBatchEncoder(uploadID uuid.UUID, opts ...BatchEncoderOption) *BatchEncoder { + b := &BatchEncoder{ + up: s, + uploadID: uploadID, + flushThreshold: DefaultFlushThresholdBytes, + } + for _, opt := range opts { + opt(b) + } + b.gz = gzip.NewWriter(&b.buf) + b.enc = json.NewEncoder(b.gz) + b.enc.SetEscapeHTML(false) + return b +} + +// AddScope writes a single Scope into the current batch's gzip stream. On the +// first call of a new batch, it also writes the JSON envelope prefix. +func (b *BatchEncoder) AddScope(scope Scope) error { + if !b.prefixWritten { + b.batchNum++ + prefix := `{"service":"` + b.up.service + + `","version":"` + b.up.version + + `","language":"go","upload_id":"` + b.uploadID.String() + + `","batch_num":` + strconv.Itoa(b.batchNum) + + `,"scopes":[` + if _, err := b.gz.Write([]byte(prefix)); err != nil { + return fmt.Errorf("failed to write envelope prefix: %w", err) + } + b.prefixWritten = true + } + if b.scopeCount > 0 { + if _, err := b.gz.Write([]byte{','}); err != nil { + return fmt.Errorf("failed to write scope separator: %w", err) + } + } + if err := b.enc.Encode(scope); err != nil { + return fmt.Errorf("failed to encode scope: %w", err) } - buf.Write(jsonBytes) - buf.WriteString("}") + b.scopeCount++ + return nil +} - if err := s.uploadInner(ctx, buf.Bytes()); err != nil { - return fmt.Errorf("failed to send individual SymDB: %w", err) +// ShouldFlush reports whether the compressed buffer has reached the flush +// threshold. As a special case, a threshold <= 0 means "flush after every +// scope" — useful for tests that need to observe one HTTP request per scope. +func (b *BatchEncoder) ShouldFlush() bool { + if b.flushThreshold <= 0 { + return b.scopeCount > 0 } + return b.buf.Len() >= b.flushThreshold +} + +// BatchCount returns the number of batches that have been started (each +// successful Flush increments this). Useful for logging. 
+func (b *BatchEncoder) BatchCount() int { + return b.batchNum +} +// Flush finalises the current batch (if any scopes have been added) and +// uploads it. If no scopes have been added since the last flush, this is a +// no-op even when final is true. After Flush returns, the encoder is ready +// to accept scopes for the next batch. +// +// Errors from the HTTP upload step are wrapped with ErrUpload so callers can +// distinguish them with errors.Is. +func (b *BatchEncoder) Flush(ctx context.Context, final bool) error { + if !b.prefixWritten { + return nil + } + suffix := `],"final":` + strconv.FormatBool(final) + `}` + if _, err := b.gz.Write([]byte(suffix)); err != nil { + return fmt.Errorf("failed to write envelope suffix: %w", err) + } + if err := b.gz.Close(); err != nil { + return fmt.Errorf("failed to close gzip writer: %w", err) + } + if err := b.up.uploadInner(ctx, b.buf.Bytes()); err != nil { + return fmt.Errorf("%w: %w", ErrUpload, err) + } + b.buf.Reset() + b.gz.Reset(&b.buf) + b.enc = json.NewEncoder(b.gz) + b.enc.SetEscapeHTML(false) + b.scopeCount = 0 + b.prefixWritten = false return nil } -func (s *SymDBUploader) uploadInner(ctx context.Context, symdbData []byte) error { +func (s *SymDBUploader) uploadInner(ctx context.Context, compressedData []byte) error { // The upload is a multipart containing metadata expected by the event platform // and the gzipped SymDB data. 
var buf bytes.Buffer writer := multipart.NewWriter(&buf) - compressedData, err := compressSymDBData(symdbData) - if err != nil { - return fmt.Errorf("failed to compress SymDB data: %w", err) - } - fileHeader := make(textproto.MIMEHeader) fileHeader.Set("Content-Disposition", `form-data; name="file"; filename="file.gz"`) fileHeader.Set("Content-Type", "application/gzip") @@ -234,21 +316,6 @@ func (s *SymDBUploader) uploadInner(ctx context.Context, symdbData []byte) error return nil } -func compressSymDBData(data []byte) ([]byte, error) { - var buf bytes.Buffer - gzWriter := gzip.NewWriter(&buf) - - if _, err := gzWriter.Write(data); err != nil { - return nil, fmt.Errorf("failed to write to gzip writer: %w", err) - } - - if err := gzWriter.Close(); err != nil { - return nil, fmt.Errorf("failed to close gzip writer: %w", err) - } - - return buf.Bytes(), nil -} - func cleanString(s string) string { return strings.ReplaceAll(s, " ", "") } diff --git a/pkg/dyninst/symdb/uploader/uploader_test.go b/pkg/dyninst/symdb/uploader/uploader_test.go index 312692e45681..03c5bc72b045 100644 --- a/pkg/dyninst/symdb/uploader/uploader_test.go +++ b/pkg/dyninst/symdb/uploader/uploader_test.go @@ -20,6 +20,7 @@ import ( "net/url" "sync" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -172,79 +173,79 @@ func validateSymDBRequest( require.Equal(t, "main", symdbRoot.Scopes[0].Name) } -func TestSymDBUploader(t *testing.T) { - createPackageScopes := func() []Scope { - return []Scope{ - { - ScopeType: ScopeTypePackage, - Name: "main", - StartLine: 0, - EndLine: 0, - LanguageSpecifics: &LanguageSpecifics{ - AgentVersion: "7.72.0-test", - }, - Scopes: []Scope{ - { - ScopeType: ScopeTypeMethod, - Name: "testFunction", - SourceFile: "/test/main.go", - StartLine: 10, - EndLine: 20, - Symbols: []Symbol{ - { - Name: "testVar", - Type: "string", - SymbolType: SymbolTypeLocal, - Line: &[]int{12}[0], - }, - { - Name: "arg1", - Type: "int", - SymbolType: 
SymbolTypeArg, - Line: &[]int{10}[0], - }, +func createPackageScopes() []Scope { + return []Scope{ + { + ScopeType: ScopeTypePackage, + Name: "main", + StartLine: 0, + EndLine: 0, + LanguageSpecifics: &LanguageSpecifics{ + AgentVersion: "7.72.0-test", + }, + Scopes: []Scope{ + { + ScopeType: ScopeTypeMethod, + Name: "testFunction", + SourceFile: "/test/main.go", + StartLine: 10, + EndLine: 20, + Symbols: []Symbol{ + { + Name: "testVar", + Type: "string", + SymbolType: SymbolTypeLocal, + Line: &[]int{12}[0], + }, + { + Name: "arg1", + Type: "int", + SymbolType: SymbolTypeArg, + Line: &[]int{10}[0], }, }, - { - ScopeType: ScopeTypeStruct, - Name: "main.TestStruct", - StartLine: 0, - EndLine: 0, - Symbols: []Symbol{ - { - Name: "field1", - Type: "string", - SymbolType: SymbolTypeField, - }, - { - Name: "field2", - Type: "int", - SymbolType: SymbolTypeField, - }, + }, + { + ScopeType: ScopeTypeStruct, + Name: "main.TestStruct", + StartLine: 0, + EndLine: 0, + Symbols: []Symbol{ + { + Name: "field1", + Type: "string", + SymbolType: SymbolTypeField, + }, + { + Name: "field2", + Type: "int", + SymbolType: SymbolTypeField, }, - Scopes: []Scope{ - { - ScopeType: ScopeTypeMethod, - Name: "method1", - SourceFile: "/test/main.go", - StartLine: 25, - EndLine: 30, - Symbols: []Symbol{ - { - Name: "receiver", - Type: "*main.TestStruct", - SymbolType: SymbolTypeArg, - Line: &[]int{25}[0], - }, + }, + Scopes: []Scope{ + { + ScopeType: ScopeTypeMethod, + Name: "method1", + SourceFile: "/test/main.go", + StartLine: 25, + EndLine: 30, + Symbols: []Symbol{ + { + Name: "receiver", + Type: "*main.TestStruct", + SymbolType: SymbolTypeArg, + Line: &[]int{25}[0], }, }, }, }, }, }, - } + }, } +} +func TestBatchEncoder(t *testing.T) { for _, injectError := range []bool{false, true} { testName := "success" if injectError { @@ -255,30 +256,28 @@ func TestSymDBUploader(t *testing.T) { ts := newTestServer() defer ts.Close() - uploader := NewSymDBUploader( + up := NewSymDBUploader( 
ts.serverURL.String(), "service1", "1.0.0", "dummy-runtime-id", ) - // Do a (blocking) upload in a goroutine so that the test goroutine can - // intercept the request. + // Do a (blocking) flush in a goroutine so that the test goroutine + // can intercept the request. var wg sync.WaitGroup wg.Add(1) uploadID := uuid.New() go func() { defer wg.Done() - scopes := createPackageScopes() - err := uploader.UploadBatch(context.Background(), - UploadInfo{ - UploadID: uploadID, - BatchNum: 1, - Final: true, - }, - scopes) + enc := up.NewBatchEncoder(uploadID) + for _, s := range createPackageScopes() { + assert.NoError(t, enc.AddScope(s)) + } + err := enc.Flush(context.Background(), true /* final */) if injectError { assert.Error(t, err) + assert.ErrorIs(t, err, ErrUpload) } else { assert.NoError(t, err) } @@ -296,3 +295,152 @@ func TestSymDBUploader(t *testing.T) { }) } } + +// TestBatchEncoder_MultiBatch drives the encoder with a tiny flush threshold, +// forcing one HTTP request per scope, and verifies that batch_num increments +// across requests and final=true is set only on the last one. +func TestBatchEncoder_MultiBatch(t *testing.T) { + ts := newTestServer() + defer ts.Close() + + up := NewSymDBUploader( + ts.serverURL.String(), + "service1", + "1.0.0", + "dummy-runtime-id", + ) + + scopes := createPackageScopes() + // Duplicate the package a few times to get multiple batches. + scopes = append(scopes, scopes[0], scopes[0]) + uploadID := uuid.New() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + enc := up.NewBatchEncoder(uploadID, WithFlushThreshold(0)) + for i, s := range scopes { + assert.NoError(t, enc.AddScope(s)) + final := i == len(scopes)-1 + // With threshold<=0, ShouldFlush is true after every encoded scope. 
+ if enc.ShouldFlush() || final { + assert.NoError(t, enc.Flush(context.Background(), final)) + } + } + assert.Equal(t, len(scopes), enc.BatchCount()) + }() + + for i := 0; i < len(scopes); i++ { + req := <-ts.requests + root := readSymDBRoot(t, req.r) + assert.Equal(t, i+1, root.BatchNum) + assert.Equal(t, i == len(scopes)-1, root.Final) + assert.Equal(t, uploadID.String(), root.UploadID) + req.w.WriteHeader(http.StatusOK) + close(req.done) + } + wg.Wait() +} + +// TestBatchEncoder_ShouldFlush exercises the threshold check directly without +// an HTTP server, using a no-network upload via a server that always 200s. +func TestBatchEncoder_ShouldFlush(t *testing.T) { + ts := newTestServer() + defer ts.Close() + + up := NewSymDBUploader( + ts.serverURL.String(), + "service1", + "1.0.0", + "dummy-runtime-id", + ) + + const threshold = 64 + enc := up.NewBatchEncoder(uuid.New(), WithFlushThreshold(threshold)) + require.False(t, enc.ShouldFlush(), "should not flush before any scopes") + + // Add scopes until ShouldFlush flips. The gzip writer buffers internally + // (deflate window is 32 KiB), so it can take many small scopes before any + // bytes reach the underlying buffer. Bound the loop generously. + scope := createPackageScopes()[0] + flipped := false + for i := 0; i < 100000; i++ { + require.NoError(t, enc.AddScope(scope)) + if enc.ShouldFlush() { + flipped = true + break + } + } + require.True(t, flipped, "ShouldFlush never flipped") + + // Drain the upload that Flush will send. + doneC := make(chan error, 1) + go func() { + doneC <- enc.Flush(context.Background(), false /* final */) + }() + req := <-ts.requests + req.w.WriteHeader(http.StatusOK) + close(req.done) + require.NoError(t, <-doneC) + + // After flush, the buffer is reset and ShouldFlush reports false again. 
+ require.False(t, enc.ShouldFlush(), "ShouldFlush should reset after Flush") +} + +// TestBatchEncoder_EmptyFinal verifies that Flush(ctx, true) with no scopes +// added is a no-op (no HTTP request). +func TestBatchEncoder_EmptyFinal(t *testing.T) { + ts := newTestServer() + defer ts.Close() + + up := NewSymDBUploader( + ts.serverURL.String(), + "service1", + "1.0.0", + "dummy-runtime-id", + ) + + enc := up.NewBatchEncoder(uuid.New()) + // No AddScope calls. + require.NoError(t, enc.Flush(context.Background(), true)) + require.Equal(t, 0, enc.BatchCount()) + + select { + case req := <-ts.requests: + t.Fatalf("unexpected HTTP request from empty Flush: %v", req.r.URL) + case <-time.After(50 * time.Millisecond): + // expected — no HTTP request issued + } +} + +// readSymDBRoot decompresses and parses the file part of a multipart SymDB +// upload request, returning the unmarshalled SymDBRoot. +func readSymDBRoot(t *testing.T, req *http.Request) SymDBRoot { + t.Helper() + _, params, err := mime.ParseMediaType(req.Header.Get("Content-Type")) + require.NoError(t, err) + reader := multipart.NewReader(req.Body, params["boundary"]) + for { + part, err := reader.NextPart() + if err == io.EOF { + break + } + require.NoError(t, err) + data, err := io.ReadAll(part) + require.NoError(t, err) + if part.FormName() != "file" { + continue + } + gzReader, err := gzip.NewReader(bytes.NewReader(data)) + require.NoError(t, err) + defer gzReader.Close() + raw, err := io.ReadAll(gzReader) + require.NoError(t, err) + var root SymDBRoot + require.NoError(t, json.Unmarshal(raw, &root)) + return root + } + t.Fatal("no file part in multipart request") + return SymDBRoot{} +} From efbc8af69867f3a39501a14eb8c1b6970ff3be76 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 5 May 2026 16:11:05 -0400 Subject: [PATCH 2/2] dyninst/symdb: address review feedback on streaming uploader API - Drop the WithFlushThreshold functional option. 
The threshold isn't truly optional (every caller needs to set it, and the 0 sentinel for flush-after-every-scope was a wart), and the encoder doesn't auto-flush anyway. Replace ShouldFlush with a Size() accessor and let the caller decide when to flush. - Hide SymDBUploader inside the BatchEncoder: callers now construct an encoder directly with NewBatchEncoder(url, service, version, runtimeID, uploadID, headers...) rather than going through SymDBUploader as an intermediary they then never use again. - Comment why we SetEscapeHTML(false): without it, '<', '>' and '&' inside string values get emitted as their six-character \u00XX escape. That is still valid JSON and decodes back to the same characters, but it needlessly inflates and obscures Go type names like 'chan<-' in the SymDB payload, which is never embedded in HTML. - Fix the BatchEncoder doc comment: the encoder doesn't ship batches on its own; the caller must call Flush. - Migrate the symdb CLI to the new API so the package keeps building. As a side effect, this fixes a pre-existing bug where the CLI always sent BatchNum: 0 (the local batchNum was declared but never incremented); BatchEncoder increments it correctly per batch. 
--- pkg/dyninst/module/symdb.go | 7 +- pkg/dyninst/symdb/cli/main.go | 54 ++++----- pkg/dyninst/symdb/uploader/symdb.go | 115 ++++++++------------ pkg/dyninst/symdb/uploader/uploader_test.go | 78 ++++++------- 4 files changed, 105 insertions(+), 149 deletions(-) diff --git a/pkg/dyninst/module/symdb.go b/pkg/dyninst/module/symdb.go index d6aca8035d2e..b24e28f05a3e 100644 --- a/pkg/dyninst/module/symdb.go +++ b/pkg/dyninst/module/symdb.go @@ -619,12 +619,11 @@ func (m *symdbManager) performUpload( procID.pid, executablePath, err) } - sender := uploader.NewSymDBUploader( + enc := uploader.NewBatchEncoder( m.uploadURL.String(), procID.service, procID.version, runtimeID, + uuid.New(), ) - uploadID := uuid.New() - enc := sender.NewBatchEncoder(uploadID, uploader.WithFlushThreshold(m.cfg.flushThresholdBytes)) var totalPackages, totalFuncs int // Flush whenever the compressed payload reaches the threshold, or on the // final package, to avoid keeping too much in memory at once. @@ -632,7 +631,7 @@ func (m *symdbManager) performUpload( if ctx.Err() != nil { return context.Cause(ctx) } - if !final && !enc.ShouldFlush() { + if !final && enc.Size() < m.cfg.flushThresholdBytes { return nil } log.Tracef("SymDB: uploading symbols chunk. Final chunk: %t", final) diff --git a/pkg/dyninst/symdb/cli/main.go b/pkg/dyninst/symdb/cli/main.go index 3d7ad7b41d74..9a2494304aed 100644 --- a/pkg/dyninst/symdb/cli/main.go +++ b/pkg/dyninst/symdb/cli/main.go @@ -206,20 +206,21 @@ func run() (retErr error) { opt := symdb.ExtractOptions{Scope: scope} - var up *uploader.SymDBUploader - // Headers to attach to every HTTP request. When the system-probe does the - // uploading, it sends the data through the local trace-agent, which deals - // with setting these headers. 
- headers := [][2]string{ - {"DD-EVP-ORIGIN", "symdb-cli"}, - {"DD-EVP-ORIGIN-VERSION", "0.1"}, - {"DD-API-KEY", *uploadAPIKey}, - } + var enc *uploader.BatchEncoder if *upload { - up = uploader.NewSymDBUploader( + // Headers to attach to every HTTP request. When the system-probe does + // the uploading, it sends the data through the local trace-agent, + // which deals with setting these headers. + headers := [][2]string{ + {"DD-EVP-ORIGIN", "symdb-cli"}, + {"DD-EVP-ORIGIN-VERSION", "0.1"}, + {"DD-API-KEY", *uploadAPIKey}, + } + enc = uploader.NewBatchEncoder( uploadURLParsed.String(), *uploadService, *uploadVersion, fmt.Sprintf("manual-upload-%d", rand.Intn(1000)), + uuid.New(), headers..., ) } @@ -229,31 +230,13 @@ func run() (retErr error) { return err } - uploadBuffer := make([]uploader.Scope, 0, 100) - bufferFuncs := 0 - // Flush every so ofter in order to not store too many scopes in memory. - const maxBufferFuncs = 10000 - uploadID := uuid.New() - batchNum := 0 maybeFlush := func(final bool) error { - if len(uploadBuffer) == 0 { + if !final && enc.Size() < uploader.DefaultFlushThresholdBytes { return nil } - if final || bufferFuncs >= maxBufferFuncs { - log.Tracef("SymDB: uploading symbols chunk: %d packages, %d functions", len(uploadBuffer), bufferFuncs) - err := up.UploadBatch( - context.Background(), - uploader.UploadInfo{ - UploadID: uploadID, - BatchNum: batchNum, - Final: final, - }, - uploadBuffer) - if err != nil { - return fmt.Errorf("upload failed: %w", err) - } - uploadBuffer = uploadBuffer[:0] - bufferFuncs = 0 + log.Tracef("SymDB: uploading symbols chunk. 
Final chunk: %t", final) + if err := enc.Flush(context.Background(), final); err != nil { + return fmt.Errorf("upload failed: %w", err) } return nil } @@ -264,10 +247,11 @@ func run() (retErr error) { return err } - if up != nil { + if enc != nil { scope := uploader.ConvertPackageToScope(pkg.Package, "cli" /* agentVersion */) - uploadBuffer = append(uploadBuffer, scope) - bufferFuncs += pkg.Stats().NumFunctions + if err := enc.AddScope(scope); err != nil { + return fmt.Errorf("failed to encode scope: %w", err) + } if err := maybeFlush(pkg.Final); err != nil { return err } diff --git a/pkg/dyninst/symdb/uploader/symdb.go b/pkg/dyninst/symdb/uploader/symdb.go index 1f896a968c9e..f2ab33237fe7 100644 --- a/pkg/dyninst/symdb/uploader/symdb.go +++ b/pkg/dyninst/symdb/uploader/symdb.go @@ -102,9 +102,9 @@ type LineRange struct { End int `json:"end"` } -// SymDBUploader deals with uploading SymDB data in the JSON format expected by -// the debugger backend. -type SymDBUploader struct { +// uploader holds the destination and identity metadata used by a +// BatchEncoder when shipping a batch to the SymDB intake. +type uploader struct { url string service string version string @@ -112,66 +112,51 @@ type SymDBUploader struct { headers [][2]string } -// NewSymDBUploader returns a new SymDBUploader. -func NewSymDBUploader( - urlStr string, - service string, - version string, - runtimeID string, - headers ...[2]string, -) *SymDBUploader { - return &SymDBUploader{ - url: urlStr, - service: service, - version: version, - runtimeID: runtimeID, - headers: headers, - } -} - // BatchEncoder streams Scope objects into a gzip-compressed SymDB JSON -// envelope and uploads chunks to the SymDB intake whenever the compressed -// payload reaches the configured threshold. A single BatchEncoder -// corresponds to one logical upload (one UploadID) and may emit multiple -// batches. +// envelope and ships finalised batches to the SymDB intake when the caller +// invokes Flush. 
A single BatchEncoder corresponds to one logical upload +// (one UploadID) and may emit multiple batches. +// +// The encoder does not flush on its own. Callers should consult Size after +// each AddScope and call Flush when the compressed buffer crosses whatever +// threshold they choose (DefaultFlushThresholdBytes is provided as a +// reasonable default). type BatchEncoder struct { - up *SymDBUploader - uploadID uuid.UUID - flushThreshold int - batchNum int - buf bytes.Buffer - gz *gzip.Writer - enc *json.Encoder - scopeCount int - prefixWritten bool -} - -// BatchEncoderOption configures a BatchEncoder. -type BatchEncoderOption func(*BatchEncoder) - -// WithFlushThreshold sets the compressed-size threshold (in bytes) at which -// ShouldFlush will return true. The threshold is soft: in-flight bytes inside -// the gzip writer's internal window may not yet be reflected in the buffer -// length, so the actual flushed payload may overshoot by up to the gzip -// window size (~32 KiB). -func WithFlushThreshold(bytes int) BatchEncoderOption { - return func(b *BatchEncoder) { - b.flushThreshold = bytes - } + up uploader + uploadID uuid.UUID + batchNum int + buf bytes.Buffer + gz *gzip.Writer + enc *json.Encoder + scopeCount int + prefixWritten bool } -// NewBatchEncoder creates a BatchEncoder for a single logical upload. -func (s *SymDBUploader) NewBatchEncoder(uploadID uuid.UUID, opts ...BatchEncoderOption) *BatchEncoder { +// NewBatchEncoder creates a BatchEncoder for a single logical upload to the +// SymDB intake at url. 
+func NewBatchEncoder( + url string, + service string, + version string, + runtimeID string, + uploadID uuid.UUID, + headers ...[2]string, +) *BatchEncoder { b := &BatchEncoder{ - up: s, - uploadID: uploadID, - flushThreshold: DefaultFlushThresholdBytes, - } - for _, opt := range opts { - opt(b) + up: uploader{ + url: url, + service: service, + version: version, + runtimeID: runtimeID, + headers: headers, + }, + uploadID: uploadID, } b.gz = gzip.NewWriter(&b.buf) b.enc = json.NewEncoder(b.gz) + // json.Encoder defaults to HTML-escaping <, > and & inside string values + // (each replaced by its six-character \u00XX escape). The SymDB payload + // is not embedded in HTML, so disable that. b.enc.SetEscapeHTML(false) return b } @@ -203,18 +188,15 @@ func (b *BatchEncoder) AddScope(scope Scope) error { return nil } -// ShouldFlush reports whether the compressed buffer has reached the flush -// threshold. As a special case, a threshold <= 0 means "flush after every -// scope" — useful for tests that need to observe one HTTP request per scope. -func (b *BatchEncoder) ShouldFlush() bool { - if b.flushThreshold <= 0 { - return b.scopeCount > 0 - } - return b.buf.Len() >= b.flushThreshold +// Size reports the number of compressed bytes currently buffered for the +// in-progress batch. The gzip writer buffers internally (~32 KiB deflate +// window), so Size is a lower bound on the eventual flushed payload size. +func (b *BatchEncoder) Size() int { + return b.buf.Len() } -// BatchCount returns the number of batches that have been started (each -// successful Flush increments this). Useful for logging. +// BatchCount returns the number of batches that have been started so far +// (each AddScope after a Flush starts a new batch). Useful for logging. 
func (b *BatchEncoder) BatchCount() int { return b.batchNum } @@ -243,13 +225,12 @@ func (b *BatchEncoder) Flush(ctx context.Context, final bool) error { b.buf.Reset() b.gz.Reset(&b.buf) b.enc = json.NewEncoder(b.gz) - b.enc.SetEscapeHTML(false) b.scopeCount = 0 b.prefixWritten = false return nil } -func (s *SymDBUploader) uploadInner(ctx context.Context, compressedData []byte) error { +func (s *uploader) uploadInner(ctx context.Context, compressedData []byte) error { // The upload is a multipart containing metadata expected by the event platform // and the gzipped SymDB data. diff --git a/pkg/dyninst/symdb/uploader/uploader_test.go b/pkg/dyninst/symdb/uploader/uploader_test.go index 03c5bc72b045..bbc29178a9d1 100644 --- a/pkg/dyninst/symdb/uploader/uploader_test.go +++ b/pkg/dyninst/symdb/uploader/uploader_test.go @@ -256,13 +256,6 @@ func TestBatchEncoder(t *testing.T) { ts := newTestServer() defer ts.Close() - up := NewSymDBUploader( - ts.serverURL.String(), - "service1", - "1.0.0", - "dummy-runtime-id", - ) - // Do a (blocking) flush in a goroutine so that the test goroutine // can intercept the request. var wg sync.WaitGroup @@ -270,7 +263,13 @@ func TestBatchEncoder(t *testing.T) { uploadID := uuid.New() go func() { defer wg.Done() - enc := up.NewBatchEncoder(uploadID) + enc := NewBatchEncoder( + ts.serverURL.String(), + "service1", + "1.0.0", + "dummy-runtime-id", + uploadID, + ) for _, s := range createPackageScopes() { assert.NoError(t, enc.AddScope(s)) } @@ -296,20 +295,13 @@ func TestBatchEncoder(t *testing.T) { } } -// TestBatchEncoder_MultiBatch drives the encoder with a tiny flush threshold, -// forcing one HTTP request per scope, and verifies that batch_num increments -// across requests and final=true is set only on the last one. +// TestBatchEncoder_MultiBatch drives the encoder by flushing after every +// scope and verifies that batch_num increments across requests and final=true +// is set only on the last one. 
func TestBatchEncoder_MultiBatch(t *testing.T) { ts := newTestServer() defer ts.Close() - up := NewSymDBUploader( - ts.serverURL.String(), - "service1", - "1.0.0", - "dummy-runtime-id", - ) - scopes := createPackageScopes() // Duplicate the package a few times to get multiple batches. scopes = append(scopes, scopes[0], scopes[0]) @@ -319,14 +311,17 @@ func TestBatchEncoder_MultiBatch(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - enc := up.NewBatchEncoder(uploadID, WithFlushThreshold(0)) + enc := NewBatchEncoder( + ts.serverURL.String(), + "service1", + "1.0.0", + "dummy-runtime-id", + uploadID, + ) for i, s := range scopes { assert.NoError(t, enc.AddScope(s)) final := i == len(scopes)-1 - // With threshold<=0, ShouldFlush is true after every encoded scope. - if enc.ShouldFlush() || final { - assert.NoError(t, enc.Flush(context.Background(), final)) - } + assert.NoError(t, enc.Flush(context.Background(), final)) } assert.Equal(t, len(scopes), enc.BatchCount()) }() @@ -343,36 +338,35 @@ func TestBatchEncoder_MultiBatch(t *testing.T) { wg.Wait() } -// TestBatchEncoder_ShouldFlush exercises the threshold check directly without -// an HTTP server, using a no-network upload via a server that always 200s. -func TestBatchEncoder_ShouldFlush(t *testing.T) { +// TestBatchEncoder_Size exercises Size, which reports the underlying buffer +// length and grows monotonically as scopes accumulate (modulo gzip's internal +// buffering) and resets to zero after Flush. +func TestBatchEncoder_Size(t *testing.T) { ts := newTestServer() defer ts.Close() - up := NewSymDBUploader( + enc := NewBatchEncoder( ts.serverURL.String(), "service1", "1.0.0", "dummy-runtime-id", + uuid.New(), ) + require.Zero(t, enc.Size(), "Size should be zero before any scopes") - const threshold = 64 - enc := up.NewBatchEncoder(uuid.New(), WithFlushThreshold(threshold)) - require.False(t, enc.ShouldFlush(), "should not flush before any scopes") - - // Add scopes until ShouldFlush flips. 
The gzip writer buffers internally - // (deflate window is 32 KiB), so it can take many small scopes before any - // bytes reach the underlying buffer. Bound the loop generously. + // Add scopes until Size grows. gzip buffers internally (~32 KiB deflate + // window), so it may take many small scopes before bytes reach the + // underlying buffer. scope := createPackageScopes()[0] - flipped := false + grew := false for i := 0; i < 100000; i++ { require.NoError(t, enc.AddScope(scope)) - if enc.ShouldFlush() { - flipped = true + if enc.Size() > 0 { + grew = true break } } - require.True(t, flipped, "ShouldFlush never flipped") + require.True(t, grew, "Size never grew above zero") // Drain the upload that Flush will send. doneC := make(chan error, 1) @@ -384,8 +378,7 @@ func TestBatchEncoder_ShouldFlush(t *testing.T) { close(req.done) require.NoError(t, <-doneC) - // After flush, the buffer is reset and ShouldFlush reports false again. - require.False(t, enc.ShouldFlush(), "ShouldFlush should reset after Flush") + require.Zero(t, enc.Size(), "Size should reset after Flush") } // TestBatchEncoder_EmptyFinal verifies that Flush(ctx, true) with no scopes @@ -394,14 +387,13 @@ func TestBatchEncoder_EmptyFinal(t *testing.T) { ts := newTestServer() defer ts.Close() - up := NewSymDBUploader( + enc := NewBatchEncoder( ts.serverURL.String(), "service1", "1.0.0", "dummy-runtime-id", + uuid.New(), ) - - enc := up.NewBatchEncoder(uuid.New()) // No AddScope calls. require.NoError(t, enc.Flush(context.Background(), true)) require.Equal(t, 0, enc.BatchCount())