Skip to content

Commit

Permalink
enhancement
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Apr 17, 2024
1 parent 1e47fc8 commit 1c7107b
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ assignees: ''

#### System information

Relayer version: (if getting from release page)
Blob syncer version: (if getting from release page)
OS & Version: Windows/Linux/OSX
Commit hash : (if `develop`)

Expand Down
2 changes: 1 addition & 1 deletion config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ const (
EnvVarDBUserPass = "DB_PASSWORD"
EnvVarPrivateKey = "PRIVATE_KEY"

DefaultCreateBundleSlotInterval = 10
DefaultCreateBundleSlotInterval = 20
)
1 change: 1 addition & 0 deletions db/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ type Status int
const (
Processed Status = 0
Verified Status = 1 // each block's blobs will be verified by the post-verification process
Skipped Status = 2
)

type Block struct {
Expand Down
1 change: 1 addition & 0 deletions db/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
Finalizing InnerBundleStatus = 0
Finalized InnerBundleStatus = 1 // when a bundle is uploaded to bundle service, its status will be Finalized
Sealed InnerBundleStatus = 2 // todo The post verification process should check if a bundle is indeed sealed onchain
Deprecated InnerBundleStatus = 3
)

type Bundle struct {
Expand Down
18 changes: 13 additions & 5 deletions db/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type BlockDB interface {
GetBlockByRoot(root string) (*Block, error)
GetLatestProcessedBlock() (*Block, error)
GetEarliestUnverifiedBlock() (*Block, error)
UpdateBlockToVerifiedStatus(slot uint64) error
UpdateBlockStatus(slot uint64, status Status) error
UpdateBlocksStatus(startSlot, endSlot uint64, status Status) error
}

func (d *BlobSvcDB) GetBlock(slot uint64) (*Block, error) {
Expand Down Expand Up @@ -65,10 +66,17 @@ func (d *BlobSvcDB) GetEarliestUnverifiedBlock() (*Block, error) {
return &block, nil
}

func (d *BlobSvcDB) UpdateBlockToVerifiedStatus(slot uint64) error {
func (d *BlobSvcDB) UpdateBlockStatus(slot uint64, status Status) error {
return d.db.Transaction(func(dbTx *gorm.DB) error {
return dbTx.Model(Block{}).Where("slot = ?", slot).Updates(
Block{Status: Verified}).Error
Block{Status: status}).Error
})
}

func (d *BlobSvcDB) UpdateBlocksStatus(startSlot, endSlot uint64, status Status) error {
return d.db.Transaction(func(dbTx *gorm.DB) error {
return dbTx.Model(Block{}).Where("slot >= ? and slot <= ?", startSlot, endSlot).Updates(
Block{Status: status}).Error
})
}

Expand Down Expand Up @@ -138,12 +146,12 @@ func (d *BlobSvcDB) UpdateBundleStatus(bundleName string, status InnerBundleStat
func (d *BlobSvcDB) SaveBlockAndBlob(block *Block, blobs []*Blob) error {
return d.db.Transaction(func(dbTx *gorm.DB) error {
err := dbTx.Save(block).Error
if err != nil && MysqlErrCode(err) == ErrDuplicateEntryCode {
if err != nil && MysqlErrCode(err) != ErrDuplicateEntryCode {
return err
}
if len(blobs) != 0 {
err = dbTx.Save(blobs).Error
if err != nil && MysqlErrCode(err) == ErrDuplicateEntryCode {
if err != nil && MysqlErrCode(err) != ErrDuplicateEntryCode {
return err
}
}
Expand Down
16 changes: 16 additions & 0 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,22 @@ func (s *BlobSyncer) LoadProgressAndResume(nextSlot uint64) error {
if err != nil {
return err
}

// might no longer need to process the bundle even-thought it is not finalized if the user set the config to skip it.
if nextSlot > endSlot {
err = s.blobDao.UpdateBlocksStatus(startSlot, endSlot, db.Skipped)
if err != nil {
logging.Logger.Errorf("failed to update blocks status, startSlot=%d, endSlot=%d", startSlot, endSlot)
return err
}
logging.Logger.Infof("the config slot number %d is larger than the recorded bundle end slot %d, will resume from the config slot", nextSlot, endSlot)
if err = s.blobDao.UpdateBundleStatus(finalizingBundle.Name, db.Deprecated); err != nil {
return err
}
startSlot = nextSlot
endSlot = nextSlot + s.getCreateBundleSlotInterval() - 1
}

}
s.bundleDetail = &curBundleDetail{
name: types.GetBundleName(startSlot, endSlot),
Expand Down
37 changes: 29 additions & 8 deletions syncer/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *BlobSyncer) verify() error {
}
verifyBlockSlot := verifyBlock.Slot
if verifyBlock.BlobCount == 0 {
if err = s.blobDao.UpdateBlockToVerifiedStatus(verifyBlockSlot); err != nil {
if err = s.blobDao.UpdateBlockStatus(verifyBlockSlot, db.Verified); err != nil {
logging.Logger.Errorf("failed to update block status, slot=%d err=%s", verifyBlockSlot, err.Error())
return err
}
Expand Down Expand Up @@ -100,8 +100,8 @@ func (s *BlobSyncer) verify() error {
}
return err
}
if err = s.blobDao.UpdateBlockToVerifiedStatus(verifyBlockSlot); err != nil {
logging.Logger.Errorf("failed to update block status, slot=%d err=%s", verifyBlockSlot, err.Error())
if err = s.blobDao.UpdateBlockStatus(verifyBlockSlot, db.Verified); err != nil {
logging.Logger.Errorf("failed to update block status to verified, slot=%d err=%s", verifyBlockSlot, err.Error())
return err
}
metrics.VerifiedSlotGauge.Set(float64(verifyBlockSlot))
Expand Down Expand Up @@ -177,6 +177,9 @@ func (s *BlobSyncer) verifyBlobAtSlot(slot uint64, sidecars []*structs.Sidecar,
}

func (s *BlobSyncer) reUploadBundle(bundleName string) error {
if err := s.blobDao.UpdateBundleStatus(bundleName, db.Deprecated); err != nil {
return err
}
newBundleName := bundleName + "_calibrated_" + util.Int64ToString(time.Now().Unix())
startSlot, endSlot, err := types.ParseBundleName(bundleName)
if err != nil {
Expand All @@ -190,12 +193,13 @@ func (s *BlobSyncer) reUploadBundle(bundleName string) error {
}
}
if err = s.blobDao.CreateBundle(&db.Bundle{
Name: newBundleName,
Status: db.Finalizing,
Name: newBundleName,
Status: db.Finalizing,
Calibrated: true,
}); err != nil {
return err
}
for slot := startSlot; slot < endSlot; slot++ {
for slot := startSlot; slot <= endSlot; slot++ {
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
sideCars, err := s.ethClients.BeaconClient.GetBlob(ctx, slot)
Expand All @@ -206,22 +210,39 @@ func (s *BlobSyncer) reUploadBundle(bundleName string) error {
return err
}
block, err := s.ethClients.BeaconClient.GetBlock(ctx, slot)
if err != nil {
if err == external.ErrBlockNotFound {
continue
}
return err
}
blockMeta, err := s.blobDao.GetBlock(slot)
if err != nil {
return err
}
blobMetas, err := s.blobDao.GetBlobBySlot(slot)
if err != nil {
return err
}
blockToSave, blobToSave, err := s.ToBlockAndBlobs(block, sideCars, slot, newBundleName)
if err != nil {
return err
}
blockToSave.Id = blockMeta.Id
for i, preBlob := range blobMetas {
if i < len(blobToSave) {
blobToSave[i].Id = preBlob.Id
}
}
err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave)
if err != nil {
logging.Logger.Errorf("failed to save block(h=%d) and Blob(count=%d), err=%s", blockToSave.Slot, len(blobToSave), err.Error())
return err
}
logging.Logger.Infof("save calibrated block(slot=%d) and blobs(num=%d) to DB \n", slot, len(blobToSave))
}

if err := s.finalizeBundle(newBundleName, s.getBundleDir(newBundleName), s.getBundleFilePath(newBundleName)); err != nil {
if err = s.finalizeBundle(newBundleName, s.getBundleDir(newBundleName), s.getBundleFilePath(newBundleName)); err != nil {
logging.Logger.Errorf("failed to finalized bundle, name=%s, err=%s", newBundleName, err.Error())
return err
}
return nil
Expand Down

0 comments on commit 1c7107b

Please sign in to comment.