diff --git a/app/extend_vote.go b/app/extend_vote.go index 4e1fe8837..9100cf47a 100644 --- a/app/extend_vote.go +++ b/app/extend_vote.go @@ -31,7 +31,7 @@ import ( type OracleKeeper interface { GetTimestampBefore(ctx context.Context, queryId []byte, timestamp time.Time) (time.Time, error) GetTimestampAfter(ctx context.Context, queryId []byte, timestamp time.Time) (time.Time, error) - GetAggregatedReportsByHeight(ctx context.Context, height uint64) []oracletypes.Aggregate + GetAggregatedReportsByHeight(ctx context.Context, height uint64) ([]oracletypes.Aggregate, error) } type BridgeKeeper interface { diff --git a/app/mocks/OracleKeeper.go b/app/mocks/OracleKeeper.go index db26646c8..80c02c664 100644 --- a/app/mocks/OracleKeeper.go +++ b/app/mocks/OracleKeeper.go @@ -17,10 +17,14 @@ type OracleKeeper struct { } // GetAggregatedReportsByHeight provides a mock function with given fields: ctx, height -func (_m *OracleKeeper) GetAggregatedReportsByHeight(ctx context.Context, height uint64) []types.Aggregate { +func (_m *OracleKeeper) GetAggregatedReportsByHeight(ctx context.Context, height uint64) ([]types.Aggregate, error) { ret := _m.Called(ctx, height) var r0 []types.Aggregate + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) ([]types.Aggregate, error)); ok { + return rf(ctx, height) + } if rf, ok := ret.Get(0).(func(context.Context, uint64) []types.Aggregate); ok { r0 = rf(ctx, height) } else { @@ -29,7 +33,13 @@ func (_m *OracleKeeper) GetAggregatedReportsByHeight(ctx context.Context, height } } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, height) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // GetTimestampAfter provides a mock function with given fields: ctx, queryId, timestamp diff --git a/app/proposal_handler.go b/app/proposal_handler.go index 3b3c2d7ab..79b98a3ac 100644 --- a/app/proposal_handler.go +++ b/app/proposal_handler.go @@ -128,6 +128,11 @@ func (h *ProposalHandler) PrepareProposalHandler(ctx sdk.Context, req *abci.Requ func (h *ProposalHandler) ProcessProposalHandler(ctx sdk.Context, req *abci.RequestProcessProposal) (*abci.ResponseProcessProposal, error) { if req.Height > ctx.ConsensusParams().Abci.VoteExtensionsEnableHeight { + // in case proposer says 0 tx in a block after vote extensions enabled + if len(req.Txs) == 0 { + h.logger.Error("ProcessProposalHandler: rejecting proposal, empty transactions after vote extensions enabled") + return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil + } var injectedVoteExtTx VoteExtTx if err := json.Unmarshal(req.Txs[0], &injectedVoteExtTx); err != nil { h.logger.Error("ProcessProposalHandler: failed to decode injected vote extension tx", "err", err) diff --git a/app/proposal_handler_test.go b/app/proposal_handler_test.go index f48b04ae9..ac00f0aa9 100644 --- a/app/proposal_handler_test.go +++ b/app/proposal_handler_test.go @@ -387,6 +387,25 @@ func (s *ProposalHandlerTestSuite) TestProcessProposalHandler() { require.NotNil(res) } +func (s *ProposalHandlerTestSuite) TestProcessProposalHandler_EmptyTxs() { + require := s.Require() + p := s.proposalHandler + ctx := s.ctx + + ctx = ctx.WithBlockHeight(3) + + // Empty Txs after vote extensions enabled should reject, not panic + req := abcitypes.RequestProcessProposal{ + Txs: [][]byte{}, + Height: 3, + } + + res, err := p.ProcessProposalHandler(ctx, &req) + require.NoError(err) + require.NotNil(res) + require.Equal(abcitypes.ResponseProcessProposal_REJECT, res.Status) +} + func (s *ProposalHandlerTestSuite) TestPreBlocker() { require := s.Require() p := s.proposalHandler diff --git a/app/upgrades.go b/app/upgrades.go index 6752154b9..8bcca2f31 100644 --- a/app/upgrades.go +++ b/app/upgrades.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/tellor-io/layer/app/upgrades" - v_6_1_1 "github.com/tellor-io/layer/app/upgrades/v6.1.1" + v_6_1_2 "github.com/tellor-io/layer/app/upgrades/v6.1.2" upgradetypes "cosmossdk.io/x/upgrade/types" ) @@ -13,7 +13,7 @@ var ( // `Upgrades` defines the upgrade handlers and store loaders for the application. // New upgrades should be added to this slice after they are implemented. Upgrades = []*upgrades.Upgrade{ - &v_6_1_1.Upgrade, + &v_6_1_2.Upgrade, } Forks = []upgrades.Fork{} ) @@ -21,12 +21,12 @@ var ( // setupUpgradeHandlers registers the upgrade handlers to perform custom upgrade // logic and state migrations for software upgrades. func (app *App) setupUpgradeHandlers() { - if app.UpgradeKeeper.HasHandler(v_6_1_1.UpgradeName) { - panic(fmt.Sprintf("Cannot register duplicate upgrade handler '%s'", v_6_1_1.UpgradeName)) + if app.UpgradeKeeper.HasHandler(v_6_1_2.UpgradeName) { + panic(fmt.Sprintf("Cannot register duplicate upgrade handler '%s'", v_6_1_2.UpgradeName)) } app.UpgradeKeeper.SetUpgradeHandler( - v_6_1_1.UpgradeName, - v_6_1_1.CreateUpgradeHandler( + v_6_1_2.UpgradeName, + v_6_1_2.CreateUpgradeHandler( app.ModuleManager(), app.configurator, ), diff --git a/app/upgrades/v6.1.2/constants.go b/app/upgrades/v6.1.2/constants.go new file mode 100644 index 000000000..2b4a577aa --- /dev/null +++ b/app/upgrades/v6.1.2/constants.go @@ -0,0 +1,16 @@ +package v6_1_2 + +import ( + "github.com/tellor-io/layer/app/upgrades" + + store "cosmossdk.io/store/types" +) + +const ( + UpgradeName = "v6.1.2" +) + +var Upgrade = upgrades.Upgrade{ + UpgradeName: UpgradeName, + StoreUpgrades: store.StoreUpgrades{}, +} diff --git a/app/upgrades/v6.1.2/upgrade.go b/app/upgrades/v6.1.2/upgrade.go new file mode 100644 index 000000000..7e0b9afe0 --- /dev/null +++ b/app/upgrades/v6.1.2/upgrade.go @@ -0,0 +1,32 @@ +package v6_1_2 + +import ( + "context" + "fmt" + + upgradetypes "cosmossdk.io/x/upgrade/types" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/module" +) + +/* +Upgrade to v6.1.2 includes: + - Reporter stake caching: skip full recalculation when delegation state hasn't changed + - New reporter collections: LastValSetUpdateHeight, StakeRecalcFlag, RecalcAtTime + - Staking hooks now flag reporters for recalculation on validator set changes and delegation modifications + - Microreport pruning: oracle EndBlocker removes reports older than 30 days (batched, max 100/block) + - Simplified reporter PruneOldReports using oracle block-height lookup +*/ + +func CreateUpgradeHandler( + mm *module.Manager, + configurator module.Configurator, +) upgradetypes.UpgradeHandler { + return func(ctx context.Context, _ upgradetypes.Plan, vm module.VersionMap) (module.VersionMap, error) { + sdkCtx := sdk.UnwrapSDKContext(ctx) + sdkCtx.Logger().Info(fmt.Sprintf("Running %s Upgrade...", UpgradeName)) + + return mm.RunMigrations(ctx, configurator, vm) + } +} diff --git a/cmd/layerd/cmd/root.go b/cmd/layerd/cmd/root.go index a5d222194..9e5821733 100644 --- a/cmd/layerd/cmd/root.go +++ b/cmd/layerd/cmd/root.go @@ -4,6 +4,7 @@ import ( "errors" "io" "os" + "time" tmcfg "github.com/cometbft/cometbft/config" tmcli "github.com/cometbft/cometbft/libs/cli" @@ -122,6 +123,7 @@ func NewRootCmd( // return tmcfg.DefaultConfig if no custom configuration is required for the application. func initTendermintConfig() *tmcfg.Config { cfg := tmcfg.DefaultConfig() + cfg.Consensus.TimeoutCommit = 3 * time.Second return cfg } diff --git a/e2e/stake_cache_test.go b/e2e/stake_cache_test.go new file mode 100644 index 000000000..604285d3d --- /dev/null +++ b/e2e/stake_cache_test.go @@ -0,0 +1,598 @@ +package e2e_test + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "strconv" + "testing" + + interchaintest "github.com/strangelove-ventures/interchaintest/v8" + "github.com/strangelove-ventures/interchaintest/v8/chain/cosmos" + "github.com/strangelove-ventures/interchaintest/v8/ibc" + "github.com/strangelove-ventures/interchaintest/v8/testutil" + "github.com/stretchr/testify/require" + "github.com/tellor-io/layer/e2e" + layerutil "github.com/tellor-io/layer/testutil" + "github.com/tellor-io/layer/utils" + + "cosmossdk.io/math" + + sdk "github.com/cosmos/cosmos-sdk/types" +) + +const ( + stakeCacheCommissRate = "0.1" + moniker = "reporter_0" +) + +// TestStakingHooksTriggered specifically tests that the reporter module's staking hooks +// fire when a delegation occurs for a registered selector. +// This isolates the hook behavior: if hooks fire, StakeRecalcFlag gets set, +// and the next report recalculates. If hooks don't fire, the cached (stale) stake is used. +func TestStakingHooksTriggered(t *testing.T) { + require := require.New(t) + + cosmos.SetSDKConfig("tellor") + + chain, ic, ctx := e2e.SetupChain(t, 2, 0) + defer ic.Close() + + validators, err := e2e.GetValidators(ctx, chain) + require.NoError(err) + e2e.PrintValidatorInfo(ctx, validators) + + // Turn on minting + require.NoError(e2e.TurnOnMinting(ctx, chain, validators[0].Node)) + require.NoError(testutil.WaitForBlocks(ctx, 7, validators[0].Node)) + + // Step 1: Validator 0 becomes a reporter + minStakeAmt := "1000000" + txHash, err := validators[0].Node.ExecTx(ctx, validators[0].AccAddr, "reporter", "create-reporter", stakeCacheCommissRate, minStakeAmt, moniker, "--keyring-dir", validators[0].Node.HomeDir()) + require.NoError(err) + fmt.Println("=== Step 1: Reporter created, txHash:", txHash) + + // Step 2: Create selector - fund, delegate, and join reporter + fundAmt := math.NewInt(20_000 * 1e6) + initialDelegate := sdk.NewCoin("loya", math.NewInt(1000*1e6)) + user := interchaintest.GetAndFundTestUsers(t, ctx, "selector", fundAmt, chain)[0] + fmt.Println("=== Step 2: Selector address:", user.FormattedAddress()) + + txHash, err = validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "staking", "delegate", validators[0].ValAddr, initialDelegate.String(), "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "10loya") + require.NoError(err) + fmt.Println("=== Step 2: Initial delegation txHash:", txHash) + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Verify delegation exists + delRes, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "staking", "delegation", user.FormattedAddress(), validators[0].ValAddr) + require.NoError(err) + fmt.Println("=== Step 2: Delegation exists:", string(delRes)) + + // Selector joins reporter + txHash, err = validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "reporter", "select-reporter", validators[0].AccAddr, "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "5loya") + require.NoError(err) + fmt.Println("=== Step 2: Select-reporter txHash:", txHash) + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Verify selector->reporter relationship + selectorRes, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "reporter", "selector-reporter", user.FormattedAddress()) + require.NoError(err) + var selectorReporter e2e.QuerySelectorReporterResponse + require.NoError(json.Unmarshal(selectorRes, &selectorReporter)) + fmt.Println("=== Step 2: Selector's reporter:", selectorReporter.Reporter) + require.Equal(validators[0].AccAddr, selectorReporter.Reporter, "Selector should be linked to validator 0's reporter") + + // Step 3: Submit first report to establish cached stake + currentCycleListRes, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "current-cyclelist-query") + require.NoError(err) + var currentCycleList e2e.QueryCurrentCyclelistQueryResponse + require.NoError(json.Unmarshal(currentCycleListRes, ¤tCycleList)) + + value := layerutil.EncodeValue(500.0) + _, _, err = validators[0].Node.Exec(ctx, validators[0].Node.TxCommand("validator", "oracle", "submit-value", currentCycleList.QueryData, value, "--fees", "5loya", "--keyring-dir", validators[0].Node.HomeDir()), validators[0].Node.Chain.Config().Env) + require.NoError(err) + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Get first report power + qDataBz, err := hex.DecodeString(currentCycleList.QueryData) + require.NoError(err) + qIdBz := utils.QueryIDFromData(qDataBz) + qId := hex.EncodeToString(qIdBz) + res, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "get-current-aggregate-report", qId) + require.NoError(err) + var aggRes e2e.QueryGetCurrentAggregateReportResponse + require.NoError(json.Unmarshal(res, &aggRes)) + firstPower, err := strconv.ParseUint(aggRes.Aggregate.AggregatePower, 10, 64) + require.NoError(err) + fmt.Println("=== Step 3: First report power (cached baseline):", firstPower) + + // Step 4: Selector delegates MORE - this MUST trigger AfterDelegationModified hook + // The hook should set StakeRecalcFlag for the reporter + additionalDelegate := sdk.NewCoin("loya", math.NewInt(10_000*1e6)) + txHash, err = validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "staking", "delegate", validators[0].ValAddr, additionalDelegate.String(), "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "10loya") + require.NoError(err) + fmt.Println("=== Step 4: Additional delegation txHash:", txHash) + + // Verify tx succeeded + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + txRes, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "tx", txHash) + require.NoError(err) + fmt.Println("=== Step 4: Delegate TX result:", string(txRes)) + + // Verify the delegation amount increased + delRes2, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "staking", "delegation", user.FormattedAddress(), validators[0].ValAddr) + require.NoError(err) + fmt.Println("=== Step 4: Updated delegation:", string(delRes2)) + + require.NoError(testutil.WaitForBlocks(ctx, 3, validators[0].Node)) + + // Step 5: Submit second report - if hooks fired, StakeRecalcFlag is set, + // and ReporterStake will recalculate instead of using cache + currentCycleListRes, _, err = e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "current-cyclelist-query") + require.NoError(err) + require.NoError(json.Unmarshal(currentCycleListRes, ¤tCycleList)) + + _, _, err = validators[0].Node.Exec(ctx, validators[0].Node.TxCommand("validator", "oracle", "submit-value", currentCycleList.QueryData, value, "--fees", "5loya", "--keyring-dir", validators[0].Node.HomeDir()), validators[0].Node.Chain.Config().Env) + require.NoError(err) + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Get second report power + qDataBz, err = hex.DecodeString(currentCycleList.QueryData) + require.NoError(err) + qIdBz = utils.QueryIDFromData(qDataBz) + qId = hex.EncodeToString(qIdBz) + res, _, err = e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "get-current-aggregate-report", qId) + require.NoError(err) + require.NoError(json.Unmarshal(res, &aggRes)) + secondPower, err := strconv.ParseUint(aggRes.Aggregate.AggregatePower, 10, 64) + require.NoError(err) + fmt.Println("=== Step 5: Second report power (after hook should recalc):", secondPower) + + // Step 6: Assertions + // If AfterDelegationModified hook fired: + // - StakeRecalcFlag was set for the reporter + // - Second submit called ReporterStake which saw the flag + // - ReporterStake recalculated and includes the new delegation + // - Power should increase by ~10000 (10000 * 1e6 loya / 1e6 power reduction) + // If hook did NOT fire: + // - No StakeRecalcFlag set + // - Second submit used cached stake (unless valset update also triggered recalc) + // - Power might still increase due to valset update recalc path + expectedIncrease := uint64(10_000) // 10K loya in power units + actualIncrease := secondPower - firstPower + fmt.Printf("=== Step 6: Power increase: %d (expected ~%d)\n", actualIncrease, expectedIncrease) + fmt.Printf("=== Step 6: First power: %d, Second power: %d\n", firstPower, secondPower) + + require.Greater(secondPower, firstPower, "Hook FAILED: Reporter stake should increase after selector's additional delegation. This means AfterDelegationModified hook did not fire or StakeRecalcFlag was not set.") +} + +// TestStakeCacheValSetUpdate tests that reporter stake is recalculated after validator set update +// Scenario: Reporter submits, validator power changes, reporter submits again +// Expected: Second submission should recalculate stake with new validator power +func TestStakeCacheValSetUpdate(t *testing.T) { + require := require.New(t) + + cosmos.SetSDKConfig("tellor") + + chain, ic, ctx := e2e.SetupChain(t, 2, 0) + defer ic.Close() + + validators, err := e2e.GetValidators(ctx, chain) + require.NoError(err) + e2e.PrintValidatorInfo(ctx, validators) + + // Turn on minting + require.NoError(e2e.TurnOnMinting(ctx, chain, validators[0].Node)) + require.NoError(testutil.WaitForBlocks(ctx, 7, validators[0].Node)) + + // Validator 0 becomes a reporter + minStakeAmt := "1000000" + txHash, err := validators[0].Node.ExecTx(ctx, validators[0].AccAddr, "reporter", "create-reporter", stakeCacheCommissRate, minStakeAmt, moniker, "--keyring-dir", validators[0].Node.HomeDir()) + require.NoError(err) + fmt.Println("TX HASH (validator 0 becomes reporter):", txHash) + + // Get current cyclelist query + currentCycleListRes, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "current-cyclelist-query") + require.NoError(err) + var currentCycleList e2e.QueryCurrentCyclelistQueryResponse + require.NoError(json.Unmarshal(currentCycleListRes, ¤tCycleList)) + + // First report + value := layerutil.EncodeValue(100.0) + _, _, err = validators[0].Node.Exec(ctx, validators[0].Node.TxCommand("validator", "oracle", "submit-value", currentCycleList.QueryData, value, "--fees", "5loya", "--keyring-dir", validators[0].Node.HomeDir()), validators[0].Node.Chain.Config().Env) + require.NoError(err) + fmt.Println("First report submitted") + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Query first report power + qDataBz, err := hex.DecodeString(currentCycleList.QueryData) + require.NoError(err) + qIdBz := utils.QueryIDFromData(qDataBz) + qId := hex.EncodeToString(qIdBz) + res, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "get-current-aggregate-report", qId) + require.NoError(err) + var aggRes e2e.QueryGetCurrentAggregateReportResponse + require.NoError(json.Unmarshal(res, &aggRes)) + firstPower, err := strconv.ParseUint(aggRes.Aggregate.AggregatePower, 10, 64) + require.NoError(err) + fmt.Println("First report power:", firstPower) + + // Fund validator 0 with extra tokens and self-delegate to increase their own reporter stake + // The validator is already a selector of themselves (from create-reporter), so this delegation + // triggers AfterDelegationModified hook and should be reflected in the next report's power. + delegateAmt := sdk.NewCoin("loya", math.NewInt(50_000*1e6)) + require.NoError(chain.SendFunds(ctx, "faucet", ibc.WalletAmount{ + Address: validators[0].AccAddr, + Amount: math.NewInt(100_000 * 1e6), + Denom: "loya", + })) + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + txHash, err = validators[0].Node.ExecTx(ctx, validators[0].AccAddr, "staking", "delegate", validators[0].ValAddr, delegateAmt.String(), "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "10loya") + require.NoError(err) + fmt.Println("TX HASH (validator 0 self-delegates more):", txHash) + + require.NoError(testutil.WaitForBlocks(ctx, 3, validators[0].Node)) + + // Second report - should trigger recalculation due to validator set update + currentCycleListRes, _, err = e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "current-cyclelist-query") + require.NoError(err) + require.NoError(json.Unmarshal(currentCycleListRes, ¤tCycleList)) + + _, _, err = validators[0].Node.Exec(ctx, validators[0].Node.TxCommand("validator", "oracle", "submit-value", currentCycleList.QueryData, value, "--fees", "5loya", "--keyring-dir", validators[0].Node.HomeDir()), validators[0].Node.Chain.Config().Env) + require.NoError(err) + fmt.Println("Second report submitted") + + require.NoError(testutil.WaitForBlocks(ctx, 3, validators[0].Node)) + + // Query second report power + qDataBz, err = hex.DecodeString(currentCycleList.QueryData) + require.NoError(err) + qIdBz = utils.QueryIDFromData(qDataBz) + qId = hex.EncodeToString(qIdBz) + res, _, err = e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "get-current-aggregate-report", qId) + require.NoError(err) + require.NoError(json.Unmarshal(res, &aggRes)) + secondPower, err := strconv.ParseUint(aggRes.Aggregate.AggregatePower, 10, 64) + require.NoError(err) + fmt.Println("Second report power:", secondPower) + + // Second power should be greater due to new delegation + require.Greater(secondPower, firstPower, "Reporter stake should increase after delegation") +} + +// TestStakeCacheSelectorJoin tests that reporter stake is recalculated when a new selector joins +// Scenario: Reporter submits, new selector joins reporter, reporter submits again +// Expected: Second submission should include new selector's stake +func TestStakeCacheSelectorJoin(t *testing.T) { + require := require.New(t) + + cosmos.SetSDKConfig("tellor") + + chain, ic, ctx := e2e.SetupChain(t, 2, 0) + defer ic.Close() + + validators, err := e2e.GetValidators(ctx, chain) + require.NoError(err) + e2e.PrintValidatorInfo(ctx, validators) + + // Turn on minting + require.NoError(e2e.TurnOnMinting(ctx, chain, validators[0].Node)) + require.NoError(testutil.WaitForBlocks(ctx, 7, validators[0].Node)) + + // Create a user with stake + fundAmt := math.NewInt(2_000 * 1e6) + delegateAmt := sdk.NewCoin("loya", math.NewInt(1000*1e6)) + user := interchaintest.GetAndFundTestUsers(t, ctx, "selector", fundAmt, chain)[0] + + // User delegates to validator 1 + txHash, err := validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "staking", "delegate", validators[1].ValAddr, delegateAmt.String(), "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "10loya") + require.NoError(err) + fmt.Println("TX HASH (user delegates):", txHash) + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Validator 0 becomes a reporter + minStakeAmt := "1000000" + txHash, err = validators[0].Node.ExecTx(ctx, validators[0].AccAddr, "reporter", "create-reporter", stakeCacheCommissRate, minStakeAmt, moniker, "--keyring-dir", validators[0].Node.HomeDir()) + require.NoError(err) + fmt.Println("TX HASH (validator 0 becomes reporter):", txHash) + + // First report (only validator's self-delegation) + currentCycleListRes, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "current-cyclelist-query") + require.NoError(err) + var currentCycleList e2e.QueryCurrentCyclelistQueryResponse + require.NoError(json.Unmarshal(currentCycleListRes, ¤tCycleList)) + + value := layerutil.EncodeValue(200.0) + _, _, err = validators[0].Node.Exec(ctx, validators[0].Node.TxCommand("validator", "oracle", "submit-value", currentCycleList.QueryData, value, "--fees", "5loya", "--keyring-dir", validators[0].Node.HomeDir()), validators[0].Node.Chain.Config().Env) + require.NoError(err) + fmt.Println("First report submitted") + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Query first report power + qDataBz, err := hex.DecodeString(currentCycleList.QueryData) + require.NoError(err) + qIdBz := utils.QueryIDFromData(qDataBz) + qId := hex.EncodeToString(qIdBz) + res, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "get-current-aggregate-report", qId) + require.NoError(err) + var aggRes e2e.QueryGetCurrentAggregateReportResponse + require.NoError(json.Unmarshal(res, &aggRes)) + firstPower, err := strconv.ParseUint(aggRes.Aggregate.AggregatePower, 10, 64) + require.NoError(err) + fmt.Println("First report power:", firstPower) + + // User selects validator 0 as their reporter + txHash, err = validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "reporter", "select-reporter", validators[0].AccAddr, "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "5loya") + require.NoError(err) + fmt.Println("TX HASH (user selects reporter):", txHash) + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Second report - should include selector's stake + currentCycleListRes, _, err = e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "current-cyclelist-query") + require.NoError(err) + require.NoError(json.Unmarshal(currentCycleListRes, ¤tCycleList)) + + _, _, err = validators[0].Node.Exec(ctx, validators[0].Node.TxCommand("validator", "oracle", "submit-value", currentCycleList.QueryData, value, "--fees", "5loya", "--keyring-dir", validators[0].Node.HomeDir()), validators[0].Node.Chain.Config().Env) + require.NoError(err) + fmt.Println("Second report submitted") + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Query second report power + qDataBz, err = hex.DecodeString(currentCycleList.QueryData) + require.NoError(err) + qIdBz = utils.QueryIDFromData(qDataBz) + qId = hex.EncodeToString(qIdBz) + res, _, err = e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "get-current-aggregate-report", qId) + require.NoError(err) + require.NoError(json.Unmarshal(res, &aggRes)) + secondPower, err := strconv.ParseUint(aggRes.Aggregate.AggregatePower, 10, 64) + require.NoError(err) + fmt.Println("Second report power:", secondPower) + + // Second power should be greater due to new selector + require.Greater(secondPower, firstPower, "Reporter stake should increase after selector joins") +} + +// TestStakeCacheSelectorSwitch tests that both reporters recalculate stake when selector switches +// Scenario: Selector is with reporter A, switches to reporter B, both reporters submit +// Expected: Reporter A loses stake, Reporter B gains stake +func TestStakeCacheSelectorSwitch(t *testing.T) { + require := require.New(t) + + cosmos.SetSDKConfig("tellor") + + chain, ic, ctx := e2e.SetupChain(t, 2, 0) + defer ic.Close() + + validators, err := e2e.GetValidators(ctx, chain) + require.NoError(err) + e2e.PrintValidatorInfo(ctx, validators) + + // Turn on minting + require.NoError(e2e.TurnOnMinting(ctx, chain, validators[0].Node)) + require.NoError(testutil.WaitForBlocks(ctx, 7, validators[0].Node)) + + // Both validators become reporters + for i := range validators { + minStakeAmt := "1000000" + moniker := fmt.Sprintf("reporter_%d", i) + txHash, err := validators[i].Node.ExecTx(ctx, validators[i].AccAddr, "reporter", "create-reporter", stakeCacheCommissRate, minStakeAmt, moniker, "--keyring-dir", validators[i].Node.HomeDir()) + require.NoError(err) + fmt.Printf("TX HASH (validator %d becomes reporter): %s\n", i, txHash) + } + + // Create a user with stake + fundAmt := math.NewInt(2_000 * 1e6) + delegateAmt := sdk.NewCoin("loya", math.NewInt(1000*1e6)) + user := interchaintest.GetAndFundTestUsers(t, ctx, "selector", fundAmt, chain)[0] + + // User delegates to validator 0 + txHash, err := validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "staking", "delegate", validators[0].ValAddr, delegateAmt.String(), "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "10loya") + require.NoError(err) + fmt.Println("TX HASH (user delegates to val 0):", txHash) + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // User selects validator 0 as their reporter initially + txHash, err = validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "reporter", "select-reporter", validators[0].AccAddr, "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "5loya") + require.NoError(err) + fmt.Println("TX HASH (user selects validator 0 as reporter):", txHash) + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Both reporters submit first report + currentCycleListRes, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "current-cyclelist-query") + require.NoError(err) + var currentCycleList e2e.QueryCurrentCyclelistQueryResponse + require.NoError(json.Unmarshal(currentCycleListRes, ¤tCycleList)) + + value := layerutil.EncodeValue(300.0) + for i := range validators { + _, _, err = validators[i].Node.Exec(ctx, validators[i].Node.TxCommand("validator", "oracle", "submit-value", currentCycleList.QueryData, value, "--fees", "5loya", "--keyring-dir", validators[i].Node.HomeDir()), validators[i].Node.Chain.Config().Env) + require.NoError(err) + fmt.Printf("Validator %d first report submitted\n", i) + } + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Get individual reporter powers from micro-reports before switch + var reporter0PowerBefore, reporter1PowerBefore uint64 + for i, v := range validators { + reports, _, err := e2e.QueryWithTimeout(ctx, v.Node, "oracle", "get-reportsby-reporter", v.AccAddr, "--page-limit", "1") + require.NoError(err) + var reportsRes e2e.QueryMicroReportsResponse + require.NoError(json.Unmarshal(reports, &reportsRes)) + require.NotEmpty(reportsRes.MicroReports, "Validator %d should have a micro-report", i) + power, err := strconv.ParseUint(reportsRes.MicroReports[0].Power, 10, 64) + require.NoError(err) + if i == 0 { + reporter0PowerBefore = power + } else { + reporter1PowerBefore = power + } + fmt.Printf("Validator %d power before switch: %d\n", i, power) + } + + // Reporter 0 should have more power (includes selector's delegation) + require.Greater(reporter0PowerBefore, reporter1PowerBefore, "Reporter 0 should have more power than reporter 1 before switch (has selector)") + + // User switches reporter from validator 0 to validator 1 + txHash, err = validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "reporter", "switch-reporter", validators[1].AccAddr, "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "5loya") + require.NoError(err) + fmt.Println("TX HASH (user switches to validator 1):", txHash) + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Both reporters submit second report + currentCycleListRes, _, err = e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "current-cyclelist-query") + require.NoError(err) + require.NoError(json.Unmarshal(currentCycleListRes, ¤tCycleList)) + + for i := range validators { + _, _, err = validators[i].Node.Exec(ctx, validators[i].Node.TxCommand("validator", "oracle", "submit-value", currentCycleList.QueryData, value, "--fees", "5loya", "--keyring-dir", validators[i].Node.HomeDir()), validators[i].Node.Chain.Config().Env) + require.NoError(err) + fmt.Printf("Validator %d second report submitted\n", i) + } + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Get individual reporter powers after switch + var reporter0PowerAfter, reporter1PowerAfter uint64 + for i, v := range validators { + reports, _, err := e2e.QueryWithTimeout(ctx, v.Node, "oracle", "get-reportsby-reporter", v.AccAddr, "--page-limit", "2") + require.NoError(err) + var reportsRes e2e.QueryMicroReportsResponse + require.NoError(json.Unmarshal(reports, &reportsRes)) + require.GreaterOrEqual(len(reportsRes.MicroReports), 2, "Validator %d should have at least 2 micro-reports", i) + // Get the most recent report (last one) + latestReport := reportsRes.MicroReports[len(reportsRes.MicroReports)-1] + power, err := strconv.ParseUint(latestReport.Power, 10, 64) + require.NoError(err) + if i == 0 { + reporter0PowerAfter = power + } else { + reporter1PowerAfter = power + } + fmt.Printf("Validator %d power after switch: %d\n", i, power) + } + + // After switch: reporter 0 should lose selector's stake. + // Reporter 1 does NOT gain the selector's stake yet because the selector is locked + // for the unbonding period after switching reporters (LockedUntilTime is set in SwitchReporter). + // GetReporterStake skips selectors whose LockedUntilTime is after the current block time. + fmt.Printf("Reporter 0: %d -> %d\n", reporter0PowerBefore, reporter0PowerAfter) + fmt.Printf("Reporter 1: %d -> %d\n", reporter1PowerBefore, reporter1PowerAfter) + require.Less(reporter0PowerAfter, reporter0PowerBefore, "Reporter 0 should lose power after selector switches away") + require.Equal(reporter1PowerAfter, reporter1PowerBefore, "Reporter 1 should not gain power yet (selector is locked for unbonding period)") +} + +// TestStakeCacheDelegationChange tests that reporter stake is recalculated after delegation change +// Scenario: Selector delegates more to validator, reporter submits +// Expected: Reporter stake should reflect the new delegation amount +func TestStakeCacheDelegationChange(t *testing.T) { + require := require.New(t) + + cosmos.SetSDKConfig("tellor") + + chain, ic, ctx := e2e.SetupChain(t, 2, 0) + defer ic.Close() + + validators, err := e2e.GetValidators(ctx, chain) + require.NoError(err) + e2e.PrintValidatorInfo(ctx, validators) + + // Turn on minting + require.NoError(e2e.TurnOnMinting(ctx, chain, validators[0].Node)) + require.NoError(testutil.WaitForBlocks(ctx, 7, validators[0].Node)) + + // Validator 0 becomes a reporter + minStakeAmt := "1000000" + txHash, err := validators[0].Node.ExecTx(ctx, validators[0].AccAddr, "reporter", "create-reporter", stakeCacheCommissRate, minStakeAmt, moniker, "--keyring-dir", validators[0].Node.HomeDir()) + require.NoError(err) + fmt.Println("TX HASH (validator 0 becomes reporter):", txHash) + + // Create a user with stake + fundAmt := math.NewInt(10_000 * 1e6) + initialDelegate := sdk.NewCoin("loya", math.NewInt(1000*1e6)) + user := interchaintest.GetAndFundTestUsers(t, ctx, "selector", fundAmt, chain)[0] + + // User delegates initial amount + txHash, err = validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "staking", "delegate", validators[0].ValAddr, initialDelegate.String(), "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "10loya") + require.NoError(err) + fmt.Println("TX HASH (user initial delegation):", txHash) + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // User selects validator 0 as their reporter + txHash, err = validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "reporter", "select-reporter", validators[0].AccAddr, "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "5loya") + require.NoError(err) + fmt.Println("TX HASH (user selects reporter):", txHash) + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // First report + currentCycleListRes, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "current-cyclelist-query") + require.NoError(err) + var currentCycleList e2e.QueryCurrentCyclelistQueryResponse + require.NoError(json.Unmarshal(currentCycleListRes, ¤tCycleList)) + + value := layerutil.EncodeValue(400.0) + _, _, err = validators[0].Node.Exec(ctx, validators[0].Node.TxCommand("validator", "oracle", "submit-value", currentCycleList.QueryData, value, "--fees", "5loya", "--keyring-dir", validators[0].Node.HomeDir()), validators[0].Node.Chain.Config().Env) + require.NoError(err) + fmt.Println("First report submitted") + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Query first report power + qDataBz, err := hex.DecodeString(currentCycleList.QueryData) + require.NoError(err) + qIdBz := utils.QueryIDFromData(qDataBz) + qId := hex.EncodeToString(qIdBz) + res, _, err := e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "get-current-aggregate-report", qId) + require.NoError(err) + var aggRes e2e.QueryGetCurrentAggregateReportResponse + require.NoError(json.Unmarshal(res, &aggRes)) + firstPower, err := strconv.ParseUint(aggRes.Aggregate.AggregatePower, 10, 64) + require.NoError(err) + fmt.Println("First report power:", firstPower) + + // User delegates more (this triggers AfterDelegationModified hook) + additionalDelegate := sdk.NewCoin("loya", math.NewInt(5000*1e6)) + txHash, err = validators[0].Node.ExecTx(ctx, user.FormattedAddress(), "staking", "delegate", validators[0].ValAddr, additionalDelegate.String(), "--keyring-dir", validators[0].Node.HomeDir(), "--fees", "10loya") + require.NoError(err) + fmt.Println("TX HASH (user additional delegation):", txHash) + + require.NoError(testutil.WaitForBlocks(ctx, 3, validators[0].Node)) + + // Second report - should recalculate with new stake + currentCycleListRes, _, err = e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "current-cyclelist-query") + require.NoError(err) + require.NoError(json.Unmarshal(currentCycleListRes, ¤tCycleList)) + + _, _, err = validators[0].Node.Exec(ctx, validators[0].Node.TxCommand("validator", "oracle", "submit-value", currentCycleList.QueryData, value, "--fees", "5loya", "--keyring-dir", validators[0].Node.HomeDir()), validators[0].Node.Chain.Config().Env) + require.NoError(err) + fmt.Println("Second report submitted") + + require.NoError(testutil.WaitForBlocks(ctx, 2, validators[0].Node)) + + // Query second report power + qDataBz, err = hex.DecodeString(currentCycleList.QueryData) + require.NoError(err) + qIdBz = utils.QueryIDFromData(qDataBz) + qId = hex.EncodeToString(qIdBz) + res, _, err = e2e.QueryWithTimeout(ctx, validators[0].Node, "oracle", "get-current-aggregate-report", qId) + require.NoError(err) + require.NoError(json.Unmarshal(res, &aggRes)) + secondPower, err := strconv.ParseUint(aggRes.Aggregate.AggregatePower, 10, 64) + require.NoError(err) + fmt.Println("Second report power:", secondPower) + + // Second power should be greater due to additional delegation + require.Greater(secondPower, firstPower, "Reporter stake should increase after additional delegation") +} diff --git a/e2e/upgrade_test.go b/e2e/upgrade_test.go index 623416eb8..ef4309e5f 100644 --- a/e2e/upgrade_test.go +++ b/e2e/upgrade_test.go @@ -28,7 +28,7 @@ const ( ) func TestLayerUpgrade(t *testing.T) { - ChainUpgradeTest(t, "layer", "layer", "local", "v6.1.1") + ChainUpgradeTest(t, "layer", "layer", "local", "v6.1.2") } func ChainUpgradeTest(t *testing.T, chainName, upgradeContainerRepo, upgradeVersion, upgradeName string) { diff --git a/tests/integration/oracle_keeper_test.go b/tests/integration/oracle_keeper_test.go index 51bdfd69e..c1aca60d7 100644 --- a/tests/integration/oracle_keeper_test.go +++ b/tests/integration/oracle_keeper_test.go @@ -1404,3 +1404,219 @@ func (s *IntegrationTestSuite) TestRotateQueriesToExpiredTippedQuery() { // Also verify it's greater than the ID it had when tipped s.Greater(query.Id, tippedMetaId0, "Meta ID should have increased when expired tipped query was rotated to") } + +func (s *IntegrationTestSuite) TestSubmitMultipleReportsDifferentMetaIds() { + require := s.Require() + ctx := s.Setup.Ctx + ctx = ctx.WithConsensusParams(cmtproto.ConsensusParams{ + Abci: &cmtproto.ABCIParams{ + VoteExtensionsEnableHeight: 1, + }, + }) + ctx = ctx.WithBlockTime(time.Now()) + ctx = ctx.WithBlockHeight(ctx.BlockHeight() + 1) + _, err := s.Setup.App.BeginBlocker(ctx) + require.NoError(err) + + msgServer := keeper.NewMsgServerImpl(s.Setup.Oraclekeeper) + repAccs, valAddrs, _ := s.createValidatorAccs([]uint64{100, 200}) + for _, val := range valAddrs { + require.NoError(s.Setup.Bridgekeeper.SetEVMAddressByOperator(ctx, val.String(), []byte("not real"))) + } + _, err = s.Setup.App.EndBlocker(ctx) + require.NoError(err) + + tipper := s.newKeysWithTokens() + + // register reporters + for _, rep := range repAccs { + require.NoError(s.Setup.Reporterkeeper.Reporters.Set(ctx, rep, reportertypes.NewReporter(reportertypes.DefaultMinCommissionRate, math.OneInt(), "reporter"))) + require.NoError(s.Setup.Reporterkeeper.Selectors.Set(ctx, rep, reportertypes.NewSelection(rep, 1))) + _, err := s.Setup.Reporterkeeper.ReporterStake(ctx, rep, []byte{}) + require.NoError(err) + } + + tip := sdk.NewCoin(s.Setup.Denom, math.NewInt(100_000)) + queryDatas := [][]byte{ethQueryData, btcQueryData, trbQueryData} + metaIds := make([]uint64, 3) + + for i, qd := range queryDatas { + // tip the query + _, err := msgServer.Tip(ctx, &types.MsgTip{Tipper: tipper.String(), QueryData: qd, Amount: tip}) + require.NoError(err) + + queryId := utils.QueryIDFromData(qd) + query, err := s.Setup.Oraclekeeper.CurrentQuery(ctx, queryId) + require.NoError(err) + metaIds[i] = query.Id + + // both reporters submit + for _, rep := range repAccs { + _, err = msgServer.SubmitValue(ctx, &types.MsgSubmitValue{ + Creator: rep.String(), + QueryData: qd, + Value: testValue, + }) + require.NoError(err) + } + + // advance blocks to expire and aggregate + ctx = ctx.WithBlockHeight(ctx.BlockHeight() + 3) + _, err = s.Setup.App.BeginBlocker(ctx) + require.NoError(err) + _, err = s.Setup.App.EndBlocker(ctx) + require.NoError(err) + require.NoError(s.Setup.Oraclekeeper.SetAggregatedReport(ctx)) + + ctx = ctx.WithBlockHeight(ctx.BlockHeight() + 1) + } + + // verify all metaIds are different + require.NotEqual(metaIds[0], metaIds[1], "eth and btc should have different metaIds") + require.NotEqual(metaIds[1], metaIds[2], "btc and trb should have different metaIds") + require.NotEqual(metaIds[0], metaIds[2], "eth and trb should have different metaIds") + + // verify reports exist for each query + for i, qd := range queryDatas { + queryId := utils.QueryIDFromData(qd) + for _, rep := range repAccs { + report, err := s.Setup.Oraclekeeper.Reports.Get(ctx, collections.Join3(queryId, rep.Bytes(), metaIds[i])) + require.NoError(err) + require.Equal(rep.String(), report.Reporter) + } + } + + fmt.Printf("MetaIds - ETH/USD: %d, BTC/USD: %d, TRB/USD: %d\n", metaIds[0], metaIds[1], metaIds[2]) +} + +func (s *IntegrationTestSuite) TestRemoveOldReports() { + require := s.Require() + ctx := s.Setup.Ctx + ctx = ctx.WithConsensusParams(cmtproto.ConsensusParams{ + Abci: &cmtproto.ABCIParams{ + VoteExtensionsEnableHeight: 1, + }, + }) + + now := time.Now() + ctx = ctx.WithBlockTime(now) + ctx = ctx.WithBlockHeight(ctx.BlockHeight() + 1) + _, err := s.Setup.App.BeginBlocker(ctx) + require.NoError(err) + + msgServer := keeper.NewMsgServerImpl(s.Setup.Oraclekeeper) + repAccs, valAddrs, _ := s.createValidatorAccs([]uint64{100, 200}) + for _, val := range valAddrs { + require.NoError(s.Setup.Bridgekeeper.SetEVMAddressByOperator(ctx, val.String(), []byte("not real"))) + } + _, err = s.Setup.App.EndBlocker(ctx) + require.NoError(err) + + tipper := s.newKeysWithTokens() + + for _, rep := range repAccs { + require.NoError(s.Setup.Reporterkeeper.Reporters.Set(ctx, rep, reportertypes.NewReporter(reportertypes.DefaultMinCommissionRate, math.OneInt(), "reporter"))) + require.NoError(s.Setup.Reporterkeeper.Selectors.Set(ctx, rep, reportertypes.NewSelection(rep, 1))) + _, err := s.Setup.Reporterkeeper.ReporterStake(ctx, rep, []byte{}) + require.NoError(err) + } + + tip := sdk.NewCoin(s.Setup.Denom, math.NewInt(100_000)) + + // helper to count all reports in the store + countReports := func(c sdk.Context) int { + count := 0 + _ = s.Setup.Oraclekeeper.Reports.Walk(c, nil, func(_ collections.Triple[[]byte, []byte, uint64], _ types.MicroReport) (bool, error) { + count++ + return false, nil + }) + return count + } + + // --- Round 1: submit reports 45 days ago (should be pruned) --- + oldTime := now.Add(-45 * 24 * time.Hour) + ctx = ctx.WithBlockTime(oldTime) + ctx = ctx.WithBlockHeight(ctx.BlockHeight() + 1) + + _, err = msgServer.Tip(ctx, &types.MsgTip{Tipper: tipper.String(), QueryData: ethQueryData, Amount: tip}) + require.NoError(err) + for _, rep := range repAccs { + _, err = msgServer.SubmitValue(ctx, &types.MsgSubmitValue{Creator: rep.String(), QueryData: ethQueryData, Value: testValue}) + require.NoError(err) + } + ctx = ctx.WithBlockHeight(ctx.BlockHeight() + 3) + require.NoError(s.Setup.Oraclekeeper.SetAggregatedReport(ctx)) + + // --- submit reports 31 days ago (should be pruned) --- + oldTime2 := now.Add(-31 * 24 * time.Hour) + ctx = ctx.WithBlockTime(oldTime2) + ctx = ctx.WithBlockHeight(ctx.BlockHeight() + 1) + + _, err = msgServer.Tip(ctx, &types.MsgTip{Tipper: tipper.String(), QueryData: btcQueryData, Amount: tip}) + require.NoError(err) + for _, rep := range repAccs { + _, err = msgServer.SubmitValue(ctx, &types.MsgSubmitValue{Creator: rep.String(), QueryData: btcQueryData, Value: testValue}) + require.NoError(err) + } + ctx = ctx.WithBlockHeight(ctx.BlockHeight() + 3) + require.NoError(s.Setup.Oraclekeeper.SetAggregatedReport(ctx)) + + // --- submit reports 5 days ago (should NOT be pruned) --- + recentTime := now.Add(-5 * 24 * time.Hour) + ctx = ctx.WithBlockTime(recentTime) + ctx = ctx.WithBlockHeight(ctx.BlockHeight() + 1) + + _, err = msgServer.Tip(ctx, &types.MsgTip{Tipper: tipper.String(), QueryData: trbQueryData, Amount: tip}) + require.NoError(err) + for _, rep := range repAccs { + _, err = msgServer.SubmitValue(ctx, &types.MsgSubmitValue{Creator: rep.String(), QueryData: trbQueryData, Value: testValue}) + require.NoError(err) + } + ctx = ctx.WithBlockHeight(ctx.BlockHeight() + 3) + require.NoError(s.Setup.Oraclekeeper.SetAggregatedReport(ctx)) + + // We should have 6 reports total: 2 reporters × 3 queries + totalBefore := countReports(ctx) + require.Equal(6, totalBefore, "should have 6 reports before pruning") + + // Prune with current time (now) + // All 4 old reports deleted + ctx = ctx.WithBlockTime(now) + require.NoError(s.Setup.Oraclekeeper.RemoveOldReports(ctx)) + require.Equal(2, countReports(ctx), "should have 2 reports after pruning old ones") + + // Prune again, nothing left to prune + require.NoError(s.Setup.Oraclekeeper.RemoveOldReports(ctx)) + require.Equal(2, countReports(ctx), "recent reports should not be pruned") + + // --- Verify the remaining reports are the TRB/USD ones --- + trbQueryId := utils.QueryIDFromData(trbQueryData) + for _, rep := range repAccs { + has, err := s.Setup.Oraclekeeper.Reports.Has(ctx, collections.Join3(trbQueryId, rep.Bytes(), uint64(2))) + require.NoError(err) + require.True(has, "TRB/USD report should still exist") + } + + // --- Verify old reports are gone --- + ethQueryId := utils.QueryIDFromData(ethQueryData) + btcQueryId := utils.QueryIDFromData(btcQueryData) + for _, rep := range repAccs { + has, err := s.Setup.Oraclekeeper.Reports.Has(ctx, collections.Join3(ethQueryId, rep.Bytes(), uint64(0))) + require.NoError(err) + require.False(has, "ETH/USD report should be pruned") + + has, err = s.Setup.Oraclekeeper.Reports.Has(ctx, collections.Join3(btcQueryId, rep.Bytes(), uint64(1))) + require.NoError(err) + require.False(has, "BTC/USD report should be pruned") + } + + // --- Verify indexes are also cleaned up --- + indexCount := 0 + iter, err := s.Setup.Oraclekeeper.Reports.Indexes.IdQueryId.Iterate(ctx, nil) + require.NoError(err) + defer iter.Close() + for ; iter.Valid(); iter.Next() { + indexCount++ + } + require.Equal(2, indexCount, "IdQueryId index should only have 2 entries after pruning") +} diff --git a/tests/integration/reporter_keeper_test.go b/tests/integration/reporter_keeper_test.go index c4355324a..230206512 100644 --- a/tests/integration/reporter_keeper_test.go +++ b/tests/integration/reporter_keeper_test.go @@ -8,6 +8,7 @@ import ( cmttypes "github.com/cometbft/cometbft/types" "github.com/tellor-io/layer/testutil/sample" layertypes "github.com/tellor-io/layer/types" + "github.com/tellor-io/layer/utils" oraclekeeper "github.com/tellor-io/layer/x/oracle/keeper" oracletypes "github.com/tellor-io/layer/x/oracle/types" "github.com/tellor-io/layer/x/reporter/keeper" @@ -670,33 +671,46 @@ func (s *IntegrationTestSuite) TestPruneOldReports() { err = rk.Report.Set(ctx, collections.Join(queryId3, collections.Join(reporter.Bytes(), uint64(300))), delegationsAmounts) require.NoError(err) - // Set up Aggregate entries in oracle keeper (needed for GetTimestampForBlockHeight) - // The BlockHeight index uses the Height field of the Aggregate - aggregate1 := oracletypes.Aggregate{ - QueryId: queryId1, + // Set up ETH/USD aggregate entries in oracle keeper for GetBlockHeightFromTimestamp. + // GetBlockHeightFromTimestamp uses ETH/USD queryId to resolve timestamps to block heights. + ethUsdQueryId := utils.QueryIDFromData(ethQueryData) + + // Aggregate at startTime with Height 50 (before block 100) + err = ok.Aggregates.Set(ctx, collections.Join(ethUsdQueryId, timestamp1), oracletypes.Aggregate{ + QueryId: ethUsdQueryId, AggregateValue: "100", AggregateReporter: reporter.String(), - Height: 100, - } - err = ok.Aggregates.Set(ctx, collections.Join(queryId1, timestamp1), aggregate1) + Height: 50, + }) require.NoError(err) - aggregate2 := oracletypes.Aggregate{ - QueryId: queryId2, + // Aggregate at startTime+30d with Height 150 (between block 100 and 200) + err = ok.Aggregates.Set(ctx, collections.Join(ethUsdQueryId, timestamp2), oracletypes.Aggregate{ + QueryId: ethUsdQueryId, AggregateValue: "100", AggregateReporter: reporter.String(), - Height: 200, - } - err = ok.Aggregates.Set(ctx, collections.Join(queryId2, timestamp2), aggregate2) + Height: 150, + }) require.NoError(err) - aggregate3 := oracletypes.Aggregate{ - QueryId: queryId3, + // Aggregate at startTime+50d with Height 250 (between block 200 and 300) + err = ok.Aggregates.Set(ctx, collections.Join(ethUsdQueryId, timestamp3), oracletypes.Aggregate{ + QueryId: ethUsdQueryId, AggregateValue: "100", AggregateReporter: reporter.String(), - Height: 300, - } - err = ok.Aggregates.Set(ctx, collections.Join(queryId3, timestamp3), aggregate3) + Height: 250, + }) + require.NoError(err) + + // Aggregate at startTime+80d with Height 350 (above block 300) + // Used when pruning at 120 days: cutoff=startTime+90d, nearest before is startTime+80d -> Height 350 + timestamp4 := uint64(startTime.Add(80 * 24 * time.Hour).UnixMilli()) + err = ok.Aggregates.Set(ctx, collections.Join(ethUsdQueryId, timestamp4), oracletypes.Aggregate{ + QueryId: ethUsdQueryId, + AggregateValue: "100", + AggregateReporter: reporter.String(), + Height: 350, + }) require.NoError(err) // Verify all 3 Report entries exist diff --git a/x/bridge/keeper/keeper.go b/x/bridge/keeper/keeper.go index 73974b48c..e620d749f 100644 --- a/x/bridge/keeper/keeper.go +++ b/x/bridge/keeper/keeper.go @@ -926,7 +926,11 @@ func (k Keeper) CreateNewReportSnapshots(ctx context.Context) error { k.Logger(ctx).Info("Error getting snapshot limit", "error", err) return err } - reports := k.oracleKeeper.GetAggregatedReportsByHeight(ctx, uint64(blockHeight)) + reports, err := k.oracleKeeper.GetAggregatedReportsByHeight(ctx, uint64(blockHeight)) + if err != nil { + k.Logger(ctx).Info("Error getting aggregated reports by height", "error", err) + return err + } for _, report := range reports { queryId := report.QueryId timeNow := sdkCtx.BlockTime().Add(time.Second) diff --git a/x/bridge/keeper/msg_server_withdraw_tokens.go b/x/bridge/keeper/msg_server_withdraw_tokens.go index 6fb8ab26d..fefdd17ea 100644 --- a/x/bridge/keeper/msg_server_withdraw_tokens.go +++ b/x/bridge/keeper/msg_server_withdraw_tokens.go @@ -8,6 +8,8 @@ import ( layer "github.com/tellor-io/layer/types" "github.com/tellor-io/layer/x/bridge/types" + errorsmod "cosmossdk.io/errors" + sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) @@ -16,12 +18,15 @@ import ( func (k msgServer) WithdrawTokens(goCtx context.Context, msg *types.MsgWithdrawTokens) (*types.MsgWithdrawTokensResponse, error) { sdkCtx := sdk.UnwrapSDKContext(goCtx) + sender, err := sdk.AccAddressFromBech32(msg.Creator) + if err != nil { + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid creator address (%s)", err) + } + if msg.Amount.Denom != layer.BondDenom || msg.Amount.Amount.IsZero() || msg.Amount.Amount.IsNegative() { return nil, sdkerrors.ErrInvalidRequest } - sender := sdk.MustAccAddressFromBech32(msg.Creator) - recipient, err := hex.DecodeString(msg.Recipient) if err != nil { return nil, sdkerrors.ErrInvalidRequest diff --git a/x/bridge/mocks/OracleKeeper.go b/x/bridge/mocks/OracleKeeper.go index 3316cf817..38051e2bd 100644 --- a/x/bridge/mocks/OracleKeeper.go +++ b/x/bridge/mocks/OracleKeeper.go @@ -74,10 +74,14 @@ func (_m *OracleKeeper) GetAggregateByTimestamp(ctx context.Context, queryId []b } // GetAggregatedReportsByHeight provides a mock function with given fields: ctx, height -func (_m *OracleKeeper) GetAggregatedReportsByHeight(ctx context.Context, height uint64) []types.Aggregate { +func (_m *OracleKeeper) GetAggregatedReportsByHeight(ctx context.Context, height uint64) ([]types.Aggregate, error) { ret := _m.Called(ctx, height) var r0 []types.Aggregate + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) ([]types.Aggregate, error)); ok { + return rf(ctx, height) + } if rf, ok := ret.Get(0).(func(context.Context, uint64) []types.Aggregate); ok { r0 = rf(ctx, height) } else { @@ -86,7 +90,13 @@ func (_m *OracleKeeper) GetAggregatedReportsByHeight(ctx context.Context, height } } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, height) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // GetCurrentAggregateReport provides a mock function with given fields: ctx, queryId diff --git a/x/bridge/types/expected_keepers.go b/x/bridge/types/expected_keepers.go index faf9ec5e1..5816526c7 100644 --- a/x/bridge/types/expected_keepers.go +++ b/x/bridge/types/expected_keepers.go @@ -45,7 +45,7 @@ type OracleKeeper interface { GetAggregateByTimestamp(ctx context.Context, queryId []byte, timestamp uint64) (oracletypes.Aggregate, error) GetTimestampBefore(ctx context.Context, queryId []byte, timestamp time.Time) (time.Time, error) GetTimestampAfter(ctx context.Context, queryId []byte, timestamp time.Time) (time.Time, error) - GetAggregatedReportsByHeight(ctx context.Context, height uint64) []oracletypes.Aggregate + GetAggregatedReportsByHeight(ctx context.Context, height uint64) ([]oracletypes.Aggregate, error) SetAggregate(ctx context.Context, report *oracletypes.Aggregate, queryData []byte, queryType string) error GetCurrentAggregateReport(ctx context.Context, queryId []byte) (aggregate *oracletypes.Aggregate, timestamp time.Time, err error) GetNoStakeReportByQueryIdTimestamp(ctx context.Context, queryId []byte, timestamp uint64) (*oracletypes.NoStakeMicroReport, error) diff --git a/x/dispute/keeper/msg_server_withdraw_fee_refund.go b/x/dispute/keeper/msg_server_withdraw_fee_refund.go index b4bc2c69c..ab5ffe0e7 100644 --- a/x/dispute/keeper/msg_server_withdraw_fee_refund.go +++ b/x/dispute/keeper/msg_server_withdraw_fee_refund.go @@ -8,9 +8,11 @@ import ( "github.com/tellor-io/layer/x/dispute/types" "cosmossdk.io/collections" + errorsmod "cosmossdk.io/errors" "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) // WithdrawFeeRefund allows whoever paid the first round dispute fee to get refunded the fee if the @@ -18,7 +20,10 @@ import ( // If the dispute resolves to support, a first round fee payer also gets the disputed reporter's slashed tokens. func (k msgServer) WithdrawFeeRefund(ctx context.Context, msg *types.MsgWithdrawFeeRefund) (*types.MsgWithdrawFeeRefundResponse, error) { // should be ok to be called by anyone - feePayer := sdk.MustAccAddressFromBech32(msg.PayerAddress) + feePayer, err := sdk.AccAddressFromBech32(msg.PayerAddress) + if err != nil { + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid payer address (%s)", err) + } // dispute dispute, err := k.Disputes.Get(ctx, msg.Id) if err != nil { diff --git a/x/dispute/keeper/tally.go b/x/dispute/keeper/tally.go index 362a0344b..c07505aa2 100644 --- a/x/dispute/keeper/tally.go +++ b/x/dispute/keeper/tally.go @@ -247,8 +247,15 @@ func (k Keeper) UpdateDispute( result = types.VoteResult_NO_QUORUM_MAJORITY_INVALID } default: - k.Logger(ctx).Error("Vote tally", "result", "no majority") - return nil + // tie -- no single option has strictly more votes than both others + // treat as INVALID since no clear winner emerged + // set reuslt so that ExecuteVote doesnt err + k.Logger(ctx).Info("Vote tally", "result", "no majority, defaulting to invalid") + if quorum { + result = types.VoteResult_INVALID + } else { + result = types.VoteResult_NO_QUORUM_MAJORITY_INVALID + } } vote.VoteResult = result vote.VoteEnd = sdk.UnwrapSDKContext(ctx).BlockTime() diff --git a/x/dispute/keeper/tally_test.go b/x/dispute/keeper/tally_test.go index 2137cca68..0ea22246c 100644 --- a/x/dispute/keeper/tally_test.go +++ b/x/dispute/keeper/tally_test.go @@ -478,4 +478,46 @@ func (s *KeeperTestSuite) TestUpdateDispute() { require.NoError(err) require.Equal(voteRes.Id, vote.Id) require.Equal(voteRes.VoteResult, vote.VoteResult) + + // tie with quorum - two options equal and highest (support == against) + vote = types.Vote{ + Id: id, + VoteResult: types.VoteResult_NO_TALLY, // starts as NO_TALLY + } + scaledSupport = math.NewInt(50) + scaledAgainst = math.NewInt(50) + scaledInvalid = math.ZeroInt() + + require.NoError(k.UpdateDispute(ctx, id, dispute, vote, scaledSupport, scaledAgainst, scaledInvalid, true)) + voteRes, err = k.Votes.Get(ctx, id) + require.NoError(err) + require.Equal(types.VoteResult_INVALID, voteRes.VoteResult, "tie with quorum should result in INVALID") + + // tie without quorum - three-way tie + vote = types.Vote{ + Id: id, + VoteResult: types.VoteResult_NO_TALLY, + } + scaledSupport = math.NewInt(33) + scaledAgainst = math.NewInt(33) + scaledInvalid = math.NewInt(33) + + require.NoError(k.UpdateDispute(ctx, id, dispute, vote, scaledSupport, scaledAgainst, scaledInvalid, false)) + voteRes, err = k.Votes.Get(ctx, id) + require.NoError(err) + require.Equal(types.VoteResult_NO_QUORUM_MAJORITY_INVALID, voteRes.VoteResult, "tie without quorum should result in NO_QUORUM_MAJORITY_INVALID") + + // tie: support == invalid, both higher than against + vote = types.Vote{ + Id: id, + VoteResult: types.VoteResult_NO_TALLY, + } + scaledSupport = math.NewInt(40) + scaledAgainst = math.NewInt(20) + scaledInvalid = math.NewInt(40) + + require.NoError(k.UpdateDispute(ctx, id, dispute, vote, scaledSupport, scaledAgainst, scaledInvalid, true)) + voteRes, err = k.Votes.Get(ctx, id) + require.NoError(err) + require.Equal(types.VoteResult_INVALID, voteRes.VoteResult, "two-way tie with quorum should result in INVALID") } diff --git a/x/dispute/keeper/vote.go b/x/dispute/keeper/vote.go index 9273b418e..93da055c3 100644 --- a/x/dispute/keeper/vote.go +++ b/x/dispute/keeper/vote.go @@ -165,7 +165,10 @@ func (k Keeper) SetVoterReporterStake(ctx context.Context, id uint64, voter sdk. } tokensVotedBefore = math.ZeroInt() } - reporterTokens = reporterTokens.Sub(tokensVotedBefore) + reporterTokens, err = reporterTokens.SafeSub(tokensVotedBefore) + if err != nil { + return math.Int{}, err + } return reporterTokens, k.AddReporterVoteCount(ctx, id, reporterTokens.Uint64(), choice, oldVote) } // voter is non-reporter selector @@ -193,10 +196,16 @@ func (k Keeper) SetVoterReporterStake(ctx context.Context, id uint64, voter sdk. if err != nil { return math.Int{}, err } - // update reporter's power record for reward calculation - reporterVote.ReporterPower = reporterVote.ReporterPower.Sub(selectorTokens) + // update reporter's power record for reward calcu lation + reporterVote.ReporterPower, err = reporterVote.ReporterPower.SafeSub(selectorTokens) + if err != nil { + return math.Int{}, err + } // decrease reporterVote.VoterPower by selectorTokens - reporterVote.VoterPower = reporterVote.VoterPower.Sub(selectorTokens) + reporterVote.VoterPower, err = reporterVote.VoterPower.SafeSub(selectorTokens) + if err != nil { + return math.Int{}, err + } err = k.Voter.Set(ctx, collections.Join(id, reporter.Bytes()), reporterVote) if err != nil { return math.Int{}, err @@ -210,7 +219,10 @@ func (k Keeper) SetVoterReporterStake(ctx context.Context, id uint64, voter sdk. } delegatorTokensVoted = math.ZeroInt() } - delegatorTokensVoted = delegatorTokensVoted.Add(selectorTokens) + delegatorTokensVoted, err = delegatorTokensVoted.SafeAdd(selectorTokens) + if err != nil { + return math.Int{}, err + } err = k.ReportersWithDelegatorsVotedBefore.Set(ctx, collections.Join(reporter.Bytes(), id), delegatorTokensVoted) if err != nil { return math.Int{}, err diff --git a/x/oracle/abci.go b/x/oracle/abci.go index a2d1e47d7..088ead9dc 100644 --- a/x/oracle/abci.go +++ b/x/oracle/abci.go @@ -22,5 +22,9 @@ func EndBlocker(ctx context.Context, k keeper.Keeper) error { return err } - return k.RotateQueries(ctx) + if err := k.RotateQueries(ctx); err != nil { + return err + } + + return k.RemoveOldReports(ctx) } diff --git a/x/oracle/keeper/aggregate.go b/x/oracle/keeper/aggregate.go index 95a592102..84409f247 100644 --- a/x/oracle/keeper/aggregate.go +++ b/x/oracle/keeper/aggregate.go @@ -210,7 +210,7 @@ func (k Keeper) GetTimestampBefore(ctx context.Context, queryId []byte, timestam return true, nil }) if err != nil { - panic(err) + return time.Time{}, fmt.Errorf("failed to walk aggregates: %w", err) } if mostRecent == 0 { @@ -228,7 +228,7 @@ func (k Keeper) GetTimestampAfter(ctx context.Context, queryId []byte, timestamp return true, nil }) if err != nil { - panic(err) + return time.Time{}, fmt.Errorf("failed to walk aggregates: %w", err) } if mostRecent == 0 { @@ -238,15 +238,15 @@ func (k Keeper) GetTimestampAfter(ctx context.Context, queryId []byte, timestamp return time.UnixMilli(int64(mostRecent)), nil } -func (k Keeper) GetAggregatedReportsByHeight(ctx context.Context, height uint64) []types.Aggregate { +func (k Keeper) GetAggregatedReportsByHeight(ctx context.Context, height uint64) ([]types.Aggregate, error) { iter, err := k.Aggregates.Indexes.BlockHeight.MatchExact(ctx, height) if err != nil { - panic(err) + return nil, fmt.Errorf("failed to get aggregates by height: %w", err) } kvs, err := indexes.CollectKeyValues(ctx, k.Aggregates, iter) if err != nil { - panic(err) + return nil, fmt.Errorf("failed to collect aggregate key values: %w", err) } reports := make([]types.Aggregate, len(kvs)) @@ -254,7 +254,7 @@ func (k Keeper) GetAggregatedReportsByHeight(ctx context.Context, height uint64) reports[i] = kv.Value } - return reports + return reports, nil } func (k Keeper) GetCurrentAggregateReport(ctx context.Context, queryId []byte) (aggregate *types.Aggregate, timestamp time.Time, err error) { diff --git a/x/oracle/keeper/aggregate_test.go b/x/oracle/keeper/aggregate_test.go index 7960ba95e..b39a029c5 100644 --- a/x/oracle/keeper/aggregate_test.go +++ b/x/oracle/keeper/aggregate_test.go @@ -456,7 +456,8 @@ func (s *KeeperTestSuite) TestGetAggregatedReportsByHeight() { s.NoError(err) s.ctx = s.ctx.WithBlockHeight(15) - aggregates := s.oracleKeeper.GetAggregatedReportsByHeight(s.ctx, uint64(10)) + aggregates, err := s.oracleKeeper.GetAggregatedReportsByHeight(s.ctx, uint64(10)) + s.NoError(err) s.NotEqual(0, len(aggregates)) s.Equal(*aggregate, aggregates[0]) } diff --git a/x/oracle/keeper/keeper.go b/x/oracle/keeper/keeper.go index aa9603b22..6d68204aa 100644 --- a/x/oracle/keeper/keeper.go +++ b/x/oracle/keeper/keeper.go @@ -2,10 +2,12 @@ package keeper import ( "context" + "encoding/hex" "errors" "fmt" gomath "math" "math/big" + "time" "github.com/ethereum/go-ethereum/accounts/abi" layertypes "github.com/tellor-io/layer/types" @@ -28,8 +30,12 @@ import ( const ( twelveHrsInMillis = 12 * 60 * 60 * 1000 // twoMinInMillis = 2 * 60 * 1000 + maxPruneSize = 100 ) +// ethUsdQueryId is the queryId for ETH/USD SpotPrice, used to find nearest timestamps to block heights since its always available. +var ethUsdQueryId, _ = hex.DecodeString("83a7f3d48786ac2667503a61e8c415438ed2922eb86a2906e4ee66d9a2ce4992") + type ( Keeper struct { cdc codec.BinaryCodec @@ -465,3 +471,56 @@ func (k Keeper) GetTimestampForBlockHeight(ctx context.Context, blockHeight uint return 0, nil } + +// GetBlockHeightFromTimestamp returns the block height of the aggregate closest to +// and before the given timestamp, using the ETH/USD query which always has data. +func (k Keeper) GetBlockHeightFromTimestamp(ctx context.Context, timestamp time.Time) (uint64, error) { + aggTimestamp, err := k.GetTimestampBefore(ctx, ethUsdQueryId, timestamp) + if err != nil { + return 0, err + } + agg, err := k.GetAggregateByTimestamp(ctx, ethUsdQueryId, uint64(aggTimestamp.UnixMilli())) + if err != nil { + return 0, err + } + return agg.Height, nil +} + +// RemoveOldReports deletes microreports older than 30. +// It walks the IdQueryId index from the lowest metaId upward, collects +// keys for reports whose timestamp is past the cutoff, then deletes them. +// Returns the number of reports deleted. +func (k Keeper) RemoveOldReports(ctx context.Context) error { + sdkCtx := sdk.UnwrapSDKContext(ctx) + cutoff := sdkCtx.BlockTime().Add(-30 * 24 * time.Hour) + + var toDelete []collections.Triple[[]byte, []byte, uint64] + + iter, err := k.Reports.Indexes.IdQueryId.Iterate(ctx, nil) + if err != nil { + return err + } + defer iter.Close() + for ; iter.Valid() && len(toDelete) < maxPruneSize; iter.Next() { + pk, err := iter.PrimaryKey() + if err != nil { + return err + } + report, err := k.Reports.Get(ctx, pk) + if err != nil { + return err + } + if !report.Timestamp.Before(cutoff) { + break + } + toDelete = append(toDelete, pk) + } + + for _, pk := range toDelete { + if err := k.Reports.Remove(ctx, pk); err != nil { + return err + } + } + + return nil +} diff --git a/x/oracle/keeper/msg_no_stake_report.go b/x/oracle/keeper/msg_no_stake_report.go index 247346307..c4449c6af 100644 --- a/x/oracle/keeper/msg_no_stake_report.go +++ b/x/oracle/keeper/msg_no_stake_report.go @@ -17,7 +17,8 @@ import ( func (k msgServer) NoStakeReport(ctx context.Context, msg *types.MsgNoStakeReport) (res *types.MsgNoStakeReportResponse, err error) { sdkCtx := sdk.UnwrapSDKContext(ctx) - if err := k.validateNoStakeReport(msg); err != nil { + reporterAddr, err := k.validateNoStakeReport(msg) + if err != nil { return nil, err } @@ -47,11 +48,6 @@ func (k msgServer) NoStakeReport(ctx context.Context, msg *types.MsgNoStakeRepor } } - reporterAddr, err := sdk.AccAddressFromBech32(msg.Creator) - if err != nil { - return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid creator address (%s)", err) - } - // Size limit check (0.5MB) limit, err := k.keeper.QueryDataLimit.Get(ctx) if err != nil { @@ -74,18 +70,18 @@ func (k msgServer) NoStakeReport(ctx context.Context, msg *types.MsgNoStakeRepor return &types.MsgNoStakeReportResponse{}, nil } -func (k msgServer) validateNoStakeReport(msg *types.MsgNoStakeReport) error { - _, err := sdk.AccAddressFromBech32(msg.Creator) +func (k msgServer) validateNoStakeReport(msg *types.MsgNoStakeReport) (reporter sdk.AccAddress, err error) { + reporter, err = sdk.AccAddressFromBech32(msg.Creator) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid creator address (%s)", err) + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid creator address (%s)", err) } // make sure query data is not empty if len(msg.QueryData) == 0 { - return errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "query data cannot be empty") + return nil, errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "query data cannot be empty") } // make sure value is not empty if msg.Value == "" { - return errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "value cannot be empty") + return nil, errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "value cannot be empty") } - return nil + return reporter, nil } diff --git a/x/oracle/keeper/msg_server_tip.go b/x/oracle/keeper/msg_server_tip.go index a420d3874..e82f1ff2f 100644 --- a/x/oracle/keeper/msg_server_tip.go +++ b/x/oracle/keeper/msg_server_tip.go @@ -37,12 +37,11 @@ import ( func (k msgServer) Tip(goCtx context.Context, msg *types.MsgTip) (*types.MsgTipResponse, error) { ctx := sdk.UnwrapSDKContext(goCtx) - err := validateTip(msg) + tipper, err := validateTip(msg) if err != nil { return nil, err } - tipper := sdk.MustAccAddressFromBech32(msg.Tipper) params, err := k.keeper.GetParams(ctx) if err != nil { return nil, err @@ -152,18 +151,18 @@ func (k msgServer) Tip(goCtx context.Context, msg *types.MsgTip) (*types.MsgTipR return &types.MsgTipResponse{}, nil } -func validateTip(msg *types.MsgTip) error { - _, err := sdk.AccAddressFromBech32(msg.Tipper) +func validateTip(msg *types.MsgTip) (tipper sdk.AccAddress, err error) { + tipper, err = sdk.AccAddressFromBech32(msg.Tipper) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid tipper address (%s)", err) + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid tipper address (%s)", err) } // ensure that the msg.Amount.Denom matches the layer.BondDenom and the amount is a positive number if msg.Amount.Denom != layer.BondDenom || msg.Amount.Amount.IsZero() || msg.Amount.Amount.IsNegative() { - return errorsmod.Wrapf(sdkerrors.ErrInvalidCoins, "invalid tip amount (%s)", msg.Amount.String()) + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidCoins, "invalid tip amount (%s)", msg.Amount.String()) } // ensure that the queryData is not empty if len(msg.QueryData) == 0 { - return errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, "query data is empty") + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, "query data is empty") } - return nil + return tipper, nil } diff --git a/x/reporter/ante/ante.go b/x/reporter/ante/ante.go index 0573b58e8..93a381a9f 100644 --- a/x/reporter/ante/ante.go +++ b/x/reporter/ante/ante.go @@ -117,7 +117,7 @@ func (t TrackStakeChangesDecorator) checkStakeChange(ctx sdk.Context, msg sdk.Ms return err } var dstValAddr sdk.ValAddress - if addr, err := sdk.ValAddressFromBech32(msg.ValidatorSrcAddress); err == nil { + if addr, err := sdk.ValAddressFromBech32(msg.ValidatorDstAddress); err == nil { dstValAddr = addr } else { return err diff --git a/x/reporter/keeper/hooks.go b/x/reporter/keeper/hooks.go index fc2669ae5..37289acb2 100644 --- a/x/reporter/keeper/hooks.go +++ b/x/reporter/keeper/hooks.go @@ -25,12 +25,17 @@ func (k Keeper) Hooks() Hooks { return Hooks{k} } +// AfterValidatorBonded is called when a validator becomes bonded status. +// We set LastValSetUpdateHeight so all reporters recalculate on their next report. func (h Hooks) AfterValidatorBonded(ctx context.Context, _ sdk.ConsAddress, _ sdk.ValAddress) error { - return nil + return h.k.LastValSetUpdateHeight.Set(ctx, uint64(sdk.UnwrapSDKContext(ctx).BlockHeight())) } +// AfterValidatorBeginUnbonding is called when a validator leaves the bonded set. +// Selectors delegating to this validator will no longer have those tokens counted, +// so all reporters should recalculate. func (h Hooks) AfterValidatorBeginUnbonding(ctx context.Context, _ sdk.ConsAddress, _ sdk.ValAddress) error { - return nil + return h.k.LastValSetUpdateHeight.Set(ctx, uint64(sdk.UnwrapSDKContext(ctx).BlockHeight())) } func (h Hooks) AfterValidatorRemoved(_ context.Context, _ sdk.ConsAddress, _ sdk.ValAddress) error { @@ -41,8 +46,10 @@ func (h Hooks) AfterValidatorCreated(_ context.Context, _ sdk.ValAddress) error func (h Hooks) BeforeValidatorModified(_ context.Context, _ sdk.ValAddress) error { return nil } -func (h Hooks) BeforeValidatorSlashed(_ context.Context, _ sdk.ValAddress, _ sdkmath.LegacyDec) error { - return nil +// BeforeValidatorSlashed is called when a validator is about to be slashed and i think this doesn't necessarily mean +// they are leaving the validator set but it does mean their voting power is being reduced, so we should trigger a recalculation for all reporters +func (h Hooks) BeforeValidatorSlashed(ctx context.Context, _ sdk.ValAddress, _ sdkmath.LegacyDec) error { + return h.k.LastValSetUpdateHeight.Set(ctx, uint64(sdk.UnwrapSDKContext(ctx).BlockHeight())) } func (h Hooks) AfterUnbondingInitiated(_ context.Context, _ uint64) error { return nil } @@ -55,8 +62,13 @@ func (h Hooks) BeforeDelegationSharesModified(_ context.Context, _ sdk.AccAddres return nil } -func (h Hooks) AfterDelegationModified(_ context.Context, _ sdk.AccAddress, _ sdk.ValAddress) error { - return nil +func (h Hooks) AfterDelegationModified(ctx context.Context, delAddr sdk.AccAddress, _ sdk.ValAddress) error { + selector, err := h.k.Selectors.Get(ctx, delAddr.Bytes()) + if err != nil { + // Not a selector, ignore + return nil + } + return h.k.FlagStakeRecalc(ctx, sdk.AccAddress(selector.Reporter)) } // this hook is called in the staking module when a delegation is being created and its implemented here to update a selector's delegations count diff --git a/x/reporter/keeper/hooks_test.go b/x/reporter/keeper/hooks_test.go index 04e39ea26..37d4c4278 100644 --- a/x/reporter/keeper/hooks_test.go +++ b/x/reporter/keeper/hooks_test.go @@ -43,3 +43,56 @@ func TestBeforeDelegationRemoved(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(0), selector.DelegationsCount) } + +func TestAfterValidatorBonded_SetsLastValSetUpdateHeight(t *testing.T) { + k, _, _, _, _, ctx, _ := setupKeeper(t) + consAddr := sdk.ConsAddress(sample.AccAddressBytes()) + valAddr := sdk.ValAddress(sample.AccAddressBytes()) + + ctx = ctx.WithBlockHeight(42) + + // Should set LastValSetUpdateHeight + require.NoError(t, k.Hooks().AfterValidatorBonded(ctx, consAddr, valAddr)) + + height, err := k.LastValSetUpdateHeight.Get(ctx) + require.NoError(t, err) + require.Equal(t, uint64(42), height) +} + +func TestAfterValidatorBeginUnbonding_SetsLastValSetUpdateHeight(t *testing.T) { + k, _, _, _, _, ctx, _ := setupKeeper(t) + consAddr := sdk.ConsAddress(sample.AccAddressBytes()) + valAddr := sdk.ValAddress(sample.AccAddressBytes()) + + ctx = ctx.WithBlockHeight(99) + + // Should set LastValSetUpdateHeight + require.NoError(t, k.Hooks().AfterValidatorBeginUnbonding(ctx, consAddr, valAddr)) + + height, err := k.LastValSetUpdateHeight.Get(ctx) + require.NoError(t, err) + require.Equal(t, uint64(99), height) +} + +func TestAfterDelegationModified_SetsRecalcFlag(t *testing.T) { + k, _, _, _, _, ctx, _ := setupKeeper(t) + selectorAddr := sample.AccAddressBytes() + reporterAddr := sample.AccAddressBytes() + valAddr := sdk.ValAddress(sample.AccAddressBytes()) + + // Non-selector delegation should not set any flag + require.NoError(t, k.Hooks().AfterDelegationModified(ctx, selectorAddr, valAddr)) + has, err := k.StakeRecalcFlag.Has(ctx, reporterAddr.Bytes()) + require.NoError(t, err) + require.False(t, has) + + // Register selector with reporter + require.NoError(t, k.Selectors.Set(ctx, selectorAddr, types.NewSelection(reporterAddr, 1))) + + // Now delegation modification should set the recalc flag for reporter + require.NoError(t, k.Hooks().AfterDelegationModified(ctx, selectorAddr, valAddr)) + + has, err = k.StakeRecalcFlag.Has(ctx, reporterAddr.Bytes()) + require.NoError(t, err) + require.True(t, has) +} diff --git a/x/reporter/keeper/keeper.go b/x/reporter/keeper/keeper.go index d4aaa5c18..49954f8b6 100644 --- a/x/reporter/keeper/keeper.go +++ b/x/reporter/keeper/keeper.go @@ -36,6 +36,10 @@ type ( DistributionQueue collections.Map[uint64, types.DistributionQueueItem] // key: queue index -> item to distribute DistributionQueueCounter collections.Item[types.DistributionQueueCounter] // tracks head and tail of queue + LastValSetUpdateHeight collections.Item[uint64] // block height of last validator set update + StakeRecalcFlag collections.Map[[]byte, bool] // reporters flagged for stake recalculation + RecalcAtTime collections.Map[[]byte, int64] // per-reporter earliest lock expiry in seconds requiring recalculation + Schema collections.Schema logger log.Logger @@ -84,6 +88,9 @@ func NewKeeper( ReporterPeriodData: collections.NewMap(sb, types.ReporterPeriodDataPrefix, "reporter_period_data", collections.BytesKey, codec.CollValue[types.PeriodRewardData](cdc)), DistributionQueue: collections.NewMap(sb, types.DistributionQueuePrefix, "distribution_queue", collections.Uint64Key, codec.CollValue[types.DistributionQueueItem](cdc)), DistributionQueueCounter: collections.NewItem(sb, types.DistributionQueueCounterPrefix, "distribution_queue_counter", codec.CollValue[types.DistributionQueueCounter](cdc)), + LastValSetUpdateHeight: collections.NewItem(sb, types.LastValSetUpdateHeightPrefix, "last_val_set_update_height", collections.Uint64Value), + StakeRecalcFlag: collections.NewMap(sb, types.StakeRecalcFlagPrefix, "stake_recalc_flag", collections.BytesKey, collections.BoolValue), + RecalcAtTime: collections.NewMap(sb, types.RecalcAtTimePrefix, "recalc_at_time", collections.BytesKey, collections.Int64Value), authority: authority, logger: logger, accountKeeper: accountKeeper, @@ -208,6 +215,10 @@ func CalculateRewardAmount(reporterPower, totalPower uint64, reward math.Int) ma return amount } +func (k *Keeper) FlagStakeRecalc(ctx context.Context, reporter sdk.AccAddress) error { + return k.StakeRecalcFlag.Set(ctx, reporter.Bytes(), true) +} + func (k *Keeper) SetOracleKeeper(ok types.OracleKeeper) { k.oracleKeeper = ok } @@ -238,72 +249,43 @@ func (k Keeper) GetLastReportedAtBlock(ctx context.Context, reporter []byte) (ui return 0, nil } -// PruneOldReports removes Report entries older than 60 days. -// It iterates the BlockNumber index, checks each block's timestamp via oracle, -// and deletes entries where timestamp < cutoff (60 days ago). -// maxIterations limits how many entries are processed per call. -func (k Keeper) PruneOldReports(ctx context.Context, maxIterations int) error { +// PruneOldReports removes Report entries older than 30 days. +// It finds the cutoff block height with by calling the oracle using +// ETH/USD aggregates, then iterates and deletes entries below that height. +func (k Keeper) PruneOldReports(ctx context.Context, maxBatchSize int) error { if k.oracleKeeper == nil { return nil } sdkCtx := sdk.UnwrapSDKContext(ctx) - // Calculate cutoff timestamp (60 days ago) - cutoffTimestamp := uint64(sdkCtx.BlockTime().Add(-60 * 24 * time.Hour).UnixMilli()) + cutoff := sdkCtx.BlockTime().Add(-30 * 24 * time.Hour) - // Iterate BlockNumber index in ascending order - // Index key structure from IterateRaw: Pair[Pair[reporter, blockNumber], queryId] - pc := collections.PairKeyCodec(collections.BytesKey, collections.Uint64Key) - start := collections.Join([]byte{}, uint64(0)) - startBuffer := make([]byte, pc.Size(start)) - _, _ = pc.Encode(startBuffer, start) + cutoffBlock, err := k.oracleKeeper.GetBlockHeightFromTimestamp(ctx, cutoff) + if err != nil { + return nil + } + + type reportKey = collections.Pair[[]byte, collections.Pair[[]byte, uint64]] + var toDelete []reportKey - iter, err := k.Report.Indexes.BlockNumber.IterateRaw(ctx, startBuffer, nil, collections.OrderAscending) + iter, err := k.Report.Iterate(ctx, nil) if err != nil { return err } defer iter.Close() - keysToDelete := make([]collections.Pair[[]byte, collections.Pair[[]byte, uint64]], 0) - // Cache: blockNumber -> isOld (true if timestamp < cutoff) - // because multiple Report entries can exist at the same block height (different reporters/queryIds) - // we want to avoid redundant oracle calls for the same block height - checkedBlocks := make(map[uint64]bool) - iterations := 0 - - for ; iter.Valid() && iterations < maxIterations; iter.Next() { - iterations++ - - key, err := iter.Key() + for ; iter.Valid() && len(toDelete) < maxBatchSize; iter.Next() { + pk, err := iter.Key() if err != nil { - continue - } - - blockNumber := key.K1().K2() - - // Check cache first to avoid redundant oracle calls - isOld, found := checkedBlocks[blockNumber] - if !found { - // Query oracle for timestamp at this block height - timestamp, err := k.oracleKeeper.GetTimestampForBlockHeight(ctx, blockNumber) - if err != nil || timestamp == 0 { - // Can't determine age, skip this entry - continue - } - isOld = timestamp < cutoffTimestamp - checkedBlocks[blockNumber] = isOld + return err } - - if isOld { - // Reconstruct primary key: (queryId, (reporter, blockNumber)) - primaryKey := collections.Join(key.K2(), collections.Join(key.K1().K1(), key.K1().K2())) - keysToDelete = append(keysToDelete, primaryKey) + if pk.K2().K2() < cutoffBlock { + toDelete = append(toDelete, pk) } } - // Delete old entries, can't delete during iteration - for _, key := range keysToDelete { - if err := k.Report.Remove(ctx, key); err != nil { + for _, pk := range toDelete { + if err := k.Report.Remove(ctx, pk); err != nil { return err } } diff --git a/x/reporter/keeper/keeper_test.go b/x/reporter/keeper/keeper_test.go index e461eab4e..6786bf964 100644 --- a/x/reporter/keeper/keeper_test.go +++ b/x/reporter/keeper/keeper_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" keepertest "github.com/tellor-io/layer/testutil/keeper" "github.com/tellor-io/layer/testutil/sample" @@ -281,17 +282,10 @@ func TestPruneOldReports(t *testing.T) { require.NoError(t, k.Report.Set(ctx, collections.Join([]byte("queryid1"), collections.Join(reporter1.Bytes(), uint64(400))), types.DelegationsAmounts{Total: math.OneInt()})) // Set up mock OracleKeeper + // GetBlockHeightFromTimestamp returns the cutoff block height. + // Blocks 100, 200 are below cutoff (old), blocks 300, 400 are above (recent). oracleKeeper := mocks.NewOracleKeeper(t) - - // Mock timestamps: blocks 100, 200 are old; blocks 300, 400 are recent - // Day 10 = 90 days ago from now - // Day 30 = 70 days ago from now - // Day 50 = 50 days ago from now (within 60 days) - // Day 80 = 20 days ago from now (within 60 days) - oracleKeeper.On("GetTimestampForBlockHeight", ctx, uint64(100)).Return(uint64(now.Add(-90*24*time.Hour).UnixMilli()), nil) - oracleKeeper.On("GetTimestampForBlockHeight", ctx, uint64(200)).Return(uint64(now.Add(-70*24*time.Hour).UnixMilli()), nil) - oracleKeeper.On("GetTimestampForBlockHeight", ctx, uint64(300)).Return(uint64(now.Add(-50*24*time.Hour).UnixMilli()), nil) - oracleKeeper.On("GetTimestampForBlockHeight", ctx, uint64(400)).Return(uint64(now.Add(-20*24*time.Hour).UnixMilli()), nil) + oracleKeeper.On("GetBlockHeightFromTimestamp", mock.Anything, mock.Anything).Return(uint64(250), nil) k.SetOracleKeeper(oracleKeeper) @@ -330,12 +324,10 @@ func TestPruneOldReportsMaxIterations(t *testing.T) { } oracleKeeper := mocks.NewOracleKeeper(t) - for i := uint64(1); i <= 5; i++ { - oracleKeeper.On("GetTimestampForBlockHeight", ctx, i).Return(uint64(now.Add(-90*24*time.Hour).UnixMilli()), nil).Maybe() - } + oracleKeeper.On("GetBlockHeightFromTimestamp", mock.Anything, mock.Anything).Return(uint64(400), nil) k.SetOracleKeeper(oracleKeeper) - // Prune with maxIterations = 3 (should only process 3 entries) + // Prune with maxBatchSize = 3 (should only delete 3 entries) err := k.PruneOldReports(ctx, 3) require.NoError(t, err) @@ -350,3 +342,105 @@ func TestPruneOldReportsMaxIterations(t *testing.T) { // Should have 2 remaining (5 - 3 = 2) require.Equal(t, 2, count) } + +func TestStakeRecalcFlag(t *testing.T) { + k, _, _, _, _, ctx, _ := setupKeeper(t) + reporter := sample.AccAddressBytes() + + // Initially no flag + has, err := k.StakeRecalcFlag.Has(ctx, reporter.Bytes()) + require.NoError(t, err) + require.False(t, has) + + // Set flag + require.NoError(t, k.FlagStakeRecalc(ctx, reporter)) + + // Now flag should exist + has, err = k.StakeRecalcFlag.Has(ctx, reporter.Bytes()) + require.NoError(t, err) + require.True(t, has) + + // Remove flag + require.NoError(t, k.StakeRecalcFlag.Remove(ctx, reporter.Bytes())) + + // Flag should be gone + has, err = k.StakeRecalcFlag.Has(ctx, reporter.Bytes()) + require.NoError(t, err) + require.False(t, has) +} + +func TestLastValSetUpdateHeight(t *testing.T) { + k, _, _, _, _, ctx, _ := setupKeeper(t) + + // Initially not set + _, err := k.LastValSetUpdateHeight.Get(ctx) + require.Error(t, err) + + // Set it + require.NoError(t, k.LastValSetUpdateHeight.Set(ctx, uint64(100))) + + // Should be retrievable + height, err := k.LastValSetUpdateHeight.Get(ctx) + require.NoError(t, err) + require.Equal(t, uint64(100), height) + + // Update it + require.NoError(t, k.LastValSetUpdateHeight.Set(ctx, uint64(200))) + height, err = k.LastValSetUpdateHeight.Get(ctx) + require.NoError(t, err) + require.Equal(t, uint64(200), height) +} + +func TestRecalcAtTime(t *testing.T) { + k, _, _, _, _, ctx, _ := setupKeeper(t) + reporter := sample.AccAddressBytes() + + // Initially not set + _, err := k.RecalcAtTime.Get(ctx, reporter.Bytes()) + require.Error(t, err) + + // Set it + lockExpiry := time.Now().Add(21 * 24 * time.Hour).Unix() + require.NoError(t, k.RecalcAtTime.Set(ctx, reporter.Bytes(), lockExpiry)) + + // Should be retrievable + stored, err := k.RecalcAtTime.Get(ctx, reporter.Bytes()) + require.NoError(t, err) + require.Equal(t, lockExpiry, stored) + + // Remove it + require.NoError(t, k.RecalcAtTime.Remove(ctx, reporter.Bytes())) + _, err = k.RecalcAtTime.Get(ctx, reporter.Bytes()) + require.Error(t, err) +} + +func TestRecalcAtTime_MinOfMultipleSwitches(t *testing.T) { + k, _, _, _, _, ctx, _ := setupKeeper(t) + reporter := sample.AccAddressBytes() + + // First switch: lock expires at T1 + t1 := time.Now().Add(21 * 24 * time.Hour).Unix() + require.NoError(t, k.RecalcAtTime.Set(ctx, reporter.Bytes(), t1)) + + // Second switch: lock expires at T2 > T1, should keep T1 (the min) + t2 := time.Now().Add(30 * 24 * time.Hour).Unix() + existing, err := k.RecalcAtTime.Get(ctx, reporter.Bytes()) + require.NoError(t, err) + if t2 < existing { + require.NoError(t, k.RecalcAtTime.Set(ctx, reporter.Bytes(), t2)) + } + stored, err := k.RecalcAtTime.Get(ctx, reporter.Bytes()) + require.NoError(t, err) + require.Equal(t, t1, stored) + + // Third switch: lock expires at T0 < T1, should update to T0 + t0 := time.Now().Add(10 * 24 * time.Hour).Unix() + existing, err = k.RecalcAtTime.Get(ctx, reporter.Bytes()) + require.NoError(t, err) + if t0 < existing { + require.NoError(t, k.RecalcAtTime.Set(ctx, reporter.Bytes(), t0)) + } + stored, err = k.RecalcAtTime.Get(ctx, reporter.Bytes()) + require.NoError(t, err) + require.Equal(t, t0, stored) +} diff --git a/x/reporter/keeper/msg_server.go b/x/reporter/keeper/msg_server.go index 66e64f865..dd1f34fb4 100644 --- a/x/reporter/keeper/msg_server.go +++ b/x/reporter/keeper/msg_server.go @@ -38,12 +38,11 @@ var _ types.MsgServer = msgServer{} // Msg: CreateReporter, adds a new reporter if it was never registered before and meets the min bonded tokens requirement // allows the reporter to set their commission rate and min tokens required for selectors to join func (k msgServer) CreateReporter(goCtx context.Context, msg *types.MsgCreateReporter) (*types.MsgCreateReporterResponse, error) { - err := validateCreateReporter(msg) + addr, err := validateCreateReporter(msg) if err != nil { return nil, err } // check if reporter has min bonded tokens - addr := sdk.MustAccAddressFromBech32(msg.ReporterAddress) params, err := k.Keeper.Params.Get(goCtx) if err != nil { return nil, err @@ -131,35 +130,34 @@ func (k msgServer) CreateReporter(goCtx context.Context, msg *types.MsgCreateRep return &types.MsgCreateReporterResponse{}, nil } -func validateCreateReporter(msg *types.MsgCreateReporter) error { - _, err := sdk.AccAddressFromBech32(msg.ReporterAddress) +func validateCreateReporter(msg *types.MsgCreateReporter) (reporter sdk.AccAddress, err error) { + reporter, err = sdk.AccAddressFromBech32(msg.ReporterAddress) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid reporter address (%s)", err) + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid reporter address (%s)", err) } // check that mintokensrequired is positive if msg.MinTokensRequired.LTE(math.ZeroInt()) { - return errors.New("MinTokensRequired must be positive (%s)") + return nil, errors.New("MinTokensRequired must be positive (%s)") } // check that moniker is not empty if msg.Moniker == "" { - return errors.New("moniker cannot be empty") + return nil, errors.New("moniker cannot be empty") } - return nil + return reporter, nil } // Msg: SelectReporter, allows a selector to join a reporter if they meet the min requirement set by the reporter // and the reporter has not reached the max selectors allowed // selector can only join one reporter at a time and to switch reporters see SwitchReporter func (k msgServer) SelectReporter(goCtx context.Context, msg *types.MsgSelectReporter) (*types.MsgSelectReporterResponse, error) { - err := validateSelectReporter(msg) + selectorAddr, reporterAddr, err := validateSelectReporter(msg) if err != nil { return nil, err } // check if selector exists - addr := sdk.MustAccAddressFromBech32(msg.SelectorAddress) - alreadyExists, err := k.Keeper.Selectors.Has(goCtx, addr) + alreadyExists, err := k.Keeper.Selectors.Has(goCtx, selectorAddr) if err != nil { return nil, err } @@ -167,7 +165,6 @@ func (k msgServer) SelectReporter(goCtx context.Context, msg *types.MsgSelectRep return nil, errors.New("selector already exists") } // check if reporter exists - reporterAddr := sdk.MustAccAddressFromBech32(msg.ReporterAddress) reporter, err := k.Keeper.Reporters.Get(goCtx, reporterAddr) if err != nil { return nil, err @@ -189,7 +186,7 @@ func (k msgServer) SelectReporter(goCtx context.Context, msg *types.MsgSelectRep return nil, errors.New("reporter has reached max selectors") } // count the selectors BONDED tokens in the staking module - bondedTokens, count, err := k.Keeper.CheckSelectorsDelegations(goCtx, addr) + bondedTokens, count, err := k.Keeper.CheckSelectorsDelegations(goCtx, selectorAddr) if err != nil { return nil, err } @@ -198,7 +195,7 @@ func (k msgServer) SelectReporter(goCtx context.Context, msg *types.MsgSelectRep return nil, fmt.Errorf("reporter's min requirement %s not met by selector. Must stake %s more to select to this reporter", reporter.MinTokensRequired.String(), reporter.MinTokensRequired.Sub(bondedTokens).String()) } // set the selector - if err := k.Keeper.Selectors.Set(goCtx, addr.Bytes(), types.NewSelection(reporterAddr.Bytes(), uint64(count))); err != nil { + if err := k.Keeper.Selectors.Set(goCtx, selectorAddr.Bytes(), types.NewSelection(reporterAddr.Bytes(), uint64(count))); err != nil { return nil, err } sdk.UnwrapSDKContext(goCtx).EventManager().EmitEvents(sdk.Events{ @@ -210,19 +207,22 @@ func (k msgServer) SelectReporter(goCtx context.Context, msg *types.MsgSelectRep ), }) telemetry.IncrCounterWithLabels([]string{"num_of_selectors", "join"}, 1, []metrics.Label{{Name: "chain_id", Value: sdk.UnwrapSDKContext(goCtx).ChainID()}}) + if err := k.Keeper.FlagStakeRecalc(goCtx, reporterAddr); err != nil { + return nil, err + } return &types.MsgSelectReporterResponse{}, nil } -func validateSelectReporter(msg *types.MsgSelectReporter) error { - _, err := sdk.AccAddressFromBech32(msg.SelectorAddress) +func validateSelectReporter(msg *types.MsgSelectReporter) (selector, reporter sdk.AccAddress, err error) { + selector, err = sdk.AccAddressFromBech32(msg.SelectorAddress) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid selector address (%s)", err) + return nil, nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid selector address (%s)", err) } - _, err = sdk.AccAddressFromBech32(msg.ReporterAddress) + reporter, err = sdk.AccAddressFromBech32(msg.ReporterAddress) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid reporter address (%s)", err) + return nil, nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid reporter address (%s)", err) } - return nil + return selector, reporter, nil } // Msg: SwitchReporter, allows a selector to switch reporters if they meet the new reporters min requirement @@ -230,27 +230,25 @@ func validateSelectReporter(msg *types.MsgSelectReporter) error { // switching reporters will not automatically include the selector's tokens to be part of reporting until the unbonding time has passed // in order to prevent the selector from being part of a report twice unless they were part of a reporter that hasn't reported yet func (k msgServer) SwitchReporter(goCtx context.Context, msg *types.MsgSwitchReporter) (*types.MsgSwitchReporterResponse, error) { - err := validateSwitchReporter(msg) + selectorAddr, reporterAddr, err := validateSwitchReporter(msg) if err != nil { return nil, err } - addr := sdk.MustAccAddressFromBech32(msg.SelectorAddress) // check if selector exists - selector, err := k.Keeper.Selectors.Get(goCtx, addr) + selector, err := k.Keeper.Selectors.Get(goCtx, selectorAddr) if err != nil { return nil, err } prevReporter := sdk.AccAddress(selector.Reporter) // check if reporter exists - reporterAddr := sdk.MustAccAddressFromBech32(msg.ReporterAddress) reporter, err := k.Keeper.Reporters.Get(goCtx, reporterAddr) if err != nil { return nil, err } // check if reporter is trying to become a selector, can only switch if havent reported in the last 21 days - if bytes.Equal(selector.Reporter, addr.Bytes()) { + if bytes.Equal(selector.Reporter, selectorAddr.Bytes()) { // get the timestamp of the most recent report for reporter switching to selector (msg signer/selector) - lastReportTimestamp, err := k.Keeper.oracleKeeper.GetLastReportedAtTimestamp(goCtx, addr.Bytes()) + lastReportTimestamp, err := k.Keeper.oracleKeeper.GetLastReportedAtTimestamp(goCtx, selectorAddr.Bytes()) if err != nil { return nil, err } @@ -261,7 +259,7 @@ func (k msgServer) SwitchReporter(goCtx context.Context, msg *types.MsgSwitchRep return nil, errors.New("reporter has reported in the last 21 days, please wait before switching reporters") } - if err := k.Keeper.Reporters.Remove(goCtx, addr.Bytes()); err != nil { + if err := k.Keeper.Reporters.Remove(goCtx, selectorAddr.Bytes()); err != nil { return nil, err } } @@ -282,7 +280,7 @@ func (k msgServer) SwitchReporter(goCtx context.Context, msg *types.MsgSwitchRep return nil, errors.New("reporter has reached max selectors") } // check if selector meets reporters min requirement - hasMin, err := k.Keeper.HasMin(goCtx, addr, reporter.MinTokensRequired) + hasMin, err := k.Keeper.HasMin(goCtx, selectorAddr, reporter.MinTokensRequired) if err != nil { return nil, err } @@ -291,7 +289,7 @@ func (k msgServer) SwitchReporter(goCtx context.Context, msg *types.MsgSwitchRep } // check if selector was part of a report before switching - prevReportedPower, err := k.Keeper.GetReporterTokensAtBlock(goCtx, sdk.MustAccAddressFromBech32(prevReporter.String()), uint64(sdk.UnwrapSDKContext(goCtx).BlockHeight())) + prevReportedPower, err := k.Keeper.GetReporterTokensAtBlock(goCtx, prevReporter, uint64(sdk.UnwrapSDKContext(goCtx).BlockHeight())) if err != nil { return nil, err } @@ -303,9 +301,18 @@ func (k msgServer) SwitchReporter(goCtx context.Context, msg *types.MsgSwitchRep } selector.LockedUntilTime = sdk.UnwrapSDKContext(goCtx).BlockTime().Add(unbondingTime) + + // Set RecalcAtTime for the new reporter so their cache is updated when this lock expires. + lockUnix := selector.LockedUntilTime.Unix() + existing, err := k.Keeper.RecalcAtTime.Get(goCtx, reporterAddr.Bytes()) + if err != nil || lockUnix < existing { + if err := k.Keeper.RecalcAtTime.Set(goCtx, reporterAddr.Bytes(), lockUnix); err != nil { + return nil, err + } + } } selector.Reporter = reporterAddr.Bytes() - if err := k.Keeper.Selectors.Set(goCtx, addr.Bytes(), selector); err != nil { + if err := k.Keeper.Selectors.Set(goCtx, selectorAddr.Bytes(), selector); err != nil { return nil, err } sdk.UnwrapSDKContext(goCtx).EventManager().EmitEvents(sdk.Events{ @@ -317,29 +324,34 @@ func (k msgServer) SwitchReporter(goCtx context.Context, msg *types.MsgSwitchRep sdk.NewAttribute("selector_locked_until", selector.LockedUntilTime.String()), ), }) + if err := k.Keeper.FlagStakeRecalc(goCtx, prevReporter); err != nil { + return nil, err + } + if err := k.Keeper.FlagStakeRecalc(goCtx, reporterAddr); err != nil { + return nil, err + } return &types.MsgSwitchReporterResponse{}, nil } -func validateSwitchReporter(msg *types.MsgSwitchReporter) error { - _, err := sdk.AccAddressFromBech32(msg.SelectorAddress) +func validateSwitchReporter(msg *types.MsgSwitchReporter) (selector, reporter sdk.AccAddress, err error) { + selector, err = sdk.AccAddressFromBech32(msg.SelectorAddress) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid selector address (%s)", err) + return nil, nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid selector address (%s)", err) } - _, err = sdk.AccAddressFromBech32(msg.ReporterAddress) + reporter, err = sdk.AccAddressFromBech32(msg.ReporterAddress) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid reporter address (%s)", err) + return nil, nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid reporter address (%s)", err) } - return nil + return selector, reporter, nil } // Msg: RemoveSelector, allows anyone to remove a selector if the selector falls below a given reporter's min requirement in order to free up space for new selectors // if they are capped at max selectors func (k msgServer) RemoveSelector(goCtx context.Context, msg *types.MsgRemoveSelector) (*types.MsgRemoveSelectorResponse, error) { - err := validateRemoveSelector(msg) + selectorAddr, err := validateRemoveSelector(msg) if err != nil { return nil, err } - selectorAddr := sdk.MustAccAddressFromBech32(msg.SelectorAddress) selector, err := k.Keeper.Selectors.Get(goCtx, selectorAddr) if err != nil { return nil, err @@ -384,6 +396,9 @@ func (k msgServer) RemoveSelector(goCtx context.Context, msg *types.MsgRemoveSel if err := k.Keeper.Selectors.Remove(goCtx, selectorAddr); err != nil { return nil, err } + if err := k.Keeper.FlagStakeRecalc(goCtx, sdk.AccAddress(selector.Reporter)); err != nil { + return nil, err + } sdk.UnwrapSDKContext(goCtx).EventManager().EmitEvents(sdk.Events{ sdk.NewEvent( "selector_removed", @@ -394,23 +409,26 @@ func (k msgServer) RemoveSelector(goCtx context.Context, msg *types.MsgRemoveSel return &types.MsgRemoveSelectorResponse{}, nil } -func validateRemoveSelector(msg *types.MsgRemoveSelector) error { - _, err := sdk.AccAddressFromBech32(msg.AnyAddress) +func validateRemoveSelector(msg *types.MsgRemoveSelector) (selector sdk.AccAddress, err error) { + _, err = sdk.AccAddressFromBech32(msg.AnyAddress) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid signer address (%s)", err) + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid signer address (%s)", err) } - _, err = sdk.AccAddressFromBech32(msg.SelectorAddress) + selector, err = sdk.AccAddressFromBech32(msg.SelectorAddress) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid selector address (%s)", err) + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid selector address (%s)", err) } - return nil + return selector, nil } // Msg: UnjailReporter, allows a reporter that is jailed to be unjailed if the jail period has passed (jail period is set during a dispute) func (k msgServer) UnjailReporter(goCtx context.Context, msg *types.MsgUnjailReporter) (*types.MsgUnjailReporterResponse, error) { ctx := sdk.UnwrapSDKContext(goCtx) - reporterAddr := sdk.MustAccAddressFromBech32(msg.ReporterAddress) + reporterAddr, err := sdk.AccAddressFromBech32(msg.ReporterAddress) + if err != nil { + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid reporter address (%s)", err) + } reporter, err := k.Reporters.Get(ctx, reporterAddr) if err != nil { @@ -431,15 +449,14 @@ func (k msgServer) UnjailReporter(goCtx context.Context, msg *types.MsgUnjailRep // Msg: WithdrawTip, allows selectors to directly withdraw reporting rewards and stake them with a BONDED validator func (k msgServer) WithdrawTip(goCtx context.Context, msg *types.MsgWithdrawTip) (*types.MsgWithdrawTipResponse, error) { - err := validateWithdrawTip(msg) + selectorAddr, err := validateWithdrawTip(msg) if err != nil { return nil, err } ctx := sdk.UnwrapSDKContext(goCtx) - delAddr := sdk.MustAccAddressFromBech32(msg.SelectorAddress) // Get the selector's reporter and settle any pending rewards - selection, err := k.Keeper.Selectors.Get(ctx, delAddr) + selection, err := k.Keeper.Selectors.Get(ctx, selectorAddr) if err == nil { // Selector exists - settle their reporter's current period if err := k.Keeper.SettleReporter(ctx, selection.Reporter); err != nil { @@ -447,7 +464,7 @@ func (k msgServer) WithdrawTip(goCtx context.Context, msg *types.MsgWithdrawTip) } } - shares, err := k.Keeper.SelectorTips.Get(ctx, delAddr) + shares, err := k.Keeper.SelectorTips.Get(ctx, selectorAddr) if err != nil { return nil, err } @@ -468,7 +485,7 @@ func (k msgServer) WithdrawTip(goCtx context.Context, msg *types.MsgWithdrawTip) if amtToDelegate.IsZero() { return nil, errors.New("no tips to withdraw") } - newShares, err := k.Keeper.stakingKeeper.Delegate(ctx, delAddr, amtToDelegate, val.Status, val, false) + newShares, err := k.Keeper.stakingKeeper.Delegate(ctx, selectorAddr, amtToDelegate, val.Status, val, false) if err != nil { return nil, err } @@ -476,12 +493,12 @@ func (k msgServer) WithdrawTip(goCtx context.Context, msg *types.MsgWithdrawTip) // isolate decimals from shares remainder := shares.Sub(shares.TruncateDec()) if remainder.IsZero() { - err = k.Keeper.SelectorTips.Remove(ctx, delAddr) + err = k.Keeper.SelectorTips.Remove(ctx, selectorAddr) if err != nil { return nil, err } } else { - err = k.Keeper.SelectorTips.Set(ctx, delAddr, remainder) + err = k.Keeper.SelectorTips.Set(ctx, selectorAddr, remainder) if err != nil { return nil, err } @@ -503,27 +520,26 @@ func (k msgServer) WithdrawTip(goCtx context.Context, msg *types.MsgWithdrawTip) ), }) // allow for people to track the amount they have withdrawn based on their address - telemetry.IncrCounterWithLabels([]string{"withdrawn_amount_tracker"}, float32(amtToDelegate.Int64()), []metrics.Label{{Name: "chain_id", Value: ctx.ChainID()}, {Name: "reporter", Value: hex.EncodeToString(delAddr.Bytes())}}) + telemetry.IncrCounterWithLabels([]string{"withdrawn_amount_tracker"}, float32(amtToDelegate.Int64()), []metrics.Label{{Name: "chain_id", Value: ctx.ChainID()}, {Name: "reporter", Value: hex.EncodeToString(selectorAddr.Bytes())}}) return &types.MsgWithdrawTipResponse{}, nil } -func validateWithdrawTip(msg *types.MsgWithdrawTip) error { - _, err := sdk.AccAddressFromBech32(msg.SelectorAddress) +func validateWithdrawTip(msg *types.MsgWithdrawTip) (selector sdk.AccAddress, err error) { + selector, err = sdk.AccAddressFromBech32(msg.SelectorAddress) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid creator address (%s)", err) + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid selector address (%s)", err) } - return nil + return selector, nil } func (k msgServer) EditReporter(goCtx context.Context, msg *types.MsgEditReporter) (*types.MsgEditReporterResponse, error) { - err := validateEditReporter(msg) + reporterAddr, err := validateEditReporter(msg) if err != nil { return nil, err } sdkCtx := sdk.UnwrapSDKContext(goCtx) - addr := sdk.MustAccAddressFromBech32(msg.ReporterAddress) params, err := k.Keeper.Params.Get(goCtx) if err != nil { return nil, err @@ -534,7 +550,7 @@ func (k msgServer) EditReporter(goCtx context.Context, msg *types.MsgEditReporte return nil, errors.New("commission rate must be between 0 and 1 (e.g, 0.50 = 50%)") } - reporter, err := k.Keeper.Reporter(goCtx, addr) + reporter, err := k.Keeper.Reporter(goCtx, reporterAddr) if err != nil { return nil, err } @@ -558,7 +574,7 @@ func (k msgServer) EditReporter(goCtx context.Context, msg *types.MsgEditReporte reporter.Moniker = msg.Moniker reporter.LastUpdated = sdkCtx.BlockTime() - err = k.Keeper.Reporters.Set(goCtx, addr.Bytes(), reporter) + err = k.Keeper.Reporters.Set(goCtx, reporterAddr.Bytes(), reporter) if err != nil { return nil, err } @@ -576,21 +592,21 @@ func (k msgServer) EditReporter(goCtx context.Context, msg *types.MsgEditReporte return &types.MsgEditReporterResponse{}, nil } -func validateEditReporter(msg *types.MsgEditReporter) error { - _, err := sdk.AccAddressFromBech32(msg.ReporterAddress) +func validateEditReporter(msg *types.MsgEditReporter) (reporter sdk.AccAddress, err error) { + reporter, err = sdk.AccAddressFromBech32(msg.ReporterAddress) if err != nil { - return errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid creator address (%s)", err) + return nil, errorsmod.Wrapf(sdkerrors.ErrInvalidAddress, "invalid reporter address (%s)", err) } // check that mintokensrequired is positive if msg.MinTokensRequired.LTE(math.ZeroInt()) { - return errors.New("MinTokensRequired must be positive (%s)") + return nil, errors.New("MinTokensRequired must be positive (%s)") } // check that moniker is not empty if msg.Moniker == "" { - return errors.New("moniker cannot be empty") + return nil, errors.New("moniker cannot be empty") } - return nil + return reporter, nil } diff --git a/x/reporter/keeper/reporter.go b/x/reporter/keeper/reporter.go index d40ced8ce..d1da14332 100644 --- a/x/reporter/keeper/reporter.go +++ b/x/reporter/keeper/reporter.go @@ -52,11 +52,33 @@ func (k Keeper) HasMin(ctx context.Context, addr sdk.AccAddress, minRequired mat // It also tracks period data for reward distribution - when delegation state changes, // the previous period is queued for distribution. func (k Keeper) ReporterStake(ctx context.Context, repAddr sdk.AccAddress, queryId []byte) (math.Int, error) { + needsRecalc, err := k.needsStakeRecalc(ctx, repAddr) + if err != nil { + return math.Int{}, err + } + + if !needsRecalc { + // Stake hasn't changed, fetch cached total from last Report entry + cached, err := k.GetDelegationsAmount(ctx, repAddr.Bytes(), uint64(sdk.UnwrapSDKContext(ctx).BlockHeight())) + if err != nil { + return math.Int{}, err + } + if !cached.Total.IsNil() && cached.Total.IsPositive() { + return cached.Total, nil + } + // if it ain't positive, just recalculate + } + totalTokens, delegates, selectorShares, hash, err := k.GetReporterStake(ctx, repAddr) if err != nil { return math.Int{}, err } + // Clear stake recalc flag after recalculation + if err := k.StakeRecalcFlag.Remove(ctx, repAddr.Bytes()); err != nil { + return math.Int{}, err + } + // Handle period tracking for reward distribution changed, err := k.handlePeriodTracking(ctx, repAddr, selectorShares, totalTokens, hash) if err != nil { @@ -72,6 +94,43 @@ func (k Keeper) ReporterStake(ctx context.Context, repAddr sdk.AccAddress, query return totalTokens, nil } +// needsStakeRecalc checks if a reporter's stake needs to be recalculated +func (k Keeper) needsStakeRecalc(ctx context.Context, repAddr sdk.AccAddress) (bool, error) { + // Check persisted recalc flag (set by hooks/msg handlers) + flagged, err := k.StakeRecalcFlag.Has(ctx, repAddr.Bytes()) + if err != nil { + return true, nil + } + if flagged { + return true, nil + } + + // Check if a selector's switch lock has expired since last calc + recalcAt, err := k.RecalcAtTime.Get(ctx, repAddr.Bytes()) + if err == nil { + blockTime := sdk.UnwrapSDKContext(ctx).BlockTime().Unix() + if recalcAt <= blockTime { + return true, nil + } + } + + // Check if validator set updated since last calculation + lastCalcBlock, err := k.GetLastReportedAtBlock(ctx, repAddr.Bytes()) + if err != nil { + return true, nil // means first time for reporter + } + if lastCalcBlock == 0 { + return true, nil // never calculated + } + + valSetUpdateHeight, err := k.LastValSetUpdateHeight.Get(ctx) + if err != nil { + return true, nil // no update height stored yet, recalc to be safe + } + + return valSetUpdateHeight >= lastCalcBlock, nil +} + // function that iterates through a selector's delegations and checks if they meet the min requirement // plus counts how many delegations they have func (k Keeper) CheckSelectorsDelegations(ctx context.Context, addr sdk.AccAddress) (math.Int, int64, error) { @@ -162,6 +221,7 @@ func (k Keeper) GetReporterStake(ctx context.Context, repAddr sdk.AccAddress) (m selectorShares := make([]*types.SelectorShare, 0) // Compute hash inline as we build selector shares hasher := sha256.New() + var earliestFutureLock int64 // track earliest future lock expiry (unix seconds) for ; iter.Valid(); iter.Next() { selectorAddr, err := iter.PrimaryKey() if err != nil { @@ -179,6 +239,10 @@ func (k Keeper) GetReporterStake(ctx context.Context, repAddr sdk.AccAddress) (m } // skip selectors that are locked out for switching reporters if selector.LockedUntilTime.After(sdk.UnwrapSDKContext(ctx).BlockTime()) { + lockUnix := selector.LockedUntilTime.Unix() + if earliestFutureLock == 0 || lockUnix < earliestFutureLock { + earliestFutureLock = lockUnix + } continue } var iterError error @@ -248,6 +312,17 @@ func (k Keeper) GetReporterStake(ctx context.Context, repAddr sdk.AccAddress) (m hasher.Write(selectorTotal.BigInt().Bytes()) } } + // Update RecalcAtTime: if there are still-locked selectors, set the earliest expiry + // so needsStakeRecalc triggers when it expires. If none are locked, clean up. + if earliestFutureLock == 0 { + err = k.RecalcAtTime.Remove(ctx, repAddr.Bytes()) + } else { + err = k.RecalcAtTime.Set(ctx, repAddr.Bytes(), earliestFutureLock) + } + if err != nil { + return math.Int{}, nil, nil, nil, err + } + // Finalize hash with total hasher.Write(totalTokens.BigInt().Bytes()) return totalTokens, delegates, selectorShares, hasher.Sum(nil), nil diff --git a/x/reporter/mocks/OracleKeeper.go b/x/reporter/mocks/OracleKeeper.go index b3147c6e4..6d52a336e 100644 --- a/x/reporter/mocks/OracleKeeper.go +++ b/x/reporter/mocks/OracleKeeper.go @@ -4,6 +4,7 @@ package mocks import ( context "context" + time "time" mock "github.com/stretchr/testify/mock" ) @@ -13,23 +14,23 @@ type OracleKeeper struct { mock.Mock } -// GetLastReportedAtTimestamp provides a mock function with given fields: ctx, reporter -func (_m *OracleKeeper) GetLastReportedAtTimestamp(ctx context.Context, reporter []byte) (uint64, error) { - ret := _m.Called(ctx, reporter) +// GetBlockHeightFromTimestamp provides a mock function with given fields: ctx, timestamp +func (_m *OracleKeeper) GetBlockHeightFromTimestamp(ctx context.Context, timestamp time.Time) (uint64, error) { + ret := _m.Called(ctx, timestamp) var r0 uint64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, []byte) (uint64, error)); ok { - return rf(ctx, reporter) + if rf, ok := ret.Get(0).(func(context.Context, time.Time) (uint64, error)); ok { + return rf(ctx, timestamp) } - if rf, ok := ret.Get(0).(func(context.Context, []byte) uint64); ok { - r0 = rf(ctx, reporter) + if rf, ok := ret.Get(0).(func(context.Context, time.Time) uint64); ok { + r0 = rf(ctx, timestamp) } else { r0 = ret.Get(0).(uint64) } - if rf, ok := ret.Get(1).(func(context.Context, []byte) error); ok { - r1 = rf(ctx, reporter) + if rf, ok := ret.Get(1).(func(context.Context, time.Time) error); ok { + r1 = rf(ctx, timestamp) } else { r1 = ret.Error(1) } @@ -37,23 +38,23 @@ func (_m *OracleKeeper) GetLastReportedAtTimestamp(ctx context.Context, reporter return r0, r1 } -// GetTimestampForBlockHeight provides a mock function with given fields: ctx, blockHeight -func (_m *OracleKeeper) GetTimestampForBlockHeight(ctx context.Context, blockHeight uint64) (uint64, error) { - ret := _m.Called(ctx, blockHeight) +// GetLastReportedAtTimestamp provides a mock function with given fields: ctx, reporter +func (_m *OracleKeeper) GetLastReportedAtTimestamp(ctx context.Context, reporter []byte) (uint64, error) { + ret := _m.Called(ctx, reporter) var r0 uint64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint64) (uint64, error)); ok { - return rf(ctx, blockHeight) + if rf, ok := ret.Get(0).(func(context.Context, []byte) (uint64, error)); ok { + return rf(ctx, reporter) } - if rf, ok := ret.Get(0).(func(context.Context, uint64) uint64); ok { - r0 = rf(ctx, blockHeight) + if rf, ok := ret.Get(0).(func(context.Context, []byte) uint64); ok { + r0 = rf(ctx, reporter) } else { r0 = ret.Get(0).(uint64) } - if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { - r1 = rf(ctx, blockHeight) + if rf, ok := ret.Get(1).(func(context.Context, []byte) error); ok { + r1 = rf(ctx, reporter) } else { r1 = ret.Error(1) } diff --git a/x/reporter/types/expected_keepers.go b/x/reporter/types/expected_keepers.go index 04e17f799..989cd438a 100644 --- a/x/reporter/types/expected_keepers.go +++ b/x/reporter/types/expected_keepers.go @@ -76,5 +76,5 @@ type RegistryKeeper interface { // OracleKeeper defines the expected interface for the Oracle module. type OracleKeeper interface { GetLastReportedAtTimestamp(ctx context.Context, reporter []byte) (uint64, error) - GetTimestampForBlockHeight(ctx context.Context, blockHeight uint64) (uint64, error) + GetBlockHeightFromTimestamp(ctx context.Context, timestamp time.Time) (uint64, error) } diff --git a/x/reporter/types/keys.go b/x/reporter/types/keys.go index 04ad7bcdf..a4df0e57a 100644 --- a/x/reporter/types/keys.go +++ b/x/reporter/types/keys.go @@ -33,4 +33,7 @@ var ( ReporterPeriodDataPrefix = collections.NewPrefix(22) DistributionQueuePrefix = collections.NewPrefix(23) DistributionQueueCounterPrefix = collections.NewPrefix(24) + LastValSetUpdateHeightPrefix = collections.NewPrefix(25) + StakeRecalcFlagPrefix = collections.NewPrefix(26) + RecalcAtTimePrefix = collections.NewPrefix(27) )