diff --git a/.changeset/brown-geese-boil.md b/.changeset/brown-geese-boil.md
new file mode 100644
index 0000000000..fa7f65f733
--- /dev/null
+++ b/.changeset/brown-geese-boil.md
@@ -0,0 +1,5 @@
+---
+"chainlink": minor
+---
+
+FilteredLogs now receives a list of Expressions instead of a whole KeyFilter. #internal
diff --git a/.changeset/sweet-pumas-refuse.md b/.changeset/sweet-pumas-refuse.md
new file mode 100644
index 0000000000..fd642a9c94
--- /dev/null
+++ b/.changeset/sweet-pumas-refuse.md
@@ -0,0 +1,5 @@
+---
+"chainlink": minor
+---
+
+#bugfix Addresses two minor issues with the pruning of LogPoller's db tables: logs not matching any filter will now be pruned, and rows deleted are now properly reported for observability
diff --git a/.tool-versions b/.tool-versions
index 077946cbee..cab9e5edc2 100644
--- a/.tool-versions
+++ b/.tool-versions
@@ -1,4 +1,4 @@
-golang 1.22.5
+golang 1.22.7
 mockery 2.43.2
 nodejs 20.13.1
 pnpm 9.4.0
@@ -8,4 +8,4 @@ zig 0.11.0
 golangci-lint 1.59.1
 protoc 25.1
 python 3.10.5
-task 3.35.1
\ No newline at end of file
+task 3.35.1
diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go
index c0882ff76c..a04b4fdb19 100644
--- a/core/chains/evm/logpoller/disabled.go
+++ b/core/chains/evm/logpoller/disabled.go
@@ -118,7 +118,7 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash,
 	return nil, ErrDisabled
 }
 
-func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ string) ([]Log, error) {
+func (d disabled) FilteredLogs(_ context.Context, _ []query.Expression, _ query.LimitAndSort, _ string) ([]Log, error) {
 	return nil, ErrDisabled
 }
 
diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go
index dee5d1d1a5..44de497562 100644
--- a/core/chains/evm/logpoller/log_poller.go
+++ b/core/chains/evm/logpoller/log_poller.go
@@ -8,6 +8,7 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
+	"math/rand"
 	"sort"
 	"strings"
 	"sync"
@@ -24,8 +25,8 @@ import (
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
 	"github.com/smartcontractkit/chainlink-common/pkg/services"
+	"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
 	"github.com/smartcontractkit/chainlink-common/pkg/types/query"
-	"github.com/smartcontractkit/chainlink-common/pkg/utils"
 	"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"
 
 	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
@@ -68,7 +69,7 @@ type LogPoller interface {
 	LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)
 
 	// chainlink-common query filtering
-	FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
+	FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
 }
 
 type LogPollerTest interface {
@@ -131,7 +132,8 @@ type logPoller struct {
 	// Usually the only way to recover is to manually remove the offending logs and block from the database.
 	// LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should
 	// recover automatically without needing to restart the LogPoller.
- finalityViolated *atomic.Bool + finalityViolated *atomic.Bool + countBasedLogPruningActive *atomic.Bool } type Opts struct { @@ -157,24 +159,25 @@ type Opts struct { // support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller { return &logPoller{ - stopCh: make(chan struct{}), - ec: ec, - orm: orm, - headTracker: headTracker, - lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), - replayStart: make(chan int64), - replayComplete: make(chan error), - pollPeriod: opts.PollPeriod, - backupPollerBlockDelay: opts.BackupPollerBlockDelay, - finalityDepth: opts.FinalityDepth, - useFinalityTag: opts.UseFinalityTag, - backfillBatchSize: opts.BackfillBatchSize, - rpcBatchSize: opts.RpcBatchSize, - keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth, - logPrunePageSize: opts.LogPrunePageSize, - filters: make(map[string]Filter), - filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. - finalityViolated: new(atomic.Bool), + stopCh: make(chan struct{}), + ec: ec, + orm: orm, + headTracker: headTracker, + lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), + replayStart: make(chan int64), + replayComplete: make(chan error), + pollPeriod: opts.PollPeriod, + backupPollerBlockDelay: opts.BackupPollerBlockDelay, + finalityDepth: opts.FinalityDepth, + useFinalityTag: opts.UseFinalityTag, + backfillBatchSize: opts.BackfillBatchSize, + rpcBatchSize: opts.RpcBatchSize, + keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth, + logPrunePageSize: opts.LogPrunePageSize, + filters: make(map[string]Filter), + filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. 
+ finalityViolated: new(atomic.Bool), + countBasedLogPruningActive: new(atomic.Bool), } } @@ -212,6 +215,12 @@ func (filter *Filter) Contains(other *Filter) bool { if other == nil { return true } + if other.Retention != filter.Retention { + return false + } + if other.MaxLogsKept != filter.MaxLogsKept { + return false + } addresses := make(map[common.Address]interface{}) for _, addr := range filter.Addresses { addresses[addr] = struct{}{} @@ -277,7 +286,7 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error { lp.lggr.Warnw("Filter already present, no-op", "name", filter.Name, "filter", filter) return nil } - lp.lggr.Warnw("Updating existing filter with more events or addresses", "name", filter.Name, "filter", filter) + lp.lggr.Warnw("Updating existing filter", "name", filter.Name, "filter", filter) } if err := lp.orm.InsertFilter(ctx, filter); err != nil { @@ -285,6 +294,9 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error { } lp.filters[filter.Name] = filter lp.filterDirty = true + if filter.MaxLogsKept > 0 { + lp.countBasedLogPruningActive.Store(true) + } return nil } @@ -540,18 +552,47 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i return mathutil.Min(requested, lastProcessed.BlockNumber), nil } +// loadFilters loads the filters from db, and activates count-based Log Pruning +// if required by any of the filters func (lp *logPoller) loadFilters(ctx context.Context) error { + filters, err := lp.lockAndLoadFilters(ctx) + if err != nil { + return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") + } + if lp.countBasedLogPruningActive.Load() { + return nil + } + for _, filter := range filters { + if filter.MaxLogsKept != 0 { + lp.countBasedLogPruningActive.Store(true) + return nil + } + } + return nil +} + +// lockAndLoadFilters is the part of loadFilters() requiring a filterMu lock +func (lp *logPoller) lockAndLoadFilters(ctx context.Context) (filters map[string]Filter, err error) { lp.filterMu.Lock() defer lp.filterMu.Unlock() - filters, err := lp.orm.LoadFilters(ctx) + filters, err = lp.orm.LoadFilters(ctx) if err != nil { - return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") + return filters, err } lp.filters = filters lp.filterDirty = true - return nil + return filters, nil +} + +// tickStaggeredDelay chooses a uniformly random amount of time to delay between minDelay and minDelay + period +func tickStaggeredDelay(minDelay time.Duration, period time.Duration) <-chan time.Time { + return time.After(minDelay + timeutil.JitterPct(1.0).Apply(period/2)) +} + +func tickWithDefaultJitter(interval time.Duration) <-chan time.Time { + return time.After(services.DefaultJitter.Apply(interval)) } func (lp *logPoller) run() { @@ -638,31 +679,62 @@ func (lp *logPoller) backgroundWorkerRun() { ctx, cancel := lp.stopCh.NewCtx() defer cancel() + blockPruneShortInterval := lp.pollPeriod * 100 + blockPruneInterval := blockPruneShortInterval * 10 + logPruneShortInterval := lp.pollPeriod * 241 // no common factors with 100 + logPruneInterval := logPruneShortInterval * 10 + // Avoid putting too much pressure on the database by staggering the pruning of old blocks and logs. // Usually, node after restart will have some work to boot the plugins and other services. - // Deferring first prune by minutes reduces risk of putting too much pressure on the database. 
-	blockPruneTick := time.After(5 * time.Minute)
-	logPruneTick := time.After(10 * time.Minute)
+	// Deferring first prune by at least 5 mins reduces risk of putting too much pressure on the database.
+	blockPruneTick := tickStaggeredDelay(5*time.Minute, blockPruneInterval)
+	logPruneTick := tickStaggeredDelay(5*time.Minute, logPruneInterval)
+
+	// Start initial prune of unmatched logs after 5-15 successful expired log prunes, so that not all chains start
+	// around the same time. After that, every 20 successful expired log prunes.
+	successfulExpiredLogPrunes := 5 + rand.Intn(10)
 
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case <-blockPruneTick:
-			blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 1000))
+			lp.lggr.Infow("pruning old blocks")
+			blockPruneTick = tickWithDefaultJitter(blockPruneInterval)
 			if allRemoved, err := lp.PruneOldBlocks(ctx); err != nil {
-				lp.lggr.Errorw("Unable to prune old blocks", "err", err)
+				lp.lggr.Errorw("unable to prune old blocks", "err", err)
 			} else if !allRemoved {
 				// Tick faster when cleanup can't keep up with the pace of new blocks
-				blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 100))
+				blockPruneTick = tickWithDefaultJitter(blockPruneShortInterval)
+				lp.lggr.Warnw("reached page limit while pruning old blocks")
+			} else {
+				lp.lggr.Debugw("finished pruning old blocks")
 			}
 		case <-logPruneTick:
-			logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000
+			logPruneTick = tickWithDefaultJitter(logPruneInterval)
+			lp.lggr.Infow("pruning expired logs")
 			if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
-				lp.lggr.Errorw("Unable to prune expired logs", "err", err)
+				lp.lggr.Errorw("unable to prune expired logs", "err", err)
 			} else if !allRemoved {
+				lp.lggr.Warnw("reached page limit while pruning expired logs")
 				// Tick faster when cleanup can't keep up with the pace of new logs
-				logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241))
+				logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
+			} else if successfulExpiredLogPrunes >= 20 {
+				// Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times
+				// since the last time unmatched logs were pruned
+				lp.lggr.Infow("finished pruning expired logs: pruning unmatched logs")
+				if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil {
+					lp.lggr.Errorw("unable to prune unmatched logs", "err", err)
+				} else if !allRemoved {
+					lp.lggr.Warnw("reached page limit while pruning unmatched logs")
+					logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
+				} else {
+					lp.lggr.Debugw("finished pruning unmatched logs")
+					successfulExpiredLogPrunes = 0
+				}
+			} else {
+				lp.lggr.Debugw("finished pruning expired logs")
+				successfulExpiredLogPrunes++
 			}
 		}
 	}
 }
@@ -1065,7 +1137,8 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
 }
 
 // PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
-// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
+// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, then it
+// will always return true unless there is an actual error.
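+//
+// A minimal usage sketch (hypothetical caller, for illustration only; the real
+// scheduling lives in backgroundWorkerRun above), draining eligible blocks page by page:
+//
+//	for {
+//	    allRemoved, err := lp.PruneOldBlocks(ctx)
+//	    if err != nil {
+//	        break // give up for now; the next prune tick will retry
+//	    }
+//	    if allRemoved {
+//	        break // nothing eligible for pruning remains
+//	    }
+//	    // !allRemoved: the page limit was hit, so more eligible blocks remain
+//	}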
 func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
 	latestBlock, err := lp.orm.SelectLatestBlock(ctx)
 	if err != nil {
@@ -1089,10 +1162,46 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
 	return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
 }
 
-// PruneExpiredLogs logs that are older than their retention period defined in Filter.
-// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
+// PruneExpiredLogs will attempt to remove any logs which have passed their retention period. Returns whether all expired
+// logs were removed. If logPrunePageSize is set to 0, it will always return true unless an actual error is encountered
 func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
+	done := true
+	rowsRemoved, err := lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize)
+	if err != nil {
+		lp.lggr.Errorw("Unable to prune expired logs", "err", err)
+		return false, err
+	} else if lp.logPrunePageSize != 0 && rowsRemoved == lp.logPrunePageSize {
+		done = false
+	}
+
+	if !lp.countBasedLogPruningActive.Load() {
+		return done, err
+	}
+
+	rowIDs, err := lp.orm.SelectExcessLogIDs(ctx, lp.logPrunePageSize)
+	if err != nil {
+		lp.lggr.Errorw("Unable to find excess logs for pruning", "err", err)
+		return false, err
+	}
+	rowsRemoved, err = lp.orm.DeleteLogsByRowID(ctx, rowIDs)
+	if err != nil {
+		lp.lggr.Errorw("Unable to prune excess logs", "err", err)
+	} else if lp.logPrunePageSize != 0 && rowsRemoved == lp.logPrunePageSize {
+		done = false
+	}
+	return done, err
+}
+
+// PruneUnmatchedLogs will attempt to remove any logs which no longer match a registered filter. Returns whether all unmatched
+// logs were removed. If logPrunePageSize is set to 0, it will always return true unless an actual error is encountered
+func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) {
+	ids, err := lp.orm.SelectUnmatchedLogIDs(ctx, lp.logPrunePageSize)
+	if err != nil {
+		return false, err
+	}
+	rowsRemoved, err := lp.orm.DeleteLogsByRowID(ctx, ids)
+	return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
 }
 
@@ -1518,6 +1627,25 @@ func EvmWord(i uint64) common.Hash {
 	return common.BytesToHash(b)
 }
 
-func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
+func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
 	return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName)
 }
+
+// Where is a query.Where wrapper that ignores the Key and returns a slice of query.Expression rather than query.KeyFilter.
+// If no expressions are provided, or an error occurs, an empty slice is returned.
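+//
+// Sketch of intended use together with the new FilteredLogs signature (illustrative
+// only; addr, sig, ts, limitAndSort and lp are assumed to be defined by the caller):
+//
+//	expressions, err := logpoller.Where(
+//	    logpoller.NewAddressFilter(addr),
+//	    logpoller.NewEventSigFilter(sig),
+//	    query.Timestamp(uint64(ts.Unix()), primitives.Gte),
+//	)
+//	if err != nil {
+//	    return err
+//	}
+//	logs, err := lp.FilteredLogs(ctx, expressions, limitAndSort, "my_query")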
+func Where(expressions ...query.Expression) ([]query.Expression, error) { + filter, err := query.Where( + "", + expressions..., + ) + + if err != nil { + return []query.Expression{}, err + } + + if filter.Expressions == nil { + return []query.Expression{}, nil + } + + return filter.Expressions, nil +} diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 548711c19b..6a34e899df 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -26,6 +26,8 @@ import ( "go.uber.org/zap/zapcore" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils" htMocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks" @@ -2052,3 +2054,39 @@ func TestFindLCA(t *testing.T) { }) } } + +func TestWhere(t *testing.T) { + address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") + eventSig := common.HexToHash("0xabcdef1234567890abcdef1234567890abcdef1234") + ts := time.Now() + + expr1 := logpoller.NewAddressFilter(address) + expr2 := logpoller.NewEventSigFilter(eventSig) + expr3 := query.Timestamp(uint64(ts.Unix()), primitives.Gte) + expr4 := logpoller.NewConfirmationsFilter(evmtypes.Confirmations(0)) + + t.Run("Valid combination of filters", func(t *testing.T) { + result, err := logpoller.Where(expr1, expr2, expr3, expr4) + assert.NoError(t, err) + assert.Equal(t, []query.Expression{expr1, expr2, expr3, expr4}, result) + }) + + t.Run("No expressions (should return empty slice)", func(t *testing.T) { + result, err := logpoller.Where() + assert.NoError(t, err) + assert.Equal(t, []query.Expression{}, result) + }) + + t.Run("Invalid boolean expression", func(t *testing.T) { + invalidExpr := query.Expression{ + BoolExpression: query.BoolExpression{ + Expressions: []query.Expression{}, + }, + } + + result, err := logpoller.Where(invalidExpr) + assert.Error(t, err) + assert.EqualError(t, err, "all boolean expressions should have at least 2 expressions") + assert.Equal(t, []query.Expression{}, result) + }) +} diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index 4ce68839d1..9ae4d9767c 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -124,7 +124,7 @@ func (_c *LogPoller_DeleteLogsAndBlocksAfter_Call) RunAndReturn(run func(context } // FilteredLogs provides a mock function with given fields: ctx, filter, limitAndSort, queryName -func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]logpoller.Log, error) { +func (_m *LogPoller) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]logpoller.Log, error) { ret := _m.Called(ctx, filter, limitAndSort, queryName) if len(ret) == 0 { @@ -133,10 +133,10 @@ func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, l var r0 []logpoller.Log var r1 error - if rf, ok := ret.Get(0).(func(context.Context, query.KeyFilter, query.LimitAndSort, string) ([]logpoller.Log, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)); ok { return rf(ctx, filter, limitAndSort, queryName) } - if 
rf, ok := ret.Get(0).(func(context.Context, query.KeyFilter, query.LimitAndSort, string) []logpoller.Log); ok { + if rf, ok := ret.Get(0).(func(context.Context, []query.Expression, query.LimitAndSort, string) []logpoller.Log); ok { r0 = rf(ctx, filter, limitAndSort, queryName) } else { if ret.Get(0) != nil { @@ -144,7 +144,7 @@ func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, l } } - if rf, ok := ret.Get(1).(func(context.Context, query.KeyFilter, query.LimitAndSort, string) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, []query.Expression, query.LimitAndSort, string) error); ok { r1 = rf(ctx, filter, limitAndSort, queryName) } else { r1 = ret.Error(1) @@ -160,16 +160,16 @@ type LogPoller_FilteredLogs_Call struct { // FilteredLogs is a helper method to define mock.On call // - ctx context.Context -// - filter query.KeyFilter +// - filter []query.Expression // - limitAndSort query.LimitAndSort // - queryName string func (_e *LogPoller_Expecter) FilteredLogs(ctx interface{}, filter interface{}, limitAndSort interface{}, queryName interface{}) *LogPoller_FilteredLogs_Call { return &LogPoller_FilteredLogs_Call{Call: _e.mock.On("FilteredLogs", ctx, filter, limitAndSort, queryName)} } -func (_c *LogPoller_FilteredLogs_Call) Run(run func(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string)) *LogPoller_FilteredLogs_Call { +func (_c *LogPoller_FilteredLogs_Call) Run(run func(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string)) *LogPoller_FilteredLogs_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(query.KeyFilter), args[2].(query.LimitAndSort), args[3].(string)) + run(args[0].(context.Context), args[1].([]query.Expression), args[2].(query.LimitAndSort), args[3].(string)) }) return _c } @@ -179,7 +179,7 @@ func (_c *LogPoller_FilteredLogs_Call) Return(_a0 []logpoller.Log, _a1 error) *L return _c } -func (_c *LogPoller_FilteredLogs_Call) RunAndReturn(run func(context.Context, query.KeyFilter, query.LimitAndSort, string) ([]logpoller.Log, error)) *LogPoller_FilteredLogs_Call { +func (_c *LogPoller_FilteredLogs_Call) RunAndReturn(run func(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)) *LogPoller_FilteredLogs_Call { _c.Call.Return(run) return _c } diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 782307e7d0..776fe5bf21 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -142,6 +142,24 @@ func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64 }) } +func (o *ObservedORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) { + return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIDs", func() ([]uint64, error) { + return o.ORM.SelectUnmatchedLogIDs(ctx, limit) + }) +} + +func (o *ObservedORM) SelectExcessLogIDs(ctx context.Context, limit int64) ([]uint64, error) { + return withObservedQueryAndResults[uint64](o, "SelectExcessLogIDs", func() ([]uint64, error) { + return o.ORM.SelectExcessLogIDs(ctx, limit) + }) +} + +func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) { + return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) { + return o.ORM.DeleteLogsByRowID(ctx, rowIDs) + }) +} + func (o *ObservedORM) SelectBlockByNumber(ctx 
context.Context, n int64) (*LogPollerBlock, error) { return withObservedQuery(o, "SelectBlockByNumber", func() (*LogPollerBlock, error) { return o.ORM.SelectBlockByNumber(ctx, n) @@ -262,7 +280,7 @@ func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address c }) } -func (o *ObservedORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { +func (o *ObservedORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { return withObservedQueryAndResults(o, queryName, func() ([]Log, error) { return o.ORM.FilteredLogs(ctx, filter, limitAndSort, queryName) }) diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 5e668a4ad1..f96c207eca 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -119,6 +119,16 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420"))) + rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3) + require.NoError(t, err) + require.Equal(t, int64(0), rowsAffected) + assert.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete")) + + rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0) + require.NoError(t, err) + require.Equal(t, int64(2), rowsAffected) + assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteBlocksBefore", "delete")) + // Don't update counters in case of an error require.Error(t, orm.InsertLogsWithBlock(ctx, logs, NewLogPollerBlock(utils.RandomBytes32(), 0, time.Now(), 0))) assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 9cbb21a606..4d7cf33ebe 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -3,6 +3,7 @@ package logpoller import ( "context" "database/sql" + "errors" "fmt" "math/big" "strings" @@ -15,6 +16,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -32,10 +34,13 @@ type ORM interface { LoadFilters(ctx context.Context) (map[string]Filter, error) DeleteFilter(ctx context.Context, name string) error + DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error + SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) + SelectExcessLogIDs(ctx context.Context, limit int64) (rowIDs []uint64, err error) GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error) SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error) @@ -63,7 +68,7 @@ type ORM interface { SelectLogsDataWordBetween(ctx context.Context, address 
common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error) // FilteredLogs accepts chainlink-common filtering DSL. - FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) + FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) } type DSORM struct { @@ -184,9 +189,52 @@ func (o *DSORM) LoadFilters(ctx context.Context) (map[string]Filter, error) { return filters, err } +func blocksQuery(clause string) string { + return fmt.Sprintf(`SELECT %s FROM evm.log_poller_blocks %s`, strings.Join(blocksFields[:], ", "), clause) +} +func logsQuery(clause string) string { + return fmt.Sprintf(`SELECT %s FROM evm.logs %s`, strings.Join(logsFields[:], ", "), clause) +} + +func logsQueryWithTablePrefix(tableAlias string, clause string) string { + var s strings.Builder + for i, field := range logsFields { + if i > 0 { + s.WriteString(", ") + } + s.WriteString(fmt.Sprintf("%s.%s", tableAlias, field)) + } + return fmt.Sprintf(`SELECT %s FROM evm.logs AS %s %s`, s.String(), tableAlias, clause) +} + +func withConfs(query string, tableAlias string, confs evmtypes.Confirmations) string { + var lastConfirmedBlock string + + var tablePrefix string + if tableAlias != "" { + tablePrefix = tableAlias + "." + } + if confs == evmtypes.Finalized { + lastConfirmedBlock = `finalized_block_number` + } else { + lastConfirmedBlock = `block_number - :confs` + } + return fmt.Sprintf(`%s %sblock_number <= ( + SELECT %s + FROM evm.log_poller_blocks + WHERE evm_chain_id = :evm_chain_id + ORDER BY block_number DESC LIMIT 1)`, query, tablePrefix, lastConfirmedBlock) +} + +func logsQueryWithConfs(clause string, confs evmtypes.Confirmations) string { + return withConfs(logsQuery(clause), "", confs) +} + func (o *DSORM) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE block_hash = $1 AND evm_chain_id = $2`, hash.Bytes(), ubig.New(o.chainID)); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE block_hash = $1 AND evm_chain_id = $2`), + hash.Bytes(), ubig.New(o.chainID)); err != nil { return nil, err } return &b, nil @@ -194,7 +242,9 @@ func (o *DSORM) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPo func (o *DSORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE block_number = $1 AND evm_chain_id = $2`, n, ubig.New(o.chainID)); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE block_number = $1 AND evm_chain_id = $2`), n, ubig.New(o.chainID), + ); err != nil { return nil, err } return &b, nil @@ -202,7 +252,9 @@ func (o *DSORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlo func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1`, ubig.New(o.chainID)); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1`), ubig.New(o.chainID), + ); err != nil { return nil, err } return &b, nil @@ -210,7 +262,10 @@ func (o *DSORM) SelectLatestBlock(ctx 
context.Context) (*LogPollerBlock, error) func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`, ubig.New(o.chainID), minAllowedBlockNumber); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`), + ubig.New(o.chainID), minAllowedBlockNumber, + ); err != nil { return nil, err } return &b, nil @@ -223,15 +278,11 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig if err != nil { return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id + query := logsQueryWithConfs( + `WHERE evm_chain_id = :evm_chain_id AND event_sig = :event_sig - AND address = :address - AND block_number <= %s - ORDER BY block_number desc, log_index DESC - LIMIT 1 - `, nestedBlockNumberQuery(confs)) + AND address = :address AND `, confs) + + `ORDER BY block_number desc, log_index DESC LIMIT 1` var l Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -244,31 +295,84 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig return &l, nil } +type RangeQueryer[T comparable] struct { + chainID *ubig.Big + ds sqlutil.DataSource + query func(ctx context.Context, r *RangeQueryer[T], lower, upper int64) (rowsAffected int64, err error) + acc []T +} + +func NewRangeQueryer[T comparable](evmChainID *big.Int, ds sqlutil.DataSource, query func(ctx context.Context, r *RangeQueryer[T], lower, upper int64) (rowsAffected int64, err error)) *RangeQueryer[T] { + return &RangeQueryer[T]{ + chainID: ubig.New(evmChainID), + ds: ds, + query: query, + } +} + +// ExecPagedQuery runs a query accepting an upper limit block (end) in a fast paged way. limit is the maximum number +// of results to be returned, but it is also used to break the query up into smaller queries restricted to limit # of blocks. +// The first range of blocks will be from MIN(block_number) to MIN(block_number) + limit. The iterative process ends either once +// the limit on results is reached or block_number = end. The query will never be executed on blocks where block_number > end, and +// it will never be executed on block_number = B unless it has also been executed on all blocks with block_number < B +// r.AddResults(moreResults []T) should be called if this is a query returning results (ie, SELECT). 
These will be accumulated in +// r.acc and can be retrieved later with r.AllResults() +func (r *RangeQueryer[T]) ExecPagedQuery(ctx context.Context, limit, end int64) (rowsAffected int64, err error) { + if limit == 0 { + return r.query(ctx, r, 0, end) + } + + var start int64 + err = r.ds.GetContext(ctx, &start, `SELECT MIN(block_number) FROM evm.log_poller_blocks + WHERE evm_chain_id = $1`, r.chainID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } + return 0, err + } + + // Remove up to limit blocks at a time, until we've reached the limit or removed everything eligible for deletion + var upper int64 + for lower := start; rowsAffected < limit; lower = upper + 1 { + upper = lower + limit - 1 + if upper > end { + upper = end + } + + rows, err2 := r.query(ctx, r, lower, upper) + if err2 != nil { + return rowsAffected, err2 + } + rowsAffected += rows + + if upper >= end { + break + } + } + return rowsAffected, nil +} + +func (r *RangeQueryer[T]) AddResults(moreResults []T) { + r.acc = append(r.acc, moreResults...) +} + +func (r *RangeQueryer[T]) AllResults() []T { + return r.acc +} + // DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks. // Otherwise, it will delete all blocks at once. func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) { - if limit > 0 { - result, err := o.ds.ExecContext(ctx, - `DELETE FROM evm.log_poller_blocks - WHERE block_number IN ( - SELECT block_number FROM evm.log_poller_blocks - WHERE block_number <= $1 - AND evm_chain_id = $2 - LIMIT $3 - ) - AND evm_chain_id = $2`, - end, ubig.New(o.chainID), limit) + q := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) { + result, err := r.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`, + r.chainID, lower, upper) if err != nil { return 0, err } return result.RowsAffected() - } - result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks - WHERE block_number <= $1 AND evm_chain_id = $2`, end, ubig.New(o.chainID)) - if err != nil { - return 0, err - } - return result.RowsAffected() + }) + return q.ExecPagedQuery(ctx, limit, end) } func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { @@ -282,8 +386,8 @@ func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error // Latency without upper bound filter can be orders of magnitude higher for large number of logs. 
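 	// (Presumably this is because a bounded range lets the delete walk a narrow slice of
 	// the block_number index rather than perform an open-ended scan over every row for the chain.)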
_, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 - AND block_number >= $2 - AND block_number <= (SELECT MAX(block_number) + AND block_number >= $2 + AND block_number <= (SELECT MAX(block_number) FROM evm.log_poller_blocks WHERE evm_chain_id = $1)`, ubig.New(o.chainID), start) @@ -313,36 +417,105 @@ type Exp struct { ShouldDelete bool } -func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) { - var err error - var result sql.Result - if limit > 0 { - result, err = o.ds.ExecContext(ctx, ` - DELETE FROM evm.logs - WHERE (evm_chain_id, address, event_sig, block_number) IN ( - SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number - FROM evm.logs l - INNER JOIN ( - SELECT address, event, MAX(retention) AS retention - FROM evm.log_poller_filters +func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) { + batchLogsSubQuery := `SELECT id, evm_chain_id, address, event_sig FROM evm.logs + WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3` + + query := fmt.Sprintf(` + SELECT l.id FROM (%s) l LEFT JOIN ( + SELECT evm_chain_id, address, event + FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event - HAVING NOT 0 = ANY(ARRAY_AGG(retention)) - ) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event - AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second') - LIMIT $2 - )`, ubig.New(o.chainID), limit) - } else { - result, err = o.ds.ExecContext(ctx, `WITH r AS - ( SELECT address, event, MAX(retention) AS retention + ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event + WHERE l.evm_chain_id = $1 AND r.evm_chain_id IS NULL + `, batchLogsSubQuery) + + latestBlock, err := o.SelectLatestBlock(ctx) + if err != nil { + return ids, err + } + + r := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) { + var rowIDs []uint64 + err2 := r.ds.SelectContext(ctx, &rowIDs, query, r.chainID, lower, upper) + if err2 != nil { + return 0, err2 + } + r.AddResults(rowIDs) + return int64(len(rowIDs)), nil + }) + + _, err = r.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber) + + return r.AllResults(), err +} + +// SelectExcessLogIDs finds any logs old enough that MaxLogsKept has been exceeded for every filter they match. 
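+//
+// Worked example (illustrative numbers only): a filter with MaxLogsKept = 5 which
+// currently matches 7 logs marks its 2 oldest matches as excess. A log id is returned
+// only when every filter matching that log counts it as excess (the BOOL_AND below);
+// logs matched by any filter with max_logs_kept = 0 ("keep all") are never returned.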
+func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results []uint64, err error) {
+	// Roll up the filter table into 1 row per filter
+	withSubQuery := `
+		SELECT name,
+			ARRAY_AGG(address) AS addresses, ARRAY_AGG(event) AS events,
+			MAX(max_logs_kept) AS max_logs_kept -- Should all be the same, just need MAX for GROUP BY
 		FROM evm.log_poller_filters WHERE evm_chain_id=$1
-		GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention))
-	) DELETE FROM evm.logs l USING r
-		WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event
-		AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT)
-		ubig.New(o.chainID))
+		GROUP BY name`
+
+	// Count logs matching each filter in reverse order, labeling anything after the filter.max_logs_kept'th with old=true
+	countLogsSubQuery := `
+		SELECT l.id, block_number, log_index, max_logs_kept != 0 AND
+			ROW_NUMBER() OVER(PARTITION BY f.name ORDER BY (block_number, log_index) DESC) > max_logs_kept AS old
+		FROM filters f JOIN evm.logs l ON
+			l.address = ANY(f.addresses) AND l.event_sig = ANY(f.events)
+		WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3
+	`
+
+	// Return all logs considered "old" by every filter they match
+	query := fmt.Sprintf(`WITH filters AS ( %s ) SELECT id FROM ( %s ) x GROUP BY id, block_number, log_index HAVING BOOL_AND(old)`,
+		withSubQuery, countLogsSubQuery)
+
+	latestBlock, err := o.SelectLatestBlock(ctx)
+	if err != nil {
+		return results, err
 	}
+	r := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) {
+		var rowIDs []uint64
+		err = r.ds.SelectContext(ctx, &rowIDs, query, r.chainID, lower, upper)
+		if err != nil {
+			return 0, err
+		}
+		r.AddResults(rowIDs)
+		return int64(len(rowIDs)), err
+	})
+	_, err = r.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber)
+
+	return r.AllResults(), err
+}
+
+// DeleteExpiredLogs removes any logs whose timestamp is older than the longest retention
+// period of the filters matching them, UNLESS at least one matching filter has retention=0
+// (permanent retention). Logs matching no registered filter are not touched here; they are
+// handled separately by SelectUnmatchedLogIDs / PruneUnmatchedLogs.
+func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
+	limitClause := ""
+	if limit > 0 {
+		limitClause = fmt.Sprintf("LIMIT %d", limit)
+	}
+
+	query := fmt.Sprintf(`
+		WITH rows_to_delete AS (
+			SELECT l.id
+			FROM evm.logs l JOIN (
+				SELECT evm_chain_id, address, event, MAX(retention) AS retention
+				FROM evm.log_poller_filters
+				WHERE evm_chain_id = $1
+				GROUP BY evm_chain_id, address, event
+				HAVING MIN(retention) > 0
+			) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event AND
+				l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second') %s
+		) DELETE FROM evm.logs WHERE id IN (SELECT id FROM rows_to_delete)`, limitClause)
+	result, err := o.ds.ExecContext(ctx, query, ubig.New(o.chainID))
 	if err != nil {
 		return 0, err
 	}
@@ -393,7 +566,7 @@ func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.D
 			(:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW())
 			ON CONFLICT DO NOTHING`
-		_, err := o.ds.NamedExecContext(ctx, query, logs[start:end])
+		_, err := tx.NamedExecContext(ctx, query, logs[start:end])
 		if err != nil {
 			if pkgerrors.Is(err, context.DeadlineExceeded) &&
batchInsertSize > 500 { // In case of DB timeouts, try to insert again with a smaller batch upto a limit @@ -425,11 +598,11 @@ func (o *DSORM) SelectLogsByBlockRange(ctx context.Context, start, end int64) ([ return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND block_number >= :start_block - AND block_number <= :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND block_number >= :start_block + AND block_number <= :end_block + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -454,13 +627,13 @@ func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address common return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND block_number >= :start_block - AND block_number <= :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND block_number >= :start_block + AND block_number <= :end_block + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -485,14 +658,12 @@ func (o *DSORM) SelectLogsCreatedAfter(ctx context.Context, address common.Addre return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND block_timestamp > :block_timestamp_after - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs( + `WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND block_timestamp > :block_timestamp_after AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -519,12 +690,12 @@ func (o *DSORM) SelectLogsWithSigs(ctx context.Context, start, end int64, addres return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = ANY(:event_sig_array) - AND block_number BETWEEN :start_block AND :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = ANY(:event_sig_array) + AND block_number BETWEEN :start_block AND :end_block + ORDER BY block_number, log_index`) query, sqlArgs, err := o.ds.BindNamed(query, args) if err != nil { @@ -547,11 +718,11 @@ func (o *DSORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]L return nil, err } - query := `SELECT * FROM evm.log_poller_blocks + query := blocksQuery(` WHERE block_number >= :start_block AND block_number <= :end_block AND evm_chain_id = :evm_chain_id - ORDER BY block_number ASC` + ORDER BY block_number ASC`) var blocks []LogPollerBlock query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -578,17 +749,17 @@ func (o *DSORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, from return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs WHERE (block_number, address, event_sig) IN ( - SELECT MAX(block_number), address, event_sig FROM evm.logs + query := logsQueryWithConfs(`WHERE id IN ( + SELECT LAST_VALUE(id) OVER( + PARTITION BY evm_chain_id, address, event_sig + ORDER BY block_number, log_index + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) FROM evm.logs WHERE 
evm_chain_id = :evm_chain_id - AND event_sig = ANY(:event_sig_array) - AND address = ANY(:address_array) - AND block_number > :start_block - AND block_number <= %s - GROUP BY event_sig, address - ) - ORDER BY block_number ASC`, nestedBlockNumberQuery(confs)) + AND event_sig = ANY(:event_sig_array) + AND address = ANY(:address_array) + AND block_number >= :start_block AND `, confs) + ` + )` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -613,13 +784,12 @@ func (o *DSORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, if err != nil { return 0, err } - query := fmt.Sprintf(` - SELECT COALESCE(MAX(block_number), 0) FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND event_sig = ANY(:event_sig_array) - AND address = ANY(:address_array) - AND block_number > :start_block - AND block_number <= %s`, nestedBlockNumberQuery(confs)) + + query := withConfs(`SELECT COALESCE(MAX(block_number), 0) FROM evm.logs + WHERE evm_chain_id = :evm_chain_id + AND event_sig = ANY(:event_sig_array) + AND address = ANY(:address_array) + AND block_number >= :start_block AND `, "", confs) var blockNumber int64 query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -644,14 +814,12 @@ func (o *DSORM) SelectLogsDataWordRange(ctx context.Context, address common.Addr return nil, err } - query := fmt.Sprintf(`SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND substring(data from 32*:word_index+1 for 32) >= :word_value_min - AND substring(data from 32*:word_index+1 for 32) <= :word_value_max - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index+1 for 32) >= :word_value_min + AND substring(data from 32*:word_index+1 for 32) <= :word_value_max AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -675,14 +843,12 @@ func (o *DSORM) SelectLogsDataWordGreaterThan(ctx context.Context, address commo return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND substring(data from 32*:word_index+1 for 32) >= :word_value_min - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index+1 for 32) >= :word_value_min AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -706,15 +872,14 @@ func (o *DSORM) SelectLogsDataWordBetween(ctx context.Context, address common.Ad if err != nil { return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND substring(data from 32*:word_index_min+1 for 32) <= :word_value - AND substring(data from 32*:word_index_max+1 for 32) >= :word_value - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index_min+1 for 32) <= :word_value + AND substring(data from 32*:word_index_max+1 
for 32) >= :word_value AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -738,14 +903,11 @@ func (o *DSORM) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address c return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] >= :topic_value_min - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] >= :topic_value_min AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -770,15 +932,12 @@ func (o *DSORM) SelectIndexedLogsTopicRange(ctx context.Context, address common. return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] >= :topic_value_min - AND topics[:topic_index] <= :topic_value_max - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] >= :topic_value_min + AND topics[:topic_index] <= :topic_value_max AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -802,14 +961,12 @@ func (o *DSORM) SelectIndexedLogs(ctx context.Context, address common.Address, e return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] = ANY(:topic_values) - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] = ANY(:topic_values) AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -835,14 +992,14 @@ func (o *DSORM) SelectIndexedLogsByBlockRange(ctx context.Context, start, end in return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] = ANY(:topic_values) - AND block_number >= :start_block - AND block_number <= :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] = ANY(:topic_values) + AND block_number >= :start_block + AND block_number <= :end_block + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -868,16 +1025,13 @@ func (o *DSORM) SelectIndexedLogsCreatedAfter(ctx context.Context, address commo return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] = ANY(:topic_values) - AND block_timestamp > :block_timestamp_after - AND block_number <= %s - ORDER BY block_number, log_index - `, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(` + WHERE evm_chain_id = 
:evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] = ANY(:topic_values) + AND block_timestamp > :block_timestamp_after AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -901,12 +1055,12 @@ func (o *DSORM) SelectIndexedLogsByTxHash(ctx context.Context, address common.Ad return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND tx_hash = :tx_hash - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND tx_hash = :tx_hash + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -936,25 +1090,22 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si return nil, err } - nestedQuery := nestedBlockNumberQuery(confs) - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :sigA - AND block_number BETWEEN :start_block AND :end_block - AND block_number <= %s - EXCEPT - SELECT a.* FROM evm.logs AS a - INNER JOIN evm.logs B - ON a.evm_chain_id = b.evm_chain_id - AND a.address = b.address - AND a.topics[:topic_index] = b.topics[:topic_index] - AND a.event_sig = :sigA - AND b.event_sig = :sigB - AND b.block_number BETWEEN :start_block AND :end_block - AND b.block_number <= %s - ORDER BY block_number, log_index`, nestedQuery, nestedQuery) + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :sigA + AND block_number BETWEEN :start_block AND :end_block AND `, confs) + + ` EXCEPT ` + + withConfs(logsQueryWithTablePrefix("a", ` + INNER JOIN evm.logs AS b + ON a.evm_chain_id = b.evm_chain_id + AND a.address = b.address + AND a.topics[:topic_index] = b.topics[:topic_index] + AND a.event_sig = :sigA + AND b.event_sig = :sigB + AND b.block_number BETWEEN :start_block AND :end_block + AND `), "b", confs) + + ` ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -968,9 +1119,8 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si return logs, nil } -// TODO flaky BCF-3258 -func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, _ string) ([]Log, error) { - qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter.Expressions, limitAndSort) +func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, _ string) ([]Log, error) { + qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter, limitAndSort) if err != nil { return nil, err } @@ -993,19 +1143,11 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitA return logs, nil } -func nestedBlockNumberQuery(confs evmtypes.Confirmations) string { - if confs == evmtypes.Finalized { - return ` - (SELECT finalized_block_number - FROM evm.log_poller_blocks - WHERE evm_chain_id = :evm_chain_id - ORDER BY block_number DESC LIMIT 1) ` - } - // Intentionally wrap with greatest() function and don't return negative block numbers when :confs > :block_number - // It doesn't impact logic of the outer query, because block numbers are never less or equal to 0 (guarded by log_poller_blocks_block_number_check) - return ` - (SELECT greatest(block_number - 
:confs, 0) - FROM evm.log_poller_blocks - WHERE evm_chain_id = :evm_chain_id - ORDER BY block_number DESC LIMIT 1) ` +// DeleteLogsByRowID accepts a list of log row id's to delete +func (o *DSORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) { + result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE id = ANY($1)`, rowIDs) + if err != nil { + return 0, err + } + return result.RowsAffected() } diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index ed3f58504a..6e618ba9ce 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -4,16 +4,19 @@ import ( "bytes" "context" "database/sql" + "errors" "fmt" "math" "math/big" + "strconv" "testing" "time" + "github.com/stretchr/testify/mock" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/ethereum/go-ethereum/common" - "github.com/jackc/pgx/v4" pkgerrors "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -186,6 +189,7 @@ func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { } func TestORM(t *testing.T) { + t.Parallel() th := SetupTH(t, lpOpts) o1 := th.ORM o2 := th.ORM2 @@ -319,10 +323,22 @@ func TestORM(t *testing.T) { Data: []byte("hello short retention"), BlockTimestamp: time.Now(), }, + { + EvmChainId: ubig.New(th.ChainID), + LogIndex: 7, + BlockHash: common.HexToHash("0x1239"), + BlockNumber: int64(17), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1236"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello2 short retention"), + BlockTimestamp: time.Now(), + }, { EvmChainId: ubig.New(th.ChainID), LogIndex: 8, - BlockHash: common.HexToHash("0x1238"), + BlockHash: common.HexToHash("0x1239"), BlockNumber: int64(17), EventSig: topic2, Topics: [][]byte{topic2[:]}, @@ -333,10 +349,39 @@ func TestORM(t *testing.T) { }, })) - t.Log(latest.BlockNumber) + // Insert a couple logs on a different chain, to make sure + // these aren't affected by any operations on the chain LogPoller + // is managing. 
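+	// (Both ORMs point at the same test database but use different chain ids, so these
+	// rows also verify that every pruning query scopes its work by evm_chain_id.)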
+ require.NoError(t, o2.InsertLogs(ctx, []logpoller.Log{ + { + EvmChainId: ubig.New(th.ChainID2), + LogIndex: 8, + BlockHash: common.HexToHash("0x1238"), + BlockNumber: int64(17), + EventSig: topic2, + Topics: [][]byte{topic2[:]}, + Address: common.HexToAddress("0x1236"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("same log on unrelated chain"), + BlockTimestamp: time.Now(), + }, + { + EvmChainId: ubig.New(th.ChainID2), + LogIndex: 9, + BlockHash: common.HexToHash("0x1999"), + BlockNumber: int64(18), + EventSig: topic, + Topics: [][]byte{topic[:], topic2[:]}, + Address: common.HexToAddress("0x5555"), + TxHash: common.HexToHash("0x1543"), + Data: []byte("different log on unrelated chain"), + BlockTimestamp: time.Now(), + }, + })) + logs, err := o1.SelectLogsByBlockRange(ctx, 1, 17) require.NoError(t, err) - require.Len(t, logs, 8) + require.Len(t, logs, 9) logs, err = o1.SelectLogsByBlockRange(ctx, 10, 10) require.NoError(t, err) @@ -413,7 +458,7 @@ func TestORM(t *testing.T) { require.Equal(t, 2, len(lgs)) require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1237"), 16, time.Now(), 0)) - require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1238"), 17, time.Now(), 0)) + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1238"), 17, time.Now(), 17)) filter0 := logpoller.Filter{ Name: "permanent retention filter", @@ -451,38 +496,211 @@ func TestORM(t *testing.T) { require.Equal(t, int64(17), latest.BlockNumber) logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - require.Len(t, logs, 8) + require.Len(t, logs, 9) - // Delete expired logs + // Delete expired logs with page limit time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period - deleted, err := o1.DeleteExpiredLogs(ctx, 0) + deleted, err := o1.DeleteExpiredLogs(ctx, 1) + require.NoError(t, err) + assert.Equal(t, int64(1), deleted) + + // Delete expired logs without page limit + deleted, err = o1.DeleteExpiredLogs(ctx, 0) require.NoError(t, err) assert.Equal(t, int64(1), deleted) + + // Select unmatched logs with page limit + ids, err := o1.SelectUnmatchedLogIDs(ctx, 2) + require.NoError(t, err) + assert.Len(t, ids, 2) + + // Select unmatched logs without page limit + ids, err = o1.SelectUnmatchedLogIDs(ctx, 0) + require.NoError(t, err) + assert.Len(t, ids, 3) + + // Delete logs by row id + deleted, err = o1.DeleteLogsByRowID(ctx, ids) + require.NoError(t, err) + assert.Equal(t, int64(3), deleted) + + // Ensure that both of the logs from the second chain are still there + logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x1236"), topic2) + require.NoError(t, err) + assert.Len(t, logs, 1) + logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x5555"), topic) + require.NoError(t, err) + assert.Len(t, logs, 1) + logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) // Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything // matching filter12 should be kept regardless of what other filters it matches. - assert.Len(t, logs, 7) + assert.Len(t, logs, 4) // Delete logs after should delete all logs. 
err = o1.DeleteLogsAndBlocksAfter(ctx, 1) require.NoError(t, err) logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - require.Zero(t, len(logs)) + assert.Zero(t, len(logs)) } -type PgxLogger struct { - lggr logger.Logger -} +func TestORM_SelectExcessLogs(t *testing.T) { + t.Parallel() + th := SetupTH(t, lpOpts) + o1 := th.ORM + o2 := th.ORM2 + ctx := testutils.Context(t) -func NewPgxLogger(lggr logger.Logger) PgxLogger { - return PgxLogger{lggr} -} + topic := common.HexToHash("0x1599") + topic2 := common.HexToHash("0x1600") + + blockHashes := []common.Hash{common.HexToHash("0x1234"), common.HexToHash("0x1235"), common.HexToHash("0x1236")} + + // Insert blocks for active chain + for i := int64(0); i < 3; i++ { + blockNumber := 10 + i + require.NoError(t, o1.InsertBlock(ctx, blockHashes[i], blockNumber, time.Now(), blockNumber)) + b1, err := o1.SelectBlockByHash(ctx, blockHashes[i]) + require.NoError(t, err) + require.Equal(t, blockNumber, b1.BlockNumber) + } + + // Insert block from a different chain + require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1234"), 17, time.Now(), 17)) + b, err := o2.SelectBlockByHash(ctx, common.HexToHash("0x1234")) + require.NoError(t, err) + require.Equal(t, int64(17), b.BlockNumber) + + for i := int64(0); i < 7; i++ { + require.NoError(t, o1.InsertLogs(ctx, []logpoller.Log{ + { + EvmChainId: ubig.New(th.ChainID), + LogIndex: i, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(10), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1234"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello"), + BlockTimestamp: time.Now(), + }, + { + EvmChainId: ubig.New(th.ChainID), + LogIndex: i, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(11), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1235"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello"), + BlockTimestamp: time.Now(), + }, + { + EvmChainId: ubig.New(th.ChainID), + LogIndex: i, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(12), + EventSig: topic2, + Topics: [][]byte{topic2[:]}, + Address: common.HexToAddress("0x1235"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello"), + BlockTimestamp: time.Now(), + }, + })) + } + + logs, err := o1.SelectLogsByBlockRange(ctx, 1, 12) + require.NoError(t, err) + require.Len(t, logs, 21) + + // Insert a log on a different chain, to make sure + // it's not affected by any operations on the chain LogPoller + // is managing. 
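+ // As in TestORM above, this chain-2 row should never be returned or deleted + // by any of the o1 queries that follow.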
+ require.NoError(t, o2.InsertLogs(ctx, []logpoller.Log{ + { + EvmChainId: ubig.New(th.ChainID2), + LogIndex: 8, + BlockHash: common.HexToHash("0x1238"), + BlockNumber: int64(17), + EventSig: topic2, + Topics: [][]byte{topic2[:]}, + Address: common.HexToAddress("0x1236"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("same log on unrelated chain"), + BlockTimestamp: time.Now(), + }, + })) + + logs, err = o2.SelectLogsByBlockRange(ctx, 1, 17) + require.NoError(t, err) + require.Len(t, logs, 1) + + filter1 := logpoller.Filter{ + Name: "MaxLogsKept = 0 (addr 1234 topic1)", + Addresses: []common.Address{common.HexToAddress("0x1234")}, + EventSigs: types.HashArray{topic}, + MaxLogsKept: 0, + } -func (l PgxLogger) Log(ctx context.Context, log pgx.LogLevel, msg string, data map[string]interface{}) { + filter12 := logpoller.Filter{ // keep at most 1 log matching topic1 or topic2 on addr 0x1235, with a 1ms minimum retention + Name: "MaxLogsKept = 1 (addr 1235 topic1 & topic2)", + Addresses: []common.Address{common.HexToAddress("0x1235")}, + EventSigs: types.HashArray{topic, topic2}, + Retention: time.Millisecond, + MaxLogsKept: 1, + } + filter2 := logpoller.Filter{ // keep at most 5 logs matching topic2 on addr 0x1235 + Name: "MaxLogsKept = 5 (addr 1235 topic2)", + Addresses: []common.Address{common.HexToAddress("0x1235")}, + EventSigs: types.HashArray{topic2}, + MaxLogsKept: 5, + } + + // Test inserting filters and reading them back + require.NoError(t, o1.InsertFilter(ctx, filter1)) + require.NoError(t, o1.InsertFilter(ctx, filter12)) + require.NoError(t, o1.InsertFilter(ctx, filter2)) + filters, err := o1.LoadFilters(ctx) + require.NoError(t, err) + require.Len(t, filters, 3) + assert.Equal(t, filter1, filters["MaxLogsKept = 0 (addr 1234 topic1)"]) + assert.Equal(t, filter12, filters["MaxLogsKept = 1 (addr 1235 topic1 & topic2)"]) + assert.Equal(t, filter2, filters["MaxLogsKept = 5 (addr 1235 topic2)"]) + + ids, err := o1.SelectUnmatchedLogIDs(ctx, 0) + require.NoError(t, err) + require.Len(t, ids, 0) + + // Number of excess logs eligible for pruning: + // 2 of the 7 matching filter2 + 6 of the 7 matching filter12 but not filter2 = 8 of the 21 total + + // Test SelectExcessLogIDs with limit less than # blocks + // (should only consider blocks 10 & 11, returning the 6 excess events from block 11 + // but ignoring the 2 in block 12) + ids, err = o1.SelectExcessLogIDs(ctx, 2) + require.NoError(t, err) + assert.Len(t, ids, 6) + + // Test SelectExcessLogIDs with limit greater than # blocks + ids, err = o1.SelectExcessLogIDs(ctx, 4) + require.NoError(t, err) + assert.Len(t, ids, 8) + + // Test SelectExcessLogIDs with limit greater than the full block range + ids, err = o1.SelectExcessLogIDs(ctx, 10) + require.NoError(t, err) + assert.Len(t, ids, 8) + + deleted, err := o1.DeleteLogsByRowID(ctx, ids) + require.NoError(t, err) + assert.Equal(t, int64(8), deleted) } func TestLogPollerFilters(t *testing.T) { @@ -604,8 +822,8 @@ func TestORM_IndexedLogs(t *testing.T) { } for idx, value := range topicValues { - topicFilters.Expressions[idx] = logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(value).Hex(), Operator: primitives.Eq}, + topicFilters.Expressions[idx] = logpoller.NewEventByTopicFilter(topicIdx, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(value), Operator: primitives.Eq}, }) } @@ -629,7 +847,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.Equal(t, 1, len(lgs)) assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes()) - lgs, err = o1.FilteredLogs(ctx, standardFilter(1, 
[]uint64{1}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}).Expressions, limiter, "") require.NoError(t, err) require.Equal(t, 1, len(lgs)) assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes()) @@ -638,19 +856,17 @@ func TestORM_IndexedLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}).Expressions, limiter, "") require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - blockRangeFilter := func(start, end uint64, topicIdx uint64, topicValues []uint64) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - filtersForTopics(topicIdx, topicValues), - query.Block(start, primitives.Gte), - query.Block(end, primitives.Lte), - }, + blockRangeFilter := func(start, end string, topicIdx uint64, topicValues []uint64) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + filtersForTopics(topicIdx, topicValues), + query.Block(start, primitives.Gte), + query.Block(end, primitives.Lte), } } @@ -658,7 +874,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, len(lgs)) - lgs, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 1, 1, []uint64{1}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "1", 1, []uint64{1}), limiter, "") require.NoError(t, err) assert.Equal(t, 1, len(lgs)) @@ -666,7 +882,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, len(lgs)) - lgs, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 1, []uint64{2}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 1, []uint64{2}), limiter, "") require.NoError(t, err) assert.Equal(t, 1, len(lgs)) @@ -674,7 +890,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, len(lgs)) - lgs, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 1, []uint64{1}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 1, []uint64{1}), limiter, "") require.NoError(t, err) assert.Equal(t, 1, len(lgs)) @@ -682,7 +898,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "invalid index for topic: 0") - _, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 0, []uint64{1}), limiter, "") + _, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 0, []uint64{1}), limiter, "") require.Error(t, err) assert.Contains(t, err.Error(), "invalid index for topic: 0") @@ -690,7 +906,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "invalid index for topic: 4") - _, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 4, []uint64{1}), limiter, "") + _, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 4, []uint64{1}), limiter, "") require.Error(t, err) assert.Contains(t, err.Error(), "invalid index for topic: 4") @@ -702,30 +918,28 @@ func TestORM_IndexedLogs(t *testing.T) { Expressions: []query.Expression{ logpoller.NewAddressFilter(addr), logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByTopicFilter(1, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(2).Hex(), Operator: primitives.Gte}, + logpoller.NewEventByTopicFilter(1, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(2), Operator: 
primitives.Gte}, }), query.Confidence(primitives.Unconfirmed), }, } - lgs, err = o1.FilteredLogs(ctx, filter, limiter, "") + lgs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "") require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - rangeFilter := func(topicIdx uint64, min, max uint64) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte}, - }), - logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte}, - }), - query.Confidence(primitives.Unconfirmed), - }, + rangeFilter := func(topicIdx uint64, min, max uint64) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + logpoller.NewEventByTopicFilter(topicIdx, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(min), Operator: primitives.Gte}, + }), + logpoller.NewEventByTopicFilter(topicIdx, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(max), Operator: primitives.Lte}, + }), + query.Confidence(primitives.Unconfirmed), } } @@ -833,7 +1047,7 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) { }, } - retrievedLogs, err = o1.FilteredLogs(ctx, filter, limiter, "") + retrievedLogs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "") require.NoError(t, err) require.Equal(t, 2, len(retrievedLogs)) @@ -874,19 +1088,17 @@ func TestORM_DataWords(t *testing.T) { }, })) - wordFilter := func(wordIdx uint8, word1, word2 uint64) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte}, - }), - logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte}, - }), - query.Confidence(primitives.Unconfirmed), - }, + wordFilter := func(wordIdx int, word1, word2 uint64) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + logpoller.NewEventByWordFilter(wordIdx, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(word1), Operator: primitives.Gte}, + }), + logpoller.NewEventByWordFilter(wordIdx, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(word2), Operator: primitives.Lte}, + }), + query.Confidence(primitives.Unconfirmed), } } @@ -945,15 +1157,13 @@ func TestORM_DataWords(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - filter := query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte}, - }), - query.Confidence(primitives.Unconfirmed), - }, + filter := []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + logpoller.NewEventByWordFilter(0, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(1), Operator: primitives.Gte}, + }), + query.Confidence(primitives.Unconfirmed), } lgs, err = o1.FilteredLogs(ctx, 
filter, limiter, "") @@ -1042,7 +1252,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { } require.NoError(t, o1.InsertLogs(ctx, inputLogs)) - filter := func(sigs []common.Hash, startBlock, endBlock int64) query.KeyFilter { + filter := func(sigs []common.Hash, startBlock, endBlock string) query.KeyFilter { filters := []query.Expression{ logpoller.NewAddressFilter(sourceAddr), } @@ -1064,8 +1274,8 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { filters = append(filters, query.Expression{ BoolExpression: query.BoolExpression{ Expressions: []query.Expression{ - query.Block(uint64(startBlock), primitives.Gte), - query.Block(uint64(endBlock), primitives.Lte), + query.Block(startBlock, primitives.Gte), + query.Block(endBlock, primitives.Lte), }, BoolOperator: query.AND, }, @@ -1097,12 +1307,75 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { }) assertion(t, logs, err, startBlock, endBlock) - - logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, startBlock, endBlock), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, strconv.Itoa(int(startBlock)), strconv.Itoa(int(endBlock))).Expressions, limiter, "") assertion(t, logs, err, startBlock, endBlock) } +type mockQueryExecutor struct { + mock.Mock +} + +func (m *mockQueryExecutor) Exec(ctx context.Context, r *logpoller.RangeQueryer[uint64], lower, upper int64) (int64, error) { + res := m.Called(lower, upper) + return int64(res.Int(0)), res.Error(1) +} + +func Test_ExecPagedQuery(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + lggr := logger.Test(t) + chainID := testutils.NewRandomEVMChainID() + db := pgtest.NewSqlxDB(t) + o := logpoller.NewORM(chainID, db, lggr) + + m := mockQueryExecutor{} + + queryError := errors.New("some error") + m.On("Exec", int64(0), int64(0)).Return(0, queryError).Once() + + // Should handle errors gracefully + r := logpoller.NewRangeQueryer(chainID, db, m.Exec) + _, err := r.ExecPagedQuery(ctx, 0, 0) + assert.ErrorIs(t, err, queryError) + + m.On("Exec", int64(0), int64(60)).Return(4, nil).Once() + + // Query should only get executed once with limitBlock=end if called with limit=0 + numResults, err := r.ExecPagedQuery(ctx, 0, 60) + require.NoError(t, err) + assert.Equal(t, int64(4), numResults) + + // Should report actual db errors + _, err = r.ExecPagedQuery(ctx, 300, 1000) + assert.Error(t, err) + + require.NoError(t, o.InsertBlock(ctx, common.HexToHash("0x1234"), 42, time.Now(), 0)) + + m.On("Exec", mock.Anything, mock.Anything).Return(3, nil) + + // Should get called with limitBlock = 342, 642, 942, 1000 + numResults, err = r.ExecPagedQuery(ctx, 300, 1000) + require.NoError(t, err) + assert.Equal(t, int64(12), numResults) // 3 results in each of 4 calls + m.AssertNumberOfCalls(t, "Exec", 6) // 4 new calls, plus the prior 2 + expectedLimitBlocks := [][]int64{{42, 341}, {342, 641}, {642, 941}, {942, 1000}} + for _, expected := range expectedLimitBlocks { + m.AssertCalled(t, "Exec", expected[0], expected[1]) + } + + // Should not go all the way to 1000, but stop after ~ 13 results have + // been returned + numResults, err = r.ExecPagedQuery(ctx, 15, 1000) + require.NoError(t, err) + assert.Equal(t, int64(15), numResults) + m.AssertNumberOfCalls(t, "Exec", 11) + expectedLimitBlocks = [][]int64{{42, 56}, {57, 71}, {72, 86}, {87, 101}, {102, 116}} // upper[n] = 42 + 15 * n - 1 for n = 1, 2, 3, 4, 5, lower[n] = upper[n-1] + 1 + for _, expected := range expectedLimitBlocks { + m.AssertCalled(t, 
"Exec", expected[0], expected[1]) + } +} + func TestORM_DeleteBlocksBefore(t *testing.T) { th := SetupTH(t, lpOpts) o1 := th.ORM @@ -1160,14 +1433,12 @@ func TestLogPoller_Logs(t *testing.T) { assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[4].BlockHash.String()) assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[5].BlockHash.String()) - logFilter := func(start, end uint64, address common.Address) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(address), - logpoller.NewEventSigFilter(event1), - query.Block(start, primitives.Gte), - query.Block(end, primitives.Lte), - }, + logFilter := func(start, end string, address common.Address) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(address), + logpoller.NewEventSigFilter(event1), + query.Block(start, primitives.Gte), + query.Block(end, primitives.Lte), } } @@ -1181,7 +1452,7 @@ func TestLogPoller_Logs(t *testing.T) { assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[1].BlockHash.String()) assert.Equal(t, address1, lgs[1].Address) - lgs, err = th.ORM.FilteredLogs(ctx, logFilter(1, 3, address1), query.LimitAndSort{ + lgs, err = th.ORM.FilteredLogs(ctx, logFilter("1", "3", address1), query.LimitAndSort{ SortBy: []query.SortBy{query.NewSortBySequence(query.Asc)}, }, "") require.NoError(t, err) @@ -1201,7 +1472,7 @@ func TestLogPoller_Logs(t *testing.T) { assert.Equal(t, address2, lgs[0].Address) assert.Equal(t, event1.Bytes(), lgs[0].Topics[0]) - lgs, err = th.ORM.FilteredLogs(ctx, logFilter(2, 2, address2), query.LimitAndSort{ + lgs, err = th.ORM.FilteredLogs(ctx, logFilter("2", "2", address2), query.LimitAndSort{ SortBy: []query.SortBy{query.NewSortBySequence(query.Asc)}, }, "") require.NoError(t, err) @@ -1565,7 +1836,7 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { events: []common.Hash{event1, event2}, addrs: []common.Address{address1, address2}, confs: 0, - fromBlock: 3, + fromBlock: 4, expectedBlockNumber: 0, }, { @@ -1676,8 +1947,8 @@ func TestSelectLogsCreatedAfter(t *testing.T) { if len(topicVals) > 0 { exp := make([]query.Expression, len(topicVals)) for idx, val := range topicVals { - exp[idx] = logpoller.NewEventByTopicFilter(uint64(topicIdx), []primitives.ValueComparator{ - {Value: val.String(), Operator: primitives.Eq}, + exp[idx] = logpoller.NewEventByTopicFilter(uint64(topicIdx), []logpoller.HashedValueComparator{ + {Value: val, Operator: primitives.Eq}, }) } @@ -1721,7 +1992,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { assertion(t, logs, err, tt.expectedLogs) - logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil).Expressions, limiter, "") assertion(t, logs, err, tt.expectedLogs) }) @@ -1734,7 +2005,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { assertion(t, logs, err, tt.expectedLogs) - logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}).Expressions, limiter, "") assertion(t, logs, err, tt.expectedLogs) }) @@ -1964,11 +2235,11 @@ func TestSelectLogsDataWordBetween(t *testing.T) { Expressions: []query.Expression{ logpoller.NewAddressFilter(address), logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByWordFilter(eventSig, 0, 
[]primitives.ValueComparator{ - {Value: logpoller.EvmWord(word).Hex(), Operator: primitives.Lte}, + logpoller.NewEventByWordFilter(0, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(word), Operator: primitives.Lte}, }), - logpoller.NewEventByWordFilter(eventSig, 1, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word).Hex(), Operator: primitives.Gte}, + logpoller.NewEventByWordFilter(1, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(word), Operator: primitives.Gte}, }), query.Confidence(primitives.Unconfirmed), }, @@ -1990,7 +2261,7 @@ func TestSelectLogsDataWordBetween(t *testing.T) { assertion(t, logs, err, tt.expectedLogs) - logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue).Expressions, limiter, "") assertion(t, logs, err, tt.expectedLogs) }) diff --git a/core/chains/evm/logpoller/parser.go b/core/chains/evm/logpoller/parser.go index e08ea93da7..0acac07575 100644 --- a/core/chains/evm/logpoller/parser.go +++ b/core/chains/evm/logpoller/parser.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -26,6 +27,10 @@ const ( var ( ErrUnexpectedCursorFormat = errors.New("unexpected cursor format") + logsFields = [...]string{"evm_chain_id", "log_index", "block_hash", "block_number", + "address", "event_sig", "topics", "tx_hash", "data", "created_at", "block_timestamp"} + blocksFields = [...]string{"evm_chain_id", "block_hash", "block_number", "block_timestamp", + "finalized_block_number", "created_at"} ) // The parser builds SQL expressions piece by piece for each Accept function call and resets the error and expression @@ -220,7 +225,7 @@ func (v *pgDSLParser) buildQuery(chainID *big.Int, expressions []query.Expressio v.err = nil // build the query string - clauses := []string{"SELECT evm.logs.* FROM evm.logs"} + clauses := []string{logsQuery("")} where, err := v.whereClause(expressions, limiter) if err != nil { diff --git a/core/chains/evm/logpoller/parser_test.go b/core/chains/evm/logpoller/parser_test.go index 5e99ec7ba8..b4099e000d 100644 --- a/core/chains/evm/logpoller/parser_test.go +++ b/core/chains/evm/logpoller/parser_test.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -34,7 +35,7 @@ func TestDSLParser(t *testing.T) { result, args, err := parser.buildQuery(chainID, expressions, limiter) require.NoError(t, err) - assert.Equal(t, "SELECT evm.logs.* FROM evm.logs WHERE evm_chain_id = :evm_chain_id ORDER BY "+defaultSort, result) + assert.Equal(t, logsQuery(" WHERE evm_chain_id = :evm_chain_id ORDER BY "+defaultSort), result) assertArgs(t, args, 1) }) @@ -52,15 +53,14 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CursorLimit("10-5-0x42", query.CursorFollowing, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0 " + - "AND block_number <= " + - "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " 
+ - "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " + - "ORDER BY block_number ASC, log_index ASC, tx_hash ASC " + - "LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (address = :address_0 AND event_sig = :event_sig_0 " + + "AND block_number <= " + + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " + + "ORDER BY block_number ASC, log_index ASC, tx_hash ASC " + + "LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -80,12 +80,11 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CountLimit(20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0) " + - "ORDER BY " + defaultSort + " " + - "LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (address = :address_0 AND event_sig = :event_sig_0) " + + "ORDER BY " + defaultSort + " " + + "LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -102,10 +101,9 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Desc)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "ORDER BY block_number DESC, log_index DESC, tx_hash DESC" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC") require.NoError(t, err) assert.Equal(t, expected, result) @@ -122,10 +120,9 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortByBlock(query.Asc), query.NewSortByTimestamp(query.Desc)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "ORDER BY block_number ASC, block_timestamp DESC" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "ORDER BY block_number ASC, block_timestamp DESC") require.NoError(t, err) assert.Equal(t, expected, result) @@ -147,16 +144,15 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CursorLimit("10-20-0x42", query.CursorPrevious, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp = :block_timestamp_0 " + - "AND tx_hash = :tx_hash_0 " + - "AND block_number != :block_number_0 " + - "AND block_number <= " + - "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + - "AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " + - "ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp = :block_timestamp_0 " + + "AND tx_hash = :tx_hash_0 " + + "AND block_number != :block_number_0 " + + "AND block_number <= " + + "(SELECT 
finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " + + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -175,10 +171,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -194,10 +189,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -213,10 +207,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -243,10 +236,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -268,10 +260,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND topics[:topic_index_0] > :topic_value_0 AND 
topics[:topic_index_0] < :topic_value_1 ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND topics[:topic_index_0] > :topic_value_0 AND topics[:topic_index_0] < :topic_value_1 ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -304,12 +295,11 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp >= :block_timestamp_0 " + - "AND (tx_hash = :tx_hash_0 " + - "OR block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1))) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp >= :block_timestamp_0 " + + "AND (tx_hash = :tx_hash_0 " + + "OR block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1))) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -353,14 +343,13 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp = :block_timestamp_0 " + - "AND (tx_hash = :tx_hash_0 " + - "OR (block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " + - "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 " + - "AND substring(data from 32*:word_index_0+1 for 32) <= :word_value_1))) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp = :block_timestamp_0 " + + "AND (tx_hash = :tx_hash_0 " + + "OR (block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " + + "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 " + + "AND substring(data from 32*:word_index_0+1 for 32) <= :word_value_1))) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go index 3e58143a28..bba12f240f 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go @@ -336,8 +336,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, return nil, err } - reportsQuery, err := query.Where( - c.address.String(), + reportsQuery, err := logpoller.Where( logpoller.NewAddressFilter(c.address), logpoller.NewEventSigFilter(c.reportAcceptedSig), query.Timestamp(uint64(ts.Unix()), primitives.Gte), diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go index ecc8acb576..b7354da364 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go @@ 
-351,8 +351,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, return nil, err } - reportsQuery, err := query.Where( - c.address.String(), + reportsQuery, err := logpoller.Where( logpoller.NewAddressFilter(c.address), logpoller.NewEventSigFilter(c.reportAcceptedSig), query.Timestamp(uint64(ts.Unix()), primitives.Gte), diff --git a/core/services/relay/evm/contract_transmitter.go b/core/services/relay/evm/contract_transmitter.go index d594dfb921..2ee8dcc286 100644 --- a/core/services/relay/evm/contract_transmitter.go +++ b/core/services/relay/evm/contract_transmitter.go @@ -54,6 +54,12 @@ func WithRetention(retention time.Duration) OCRTransmitterOption { } } +func WithMaxLogsKept(maxLogsKept uint64) OCRTransmitterOption { + return func(ct *contractTransmitter) { + ct.maxLogsKept = maxLogsKept + } +} + func WithReportToEthMetadata(reportToEvmTxMeta ReportToEthMetadata) OCRTransmitterOption { return func(ct *contractTransmitter) { if reportToEvmTxMeta != nil { @@ -74,6 +80,7 @@ type contractTransmitter struct { reportToEvmTxMeta ReportToEthMetadata excludeSigs bool retention time.Duration + maxLogsKept uint64 } func transmitterFilterName(addr common.Address) string { @@ -106,15 +113,14 @@ func NewOCRContractTransmitter( reportToEvmTxMeta: reportToEvmTxMetaNoop, excludeSigs: false, retention: 0, + maxLogsKept: 0, } for _, opt := range opts { opt(newContractTransmitter) } - // TODO It would be better to keep MaxLogsKept = 1 for the OCR contract transmitter instead of Retention. We are always interested only in the latest log. - // Although MaxLogsKept is present in the Filter struct, it is not supported by LogPoller yet. - err := lp.RegisterFilter(ctx, logpoller.Filter{Name: transmitterFilterName(address), EventSigs: []common.Hash{transmitted.ID}, Addresses: []common.Address{address}, Retention: newContractTransmitter.retention}) + err := lp.RegisterFilter(ctx, logpoller.Filter{Name: transmitterFilterName(address), EventSigs: []common.Hash{transmitted.ID}, Addresses: []common.Address{address}, Retention: newContractTransmitter.retention, MaxLogsKept: newContractTransmitter.maxLogsKept}) if err != nil { return nil, err } diff --git a/core/services/relay/evm/event_binding.go b/core/services/relay/evm/event_binding.go index acfb1aa630..f386fa0575 100644 --- a/core/services/relay/evm/event_binding.go +++ b/core/services/relay/evm/event_binding.go @@ -168,7 +168,7 @@ func (e *eventBinding) QueryKey(ctx context.Context, filter query.KeyFilter, lim } remapped.Expressions = append(defaultExpressions, remapped.Expressions...) 
- logs, err := e.lp.FilteredLogs(ctx, remapped, limitAndSort, e.contractName+"-"+e.eventName) + logs, err := e.lp.FilteredLogs(ctx, remapped.Expressions, limitAndSort, e.contractName+"-"+e.address.String()+"-"+e.eventName) if err != nil { return nil, err } diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index f2143ceded..b79f06078a 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -454,7 +454,7 @@ func (r *Relayer) NewCCIPCommitProvider(rargs commontypes.RelayArgs, pargs commo subjectID := chainToUUID(configWatcher.chain.ID()) contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, r.ks.Eth(), configWatcher, configTransmitterOpts{ subjectID: &subjectID, - }, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithRetention(0)) + }, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithMaxLogsKept(0)) if err != nil { return nil, err } @@ -543,7 +543,7 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont subjectID := chainToUUID(configWatcher.chain.ID()) contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, r.ks.Eth(), configWatcher, configTransmitterOpts{ subjectID: &subjectID, - }, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithRetention(0), WithExcludeSignatures()) + }, OCR2AggregatorTransmissionContractABI, WithReportToEthMetadata(fn), WithMaxLogsKept(0), WithExcludeSignatures()) if err != nil { return nil, err } diff --git a/core/store/migrate/migrations/0248_log_poller_primary_keys.sql b/core/store/migrate/migrations/0248_log_poller_primary_keys.sql new file mode 100644 index 0000000000..2d64d90a3a --- /dev/null +++ b/core/store/migrate/migrations/0248_log_poller_primary_keys.sql @@ -0,0 +1,43 @@ +-- +goose Up + +-- Replace (block_hash, log_index, evm_chain_id) primary key on evm.logs with new id column +ALTER TABLE evm.logs DROP CONSTRAINT logs_pkey; +ALTER TABLE evm.logs ADD COLUMN id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY; +-- Replaces previous primary key index, but also useful for accelerating DeleteLogsAndBlocksAfter +-- This also strengthens the uniqueness requirement, ensuring that we can't insert logs for two different blocks +-- with the same block number but different block hashes--something which would corrupt the db +CREATE UNIQUE INDEX idx_logs_chain_block_logindex ON evm.logs (evm_chain_id, block_number, log_index); + +-- Previously used for WHERE evm_chain_id = $1 AND address = $2 AND event_sig = $3 ... ORDER BY block_number, created_at +DROP INDEX IF EXISTS evm.idx_evm_logs_ordered_by_block_and_created_at; +-- Useful for the current form of those queries: WHERE evm_chain_id = $1 AND address = $2 AND event_sig = $3 ... ORDER BY block_number, log_index +CREATE INDEX idx_logs_chain_address_event_block ON evm.logs (evm_chain_id, address, event_sig, block_number); +CREATE UNIQUE INDEX idx_logs_chain_address_event_block_logindex_uq ON evm.logs (evm_chain_id, address, event_sig, block_number, log_index); +CREATE INDEX idx_logs_chain_address_event_block_logindex ON evm.logs (evm_chain_id, address, event_sig, block_number, log_index); + +-- Replace (block_number, evm_chain_id) primary key on evm.log_poller_blocks with new id column +ALTER TABLE evm.log_poller_blocks DROP CONSTRAINT log_poller_blocks_pkey; +ALTER TABLE evm.log_poller_blocks ADD COLUMN id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY; + +-- Add UNIQUE keyword to (evm_chain_id, block_number DESC) index (and rename for consistency) +-- This index is useful for any query for logs which includes a "confs" param, and for DeleteBlocksBefore, DeleteLogsAndBlocksAfter, etc. +-- Prior to this we also had a separate UNIQUE INDEX for (block_number, evm_chain_id) generated by the primary key definition. +-- Adding the UNIQUE keyword to this allows us to combine the two indices into a single index, saving on disk space--with the +-- added benefit of making insertions into this index slightly faster (since it can rely on the keys being unique) +DROP INDEX IF EXISTS evm.idx_evm_log_poller_blocks_order_by_block; +CREATE UNIQUE INDEX idx_log_poller_blocks_chain_block ON evm.log_poller_blocks (evm_chain_id, block_number DESC); + +-- +goose Down + +-- revert evm.log_poller_blocks +DROP INDEX IF EXISTS evm.idx_log_poller_blocks_chain_block; +CREATE INDEX idx_evm_log_poller_blocks_order_by_block ON evm.log_poller_blocks (evm_chain_id, block_number DESC); +ALTER TABLE evm.log_poller_blocks DROP COLUMN id; +ALTER TABLE evm.log_poller_blocks ADD PRIMARY KEY (block_number, evm_chain_id); + +-- revert evm.logs +DROP INDEX IF EXISTS evm.idx_logs_chain_address_event_block_logindex; +CREATE INDEX idx_evm_logs_ordered_by_block_and_created_at ON evm.logs (evm_chain_id, address, event_sig, block_number, created_at); +DROP INDEX IF EXISTS evm.idx_logs_chain_block_logindex; +ALTER TABLE evm.logs DROP COLUMN id; +ALTER TABLE evm.logs ADD PRIMARY KEY (block_hash, log_index, evm_chain_id); diff --git a/integration-tests/.tool-versions b/integration-tests/.tool-versions index d623afb283..342dafb1fa 100644 --- a/integration-tests/.tool-versions +++ b/integration-tests/.tool-versions @@ -1,4 +1,4 @@ -golang 1.22.5 +golang 1.22.7 k3d 5.4.6 kubectl 1.25.5 nodejs 20.13.1 diff --git a/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml b/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml index c404d3a0c0..4bad510a14 100644 --- a/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml +++ b/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml @@ -15,7 +15,7 @@ # If you want to use a specific commit or a branch you need to switch to the internal ECR in `~/.testsecrets` # E2E_TEST_CHAINLINK_IMAGE=".dkr.ecr..amazonaws.com/chainlink-ccip" [CCIP.Env.NewCLCluster.Common.ChainlinkImage] -version = "2.14.0-ccip1.5.0" +version = "max-logs-kept-tests" [CCIP] [CCIP.ContractVersions] @@ -132,6 +132,8 @@ DeltaReconcile = '5s' CommonChainConfigTOML = """ LogPollInterval = '1s' +LogPrunePageSize = 700 +LogKeepBlocksDepth = 4000 [HeadTracker] HistoryDepth = 200 @@ -158,7 +160,7 @@ BatchGasLimit = 11000000 [CCIP.Groups.load.TokenConfig] TimeoutForPriceUpdate = '15m' NoOfTokensPerChain = 10 -NoOfTokensWithDynamicPrice = 
10 +NoOfTokensWithDynamicPrice = 0 DynamicPriceUpdateInterval = '15s' CCIPOwnerTokens = true
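A minimal caller-side sketch of the FilteredLogs migration shown in this diff. Based on the call sites in commit_store.go and orm_test.go, logpoller.Where appears to return the []query.Expression slice that FilteredLogs now consumes directly (the old query.Where wrapped the same expressions, plus a key string, in a query.KeyFilter); the names filteredTransmitLogs, addr, eventSig, and "example-query" are illustrative only, not part of the change:

	package example

	import (
		"context"

		"github.com/ethereum/go-ethereum/common"

		"github.com/smartcontractkit/chainlink-common/pkg/types/query"
		"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"

		"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
	)

	func filteredTransmitLogs(ctx context.Context, lp logpoller.LogPoller, addr common.Address, eventSig common.Hash) ([]logpoller.Log, error) {
		// Build the primitive expressions directly; no KeyFilter wrapper or
		// key string is needed anymore.
		expressions, err := logpoller.Where(
			logpoller.NewAddressFilter(addr),
			logpoller.NewEventSigFilter(eventSig),
			query.Confidence(primitives.Unconfirmed),
		)
		if err != nil {
			return nil, err
		}
		limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Asc))
		// The final argument names the query for observability, mirroring the
		// contractName-address-eventName label built in event_binding.go.
		return lp.FilteredLogs(ctx, expressions, limiter, "example-query")
	}

And since Test_ExecPagedQuery pins down the paging arithmetic only through mock expectations, a sketch of the range math those expectations imply, assuming limit acts as the page width in blocks and start is the earliest block in the db (this models only the block ranges, not the early stop once enough results have accumulated):

	// pageBounds reproduces the ranges asserted in Test_ExecPagedQuery:
	// start=42, end=1000, limit=300 yields {42, 341}, {342, 641}, {642, 941}, {942, 1000}.
	func pageBounds(start, end, limit int64) [][2]int64 {
		var pages [][2]int64
		for lower := start; lower <= end; lower += limit {
			upper := lower + limit - 1
			if upper > end {
				upper = end
			}
			pages = append(pages, [2]int64{lower, upper})
		}
		return pages
	}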