From c37f1c556264dcb01ed395d1606100318d8c36f4 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 26 Feb 2025 13:02:01 +0530 Subject: [PATCH 01/20] refactor --- component/xload/data_manager.go | 15 ++++++--- component/xload/data_manager_test.go | 20 ++++++++---- component/xload/lister.go | 21 ++++++++---- component/xload/lister_test.go | 47 +++++++++++++++++++++++---- component/xload/splitter.go | 48 +++++++++++++++++----------- component/xload/splitter_test.go | 31 ++++++++++++++---- component/xload/xload.go | 24 ++++++++++++-- component/xload/xload_test.go | 10 +++--- 8 files changed, 161 insertions(+), 55 deletions(-) diff --git a/component/xload/data_manager.go b/component/xload/data_manager.go index 0bd820f81..08a987fc1 100644 --- a/component/xload/data_manager.go +++ b/component/xload/data_manager.go @@ -54,19 +54,24 @@ type remoteDataManager struct { dataManager } -func newRemoteDataManager(remote internal.Component, statsMgr *StatsManager) (*remoteDataManager, error) { - log.Debug("data_manager::NewRemoteDataManager : create new remote data manager") +type newRemoteDataManagerOptions struct { + remote internal.Component + statsMgr *StatsManager +} - if remote == nil || statsMgr == nil { +func newRemoteDataManager(opts *newRemoteDataManagerOptions) (*remoteDataManager, error) { + if opts == nil || opts.remote == nil || opts.statsMgr == nil { log.Err("data_manager::NewRemoteDataManager : invalid parameters sent to create remote data manager") return nil, fmt.Errorf("invalid parameters sent to create remote data manager") } + log.Debug("data_manager::NewRemoteDataManager : create new remote data manager") + rdm := &remoteDataManager{} rdm.SetName(DATA_MANAGER) - rdm.SetRemote(remote) - rdm.SetStatsManager(statsMgr) + rdm.SetRemote(opts.remote) + rdm.SetStatsManager(opts.statsMgr) rdm.Init() return rdm, nil } diff --git a/component/xload/data_manager_test.go b/component/xload/data_manager_test.go index 98fadb573..0baff981e 100644 --- a/component/xload/data_manager_test.go +++ b/component/xload/data_manager_test.go @@ -52,7 +52,12 @@ func (suite *dataManagerTestSuite) SetupSuite() { } func (suite *dataManagerTestSuite) TestNewRemoteDataManager() { - rdm, err := newRemoteDataManager(nil, nil) + rdm, err := newRemoteDataManager(nil) + suite.assert.NotNil(err) + suite.assert.Nil(rdm) + suite.assert.Contains(err.Error(), "invalid parameters sent to create remote data manager") + + rdm, err = newRemoteDataManager(&newRemoteDataManagerOptions{}) suite.assert.NotNil(err) suite.assert.Nil(rdm) suite.assert.Contains(err.Error(), "invalid parameters sent to create remote data manager") @@ -62,7 +67,10 @@ func (suite *dataManagerTestSuite) TestNewRemoteDataManager() { suite.assert.Nil(err) suite.assert.NotNil(statsMgr) - rdm, err = newRemoteDataManager(remote, statsMgr) + rdm, err = newRemoteDataManager(&newRemoteDataManagerOptions{ + remote: remote, + statsMgr: statsMgr, + }) suite.assert.Nil(err) suite.assert.NotNil(rdm) } @@ -79,16 +87,16 @@ func (suite *dataManagerTestSuite) TestProcessErrors() { Ctx: ctx, } - n, err := rdm.Process(item) + dataLength, err := rdm.Process(item) suite.assert.NotNil(err) - suite.assert.Equal(n, 0) + suite.assert.Equal(dataLength, 0) // cancel the context cancel() - n, err = rdm.Process(item) + dataLength, err = rdm.Process(item) suite.assert.NotNil(err) - suite.assert.Equal(n, 0) + suite.assert.Equal(dataLength, 0) } func TestDatamanagerSuite(t *testing.T) { diff --git a/component/xload/lister.go b/component/xload/lister.go index 6dac82b76..243cee60f 100644 --- a/component/xload/lister.go +++ b/component/xload/lister.go @@ -68,25 +68,32 @@ type remoteLister struct { listBlocked bool } -func newRemoteLister(path string, defaultPermission os.FileMode, remote internal.Component, statsMgr *StatsManager) (*remoteLister, error) { - log.Debug("lister::NewRemoteLister : create new remote lister for %s, default permission %v", path, defaultPermission) +type newRemoteListerOptions struct { + path string + defaultPermission os.FileMode + remote internal.Component + statsMgr *StatsManager +} - if path == "" || remote == nil || statsMgr == nil { +func newRemoteLister(opts *newRemoteListerOptions) (*remoteLister, error) { + if opts == nil || opts.path == "" || opts.remote == nil || opts.statsMgr == nil { log.Err("lister::NewRemoteLister : invalid parameters sent to create remote lister") return nil, fmt.Errorf("invalid parameters sent to create remote lister") } + log.Debug("lister::NewRemoteLister : create new remote lister for %s, default permission %v", opts.path, opts.defaultPermission) + rl := &remoteLister{ lister: lister{ - path: path, - defaultPermission: defaultPermission, + path: opts.path, + defaultPermission: opts.defaultPermission, }, listBlocked: false, } rl.SetName(LISTER) - rl.SetRemote(remote) - rl.SetStatsManager(statsMgr) + rl.SetRemote(opts.remote) + rl.SetStatsManager(opts.statsMgr) rl.Init() return rl, nil } diff --git a/component/xload/lister_test.go b/component/xload/lister_test.go index 2f804867f..d297c9e6a 100644 --- a/component/xload/lister_test.go +++ b/component/xload/lister_test.go @@ -174,17 +174,37 @@ func (tl *testLister) cleanup() error { } func (suite *listTestSuite) TestNewRemoteLister() { - rl, err := newRemoteLister("", common.DefaultFilePermissionBits, nil, nil) + rl, err := newRemoteLister(nil) suite.assert.NotNil(err) suite.assert.Nil(rl) suite.assert.Contains(err.Error(), "invalid parameters sent to create remote lister") - rl, err = newRemoteLister("home/user/random_path", common.DefaultFilePermissionBits, nil, nil) + rl, err = newRemoteLister(&newRemoteListerOptions{ + path: "", + defaultPermission: common.DefaultFilePermissionBits, + remote: nil, + statsMgr: nil, + }) suite.assert.NotNil(err) suite.assert.Nil(rl) suite.assert.Contains(err.Error(), "invalid parameters sent to create remote lister") - rl, err = newRemoteLister("home/user/random_path", common.DefaultFilePermissionBits, lb, nil) + rl, err = newRemoteLister(&newRemoteListerOptions{ + path: "home/user/random_path", + defaultPermission: common.DefaultFilePermissionBits, + remote: nil, + statsMgr: nil, + }) + suite.assert.NotNil(err) + suite.assert.Nil(rl) + suite.assert.Contains(err.Error(), "invalid parameters sent to create remote lister") + + rl, err = newRemoteLister(&newRemoteListerOptions{ + path: "home/user/random_path", + defaultPermission: common.DefaultFilePermissionBits, + remote: lb, + statsMgr: nil, + }) suite.assert.NotNil(err) suite.assert.Nil(rl) suite.assert.Contains(err.Error(), "invalid parameters sent to create remote lister") @@ -193,7 +213,12 @@ func (suite *listTestSuite) TestNewRemoteLister() { suite.assert.Nil(err) suite.assert.NotNil(statsMgr) - rl, err = newRemoteLister("home/user/random_path", common.DefaultFilePermissionBits, lb, statsMgr) + rl, err = newRemoteLister(&newRemoteListerOptions{ + path: "home/user/random_path", + defaultPermission: common.DefaultFilePermissionBits, + remote: lb, + statsMgr: statsMgr, + }) suite.assert.Nil(err) suite.assert.NotNil(rl) } @@ -208,7 +233,12 @@ func (suite *listTestSuite) TestListerStartStop() { suite.assert.Nil(err) }() - rl, err := newRemoteLister(tl.path, common.DefaultFilePermissionBits, lb, tl.stMgr) + rl, err := newRemoteLister(&newRemoteListerOptions{ + path: tl.path, + defaultPermission: common.DefaultFilePermissionBits, + remote: lb, + statsMgr: tl.stMgr, + }) suite.assert.Nil(err) suite.assert.NotNil(rl) @@ -236,7 +266,12 @@ func (suite *listTestSuite) TestListerMkdir() { suite.assert.Nil(err) }() - rl, err := newRemoteLister(tl.path, common.DefaultFilePermissionBits, lb, tl.stMgr) + rl, err := newRemoteLister(&newRemoteListerOptions{ + path: tl.path, + defaultPermission: common.DefaultFilePermissionBits, + remote: lb, + statsMgr: tl.stMgr, + }) suite.assert.Nil(err) suite.assert.NotNil(rl) diff --git a/component/xload/splitter.go b/component/xload/splitter.go index c0fcc1ef7..bec0417c9 100644 --- a/component/xload/splitter.go +++ b/component/xload/splitter.go @@ -52,9 +52,10 @@ var _ XComponent = &downloadSplitter{} type splitter struct { XBase - blockPool *BlockPool - path string - fileLocks *common.LockMap + blockPool *BlockPool + path string + fileLocks *common.LockMap + consistency bool } // -------------------------------------------------------------------------------------------------------- @@ -63,25 +64,35 @@ type downloadSplitter struct { splitter } -func newDownloadSplitter(blockPool *BlockPool, path string, remote internal.Component, statsMgr *StatsManager, fileLocks *common.LockMap) (*downloadSplitter, error) { - if blockPool == nil || path == "" || remote == nil || statsMgr == nil || fileLocks == nil { +type newDownloadSplitterOptions struct { + blockPool *BlockPool + path string + remote internal.Component + statsMgr *StatsManager + fileLocks *common.LockMap + consistency bool +} + +func newDownloadSplitter(opts *newDownloadSplitterOptions) (*downloadSplitter, error) { + if opts == nil || opts.blockPool == nil || opts.path == "" || opts.remote == nil || opts.statsMgr == nil || opts.fileLocks == nil { log.Err("lister::NewRemoteLister : invalid parameters sent to create download splitter") return nil, fmt.Errorf("invalid parameters sent to create download splitter") } - log.Debug("splitter::NewDownloadSplitter : create new download splitter for %s, block size %v", path, blockPool.GetBlockSize()) + log.Debug("splitter::NewDownloadSplitter : create new download splitter for %s, block size %v", opts.path, opts.blockPool.GetBlockSize()) d := &downloadSplitter{ splitter: splitter{ - blockPool: blockPool, - path: path, - fileLocks: fileLocks, + blockPool: opts.blockPool, + path: opts.path, + fileLocks: opts.fileLocks, + consistency: opts.consistency, }, } d.SetName(SPLITTER) - d.SetRemote(remote) - d.SetStatsManager(statsMgr) + d.SetRemote(opts.remote) + d.SetStatsManager(opts.statsMgr) d.Init() return d, nil } @@ -125,13 +136,14 @@ func (d *downloadSplitter) Process(item *WorkItem) (int, error) { } filePresent, isDir, size := isFilePresent(localPath) - if isDir { - log.Err("downloadSplitter::Process : %s is a directory", item.Path) - return -1, fmt.Errorf("%s is a directory", item.Path) - } - if filePresent && item.DataLen == uint64(size) { - log.Debug("downloadSplitter::Process : %s will be served from local path, priority %v", item.Path, item.Priority) - return int(size), nil + if filePresent { + if isDir { + log.Err("downloadSplitter::Process : %s is a directory", item.Path) + return -1, fmt.Errorf("%s is a directory", item.Path) + } else if item.DataLen == uint64(size) { + log.Debug("downloadSplitter::Process : %s will be served from local path, priority %v", item.Path, item.Priority) + return int(size), nil + } } // TODO:: xload : should we delete the file if it already exists diff --git a/component/xload/splitter_test.go b/component/xload/splitter_test.go index 0d3017d4c..ad5f9fafe 100644 --- a/component/xload/splitter_test.go +++ b/component/xload/splitter_test.go @@ -155,7 +155,12 @@ func (ts *testSplitter) cleanup() error { } func (suite *splitterTestSuite) TestNewDownloadSplitter() { - ds, err := newDownloadSplitter(nil, "", nil, nil, nil) + ds, err := newDownloadSplitter(nil) + suite.assert.NotNil(err) + suite.assert.Nil(ds) + suite.assert.Contains(err.Error(), "invalid parameters sent to create download splitter") + + ds, err = newDownloadSplitter(&newDownloadSplitterOptions{}) suite.assert.NotNil(err) suite.assert.Nil(ds) suite.assert.Contains(err.Error(), "invalid parameters sent to create download splitter") @@ -164,7 +169,13 @@ func (suite *splitterTestSuite) TestNewDownloadSplitter() { suite.assert.Nil(err) suite.assert.NotNil(statsMgr) - ds, err = newDownloadSplitter(NewBlockPool(1, 1), "/home/user/random_path", remote, statsMgr, common.NewLockMap()) + ds, err = newDownloadSplitter(&newDownloadSplitterOptions{ + blockPool: NewBlockPool(1, 1), + path: "/home/user/random_path", + remote: remote, + statsMgr: statsMgr, + fileLocks: common.NewLockMap(), + }) suite.assert.Nil(err) suite.assert.NotNil(ds) } @@ -179,7 +190,7 @@ func (suite *splitterTestSuite) TestProcessFilePresent() { suite.assert.Nil(err) }() - ds, err := newDownloadSplitter(ts.blockPool, ts.path, remote, ts.stMgr, ts.locks) + ds, err := newDownloadSplitter(&newDownloadSplitterOptions{ts.blockPool, ts.path, remote, ts.stMgr, ts.locks, false}) suite.assert.Nil(err) suite.assert.NotNil(ds) @@ -208,15 +219,23 @@ func (suite *splitterTestSuite) TestSplitterStartStop() { suite.assert.Nil(err) }() - rl, err := newRemoteLister(ts.path, common.DefaultFilePermissionBits, remote, ts.stMgr) + rl, err := newRemoteLister(&newRemoteListerOptions{ + path: ts.path, + defaultPermission: common.DefaultFilePermissionBits, + remote: remote, + statsMgr: ts.stMgr, + }) suite.assert.Nil(err) suite.assert.NotNil(rl) - ds, err := newDownloadSplitter(ts.blockPool, ts.path, remote, ts.stMgr, ts.locks) + ds, err := newDownloadSplitter(&newDownloadSplitterOptions{ts.blockPool, ts.path, remote, ts.stMgr, ts.locks, false}) suite.assert.Nil(err) suite.assert.NotNil(ds) - rdm, err := newRemoteDataManager(remote, ts.stMgr) + rdm, err := newRemoteDataManager(&newRemoteDataManagerOptions{ + remote: remote, + statsMgr: ts.stMgr, + }) suite.assert.Nil(err) suite.assert.NotNil(rdm) diff --git a/component/xload/xload.go b/component/xload/xload.go index a0839bd44..3ff860a65 100644 --- a/component/xload/xload.go +++ b/component/xload/xload.go @@ -53,6 +53,7 @@ type Xload struct { blockSize uint64 // Size of each block to be cached mode Mode // Mode of the Xload component exportProgress bool // Export the progess of xload operation to json file + consistency bool // validate md5sum on download, if md5sum is set on blob workerCount uint32 // Number of workers running blockPool *BlockPool // Pool of blocks path string // Path on local disk where Xload will operate @@ -68,6 +69,7 @@ type XloadOptions struct { Mode string `config:"mode" yaml:"mode,omitempty"` Path string `config:"path" yaml:"path,omitempty"` ExportProgress bool `config:"export-progress" yaml:"path,omitempty"` + Consistency bool `config:"consistency" yaml:"consistency,omitempty"` // TODO:: xload : add parallelism parameter } @@ -192,6 +194,7 @@ func (xl *Xload) Configure(_ bool) error { xl.mode = mode xl.exportProgress = conf.ExportProgress + xl.consistency = conf.Consistency allowOther := false err = config.UnmarshalKey("allow-other", &allowOther) @@ -276,19 +279,34 @@ func (xl *Xload) createDownloader() error { log.Trace("Xload::createDownloader : Starting downloader") // Create remote lister pool to list remote files - rl, err := newRemoteLister(xl.path, xl.defaultPermission, xl.NextComponent(), xl.statsMgr) + rl, err := newRemoteLister(&newRemoteListerOptions{ + path: xl.path, + defaultPermission: xl.defaultPermission, + remote: xl.NextComponent(), + statsMgr: xl.statsMgr, + }) if err != nil { log.Err("Xload::createDownloader : Unable to create remote lister [%s]", err.Error()) return err } - ds, err := newDownloadSplitter(xl.blockPool, xl.path, xl.NextComponent(), xl.statsMgr, xl.fileLocks) + ds, err := newDownloadSplitter(&newDownloadSplitterOptions{ + blockPool: xl.blockPool, + path: xl.path, + remote: xl.NextComponent(), + statsMgr: xl.statsMgr, + fileLocks: xl.fileLocks, + consistency: xl.consistency, + }) if err != nil { log.Err("Xload::createDownloader : Unable to create download splitter [%s]", err.Error()) return err } - rdm, err := newRemoteDataManager(xl.NextComponent(), xl.statsMgr) + rdm, err := newRemoteDataManager(&newRemoteDataManagerOptions{ + remote: xl.NextComponent(), + statsMgr: xl.statsMgr, + }) if err != nil { log.Err("Xload::startUploader : failed to create remote data manager [%s]", err.Error()) return err diff --git a/component/xload/xload_test.go b/component/xload/xload_test.go index cf4cb7ee0..e2e2599c4 100644 --- a/component/xload/xload_test.go +++ b/component/xload/xload_test.go @@ -360,6 +360,7 @@ func (suite *xloadTestSuite) TestCreateDownloader() { xl := &Xload{} err := xl.createDownloader() suite.assert.NotNil(err) + suite.assert.Contains(err.Error(), "invalid parameters sent to create remote lister") suite.assert.Len(xl.comps, 0) xl.path = suite.local_path @@ -367,6 +368,7 @@ func (suite *xloadTestSuite) TestCreateDownloader() { xl.statsMgr = &StatsManager{} err = xl.createDownloader() suite.assert.NotNil(err) + suite.assert.Contains(err.Error(), "invalid parameters sent to create download splitter") suite.assert.Len(xl.comps, 0) xl.blockPool = &BlockPool{} @@ -526,7 +528,7 @@ func (suite *xloadTestSuite) TestOpenFileWithDownload() { err = suite.xload.CloseFile(internal.CloseFileOptions{Handle: fh1}) suite.assert.Nil(err) - fh2, err := suite.xload.OpenFile(internal.OpenFileOptions{Name: "dir_0/file_3", Flags: os.O_RDWR, Mode: common.DefaultFilePermissionBits}) + fh2, err := suite.xload.OpenFile(internal.OpenFileOptions{Name: "dir_0/file_3", Flags: os.O_RDONLY, Mode: common.DefaultFilePermissionBits}) suite.assert.Nil(err) suite.assert.NotNil(fh2) suite.assert.Equal(fh2.Size, (int64)(27)) @@ -553,13 +555,13 @@ func (suite *xloadTestSuite) validateMD5WithOpenFile(localPath string, remotePat suite.assert.Nil(err) suite.assert.NotNil(fh) - l, err := computeMD5(localFilePath) + localMD5, err := computeMD5(localFilePath) suite.assert.Nil(err) - r, err := computeMD5(remoteFilePath) + remoteMD5, err := computeMD5(remoteFilePath) suite.assert.Nil(err) - suite.assert.Equal(l, r) + suite.assert.Equal(localMD5, remoteMD5) err = suite.xload.CloseFile(internal.CloseFileOptions{Handle: fh}) suite.assert.Nil(err) From aab746f83fdf1dad89e2e41754392010d381d02d Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 26 Feb 2025 18:03:44 +0530 Subject: [PATCH 02/20] add consistency check --- common/util.go | 12 ++++++++++++ component/azstorage/block_blob.go | 4 ++-- component/azstorage/block_blob_test.go | 6 +++--- component/azstorage/utils.go | 13 ------------- component/azstorage/utils_test.go | 2 +- component/xload/lister.go | 3 ++- component/xload/splitter.go | 26 ++++++++++++++++++++++++++ component/xload/utils.go | 1 + component/xload/xload.go | 4 +++- 9 files changed, 50 insertions(+), 21 deletions(-) diff --git a/common/util.go b/common/util.go index 337059c23..ef8af513d 100644 --- a/common/util.go +++ b/common/util.go @@ -38,6 +38,7 @@ import ( "bytes" "crypto/aes" "crypto/cipher" + "crypto/md5" "crypto/rand" "encoding/binary" "fmt" @@ -513,3 +514,14 @@ func GetCRC64(data []byte, len int) []byte { return checksumBytes } + +func GetMD5(fi *os.File) ([]byte, error) { + hasher := md5.New() + _, err := io.Copy(hasher, fi) + + if err != nil { + return nil, fmt.Errorf("failed to generate md5 [%s]", err.Error()) + } + + return hasher.Sum(nil), nil +} diff --git a/component/azstorage/block_blob.go b/component/azstorage/block_blob.go index c8ab0e73f..895d5306d 100644 --- a/component/azstorage/block_blob.go +++ b/component/azstorage/block_blob.go @@ -823,7 +823,7 @@ func (bb *BlockBlob) ReadToFile(name string, offset int64, count int64, fi *os.F if bb.Config.validateMD5 { // Compute md5 of local file - fileMD5, err := getMD5(fi) + fileMD5, err := common.GetMD5(fi) if err != nil { log.Warn("BlockBlob::ReadToFile : Failed to generate MD5 Sum for %s", name) } else { @@ -1030,7 +1030,7 @@ func (bb *BlockBlob) WriteFromFile(name string, metadata map[string]*string, fi // hence we take cost of calculating md5 only for files which are bigger in size and which will be converted to blocks. md5sum := []byte{} if bb.Config.updateMD5 && stat.Size() >= blockblob.MaxUploadBlobBytes { - md5sum, err = getMD5(fi) + md5sum, err = common.GetMD5(fi) if err != nil { // Md5 sum generation failed so set nil while uploading log.Warn("BlockBlob::WriteFromFile : Failed to generate md5 of %s", name) diff --git a/component/azstorage/block_blob_test.go b/component/azstorage/block_blob_test.go index 2e8cae1b2..6a4cde751 100644 --- a/component/azstorage/block_blob_test.go +++ b/component/azstorage/block_blob_test.go @@ -2864,7 +2864,7 @@ func (s *blockBlobTestSuite) TestMD5SetOnUpload() { s.assert.NotEmpty(prop.MD5) _, _ = f.Seek(0, 0) - localMD5, err := getMD5(f) + localMD5, err := common.GetMD5(f) s.assert.Nil(err) s.assert.EqualValues(localMD5, prop.MD5) @@ -2965,7 +2965,7 @@ func (s *blockBlobTestSuite) TestMD5AutoSetOnUpload() { s.assert.NotEmpty(prop.MD5) _, _ = f.Seek(0, 0) - localMD5, err := getMD5(f) + localMD5, err := common.GetMD5(f) s.assert.Nil(err) s.assert.EqualValues(localMD5, prop.MD5) @@ -3021,7 +3021,7 @@ func (s *blockBlobTestSuite) TestInvalidateMD5PostUpload() { s.assert.NotEmpty(prop.MD5) _, _ = f.Seek(0, 0) - localMD5, err := getMD5(f) + localMD5, err := common.GetMD5(f) s.assert.Nil(err) s.assert.NotEqualValues(localMD5, prop.MD5) diff --git a/component/azstorage/utils.go b/component/azstorage/utils.go index 98383b74b..e1e66a7a5 100644 --- a/component/azstorage/utils.go +++ b/component/azstorage/utils.go @@ -34,11 +34,9 @@ package azstorage import ( - "crypto/md5" "encoding/json" "errors" "fmt" - "io" "net" "net/http" "net/url" @@ -558,17 +556,6 @@ func sanitizeSASKey(key string) string { return key } -func getMD5(fi *os.File) ([]byte, error) { - hasher := md5.New() - _, err := io.Copy(hasher, fi) - - if err != nil { - return nil, errors.New("failed to generate md5") - } - - return hasher.Sum(nil), nil -} - func autoDetectAuthMode(opt AzStorageOptions) string { if opt.ApplicationID != "" || opt.ResourceID != "" || opt.ObjectID != "" { return "msi" diff --git a/component/azstorage/utils_test.go b/component/azstorage/utils_test.go index f7f865b57..423844202 100644 --- a/component/azstorage/utils_test.go +++ b/component/azstorage/utils_test.go @@ -261,7 +261,7 @@ func (s *utilsTestSuite) TestGetMD5() { f, err = os.Open("abc.txt") assert.Nil(err) - md5Sum, err := getMD5(f) + md5Sum, err := common.GetMD5(f) assert.Nil(err) assert.NotZero(md5Sum) diff --git a/component/xload/lister.go b/component/xload/lister.go index 243cee60f..0ff88d940 100644 --- a/component/xload/lister.go +++ b/component/xload/lister.go @@ -200,7 +200,7 @@ func (rl *remoteLister) Process(item *WorkItem) (int, error) { fileMode = entry.Mode } - // send file to the output channel for chunking + // send file to the splitter's channel for chunking rl.GetNext().Schedule(&WorkItem{ CompName: rl.GetNext().GetName(), Path: entry.Path, @@ -208,6 +208,7 @@ func (rl *remoteLister) Process(item *WorkItem) (int, error) { Mode: fileMode, Atime: entry.Atime, Mtime: entry.Mtime, + MD5: entry.MD5, }) } } diff --git a/component/xload/splitter.go b/component/xload/splitter.go index bec0417c9..f6a8a5589 100644 --- a/component/xload/splitter.go +++ b/component/xload/splitter.go @@ -38,6 +38,7 @@ import ( "fmt" "os" "path/filepath" + "reflect" "sync" "time" @@ -276,6 +277,31 @@ func (d *downloadSplitter) Process(item *WorkItem) (int, error) { return -1, fmt.Errorf("failed to download data for file %s", item.Path) } + if d.consistency { + // Compute md5 of local file + fileMD5, err := common.GetMD5(item.FileHandle) + if err != nil { + log.Err("downloadSplitter::Process : Failed to generate MD5Sum for %s [%s]", item.Path, err.Error()) + } else { + if item.MD5 == nil { + log.Warn("downloadSplitter::Process : Failed to get MD5Sum for blob %s", item.Path) + } else { + // compare md5 and fail is not match + if !reflect.DeepEqual(fileMD5, item.MD5) { + log.Err("downloadSplitter::Process : MD5Sum mismatch on download for file %s, so deleting it from local path", item.Path) + + // delete the file from the local path if md5sum is not matching + err = os.Remove(localPath) + if err != nil { + log.Err("downloadSplitter::Process : Failed to delete file %s [%s]", item.Path, err.Error()) + } + + return -1, fmt.Errorf("md5sum mismatch on download for file %s", item.Path) + } + } + } + } + log.Debug("downloadSplitter::Process : Download completed for file %s, priority %v", item.Path, item.Priority) return 0, nil } diff --git a/component/xload/utils.go b/component/xload/utils.go index 80536a1f9..283a1ef49 100644 --- a/component/xload/utils.go +++ b/component/xload/utils.go @@ -71,6 +71,7 @@ type WorkItem struct { Download bool // boolean variable to decide upload or download Priority bool // boolean flag to decide if this item needs to be processed on priority Ctx context.Context // context with cancellation method so that if download fails for one block, all other download operations will be cancelled + MD5 []byte // content md5 of the blob which can be used to check the consistency of the download } // xload mode enum diff --git a/component/xload/xload.go b/component/xload/xload.go index 3ff860a65..50e28b5d4 100644 --- a/component/xload/xload.go +++ b/component/xload/xload.go @@ -208,7 +208,8 @@ func (xl *Xload) Configure(_ bool) error { xl.defaultPermission = common.DefaultFilePermissionBits } - log.Debug("Xload::Configure : block size %v, mode %v, path %v, default permission %v", xl.blockSize, xl.mode.String(), xl.path, xl.defaultPermission) + log.Debug("Xload::Configure : block size %v, mode %v, path %v, default permission %v, export progress %v, consistency %v", xl.blockSize, + xl.mode.String(), xl.path, xl.defaultPermission, xl.exportProgress, xl.consistency) return nil } @@ -392,6 +393,7 @@ func (xl *Xload) downloadFile(fileName string) error { Mode: fileMode, Atime: attr.Atime, Mtime: attr.Mtime, + MD5: attr.MD5, }) if err != nil { From 9ab3f29a69d5790fb15d3cb1900671153c02a2b8 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 12:54:15 +0530 Subject: [PATCH 03/20] review comments --- common/util_test.go | 22 ++++++ component/azstorage/utils_test.go | 22 ------ component/xload/data_manager.go | 4 +- component/xload/data_manager_test.go | 4 +- component/xload/lister.go | 4 +- component/xload/lister_test.go | 12 +-- component/xload/splitter.go | 110 +++++++++++++++------------ component/xload/splitter_test.go | 12 +-- component/xload/xload.go | 6 +- 9 files changed, 103 insertions(+), 93 deletions(-) diff --git a/common/util_test.go b/common/util_test.go index 4fdb132a3..2a2e27dd5 100644 --- a/common/util_test.go +++ b/common/util_test.go @@ -377,3 +377,25 @@ func (suite *utilTestSuite) TestGetFuseMinorVersion() { i := GetFuseMinorVersion() suite.assert.GreaterOrEqual(i, 0) } + +func (s *utilTestSuite) TestGetMD5() { + assert := assert.New(s.T()) + + f, err := os.Create("abc.txt") + assert.Nil(err) + + _, err = f.Write([]byte(randomString(50))) + assert.Nil(err) + + f.Close() + + f, err = os.Open("abc.txt") + assert.Nil(err) + + md5Sum, err := GetMD5(f) + assert.Nil(err) + assert.NotZero(md5Sum) + + f.Close() + os.Remove("abc.txt") +} diff --git a/component/azstorage/utils_test.go b/component/azstorage/utils_test.go index 423844202..cfda70980 100644 --- a/component/azstorage/utils_test.go +++ b/component/azstorage/utils_test.go @@ -247,28 +247,6 @@ func (s *utilsTestSuite) TestGetFileModeFromACL() { } } -func (s *utilsTestSuite) TestGetMD5() { - assert := assert.New(s.T()) - - f, err := os.Create("abc.txt") - assert.Nil(err) - - _, err = f.Write([]byte(randomString(50))) - assert.Nil(err) - - f.Close() - - f, err = os.Open("abc.txt") - assert.Nil(err) - - md5Sum, err := common.GetMD5(f) - assert.Nil(err) - assert.NotZero(md5Sum) - - f.Close() - os.Remove("abc.txt") -} - func (s *utilsTestSuite) TestSanitizeSASKey() { assert := assert.New(s.T()) diff --git a/component/xload/data_manager.go b/component/xload/data_manager.go index 08a987fc1..fa69f0778 100644 --- a/component/xload/data_manager.go +++ b/component/xload/data_manager.go @@ -54,12 +54,12 @@ type remoteDataManager struct { dataManager } -type newRemoteDataManagerOptions struct { +type remoteDataManagerOptions struct { remote internal.Component statsMgr *StatsManager } -func newRemoteDataManager(opts *newRemoteDataManagerOptions) (*remoteDataManager, error) { +func newRemoteDataManager(opts *remoteDataManagerOptions) (*remoteDataManager, error) { if opts == nil || opts.remote == nil || opts.statsMgr == nil { log.Err("data_manager::NewRemoteDataManager : invalid parameters sent to create remote data manager") return nil, fmt.Errorf("invalid parameters sent to create remote data manager") diff --git a/component/xload/data_manager_test.go b/component/xload/data_manager_test.go index 0baff981e..812e14daf 100644 --- a/component/xload/data_manager_test.go +++ b/component/xload/data_manager_test.go @@ -57,7 +57,7 @@ func (suite *dataManagerTestSuite) TestNewRemoteDataManager() { suite.assert.Nil(rdm) suite.assert.Contains(err.Error(), "invalid parameters sent to create remote data manager") - rdm, err = newRemoteDataManager(&newRemoteDataManagerOptions{}) + rdm, err = newRemoteDataManager(&remoteDataManagerOptions{}) suite.assert.NotNil(err) suite.assert.Nil(rdm) suite.assert.Contains(err.Error(), "invalid parameters sent to create remote data manager") @@ -67,7 +67,7 @@ func (suite *dataManagerTestSuite) TestNewRemoteDataManager() { suite.assert.Nil(err) suite.assert.NotNil(statsMgr) - rdm, err = newRemoteDataManager(&newRemoteDataManagerOptions{ + rdm, err = newRemoteDataManager(&remoteDataManagerOptions{ remote: remote, statsMgr: statsMgr, }) diff --git a/component/xload/lister.go b/component/xload/lister.go index 0ff88d940..b2e7aa42d 100644 --- a/component/xload/lister.go +++ b/component/xload/lister.go @@ -68,14 +68,14 @@ type remoteLister struct { listBlocked bool } -type newRemoteListerOptions struct { +type remoteListerOptions struct { path string defaultPermission os.FileMode remote internal.Component statsMgr *StatsManager } -func newRemoteLister(opts *newRemoteListerOptions) (*remoteLister, error) { +func newRemoteLister(opts *remoteListerOptions) (*remoteLister, error) { if opts == nil || opts.path == "" || opts.remote == nil || opts.statsMgr == nil { log.Err("lister::NewRemoteLister : invalid parameters sent to create remote lister") return nil, fmt.Errorf("invalid parameters sent to create remote lister") diff --git a/component/xload/lister_test.go b/component/xload/lister_test.go index d297c9e6a..0ae63b4cf 100644 --- a/component/xload/lister_test.go +++ b/component/xload/lister_test.go @@ -179,7 +179,7 @@ func (suite *listTestSuite) TestNewRemoteLister() { suite.assert.Nil(rl) suite.assert.Contains(err.Error(), "invalid parameters sent to create remote lister") - rl, err = newRemoteLister(&newRemoteListerOptions{ + rl, err = newRemoteLister(&remoteListerOptions{ path: "", defaultPermission: common.DefaultFilePermissionBits, remote: nil, @@ -189,7 +189,7 @@ func (suite *listTestSuite) TestNewRemoteLister() { suite.assert.Nil(rl) suite.assert.Contains(err.Error(), "invalid parameters sent to create remote lister") - rl, err = newRemoteLister(&newRemoteListerOptions{ + rl, err = newRemoteLister(&remoteListerOptions{ path: "home/user/random_path", defaultPermission: common.DefaultFilePermissionBits, remote: nil, @@ -199,7 +199,7 @@ func (suite *listTestSuite) TestNewRemoteLister() { suite.assert.Nil(rl) suite.assert.Contains(err.Error(), "invalid parameters sent to create remote lister") - rl, err = newRemoteLister(&newRemoteListerOptions{ + rl, err = newRemoteLister(&remoteListerOptions{ path: "home/user/random_path", defaultPermission: common.DefaultFilePermissionBits, remote: lb, @@ -213,7 +213,7 @@ func (suite *listTestSuite) TestNewRemoteLister() { suite.assert.Nil(err) suite.assert.NotNil(statsMgr) - rl, err = newRemoteLister(&newRemoteListerOptions{ + rl, err = newRemoteLister(&remoteListerOptions{ path: "home/user/random_path", defaultPermission: common.DefaultFilePermissionBits, remote: lb, @@ -233,7 +233,7 @@ func (suite *listTestSuite) TestListerStartStop() { suite.assert.Nil(err) }() - rl, err := newRemoteLister(&newRemoteListerOptions{ + rl, err := newRemoteLister(&remoteListerOptions{ path: tl.path, defaultPermission: common.DefaultFilePermissionBits, remote: lb, @@ -266,7 +266,7 @@ func (suite *listTestSuite) TestListerMkdir() { suite.assert.Nil(err) }() - rl, err := newRemoteLister(&newRemoteListerOptions{ + rl, err := newRemoteLister(&remoteListerOptions{ path: tl.path, defaultPermission: common.DefaultFilePermissionBits, remote: lb, diff --git a/component/xload/splitter.go b/component/xload/splitter.go index f6a8a5589..fe26c13f7 100644 --- a/component/xload/splitter.go +++ b/component/xload/splitter.go @@ -65,7 +65,7 @@ type downloadSplitter struct { splitter } -type newDownloadSplitterOptions struct { +type downloadSplitterOptions struct { blockPool *BlockPool path string remote internal.Component @@ -74,7 +74,7 @@ type newDownloadSplitterOptions struct { consistency bool } -func newDownloadSplitter(opts *newDownloadSplitterOptions) (*downloadSplitter, error) { +func newDownloadSplitter(opts *downloadSplitterOptions) (*downloadSplitter, error) { if opts == nil || opts.blockPool == nil || opts.path == "" || opts.remote == nil || opts.statsMgr == nil || opts.fileLocks == nil { log.Err("lister::NewRemoteLister : invalid parameters sent to create download splitter") return nil, fmt.Errorf("invalid parameters sent to create download splitter") @@ -82,7 +82,7 @@ func newDownloadSplitter(opts *newDownloadSplitterOptions) (*downloadSplitter, e log.Debug("splitter::NewDownloadSplitter : create new download splitter for %s, block size %v", opts.path, opts.blockPool.GetBlockSize()) - d := &downloadSplitter{ + ds := &downloadSplitter{ splitter: splitter{ blockPool: opts.blockPool, path: opts.path, @@ -91,47 +91,47 @@ func newDownloadSplitter(opts *newDownloadSplitterOptions) (*downloadSplitter, e }, } - d.SetName(SPLITTER) - d.SetRemote(opts.remote) - d.SetStatsManager(opts.statsMgr) - d.Init() - return d, nil + ds.SetName(SPLITTER) + ds.SetRemote(opts.remote) + ds.SetStatsManager(opts.statsMgr) + ds.Init() + return ds, nil } -func (d *downloadSplitter) Init() { - d.SetThreadPool(NewThreadPool(MAX_DATA_SPLITTER, d.Process)) - if d.GetThreadPool() == nil { +func (ds *downloadSplitter) Init() { + ds.SetThreadPool(NewThreadPool(MAX_DATA_SPLITTER, ds.Process)) + if ds.GetThreadPool() == nil { log.Err("downloadSplitter::Init : fail to init thread pool") } } -func (d *downloadSplitter) Start() { - log.Debug("downloadSplitter::Start : start download splitter for %s", d.path) - d.GetThreadPool().Start() +func (ds *downloadSplitter) Start() { + log.Debug("downloadSplitter::Start : start download splitter for %s", ds.path) + ds.GetThreadPool().Start() } -func (d *downloadSplitter) Stop() { - log.Debug("downloadSplitter::Stop : stop download splitter for %s", d.path) - if d.GetThreadPool() != nil { - d.GetThreadPool().Stop() +func (ds *downloadSplitter) Stop() { + log.Debug("downloadSplitter::Stop : stop download splitter for %s", ds.path) + if ds.GetThreadPool() != nil { + ds.GetThreadPool().Stop() } - d.GetNext().Stop() + ds.GetNext().Stop() } // download data in chunks and then write to the local file -func (d *downloadSplitter) Process(item *WorkItem) (int, error) { +func (ds *downloadSplitter) Process(item *WorkItem) (int, error) { log.Debug("downloadSplitter::Process : Splitting data for %s, size %v, mode %v, priority %v, access time %v, modified time %v", item.Path, item.DataLen, item.Mode, item.Priority, item.Atime.Format(time.DateTime), item.Mtime.Format(time.DateTime)) var err error - localPath := filepath.Join(d.path, item.Path) + localPath := filepath.Join(ds.path, item.Path) // if priority is false, it means that it has been scheduled by the lister and not by the OpenFile call. // So, get a lock. If the locking goes into wait state, it means the file is already under download by the OpenFile thread. // Otherwise, if there are no other locks, acquire a lock to prevent any OpenFile call from adding a request again. // OpenFile thread already takes a lock on the file in its code, so don't take it again here. if !item.Priority { - flock := d.fileLocks.Get(item.Path) + flock := ds.fileLocks.Get(item.Path) flock.Lock() defer flock.Unlock() } @@ -162,7 +162,7 @@ func (d *downloadSplitter) Process(item *WorkItem) (int, error) { if item.DataLen == 0 { log.Debug("downloadSplitter::Process : 0 byte file %s", item.Path) // send the status to stats manager - d.GetStatsManager().AddStats(&StatsItem{ + ds.GetStatsManager().AddStats(&StatsItem{ Component: SPLITTER, Name: item.Path, Success: true, @@ -185,7 +185,7 @@ func (d *downloadSplitter) Process(item *WorkItem) (int, error) { return -1, fmt.Errorf("failed to truncate file %s [%s]", item.Path, err.Error()) } - numBlocks := ((item.DataLen - 1) / d.blockPool.GetBlockSize()) + 1 + numBlocks := ((item.DataLen - 1) / ds.blockPool.GetBlockSize()) + 1 offset := int64(0) wg := sync.WaitGroup{} @@ -216,22 +216,22 @@ func (d *downloadSplitter) Process(item *WorkItem) (int, error) { if respSplitItem.Block != nil { // log.Debug("downloadSplitter::process : Download successful %s index %d offset %v", item.path, respSplitItem.block.index, respSplitItem.block.offset) - d.blockPool.Release(respSplitItem.Block) + ds.blockPool.Release(respSplitItem.Block) } } }() for i := 0; i < int(numBlocks); i++ { - block := d.blockPool.GetBlock(item.Priority) + block := ds.blockPool.GetBlock(item.Priority) if block == nil { responseChannel <- &WorkItem{Err: fmt.Errorf("failed to get block from pool for file %s, offset %v", item.Path, offset)} } else { block.Index = i block.Offset = offset - block.Length = int64(d.blockPool.GetBlockSize()) + block.Length = int64(ds.blockPool.GetBlockSize()) splitItem := &WorkItem{ - CompName: d.GetNext().GetName(), + CompName: ds.GetNext().GetName(), Path: item.Path, DataLen: item.DataLen, FileHandle: item.FileHandle, @@ -242,10 +242,10 @@ func (d *downloadSplitter) Process(item *WorkItem) (int, error) { Ctx: ctx, } // log.Debug("downloadSplitter::Process : Scheduling download for %s offset %v", item.Path, offset) - d.GetNext().Schedule(splitItem) + ds.GetNext().Schedule(splitItem) } - offset += int64(d.blockPool.GetBlockSize()) + offset += int64(ds.blockPool.GetBlockSize()) } wg.Wait() @@ -258,7 +258,7 @@ func (d *downloadSplitter) Process(item *WorkItem) (int, error) { } // send the download status to stats manager - d.GetStatsManager().AddStats(&StatsItem{ + ds.GetStatsManager().AddStats(&StatsItem{ Component: SPLITTER, Name: item.Path, Success: operationSuccess, @@ -277,31 +277,41 @@ func (d *downloadSplitter) Process(item *WorkItem) (int, error) { return -1, fmt.Errorf("failed to download data for file %s", item.Path) } - if d.consistency { - // Compute md5 of local file - fileMD5, err := common.GetMD5(item.FileHandle) + if ds.consistency { + err = ds.checkConsistency(item) if err != nil { - log.Err("downloadSplitter::Process : Failed to generate MD5Sum for %s [%s]", item.Path, err.Error()) - } else { - if item.MD5 == nil { - log.Warn("downloadSplitter::Process : Failed to get MD5Sum for blob %s", item.Path) - } else { - // compare md5 and fail is not match - if !reflect.DeepEqual(fileMD5, item.MD5) { - log.Err("downloadSplitter::Process : MD5Sum mismatch on download for file %s, so deleting it from local path", item.Path) + log.Err("downloadSplitter::Process : unable to check consistency for %s, so deleting it from local path", item.Path) - // delete the file from the local path if md5sum is not matching - err = os.Remove(localPath) - if err != nil { - log.Err("downloadSplitter::Process : Failed to delete file %s [%s]", item.Path, err.Error()) - } - - return -1, fmt.Errorf("md5sum mismatch on download for file %s", item.Path) - } + // delete the file from the local path if md5sum is not matching + err = os.Remove(localPath) + if err != nil { + log.Err("downloadSplitter::Process : Failed to delete file %s [%s]", item.Path, err.Error()) } + + return -1, fmt.Errorf("md5sum mismatch on download for file %s", item.Path) } } log.Debug("downloadSplitter::Process : Download completed for file %s, priority %v", item.Path, item.Priority) return 0, nil } + +func (ds *downloadSplitter) checkConsistency(item *WorkItem) error { + if item.MD5 == nil { + log.Warn("downloadSplitter::Process : Unable to get MD5Sum for blob %s", item.Path) + } else { + // Compute md5 of local file + fileMD5, err := common.GetMD5(item.FileHandle) + if err != nil { + log.Err("downloadSplitter::Process : Failed to generate MD5Sum for %s [%s]", item.Path, err.Error()) + return err + } + // compare md5 and fail is not match + if !reflect.DeepEqual(fileMD5, item.MD5) { + log.Err("downloadSplitter::Process : MD5Sum mismatch on download for file %s", item.Path) + return fmt.Errorf("md5sum mismatch on download for file %s", item.Path) + } + } + + return nil +} diff --git a/component/xload/splitter_test.go b/component/xload/splitter_test.go index ad5f9fafe..999f7c1ba 100644 --- a/component/xload/splitter_test.go +++ b/component/xload/splitter_test.go @@ -160,7 +160,7 @@ func (suite *splitterTestSuite) TestNewDownloadSplitter() { suite.assert.Nil(ds) suite.assert.Contains(err.Error(), "invalid parameters sent to create download splitter") - ds, err = newDownloadSplitter(&newDownloadSplitterOptions{}) + ds, err = newDownloadSplitter(&downloadSplitterOptions{}) suite.assert.NotNil(err) suite.assert.Nil(ds) suite.assert.Contains(err.Error(), "invalid parameters sent to create download splitter") @@ -169,7 +169,7 @@ func (suite *splitterTestSuite) TestNewDownloadSplitter() { suite.assert.Nil(err) suite.assert.NotNil(statsMgr) - ds, err = newDownloadSplitter(&newDownloadSplitterOptions{ + ds, err = newDownloadSplitter(&downloadSplitterOptions{ blockPool: NewBlockPool(1, 1), path: "/home/user/random_path", remote: remote, @@ -190,7 +190,7 @@ func (suite *splitterTestSuite) TestProcessFilePresent() { suite.assert.Nil(err) }() - ds, err := newDownloadSplitter(&newDownloadSplitterOptions{ts.blockPool, ts.path, remote, ts.stMgr, ts.locks, false}) + ds, err := newDownloadSplitter(&downloadSplitterOptions{ts.blockPool, ts.path, remote, ts.stMgr, ts.locks, false}) suite.assert.Nil(err) suite.assert.NotNil(ds) @@ -219,7 +219,7 @@ func (suite *splitterTestSuite) TestSplitterStartStop() { suite.assert.Nil(err) }() - rl, err := newRemoteLister(&newRemoteListerOptions{ + rl, err := newRemoteLister(&remoteListerOptions{ path: ts.path, defaultPermission: common.DefaultFilePermissionBits, remote: remote, @@ -228,11 +228,11 @@ func (suite *splitterTestSuite) TestSplitterStartStop() { suite.assert.Nil(err) suite.assert.NotNil(rl) - ds, err := newDownloadSplitter(&newDownloadSplitterOptions{ts.blockPool, ts.path, remote, ts.stMgr, ts.locks, false}) + ds, err := newDownloadSplitter(&downloadSplitterOptions{ts.blockPool, ts.path, remote, ts.stMgr, ts.locks, false}) suite.assert.Nil(err) suite.assert.NotNil(ds) - rdm, err := newRemoteDataManager(&newRemoteDataManagerOptions{ + rdm, err := newRemoteDataManager(&remoteDataManagerOptions{ remote: remote, statsMgr: ts.stMgr, }) diff --git a/component/xload/xload.go b/component/xload/xload.go index 50e28b5d4..235fff3e2 100644 --- a/component/xload/xload.go +++ b/component/xload/xload.go @@ -280,7 +280,7 @@ func (xl *Xload) createDownloader() error { log.Trace("Xload::createDownloader : Starting downloader") // Create remote lister pool to list remote files - rl, err := newRemoteLister(&newRemoteListerOptions{ + rl, err := newRemoteLister(&remoteListerOptions{ path: xl.path, defaultPermission: xl.defaultPermission, remote: xl.NextComponent(), @@ -291,7 +291,7 @@ func (xl *Xload) createDownloader() error { return err } - ds, err := newDownloadSplitter(&newDownloadSplitterOptions{ + ds, err := newDownloadSplitter(&downloadSplitterOptions{ blockPool: xl.blockPool, path: xl.path, remote: xl.NextComponent(), @@ -304,7 +304,7 @@ func (xl *Xload) createDownloader() error { return err } - rdm, err := newRemoteDataManager(&newRemoteDataManagerOptions{ + rdm, err := newRemoteDataManager(&remoteDataManagerOptions{ remote: xl.NextComponent(), statsMgr: xl.statsMgr, }) From 7e42360e41f39f7f92e8fa8a400829d199ea0ad4 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 16:43:50 +0530 Subject: [PATCH 04/20] fix --- component/xload/splitter.go | 31 ++++++++++++------------------- 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/component/xload/splitter.go b/component/xload/splitter.go index fe26c13f7..b9e08db3a 100644 --- a/component/xload/splitter.go +++ b/component/xload/splitter.go @@ -151,7 +151,7 @@ func (ds *downloadSplitter) Process(item *WorkItem) (int, error) { // TODO:: xload : what should be the flags // TODO:: xload : verify if the mode is set correctly // TODO:: xload : handle case if blob is a symlink - item.FileHandle, err = os.OpenFile(localPath, os.O_WRONLY|os.O_CREATE, item.Mode) + item.FileHandle, err = os.OpenFile(localPath, os.O_RDWR|os.O_CREATE, item.Mode) if err != nil { log.Err("downloadSplitter::Process : Failed to create file %s [%s]", item.Path, err.Error()) return -1, fmt.Errorf("failed to open file %s [%s]", item.Path, err.Error()) @@ -257,6 +257,14 @@ func (ds *downloadSplitter) Process(item *WorkItem) (int, error) { log.Err("downloadSplitter::Process : Failed to change times of file %s [%s]", item.Path, err.Error()) } + if ds.consistency && operationSuccess { + err = ds.checkConsistency(item) + if err != nil { + log.Err("downloadSplitter::Process : unable to check consistency for %s [%s]", item.Path, err.Error()) + operationSuccess = false + } + } + // send the download status to stats manager ds.GetStatsManager().AddStats(&StatsItem{ Component: SPLITTER, @@ -277,38 +285,23 @@ func (ds *downloadSplitter) Process(item *WorkItem) (int, error) { return -1, fmt.Errorf("failed to download data for file %s", item.Path) } - if ds.consistency { - err = ds.checkConsistency(item) - if err != nil { - log.Err("downloadSplitter::Process : unable to check consistency for %s, so deleting it from local path", item.Path) - - // delete the file from the local path if md5sum is not matching - err = os.Remove(localPath) - if err != nil { - log.Err("downloadSplitter::Process : Failed to delete file %s [%s]", item.Path, err.Error()) - } - - return -1, fmt.Errorf("md5sum mismatch on download for file %s", item.Path) - } - } - log.Debug("downloadSplitter::Process : Download completed for file %s, priority %v", item.Path, item.Priority) return 0, nil } func (ds *downloadSplitter) checkConsistency(item *WorkItem) error { if item.MD5 == nil { - log.Warn("downloadSplitter::Process : Unable to get MD5Sum for blob %s", item.Path) + log.Warn("downloadSplitter::checkConsistency : Unable to get MD5Sum for blob %s", item.Path) } else { // Compute md5 of local file fileMD5, err := common.GetMD5(item.FileHandle) if err != nil { - log.Err("downloadSplitter::Process : Failed to generate MD5Sum for %s [%s]", item.Path, err.Error()) + log.Err("downloadSplitter::checkConsistency : Failed to generate MD5Sum for %s [%s]", item.Path, err.Error()) return err } // compare md5 and fail is not match if !reflect.DeepEqual(fileMD5, item.MD5) { - log.Err("downloadSplitter::Process : MD5Sum mismatch on download for file %s", item.Path) + log.Err("downloadSplitter::checkConsistency : MD5Sum mismatch on download for file %s", item.Path) return fmt.Errorf("md5sum mismatch on download for file %s", item.Path) } } From 9efe353f86a54fd1f862f1977bea5b3e31a58b39 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 17:17:47 +0530 Subject: [PATCH 05/20] fix --- component/xload/splitter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/xload/splitter_test.go b/component/xload/splitter_test.go index 999f7c1ba..acf589c23 100644 --- a/component/xload/splitter_test.go +++ b/component/xload/splitter_test.go @@ -228,7 +228,7 @@ func (suite *splitterTestSuite) TestSplitterStartStop() { suite.assert.Nil(err) suite.assert.NotNil(rl) - ds, err := newDownloadSplitter(&downloadSplitterOptions{ts.blockPool, ts.path, remote, ts.stMgr, ts.locks, false}) + ds, err := newDownloadSplitter(&downloadSplitterOptions{ts.blockPool, ts.path, remote, ts.stMgr, ts.locks, true}) suite.assert.Nil(err) suite.assert.NotNil(ds) From 3c6bd97f634141812b8296c76331ac9ae9462cff Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 18:00:04 +0530 Subject: [PATCH 06/20] test --- component/loopback/loopback_fs.go | 27 ++++++++++++++++- component/xload/splitter_test.go | 50 +++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/component/loopback/loopback_fs.go b/component/loopback/loopback_fs.go index 05b254667..542c0ccc7 100644 --- a/component/loopback/loopback_fs.go +++ b/component/loopback/loopback_fs.go @@ -42,6 +42,7 @@ import ( "strings" "syscall" + "github.com/Azure/azure-storage-fuse/v2/common" "github.com/Azure/azure-storage-fuse/v2/common/config" "github.com/Azure/azure-storage-fuse/v2/common/log" "github.com/Azure/azure-storage-fuse/v2/internal" @@ -59,7 +60,8 @@ const compName = "loopbackfs" type LoopbackFS struct { internal.BaseComponent - path string + path string + consistency bool } var _ internal.Component = &LoopbackFS{} @@ -88,6 +90,10 @@ func (lfs *LoopbackFS) Configure(_ bool) error { return nil } +func (lfs *LoopbackFS) SetConsistency(consistency bool) { + lfs.consistency = consistency +} + func (lfs *LoopbackFS) Name() string { return compName } @@ -178,12 +184,20 @@ func (lfs *LoopbackFS) StreamDir(options internal.StreamDirOptions) ([]*internal for _, file := range files { info, _ := file.Info() + var md5 []byte + if lfs.consistency && !file.IsDir() { + md5, err = computeMD5(filepath.Join(path, file.Name())) + if err != nil { + log.Err("LoopbackFS::StreamDir : failed to compute md5sum [%s]", err) + } + } attr := &internal.ObjAttr{ Path: filepath.Join(options.Name, file.Name()), Name: file.Name(), Size: info.Size(), Mode: info.Mode(), Mtime: info.ModTime(), + MD5: md5, } attr.Flags.Set(internal.PropFlagModeDefault) @@ -196,6 +210,17 @@ func (lfs *LoopbackFS) StreamDir(options internal.StreamDirOptions) ([]*internal return attrList, "", nil } +func computeMD5(path string) ([]byte, error) { + fh, err := os.Open(path) + if err != nil { + log.Err("LoopbackFS::computeMD5 : failed to compute md5sum [%s]", err) + return nil, err + } + defer fh.Close() + + return common.GetMD5(fh) +} + func (lfs *LoopbackFS) RenameDir(options internal.RenameDirOptions) error { log.Trace("LoopbackFS::RenameDir : %s -> %s", options.Src, options.Dst) oldPath := filepath.Join(lfs.path, options.Src) diff --git a/component/xload/splitter_test.go b/component/xload/splitter_test.go index acf589c23..2df9382c0 100644 --- a/component/xload/splitter_test.go +++ b/component/xload/splitter_test.go @@ -256,6 +256,56 @@ func (suite *splitterTestSuite) TestSplitterStartStop() { validateMD5(ts.path, remote_path, suite.assert) } +func (suite *splitterTestSuite) TestSplitterConsistency() { + ts, err := setupTestSplitter() + suite.assert.Nil(err) + suite.assert.NotNil(ts) + + remote.(*loopback.LoopbackFS).SetConsistency(true) + + defer func() { + remote.(*loopback.LoopbackFS).SetConsistency(false) + err = ts.cleanup() + suite.assert.Nil(err) + }() + + rl, err := newRemoteLister(&remoteListerOptions{ + path: ts.path, + defaultPermission: common.DefaultFilePermissionBits, + remote: remote, + statsMgr: ts.stMgr, + }) + suite.assert.Nil(err) + suite.assert.NotNil(rl) + + ds, err := newDownloadSplitter(&downloadSplitterOptions{ts.blockPool, ts.path, remote, ts.stMgr, ts.locks, true}) + suite.assert.Nil(err) + suite.assert.NotNil(ds) + + rdm, err := newRemoteDataManager(&remoteDataManagerOptions{ + remote: remote, + statsMgr: ts.stMgr, + }) + suite.assert.Nil(err) + suite.assert.NotNil(rdm) + + // create chain + rl.SetNext(ds) + ds.SetNext(rdm) + + // start components + rdm.Start() + ds.Start() + rl.Start() + + time.Sleep(5 * time.Second) + + // stop comoponents + rl.Stop() + + validateMD5(ts.path, remote_path, suite.assert) +} + func validateMD5(localPath string, remotePath string, assert *assert.Assertions) { entries, err := os.ReadDir(remotePath) assert.Nil(err) From 21068f87484a85c3d36841a04f46227516563763 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 18:48:34 +0530 Subject: [PATCH 07/20] e2e tests --- azure-pipeline-templates/e2e-tests-xload.yml | 109 +++++++++++++++++++ blobfuse2-nightly.yaml | 78 +++++++++++++ testdata/config/azure_key_xload.yaml | 34 ++++++ 3 files changed, 221 insertions(+) create mode 100644 azure-pipeline-templates/e2e-tests-xload.yml create mode 100644 testdata/config/azure_key_xload.yaml diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml new file mode 100644 index 000000000..39838be39 --- /dev/null +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -0,0 +1,109 @@ +parameters: + - name: conf_template + type: string + - name: config_file + type: string + - name: container + type: string + - name: temp_dir + type: string + - name: mount_dir + type: string + - name: idstring + type: string + - name: adls + type: boolean + - name: account_name + type: string + - name: account_key + type: string + - name: account_type + type: string + - name: account_endpoint + - name: distro_name + type: string + - name: quick_test + type: boolean + default: true + - name: mnt_flags + type: string + default: "" + - name: verbose_log + type: boolean + default: false + - name: clone + type: boolean + default: false + +steps: + - script: | + $(WORK_DIR)/blobfuse2 gen-test-config --config-file=$(WORK_DIR)/testdata/config/azure_key.yaml --container-name=${{ parameters.container }} --temp-path=${{ parameters.temp_dir }} --output-file=${{ parameters.config_file }} + displayName: 'Create Config File for RW mount' + env: + NIGHTLY_STO_ACC_NAME: ${{ parameters.account_name }} + NIGHTLY_STO_ACC_KEY: ${{ parameters.account_key }} + ACCOUNT_TYPE: ${{ parameters.account_type }} + ACCOUNT_ENDPOINT: ${{ parameters.account_endpoint }} + VERBOSE_LOG: ${{ parameters.verbose_log }} + continueOnError: false + + - script: + cat ${{ parameters.config_file }} + displayName: 'Print config file' + + # run below step only if direct_io is false + - template: 'mount.yml' + parameters: + working_dir: $(WORK_DIR) + mount_dir: ${{ parameters.mount_dir }} + temp_dir: ${{ parameters.temp_dir }} + prefix: ${{ parameters.idstring }} + mountStep: + script: | + $(WORK_DIR)/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config_file }} --default-working-dir=$(WORK_DIR) --file-cache-timeout=3200 ${{ parameters.mnt_flags }} + + - script: | + for i in {1,2,3,4,5,6,7,8,9,10,20,30,50,100,200,1024,2048,4096}; do echo $i; done | parallel --will-cite -j 5 'head -c {}M < /dev/urandom > ${{ parameters.mount_dir }}/myfile_{}' + ls -lh ${{ parameters.mount_dir }}/myfile_* + displayName: 'Generate data' + + - script: | + md5sum ${{ parameters.mount_dir }}/myfile_* > $(WORK_DIR)/md5sum_file_cache.txt + displayName: 'Generate md5Sum with File-Cache' + + - script: | + $(WORK_DIR)/blobfuse2 unmount all + displayName: 'Unmount RW mount' + + - script: | + $(WORK_DIR)/blobfuse2 gen-test-config --config-file=$(WORK_DIR)/testdata/config/azure_key_xload.yaml --container-name=${{ parameters.container }} --temp-path=${{ parameters.temp_dir }} --output-file=${{ parameters.config_file }} + displayName: 'Create Config File for preload' + env: + NIGHTLY_STO_ACC_NAME: ${{ parameters.account_name }} + NIGHTLY_STO_ACC_KEY: ${{ parameters.account_key }} + ACCOUNT_TYPE: ${{ parameters.account_type }} + ACCOUNT_ENDPOINT: ${{ parameters.account_endpoint }} + VERBOSE_LOG: ${{ parameters.verbose_log }} + continueOnError: false + + + - script: + cat ${{ parameters.config_file }} + displayName: 'Print xload config file' + + - task: PublishBuildArtifacts@1 + inputs: + pathToPublish: blobfuse2-logs.txt + artifactName: 'blobfuse_block_cache.txt' + condition: failed() + + - script: | + tail -n 200 blobfuse2-logs.txt + displayName: 'View Logs' + condition: failed() + + - template: 'cleanup.yml' + parameters: + working_dir: $(WORK_DIR) + mount_dir: ${{ parameters.mount_dir }} + temp_dir: ${{ parameters.temp_dir }} \ No newline at end of file diff --git a/blobfuse2-nightly.yaml b/blobfuse2-nightly.yaml index 72a9f2bb8..7f82238d6 100755 --- a/blobfuse2-nightly.yaml +++ b/blobfuse2-nightly.yaml @@ -1683,6 +1683,84 @@ stages: temp_dir: $(TEMP_DIR) mount_dir: $(MOUNT_DIR) + - stage: XloadValidation + jobs: + # Ubuntu Tests + - job: Set_1 + timeoutInMinutes: 300 + strategy: + matrix: + Ubuntu-20: + AgentName: 'blobfuse-ubuntu20' + containerName: 'test-cnt-ubn-20' + adlsSas: $(AZTEST_ADLS_CONT_SAS_UBN_20) + fuselib: 'libfuse-dev' + tags: 'fuse2' + Ubuntu-22: + AgentName: 'blobfuse-ubuntu22' + containerName: 'test-cnt-ubn-22' + adlsSas: $(AZTEST_ADLS_CONT_SAS_UBN_22) + fuselib: 'libfuse3-dev' + tags: 'fuse3' + + pool: + name: "blobfuse-ubuntu-pool" + demands: + - ImageOverride -equals $(AgentName) + + variables: + - group: NightlyBlobFuse + - name: MOUNT_DIR + value: '$(Pipeline.Workspace)/blob_mnt' + - name: TEMP_DIR + value: '$(Pipeline.Workspace)/blobfuse2_tmp' + - name: BLOBFUSE2_CFG + value: '$(Pipeline.Workspace)/blobfuse2.yaml' + - name: BLOBFUSE2_ADLS_CFG + value: '$(Pipeline.Workspace)/blobfuse2.adls.yaml' + - name: skipComponentGovernanceDetection + value: true + - name: GOPATH + value: '$(Pipeline.Workspace)/go' + - name: ROOT_DIR + value: '$(System.DefaultWorkingDirectory)' + - name: WORK_DIR + value: '$(System.DefaultWorkingDirectory)/azure-storage-fuse' + + steps: + # ------------------------------------------------------- + # Pull and build the code + - template: 'azure-pipeline-templates/build.yml' + parameters: + working_directory: $(WORK_DIR) + root_dir: $(ROOT_DIR) + mount_dir: $(MOUNT_DIR) + temp_dir: $(TEMP_DIR) + gopath: $(GOPATH) + container: $(containerName) + tags: $(tags) + fuselib: $(fuselib) + skip_ut: true + + - template: 'azure-pipeline-templates/e2e-tests-xload.yml' + parameters: + conf_template: azure_key.yaml + config_file: $(BLOBFUSE2_CFG) + container: $(containerName) + idstring: Block_Blob + adls: false + account_name: $(NIGHTLY_STO_BLOB_ACC_NAME) + account_key: $(NIGHTLY_STO_BLOB_ACC_KEY) + account_type: block + account_endpoint: https://$(NIGHTLY_STO_BLOB_ACC_NAME).blob.core.windows.net + distro_name: $(AgentName) + quick_test: false + verbose_log: ${{ parameters.verbose_log }} + clone: false + # TODO: These can be removed one day and replace all instances of ${{ parameters.temp_dir }} with $(TEMP_DIR) since it is a global variable + temp_dir: $(TEMP_DIR) + mount_dir: $(MOUNT_DIR) + - stage: Healthmon jobs: - job: Set_1 diff --git a/testdata/config/azure_key_xload.yaml b/testdata/config/azure_key_xload.yaml new file mode 100644 index 000000000..5ce4d38be --- /dev/null +++ b/testdata/config/azure_key_xload.yaml @@ -0,0 +1,34 @@ +logging: + level: log_debug + file-path: "blobfuse2-logs.txt" + type: base + +components: + - libfuse + - xload + - attr_cache + - azstorage + +libfuse: + attribute-expiration-sec: 0 + entry-expiration-sec: 0 + negative-entry-expiration-sec: 0 + ignore-open-flags: true + +xload: + block-size-mb: 16 + path: { 1 } + export-progress: true + consistency: true + +attr_cache: + timeout-sec: 3600 + +azstorage: + type: { ACCOUNT_TYPE } + endpoint: { ACCOUNT_ENDPOINT } + account-name: { NIGHTLY_STO_ACC_NAME } + account-key: { NIGHTLY_STO_ACC_KEY } + mode: key + container: { 0 } + tier: hot From e7b4900568a208a551bdba56c788a63214eab791 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 19:34:06 +0530 Subject: [PATCH 08/20] e2e test --- azure-pipeline-templates/e2e-tests-xload.yml | 53 +++++++++++++++++++- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 39838be39..fbd35c985 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -63,7 +63,8 @@ steps: $(WORK_DIR)/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config_file }} --default-working-dir=$(WORK_DIR) --file-cache-timeout=3200 ${{ parameters.mnt_flags }} - script: | - for i in {1,2,3,4,5,6,7,8,9,10,20,30,50,100,200,1024,2048,4096}; do echo $i; done | parallel --will-cite -j 5 'head -c {}M < /dev/urandom > ${{ parameters.mount_dir }}/myfile_{}' + # for i in {1,2,3,4,5,6,7,8,9,10,20,30,50,100,200,1024,2048,4096}; do echo $i; done | parallel --will-cite -j 5 'head -c {}M < /dev/urandom > ${{ parameters.mount_dir }}/myfile_{}' + for i in {1,2,3,4,5}; do echo $i; done | parallel --will-cite -j 5 'head -c {}M < /dev/urandom > ${{ parameters.mount_dir }}/myfile_{}' ls -lh ${{ parameters.mount_dir }}/myfile_* displayName: 'Generate data' @@ -89,7 +90,55 @@ steps: - script: cat ${{ parameters.config_file }} - displayName: 'Print xload config file' + displayName: 'Print preload config file' + + - template: 'mount.yml' + parameters: + working_dir: $(WORK_DIR) + mount_dir: ${{ parameters.mount_dir }} + temp_dir: ${{ parameters.temp_dir }} + prefix: ${{ parameters.idstring }} + ro_mount: true + mountStep: + script: | + $(WORK_DIR)/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config_file }} --default-working-dir=$(WORK_DIR) -o ro ${{ parameters.mnt_flags }} + + - script: + STATS_MANAGER=$(WORK_DIR)/xload_stats_*.json + ls -l $STATS_MANAGER + + while true; do + percent=$(jq -r '.. | .PercentCompleted? | select(.)' "$STATS_MANAGER" | tail -n 1) + echo "PercentCompleted = $percent" + + if [[ "$percent" == "100" ]]; then + echo "Processing complete!" + break + fi + + sleep 5 + done + displayName: 'Print stats manager file' + + - script: | + md5sum ${{ parameters.mount_dir }}/myfile_* > $(WORK_DIR)/md5sum_xload.txt + displayName: 'Generate md5Sum with preload' + + - script: | + $(WORK_DIR)/blobfuse2 unmount all + displayName: 'Unmount preload mount' + + - script: | + echo "----------------------------------------------" + cat $(WORK_DIR)/md5sum_block_cache.txt + echo "----------------------------------------------" + cat $(WORK_DIR)/md5sum_file_cache.txt + echo "----------------------------------------------" + diff $(WORK_DIR)/md5sum_block_cache.txt $(WORK_DIR)/md5sum_file_cache.txt + if [ $? -ne 0 ]; then + exit 1 + fi + displayName: 'Compare md5Sum' - task: PublishBuildArtifacts@1 inputs: From c34f30c1c32d5e734677a9b4f8b6083a9c63ffe9 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 19:43:07 +0530 Subject: [PATCH 09/20] e2e test --- azure-pipeline-templates/e2e-tests-xload.yml | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index fbd35c985..3f6d004e2 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -35,7 +35,11 @@ parameters: type: boolean default: false -steps: +steps: + - script: + sudo apt-get install jq -y + displayName: 'Install dependencies' + - script: | $(WORK_DIR)/blobfuse2 gen-test-config --config-file=$(WORK_DIR)/testdata/config/azure_key.yaml --container-name=${{ parameters.container }} --temp-path=${{ parameters.temp_dir }} --output-file=${{ parameters.config_file }} displayName: 'Create Config File for RW mount' @@ -105,6 +109,7 @@ steps: - script: STATS_MANAGER=$(WORK_DIR)/xload_stats_*.json + echo $STATS_MANAGER ls -l $STATS_MANAGER while true; do @@ -130,15 +135,15 @@ steps: - script: | echo "----------------------------------------------" - cat $(WORK_DIR)/md5sum_block_cache.txt + cat $(WORK_DIR)/md5sum_xload.txt echo "----------------------------------------------" cat $(WORK_DIR)/md5sum_file_cache.txt echo "----------------------------------------------" - diff $(WORK_DIR)/md5sum_block_cache.txt $(WORK_DIR)/md5sum_file_cache.txt + diff $(WORK_DIR)/md5sum_xload.txt $(WORK_DIR)/md5sum_file_cache.txt if [ $? -ne 0 ]; then exit 1 fi - displayName: 'Compare md5Sum' + displayName: 'Compare md5sum' - task: PublishBuildArtifacts@1 inputs: From 5309026a63f4b7c43212d48e7b384cff489b6fab Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 19:54:42 +0530 Subject: [PATCH 10/20] debug --- azure-pipeline-templates/e2e-tests-xload.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 3f6d004e2..8253839db 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -108,6 +108,8 @@ steps: $(WORK_DIR)/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config_file }} --default-working-dir=$(WORK_DIR) -o ro ${{ parameters.mnt_flags }} - script: + ls -l $(WORK_DIR) + echo "----------------------------------------------------------" STATS_MANAGER=$(WORK_DIR)/xload_stats_*.json echo $STATS_MANAGER ls -l $STATS_MANAGER From 7bc8766e91bf63026f693e01eb80e7e547896c48 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 20:00:56 +0530 Subject: [PATCH 11/20] test --- azure-pipeline-templates/e2e-tests-xload.yml | 21 ++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 8253839db..8e2731e2c 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -108,23 +108,24 @@ steps: $(WORK_DIR)/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config_file }} --default-working-dir=$(WORK_DIR) -o ro ${{ parameters.mnt_flags }} - script: - ls -l $(WORK_DIR) + echo $(WORK_DIR) + ls -lh $(WORK_DIR) echo "----------------------------------------------------------" STATS_MANAGER=$(WORK_DIR)/xload_stats_*.json echo $STATS_MANAGER ls -l $STATS_MANAGER - while true; do - percent=$(jq -r '.. | .PercentCompleted? | select(.)' "$STATS_MANAGER" | tail -n 1) - echo "PercentCompleted = $percent" + # while true; do + # percent=$(jq -r '.. | .PercentCompleted? | select(.)' "$STATS_MANAGER" | tail -n 1) + # echo "PercentCompleted = $percent" - if [[ "$percent" == "100" ]]; then - echo "Processing complete!" - break - fi + # if [[ "$percent" == "100" ]]; then + # echo "Processing complete!" + # break + # fi - sleep 5 - done + # sleep 5 + # done displayName: 'Print stats manager file' - script: | From f074ca9a3dd4ef0217e261ba40c0b1b97893abd2 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 21:43:22 +0530 Subject: [PATCH 12/20] fix --- azure-pipeline-templates/e2e-tests-xload.yml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 8e2731e2c..2db83997b 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -107,13 +107,15 @@ steps: script: | $(WORK_DIR)/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config_file }} --default-working-dir=$(WORK_DIR) -o ro ${{ parameters.mnt_flags }} - - script: + - script: | + echo "----------------------------------------------" echo $(WORK_DIR) ls -lh $(WORK_DIR) - echo "----------------------------------------------------------" - STATS_MANAGER=$(WORK_DIR)/xload_stats_*.json + echo "----------------------------------------------" + STATS_MANAGER=`ls $(WORK_DIR)/xload_stats_*.json` echo $STATS_MANAGER ls -l $STATS_MANAGER + cat $STATS_MANAGER # while true; do # percent=$(jq -r '.. | .PercentCompleted? | select(.)' "$STATS_MANAGER" | tail -n 1) From 7c072b6959268e2ca5a74d4c990bec947ed8f41f Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 21:55:01 +0530 Subject: [PATCH 13/20] fix --- azure-pipeline-templates/e2e-tests-xload.yml | 23 ++++++++------------ 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 2db83997b..551e9df03 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -108,26 +108,21 @@ steps: $(WORK_DIR)/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config_file }} --default-working-dir=$(WORK_DIR) -o ro ${{ parameters.mnt_flags }} - script: | - echo "----------------------------------------------" - echo $(WORK_DIR) - ls -lh $(WORK_DIR) echo "----------------------------------------------" STATS_MANAGER=`ls $(WORK_DIR)/xload_stats_*.json` echo $STATS_MANAGER - ls -l $STATS_MANAGER - cat $STATS_MANAGER - # while true; do - # percent=$(jq -r '.. | .PercentCompleted? | select(.)' "$STATS_MANAGER" | tail -n 1) - # echo "PercentCompleted = $percent" + while true; do + percent=$(jq -r '.. | .PercentCompleted? | select(.)' "$STATS_MANAGER" | tail -n 1) + echo "PercentCompleted = $percent" - # if [[ "$percent" == "100" ]]; then - # echo "Processing complete!" - # break - # fi + if [[ "$percent" == "100" ]]; then + echo "Processing complete!" + break + fi - # sleep 5 - # done + sleep 5 + done displayName: 'Print stats manager file' - script: | From a02b837c7c41166bc531c06482d7469f10c14dd6 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 22:17:40 +0530 Subject: [PATCH 14/20] test --- azure-pipeline-templates/e2e-tests-xload.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 551e9df03..70b5de305 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -67,8 +67,7 @@ steps: $(WORK_DIR)/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config_file }} --default-working-dir=$(WORK_DIR) --file-cache-timeout=3200 ${{ parameters.mnt_flags }} - script: | - # for i in {1,2,3,4,5,6,7,8,9,10,20,30,50,100,200,1024,2048,4096}; do echo $i; done | parallel --will-cite -j 5 'head -c {}M < /dev/urandom > ${{ parameters.mount_dir }}/myfile_{}' - for i in {1,2,3,4,5}; do echo $i; done | parallel --will-cite -j 5 'head -c {}M < /dev/urandom > ${{ parameters.mount_dir }}/myfile_{}' + for i in {1,2,3,4,5,6,7,8,9,10,20,30,50,100,200,1024,2048,4096}; do echo $i; done | parallel --will-cite -j 5 'head -c {}M < /dev/urandom > ${{ parameters.mount_dir }}/myfile_{}' ls -lh ${{ parameters.mount_dir }}/myfile_* displayName: 'Generate data' @@ -123,7 +122,7 @@ steps: sleep 5 done - displayName: 'Print stats manager file' + displayName: 'Stats manager info' - script: | md5sum ${{ parameters.mount_dir }}/myfile_* > $(WORK_DIR)/md5sum_xload.txt From f764fd8d193390f4ff292ef9ef4dd6e24750bfed Mon Sep 17 00:00:00 2001 From: souravgupta Date: Wed, 5 Mar 2025 22:43:24 +0530 Subject: [PATCH 15/20] test --- azure-pipeline-templates/e2e-tests-xload.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 70b5de305..36722cc90 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -107,7 +107,6 @@ steps: $(WORK_DIR)/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config_file }} --default-working-dir=$(WORK_DIR) -o ro ${{ parameters.mnt_flags }} - script: | - echo "----------------------------------------------" STATS_MANAGER=`ls $(WORK_DIR)/xload_stats_*.json` echo $STATS_MANAGER @@ -122,6 +121,9 @@ steps: sleep 5 done + + echo "----------------------------------------------" + tail -n 100 $STATS_MANAGER displayName: 'Stats manager info' - script: | From af23a2df55146dc92c51d442e765850fa3371cfb Mon Sep 17 00:00:00 2001 From: souravgupta Date: Thu, 6 Mar 2025 14:54:37 +0530 Subject: [PATCH 16/20] adding more files --- azure-pipeline-templates/e2e-tests-xload.yml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 36722cc90..0cb5a93f0 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -68,11 +68,14 @@ steps: - script: | for i in {1,2,3,4,5,6,7,8,9,10,20,30,50,100,200,1024,2048,4096}; do echo $i; done | parallel --will-cite -j 5 'head -c {}M < /dev/urandom > ${{ parameters.mount_dir }}/myfile_{}' - ls -lh ${{ parameters.mount_dir }}/myfile_* + for i in $(seq 1 10); do echo $(shuf -i 0-4294967296 -n 1); done | parallel --will-cite -j 5 'head -c {} < /dev/urandom > ${{ parameters.mount_dir }}/datafiles_{}' + cd ${{ parameters.mount_dir }} + python3 $(WORK_DIR)/testdata/scripts/generate-parquet-files.py + ls -l ${{ parameters.mount_dir }}/* displayName: 'Generate data' - script: | - md5sum ${{ parameters.mount_dir }}/myfile_* > $(WORK_DIR)/md5sum_file_cache.txt + md5sum ${{ parameters.mount_dir }}/* > $(WORK_DIR)/md5sum_file_cache.txt displayName: 'Generate md5Sum with File-Cache' - script: | @@ -127,7 +130,7 @@ steps: displayName: 'Stats manager info' - script: | - md5sum ${{ parameters.mount_dir }}/myfile_* > $(WORK_DIR)/md5sum_xload.txt + md5sum ${{ parameters.mount_dir }}/* > $(WORK_DIR)/md5sum_xload.txt displayName: 'Generate md5Sum with preload' - script: | From b1c6f8264625e25b4a7e05c885912027dc6e9129 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Thu, 6 Mar 2025 15:11:36 +0530 Subject: [PATCH 17/20] install dependencies --- azure-pipeline-templates/e2e-tests-xload.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 0cb5a93f0..72f9fb0a8 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -38,6 +38,9 @@ parameters: steps: - script: sudo apt-get install jq -y + sudo apt-get install python3-setuptools -y + sudo apt install python3-pip -y + sudo pip3 install pandas numpy pyarrow fastparquet displayName: 'Install dependencies' - script: | From 7780be3564eef9c354159100151ff4c498b1d3e1 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Thu, 6 Mar 2025 15:29:46 +0530 Subject: [PATCH 18/20] fix --- azure-pipeline-templates/e2e-tests-xload.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 72f9fb0a8..492c9016c 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -37,9 +37,8 @@ parameters: steps: - script: - sudo apt-get install jq -y - sudo apt-get install python3-setuptools -y - sudo apt install python3-pip -y + sudo apt-get update + sudo apt-get install jq python3-setuptools python3-pip -y sudo pip3 install pandas numpy pyarrow fastparquet displayName: 'Install dependencies' From 813cd0893ad3a4867bb1d5fd3029a60bc1709a3c Mon Sep 17 00:00:00 2001 From: souravgupta Date: Thu, 6 Mar 2025 15:47:13 +0530 Subject: [PATCH 19/20] fix --- azure-pipeline-templates/e2e-tests-xload.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/azure-pipeline-templates/e2e-tests-xload.yml b/azure-pipeline-templates/e2e-tests-xload.yml index 492c9016c..a27db8d88 100644 --- a/azure-pipeline-templates/e2e-tests-xload.yml +++ b/azure-pipeline-templates/e2e-tests-xload.yml @@ -36,7 +36,7 @@ parameters: default: false steps: - - script: + - script: | sudo apt-get update sudo apt-get install jq python3-setuptools python3-pip -y sudo pip3 install pandas numpy pyarrow fastparquet @@ -127,6 +127,9 @@ steps: sleep 5 done + echo "----------------------------------------------" + head -n 20 $STATS_MANAGER + echo "----------------------------------------------" tail -n 100 $STATS_MANAGER displayName: 'Stats manager info' From 0b9e6e4c3df8696f30bff63197ab3316c3669e57 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Thu, 6 Mar 2025 16:34:18 +0530 Subject: [PATCH 20/20] todo --- component/xload/splitter.go | 1 + 1 file changed, 1 insertion(+) diff --git a/component/xload/splitter.go b/component/xload/splitter.go index b9e08db3a..d78c9e5c3 100644 --- a/component/xload/splitter.go +++ b/component/xload/splitter.go @@ -260,6 +260,7 @@ func (ds *downloadSplitter) Process(item *WorkItem) (int, error) { if ds.consistency && operationSuccess { err = ds.checkConsistency(item) if err != nil { + // TODO:: xload : retry if consistency check fails log.Err("downloadSplitter::Process : unable to check consistency for %s [%s]", item.Path, err.Error()) operationSuccess = false }