-
Notifications
You must be signed in to change notification settings - Fork 17
chain validation and fix command #198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
package cmd | ||
|
||
import ( | ||
"crypto/tls" | ||
"fmt" | ||
"math/big" | ||
"strconv" | ||
|
||
"github.com/ClickHouse/clickhouse-go/v2" | ||
"github.com/rs/zerolog/log" | ||
"github.com/spf13/cobra" | ||
config "github.com/thirdweb-dev/indexer/configs" | ||
"github.com/thirdweb-dev/indexer/internal/rpc" | ||
"github.com/thirdweb-dev/indexer/internal/storage" | ||
"github.com/thirdweb-dev/indexer/internal/validation" | ||
) | ||
|
||
var ( | ||
validateCmd = &cobra.Command{ | ||
Use: "validate", | ||
Short: "TBD", | ||
Long: "TBD", | ||
Run: func(cmd *cobra.Command, args []string) { | ||
RunValidate(cmd, args) | ||
}, | ||
} | ||
) | ||
|
||
func RunValidate(cmd *cobra.Command, args []string) { | ||
batchSize := big.NewInt(1000) | ||
fixBatchSize := 0 // default is no batch size | ||
if len(args) > 0 { | ||
batchSizeFromArgs, err := strconv.Atoi(args[0]) | ||
if err != nil { | ||
log.Fatal().Err(err).Msg("Failed to parse batch size") | ||
} | ||
if batchSizeFromArgs < 1 { | ||
batchSizeFromArgs = 1 | ||
} | ||
batchSize = big.NewInt(int64(batchSizeFromArgs)) | ||
log.Info().Msgf("Using batch size %d from args", batchSize) | ||
} | ||
if len(args) > 1 { | ||
fixBatchSizeFromArgs, err := strconv.Atoi(args[1]) | ||
if err != nil { | ||
log.Fatal().Err(err).Msg("Failed to parse fix batch size") | ||
} | ||
fixBatchSize = fixBatchSizeFromArgs | ||
} | ||
log.Debug().Msgf("Batch size: %d, fix batch size: %d", batchSize, fixBatchSize) | ||
batchSize = new(big.Int).Sub(batchSize, big.NewInt(1)) // -1 because range ends are inclusive | ||
|
||
rpcClient, err := rpc.Initialize() | ||
if err != nil { | ||
log.Fatal().Err(err).Msg("Failed to initialize RPC") | ||
} | ||
log.Info().Msgf("Running validation for chain %d", rpcClient.GetChainID()) | ||
|
||
s, err := storage.NewStorageConnector(&config.Cfg.Storage) | ||
if err != nil { | ||
log.Fatal().Err(err).Msg("Failed to initialize storage") | ||
} | ||
cursor, err := validation.InitCursor(rpcClient.GetChainID(), s) | ||
if err != nil { | ||
log.Fatal().Err(err).Msg("Failed to initialize cursor") | ||
} | ||
log.Debug().Msgf("Cursor initialized for chain %d, starting from block %d", rpcClient.GetChainID(), cursor.LastScannedBlockNumber) | ||
|
||
conn, err := clickhouse.Open(&clickhouse.Options{ | ||
Addr: []string{fmt.Sprintf("%s:%d", config.Cfg.Storage.Main.Clickhouse.Host, config.Cfg.Storage.Main.Clickhouse.Port)}, | ||
Protocol: clickhouse.Native, | ||
TLS: &tls.Config{}, | ||
Auth: clickhouse.Auth{ | ||
Username: config.Cfg.Storage.Main.Clickhouse.Username, | ||
Password: config.Cfg.Storage.Main.Clickhouse.Password, | ||
}, | ||
Settings: func() clickhouse.Settings { | ||
settings := clickhouse.Settings{ | ||
"do_not_merge_across_partitions_select_final": "1", | ||
"use_skip_indexes_if_final": "1", | ||
"optimize_move_to_prewhere_if_final": "1", | ||
"async_insert": "1", | ||
"wait_for_async_insert": "1", | ||
} | ||
return settings | ||
}(), | ||
}) | ||
Comment on lines
+69
to
+87
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Set TLS MinVersion for better security The TLS configuration is missing a TLS: &tls.Config{
+ MinVersion: tls.VersionTLS12,
}, Ideally, use TLS 1.3 if your server supports it: TLS: &tls.Config{
+ MinVersion: tls.VersionTLS13,
}, 🧰 Tools🪛 ast-grep (0.38.1)[warning] 71-71: MinVersion (missing-ssl-minversion-go) 🤖 Prompt for AI Agents
|
||
if err != nil { | ||
log.Fatal().Err(err).Msg("Failed to connect to ClickHouse") | ||
} | ||
defer conn.Close() | ||
|
||
startBlock := new(big.Int).Add(cursor.LastScannedBlockNumber, big.NewInt(1)) | ||
|
||
for startBlock.Cmp(cursor.MaxBlockNumber) <= 0 { | ||
batchEndBlock := new(big.Int).Add(startBlock, batchSize) | ||
if batchEndBlock.Cmp(cursor.MaxBlockNumber) > 0 { | ||
batchEndBlock = new(big.Int).Set(cursor.MaxBlockNumber) | ||
} | ||
|
||
log.Info().Msgf("Validating batch of blocks from %s to %s", startBlock.String(), batchEndBlock.String()) | ||
err := validateAndFixRange(rpcClient, s, conn, startBlock, batchEndBlock, fixBatchSize) | ||
if err != nil { | ||
log.Fatal().Err(err).Msgf("failed to validate range %v-%v", startBlock, batchEndBlock) | ||
} | ||
|
||
startBlock = new(big.Int).Add(batchEndBlock, big.NewInt(1)) | ||
cursor.Update(batchEndBlock) | ||
} | ||
} | ||
|
||
/** | ||
* Validates a range of blocks (end and start are inclusive) for a given chain and fixes any problems it finds | ||
*/ | ||
func validateAndFixRange(rpcClient rpc.IRPCClient, s storage.IStorage, conn clickhouse.Conn, startBlock *big.Int, endBlock *big.Int, fixBatchSize int) error { | ||
chainId := rpcClient.GetChainID() | ||
err := validation.FindAndRemoveDuplicates(conn, chainId, startBlock, endBlock) | ||
if err != nil { | ||
log.Fatal().Err(err).Msg("Failed to find and fix duplicates") | ||
} | ||
|
||
err = validation.FindAndFixGaps(rpcClient, s, conn, chainId, startBlock, endBlock) | ||
if err != nil { | ||
log.Fatal().Err(err).Msg("Failed to find and fix gaps") | ||
} | ||
|
||
err = validation.ValidateAndFixBlocks(rpcClient, s, conn, startBlock, endBlock, fixBatchSize) | ||
if err != nil { | ||
log.Fatal().Err(err).Msg("Failed to validate and fix blocks") | ||
} | ||
|
||
log.Debug().Msgf("Validation complete for range %v-%v", startBlock, endBlock) | ||
return nil | ||
Comment on lines
+115
to
+133
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Improve error handling in validateAndFixRange The function calls func validateAndFixRange(rpcClient rpc.IRPCClient, s storage.IStorage, conn clickhouse.Conn, startBlock *big.Int, endBlock *big.Int, fixBatchSize int) error {
chainId := rpcClient.GetChainID()
err := validation.FindAndRemoveDuplicates(conn, chainId, startBlock, endBlock)
if err != nil {
- log.Fatal().Err(err).Msg("Failed to find and fix duplicates")
+ return fmt.Errorf("failed to find and fix duplicates: %w", err)
}
err = validation.FindAndFixGaps(rpcClient, s, conn, chainId, startBlock, endBlock)
if err != nil {
- log.Fatal().Err(err).Msg("Failed to find and fix gaps")
+ return fmt.Errorf("failed to find and fix gaps: %w", err)
}
err = validation.ValidateAndFixBlocks(rpcClient, s, conn, startBlock, endBlock, fixBatchSize)
if err != nil {
- log.Fatal().Err(err).Msg("Failed to validate and fix blocks")
+ return fmt.Errorf("failed to validate and fix blocks: %w", err)
}
log.Debug().Msgf("Validation complete for range %v-%v", startBlock, endBlock)
return nil
}
🤖 Prompt for AI Agents
|
||
} |
Uh oh!
There was an error while loading. Please reload this page.