Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LogPoller prune test #1351

Closed
wants to merge 9 commits into from
26 changes: 20 additions & 6 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,12 +657,18 @@
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 100))
}
case <-logPruneTick:
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000
if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
} else if !allRemoved {
lp.lggr.Infof("Pruning LogPoller logs...")
logPruneTick = time.After(utils.WithJitter(5 * time.Minute)) // = 7^5 avoids common factors with 1000
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignoring poll period only to test

allRemoved, err := lp.PruneExpiredLogs(ctx)
if allRemoved {
lp.lggr.Infof("Finished pruning LogPoller logs. Next pruning in ~ %s", lp.pollPeriod*2401)
} else {
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241))
lp.lggr.Warnf("Partially finished pruning LogPoller logs. Will resume in ~ %s", lp.pollPeriod*241)
}
if err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
}
}
}
Expand Down Expand Up @@ -1092,8 +1098,16 @@
// PruneExpiredLogs logs that are older than their retention period defined in Filter.
// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
rowsRemoved, err := lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize)
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
rowsRemoved, err := lp.orm.DeleteExcessLogs(ctx, lp.logPrunePageSize)

Check failure on line 1101 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to rowsRemoved (ineffassign)
if err != nil {
return false, err
}

rowsRemoved, err = lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize)
if err != nil || rowsRemoved < lp.logPrunePageSize {
return true, err
}
return lp.logPrunePageSize == 0 || err != nil || rowsRemoved < lp.logPrunePageSize, err
}

// Logs returns logs matching topics and address (exactly) in the given block range,
Expand Down
6 changes: 6 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64
})
}

func (o *ObservedORM) DeleteExcessLogs(ctx context.Context, limit int64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExcessLogs", del, func() (int64, error) {
return o.ORM.DeleteExcessLogs(ctx, limit)
})
}

func (o *ObservedORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlock, error) {
return withObservedQuery(o, "SelectBlockByNumber", func() (*LogPollerBlock, error) {
return o.ORM.SelectBlockByNumber(ctx, n)
Expand Down
Loading
Loading