Skip to content

Commit

Permalink
fix: fix storage clear
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo committed Jan 23, 2024
1 parent 794c214 commit 8ebf721
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
4 changes: 2 additions & 2 deletions pkg/collectconfig/executor/consumer_log_sub_analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ func (c *logAnalysisSubConsumer) ProcessGroup(iw *inputWrapper, ctx *LogContext,
func (c *logAnalysisSubConsumer) Emit(expectedTs int64) bool {
var state *logAnalysisSubConsumerState
c.parent.timeline.Update(func(timeline *storage.Timeline) {
shard := c.parent.timeline.GetShard(expectedTs, true)
shard := c.parent.timeline.GetShard(expectedTs)
if shard == nil {
return
}
shard.Frozen = true
defer shard.Freeze()
if shard.Data == nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/collectconfig/executor/consumer_log_sub_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ func (c *logStatSubConsumer) Emit(expectedTs int64) bool {

var datum []*model.DetailData
c.parent.timeline.Update(func(timeline *storage.Timeline) {
shard := timeline.GetShard(expectedTs, true)
shard := timeline.GetShard(expectedTs)
if shard == nil {
logger.Infoz("[consumer] [log] emit nil", //
zap.String("key", c.parent.key), //
zap.Time("ts", time.UnixMilli(expectedTs))) //
return
}
shard.Frozen = true
defer shard.Freeze()
points := shard.InternalGetAllPoints()
for _, v := range points {
tags := make(map[string]string, len(v.Keys))
Expand Down
17 changes: 9 additions & 8 deletions pkg/collectconfig/executor/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (t *Timeline) Unlock() {
}

func (t *Timeline) GetOrCreateShard(ts int64) *Shard {
shard := t.GetShard(ts, false)
shard := t.GetShard(ts)
if shard == nil {
shard = t.CreateShard(ts)
}
Expand All @@ -169,7 +169,7 @@ func (t *Timeline) InternalGetShard() []*Shard {
return t.shards
}

func (t *Timeline) GetShard(ts int64, clear bool) *Shard {
func (t *Timeline) GetShard(ts int64) *Shard {
if t.shards == nil {
return nil
}
Expand All @@ -182,12 +182,6 @@ func (t *Timeline) GetShard(ts int64, clear bool) *Shard {
if s.no != no {
return nil
}
// release memory quickly
if clear {
s.points = nil
s.Data = nil
s.Data2 = nil
}
return s
}

Expand Down Expand Up @@ -235,3 +229,10 @@ func (s *Shard) InternalGetAllPoints() map[string]*Point {
func (s *Shard) PointCount() int {
return len(s.points)
}

func (s *Shard) Freeze() {
s.Frozen = true
s.points = nil
s.Data = nil
s.Data2 = nil
}

0 comments on commit 8ebf721

Please sign in to comment.