[Storage] Add storage data migration functions #7396
Conversation
Force-pushed from 1ae00cb to a3c46a7
Codecov Report

@@ Coverage Diff @@
## master #7396 +/- ##
==========================================
+ Coverage    41.11%   41.15%   +0.04%
==========================================
  Files         2207     2209       +2
  Lines       193755   194074     +319
==========================================
+ Hits         79660    79876     +216
- Misses      107491   107575      +84
- Partials      6604     6623      +19
d898f40
to
e7e679f
Compare
// Simple deterministic dataset
func TestMigrationWithSimpleData(t *testing.T) {
	data := map[string]string{
		"apple": "fruit",
Found an issue that needs to be fixed: the migration didn't migrate keys that are only a single byte. A test case can be added here to verify, e.g. "a": "a single key".
This is the case for the FinalizedHeight key, which has only a single prefix byte.
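The miss can be reproduced without a database: a key shorter than the configured prefix length matches none of the generated prefixes, so a pure prefix scan never visits it. A minimal stdlib-only sketch (the helper names are hypothetical, not from this PR):

```go
package main

import (
	"bytes"
	"fmt"
)

// twoBytePrefixes returns all 65536 two-byte prefixes (illustrative only).
func twoBytePrefixes() [][]byte {
	prefixes := make([][]byte, 0, 256*256)
	for a := 0; a < 256; a++ {
		for b := 0; b < 256; b++ {
			prefixes = append(prefixes, []byte{byte(a), byte(b)})
		}
	}
	return prefixes
}

// coveredByPrefixScan reports whether a key would be visited by a scan
// over any of the given prefixes.
func coveredByPrefixScan(key []byte, prefixes [][]byte) bool {
	for _, p := range prefixes {
		if bytes.HasPrefix(key, p) {
			return true
		}
	}
	return false
}

func main() {
	prefixes := twoBytePrefixes()
	fmt.Println(coveredByPrefixScan([]byte("apple"), prefixes)) // true
	fmt.Println(coveredByPrefixScan([]byte("a"), prefixes))     // false: the 1-byte key is missed
}
```

This is why keys exactly as long as (or shorter than) the prefix need a separate pass, such as copyExactKeysFromBadgerToPebble below.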
Nice! I left some comments and suggestions.
These are the more important items:

- To reduce compaction in Pebble, we may want to write records with the same prefix sequentially when possible. For example, instead of sending and receiving (writing) individual KVPairs, maybe we can send and receive (write) batched KVPairs with the same prefix.
- For error handling, is the program expected to proceed with other prefixes after a read error is encountered with one prefix? If so, we shouldn't exit the read worker goroutine on error, because this would reduce the workers available for the remaining work.
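The batching idea can be sketched with plain channels (assuming the PR's KVPair shape; sendBatched is a hypothetical helper): the reader sends one slice per chunk of same-prefix pairs, so the writer can commit each slice sequentially in a single batch.

```go
package main

import "fmt"

// KVPair mirrors the pair type used in the PR (assumed shape).
type KVPair struct {
	Key   []byte
	Value []byte
}

// sendBatched sends pairs in chunks of up to batchSize per channel send,
// instead of one send per pair. Pairs read under one prefix stay adjacent,
// so the writer receives and writes them sequentially.
func sendBatched(kvChan chan<- []KVPair, pairs []KVPair, batchSize int) {
	for start := 0; start < len(pairs); start += batchSize {
		end := start + batchSize
		if end > len(pairs) {
			end = len(pairs)
		}
		kvChan <- pairs[start:end]
	}
}

func main() {
	kvChan := make(chan []KVPair, 4)
	pairs := []KVPair{
		{Key: []byte("a1"), Value: []byte("v1")},
		{Key: []byte("a2"), Value: []byte("v2")},
		{Key: []byte("a3"), Value: []byte("v3")},
	}
	sendBatched(kvChan, pairs, 2)
	close(kvChan)
	for batch := range kvChan {
		fmt.Println(len(batch)) // prints 2, then 1
	}
}
```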
storage/migration/migration.go (Outdated)
if n == 0 {
	return [][]byte{{}}
}
var results [][]byte
We can pre-allocate results here.

- var results [][]byte
+ results := make([][]byte, 0, base)
storage/migration/migration.go (Outdated)
for _, key := range keys {
	err := badgerDB.View(func(txn *badger.Txn) error {
It can be faster if we iterate keys inside badgerDB.View.

- for _, key := range keys {
-     err := badgerDB.View(func(txn *badger.Txn) error {
+ err := badgerDB.View(func(txn *badger.Txn) error {
+     for _, key := range keys {
}

func copyExactKeysFromBadgerToPebble(badgerDB *badger.DB, pebbleDB *pebble.DB, keys [][]byte) error {
	batch := pebbleDB.NewBatch()
I think Pebble's max batch size is ~4GB. Given the prefix length is configurable, should we use multiple batches here?
I think one batch is enough. We will most likely only use 2 prefix bytes, and there are not many keys that have exactly 2 bytes; we have a few 1-byte keys. Most keys contain a flow.Identifier as part of the key, which is 32 bytes.
> I think one batch is enough. We will most likely only use 2 prefix bytes, and there are not many keys that have exactly 2 bytes; we have a few 1-byte keys. Most keys contain a flow.Identifier as part of the key, which is 32 bytes.

Sounds good.
Just to clarify, batch size includes both key and value.
I could run a test and print how big the actual size is including both key and value, but I think it should be quite small.
storage/migration/migration.go (Outdated)
// read key value
val, err := item.ValueCopy(nil)
if err != nil {
	return err
}
Copying the value from BadgerDB may not be needed, because Pebble's Batch.Set() copies key and value under the hood.
storage/migration/migration.go (Outdated)
	}
}

err := batch.Commit(nil)
Do we want to sync to disk in batch.Commit?

- err := batch.Commit(nil)
+ err := batch.Commit(pebble.Sync)
storage/migration/migration.go (Outdated)
defer wg.Done()

for prefix := range jobs {
	defer lgProgress(1)
defer is called when this goroutine completes. For progress logging, we probably want to call lgProgress(1) after db.View returns (not in a defer).
storage/migration/migration.go (Outdated)
defer lgProgress(1)

err := db.View(func(txn *badger.Txn) error {
	it := txn.NewIterator(badger.DefaultIteratorOptions)
We can specify IteratorOptions.Prefix to narrow down SSTables when creating the iterator.

- it := txn.NewIterator(badger.DefaultIteratorOptions)
+ options := badger.DefaultIteratorOptions
+ options.Prefix = prefix
+ it := txn.NewIterator(options)
storage/migration/migration.go (Outdated)
if err != nil {
	return err
}
kvChan <- KVPair{Key: key, Value: val}
It might be faster if we send batched KVPairs to the channel, instead of sending them individually.
Also, it might be better for compaction if we write items with the same prefix sequentially, to reduce sorting during compaction.
storage/migration/migration.go (Outdated)
if err := flush(); err != nil {
	return err
}
We can call batch.Commit() directly here to avoid creating a new batch unnecessarily.
storage/migration/migration.go (Outdated)
if err != nil {
	return fmt.Errorf("Reader error for prefix %x: %v\n", prefix, err)
}
Since the program continues migration with other prefixes instead of exiting on error, we shouldn't exit the goroutine here, because it reduces the number of workers available for the remaining work.
The same issue applies to the write workers.
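A stdlib-only sketch of the suggested pattern (processPrefix is a hypothetical stand-in for the real per-prefix read): the worker records the error on a channel and keeps draining jobs rather than returning, so one bad prefix doesn't shrink the pool.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// processPrefix is a stand-in for the real per-prefix read (hypothetical).
func processPrefix(prefix byte) error {
	if prefix == 0x02 {
		return errors.New("simulated read error")
	}
	return nil
}

func main() {
	jobs := make(chan byte)
	errCh := make(chan error, 16)
	var wg sync.WaitGroup

	for w := 0; w < 2; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for prefix := range jobs {
				// Record the error and keep consuming jobs instead of
				// returning, so the worker pool is not shrunk by one failure.
				if err := processPrefix(prefix); err != nil {
					errCh <- fmt.Errorf("prefix %x: %w", prefix, err)
				}
			}
		}()
	}

	for _, p := range []byte{0x01, 0x02, 0x03, 0x04} {
		jobs <- p
	}
	close(jobs)
	wg.Wait()
	close(errCh)

	for err := range errCh {
		fmt.Println(err) // only the failing prefix is reported; the rest were still processed
	}
}
```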
Force-pushed from 6e954ee to dd206d9
//
// The function blocks until all keys are migrated and written successfully.
// It returns an error if any part of the process fails.
func CopyFromBadgerToPebble(badgerDB *badger.DB, pebbleDB *pebble.DB, cfg MigrationConfig) error {
This approach works, but I noticed it's quite slow, since inserting keys into Pebble will often trigger compaction.
Pebble actually provides another way to bulk-insert key-value pairs by writing directly to sstables. As long as the key-value pairs are sorted when written to the sstables, it can avoid compaction and is therefore much faster.
I'm going to try it in a separate PR, but reuse the same test cases.
If data migration is too slow, maybe try these Pebble settings and manual compaction:

- Options.DisableWAL to disable writing WAL files
- Options.DisableAutomaticCompactions to disable compaction during migration
- After migration completes, use Compact() with the parallelize parameter set to true for manual compaction.
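Roughly, the suggestion above might look like the following untested configuration sketch; the field and method names reflect my reading of Pebble's API and should be checked against the Pebble version pinned in go.mod (path, firstKey, and lastKey are placeholders):

```go
// Sketch (untested): Pebble settings for a bulk-load phase.
opts := &pebble.Options{
	DisableWAL: true, // skip WAL writes during migration
}
opts.DisableAutomaticCompactions = true // no compaction while loading

db, err := pebble.Open(path, opts)
if err != nil {
	return err
}

// ... run the migration ...

// After migration completes, compact the whole key range manually,
// with the parallelize parameter set to true.
if err := db.Compact(firstKey, lastKey, true); err != nil {
	return err
}
```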
Force-pushed from dd206d9 to bfd2615
Working towards #7395
This PR implements the function CopyFromBadgerToPebble to copy all key-value pairs from Badger to Pebble.