Skip to content

Commit

Permalink
Split DeleteExcessLogs into FindExcessLogs & DeleteLogsByRowIds
Browse files Browse the repository at this point in the history
Also, add WithMaxLogsKept
  • Loading branch information
reductionista committed Sep 4, 2024
1 parent 681a257 commit c6dc805
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 27 deletions.
6 changes: 5 additions & 1 deletion core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,11 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (done bool) {
done = false
}

rowsRemoved, err = lp.orm.DeleteExcessLogs(ctx, lp.logPrunePageSize)
rowIds, err := lp.orm.FindExcessLogs(ctx, lp.logPrunePageSize)
if err != nil {
return false
}
rowsRemoved, err = lp.orm.DeleteLogsByRowId(ctx, rowIds)
if err != nil {
lp.lggr.Errorw("Unable to prune excess logs", "err", err)
} else if lp.logPrunePageSize != 0 && rowsRemoved < lp.logPrunePageSize {
Expand Down
12 changes: 9 additions & 3 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,15 @@ func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64
})
}

func (o *ObservedORM) DeleteExcessLogs(ctx context.Context, limit int64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExcessLogs", del, func() (int64, error) {
return o.ORM.DeleteExcessLogs(ctx, limit)
func (o *ObservedORM) FindExcessLogs(ctx context.Context, limit int64) ([]uint64, error) {
return withObservedQueryAndResults[uint64](o, "FindExcessLogs", func() ([]uint64, error) {
return o.ORM.FindExcessLogs(ctx, limit)
})
}

func (o *ObservedORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteLogsByRowId", del, func() (int64, error) {
return o.ORM.DeleteLogsByRowId(ctx, rowIds)
})
}

Expand Down
19 changes: 10 additions & 9 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type ORM interface {
DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)
DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)
DeleteExcessLogs(ctx context.Context, limit int64) (int64, error)
FindExcessLogs(ctx context.Context, limit int64) (rowIds []uint64, err error)
DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error)

GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error)
SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error)
Expand Down Expand Up @@ -396,10 +397,8 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro
return result.RowsAffected()
}

