Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 17 additions & 28 deletions pkg/dyninst/module/symdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func newSymdbManager(
opts ...option,
) *symdbManager {
cfg := symdbManagerConfig{
maxBufferFuncs: 10000,
flushThresholdBytes: uploader.DefaultFlushThresholdBytes,
}
for _, opt := range opts {
opt(&cfg)
Expand Down Expand Up @@ -154,8 +154,8 @@ func newSymdbManager(
}

type symdbManagerConfig struct {
maxBufferFuncs int
testingKnobs struct {
flushThresholdBytes int
testingKnobs struct {
onDeferUpload func()
onUploadRejectedByPersistentCache func()
onUploadQueued func(queuedUploadInfo)
Expand All @@ -168,9 +168,9 @@ type symdbManagerConfig struct {

type option func(config *symdbManagerConfig)

func withMaxBufferFuncs(maxBufferFuncs int) option {
func withFlushThresholdBytes(flushThresholdBytes int) option {
return func(c *symdbManagerConfig) {
c.maxBufferFuncs = maxBufferFuncs
c.flushThresholdBytes = flushThresholdBytes
}
}

Expand Down Expand Up @@ -623,36 +623,24 @@ func (m *symdbManager) performUpload(
m.uploadURL.String(),
procID.service, procID.version, runtimeID,
)
uploadBuffer := make([]uploader.Scope, 0, 100)
bufferFuncs := 0
uploadID := uuid.New()
batchNum := 0
enc := sender.NewBatchEncoder(uploadID, uploader.WithFlushThreshold(m.cfg.flushThresholdBytes))
var totalPackages, totalFuncs int
// Flush every so often in order to not store too many scopes in memory.
// Flush whenever the compressed payload reaches the threshold, or on the
// final package, to avoid keeping too much in memory at once.
maybeFlush := func(final bool) error {
if ctx.Err() != nil {
return context.Cause(ctx)
}

if len(uploadBuffer) == 0 {
if !final && !enc.ShouldFlush() {
return nil
}
if final || bufferFuncs >= m.cfg.maxBufferFuncs {
log.Tracef("SymDB: uploading symbols chunk: %d packages, %d functions. Final chunk: %t", len(uploadBuffer), bufferFuncs, final)
batchNum++
err := sender.UploadBatch(ctx,
uploader.UploadInfo{
UploadID: uploadID,
BatchNum: batchNum,
Final: final,
},
uploadBuffer,
)
if err != nil {
log.Tracef("SymDB: uploading symbols chunk. Final chunk: %t", final)
if err := enc.Flush(ctx, final); err != nil {
if errors.Is(err, uploader.ErrUpload) {
return uploadError{cause: err}
}
uploadBuffer = uploadBuffer[:0]
bufferFuncs = 0
return err
}
return nil
}
Expand All @@ -667,10 +655,11 @@ func (m *symdbManager) performUpload(
}

scope := uploader.ConvertPackageToScope(pkg.Package, version.AgentVersion)
uploadBuffer = append(uploadBuffer, scope)
if err := enc.AddScope(scope); err != nil {
return fmt.Errorf("failed to encode scope for process %v: %w", procID.pid, err)
}
totalPackages++
totalFuncs += pkg.Stats().NumFunctions
bufferFuncs += pkg.Stats().NumFunctions
if err := maybeFlush(pkg.Final); err != nil {
return err
}
Expand All @@ -680,7 +669,7 @@ func (m *symdbManager) performUpload(
"(service: %s, version: %s, executable: %s):"+
" %d packages, %d functions, %d chunks in %v",
procID.pid, procID.service, procID.version, executablePath,
totalPackages, totalFuncs, batchNum, time.Since(startTime))
totalPackages, totalFuncs, enc.BatchCount(), time.Since(startTime))
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/dyninst/module/symdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ func testSymdbManagerCancellation(t *testing.T, useStop bool) {
symdbURL,
object.NewInMemoryLoader(),
"", /* cacheDir - no cache */
// Use a small buffer (which will force a flush after every package)
// in order to have an opportunity to cancel the uploads in between
// flushes.
withMaxBufferFuncs(1),
// Use a flush threshold of 0 (which will force a flush after every
// package) in order to have an opportunity to cancel the uploads in
// between flushes.
withFlushThresholdBytes(0),
)
t.Cleanup(manager.stop)
// Create a dummy runtime ID
Expand Down
191 changes: 136 additions & 55 deletions pkg/dyninst/symdb/uploader/symdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"mime/multipart"
"net/http"
Expand All @@ -26,6 +27,15 @@ import (
"github.com/google/uuid"
)

// DefaultFlushThresholdBytes is the default compressed-size threshold at which
// a BatchEncoder will flush an HTTP chunk.
const DefaultFlushThresholdBytes = 2 * 1024 * 1024

// ErrUpload is returned (wrapped) by BatchEncoder.Flush when the HTTP upload
// step fails. Callers can use errors.Is(err, ErrUpload) to distinguish
// upload-side failures (typically retryable) from local encoder errors.
var ErrUpload = errors.New("symdb upload failed")

// ScopeType represents the type of scope in the SymDB schema
type ScopeType string

Expand Down Expand Up @@ -119,61 +129,133 @@ func NewSymDBUploader(
}
}

// UploadInfo contains metadata about a batch of packages to be uploaded to
// SymDB.
type UploadInfo struct {
// UploadID identifies which logical upload this batch is part of: if packages
// are split into multiple batches because of size limits, they will all share
// the same uploadID.
UploadID uuid.UUID
// BatchNum is the number of the batch relative to the other batches in this
// upload. First batch has number 1.
BatchNum int
// Final is set if this is the final (or the only) batch of packages for
// this upload.
Final bool
// BatchEncoder streams Scope objects into a gzip-compressed SymDB JSON
// envelope and uploads chunks to the SymDB intake whenever the compressed
// payload reaches the configured threshold. A single BatchEncoder
// corresponds to one logical upload (one UploadID) and may emit multiple
// batches.
type BatchEncoder struct {
	// up is the uploader used to send each finished batch.
	up *SymDBUploader
	// uploadID identifies the logical upload; it is shared by every batch
	// this encoder emits.
	uploadID uuid.UUID
	// flushThreshold is the soft compressed-size limit (in bytes) at which
	// ShouldFlush reports true; a value <= 0 means flush after every scope.
	flushThreshold int
	// batchNum counts batches started so far; AddScope increments it when it
	// writes a new batch's envelope prefix, so the first batch has number 1.
	batchNum int
	// buf accumulates the compressed bytes of the current batch.
	buf bytes.Buffer
	// gz is the gzip stream writing into buf; it is closed on Flush and
	// reset for the next batch.
	gz *gzip.Writer
	// enc is the JSON encoder writing into gz (HTML escaping disabled).
	enc *json.Encoder
	// scopeCount is the number of scopes added to the current batch.
	scopeCount int
	// prefixWritten records whether the JSON envelope prefix has been
	// written for the current batch.
	prefixWritten bool
}

// UploadBatch uploads a batch the symbols for a batch of packages to SymDB (via
// the trace-agent).
func (s *SymDBUploader) UploadBatch(ctx context.Context, info UploadInfo, packages []Scope) error {
// Wrap the data in an envelope expected by the debugger backend.
var buf bytes.Buffer
buf.WriteString(`{
"service": "` + s.service + `",
"version": "` + s.version + `",
"language": "go",
"upload_id": "` + info.UploadID.String() + `",
"batch_num": ` + strconv.Itoa(info.BatchNum) + `,
"final": ` + strconv.FormatBool(info.Final) + `,
"scopes": `)
// BatchEncoderOption configures a BatchEncoder.
type BatchEncoderOption func(*BatchEncoder)

jsonBytes, err := json.Marshal(packages)
if err != nil {
return fmt.Errorf("failed to marshal scope: %w", err)
// WithFlushThreshold sets the compressed-size threshold (in bytes) at which
// ShouldFlush will return true. The threshold is soft: in-flight bytes inside
// the gzip writer's internal window may not yet be reflected in the buffer
// length, so the actual flushed payload may overshoot by up to the gzip
// window size (~32 KiB).
func WithFlushThreshold(bytes int) BatchEncoderOption {
return func(b *BatchEncoder) {
b.flushThreshold = bytes
}
buf.Write(jsonBytes)
buf.WriteString("}")
}

if err := s.uploadInner(ctx, buf.Bytes()); err != nil {
return fmt.Errorf("failed to send individual SymDB: %w", err)
// NewBatchEncoder creates a BatchEncoder for a single logical upload.
// The encoder starts with DefaultFlushThresholdBytes unless overridden by an
// option, and is immediately ready to accept scopes via AddScope.
func (s *SymDBUploader) NewBatchEncoder(uploadID uuid.UUID, opts ...BatchEncoderOption) *BatchEncoder {
	be := &BatchEncoder{
		up:             s,
		uploadID:       uploadID,
		flushThreshold: DefaultFlushThresholdBytes,
	}
	// Apply caller overrides before wiring up the compression pipeline.
	for _, o := range opts {
		o(be)
	}
	be.gz = gzip.NewWriter(&be.buf)
	be.enc = json.NewEncoder(be.gz)
	be.enc.SetEscapeHTML(false)
	return be
}

// AddScope writes a single Scope into the current batch's gzip stream. On the
// first call of a new batch, it also writes the JSON envelope prefix.
//
// The envelope header fields are produced with encoding/json rather than raw
// string concatenation, so service/version values containing quotes or other
// JSON metacharacters are escaped correctly instead of corrupting the
// payload.
func (b *BatchEncoder) AddScope(scope Scope) error {
	if !b.prefixWritten {
		b.batchNum++
		// Marshal the header fields to get correct JSON string escaping.
		hdr, err := json.Marshal(struct {
			Service  string `json:"service"`
			Version  string `json:"version"`
			Language string `json:"language"`
			UploadID string `json:"upload_id"`
			BatchNum int    `json:"batch_num"`
		}{
			Service:  b.up.service,
			Version:  b.up.version,
			Language: "go",
			UploadID: b.uploadID.String(),
			BatchNum: b.batchNum,
		})
		if err != nil {
			return fmt.Errorf("failed to marshal envelope prefix: %w", err)
		}
		// Re-open the marshaled object (drop its trailing '}') and append
		// the "scopes" key so the array can be streamed element by element.
		prefix := string(hdr[:len(hdr)-1]) + `,"scopes":[`
		if _, err := b.gz.Write([]byte(prefix)); err != nil {
			return fmt.Errorf("failed to write envelope prefix: %w", err)
		}
		b.prefixWritten = true
	}
	// Scopes after the first need a separating comma inside the array.
	if b.scopeCount > 0 {
		if _, err := b.gz.Write([]byte{','}); err != nil {
			return fmt.Errorf("failed to write scope separator: %w", err)
		}
	}
	if err := b.enc.Encode(scope); err != nil {
		return fmt.Errorf("failed to encode scope: %w", err)
	}
	b.scopeCount++
	return nil
}

func (s *SymDBUploader) uploadInner(ctx context.Context, symdbData []byte) error {
// ShouldFlush reports whether the compressed buffer has reached the flush
// threshold. As a special case, a threshold <= 0 means "flush after every
// scope" — useful for tests that need to observe one HTTP request per scope.
func (b *BatchEncoder) ShouldFlush() bool {
	if b.flushThreshold > 0 {
		return b.buf.Len() >= b.flushThreshold
	}
	// Non-positive threshold: flush as soon as anything has been buffered.
	return b.scopeCount > 0
}

// BatchCount returns the number of batches started so far. The counter is
// incremented by AddScope when it writes a new batch's envelope prefix, so
// the count includes a batch that has been started but not yet flushed.
// Useful for logging.
func (b *BatchEncoder) BatchCount() int {
	return b.batchNum
}

// Flush finalises the current batch (if any scopes have been added) and
// uploads it. If no scopes have been added since the last flush, this is a
// no-op even when final is true. After a successful Flush, the encoder is
// reset and ready to accept scopes for the next batch.
//
// Errors from the HTTP upload step are wrapped with ErrUpload so callers can
// distinguish them with errors.Is.
//
// NOTE(review): on any error the encoder is NOT reset — in particular, once
// the gzip writer has been closed below, a subsequent AddScope would write to
// a closed stream. Callers appear to be expected to abandon the encoder after
// a failed Flush; confirm this holds at all call sites.
func (b *BatchEncoder) Flush(ctx context.Context, final bool) error {
	// Nothing was added since the last flush: emit nothing, even for the
	// final batch.
	if !b.prefixWritten {
		return nil
	}
	// Close the JSON envelope (the "scopes" array plus the top-level object,
	// carrying the "final" marker) and the gzip stream so the buffered bytes
	// form a complete compressed document.
	suffix := `],"final":` + strconv.FormatBool(final) + `}`
	if _, err := b.gz.Write([]byte(suffix)); err != nil {
		return fmt.Errorf("failed to write envelope suffix: %w", err)
	}
	if err := b.gz.Close(); err != nil {
		return fmt.Errorf("failed to close gzip writer: %w", err)
	}
	if err := b.up.uploadInner(ctx, b.buf.Bytes(), b.uploadID, b.batchNum, final); err != nil {
		// Wrap so upload-side failures (typically retryable) can be told
		// apart from local encoder errors via errors.Is(err, ErrUpload).
		return fmt.Errorf("%w: %w", ErrUpload, err)
	}
	// Reset all per-batch state; the next AddScope starts a fresh batch.
	b.buf.Reset()
	b.gz.Reset(&b.buf)
	b.enc = json.NewEncoder(b.gz)
	b.enc.SetEscapeHTML(false)
	b.scopeCount = 0
	b.prefixWritten = false
	return nil
}

func (s *SymDBUploader) uploadInner(ctx context.Context, compressedData []byte, uploadID uuid.UUID, batchNum int, final bool) error {
// The upload is a multipart containing metadata expected by the event platform
// and the gzipped SymDB data.

var buf bytes.Buffer
writer := multipart.NewWriter(&buf)

compressedData, err := compressSymDBData(symdbData)
if err != nil {
return fmt.Errorf("failed to compress SymDB data: %w", err)
}

fileHeader := make(textproto.MIMEHeader)
fileHeader.Set("Content-Disposition", `form-data; name="file"; filename="file.gz"`)
fileHeader.Set("Content-Type", "application/gzip")
Expand All @@ -187,6 +269,10 @@ func (s *SymDBUploader) uploadInner(ctx context.Context, symdbData []byte) error
return fmt.Errorf("failed to write compressed SymDB data: %w", err)
}

// Add a part containing the JSON that will eventually become the EvP event
// (as opposed to the part above, which will become an EvP attachment). The
// name="event" is recognized by EvP intake:
// https://github.com/DataDog/logs-backend/blob/038d9b05a3904b60b06c32e03db1aebb757ba41d/domains/intake/libs/intake-backend/edge/src/main/java/com/dd/evp/intake_backend/edge/http/multipart/V2MultiPartMessageDecoder.java#L40
eventHeader := make(textproto.MIMEHeader)
eventHeader.Set("Content-Disposition", `form-data; name="event"; filename="event.json"`)
eventHeader.Set("Content-Type", "application/json")
Expand All @@ -196,12 +282,22 @@ func (s *SymDBUploader) uploadInner(ctx context.Context, symdbData []byte) error
return fmt.Errorf("failed to create event part: %w", err)
}

// Some of the fields in here are duplicated inside the envelope in the
// attachment file; that's on purpose: having them in this message allows
// the debugger backend to have enough information for its bookkeeping
// without downloading the attachment. Having the info in the attachment is
// useful in order to make attachments self-contained.
meta := []byte(`{
"ddsource": "dd_debugger",
"service": "` + s.service + `",
"version": "` + s.version + `",
"runtimeId": "` + s.runtimeID + `",
"debugger": {
"type": "symdb"
"type": "symdb",
"upload_id": "` + uploadID.String() + `",
"batch_num": ` + strconv.Itoa(batchNum) + `,
"final": ` + strconv.FormatBool(final) + `,
"attachment_size": ` + strconv.Itoa(len(compressedData)) + `
}
}`)
Comment on lines 290 to 302
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: looking at this again, I'd prefer you actually did use the Marshal code -- version and service aren't sanitized anywhere. I don't have any reason to believe anything bad is going happen but it doesn't seem worth it to serialize like this.

if _, err := eventPart.Write(meta); err != nil {
Expand Down Expand Up @@ -234,21 +330,6 @@ func (s *SymDBUploader) uploadInner(ctx context.Context, symdbData []byte) error
return nil
}

func compressSymDBData(data []byte) ([]byte, error) {
var buf bytes.Buffer
gzWriter := gzip.NewWriter(&buf)

if _, err := gzWriter.Write(data); err != nil {
return nil, fmt.Errorf("failed to write to gzip writer: %w", err)
}

if err := gzWriter.Close(); err != nil {
return nil, fmt.Errorf("failed to close gzip writer: %w", err)
}

return buf.Bytes(), nil
}

// cleanString returns s with every ASCII space character (' ') removed.
// Other whitespace (tabs, newlines) is left untouched.
func cleanString(s string) string {
	var b strings.Builder
	b.Grow(len(s))
	for _, r := range s {
		if r != ' ' {
			b.WriteRune(r)
		}
	}
	return b.String()
}
Expand Down
Loading
Loading