diff --git a/pkg/logs/sender/grpc/batch_strategy.go b/pkg/logs/sender/grpc/batch_strategy.go index f39520e92ee..d10596527e5 100644 --- a/pkg/logs/sender/grpc/batch_strategy.go +++ b/pkg/logs/sender/grpc/batch_strategy.go @@ -32,7 +32,10 @@ const enableDeltaEncoding = true // StatefulExtra holds state changes (non-Log datums) from a batch // Used by inflight tracker to maintain snapshot state for stream rotation type StatefulExtra struct { - StateChanges []*statefulpb.Datum + StateChanges []*statefulpb.Datum + // WireDatums are the datums in the encoded payload. Inflight uses them to + // find state references and rebuild replay batches with lazy snapshot state. + WireDatums []*statefulpb.Datum PreCompressionBytes int } @@ -377,6 +380,7 @@ func (s *batchStrategy) sendMessagesWithDatums(messagesMetadata []*message.Messa Encoding: s.compression.ContentEncoding(), UnencodedSize: unencodedSize, StatefulExtra: &StatefulExtra{ + WireDatums: wireDatums, PreCompressionBytes: preCompressionBytes, }, } diff --git a/pkg/logs/sender/grpc/inflight.go b/pkg/logs/sender/grpc/inflight.go index 535c2153250..c568fdc724b 100644 --- a/pkg/logs/sender/grpc/inflight.go +++ b/pkg/logs/sender/grpc/inflight.go @@ -10,6 +10,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/proto/pbgo/statefulpb" + "github.com/DataDog/datadog-agent/pkg/util/compression" ) // inflightTracker is a bounded FIFO queue that tracks payloads in two regions: @@ -32,13 +33,14 @@ import ( type inflightTracker struct { workerID string items []*message.Payload - head int // Index of the oldest sent item (awaiting ack) - sentTail int // Index of the first buffered item that's not yet sent - tail int // Index of the next available slot for new buffered items - cap int // Maximum total capacity of the tracker - headBatchID uint32 // BatchID of the oldest sent payload (at head) - batchIDCounter uint32 // Next batchID to be assigned when markSent is called - snapshot *snapshotState // Accumulated state for new streams + head int // Index of the oldest sent item (awaiting ack) + sentTail int // Index of the first buffered item that's not yet sent + tail int // Index of the next available slot for new buffered items + cap int // Maximum total capacity of the tracker + headBatchID uint32 // BatchID of the oldest sent payload (at head) + batchIDCounter uint32 // Next batchID to be assigned when markSent is called + snapshot *snapshotState // Accumulated state for new streams + streamSent stateReferences // State definitions already sent on the current stream } // newInflightTracker creates a new bounded inflight tracker with the given capacity @@ -49,6 +51,10 @@ func newInflightTracker(workerID string, capacity int) *inflightTracker { items: make([]*message.Payload, capacity+1), cap: capacity, snapshot: newSnapshotState(), + streamSent: stateReferences{ + dictEntryIDs: make(map[uint64]struct{}), + patternIDs: make(map[uint64]struct{}), + }, } } @@ -126,6 +132,8 @@ func (t *inflightTracker) markSent() bool { return false } + t.markStateSent(t.items[t.sentTail]) + // If this is the first sent item, set headBatchID if t.head == t.sentTail { t.headBatchID = t.batchIDCounter @@ -145,6 +153,33 @@ func (t *inflightTracker) nextToSend() *message.Payload { return t.items[t.sentTail] } +func (t *inflightTracker) nextToSendEncoded(compressor compression.Compressor) ([]byte, error) { + payload := t.nextToSend() + if payload == nil { + return nil, nil + } + + extra, ok := payload.StatefulExtra.(*StatefulExtra) + if !ok || extra == nil || len(extra.WireDatums) == 0 { + return payload.Encoded, nil + } + + prefix := t.missingSnapshotDefines(extra.WireDatums) + if len(prefix) == 0 { + return payload.Encoded, nil + } + + datums := make([]*statefulpb.Datum, 0, len(prefix)+len(extra.WireDatums)) + datums = append(datums, prefix...) + datums = append(datums, extra.WireDatums...) + + serialized, err := proto.Marshal(&statefulpb.DatumSequence{Data: datums}) + if err != nil { + return nil, err + } + return compressor.Compress(serialized) +} + // sentCount returns the number of sent payloads awaiting ack func (t *inflightTracker) sentCount() int { return (t.sentTail - t.head + len(t.items)) % len(t.items) @@ -165,12 +200,26 @@ func (t *inflightTracker) resetOnRotation() { // Make the first batchID be 1, 0 is reserved for the snapshot state t.headBatchID = 1 t.batchIDCounter = 1 + t.streamSent = newStateReferences() } // getSnapshot returns the current snapshot state for stream bootstrapping // Returns serialized bytes (marshaled DatumSequence) or nil if empty func (t *inflightTracker) getSnapshot() []byte { - return t.snapshot.serialize() + refs, ok := t.inflightReferences() + if !ok { + serialized, sent := t.snapshot.serialize(nil) + t.streamSent = sent + return serialized + } + + serialized, sent := t.snapshot.serialize(&refs) + t.streamSent = sent + return serialized +} + +func (t *inflightTracker) resetStreamSent() { + t.streamSent = newStateReferences() } // snapshotState maintains the accumulated state changes for stream bootstrapping @@ -180,6 +229,61 @@ type snapshotState struct { patternMap map[uint64]*statefulpb.PatternDefine } +type stateReferences struct { + dictEntryIDs map[uint64]struct{} + patternIDs map[uint64]struct{} +} + +func newStateReferences() stateReferences { + return stateReferences{ + dictEntryIDs: make(map[uint64]struct{}), + patternIDs: make(map[uint64]struct{}), + } +} + +func (r stateReferences) clone() stateReferences { + clone := newStateReferences() + for id := range r.dictEntryIDs { + clone.dictEntryIDs[id] = struct{}{} + } + for id := range r.patternIDs { + clone.patternIDs[id] = struct{}{} + } + return clone +} + +func (r stateReferences) hasDictEntry(id uint64) bool { + _, ok := r.dictEntryIDs[id] + return ok +} + +func (r stateReferences) hasPattern(id uint64) bool { + _, ok := r.patternIDs[id] + return ok +} + +func (r stateReferences) addDictEntry(id uint64) { + if id == 0 { + return + } + r.dictEntryIDs[id] = struct{}{} +} + +func (r stateReferences) addPattern(id uint64) { + if id == 0 { + return + } + r.patternIDs[id] = struct{}{} +} + +func (r stateReferences) deleteDictEntry(id uint64) { + delete(r.dictEntryIDs, id) +} + +func (r stateReferences) deletePattern(id uint64) { + delete(r.patternIDs, id) +} + // newSnapshotState creates a new empty snapshot state func newSnapshotState() *snapshotState { return &snapshotState{ @@ -208,28 +312,34 @@ func (s *snapshotState) apply(extra *StatefulExtra) { } } -// serialize returns the current snapshot state as serialized bytes -// Returns the marshaled DatumSequence containing all pattern and dictionary definitions +// serialize returns the current snapshot state as serialized bytes. +// If refs is non-nil, only definitions referenced by queued inflight data are included. // Used to send snapshot on new stream creation -func (s *snapshotState) serialize() []byte { - // Calculate total datums needed - totalSize := len(s.patternMap) + len(s.dictMap) - - if totalSize == 0 { - return nil - } +func (s *snapshotState) serialize(refs *stateReferences) ([]byte, stateReferences) { + sent := newStateReferences() + datums := make([]*statefulpb.Datum, 0, len(s.patternMap)+len(s.dictMap)) - datums := make([]*statefulpb.Datum, 0, totalSize) - - for _, pattern := range s.patternMap { + for id, pattern := range s.patternMap { + if refs != nil && !refs.hasPattern(id) { + continue + } datums = append(datums, &statefulpb.Datum{ Data: &statefulpb.Datum_PatternDefine{PatternDefine: pattern}, }) + sent.addPattern(id) } - for _, entry := range s.dictMap { + for id, entry := range s.dictMap { + if refs != nil && !refs.hasDictEntry(id) { + continue + } datums = append(datums, &statefulpb.Datum{ Data: &statefulpb.Datum_DictEntryDefine{DictEntryDefine: entry}, }) + sent.addDictEntry(id) + } + + if len(datums) == 0 { + return nil, sent } datumSeq := &statefulpb.DatumSequence{ @@ -237,5 +347,202 @@ func (s *snapshotState) serialize() []byte { } serialized, _ := proto.Marshal(datumSeq) - return serialized + return serialized, sent +} + +func (t *inflightTracker) inflightReferences() (stateReferences, bool) { + refs := newStateReferences() + for count, index := 0, t.head; count < t.totalCount(); count++ { + payload := t.items[index] + extra, ok := payload.StatefulExtra.(*StatefulExtra) + if !ok || extra == nil || extra.WireDatums == nil { + return stateReferences{}, false + } + addDatumReferences(refs, extra.WireDatums) + index = (index + 1) % len(t.items) + } + return refs, true +} + +func (t *inflightTracker) missingSnapshotDefines(datums []*statefulpb.Datum) []*statefulpb.Datum { + known := t.streamSent.clone() + missing := newStateReferences() + + for _, datum := range datums { + switch d := datum.Data.(type) { + case *statefulpb.Datum_PatternDefine: + known.addPattern(d.PatternDefine.PatternId) + case *statefulpb.Datum_PatternDelete: + known.deletePattern(d.PatternDelete.PatternId) + case *statefulpb.Datum_DictEntryDefine: + known.addDictEntry(d.DictEntryDefine.Id) + case *statefulpb.Datum_DictEntryDelete: + known.deleteDictEntry(d.DictEntryDelete.Id) + case *statefulpb.Datum_Logs: + t.addMissingLogReferences(missing, known, d.Logs) + case *statefulpb.Datum_DeltaEncodingSync: + t.addMissingDeltaEncodingSyncReferences(missing, known, d.DeltaEncodingSync) + } + } + + prefix := make([]*statefulpb.Datum, 0, len(missing.patternIDs)+len(missing.dictEntryIDs)) + for id := range missing.patternIDs { + pattern := t.snapshot.patternMap[id] + if pattern == nil { + continue + } + prefix = append(prefix, &statefulpb.Datum{ + Data: &statefulpb.Datum_PatternDefine{PatternDefine: pattern}, + }) + } + for id := range missing.dictEntryIDs { + entry := t.snapshot.dictMap[id] + if entry == nil { + continue + } + prefix = append(prefix, &statefulpb.Datum{ + Data: &statefulpb.Datum_DictEntryDefine{DictEntryDefine: entry}, + }) + } + return prefix +} + +func (t *inflightTracker) addMissingLogReferences(missing stateReferences, known stateReferences, log *statefulpb.Log) { + if log == nil { + return + } + t.addMissingDynamicValueReference(missing, known, log.Status) + t.addMissingDynamicValueReference(missing, known, log.Service) + if log.Tags != nil { + t.addMissingDynamicValueReference(missing, known, log.Tags.Tagset) + } + if structured := log.GetStructured(); structured != nil { + t.addMissingPatternReference(missing, known, structured.PatternId) + t.addMissingDynamicValueReferences(missing, known, structured.DynamicValues) + t.addMissingDynamicValueReference(missing, known, structured.JsonMessageKey) + t.addMissingDictEntryReference(missing, known, structured.JsonContextSchemaId) + t.addMissingDynamicValueReferences(missing, known, structured.JsonContextValues) + } +} + +func (t *inflightTracker) addMissingDeltaEncodingSyncReferences(missing stateReferences, known stateReferences, sync *statefulpb.DeltaEncodingSync) { + if sync == nil { + return + } + t.addMissingPatternReference(missing, known, sync.PatternId) + if sync.Tags != nil { + t.addMissingDynamicValueReference(missing, known, sync.Tags.Tagset) + } +} + +func (t *inflightTracker) addMissingDynamicValueReferences(missing stateReferences, known stateReferences, values []*statefulpb.DynamicValue) { + for _, value := range values { + t.addMissingDynamicValueReference(missing, known, value) + } +} + +func (t *inflightTracker) addMissingDynamicValueReference(missing stateReferences, known stateReferences, value *statefulpb.DynamicValue) { + if value == nil { + return + } + t.addMissingDictEntryReference(missing, known, value.GetDictIndex()) +} + +func (t *inflightTracker) addMissingDictEntryReference(missing stateReferences, known stateReferences, id uint64) { + if id == 0 || known.hasDictEntry(id) || t.snapshot.dictMap[id] == nil { + return + } + missing.addDictEntry(id) + known.addDictEntry(id) +} + +func (t *inflightTracker) addMissingPatternReference(missing stateReferences, known stateReferences, id uint64) { + if id == 0 || known.hasPattern(id) || t.snapshot.patternMap[id] == nil { + return + } + missing.addPattern(id) + known.addPattern(id) +} + +func (t *inflightTracker) markStateSent(payload *message.Payload) { + extra, ok := payload.StatefulExtra.(*StatefulExtra) + if !ok || extra == nil || len(extra.WireDatums) == 0 { + return + } + for _, datum := range t.missingSnapshotDefines(extra.WireDatums) { + switch d := datum.Data.(type) { + case *statefulpb.Datum_PatternDefine: + t.streamSent.addPattern(d.PatternDefine.PatternId) + case *statefulpb.Datum_DictEntryDefine: + t.streamSent.addDictEntry(d.DictEntryDefine.Id) + } + } + t.streamSent.applyStateChanges(extra.WireDatums) +} + +func (r stateReferences) applyStateChanges(datums []*statefulpb.Datum) { + for _, datum := range datums { + switch d := datum.Data.(type) { + case *statefulpb.Datum_PatternDefine: + r.addPattern(d.PatternDefine.PatternId) + case *statefulpb.Datum_PatternDelete: + r.deletePattern(d.PatternDelete.PatternId) + case *statefulpb.Datum_DictEntryDefine: + r.addDictEntry(d.DictEntryDefine.Id) + case *statefulpb.Datum_DictEntryDelete: + r.deleteDictEntry(d.DictEntryDelete.Id) + } + } +} + +func addDatumReferences(refs stateReferences, datums []*statefulpb.Datum) { + for _, datum := range datums { + switch d := datum.Data.(type) { + case *statefulpb.Datum_Logs: + addLogReferences(refs, d.Logs) + case *statefulpb.Datum_DeltaEncodingSync: + addDeltaEncodingSyncReferences(refs, d.DeltaEncodingSync) + } + } +} + +func addLogReferences(refs stateReferences, log *statefulpb.Log) { + if log == nil { + return + } + addDynamicValueReference(refs, log.Status) + addDynamicValueReference(refs, log.Service) + if log.Tags != nil { + addDynamicValueReference(refs, log.Tags.Tagset) + } + if structured := log.GetStructured(); structured != nil { + refs.addPattern(structured.PatternId) + addDynamicValueReferences(refs, structured.DynamicValues) + addDynamicValueReference(refs, structured.JsonMessageKey) + refs.addDictEntry(structured.JsonContextSchemaId) + addDynamicValueReferences(refs, structured.JsonContextValues) + } +} + +func addDeltaEncodingSyncReferences(refs stateReferences, sync *statefulpb.DeltaEncodingSync) { + if sync == nil { + return + } + refs.addPattern(sync.PatternId) + if sync.Tags != nil { + addDynamicValueReference(refs, sync.Tags.Tagset) + } +} + +func addDynamicValueReferences(refs stateReferences, values []*statefulpb.DynamicValue) { + for _, value := range values { + addDynamicValueReference(refs, value) + } +} + +func addDynamicValueReference(refs stateReferences, value *statefulpb.DynamicValue) { + if value == nil { + return + } + refs.addDictEntry(value.GetDictIndex()) } diff --git a/pkg/logs/sender/grpc/inflight_test.go b/pkg/logs/sender/grpc/inflight_test.go index b24cb1bdc6d..166d18a600f 100644 --- a/pkg/logs/sender/grpc/inflight_test.go +++ b/pkg/logs/sender/grpc/inflight_test.go @@ -9,8 +9,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/proto/pbgo/statefulpb" + noopimpl "github.com/DataDog/datadog-agent/pkg/util/compression/impl-noop" ) // Helper function to create test payloads @@ -480,3 +484,170 @@ func TestInflightTrackerBatchIDAfterRotation(t *testing.T) { assert.Equal(t, uint32(1), tracker.getHeadBatchID()) assert.Equal(t, uint32(3), tracker.nextBatchID()) } + +func TestInflightTrackerSnapshotOnlyIncludesReferencedState(t *testing.T) { + tracker := newInflightTracker("test", 5) + tracker.snapshot.apply(&StatefulExtra{ + StateChanges: []*statefulpb.Datum{ + createInflightPatternDefine(1, "pattern1"), + createInflightPatternDefine(2, "pattern2"), + createInflightDictEntryDefine(10, "value10"), + createInflightDictEntryDefine(20, "value20"), + }, + }) + require.True(t, tracker.append(createInflightPayloadWithWireDatums( + createInflightLogDatum(2, 20), + ))) + + snapshot := tracker.getSnapshot() + require.NotNil(t, snapshot) + + datumSeq := decodeInflightDatumSequence(t, snapshot) + require.Len(t, datumSeq.Data, 2) + assert.Equal(t, map[uint64]string{2: "pattern2"}, collectInflightPatterns(datumSeq.Data)) + assert.Equal(t, map[uint64]string{20: "value20"}, collectInflightDictEntries(datumSeq.Data)) + assert.True(t, tracker.streamSent.hasPattern(2)) + assert.False(t, tracker.streamSent.hasPattern(1)) + assert.True(t, tracker.streamSent.hasDictEntry(20)) + assert.False(t, tracker.streamSent.hasDictEntry(10)) +} + +func TestInflightTrackerNextToSendPrependsLazySnapshotState(t *testing.T) { + tracker := newInflightTracker("test", 5) + tracker.snapshot.apply(&StatefulExtra{ + StateChanges: []*statefulpb.Datum{ + createInflightPatternDefine(1, "pattern1"), + createInflightPatternDefine(2, "pattern2"), + createInflightDictEntryDefine(10, "value10"), + createInflightDictEntryDefine(20, "value20"), + }, + }) + tracker.streamSent = newStateReferences() + tracker.streamSent.addPattern(1) + tracker.streamSent.addDictEntry(10) + + require.True(t, tracker.append(createInflightPayloadWithWireDatums( + createInflightLogDatum(2, 20), + ))) + + encoded, err := tracker.nextToSendEncoded(noopimpl.New()) + require.NoError(t, err) + + datumSeq := decodeInflightDatumSequence(t, encoded) + require.Len(t, datumSeq.Data, 3) + assert.Equal(t, map[uint64]string{2: "pattern2"}, collectInflightPatterns(datumSeq.Data[:2])) + assert.Equal(t, map[uint64]string{20: "value20"}, collectInflightDictEntries(datumSeq.Data[:2])) + require.NotNil(t, datumSeq.Data[2].GetLogs()) + + require.True(t, tracker.markSent()) + assert.True(t, tracker.streamSent.hasPattern(2)) + assert.True(t, tracker.streamSent.hasDictEntry(20)) +} + +func TestInflightTrackerNextToSendDoesNotPrependStateDefinedInSamePayload(t *testing.T) { + tracker := newInflightTracker("test", 5) + tracker.snapshot.apply(&StatefulExtra{ + StateChanges: []*statefulpb.Datum{ + createInflightPatternDefine(3, "old-pattern3"), + createInflightDictEntryDefine(30, "old-value30"), + }, + }) + tracker.streamSent = newStateReferences() + + require.True(t, tracker.append(createInflightPayloadWithWireDatums( + createInflightPatternDefine(3, "new-pattern3"), + createInflightDictEntryDefine(30, "new-value30"), + createInflightLogDatum(3, 30), + ))) + + encoded, err := tracker.nextToSendEncoded(noopimpl.New()) + require.NoError(t, err) + + datumSeq := decodeInflightDatumSequence(t, encoded) + require.Len(t, datumSeq.Data, 3) + require.Equal(t, "new-pattern3", datumSeq.Data[0].GetPatternDefine().Template) + require.Equal(t, "new-value30", datumSeq.Data[1].GetDictEntryDefine().Value) + require.NotNil(t, datumSeq.Data[2].GetLogs()) +} + +func createInflightPatternDefine(id uint64, template string) *statefulpb.Datum { + return &statefulpb.Datum{ + Data: &statefulpb.Datum_PatternDefine{ + PatternDefine: &statefulpb.PatternDefine{ + PatternId: id, + Template: template, + }, + }, + } +} + +func createInflightDictEntryDefine(id uint64, value string) *statefulpb.Datum { + return &statefulpb.Datum{ + Data: &statefulpb.Datum_DictEntryDefine{ + DictEntryDefine: &statefulpb.DictEntryDefine{ + Id: id, + Value: value, + }, + }, + } +} + +func createInflightLogDatum(patternID uint64, dictID uint64) *statefulpb.Datum { + return &statefulpb.Datum{ + Data: &statefulpb.Datum_Logs{ + Logs: &statefulpb.Log{ + Content: &statefulpb.Log_Structured{ + Structured: &statefulpb.StructuredLog{ + PatternId: patternID, + DynamicValues: []*statefulpb.DynamicValue{ + { + Value: &statefulpb.DynamicValue_DictIndex{ + DictIndex: dictID, + }, + }, + }, + }, + }, + }, + }, + } +} + +func createInflightPayloadWithWireDatums(datums ...*statefulpb.Datum) *message.Payload { + serialized, _ := proto.Marshal(&statefulpb.DatumSequence{Data: datums}) + return &message.Payload{ + Encoded: serialized, + StatefulExtra: &StatefulExtra{ + WireDatums: datums, + }, + } +} + +func decodeInflightDatumSequence(t *testing.T, data []byte) statefulpb.DatumSequence { + t.Helper() + var datumSeq statefulpb.DatumSequence + require.NoError(t, proto.Unmarshal(data, &datumSeq)) + return datumSeq +} + +func collectInflightPatterns(datums []*statefulpb.Datum) map[uint64]string { + patterns := make(map[uint64]string) + for _, datum := range datums { + pattern := datum.GetPatternDefine() + if pattern != nil { + patterns[pattern.PatternId] = pattern.Template + } + } + return patterns +} + +func collectInflightDictEntries(datums []*statefulpb.Datum) map[uint64]string { + entries := make(map[uint64]string) + for _, datum := range datums { + entry := datum.GetDictEntryDefine() + if entry != nil { + entries[entry.Id] = entry.Value + } + } + return entries +} diff --git a/pkg/logs/sender/grpc/stream_worker.go b/pkg/logs/sender/grpc/stream_worker.go index d5877fc0d79..e6935eb7ca6 100644 --- a/pkg/logs/sender/grpc/stream_worker.go +++ b/pkg/logs/sender/grpc/stream_worker.go @@ -551,6 +551,7 @@ func (s *streamWorker) finishStreamRotation(streamInfo *streamInfo) { compressed, err := s.compression.Compress(serialized) if err != nil { log.Errorf("Worker %s: Failed to compress snapshot: %v", s.workerID, err) + s.inflight.resetStreamSent() } else { // Send compressed snapshot to sender goroutine via channel // This call won't block because it's buffered channel's first write @@ -766,7 +767,13 @@ func (s *streamWorker) handleIrrecoverableError(reason string, streamInfo *strea // getNextBatch crafts a StatefulBatch with the next batch to send from the // inflight tracker. It doesn't change inflight tracker state func (s *streamWorker) getNextBatch() *statefulpb.StatefulBatch { - return createBatch(s.inflight.nextToSend().Encoded, s.inflight.nextBatchID()) + payload := s.inflight.nextToSend() + data, err := s.inflight.nextToSendEncoded(s.compression) + if err != nil { + log.Errorf("Worker %s: failed to prepend lazy snapshot state: %v", s.workerID, err) + data = payload.Encoded + } + return createBatch(data, s.inflight.nextBatchID()) } // createBatch creates a StatefulBatch from serialized data and batch ID diff --git a/tmp24l5mja4.Dockerfile b/tmp24l5mja4.Dockerfile new file mode 100644 index 00000000000..25693c21a74 --- /dev/null +++ b/tmp24l5mja4.Dockerfile @@ -0,0 +1,59 @@ +FROM ubuntu:latest AS src + +COPY . /usr/src/datadog-agent + +RUN find /usr/src/datadog-agent -type f \! -name \*.go -print0 | xargs -0 rm +RUN find /usr/src/datadog-agent -type d -empty -print0 | xargs -0 rmdir + +FROM ubuntu:latest AS bin + +ENV DEBIAN_FRONTEND=noninteractive +RUN apt-get clean && apt-get -o Acquire::Retries=4 update && apt-get install -y patchelf + +COPY bin/agent/agent /opt/datadog-agent/bin/agent/agent +COPY bin/agent/dist/conf.d /etc/datadog-agent/conf.d +COPY dev/lib/libdatadog-agent-rtloader.so.0.1.0 /opt/datadog-agent/embedded/lib/libdatadog-agent-rtloader.so.0.1.0 +COPY dev/lib/libdatadog-agent-three.so /opt/datadog-agent/embedded/lib/libdatadog-agent-three.so + +COPY dev/lib/libpatterns.so /opt/datadog-agent/embedded/lib/libpatterns.so + +RUN patchelf --set-rpath /opt/datadog-agent/embedded/lib /opt/datadog-agent/bin/agent/agent +RUN patchelf --set-rpath /opt/datadog-agent/embedded/lib /opt/datadog-agent/embedded/lib/libdatadog-agent-rtloader.so.0.1.0 +RUN patchelf --set-rpath /opt/datadog-agent/embedded/lib /opt/datadog-agent/embedded/lib/libdatadog-agent-three.so +RUN patchelf --set-rpath /opt/datadog-agent/embedded/lib /opt/datadog-agent/embedded/lib/libpatterns.so + + +FROM golang:latest AS dlv + +RUN go install github.com/go-delve/delve/cmd/dlv@latest + +FROM gcr.io/datadoghq/agent:7.78.2 AS bash_completion + +RUN apt-get clean && apt-get -o Acquire::Retries=4 update && apt-get install -y gawk + +RUN awk -i inplace '!/^#/ {uncomment=0} uncomment {gsub(/^#/, "")} /# enable bash completion/ {uncomment=1} {print}' /etc/bash.bashrc + +FROM gcr.io/datadoghq/agent:7.78.2 + +ENV DEBIAN_FRONTEND=noninteractive +RUN apt-get clean && apt-get -o Acquire::Retries=4 update && apt-get install -y bash-completion less vim tshark && apt-get clean + +ENV DELVE_PAGER=less + +COPY --from=dlv /go/bin/dlv /usr/local/bin/dlv +COPY --from=bash_completion /etc/bash.bashrc /etc/bash.bashrc +COPY --from=src /usr/src/datadog-agent /root/repos/datadog-agent +COPY --from=bin /opt/datadog-agent/bin/agent/agent /opt/datadog-agent/bin/agent/agent +COPY --from=bin /opt/datadog-agent/embedded/lib/libdatadog-agent-rtloader.so.0.1.0 /opt/datadog-agent/embedded/lib/libdatadog-agent-rtloader.so.0.1.0 +COPY --from=bin /opt/datadog-agent/embedded/lib/libdatadog-agent-three.so /opt/datadog-agent/embedded/lib/libdatadog-agent-three.so +COPY --from=bin /opt/datadog-agent/embedded/lib/libpatterns.so /opt/datadog-agent/embedded/lib/libpatterns.so +COPY --from=bin /etc/datadog-agent/conf.d /etc/datadog-agent/conf.d + + +RUN agent completion bash > /usr/share/bash-completion/completions/agent +RUN process-agent completion bash > /usr/share/bash-completion/completions/process-agent +RUN security-agent completion bash > /usr/share/bash-completion/completions/security-agent +RUN system-probe completion bash > /usr/share/bash-completion/completions/system-probe +RUN trace-agent completion bash > /usr/share/bash-completion/completions/trace-agent + +ENV DD_SSLKEYLOGFILE=/tmp/sslkeylog.txt