// DeleteExcessLogs deletes any logs old enough that MaxLogsKept has been exceeded for every filter they match.
func (o *DSORM) DeleteExcessLogs(ctx context.Context, limit int64) (int64, error) {
var rowIds []uint64

// FindExcessLogs finds any logs old enough that MaxLogsKept has been exceeded for every filter they match.
func (o *DSORM) FindExcessLogs(ctx context.Context, limit int64) (rowIds []uint64, err error) {
var limitClause string
if limit > 0 {
// We have to count the logs in descending order first, to know which ones to keep. But then reverse the order
Expand Down Expand Up @@ -433,10 +432,12 @@ func (o *DSORM) DeleteExcessLogs(ctx context.Context, limit int64) (int64, error
(f.topic4 IS NULL OR l.topics[3] = ANY(f.topic4))
) x GROUP BY id, block_number, log_index HAVING BOOL_AND(old)` + limitClause

err := o.ds.SelectContext(selectCtx, &rowIds, query, ubig.New(o.chainID))
if err != nil {
return 0, err
}
err = o.ds.SelectContext(selectCtx, &rowIds, query, ubig.New(o.chainID))
return rowIds, err
}

// DeleteLogsByRowId accepts a list of log row id's to delete
func (o *DSORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) {

Check failure on line 440 in core/chains/evm/logpoller/orm.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE id = ANY($1)`, rowIds)
if err != nil {
Expand Down
28 changes: 21 additions & 7 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,16 +468,22 @@ func TestORM(t *testing.T) {
assert.Len(t, logs, 7)

// filter0 has permanent retention, but MaxLogsKept=1. So of the 3 logs matching it, 2 of them are
// eligible to be pruned by DeleteExcessLogs. But only one should actually get pruned due to limit=1
deleted, err = o1.DeleteExcessLogs(ctx, 1)
// eligible to be pruned. But only one should actually get pruned due to limit=1
rowIds, err := o1.FindExcessLogs(ctx, 1)
require.NoError(t, err)
assert.Equal(t, int64(1), deleted)
assert.Len(t, rowIds, 1)
deleted, err = o1.DeleteLogsByRowId(ctx, rowIds)
require.NoError(t, err)
assert.Equal(t, int64(len(rowIds)), deleted)

// The second log eligible to be pruned should now be pruned,
// but not the third log since it's not eligible.
deleted, err = o1.DeleteExcessLogs(ctx, 0)
rowIds, err = o1.FindExcessLogs(ctx, 0)
require.NoError(t, err)
assert.Equal(t, int64(1), deleted)
assert.Len(t, rowIds, 1)
deleted, err = o1.DeleteLogsByRowId(ctx, rowIds)
require.NoError(t, err)
assert.Equal(t, int64(len(rowIds)), deleted)

// Delete logs after should delete all logs.
err = o1.DeleteLogsAndBlocksAfter(ctx, 1)
Expand Down Expand Up @@ -2128,10 +2134,18 @@ func Benchmark_LogPruning(b *testing.B) {
return o.DeleteExpiredLogs(ctx, 1000)
})
runBenchmarking("DeleteExcessLogsNoPaging", func(ctx context.Context) (int64, error) {
return o.DeleteExcessLogs(ctx, 0)
rowIds, err := o.FindExcessLogs(ctx, 0)
if err != nil {
return 0, err
}
return o.DeleteLogsByRowId(ctx, rowIds)
})
runBenchmarking("DeleteExcessLogsLimit1000", func(ctx context.Context) (int64, error) {
return o.DeleteExpiredLogs(ctx, 1000)
rowIds, err := o.FindExcessLogs(ctx, 1000)
if err != nil {
return 0, err
}
return o.DeleteLogsByRowId(ctx, rowIds)
})
}

Expand Down
17 changes: 12 additions & 5 deletions core/services/relay/evm/contract_transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func WithRetention(retention time.Duration) OCRTransmitterOption {
}
}

func WithMaxLogsKept(maxLogsKept uint64) OCRTransmitterOption {
return func(ct *contractTransmitter) {
ct.maxLogsKept = maxLogsKept
}
}

func WithReportToEthMetadata(reportToEvmTxMeta ReportToEthMetadata) OCRTransmitterOption {
return func(ct *contractTransmitter) {
if reportToEvmTxMeta != nil {
Expand All @@ -74,6 +80,7 @@ type contractTransmitter struct {
reportToEvmTxMeta ReportToEthMetadata
excludeSigs bool
retention time.Duration
maxLogsKept uint64
}

func transmitterFilterName(addr common.Address) string {
Expand Down Expand Up @@ -105,18 +112,18 @@ func NewOCRContractTransmitter(
lggr: lggr.Named("OCRContractTransmitter"),
reportToEvmTxMeta: reportToEvmTxMetaNoop,
excludeSigs: false,
retention: 0,
}

for _, opt := range opts {
opt(newContractTransmitter)
}

err := lp.RegisterFilter(ctx, logpoller.Filter{
Name: transmitterFilterName(address),
EventSigs: []common.Hash{transmitted.ID},
Retention: 0,
Addresses: []common.Address{address}, MaxLogsKept: 3},
Name: transmitterFilterName(address),
EventSigs: []common.Hash{transmitted.ID},
Retention: 0,
Addresses: []common.Address{address},
MaxLogsKept: newContractTransmitter.maxLogsKept},
)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (r *Relayer) NewCCIPCommitProvider(rargs commontypes.RelayArgs, pargs commo
subjectID := chainToUUID(configWatcher.chain.ID())
contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, r.ks.Eth(), configWatcher, configTransmitterOpts{
subjectID: &subjectID,
}, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithRetention(0))
}, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithMaxLogsKept(1))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -517,7 +517,7 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont
subjectID := chainToUUID(configWatcher.chain.ID())
contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, r.ks.Eth(), configWatcher, configTransmitterOpts{
subjectID: &subjectID,
}, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithRetention(0), WithExcludeSignatures())
}, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithMaxLogsKept(1), WithExcludeSignatures())
if err != nil {
return nil, err
}
Expand Down

0 comments on commit c6dc805

Please sign in to comment.