Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make the file writing to be idempotent #15

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 40 additions & 28 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ const (
BundleStatusCreatedOnChain = 2
BundleStatusSealedOnChain = 3

LoopSleepTime = 10 * time.Millisecond
BSCPauseTime = 3 * time.Second
LoopSleepTime = 10 * time.Millisecond
LoopErrorPauseTime = 2 * time.Second
BSCPauseTime = 3 * time.Second

ETHPauseTime = 90 * time.Second
RPCTimeout = 20 * time.Second
Expand Down Expand Up @@ -111,6 +112,7 @@ func (s *BlobSyncer) StartLoop() {
for range syncTicker.C {
if err = s.sync(); err != nil {
logging.Logger.Errorf("failed to sync, err=%s", err.Error())
time.Sleep(LoopErrorPauseTime)
}
}
}()
Expand All @@ -119,6 +121,7 @@ func (s *BlobSyncer) StartLoop() {
for range verifyTicket.C {
if err := s.verify(); err != nil {
logging.Logger.Errorf("failed to verify, err=%s", err.Error())
time.Sleep(LoopErrorPauseTime)
}
}
}()
Expand Down Expand Up @@ -197,27 +200,30 @@ func (s *BlobSyncer) sync() error {
if err != nil {
return err
}
var dbErr error

if isForkedBlock {
dbErr = s.blobDao.SaveBlockAndBlob(&db.Block{
err := s.blobDao.SaveBlockAndBlob(&db.Block{
Slot: blockID,
BundleName: bundleName,
}, nil)
if err != nil {
logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, err.Error())
return err
}
} else {
blockToSave, blobToSave, err := s.toBlockAndBlobs(block, sideCars, blockID, bundleName)
if err == nil {
err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave)
if err == nil {
metrics.SyncedBlockIDGauge.Set(float64(blockID))
logging.Logger.Infof("saved block(block_id=%d) and blobs(num=%d) to DB \n", blockID, len(blobToSave))
}
if err != nil {
logging.Logger.Errorf("failed to convert to block and blobs, err=%s", err.Error())
return err
}
dbErr = err
}
if dbErr != nil {
logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, dbErr.Error())
return dbErr
if err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave); err != nil {
logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, err.Error())
return err
}
logging.Logger.Infof("saved block(block_id=%d) and blobs(num=%d) to DB \n", blockID, len(blobToSave))
}
metrics.SyncedBlockIDGauge.Set(float64(blockID))
// update the block status to processed
if blockID == s.bundleDetail.finalizeBlockID {
// init next bundle
startBlockID := blockID + 1
Expand All @@ -240,12 +246,18 @@ func (s *BlobSyncer) process(bundleName string, blockID uint64, sidecars []*type
return err
}
}
if err = s.writeBlobToFile(blockID, bundleName, sidecars); err != nil {
return err
// for idempotent
_, err = os.Stat(s.getBundleDir(bundleName))
if !os.IsNotExist(err) {
if err = s.writeBlobToFile(blockID, bundleName, sidecars); err != nil {
return err
}
}
if blockID == s.bundleDetail.finalizeBlockID {
// this is idempotent
err = s.finalizeCurBundle(bundleName)
if err != nil {
logging.Logger.Errorf("failed to finalize bundle, bundle=%s, err=%s", bundleName, err.Error())
return err
}
logging.Logger.Infof("finalized bundle, bundle_name=%s, bucket_name=%s\n", bundleName, s.getBucketName())
Expand Down Expand Up @@ -298,14 +310,8 @@ func (s *BlobSyncer) finalizeBundle(bundleName, bundleDir, bundleFilePath string
return err
}
}
err = os.RemoveAll(bundleDir)
if err != nil {
return err
}
err = os.Remove(bundleFilePath)
if err != nil && !os.IsNotExist(err) {
return err
}
os.RemoveAll(bundleDir)
os.Remove(bundleFilePath)
return s.blobDao.UpdateBundleStatus(bundleName, db.Finalized)
}

Expand Down Expand Up @@ -395,7 +401,9 @@ func (s *BlobSyncer) toBlockAndBlobs(blockResp *structs.GetBlockV2Response, side
blobsReturn := make([]*db.Blob, 0)

populateBlobTxDetails := func(blockNum uint64) error {
elBlock, err := s.client.BlockByNumber(context.Background(), big.NewInt(int64(blockNum)))
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
elBlock, err := s.client.BlockByNumber(ctx, big.NewInt(int64(blockNum)))
if err != nil {
return fmt.Errorf("failed to get block at height %d, err=%s", blockNum, err.Error())
}
Expand All @@ -415,7 +423,9 @@ func (s *BlobSyncer) toBlockAndBlobs(blockResp *structs.GetBlockV2Response, side

switch {
case s.BSCChain():
header, err := s.client.GetBlockHeader(context.Background(), blockNumOrSlot)
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
header, err := s.client.GetBlockHeader(ctx, blockNumOrSlot)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -538,7 +548,9 @@ func (s *BlobSyncer) ETHChain() bool {

func (s *BlobSyncer) GetParams() (*cmn.VersionedParams, error) {
if s.params == nil {
params, err := s.chainClient.GetParams(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
params, err := s.chainClient.GetParams(ctx)
if err != nil {
logging.Logger.Errorf("failed to get params, err=%s", err.Error())
return nil, err
Expand Down
Loading