From ffb8c46f0d95949e2077f4d926f05db0edc113b4 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Nov 2024 12:47:43 -0800 Subject: [PATCH 1/4] Pass telemetry directly to the Controller Do not enqueue telemetry to a channel just to dequeue it and send it to the Controller. --- internal/pkg/instrumentation/manager.go | 11 ++--------- internal/pkg/instrumentation/manager_test.go | 11 ++++------- internal/pkg/instrumentation/probe/probe.go | 10 +++++----- 3 files changed, 11 insertions(+), 21 deletions(-) diff --git a/internal/pkg/instrumentation/manager.go b/internal/pkg/instrumentation/manager.go index 5b8783389..5d45f820d 100644 --- a/internal/pkg/instrumentation/manager.go +++ b/internal/pkg/instrumentation/manager.go @@ -13,7 +13,6 @@ import ( "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/rlimit" - "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/trace" dbSql "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/database/sql" @@ -58,7 +57,6 @@ type Manager struct { exe *link.Executable td *process.TargetDetails runningProbesWG sync.WaitGroup - telemetryCh chan ptrace.ScopeSpans currentConfig Config probeMu sync.Mutex state managerState @@ -74,7 +72,6 @@ func NewManager(logger *slog.Logger, otelController *opentelemetry.Controller, g globalImpl: globalImpl, loadedIndicator: loadIndicator, cp: cp, - telemetryCh: make(chan ptrace.ScopeSpans), } err := m.registerProbes() @@ -227,7 +224,7 @@ func (m *Manager) runProbe(p probe.Probe) { m.runningProbesWG.Add(1) go func(ap probe.Probe) { defer m.runningProbesWG.Done() - ap.Run(m.telemetryCh) + ap.Run(m.otelController.Trace) }(p) } @@ -290,9 +287,8 @@ func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error m.logger.Debug("Shutting down all probes") err := m.cleanup(target) - // Wait for all probes to stop before closing the chan they send on. + // Wait for all probes to stop. m.runningProbesWG.Wait() - close(m.telemetryCh) m.state = managerStateStopped m.probeMu.Unlock() @@ -300,9 +296,6 @@ func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error done <- errors.Join(err, ctx.Err()) }() - for e := range m.telemetryCh { - m.otelController.Trace(e) - } return <-done } diff --git a/internal/pkg/instrumentation/manager_test.go b/internal/pkg/instrumentation/manager_test.go index 1caeabe07..7d5ee4674 100644 --- a/internal/pkg/instrumentation/manager_test.go +++ b/internal/pkg/instrumentation/manager_test.go @@ -235,7 +235,6 @@ func TestRunStopping(t *testing.T) { otelController: ctrl, logger: slog.Default(), probes: map[probe.ID]probe.Probe{{}: p}, - telemetryCh: make(chan ptrace.ScopeSpans), cp: NewNoopConfigProvider(nil), } @@ -288,7 +287,7 @@ func (p slowProbe) Load(*link.Executable, *process.TargetDetails, *sampling.Conf return nil } -func (p slowProbe) Run(c chan<- ptrace.ScopeSpans) { +func (p slowProbe) Run(func(ptrace.ScopeSpans)) { } func (p slowProbe) Close() error { @@ -308,7 +307,7 @@ func (p *noopProbe) Load(*link.Executable, *process.TargetDetails, *sampling.Con return nil } -func (p *noopProbe) Run(c chan<- ptrace.ScopeSpans) { +func (p *noopProbe) Run(func(ptrace.ScopeSpans)) { p.running = true } @@ -370,7 +369,6 @@ func TestConfigProvider(t *testing.T) { netHTTPServerProbeID: &noopProbe{}, somePackageProducerProbeID: &noopProbe{}, }, - telemetryCh: make(chan ptrace.ScopeSpans), cp: newDummyProvider(Config{ InstrumentationLibraryConfigs: map[LibraryID]Library{ netHTTPClientLibID: {TracesEnabled: &falseVal}, @@ -475,10 +473,10 @@ func (p *hangingProbe) Load(*link.Executable, *process.TargetDetails, *sampling. return nil } -func (p *hangingProbe) Run(c chan<- ptrace.ScopeSpans) { +func (p *hangingProbe) Run(handle func(ptrace.ScopeSpans)) { <-p.closeReturned // Write after Close has returned. - c <- ptrace.NewScopeSpans() + handle(ptrace.NewScopeSpans()) } func (p *hangingProbe) Close() error { @@ -498,7 +496,6 @@ func TestRunStopDeadlock(t *testing.T) { otelController: ctrl, logger: slog.Default(), probes: map[probe.ID]probe.Probe{{}: p}, - telemetryCh: make(chan ptrace.ScopeSpans), cp: NewNoopConfigProvider(nil), } diff --git a/internal/pkg/instrumentation/probe/probe.go b/internal/pkg/instrumentation/probe/probe.go index b759a202a..fcce6bb7f 100644 --- a/internal/pkg/instrumentation/probe/probe.go +++ b/internal/pkg/instrumentation/probe/probe.go @@ -42,7 +42,7 @@ type Probe interface { Load(*link.Executable, *process.TargetDetails, *sampling.Config) error // Run runs the events processing loop. - Run(tracesChan chan<- ptrace.ScopeSpans) + Run(func(ptrace.ScopeSpans)) // Close stops the Probe. Close() error @@ -267,7 +267,7 @@ type SpanProducer[BPFObj any, BPFEvent any] struct { } // Run runs the events processing loop. -func (i *SpanProducer[BPFObj, BPFEvent]) Run(dest chan<- ptrace.ScopeSpans) { +func (i *SpanProducer[BPFObj, BPFEvent]) Run(handle func(ptrace.ScopeSpans)) { for { event, err := i.read() if err != nil { @@ -285,7 +285,7 @@ func (i *SpanProducer[BPFObj, BPFEvent]) Run(dest chan<- ptrace.ScopeSpans) { i.ProcessFn(event).CopyTo(ss.Spans()) - dest <- ss + handle(ss) } } @@ -296,7 +296,7 @@ type TraceProducer[BPFObj any, BPFEvent any] struct { } // Run runs the events processing loop. -func (i *TraceProducer[BPFObj, BPFEvent]) Run(dest chan<- ptrace.ScopeSpans) { +func (i *TraceProducer[BPFObj, BPFEvent]) Run(handle func(ptrace.ScopeSpans)) { for { event, err := i.read() if err != nil { @@ -306,7 +306,7 @@ func (i *TraceProducer[BPFObj, BPFEvent]) Run(dest chan<- ptrace.ScopeSpans) { continue } - dest <- i.ProcessFn(event) + handle(i.ProcessFn(event)) } } From b484fbecb411a699efbc17331a1fd6b4446a25ad Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Nov 2024 13:10:54 -0800 Subject: [PATCH 2/4] Add the attrs func Syntactic sugar for: var kvs []attribute.KeyValue kvs = appendAttrs(kvs, ptraceAttributeMap) --- internal/pkg/opentelemetry/controller.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/pkg/opentelemetry/controller.go b/internal/pkg/opentelemetry/controller.go index 42c0d9e0e..ef0d62474 100644 --- a/internal/pkg/opentelemetry/controller.go +++ b/internal/pkg/opentelemetry/controller.go @@ -113,6 +113,11 @@ func (c *Controller) Shutdown(ctx context.Context) error { return nil } +func attrs(m pcommon.Map) []attribute.KeyValue { + out := make([]attribute.KeyValue, 0, m.Len()) + return appendAttrs(out, m) +} + func appendAttrs(dest []attribute.KeyValue, m pcommon.Map) []attribute.KeyValue { m.Range(func(k string, v pcommon.Value) bool { dest = append(dest, attr(k, v)) @@ -213,8 +218,7 @@ func appendEventOpts(dest []trace.EventOption, e ptrace.SpanEvent) []trace.Event dest = append(dest, trace.WithTimestamp(ts)) } - var kvs []attribute.KeyValue - kvs = appendAttrs(kvs, e.Attributes()) + kvs := attrs(e.Attributes()) if len(kvs) > 0 { dest = append(dest, trace.WithAttributes(kvs...)) } @@ -244,8 +248,8 @@ func (c *Controller) links(links ptrace.SpanLinkSlice) []trace.Link { TraceFlags: trace.TraceFlags(l.Flags()), TraceState: ts, }), + Attributes: attrs(l.Attributes()), } - out[i].Attributes = appendAttrs(out[i].Attributes, l.Attributes()) } return out } From c12d04c55f6420b36f23ad5ed194cd42785db2e2 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 6 Nov 2024 13:12:07 -0800 Subject: [PATCH 3/4] Add concurrent safety to Controller.Trace Use TracerProvider.Tracer directly instead of caching Tracers in the Controller. The API ensures that TracerProvider.Tracer is concurrent safe and the default SDK already handles the caching of Tracers (no need to duplicate that logic here). --- internal/pkg/opentelemetry/controller.go | 25 +++---- internal/pkg/opentelemetry/controller_test.go | 65 ++++++++++--------- 2 files changed, 44 insertions(+), 46 deletions(-) diff --git a/internal/pkg/opentelemetry/controller.go b/internal/pkg/opentelemetry/controller.go index ef0d62474..e5954cad5 100644 --- a/internal/pkg/opentelemetry/controller.go +++ b/internal/pkg/opentelemetry/controller.go @@ -19,19 +19,6 @@ import ( type Controller struct { logger *slog.Logger tracerProvider trace.TracerProvider - tracersMap map[tracerID]trace.Tracer -} - -type tracerID struct{ name, version, schema string } - -func (c *Controller) getTracer(name, version, schema string) trace.Tracer { - tID := tracerID{name: name, version: version, schema: schema} - t, exists := c.tracersMap[tID] - if !exists { - t = c.tracerProvider.Tracer(name, trace.WithInstrumentationVersion(version), trace.WithSchemaURL(schema)) - c.tracersMap[tID] = t - } - return t } // Trace creates a trace span for event. @@ -43,7 +30,12 @@ func (c *Controller) Trace(ss ptrace.ScopeSpans) { kvs []attribute.KeyValue ) - t := c.getTracer(ss.Scope().Name(), ss.Scope().Version(), ss.SchemaUrl()) + tracer := c.tracerProvider.Tracer( + ss.Scope().Name(), + trace.WithInstrumentationVersion(ss.Scope().Version()), + trace.WithInstrumentationAttributes(attrs(ss.Scope().Attributes())...), + trace.WithSchemaURL(ss.SchemaUrl()), + ) for k := 0; k < ss.Spans().Len(); k++ { pSpan := ss.Spans().At(k) @@ -51,7 +43,7 @@ func (c *Controller) Trace(ss ptrace.ScopeSpans) { c.logger.Debug("dropping invalid span", "name", pSpan.Name()) continue } - c.logger.Debug("handling span", "tracer", t, "span", pSpan) + c.logger.Debug("handling span", "tracer", tracer, "span", pSpan) ctx := context.Background() if !pSpan.ParentSpanID().IsEmpty() { @@ -71,7 +63,7 @@ func (c *Controller) Trace(ss ptrace.ScopeSpans) { trace.WithTimestamp(pSpan.StartTimestamp().AsTime()), trace.WithLinks(c.links(pSpan.Links())...), ) - _, span := t.Start(ctx, pSpan.Name(), startOpts...) + _, span := tracer.Start(ctx, pSpan.Name(), startOpts...) startOpts = startOpts[:0] kvs = kvs[:0] @@ -96,7 +88,6 @@ func NewController(logger *slog.Logger, tracerProvider trace.TracerProvider) (*C return &Controller{ logger: logger, tracerProvider: tracerProvider, - tracersMap: make(map[tracerID]trace.Tracer), }, nil } diff --git a/internal/pkg/opentelemetry/controller_test.go b/internal/pkg/opentelemetry/controller_test.go index 9a3a7bd3c..60566f439 100644 --- a/internal/pkg/opentelemetry/controller_test.go +++ b/internal/pkg/opentelemetry/controller_test.go @@ -10,6 +10,7 @@ import ( "runtime" "strconv" "strings" + "sync" "sync/atomic" "testing" "time" @@ -314,35 +315,6 @@ func TestTrace(t *testing.T) { } } -func TestGetTracer(t *testing.T) { - exporter := tracetest.NewInMemoryExporter() - tp := sdktrace.NewTracerProvider( - sdktrace.WithSampler(sdktrace.AlwaysSample()), - sdktrace.WithBatcher(exporter), - sdktrace.WithResource(instResource()), - ) - defer func() { - err := tp.Shutdown(context.Background()) - assert.NoError(t, err) - }() - - ctrl, err := NewController(slog.Default(), tp) - assert.NoError(t, err) - - t1 := ctrl.getTracer("test", "v1", "schema") - assert.Equal(t, t1, ctrl.tracersMap[tracerID{name: "test", version: "v1", schema: "schema"}]) - - t2 := ctrl.getTracer("net/http", "", "") - assert.Equal(t, t2, ctrl.tracersMap[tracerID{name: "net/http", version: "", schema: ""}]) - - t3 := ctrl.getTracer("test", "v1", "schema") - assert.Same(t, t1, t3) - - t4 := ctrl.getTracer("net/http", "", "") - assert.Same(t, t2, t4) - assert.Equal(t, len(ctrl.tracersMap), 2) -} - type shutdownExporter struct { sdktrace.SpanExporter @@ -390,3 +362,38 @@ func TestShutdown(t *testing.T) { assert.True(t, exporter.called, "Exporter not shutdown") assert.Equal(t, uint32(nSpan), exporter.exported.Load(), "Pending spans not flushed") } + +func TestControllerTraceConcurrentSafe(t *testing.T) { + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(instResource()), + ) + defer func() { + err := tp.Shutdown(context.Background()) + assert.NoError(t, err) + }() + + ctrl, err := NewController(slog.Default(), tp) + assert.NoError(t, err) + + const goroutines = 10 + + var wg sync.WaitGroup + for n := 0; n < goroutines; n++ { + wg.Add(1) + go func() { + defer wg.Done() + + data := ptrace.NewScopeSpans() + data.Scope().SetName(fmt.Sprintf("tracer-%d", n%(goroutines/2))) + data.Scope().SetVersion("v1") + data.SetSchemaUrl("url") + data.Spans().AppendEmpty().SetName("test") + ctrl.Trace(data) + }() + } + + wg.Wait() +} From f02332fb8d192863a63e10b1962caf37a8bf4df6 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 7 Nov 2024 07:38:19 -0800 Subject: [PATCH 4/4] Document the Trace method is concurrent safe --- internal/pkg/opentelemetry/controller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/pkg/opentelemetry/controller.go b/internal/pkg/opentelemetry/controller.go index e5954cad5..253b9a979 100644 --- a/internal/pkg/opentelemetry/controller.go +++ b/internal/pkg/opentelemetry/controller.go @@ -22,6 +22,8 @@ type Controller struct { } // Trace creates a trace span for event. +// +// This method is safe to call concurrently. func (c *Controller) Trace(ss ptrace.ScopeSpans) { var ( startOpts []trace.SpanStartOption