Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pkg/logs/patterns/eviction/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "eviction",
srcs = [
"eviction.go",
"eviction_manager.go",
"score.go",
"types.go",
],
importpath = "github.com/DataDog/datadog-agent/pkg/logs/patterns/eviction",
visibility = ["//visibility:public"],
)

go_test(
name = "eviction_test",
srcs = [
"eviction_manager_test.go",
"eviction_test.go",
"score_test.go",
],
embed = [":eviction"],
deps = [
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
161 changes: 161 additions & 0 deletions pkg/logs/patterns/eviction/eviction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// Package eviction provides shared eviction scoring algorithms for patterns and tags.
package eviction

import (
"container/heap"
"math"
"time"
)

// EvictLowestScoring evicts up to numToEvict items with the lowest eviction scores.
// Uses quickselect (partial sort) for O(N) average-case selection of the K lowest items,
// avoiding the O(N + K log N) cost of a full heap build + extraction.
func EvictLowestScoring(collection EvictableCollection, numToEvict int, decayFactor float64, gracePeriod time.Duration) (evicted []Evictable) {
if numToEvict <= 0 {
return nil
}

now := time.Now()

allItems := collection.CollectEvictables()
n := len(allItems)
if n == 0 {
return nil
}

scored := make([]heapItem, n)
for i, item := range allItems {
score := CalculateScore(
item.GetFrequency(),
item.GetCreatedAt(),
item.GetLastAccessAt(),
now,
decayFactor,
)
// Grace period: newly created items are ineligible for eviction
if gracePeriod > 0 && now.Sub(item.GetCreatedAt()) < gracePeriod {
score = math.MaxFloat64
}
scored[i] = heapItem{
item: item,
score: score,
}
}

k := numToEvict
if k > n {
k = n
}

// Quickselect partitions so scored[0:k] contains the k lowest-scoring items (unordered).
if k < n {
quickselectLowest(scored, k)
}

evicted = make([]Evictable, k)
for i := 0; i < k; i++ {
collection.RemoveEvictable(scored[i].item)
evicted[i] = scored[i].item
}

return evicted
}

// quickselectLowest rearranges items so that the k items with the smallest scores
// end up at indices [0, k). Iterative with median-of-three pivot selection.
func quickselectLowest(items []heapItem, k int) {
lo, hi := 0, len(items)-1
target := k - 1
for lo < hi {
pivotIdx := medianOfThreePivot(items, lo, hi)
pivotIdx = partitionItems(items, lo, hi, pivotIdx)
if pivotIdx == target {
return
} else if pivotIdx < target {
lo = pivotIdx + 1
} else {
hi = pivotIdx - 1
}
}
}

func medianOfThreePivot(items []heapItem, lo, hi int) int {
mid := lo + (hi-lo)/2
if items[lo].score > items[mid].score {
items[lo], items[mid] = items[mid], items[lo]
}
if items[lo].score > items[hi].score {
items[lo], items[hi] = items[hi], items[lo]
}
if items[mid].score > items[hi].score {
items[mid], items[hi] = items[hi], items[mid]
}
return mid
}

func partitionItems(items []heapItem, lo, hi, pivotIdx int) int {
pivot := items[pivotIdx].score
items[pivotIdx], items[hi] = items[hi], items[pivotIdx]
storeIdx := lo
for i := lo; i < hi; i++ {
if items[i].score < pivot {
items[i], items[storeIdx] = items[storeIdx], items[i]
storeIdx++
}
}
items[storeIdx], items[hi] = items[hi], items[storeIdx]
return storeIdx
}

// EvictToMemoryTarget evicts items until the target memory is freed.
// It uses actual item sizes rather than averages for precision.
func EvictToMemoryTarget(collection EvictableCollection, targetBytesToFree int64, decayFactor float64, gracePeriod time.Duration) (evicted []Evictable) {
if targetBytesToFree <= 0 {
return nil
}

now := time.Now()

// Build heap of all items sorted by eviction score
h := &evictionHeap{
items: make([]heapItem, 0),
}

for _, item := range collection.CollectEvictables() {
score := CalculateScore(
item.GetFrequency(),
item.GetCreatedAt(),
item.GetLastAccessAt(),
now,
decayFactor,
)
// Grace period: newly created items are ineligible for eviction
if gracePeriod > 0 && now.Sub(item.GetCreatedAt()) < gracePeriod {
score = math.MaxFloat64
}
h.items = append(h.items, heapItem{
item: item,
score: score,
})
}

heap.Init(h)

// Evict items until we've freed enough memory
evicted = make([]Evictable, 0)
bytesFreed := int64(0)

for h.Len() > 0 && bytesFreed < targetBytesToFree {
item := heap.Pop(h).(heapItem)
collection.RemoveEvictable(item.item)
bytesFreed += item.item.EstimatedBytes()
evicted = append(evicted, item.item)
}

return evicted
}
71 changes: 71 additions & 0 deletions pkg/logs/patterns/eviction/eviction_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// Package eviction provides shared eviction scoring algorithms for patterns and tags.
package eviction

import "time"

// Policy is the eviction policy type
type Policy int

const (
// PolicyLFUDecay uses LFU with power-law age decay
PolicyLFUDecay Policy = iota
)

// Strategy indicates which eviction method to use
type Strategy int

const (
// StrategyNone indicates no eviction is needed
StrategyNone Strategy = iota
// StrategyByCount evicts a specific number of items
StrategyByCount
// StrategyByBytes evicts items until enough memory is freed
StrategyByBytes
)

// Manager handles eviction using dual watermark system.
type Manager struct {
MaxItemCount int
MaxMemoryBytes int64
EvictionHighWatermark float64 // Trigger eviction at this threshold
EvictionLowWatermark float64 // Evict back to this target
AgeDecayFactor float64
GracePeriod time.Duration // Newly created items are ineligible for eviction during this period
}

// ShouldEvict checks if eviction should be triggered based on high watermark thresholds.
func (m *Manager) ShouldEvict(itemCount int, estimatedBytes int64) (bool, bool) {
countOverLimit := float64(itemCount) > float64(m.MaxItemCount)*m.EvictionHighWatermark
bytesOverLimit := float64(estimatedBytes) > float64(m.MaxMemoryBytes)*m.EvictionHighWatermark
return countOverLimit, bytesOverLimit
}

// EvictionTargets calculates how much to evict based on which threshold was exceeded.
func (m *Manager) EvictionTargets(itemCount int, estimatedBytes int64, countOverLimit, bytesOverLimit bool) (itemsToEvict int, bytesToFree int64, strategy Strategy) {
switch {
case bytesOverLimit:
// Memory eviction
targetBytes := m.applyWatermark(m.MaxMemoryBytes)
bytesToFree := estimatedBytes - targetBytes
return 0, bytesToFree, StrategyByBytes

case countOverLimit:
// Count-based eviction
targetCount := int(m.applyWatermark(int64(m.MaxItemCount)))
itemsToEvict := max(itemCount-targetCount, 1)
return itemsToEvict, 0, StrategyByCount

default:
return 0, 0, StrategyNone
}
}

// applyWatermark applies the low watermark to a limit value
func (m *Manager) applyWatermark(limit int64) int64 {
return int64(float64(limit) * m.EvictionLowWatermark)
}
Loading
Loading