Skip to content

Commit 0833dd7

Browse files
committed
make inserts and reorgs atomic
1 parent 2fc72a3 commit 0833dd7

11 files changed

+527
-286
lines changed

internal/orchestrator/committer.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -168,18 +168,17 @@ func (c *Committer) commit(blockData []common.BlockData) error {
168168
}
169169
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
170170

171+
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
172+
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
173+
return fmt.Errorf("error saving data to main storage: %v", err)
174+
}
175+
171176
go func() {
172177
if err := c.publisher.PublishBlockData(blockData); err != nil {
173178
log.Error().Err(err).Msg("Failed to publish block data to kafka")
174179
}
175180
}()
176181

177-
// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
178-
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
179-
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
180-
return fmt.Errorf("error saving data to main storage: %v", err)
181-
}
182-
183182
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
184183
return fmt.Errorf("error deleting data from staging storage: %v", err)
185184
}

internal/orchestrator/reorg_handler.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -281,13 +281,10 @@ func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
281281
})
282282
blocksToDelete = append(blocksToDelete, result.BlockNumber)
283283
}
284-
// TODO make delete and insert atomic
285-
deletedBlockData, err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blocksToDelete)
284+
285+
deletedBlockData, err := rh.storage.MainStorage.ReplaceBlockData(data)
286286
if err != nil {
287-
return fmt.Errorf("error deleting data for blocks %v: %w", blocksToDelete, err)
288-
}
289-
if err := rh.storage.MainStorage.InsertBlockData(data); err != nil {
290-
return fmt.Errorf("error saving data to main storage: %w", err)
287+
return fmt.Errorf("error replacing reorged data for blocks %v: %w", blocksToDelete, err)
291288
}
292289
if rh.publisher != nil {
293290
// Publish block data asynchronously

internal/orchestrator/reorg_handler_test.go

+6-16
Original file line numberDiff line numberDiff line change
@@ -504,8 +504,7 @@ func TestHandleReorg(t *testing.T) {
504504
})
505505
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(3), nil)
506506

507-
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.Anything).Return([]common.BlockData{}, nil)
508-
mockMainStorage.EXPECT().InsertBlockData(mock.Anything).Return(nil)
507+
mockMainStorage.EXPECT().ReplaceBlockData(mock.Anything).Return([]common.BlockData{}, nil)
509508

510509
handler := NewReorgHandler(mockRPC, mockStorage)
511510
err := handler.handleReorg([]*big.Int{big.NewInt(1), big.NewInt(2), big.NewInt(3)})
@@ -611,12 +610,9 @@ func TestHandleReorgWithSingleBlockReorg(t *testing.T) {
611610
{BlockNumber: big.NewInt(105), Data: common.BlockData{}},
612611
})
613612

614-
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {
613+
mockMainStorage.EXPECT().ReplaceBlockData(mock.MatchedBy(func(blocks []common.BlockData) bool {
615614
return len(blocks) == 1
616615
})).Return([]common.BlockData{}, nil)
617-
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
618-
return len(data) == 1
619-
})).Return(nil)
620616

621617
handler := NewReorgHandler(mockRPC, mockStorage)
622618
mostRecentBlockChecked, err := handler.RunFromBlock(big.NewInt(99))
@@ -679,12 +675,9 @@ func TestHandleReorgWithLatestBlockReorged(t *testing.T) {
679675
{BlockNumber: big.NewInt(108), Data: common.BlockData{}},
680676
})
681677

682-
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {
683-
return len(blocks) == 8
684-
})).Return([]common.BlockData{}, nil)
685-
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
678+
mockMainStorage.EXPECT().ReplaceBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
686679
return len(data) == 8
687-
})).Return(nil)
680+
})).Return([]common.BlockData{}, nil)
688681

689682
handler := NewReorgHandler(mockRPC, mockStorage)
690683
mostRecentBlockChecked, err := handler.RunFromBlock(big.NewInt(99))
@@ -743,12 +736,9 @@ func TestHandleReorgWithManyBlocks(t *testing.T) {
743736
{BlockNumber: big.NewInt(103), Data: common.BlockData{}},
744737
})
745738

746-
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {
747-
return len(blocks) == 5
748-
})).Return([]common.BlockData{}, nil)
749-
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
739+
mockMainStorage.EXPECT().ReplaceBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
750740
return len(data) == 5
751-
})).Return(nil)
741+
})).Return([]common.BlockData{}, nil)
752742

753743
handler := NewReorgHandler(mockRPC, mockStorage)
754744
mostRecentBlockChecked, err := handler.RunFromBlock(big.NewInt(99))

0 commit comments

Comments
 (0)