From 2f0420f8f1d1565fbb6117679decc23e390e11ce Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 4 Sep 2024 09:40:47 -0700 Subject: [PATCH] Split DeleteExcessLogs into FindExcessLogs & DeleteLogsByRowIds Also, add WithMaxLogsKept --- core/chains/evm/logpoller/log_poller.go | 6 +++- core/chains/evm/logpoller/observability.go | 12 ++++++-- core/chains/evm/logpoller/orm.go | 18 ++++++------ core/chains/evm/logpoller/orm_test.go | 28 ++++++++++++++----- .../relay/evm/contract_transmitter.go | 17 +++++++---- core/services/relay/evm/evm.go | 4 +-- 6 files changed, 58 insertions(+), 27 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index ad30429c19..b5dec1b775 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -1105,7 +1105,11 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (done bool) { done = false } - rowsRemoved, err = lp.orm.DeleteExcessLogs(ctx, lp.logPrunePageSize) + rowIds, err := lp.orm.FindExcessLogs(ctx, lp.logPrunePageSize) + if err != nil { + return false + } + rowsRemoved, err = lp.orm.DeleteLogsByRowId(ctx, rowIds) if err != nil { lp.lggr.Errorw("Unable to prune excess logs", "err", err) } else if lp.logPrunePageSize != 0 && rowsRemoved < lp.logPrunePageSize { diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 26dbe3b263..a6e205cb4f 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -142,9 +142,15 @@ func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64 }) } -func (o *ObservedORM) DeleteExcessLogs(ctx context.Context, limit int64) (int64, error) { - return withObservedExecAndRowsAffected(o, "DeleteExcessLogs", del, func() (int64, error) { - return o.ORM.DeleteExcessLogs(ctx, limit) +func (o *ObservedORM) FindExcessLogs(ctx context.Context, limit int64) ([]uint64, error) { + return withObservedQueryAndResults[uint64](o, "FindExcessLogs", func() ([]uint64, error) { + return o.ORM.FindExcessLogs(ctx, limit) + }) +} + +func (o *ObservedORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) { + return withObservedExecAndRowsAffected(o, "DeleteLogsByRowId", del, func() (int64, error) { + return o.ORM.DeleteLogsByRowId(ctx, rowIds) }) } diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 26d54ebc5b..8e98136d65 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -37,7 +37,8 @@ type ORM interface { DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) - DeleteExcessLogs(ctx context.Context, limit int64) (int64, error) + FindExcessLogs(ctx context.Context, limit int64) (rowIds []uint64, err error) + DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error) SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error) @@ -396,10 +397,8 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro return result.RowsAffected() } -// DeleteExcessLogs deletes any logs old enough that MaxLogsKept has been exceeded for every filter they match. -func (o *DSORM) DeleteExcessLogs(ctx context.Context, limit int64) (int64, error) { - var rowIds []uint64 - +// FindExcessLogs finds any logs old enough that MaxLogsKept has been exceeded for every filter they match. +func (o *DSORM) FindExcessLogs(ctx context.Context, limit int64) (rowIds []uint64, err error) { var limitClause string if limit > 0 { // We have to count the logs in descending order first, to know which ones to keep. But then reverse the order @@ -433,11 +432,12 @@ func (o *DSORM) DeleteExcessLogs(ctx context.Context, limit int64) (int64, error (f.topic4 IS NULL OR l.topics[3] = ANY(f.topic4)) ) x GROUP BY id, block_number, log_index HAVING BOOL_AND(old)` + limitClause - err := o.ds.SelectContext(selectCtx, &rowIds, query, ubig.New(o.chainID)) - if err != nil { - return 0, err - } + err = o.ds.SelectContext(selectCtx, &rowIds, query, ubig.New(o.chainID)) + return rowIds, err +} +// DeleteLogsByRowId accepts a list of log row id's to delete +func (o *DSORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) { result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE id = ANY($1)`, rowIds) if err != nil { return 0, err diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index c66211c3e7..d96f083bf6 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -468,16 +468,22 @@ func TestORM(t *testing.T) { assert.Len(t, logs, 7) // filter0 has permanent retention, but MaxLogsKept=1. So of the 3 logs matching it, 2 of them are - // eligible to be pruned by DeleteExcessLogs. But only one should actually get pruned due to limit=1 - deleted, err = o1.DeleteExcessLogs(ctx, 1) + // eligible to be pruned. But only one should actually get pruned due to limit=1 + rowIds, err := o1.FindExcessLogs(ctx, 1) require.NoError(t, err) - assert.Equal(t, int64(1), deleted) + assert.Len(t, rowIds, 1) + deleted, err = o1.DeleteLogsByRowId(ctx, rowIds) + require.NoError(t, err) + assert.Equal(t, int64(len(rowIds)), deleted) // The second log eligible to be pruned should now be pruned, // but not the third log since it's not eligible. - deleted, err = o1.DeleteExcessLogs(ctx, 0) + rowIds, err = o1.FindExcessLogs(ctx, 0) require.NoError(t, err) - assert.Equal(t, int64(1), deleted) + assert.Len(t, rowIds, 1) + deleted, err = o1.DeleteLogsByRowId(ctx, rowIds) + require.NoError(t, err) + assert.Equal(t, int64(len(rowIds)), deleted) // Delete logs after should delete all logs. err = o1.DeleteLogsAndBlocksAfter(ctx, 1) @@ -2128,10 +2134,18 @@ func Benchmark_LogPruning(b *testing.B) { return o.DeleteExpiredLogs(ctx, 1000) }) runBenchmarking("DeleteExcessLogsNoPaging", func(ctx context.Context) (int64, error) { - return o.DeleteExcessLogs(ctx, 0) + rowIds, err := o.FindExcessLogs(ctx, 0) + if err != nil { + return 0, err + } + return o.DeleteLogsByRowId(ctx, rowIds) }) runBenchmarking("DeleteExcessLogsLimit1000", func(ctx context.Context) (int64, error) { - return o.DeleteExpiredLogs(ctx, 1000) + rowIds, err := o.FindExcessLogs(ctx, 1000) + if err != nil { + return 0, err + } + return o.DeleteLogsByRowId(ctx, rowIds) }) } diff --git a/core/services/relay/evm/contract_transmitter.go b/core/services/relay/evm/contract_transmitter.go index b995853387..3807fd5312 100644 --- a/core/services/relay/evm/contract_transmitter.go +++ b/core/services/relay/evm/contract_transmitter.go @@ -54,6 +54,12 @@ func WithRetention(retention time.Duration) OCRTransmitterOption { } } +func WithMaxLogsKept(maxLogsKept uint64) OCRTransmitterOption { + return func(ct *contractTransmitter) { + ct.maxLogsKept = maxLogsKept + } +} + func WithReportToEthMetadata(reportToEvmTxMeta ReportToEthMetadata) OCRTransmitterOption { return func(ct *contractTransmitter) { if reportToEvmTxMeta != nil { @@ -74,6 +80,7 @@ type contractTransmitter struct { reportToEvmTxMeta ReportToEthMetadata excludeSigs bool retention time.Duration + maxLogsKept uint64 } func transmitterFilterName(addr common.Address) string { @@ -105,7 +112,6 @@ func NewOCRContractTransmitter( lggr: lggr.Named("OCRContractTransmitter"), reportToEvmTxMeta: reportToEvmTxMetaNoop, excludeSigs: false, - retention: 0, } for _, opt := range opts { @@ -113,10 +119,11 @@ func NewOCRContractTransmitter( } err := lp.RegisterFilter(ctx, logpoller.Filter{ - Name: transmitterFilterName(address), - EventSigs: []common.Hash{transmitted.ID}, - Retention: 0, - Addresses: []common.Address{address}, MaxLogsKept: 3}, + Name: transmitterFilterName(address), + EventSigs: []common.Hash{transmitted.ID}, + Retention: 0, + Addresses: []common.Address{address}, + MaxLogsKept: newContractTransmitter.maxLogsKept}, ) if err != nil { return nil, err diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 85cb3486ca..b2c74bb36a 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -440,7 +440,7 @@ func (r *Relayer) NewCCIPCommitProvider(rargs commontypes.RelayArgs, pargs commo subjectID := chainToUUID(configWatcher.chain.ID()) contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, r.ks.Eth(), configWatcher, configTransmitterOpts{ subjectID: &subjectID, - }, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithRetention(0)) + }, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithMaxLogsKept(1)) if err != nil { return nil, err } @@ -517,7 +517,7 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont subjectID := chainToUUID(configWatcher.chain.ID()) contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, r.ks.Eth(), configWatcher, configTransmitterOpts{ subjectID: &subjectID, - }, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithRetention(0), WithExcludeSignatures()) + }, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithMaxLogsKept(1), WithExcludeSignatures()) if err != nil { return nil, err }