Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 18 additions & 30 deletions pkg/dyninst/module/symdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func newSymdbManager(
opts ...option,
) *symdbManager {
cfg := symdbManagerConfig{
maxBufferFuncs: 10000,
flushThresholdBytes: uploader.DefaultFlushThresholdBytes,
}
for _, opt := range opts {
opt(&cfg)
Expand Down Expand Up @@ -154,8 +154,8 @@ func newSymdbManager(
}

type symdbManagerConfig struct {
maxBufferFuncs int
testingKnobs struct {
flushThresholdBytes int
testingKnobs struct {
onDeferUpload func()
onUploadRejectedByPersistentCache func()
onUploadQueued func(queuedUploadInfo)
Expand All @@ -168,9 +168,9 @@ type symdbManagerConfig struct {

type option func(config *symdbManagerConfig)

func withMaxBufferFuncs(maxBufferFuncs int) option {
func withFlushThresholdBytes(flushThresholdBytes int) option {
return func(c *symdbManagerConfig) {
c.maxBufferFuncs = maxBufferFuncs
c.flushThresholdBytes = flushThresholdBytes
}
}

Expand Down Expand Up @@ -619,40 +619,27 @@ func (m *symdbManager) performUpload(
procID.pid, executablePath, err)
}

sender := uploader.NewSymDBUploader(
enc := uploader.NewBatchEncoder(
m.uploadURL.String(),
procID.service, procID.version, runtimeID,
uuid.New(),
)
uploadBuffer := make([]uploader.Scope, 0, 100)
bufferFuncs := 0
uploadID := uuid.New()
batchNum := 0
var totalPackages, totalFuncs int
// Flush every so often in order to not store too many scopes in memory.
// Flush whenever the compressed payload reaches the threshold, or on the
// final package, to avoid keeping too much in memory at once.
maybeFlush := func(final bool) error {
if ctx.Err() != nil {
return context.Cause(ctx)
}

if len(uploadBuffer) == 0 {
if !final && enc.Size() < m.cfg.flushThresholdBytes {
return nil
}
if final || bufferFuncs >= m.cfg.maxBufferFuncs {
log.Tracef("SymDB: uploading symbols chunk: %d packages, %d functions. Final chunk: %t", len(uploadBuffer), bufferFuncs, final)
batchNum++
err := sender.UploadBatch(ctx,
uploader.UploadInfo{
UploadID: uploadID,
BatchNum: batchNum,
Final: final,
},
uploadBuffer,
)
if err != nil {
log.Tracef("SymDB: uploading symbols chunk. Final chunk: %t", final)
if err := enc.Flush(ctx, final); err != nil {
if errors.Is(err, uploader.ErrUpload) {
return uploadError{cause: err}
}
uploadBuffer = uploadBuffer[:0]
bufferFuncs = 0
return err
}
return nil
}
Expand All @@ -667,10 +654,11 @@ func (m *symdbManager) performUpload(
}

scope := uploader.ConvertPackageToScope(pkg.Package, version.AgentVersion)
uploadBuffer = append(uploadBuffer, scope)
if err := enc.AddScope(scope); err != nil {
return fmt.Errorf("failed to encode scope for process %v: %w", procID.pid, err)
}
totalPackages++
totalFuncs += pkg.Stats().NumFunctions
bufferFuncs += pkg.Stats().NumFunctions
if err := maybeFlush(pkg.Final); err != nil {
return err
}
Expand All @@ -680,7 +668,7 @@ func (m *symdbManager) performUpload(
"(service: %s, version: %s, executable: %s):"+
" %d packages, %d functions, %d chunks in %v",
procID.pid, procID.service, procID.version, executablePath,
totalPackages, totalFuncs, batchNum, time.Since(startTime))
totalPackages, totalFuncs, enc.BatchCount(), time.Since(startTime))
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/dyninst/module/symdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ func testSymdbManagerCancellation(t *testing.T, useStop bool) {
symdbURL,
object.NewInMemoryLoader(),
"", /* cacheDir - no cache */
// Use a small buffer (which will force a flush after every package)
// in order to have an opportunity to cancel the uploads in between
// flushes.
withMaxBufferFuncs(1),
// Use a flush threshold of 0 (which will force a flush after every
// package) in order to have an opportunity to cancel the uploads in
// between flushes.
withFlushThresholdBytes(0),
)
t.Cleanup(manager.stop)
// Create a dummy runtime ID
Expand Down
54 changes: 19 additions & 35 deletions pkg/dyninst/symdb/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,21 @@ func run() (retErr error) {

opt := symdb.ExtractOptions{Scope: scope}

var up *uploader.SymDBUploader
// Headers to attach to every HTTP request. When the system-probe does the
// uploading, it sends the data through the local trace-agent, which deals
// with setting these headers.
headers := [][2]string{
{"DD-EVP-ORIGIN", "symdb-cli"},
{"DD-EVP-ORIGIN-VERSION", "0.1"},
{"DD-API-KEY", *uploadAPIKey},
}
var enc *uploader.BatchEncoder
if *upload {
up = uploader.NewSymDBUploader(
// Headers to attach to every HTTP request. When the system-probe does
// the uploading, it sends the data through the local trace-agent,
// which deals with setting these headers.
headers := [][2]string{
{"DD-EVP-ORIGIN", "symdb-cli"},
{"DD-EVP-ORIGIN-VERSION", "0.1"},
{"DD-API-KEY", *uploadAPIKey},
}
enc = uploader.NewBatchEncoder(
uploadURLParsed.String(),
*uploadService, *uploadVersion,
fmt.Sprintf("manual-upload-%d", rand.Intn(1000)),
uuid.New(),
headers...,
)
}
Expand All @@ -229,31 +230,13 @@ func run() (retErr error) {
return err
}

uploadBuffer := make([]uploader.Scope, 0, 100)
bufferFuncs := 0
// Flush every so ofter in order to not store too many scopes in memory.
const maxBufferFuncs = 10000
uploadID := uuid.New()
batchNum := 0
maybeFlush := func(final bool) error {
if len(uploadBuffer) == 0 {
if !final && enc.Size() < uploader.DefaultFlushThresholdBytes {
return nil
}
if final || bufferFuncs >= maxBufferFuncs {
log.Tracef("SymDB: uploading symbols chunk: %d packages, %d functions", len(uploadBuffer), bufferFuncs)
err := up.UploadBatch(
context.Background(),
uploader.UploadInfo{
UploadID: uploadID,
BatchNum: batchNum,
Final: final,
},
uploadBuffer)
if err != nil {
return fmt.Errorf("upload failed: %w", err)
}
uploadBuffer = uploadBuffer[:0]
bufferFuncs = 0
log.Tracef("SymDB: uploading symbols chunk. Final chunk: %t", final)
if err := enc.Flush(context.Background(), final); err != nil {
return fmt.Errorf("upload failed: %w", err)
}
return nil
}
Expand All @@ -264,10 +247,11 @@ func run() (retErr error) {
return err
}

if up != nil {
if enc != nil {
scope := uploader.ConvertPackageToScope(pkg.Package, "cli" /* agentVersion */)
uploadBuffer = append(uploadBuffer, scope)
bufferFuncs += pkg.Stats().NumFunctions
if err := enc.AddScope(scope); err != nil {
return fmt.Errorf("failed to encode scope: %w", err)
}
if err := maybeFlush(pkg.Final); err != nil {
return err
}
Expand Down
Loading
Loading