Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
# v0.12.0

### Added
* [#781](https://github.com/allora-network/allora-chain/pull/781/files) Enable pebbledb
* [#781](https://github.com/allora-network/allora-chain/pull/781) Enable pebbledb
* [#816](https://github.com/allora-network/allora-chain/pull/816) Generic labels in metrics, usage in events and q/tx

### Changed
* [#777](https://github.com/allora-network/allora-chain/pull/777) Allow forecasts with unregistered inferers to proceed
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ allorad --home=$APP_HOME q staking validators -o=json | \
`allorad --home=$APP_HOME status | jq -r '.validator_info.voting_power'`
- Output should be > 0

## Unstaking/unbounding a validator
## Unstaking/unbounding a validator

If you need to delete a validator from the chain, you just need to unbound the stake.

Expand Down Expand Up @@ -357,6 +357,7 @@ For reputers, the submission window starts at `(nonce.BlockHeight + topic.Ground

Both windows are inclusive of start and end boundaries.


## References

- https://github.com/go-delve/delve/blob/master/Documentation/faq.md#remote
Expand Down
35 changes: 14 additions & 21 deletions x/emissions/README.md
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
Emissions Module
=============================================

## Dependencies
The `emissions` module is a core component of the Allora Network that manages the topic definition, actor participation, economic incentives, rewards system and more. This is the main part of the implementation of the [Allora Whitepaper](https://research.assets.allora.network/allora.0x10001.pdf).

golang v1.21+
GNU make
docker
Key Features:
- Topic Management: Creation and management of prediction topics
- Stake Management: Handling of stakes for workers and reputers
- Reward Distribution: Calculation and distribution of rewards based on performance
- Delegation System: Support for stake delegation to reputers
- Performance Metrics: Tracking of worker, reputer, and forecaster scores
- Fee Collection: Management of network fees and revenue distribution

## Build
```bash
# get deps
go mod tidy

# rebuild the autogenerated protobuf files
make proto-all
## Monitoring

# build the module, making sure the source compiles
make
```
Allora node emits its own `emissions` module metrics in each event, query and tx.
event: `allora_loadtest_produce_count`
query/tx: `allora_request_counter` for occurrences, `allora_request_latency_ms` for latency measures.
Different labels are applied where appropriate (eg "topic_id", "address", "nonce", etc.)
See `x/emissions/metrics/` for details.

Then somewhere else you have a minimal-chain running:
```bash
cd ../minimal-chain
go mod tidy
make install
make init
allorad start
```
6 changes: 3 additions & 3 deletions x/emissions/keeper/ema_scores.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (k *Keeper) CalcAndSaveInfererScoreEmaWithLastSavedTopicQuantile(

emaScores := []types.Score{emaScore}
activeArr := map[string]bool{previousInfererScore.Address: false}
types.EmitNewActorEMAScoresSetEvent(ctx, types.ActorType_ACTOR_TYPE_INFERER_UNSPECIFIED, emaScores, activeArr)
types.EmitNewActorEMAScoresSetEvent(ctx, types.ActorType_ACTOR_TYPE_INFERER_UNSPECIFIED, emaScores, activeArr, topic.Id)
return nil
}

Expand Down Expand Up @@ -190,7 +190,7 @@ func (k *Keeper) CalcAndSaveForecasterScoreEmaWithLastSavedTopicQuantile(

emaScores := []types.Score{emaScore}
activeArr := map[string]bool{previousForecasterScore.Address: false}
types.EmitNewActorEMAScoresSetEvent(ctx, types.ActorType_ACTOR_TYPE_FORECASTER, emaScores, activeArr)
types.EmitNewActorEMAScoresSetEvent(ctx, types.ActorType_ACTOR_TYPE_FORECASTER, emaScores, activeArr, topic.Id)
return nil
}

Expand Down Expand Up @@ -230,6 +230,6 @@ func (k *Keeper) CalcAndSaveReputerScoreEmaWithLastSavedTopicQuantile(

emaScores := []types.Score{emaScore}
activeArr := map[string]bool{previousReputerScore.Address: false}
types.EmitNewActorEMAScoresSetEvent(ctx, types.ActorType_ACTOR_TYPE_REPUTER, emaScores, activeArr)
types.EmitNewActorEMAScoresSetEvent(ctx, types.ActorType_ACTOR_TYPE_REPUTER, emaScores, activeArr, topic.Id)
return nil
}
6 changes: 5 additions & 1 deletion x/emissions/keeper/msgserver/msg_server_demand.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package msgserver

import (
"context"
"strconv"
"time"

"cosmossdk.io/errors"
Expand All @@ -12,7 +13,10 @@ import (
)

func (ms msgServer) FundTopic(ctx context.Context, msg *types.FundTopicRequest) (_ *types.FundTopicResponse, err error) {
defer metrics.RecordMetrics("FundTopic", time.Now(), &err)
labels := map[string]string{
"topic_id": strconv.FormatUint(msg.TopicId, 10),
}
defer metrics.RecordMetrics("FundTopic", time.Now(), &err, labels)

if err := types.ValidateSdkIntRepresentingMonetaryValue(msg.Amount); err != nil {
return nil, errors.Wrap(err, "amount is not valid")
Expand Down
2 changes: 1 addition & 1 deletion x/emissions/keeper/msgserver/msg_server_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func (ms msgServer) UpdateParams(ctx context.Context, msg *types.UpdateParamsRequest) (_ *types.UpdateParamsResponse, err error) {
defer metrics.RecordMetrics("UpdateParams", time.Now(), &err)
defer metrics.RecordMetrics("UpdateParams", time.Now(), &err, map[string]string{})

err = ms.k.ValidateStringIsBech32(msg.Sender)
if err != nil {
Expand Down
30 changes: 27 additions & 3 deletions x/emissions/keeper/msgserver/msg_server_registrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package msgserver
import (
"context"
"fmt"
"strconv"
"time"

errorsmod "cosmossdk.io/errors"
Expand All @@ -14,7 +15,17 @@ import (

// Registers a new network participant to the network for the first time for worker or reputer
func (ms msgServer) Register(ctx context.Context, msg *types.RegisterRequest) (_ *types.RegisterResponse, err error) {
defer metrics.RecordMetrics("Register", time.Now(), &err)
actorLabel := "reputer"
if !msg.IsReputer {
actorLabel = "worker"
}

labels := map[string]string{
"topic_id": strconv.FormatUint(msg.TopicId, 10),
"address": msg.Sender,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid labels with high cardinality, I think setting address as label would create too much metric entries which can impacts node performance

"actor": actorLabel,
}
defer metrics.RecordMetrics("Register", time.Now(), &err, labels)

err = msg.Validate()
if err != nil {
Expand Down Expand Up @@ -81,7 +92,17 @@ func (ms msgServer) Register(ctx context.Context, msg *types.RegisterRequest) (_

// Remove registration from a topic for worker or reputer
func (ms msgServer) RemoveRegistration(ctx context.Context, msg *types.RemoveRegistrationRequest) (_ *types.RemoveRegistrationResponse, err error) {
defer metrics.RecordMetrics("RemoveRegistration", time.Now(), &err)
actorLabel := "reputer"
if !msg.IsReputer {
actorLabel = "worker"
}

labels := map[string]string{
"topic_id": strconv.FormatUint(msg.TopicId, 10),
"address": msg.Sender,
"actor": actorLabel,
}
defer metrics.RecordMetrics("RemoveRegistration", time.Now(), &err, labels)

err = msg.Validate()
if err != nil {
Expand Down Expand Up @@ -137,7 +158,10 @@ func (ms msgServer) RemoveRegistration(ctx context.Context, msg *types.RemoveReg
}

func (ms msgServer) CheckBalanceForRegistration(ctx context.Context, address string) (success bool, fee sdk.Coin, err error) {
defer metrics.RecordMetrics("CheckBalanceForRegistration", time.Now(), &err)
labels := map[string]string{
"address": address,
}
defer metrics.RecordMetrics("CheckBalanceForRegistration", time.Now(), &err, labels)

moduleParams, err := ms.k.GetParams(ctx)
if err != nil {
Expand Down
44 changes: 36 additions & 8 deletions x/emissions/keeper/msgserver/msg_server_reputer_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package msgserver

import (
"context"
"strconv"
"time"

errorsmod "cosmossdk.io/errors"
Expand All @@ -14,27 +15,54 @@ import (

// A tx function that accepts a individual loss and possibly returns an error
func (ms msgServer) InsertReputerPayload(ctx context.Context, msg *types.InsertReputerPayloadRequest) (_ *types.InsertReputerPayloadResponse, err error) {
defer metrics.RecordMetrics("InsertReputerPayload", time.Now(), &err)

sdkCtx := sdk.UnwrapSDKContext(ctx)
blockHeight := sdkCtx.BlockHeight()

moduleParams, err := ms.k.GetParams(ctx)
if err != nil {
defer metrics.RecordMetrics("InsertReputerPayload", time.Now(), &err, map[string]string{"error": err.Error()})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having error messages as label can creates a lot of metric entries and are hard to use (e.g. error msgs can contains values creating multiple error labels for the same one), it's preferable to use error codes instead

return nil, errorsmod.Wrapf(err, "Error getting params for reputer: %v", &msg.ReputerValueBundle.ValueBundle.Reputer)
}
err = ms.k.ValidateStringIsBech32(msg.Sender)
if err != nil {
defer metrics.RecordMetrics("InsertReputerPayload", time.Now(), &err, map[string]string{"error": err.Error()})
return nil, errorsmod.Wrapf(err, "Error validating sender address")
}
// fast permission check before entering validations which are more expensive
if msg.ReputerValueBundle != nil && msg.ReputerValueBundle.ValueBundle != nil {
err = ms.k.ValidateStringIsBech32(msg.ReputerValueBundle.ValueBundle.Reputer)
if err != nil {
defer metrics.RecordMetrics("InsertReputerPayload", time.Now(), &err, map[string]string{"error": err.Error()})
return nil, errorsmod.Wrapf(err, "Error validating reputer address")
}
canSubmit, err := ms.k.CanSubmitReputerPayload(ctx, msg.ReputerValueBundle.ValueBundle.TopicId, msg.ReputerValueBundle.ValueBundle.Reputer)
if err != nil {
defer metrics.RecordMetrics("InsertReputerPayload", time.Now(), &err, map[string]string{"error": err.Error()})
return nil, err
} else if !canSubmit {
defer metrics.RecordMetrics("InsertReputerPayload", time.Now(), &err, map[string]string{"error": types.ErrNotPermittedToSubmitReputerPayload.Error()})
return nil, types.ErrNotPermittedToSubmitReputerPayload
}
} else {
defer metrics.RecordMetrics("InsertReputerPayload", time.Now(), &err, map[string]string{"error": types.ErrInvalidReputerData.Error()})
return nil, types.ErrInvalidReputerData
}

// includes validation
rvb, err := types.NewInputReputerValueBundleFromInput(msg.ReputerValueBundle)
if err != nil {
defer metrics.RecordMetrics("InsertReputerPayload", time.Now(), &err, map[string]string{"error": err.Error()})
return nil, errorsmod.Wrapf(err,
"Reputer bad data format for block: %d", blockHeight)
}

canSubmit, err := ms.k.CanSubmitReputerPayload(ctx, rvb.ValueBundle.TopicId, rvb.ValueBundle.Reputer)
if err != nil {
return nil, err
} else if !canSubmit {
return nil, types.ErrNotPermittedToSubmitReputerPayload
// if validated, record metrics with appropriate labels
labels := map[string]string{
"address": msg.ReputerValueBundle.ValueBundle.Reputer,
"topic_id": strconv.FormatUint(msg.ReputerValueBundle.ValueBundle.TopicId, 10),
"nonce": strconv.FormatUint(uint64(msg.ReputerValueBundle.ValueBundle.ReputerRequestNonce.ReputerNonce.BlockHeight), 10),
"blockHeight": strconv.FormatInt(blockHeight, 10),
}
defer metrics.RecordMetrics("InsertReputerPayload", time.Now(), &err, labels)
Comment on lines +59 to +65
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have multiple remarks here:

  • The time.Now() being executed here would provide wrong latency measurement;
  • The address, nonce, blockHeight introduce a very high cardinality, that can be dangerous;

I think we should have only one defer statement at the beginning, and if we need to evaluate vars at execution we can use defer func() {} instead


err = checkInputLength(moduleParams.MaxSerializedMsgLength, msg)
if err != nil {
Expand Down
43 changes: 36 additions & 7 deletions x/emissions/keeper/msgserver/msg_server_stake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package msgserver

import (
"context"
"strconv"
"time"

"errors"
Expand All @@ -16,7 +17,11 @@ import (

// Function for reputers to call to add stake to an existing stake position.
func (ms msgServer) AddStake(ctx context.Context, msg *types.AddStakeRequest) (_ *types.AddStakeResponse, err error) {
defer metrics.RecordMetrics("AddStake", time.Now(), &err)
labels := map[string]string{
"topic_id": strconv.FormatUint(msg.TopicId, 10),
"address": msg.Sender,
}
defer metrics.RecordMetrics("AddStake", time.Now(), &err, labels)

if err = msg.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -68,7 +73,11 @@ func (ms msgServer) AddStake(ctx context.Context, msg *types.AddStakeRequest) (_
// once the withdrawal delay has passed then the ABCI endBlocker will automatically pay out the stake removal
// if this function is called twice, it will overwrite the previous stake removal and the delay will reset.
func (ms msgServer) RemoveStake(ctx context.Context, msg *types.RemoveStakeRequest) (_ *types.RemoveStakeResponse, err error) {
defer metrics.RecordMetrics("RemoveStake", time.Now(), &err)
labels := map[string]string{
"topic_id": strconv.FormatUint(msg.TopicId, 10),
"address": msg.Sender,
}
defer metrics.RecordMetrics("RemoveStake", time.Now(), &err, labels)

if err = msg.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -128,7 +137,11 @@ func (ms msgServer) RemoveStake(ctx context.Context, msg *types.RemoveStakeReque

// cancel a request to remove your stake, during the delay window
func (ms msgServer) CancelRemoveStake(ctx context.Context, msg *types.CancelRemoveStakeRequest) (_ *types.CancelRemoveStakeResponse, err error) {
defer metrics.RecordMetrics("CancelRemoveStake", time.Now(), &err)
labels := map[string]string{
"topic_id": strconv.FormatUint(msg.TopicId, 10),
"address": msg.Sender,
}
defer metrics.RecordMetrics("CancelRemoveStake", time.Now(), &err, labels)

if err = msg.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -161,7 +174,11 @@ func (ms msgServer) CancelRemoveStake(ctx context.Context, msg *types.CancelRemo

// Delegates a stake to a reputer. Sender does not have to be registered to delegate stake.
func (ms msgServer) DelegateStake(ctx context.Context, msg *types.DelegateStakeRequest) (_ *types.DelegateStakeResponse, err error) {
defer metrics.RecordMetrics("DelegateStake", time.Now(), &err)
labels := map[string]string{
"topic_id": strconv.FormatUint(msg.TopicId, 10),
"address": msg.Sender,
}
defer metrics.RecordMetrics("DelegateStake", time.Now(), &err, labels)

if err = msg.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -204,7 +221,11 @@ func (ms msgServer) DelegateStake(ctx context.Context, msg *types.DelegateStakeR
// once the withdrawal delay has passed then the ABCI endBlocker will automatically pay out the stake removal
// if this function is called twice, it will overwrite the previous stake removal and the delay will reset.
func (ms msgServer) RemoveDelegateStake(ctx context.Context, msg *types.RemoveDelegateStakeRequest) (_ *types.RemoveDelegateStakeResponse, err error) {
defer metrics.RecordMetrics("RemoveDelegateStake", time.Now(), &err)
labels := map[string]string{
"topic_id": strconv.FormatUint(msg.TopicId, 10),
"address": msg.Sender,
}
defer metrics.RecordMetrics("RemoveDelegateStake", time.Now(), &err, labels)

if err = msg.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -286,7 +307,11 @@ func (ms msgServer) RemoveDelegateStake(ctx context.Context, msg *types.RemoveDe

// cancel an ongoing stake removal request during the delay period
func (ms msgServer) CancelRemoveDelegateStake(ctx context.Context, msg *types.CancelRemoveDelegateStakeRequest) (_ *types.CancelRemoveDelegateStakeResponse, err error) {
defer metrics.RecordMetrics("CancelRemoveDelegateStake", time.Now(), &err)
labels := map[string]string{
"topic_id": strconv.FormatUint(msg.TopicId, 10),
"address": msg.Sender,
}
defer metrics.RecordMetrics("CancelRemoveDelegateStake", time.Now(), &err, labels)

if err = msg.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -333,7 +358,11 @@ func (ms msgServer) CancelRemoveDelegateStake(ctx context.Context, msg *types.Ca
}

func (ms msgServer) RewardDelegateStake(ctx context.Context, msg *types.RewardDelegateStakeRequest) (_ *types.RewardDelegateStakeResponse, err error) {
defer metrics.RecordMetrics("RewardDelegateStake", time.Now(), &err)
labels := map[string]string{
"topic_id": strconv.FormatUint(msg.TopicId, 10),
"address": msg.Sender,
}
defer metrics.RecordMetrics("RewardDelegateStake", time.Now(), &err, labels)

if err = msg.Validate(); err != nil {
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion x/emissions/keeper/msgserver/msg_server_topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (
)

func (ms msgServer) CreateNewTopic(ctx context.Context, msg *types.CreateNewTopicRequest) (_ *types.CreateNewTopicResponse, err error) {
defer metrics.RecordMetrics("CreateNewTopic", time.Now(), &err)
labels := map[string]string{
"address": msg.Creator,
}
defer metrics.RecordMetrics("CreateNewTopic", time.Now(), &err, labels)

// Validate the address
if err := ms.k.ValidateStringIsBech32(msg.Creator); err != nil {
Expand Down
Loading
Loading