Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xload: Add consistency check #1644

Open
wants to merge 6 commits into
base: feature/xload
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/md5"
"crypto/rand"
"encoding/binary"
"fmt"
Expand Down Expand Up @@ -513,3 +514,14 @@ func GetCRC64(data []byte, len int) []byte {

return checksumBytes
}

func GetMD5(fi *os.File) ([]byte, error) {
hasher := md5.New()
_, err := io.Copy(hasher, fi)

if err != nil {
return nil, fmt.Errorf("failed to generate md5 [%s]", err.Error())
}

return hasher.Sum(nil), nil
}
22 changes: 22 additions & 0 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,25 @@ func (suite *utilTestSuite) TestGetFuseMinorVersion() {
i := GetFuseMinorVersion()
suite.assert.GreaterOrEqual(i, 0)
}

func (s *utilTestSuite) TestGetMD5() {
assert := assert.New(s.T())

f, err := os.Create("abc.txt")
assert.Nil(err)

_, err = f.Write([]byte(randomString(50)))
assert.Nil(err)

f.Close()

f, err = os.Open("abc.txt")
assert.Nil(err)

md5Sum, err := GetMD5(f)
assert.Nil(err)
assert.NotZero(md5Sum)

f.Close()
os.Remove("abc.txt")
}
4 changes: 2 additions & 2 deletions component/azstorage/block_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ func (bb *BlockBlob) ReadToFile(name string, offset int64, count int64, fi *os.F

if bb.Config.validateMD5 {
// Compute md5 of local file
fileMD5, err := getMD5(fi)
fileMD5, err := common.GetMD5(fi)
if err != nil {
log.Warn("BlockBlob::ReadToFile : Failed to generate MD5 Sum for %s", name)
} else {
Expand Down Expand Up @@ -1030,7 +1030,7 @@ func (bb *BlockBlob) WriteFromFile(name string, metadata map[string]*string, fi
// hence we take cost of calculating md5 only for files which are bigger in size and which will be converted to blocks.
md5sum := []byte{}
if bb.Config.updateMD5 && stat.Size() >= blockblob.MaxUploadBlobBytes {
md5sum, err = getMD5(fi)
md5sum, err = common.GetMD5(fi)
if err != nil {
// Md5 sum generation failed so set nil while uploading
log.Warn("BlockBlob::WriteFromFile : Failed to generate md5 of %s", name)
Expand Down
6 changes: 3 additions & 3 deletions component/azstorage/block_blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2864,7 +2864,7 @@ func (s *blockBlobTestSuite) TestMD5SetOnUpload() {
s.assert.NotEmpty(prop.MD5)

_, _ = f.Seek(0, 0)
localMD5, err := getMD5(f)
localMD5, err := common.GetMD5(f)
s.assert.Nil(err)
s.assert.EqualValues(localMD5, prop.MD5)

Expand Down Expand Up @@ -2965,7 +2965,7 @@ func (s *blockBlobTestSuite) TestMD5AutoSetOnUpload() {
s.assert.NotEmpty(prop.MD5)

_, _ = f.Seek(0, 0)
localMD5, err := getMD5(f)
localMD5, err := common.GetMD5(f)
s.assert.Nil(err)
s.assert.EqualValues(localMD5, prop.MD5)

Expand Down Expand Up @@ -3021,7 +3021,7 @@ func (s *blockBlobTestSuite) TestInvalidateMD5PostUpload() {
s.assert.NotEmpty(prop.MD5)

_, _ = f.Seek(0, 0)
localMD5, err := getMD5(f)
localMD5, err := common.GetMD5(f)
s.assert.Nil(err)
s.assert.NotEqualValues(localMD5, prop.MD5)

Expand Down
13 changes: 0 additions & 13 deletions component/azstorage/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@
package azstorage

import (
"crypto/md5"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -558,17 +556,6 @@ func sanitizeSASKey(key string) string {
return key
}

func getMD5(fi *os.File) ([]byte, error) {
hasher := md5.New()
_, err := io.Copy(hasher, fi)

if err != nil {
return nil, errors.New("failed to generate md5")
}

return hasher.Sum(nil), nil
}

func autoDetectAuthMode(opt AzStorageOptions) string {
if opt.ApplicationID != "" || opt.ResourceID != "" || opt.ObjectID != "" {
return "msi"
Expand Down
22 changes: 0 additions & 22 deletions component/azstorage/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,28 +247,6 @@ func (s *utilsTestSuite) TestGetFileModeFromACL() {
}
}

func (s *utilsTestSuite) TestGetMD5() {
assert := assert.New(s.T())

f, err := os.Create("abc.txt")
assert.Nil(err)

_, err = f.Write([]byte(randomString(50)))
assert.Nil(err)

f.Close()

f, err = os.Open("abc.txt")
assert.Nil(err)

md5Sum, err := getMD5(f)
assert.Nil(err)
assert.NotZero(md5Sum)

f.Close()
os.Remove("abc.txt")
}

func (s *utilsTestSuite) TestSanitizeSASKey() {
assert := assert.New(s.T())

Expand Down
27 changes: 26 additions & 1 deletion component/loopback/loopback_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"strings"
"syscall"

"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
Expand All @@ -59,7 +60,8 @@ const compName = "loopbackfs"
type LoopbackFS struct {
internal.BaseComponent

path string
path string
consistency bool
}

var _ internal.Component = &LoopbackFS{}
Expand Down Expand Up @@ -88,6 +90,10 @@ func (lfs *LoopbackFS) Configure(_ bool) error {
return nil
}

func (lfs *LoopbackFS) SetConsistency(consistency bool) {
lfs.consistency = consistency
}

func (lfs *LoopbackFS) Name() string {
return compName
}
Expand Down Expand Up @@ -178,12 +184,20 @@ func (lfs *LoopbackFS) StreamDir(options internal.StreamDirOptions) ([]*internal

for _, file := range files {
info, _ := file.Info()
var md5 []byte
if lfs.consistency && !file.IsDir() {
md5, err = computeMD5(filepath.Join(path, file.Name()))
if err != nil {
log.Err("LoopbackFS::StreamDir : failed to compute md5sum [%s]", err)
}
}
attr := &internal.ObjAttr{
Path: filepath.Join(options.Name, file.Name()),
Name: file.Name(),
Size: info.Size(),
Mode: info.Mode(),
Mtime: info.ModTime(),
MD5: md5,
}
attr.Flags.Set(internal.PropFlagModeDefault)

Expand All @@ -196,6 +210,17 @@ func (lfs *LoopbackFS) StreamDir(options internal.StreamDirOptions) ([]*internal
return attrList, "", nil
}

func computeMD5(path string) ([]byte, error) {
fh, err := os.Open(path)
if err != nil {
log.Err("LoopbackFS::computeMD5 : failed to compute md5sum [%s]", err)
return nil, err
}
defer fh.Close()

return common.GetMD5(fh)
}

func (lfs *LoopbackFS) RenameDir(options internal.RenameDirOptions) error {
log.Trace("LoopbackFS::RenameDir : %s -> %s", options.Src, options.Dst)
oldPath := filepath.Join(lfs.path, options.Src)
Expand Down
15 changes: 10 additions & 5 deletions component/xload/data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,24 @@ type remoteDataManager struct {
dataManager
}

func newRemoteDataManager(remote internal.Component, statsMgr *StatsManager) (*remoteDataManager, error) {
log.Debug("data_manager::NewRemoteDataManager : create new remote data manager")
type remoteDataManagerOptions struct {
remote internal.Component
statsMgr *StatsManager
}

if remote == nil || statsMgr == nil {
func newRemoteDataManager(opts *remoteDataManagerOptions) (*remoteDataManager, error) {
if opts == nil || opts.remote == nil || opts.statsMgr == nil {
log.Err("data_manager::NewRemoteDataManager : invalid parameters sent to create remote data manager")
return nil, fmt.Errorf("invalid parameters sent to create remote data manager")
}

log.Debug("data_manager::NewRemoteDataManager : create new remote data manager")

rdm := &remoteDataManager{}

rdm.SetName(DATA_MANAGER)
rdm.SetRemote(remote)
rdm.SetStatsManager(statsMgr)
rdm.SetRemote(opts.remote)
rdm.SetStatsManager(opts.statsMgr)
rdm.Init()
return rdm, nil
}
Expand Down
20 changes: 14 additions & 6 deletions component/xload/data_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ func (suite *dataManagerTestSuite) SetupSuite() {
}

func (suite *dataManagerTestSuite) TestNewRemoteDataManager() {
rdm, err := newRemoteDataManager(nil, nil)
rdm, err := newRemoteDataManager(nil)
suite.assert.NotNil(err)
suite.assert.Nil(rdm)
suite.assert.Contains(err.Error(), "invalid parameters sent to create remote data manager")

rdm, err = newRemoteDataManager(&remoteDataManagerOptions{})
suite.assert.NotNil(err)
suite.assert.Nil(rdm)
suite.assert.Contains(err.Error(), "invalid parameters sent to create remote data manager")
Expand All @@ -62,7 +67,10 @@ func (suite *dataManagerTestSuite) TestNewRemoteDataManager() {
suite.assert.Nil(err)
suite.assert.NotNil(statsMgr)

rdm, err = newRemoteDataManager(remote, statsMgr)
rdm, err = newRemoteDataManager(&remoteDataManagerOptions{
remote: remote,
statsMgr: statsMgr,
})
suite.assert.Nil(err)
suite.assert.NotNil(rdm)
}
Expand All @@ -79,16 +87,16 @@ func (suite *dataManagerTestSuite) TestProcessErrors() {
Ctx: ctx,
}

n, err := rdm.Process(item)
dataLength, err := rdm.Process(item)
suite.assert.NotNil(err)
suite.assert.Equal(n, 0)
suite.assert.Equal(dataLength, 0)

// cancel the context
cancel()

n, err = rdm.Process(item)
dataLength, err = rdm.Process(item)
suite.assert.NotNil(err)
suite.assert.Equal(n, 0)
suite.assert.Equal(dataLength, 0)
}

func TestDatamanagerSuite(t *testing.T) {
Expand Down
24 changes: 16 additions & 8 deletions component/xload/lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,32 @@ type remoteLister struct {
listBlocked bool
}

func newRemoteLister(path string, defaultPermission os.FileMode, remote internal.Component, statsMgr *StatsManager) (*remoteLister, error) {
log.Debug("lister::NewRemoteLister : create new remote lister for %s, default permission %v", path, defaultPermission)
type remoteListerOptions struct {
path string
defaultPermission os.FileMode
remote internal.Component
statsMgr *StatsManager
}

if path == "" || remote == nil || statsMgr == nil {
func newRemoteLister(opts *remoteListerOptions) (*remoteLister, error) {
if opts == nil || opts.path == "" || opts.remote == nil || opts.statsMgr == nil {
log.Err("lister::NewRemoteLister : invalid parameters sent to create remote lister")
return nil, fmt.Errorf("invalid parameters sent to create remote lister")
}

log.Debug("lister::NewRemoteLister : create new remote lister for %s, default permission %v", opts.path, opts.defaultPermission)

rl := &remoteLister{
lister: lister{
path: path,
defaultPermission: defaultPermission,
path: opts.path,
defaultPermission: opts.defaultPermission,
},
listBlocked: false,
}

rl.SetName(LISTER)
rl.SetRemote(remote)
rl.SetStatsManager(statsMgr)
rl.SetRemote(opts.remote)
rl.SetStatsManager(opts.statsMgr)
rl.Init()
return rl, nil
}
Expand Down Expand Up @@ -193,14 +200,15 @@ func (rl *remoteLister) Process(item *WorkItem) (int, error) {
fileMode = entry.Mode
}

// send file to the output channel for chunking
// send file to the splitter's channel for chunking
rl.GetNext().Schedule(&WorkItem{
CompName: rl.GetNext().GetName(),
Path: entry.Path,
DataLen: uint64(entry.Size),
Mode: fileMode,
Atime: entry.Atime,
Mtime: entry.Mtime,
MD5: entry.MD5,
})
}
}
Expand Down
Loading