Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions triedb/pathdb/history_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,15 @@ type indexWriter struct {
db ethdb.KeyValueReader
}

// newIndexWriter constructs the index writer for the specified state.
func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, error) {
// newIndexWriter constructs the index writer for the specified state. Additionally,
// it takes an element ID and prunes all existing elements above that ID. It's
// essential as the recovery mechanism after unclean shutdown during the history
// indexing.
func newIndexWriter(db ethdb.KeyValueReader, state stateIdent, lastID uint64) (*indexWriter, error) {
blob := readStateIndex(state, db)
if len(blob) == 0 {
desc := newIndexBlockDesc(0)
bw, _ := newBlockWriter(nil, desc)
bw, _ := newBlockWriter(nil, desc, 0 /* useless if the block is empty */)
return &indexWriter{
descList: []*indexBlockDesc{desc},
bw: bw,
Expand All @@ -180,15 +183,26 @@ func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, er
if err != nil {
return nil, err
}
// Trim trailing blocks whose elements all exceed the limit.
for i := len(descList) - 1; i > 0; i-- {
if descList[i].max <= lastID {
break
}
if descList[i-1].max >= lastID {
descList = descList[:i]
}
}

// Take the last block for appending new elements
lastDesc := descList[len(descList)-1]
indexBlock := readStateIndexBlock(state, db, lastDesc.id)
bw, err := newBlockWriter(indexBlock, lastDesc)
bw, err := newBlockWriter(indexBlock, lastDesc, lastID)
if err != nil {
return nil, err
}
return &indexWriter{
descList: descList,
lastID: lastDesc.max,
lastID: bw.last(),
bw: bw,
state: state,
db: db,
Expand Down Expand Up @@ -221,7 +235,7 @@ func (w *indexWriter) rotate() error {
desc = newIndexBlockDesc(w.bw.desc.id + 1)
)
w.frozen = append(w.frozen, w.bw)
w.bw, err = newBlockWriter(nil, desc)
w.bw, err = newBlockWriter(nil, desc, 0 /* useless if the block is empty */)
if err != nil {
return err
}
Expand Down Expand Up @@ -271,13 +285,13 @@ type indexDeleter struct {
}

// newIndexDeleter constructs the index deleter for the specified state.
func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter, error) {
func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent, lastID uint64) (*indexDeleter, error) {
blob := readStateIndex(state, db)
if len(blob) == 0 {
// TODO(rjl493456442) we can probably return an error here,
// deleter with no data is meaningless.
desc := newIndexBlockDesc(0)
bw, _ := newBlockWriter(nil, desc)
bw, _ := newBlockWriter(nil, desc, 0 /* useless if the block is empty */)
return &indexDeleter{
descList: []*indexBlockDesc{desc},
bw: bw,
Expand All @@ -289,15 +303,26 @@ func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter,
if err != nil {
return nil, err
}
// Trim trailing blocks whose elements all exceed the limit.
for i := len(descList) - 1; i > 0; i-- {
if descList[i].max <= lastID {
break
}
if descList[i-1].max >= lastID {
descList = descList[:i]
}
}

// Take the block for deleting element from
lastDesc := descList[len(descList)-1]
indexBlock := readStateIndexBlock(state, db, lastDesc.id)
bw, err := newBlockWriter(indexBlock, lastDesc)
bw, err := newBlockWriter(indexBlock, lastDesc, lastID)
if err != nil {
return nil, err
}
return &indexDeleter{
descList: descList,
lastID: lastDesc.max,
lastID: bw.last(),
bw: bw,
state: state,
db: db,
Expand Down Expand Up @@ -337,7 +362,7 @@ func (d *indexDeleter) pop(id uint64) error {
// Open the previous block writer for deleting
lastDesc := d.descList[len(d.descList)-1]
indexBlock := readStateIndexBlock(d.state, d.db, lastDesc.id)
bw, err := newBlockWriter(indexBlock, lastDesc)
bw, err := newBlockWriter(indexBlock, lastDesc, lastDesc.max)
if err != nil {
return err
}
Expand Down
42 changes: 34 additions & 8 deletions triedb/pathdb/history_index_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"errors"
"fmt"
"math"

"github.com/ethereum/go-ethereum/log"
)

const (
indexBlockDescSize = 14 // The size of index block descriptor
indexBlockEntriesCap = 4096 // The maximum number of entries can be grouped in a block
indexBlockRestartLen = 256 // The restart interval length of index block
historyIndexBatch = 512 * 1024 // The number of state history indexes for constructing or deleting as batch
indexBlockDescSize = 14 // The size of index block descriptor
indexBlockEntriesCap = 4096 // The maximum number of entries can be grouped in a block
indexBlockRestartLen = 256 // The restart interval length of index block
historyIndexBatch = 8 * 1024 * 1024 // The number of state history indexes for constructing or deleting as batch
)

// indexBlockDesc represents a descriptor for an index block, which contains a
Expand Down Expand Up @@ -180,7 +182,11 @@ type blockWriter struct {
data []byte // Aggregated encoded data slice
}

func newBlockWriter(blob []byte, desc *indexBlockDesc) (*blockWriter, error) {
// newBlockWriter constructs a block writer. In addition to the existing data
// and block description, it takes an element ID and prunes all existing elements
// above that ID. It's essential as the recovery mechanism after unclean shutdown
// during the history indexing.
func newBlockWriter(blob []byte, desc *indexBlockDesc, lastID uint64) (*blockWriter, error) {
if len(blob) == 0 {
return &blockWriter{
desc: desc,
Expand All @@ -191,11 +197,22 @@ func newBlockWriter(blob []byte, desc *indexBlockDesc) (*blockWriter, error) {
if err != nil {
return nil, err
}
return &blockWriter{
writer := &blockWriter{
desc: desc,
restarts: restarts,
data: data, // safe to own the slice
}, nil
}
var trimmed int
for !writer.empty() && writer.last() > lastID {
if err := writer.pop(writer.last()); err != nil {
return nil, err
}
trimmed += 1
}
if trimmed > 0 {
log.Debug("Truncated extraneous elements", "count", trimmed, "lastID", lastID)
}
return writer, nil
}

// append adds a new element to the block. The new element must be greater than
Expand Down Expand Up @@ -271,6 +288,7 @@ func (b *blockWriter) sectionLast(section int) uint64 {

// sectionSearch looks up the specified value in the given section,
// the position and the preceding value will be returned if found.
// It assumes that the preceding element exists in the section.
func (b *blockWriter) sectionSearch(section int, n uint64) (found bool, prev uint64, pos int) {
b.scanSection(section, func(v uint64, p int) bool {
if n == v {
Expand All @@ -295,7 +313,6 @@ func (b *blockWriter) pop(id uint64) error {
}
// If there is only one entry left, the entire block should be reset
if b.desc.entries == 1 {
//b.desc.min = 0
b.desc.max = 0
b.desc.entries = 0
b.restarts = nil
Expand Down Expand Up @@ -331,6 +348,15 @@ func (b *blockWriter) full() bool {
return b.desc.full()
}

// last returns the last element in the block. It should only be called when
// writer is not empty, otherwise the returned data is meaningless.
func (b *blockWriter) last() uint64 {
if b.empty() {
return 0
}
return b.desc.max
}

// finish finalizes the index block encoding by appending the encoded restart points
// and the restart counter to the end of the block.
//
Expand Down
98 changes: 84 additions & 14 deletions triedb/pathdb/history_index_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestBlockReaderBasic(t *testing.T) {
elements := []uint64{
1, 5, 10, 11, 20,
}
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < len(elements); i++ {
bw.append(elements[i])
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestBlockReaderLarge(t *testing.T) {
}
slices.Sort(elements)

bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < len(elements); i++ {
bw.append(elements[i])
}
Expand Down Expand Up @@ -95,19 +95,21 @@ func TestBlockReaderLarge(t *testing.T) {
}

func TestBlockWriterBasic(t *testing.T) {
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
if !bw.empty() {
t.Fatal("expected empty block")
}
bw.append(2)
if err := bw.append(1); err == nil {
t.Fatal("out-of-order insertion is not expected")
}
var maxElem uint64
for i := 0; i < 10; i++ {
bw.append(uint64(i + 3))
maxElem = uint64(i + 3)
}

bw, err := newBlockWriter(bw.finish(), newIndexBlockDesc(0))
bw, err := newBlockWriter(bw.finish(), newIndexBlockDesc(0), maxElem)
if err != nil {
t.Fatalf("Failed to construct the block writer, %v", err)
}
Expand All @@ -119,8 +121,71 @@ func TestBlockWriterBasic(t *testing.T) {
bw.finish()
}

func TestBlockWriterWithLimit(t *testing.T) {
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)

var maxElem uint64
for i := 0; i < indexBlockRestartLen*2; i++ {
bw.append(uint64(i + 1))
maxElem = uint64(i + 1)
}

suites := []struct {
limit uint64
expMax uint64
}{
// nothing to truncate
{
maxElem, maxElem,
},
// truncate the last element
{
maxElem - 1, maxElem - 1,
},
// truncation around the restart boundary
{
uint64(indexBlockRestartLen + 1),
uint64(indexBlockRestartLen + 1),
},
// truncation around the restart boundary
{
uint64(indexBlockRestartLen),
uint64(indexBlockRestartLen),
},
{
uint64(1), uint64(1),
},
// truncate the entire block, it's in theory invalid
{
uint64(0), uint64(0),
},
}
for i, suite := range suites {
desc := *bw.desc
block, err := newBlockWriter(bw.finish(), &desc, suite.limit)
if err != nil {
t.Fatalf("Failed to construct the block writer, %v", err)
}
if block.desc.max != suite.expMax {
t.Fatalf("Test %d, unexpected max value, got %d, want %d", i, block.desc.max, suite.expMax)
}

// Re-fill the elements
var maxElem uint64
for elem := suite.limit + 1; elem < indexBlockRestartLen*4; elem++ {
if err := block.append(elem); err != nil {
t.Fatalf("Failed to append value %d: %v", elem, err)
}
maxElem = elem
}
if block.desc.max != maxElem {
t.Fatalf("Test %d, unexpected max value, got %d, want %d", i, block.desc.max, maxElem)
}
}
}

func TestBlockWriterDelete(t *testing.T) {
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < 10; i++ {
bw.append(uint64(i + 1))
}
Expand All @@ -147,7 +212,7 @@ func TestBlcokWriterDeleteWithData(t *testing.T) {
elements := []uint64{
1, 5, 10, 11, 20,
}
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < len(elements); i++ {
bw.append(elements[i])
}
Expand All @@ -158,7 +223,7 @@ func TestBlcokWriterDeleteWithData(t *testing.T) {
max: 20,
entries: 5,
}
bw, err := newBlockWriter(bw.finish(), desc)
bw, err := newBlockWriter(bw.finish(), desc, elements[len(elements)-1])
if err != nil {
t.Fatalf("Failed to construct block writer %v", err)
}
Expand Down Expand Up @@ -201,15 +266,18 @@ func TestBlcokWriterDeleteWithData(t *testing.T) {
}

func TestCorruptedIndexBlock(t *testing.T) {
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)

var maxElem uint64
for i := 0; i < 10; i++ {
bw.append(uint64(i + 1))
maxElem = uint64(i + 1)
}
buf := bw.finish()

// Mutate the buffer manually
buf[len(buf)-1]++
_, err := newBlockWriter(buf, newIndexBlockDesc(0))
_, err := newBlockWriter(buf, newIndexBlockDesc(0), maxElem)
if err == nil {
t.Fatal("Corrupted index block data is not detected")
}
Expand All @@ -218,7 +286,7 @@ func TestCorruptedIndexBlock(t *testing.T) {
// BenchmarkParseIndexBlock benchmarks the performance of parseIndexBlock.
func BenchmarkParseIndexBlock(b *testing.B) {
// Generate a realistic index block blob
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < 4096; i++ {
bw.append(uint64(i * 2))
}
Expand All @@ -238,13 +306,15 @@ func BenchmarkBlockWriterAppend(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

desc := newIndexBlockDesc(0)
writer, _ := newBlockWriter(nil, desc)
var blockID uint32
desc := newIndexBlockDesc(blockID)
writer, _ := newBlockWriter(nil, desc, 0)

for i := 0; i < b.N; i++ {
if writer.full() {
desc = newIndexBlockDesc(0)
writer, _ = newBlockWriter(nil, desc)
blockID += 1
desc = newIndexBlockDesc(blockID)
writer, _ = newBlockWriter(nil, desc, 0)
}
if err := writer.append(writer.desc.max + 1); err != nil {
b.Error(err)
Expand Down
Loading