31 commits
f98281f
add storage data migration modules
zhangchiqing May 9, 2025
a3c46a7
add storage data migration
zhangchiqing May 9, 2025
6cb9ebe
log progress
zhangchiqing May 10, 2025
e7e679f
fix error handling
zhangchiqing May 12, 2025
9922a15
fix lint
zhangchiqing May 16, 2025
ac246df
Merge branch 'master' into leo/storage-data-migration
zhangchiqing May 20, 2025
5cffb65
add validation
zhangchiqing May 16, 2025
0e8ae70
add runner
zhangchiqing May 16, 2025
3608508
add db migration cmd
zhangchiqing May 16, 2025
cd081ef
update comments and errors
zhangchiqing May 16, 2025
d865f41
add subcommand to util cmd root
zhangchiqing May 20, 2025
4d43afe
fix logging
zhangchiqing May 21, 2025
a46d236
trigger compaction at the end
zhangchiqing May 22, 2025
c054346
print all events
zhangchiqing May 22, 2025
4547b81
fix compaction
zhangchiqing May 22, 2025
67e27f4
fix migration to include single bytes keys
zhangchiqing May 22, 2025
857437f
Merge branch 'leo/storage-data-migration' into leo/storage-data-migra…
zhangchiqing May 22, 2025
3e122a2
update validation
zhangchiqing May 22, 2025
fc2a986
Merge branch 'leo/storage-data-migration-cmd' into leo/storage-data-m…
zhangchiqing May 22, 2025
7a81e03
skip compaction
zhangchiqing May 22, 2025
d98b746
refactor migration to commit in batch
zhangchiqing May 23, 2025
74029d8
better error handling
zhangchiqing May 23, 2025
50f5ac8
add benchmark tests
zhangchiqing May 23, 2025
dd206d9
refactor copy exact keys
zhangchiqing May 23, 2025
959a4a5
Merge branch 'leo/storage-data-migration' into leo/storage-data-migra…
zhangchiqing May 24, 2025
3634d19
Merge branch 'leo/storage-data-migration-cmd' into leo/storage-data-m…
zhangchiqing May 24, 2025
58dec5f
disable auto compaction, trigger compaction at the end
zhangchiqing May 24, 2025
a77bd72
disable WAL
zhangchiqing May 24, 2025
542f514
add comments
zhangchiqing May 24, 2025
911d448
add badger logger
zhangchiqing May 24, 2025
0abd486
disable badger compaction
zhangchiqing May 24, 2025
71 changes: 71 additions & 0 deletions cmd/util/cmd/db-migration/cmd.go
@@ -0,0 +1,71 @@
package db

import (
	"github.com/docker/go-units"
	"github.com/rs/zerolog/log"
	"github.com/spf13/cobra"

	"github.com/onflow/flow-go/storage/migration"
)

var (
	flagBadgerDBdir            string
	flagPebbleDBdir            string
	flagBatchByteSize          int
	flagReaderCount            int
	flagWriterCount            int
	flagReaderShardPrefixBytes int
)

var Cmd = &cobra.Command{
	Use:   "db-migration",
	Short: "copy badger db to pebble db",
	Run:   run,
}

func init() {
	Cmd.Flags().StringVar(&flagBadgerDBdir, "datadir", "", "BadgerDB Dir to copy data from")
	_ = Cmd.MarkFlagRequired("datadir")

	Cmd.Flags().StringVar(&flagPebbleDBdir, "pebbledir", "", "PebbleDB Dir to copy data to")
	_ = Cmd.MarkFlagRequired("pebbledir")

	Cmd.Flags().IntVar(&flagBatchByteSize, "batch_byte_size", migration.DefaultMigrationConfig.BatchByteSize,
		"the batch size in bytes to use for migration (32MB by default)")

	Cmd.Flags().IntVar(&flagReaderCount, "reader_count", migration.DefaultMigrationConfig.ReaderWorkerCount,
		"the number of reader workers to use for migration")

	Cmd.Flags().IntVar(&flagWriterCount, "writer_count", migration.DefaultMigrationConfig.WriterWorkerCount,
		"the number of writer workers to use for migration")

	Cmd.Flags().IntVar(&flagReaderShardPrefixBytes, "reader_shard_prefix_bytes", migration.DefaultMigrationConfig.ReaderShardPrefixBytes,
		"the number of prefix bytes used to assign iterator workload")
}

func run(*cobra.Command, []string) {
	lg := log.With().
		Str("badger_db_dir", flagBadgerDBdir).
		Str("pebble_db_dir", flagPebbleDBdir).
		Str("batch_byte_size", units.HumanSize(float64(flagBatchByteSize))).
		Int("reader_count", flagReaderCount).
		Int("writer_count", flagWriterCount).
		Int("reader_shard_prefix_bytes", flagReaderShardPrefixBytes).
		Logger()

	lg.Info().Msg("starting migration from badger db to pebble db")

	err := migration.RunMigration(flagBadgerDBdir, flagPebbleDBdir, migration.MigrationConfig{
		BatchByteSize:          flagBatchByteSize,
		ReaderWorkerCount:      flagReaderCount,
		WriterWorkerCount:      flagWriterCount,
		ReaderShardPrefixBytes: flagReaderShardPrefixBytes,
	})

	if err != nil {
		lg.Error().Err(err).Msg("migration failed")
		return
	}

	lg.Info().Msg("migration completed")
}
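
For illustration, the new subcommand could be invoked as follows (binary name, paths, and worker counts are hypothetical; unset flags fall back to migration.DefaultMigrationConfig):

util db-migration \
  --datadir /path/to/badger \
  --pebbledir /path/to/pebble \
  --batch_byte_size 33554432 \
  --reader_count 4 \
  --writer_count 4 \
  --reader_shard_prefix_bytes 2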
2 changes: 2 additions & 0 deletions cmd/util/cmd/root.go
@@ -17,6 +17,7 @@ import (
	checkpoint_collect_stats "github.com/onflow/flow-go/cmd/util/cmd/checkpoint-collect-stats"
	checkpoint_list_tries "github.com/onflow/flow-go/cmd/util/cmd/checkpoint-list-tries"
	checkpoint_trie_stats "github.com/onflow/flow-go/cmd/util/cmd/checkpoint-trie-stats"
	db_migration "github.com/onflow/flow-go/cmd/util/cmd/db-migration"
	debug_script "github.com/onflow/flow-go/cmd/util/cmd/debug-script"
	debug_tx "github.com/onflow/flow-go/cmd/util/cmd/debug-tx"
	diff_states "github.com/onflow/flow-go/cmd/util/cmd/diff-states"
@@ -132,6 +133,7 @@ func addCommands() {
	rootCmd.AddCommand(evm_state_exporter.Cmd)
	rootCmd.AddCommand(verify_execution_result.Cmd)
	rootCmd.AddCommand(verify_evm_offchain_replay.Cmd)
	rootCmd.AddCommand(db_migration.Cmd)
}

func initConfig() {
295 changes: 295 additions & 0 deletions storage/migration/migration.go
@@ -0,0 +1,295 @@
package migration

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"sync"

	"github.com/rs/zerolog/log"

	"github.com/cockroachdb/pebble"
	"github.com/dgraph-io/badger/v2"

	"github.com/onflow/flow-go/module/util"
)

type MigrationConfig struct {
	BatchByteSize     int // the size of each batch to write to Pebble
	ReaderWorkerCount int // the number of workers to read from Badger
	WriterWorkerCount int // the number of workers to write to Pebble

	// the number of prefix bytes used to shard the iterator workload.
	// E.g., if set to 1, the first byte of each key divides the keyspace into 256
	// shards, and each reader worker iterates all keys sharing one first byte.
	// Since keys are not evenly distributed (a table stored under one prefix byte
	// may hold far more data than others), 2 or 3 prefix bytes can be used to
	// divide the keyspace more finely, so that several reader workers can iterate
	// keys of the same table (the same leading byte) concurrently.
	ReaderShardPrefixBytes int
}
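
// For illustration only (the values below are hypothetical, not the package
// defaults): a config that shards the keyspace by the first two bytes (65536
// shards) and commits roughly 32 MB per Pebble batch could look like:
//
//	cfg := MigrationConfig{
//		BatchByteSize:          32 * 1024 * 1024,
//		ReaderWorkerCount:      4,
//		WriterWorkerCount:      4,
//		ReaderShardPrefixBytes: 2,
//	}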

type KVPair struct {
	Key   []byte
	Value []byte
}

// GeneratePrefixes returns all possible n-byte key prefixes in ascending order.
// For n == 0 it returns a single empty prefix, which matches every key.
func GeneratePrefixes(n int) [][]byte {
	if n == 0 {
		return [][]byte{{}}
	}

	base := 1 << (8 * n)
	results := make([][]byte, 0, base)

	for i := 0; i < base; i++ {
		buf := make([]byte, n)
		switch n {
		case 1:
			buf[0] = byte(i)
		case 2:
			binary.BigEndian.PutUint16(buf, uint16(i))
		case 3:
			buf[0] = byte(i >> 16)
			buf[1] = byte(i >> 8)
			buf[2] = byte(i)
		default:
			panic("unsupported prefix byte length")
		}
		results = append(results, buf)
	}
	return results
}
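
// For illustration: GeneratePrefixes(1) yields the 256 one-byte prefixes
// 0x00 through 0xFF, and GeneratePrefixes(2) the 65536 two-byte prefixes
// 0x0000 through 0xFFFF; each prefix becomes one unit of reader workload.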

// GenerateKeysShorterThanPrefix returns every possible key shorter than n bytes
// (lengths 1 to n-1). Such keys are not covered by the n-byte prefix shards,
// since a prefix iterator only matches keys at least as long as the prefix,
// so they must be copied separately.
func GenerateKeysShorterThanPrefix(n int) [][]byte {
	allKeys := make([][]byte, 0)
	for i := 1; i < n; i++ {
		keys := GeneratePrefixes(i)
		allKeys = append(allKeys, keys...)
	}
	return allKeys
}

// readerWorker reads key-value pairs from BadgerDB using a prefix iterator.
func readerWorker(
	ctx context.Context,
	lgProgress func(int),
	db *badger.DB,
	jobs <-chan []byte, // each job is a prefix to iterate over
	kvChan chan<- []KVPair, // channel to send key-value pairs to writer workers
	batchSize int,
) error {
	for prefix := range jobs {
		err := db.View(func(txn *badger.Txn) error {
			// count each prefix as one unit of progress, even if iteration fails
			defer lgProgress(1)

			if ctx.Err() != nil {
				return ctx.Err()
			}

			options := badger.DefaultIteratorOptions
			options.Prefix = prefix
			it := txn.NewIterator(options)
			defer it.Close()

			var (
				kvBatch  []KVPair
				currSize int
			)

			for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
				if ctx.Err() != nil {
					return ctx.Err()
				}

				item := it.Item()
				key := item.KeyCopy(nil)
				val, err := item.ValueCopy(nil)
				if err != nil {
					return fmt.Errorf("failed to copy value for key %x: %w", key, err)
				}

				kvBatch = append(kvBatch, KVPair{Key: key, Value: val})
				currSize += len(key) + len(val)

				if currSize >= batchSize {
					select {
					case kvChan <- kvBatch:
					case <-ctx.Done():
						return ctx.Err()
					}
					kvBatch = nil
					currSize = 0
				}
			}

			if len(kvBatch) > 0 {
				select {
				case kvChan <- kvBatch:
				case <-ctx.Done():
					return ctx.Err()
				}
			}

			return nil
		})

		if err != nil {
			return err
		}
	}
	return nil
}

// writerWorker writes key-value pairs to PebbleDB in batches.
func writerWorker(ctx context.Context, db *pebble.DB, kvChan <-chan []KVPair) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case kvGroup, ok := <-kvChan:
			if !ok {
				return nil
			}
			batch := db.NewBatch()
			for _, kv := range kvGroup {
				if err := batch.Set(kv.Key, kv.Value, nil); err != nil {
					return fmt.Errorf("failed to set key %x: %w", kv.Key, err)
				}
			}

			if err := batch.Commit(nil); err != nil {
				return fmt.Errorf("failed to commit batch: %w", err)
			}
		}
	}
}

// CopyFromBadgerToPebble migrates all key-value pairs from a BadgerDB instance to a PebbleDB instance.
//
// The migration is performed in parallel using a configurable number of reader and writer workers.
// Reader workers iterate over the BadgerDB by sharded key prefixes (based on ReaderShardPrefixBytes)
// and send key-value pairs to a shared channel. Writer workers consume from this channel and write
// batched entries into PebbleDB.
//
// Configuration is provided via MigrationConfig:
// - BatchByteSize: maximum size in bytes for a single Pebble write batch.
// - ReaderWorkerCount: number of concurrent workers reading from Badger.
// - WriterWorkerCount: number of concurrent workers writing to Pebble.
// - ReaderShardPrefixBytes: number of bytes used to shard the keyspace for parallel iteration.
//
// The function blocks until all keys are migrated and written successfully.
// It returns an error if any part of the process fails.
func CopyFromBadgerToPebble(badgerDB *badger.DB, pebbleDB *pebble.DB, cfg MigrationConfig) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		errOnce  sync.Once
		firstErr error
	)

	// on the first error, cancel the context and remember the error so it can be returned
	reportFirstError := func(err error) {
		if err != nil {
			errOnce.Do(func() {
				firstErr = err
				cancel()
			})
		}
	}

	// Step 1: Copy all keys shorter than the shard prefix, which the prefix
	// iterators below would not visit
	keysShorterThanPrefix := GenerateKeysShorterThanPrefix(cfg.ReaderShardPrefixBytes)
	if err := copyExactKeysFromBadgerToPebble(badgerDB, pebbleDB, keysShorterThanPrefix); err != nil {
		return fmt.Errorf("failed to copy keys shorter than prefix: %w", err)
	}
	log.Info().Msgf("Copied %d keys shorter than the %d-byte shard prefix", len(keysShorterThanPrefix), cfg.ReaderShardPrefixBytes)

	// Step 2: Copy all remaining keys by first generating the prefix shards and
	// then using reader and writer workers to copy the keys under each prefix
	prefixes := GeneratePrefixes(cfg.ReaderShardPrefixBytes)
	prefixJobs := make(chan []byte, len(prefixes))
	for _, prefix := range prefixes {
		prefixJobs <- prefix
	}
	close(prefixJobs)

	kvChan := make(chan []KVPair, cfg.ReaderWorkerCount*2)

	lg := util.LogProgress(
		log.Logger,
		util.DefaultLogProgressConfig("migrating keys from badger to pebble", len(prefixes)),
	)

	var readerWg sync.WaitGroup
	for i := 0; i < cfg.ReaderWorkerCount; i++ {
		readerWg.Add(1)
		go func() {
			defer readerWg.Done()
			if err := readerWorker(ctx, lg, badgerDB, prefixJobs, kvChan, cfg.BatchByteSize); err != nil {
				reportFirstError(err)
			}
		}()
	}

	var writerWg sync.WaitGroup
	for i := 0; i < cfg.WriterWorkerCount; i++ {
		writerWg.Add(1)
		go func() {
			defer writerWg.Done()
			if err := writerWorker(ctx, pebbleDB, kvChan); err != nil {
				reportFirstError(err)
			}
		}()
	}

	// Close kvChan after all readers complete, so writers can drain it and exit
	go func() {
		readerWg.Wait()
		close(kvChan)
	}()

	writerWg.Wait()
	return firstErr
}
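
// Illustrative usage only: a minimal sketch of driving the migration directly,
// assuming stock open options. The RunMigration helper invoked by the CLI (not
// shown in this diff) is where tuning such as disabling the Pebble WAL and
// deferring compaction, mentioned in the commit log, would live.
//
//	bdb, err := badger.Open(badger.DefaultOptions("/path/to/badger").WithReadOnly(true))
//	if err != nil { /* handle error */ }
//	pdb, err := pebble.Open("/path/to/pebble", &pebble.Options{})
//	if err != nil { /* handle error */ }
//	err = CopyFromBadgerToPebble(bdb, pdb, DefaultMigrationConfig)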

// copyExactKeysFromBadgerToPebble copies the given keys (and their values) from
// BadgerDB to PebbleDB in a single batch, skipping keys that do not exist.
func copyExactKeysFromBadgerToPebble(badgerDB *badger.DB, pebbleDB *pebble.DB, keys [][]byte) error {
	batch := pebbleDB.NewBatch()
	err := badgerDB.View(func(txn *badger.Txn) error {
		for _, key := range keys {
			item, err := txn.Get(key)
			if err != nil {
				if errors.Is(err, badger.ErrKeyNotFound) {
					// skip if the key is not found
					continue
				}

				return err
			}

			err = item.Value(func(val []byte) error {
				return batch.Set(key, val, nil)
			})

			if err != nil {
				return fmt.Errorf("failed to copy value for key %x: %w", key, err)
			}
		}

		return nil
	})

	if err != nil {
		return fmt.Errorf("failed to get keys from BadgerDB: %w", err)
	}

	err = batch.Commit(pebble.Sync)
	if err != nil {
		return fmt.Errorf("failed to commit batch to PebbleDB: %w", err)
	}

	return nil
}