Skip to content

Commit

Permalink
- Add DeleteExcessLogs(), benchmarks for that, and DeleteExpiredLogs(…
Browse files Browse the repository at this point in the history
…) with limit=1000

- Add id column as PRIMARY KEY for evm.logs & evm.log_poller_blocks
- Add UNIQUE INDEXes to replace previous primary keys
- Set MaxLogsKept = 1 for contract transmitter
- Add logging of when pruning happens and whether it was fully completed
  • Loading branch information
reductionista committed Sep 4, 2024
1 parent 17ce920 commit 681a257
Show file tree
Hide file tree
Showing 9 changed files with 377 additions and 256 deletions.
30 changes: 24 additions & 6 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,12 +657,14 @@ func (lp *logPoller) backgroundWorkerRun() {
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 100))
}
case <-logPruneTick:
lp.lggr.Infof("Pruning LogPoller logs...")
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000
if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
} else if !allRemoved {
if lp.PruneExpiredLogs(ctx) {
lp.lggr.Infof("Finished pruning LogPoller logs. Next pruning in ~ %s", lp.pollPeriod*2401)
} else {
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241))
lp.lggr.Warnf("Partially finished pruning LogPoller logs. Will resume in ~ %s", lp.pollPeriod*241)
}
}
}
Expand Down Expand Up @@ -1090,10 +1092,26 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
}

// PruneExpiredLogs logs that are older than their retention period defined in Filter.
// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it can only
// return false if an actual error is encountered
func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (done bool) {
done = true

rowsRemoved, err := lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize)
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
if err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
done = false
} else if lp.logPrunePageSize != 0 && rowsRemoved < lp.logPrunePageSize {
done = false
}

rowsRemoved, err = lp.orm.DeleteExcessLogs(ctx, lp.logPrunePageSize)
if err != nil {
lp.lggr.Errorw("Unable to prune excess logs", "err", err)
} else if lp.logPrunePageSize != 0 && rowsRemoved < lp.logPrunePageSize {
done = false
}
return done
}

// Logs returns logs matching topics and address (exactly) in the given block range,
Expand Down
6 changes: 6 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64
})
}

func (o *ObservedORM) DeleteExcessLogs(ctx context.Context, limit int64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExcessLogs", del, func() (int64, error) {
return o.ORM.DeleteExcessLogs(ctx, limit)
})
}

func (o *ObservedORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlock, error) {
return withObservedQuery(o, "SelectBlockByNumber", func() (*LogPollerBlock, error) {
return o.ORM.SelectBlockByNumber(ctx, n)
Expand Down
Loading

0 comments on commit 681a257

Please sign in to comment.