diff --git a/pkg/collectconfig/executor/consumer_log_sub_analysis.go b/pkg/collectconfig/executor/consumer_log_sub_analysis.go index 50234df..b6bee04 100644 --- a/pkg/collectconfig/executor/consumer_log_sub_analysis.go +++ b/pkg/collectconfig/executor/consumer_log_sub_analysis.go @@ -140,11 +140,11 @@ func (c *logAnalysisSubConsumer) ProcessGroup(iw *inputWrapper, ctx *LogContext, func (c *logAnalysisSubConsumer) Emit(expectedTs int64) bool { var state *logAnalysisSubConsumerState c.parent.timeline.Update(func(timeline *storage.Timeline) { - shard := c.parent.timeline.GetShard(expectedTs, true) + shard := c.parent.timeline.GetShard(expectedTs) if shard == nil { return } - shard.Frozen = true + defer shard.Freeze() if shard.Data == nil { return } diff --git a/pkg/collectconfig/executor/consumer_log_sub_stat.go b/pkg/collectconfig/executor/consumer_log_sub_stat.go index dddbe2f..313bacb 100644 --- a/pkg/collectconfig/executor/consumer_log_sub_stat.go +++ b/pkg/collectconfig/executor/consumer_log_sub_stat.go @@ -151,14 +151,14 @@ func (c *logStatSubConsumer) Emit(expectedTs int64) bool { var datum []*model.DetailData c.parent.timeline.Update(func(timeline *storage.Timeline) { - shard := timeline.GetShard(expectedTs, true) + shard := timeline.GetShard(expectedTs) if shard == nil { logger.Infoz("[consumer] [log] emit nil", // zap.String("key", c.parent.key), // zap.Time("ts", time.UnixMilli(expectedTs))) // return } - shard.Frozen = true + defer shard.Freeze() points := shard.InternalGetAllPoints() for _, v := range points { tags := make(map[string]string, len(v.Keys)) diff --git a/pkg/collectconfig/executor/storage/storage.go b/pkg/collectconfig/executor/storage/storage.go index 04c3940..e2c110c 100644 --- a/pkg/collectconfig/executor/storage/storage.go +++ b/pkg/collectconfig/executor/storage/storage.go @@ -158,7 +158,7 @@ func (t *Timeline) Unlock() { } func (t *Timeline) GetOrCreateShard(ts int64) *Shard { - shard := t.GetShard(ts, false) + shard := t.GetShard(ts) if shard == nil { shard = t.CreateShard(ts) } @@ -169,7 +169,7 @@ func (t *Timeline) InternalGetShard() []*Shard { return t.shards } -func (t *Timeline) GetShard(ts int64, clear bool) *Shard { +func (t *Timeline) GetShard(ts int64) *Shard { if t.shards == nil { return nil } @@ -182,12 +182,6 @@ func (t *Timeline) GetShard(ts int64, clear bool) *Shard { if s.no != no { return nil } - // release memory quickly - if clear { - s.points = nil - s.Data = nil - s.Data2 = nil - } return s } @@ -235,3 +229,10 @@ func (s *Shard) InternalGetAllPoints() map[string]*Point { func (s *Shard) PointCount() int { return len(s.points) } + +func (s *Shard) Freeze() { + s.Frozen = true + s.points = nil + s.Data = nil + s.Data2 = nil +}