Skip to content

Commit

Permalink
fix: make the file writing to be idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Aug 19, 2024
1 parent 7a35d28 commit 20dd9df
Showing 1 changed file with 37 additions and 26 deletions.
63 changes: 37 additions & 26 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (s *BlobSyncer) StartLoop() {
for range syncTicker.C {
if err = s.sync(); err != nil {
logging.Logger.Errorf("failed to sync, err=%s", err.Error())
time.Sleep(2 * time.Second)
}
}
}()
Expand All @@ -119,6 +120,7 @@ func (s *BlobSyncer) StartLoop() {
for range verifyTicket.C {
if err := s.verify(); err != nil {
logging.Logger.Errorf("failed to verify, err=%s", err.Error())
time.Sleep(2 * time.Second)
}
}
}()
Expand Down Expand Up @@ -197,27 +199,30 @@ func (s *BlobSyncer) sync() error {
if err != nil {
return err
}
var dbErr error

if isForkedBlock {
dbErr = s.blobDao.SaveBlockAndBlob(&db.Block{
err := s.blobDao.SaveBlockAndBlob(&db.Block{
Slot: blockID,
BundleName: bundleName,
}, nil)
if err != nil {
logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, err.Error())
return err
}
} else {
blockToSave, blobToSave, err := s.toBlockAndBlobs(block, sideCars, blockID, bundleName)
if err == nil {
err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave)
if err == nil {
metrics.SyncedBlockIDGauge.Set(float64(blockID))
logging.Logger.Infof("saved block(block_id=%d) and blobs(num=%d) to DB \n", blockID, len(blobToSave))
}
if err != nil {
logging.Logger.Errorf("failed to convert to block and blobs, err=%s", err.Error())
return err
}
dbErr = err
}
if dbErr != nil {
logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, dbErr.Error())
return dbErr
if err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave); err != nil {
logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, err.Error())
return err
}
logging.Logger.Infof("saved block(block_id=%d) and blobs(num=%d) to DB \n", blockID, len(blobToSave))
}
metrics.SyncedBlockIDGauge.Set(float64(blockID))
// update the block status to processed
if blockID == s.bundleDetail.finalizeBlockID {
// init next bundle
startBlockID := blockID + 1
Expand All @@ -240,12 +245,18 @@ func (s *BlobSyncer) process(bundleName string, blockID uint64, sidecars []*type
return err
}
}
if err = s.writeBlobToFile(blockID, bundleName, sidecars); err != nil {
return err
// for idempotent
_, err = os.Stat(s.getBundleDir(bundleName))
if !os.IsNotExist(err) {
if err = s.writeBlobToFile(blockID, bundleName, sidecars); err != nil {
return err
}
}
if blockID == s.bundleDetail.finalizeBlockID {
// this is idempotent
err = s.finalizeCurBundle(bundleName)
if err != nil {
logging.Logger.Errorf("failed to finalize bundle, bundle=%s, err=%s", bundleName, err.Error())
return err
}
logging.Logger.Infof("finalized bundle, bundle_name=%s, bucket_name=%s\n", bundleName, s.getBucketName())
Expand Down Expand Up @@ -298,14 +309,8 @@ func (s *BlobSyncer) finalizeBundle(bundleName, bundleDir, bundleFilePath string
return err
}
}
err = os.RemoveAll(bundleDir)
if err != nil {
return err
}
err = os.Remove(bundleFilePath)
if err != nil && !os.IsNotExist(err) {
return err
}
os.RemoveAll(bundleDir)
os.Remove(bundleFilePath)
return s.blobDao.UpdateBundleStatus(bundleName, db.Finalized)
}

Expand Down Expand Up @@ -395,7 +400,9 @@ func (s *BlobSyncer) toBlockAndBlobs(blockResp *structs.GetBlockV2Response, side
blobsReturn := make([]*db.Blob, 0)

populateBlobTxDetails := func(blockNum uint64) error {
elBlock, err := s.client.BlockByNumber(context.Background(), big.NewInt(int64(blockNum)))
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
elBlock, err := s.client.BlockByNumber(ctx, big.NewInt(int64(blockNum)))
if err != nil {
return fmt.Errorf("failed to get block at height %d, err=%s", blockNum, err.Error())
}
Expand All @@ -415,7 +422,9 @@ func (s *BlobSyncer) toBlockAndBlobs(blockResp *structs.GetBlockV2Response, side

switch {
case s.BSCChain():
header, err := s.client.GetBlockHeader(context.Background(), blockNumOrSlot)
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
header, err := s.client.GetBlockHeader(ctx, blockNumOrSlot)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -538,7 +547,9 @@ func (s *BlobSyncer) ETHChain() bool {

func (s *BlobSyncer) GetParams() (*cmn.VersionedParams, error) {
if s.params == nil {
params, err := s.chainClient.GetParams(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
params, err := s.chainClient.GetParams(ctx)
if err != nil {
logging.Logger.Errorf("failed to get params, err=%s", err.Error())
return nil, err
Expand Down

0 comments on commit 20dd9df

Please sign in to comment.