Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] curvefs: change s3 info #2990

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion curvefs/proto/mds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ enum FsStatus {

message FsDetail {
optional common.Volume volume = 1;
optional common.S3Info s3Info = 2;
optional common.S3Info s3info = 2;
}

message Mountpoint {
Expand Down Expand Up @@ -114,6 +114,16 @@ message GetFsInfoResponse {
optional FsInfo fsInfo = 2;
}

message UpdateS3InfoRequest {
required string fsName = 1;
required common.S3Info s3Info = 2;
}

message UpdateS3InfoResponse {
required FSStatusCode statusCode = 1;
optional FsInfo fsInfo = 2;
}

message CreateFsRequest {
required string fsName = 1;
required uint64 blockSize = 2;
Expand Down Expand Up @@ -255,6 +265,7 @@ message TsoResponse {
service MdsService {
// fs interface
rpc CreateFs(CreateFsRequest) returns (CreateFsResponse);
rpc UpdateS3Info(UpdateS3InfoRequest) returns (UpdateS3InfoResponse);
rpc MountFs(MountFsRequest) returns (MountFsResponse);
rpc UmountFs(UmountFsRequest) returns (UmountFsResponse);
// TODO(chengyi01): move to GetFssInfo
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ cc_library(
"//external:brpc",
"//external:gflags",
"//external:glog",
"//external:json",
"//src/client:curve_client",
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
Expand Down
75 changes: 75 additions & 0 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ using ::curvefs::client::filesystem::EntryOut;
using ::curvefs::client::filesystem::FileOut;
using ::curvefs::client::filesystem::IsListWarmupXAttr;
using ::curvefs::client::filesystem::IsWarmupXAttr;
using ::curvefs::client::filesystem::IsS3ConfigXAttr;
using ::curvefs::client::filesystem::StrAttr;
using ::curvefs::client::filesystem::StrEntry;
using ::curvefs::client::filesystem::StrMode;
Expand Down Expand Up @@ -423,6 +424,78 @@ FuseClient* Client() {
return g_ClientInstance;
}


void UpdateS3Config(fuse_req_t req,
fuse_ino_t ino,
const char* name,
const char* value) {
auto fs = g_ClientInstance->GetFileSystem();

if (g_ClientInstance->GetFsInfo()->fstype() != FSType::TYPE_S3) {
LOG(ERROR) << "updating s3 config only works for s3";
return fs->ReplyError(req, CURVEFS_ERROR::NOT_SUPPORT);
}

const std::string fsName = g_ClientInstance->GetFsInfo()->fsname();
const curvefs::common::S3Info oldS3Info =
g_ClientInstance->GetFsInfo()->detail().s3info();
curvefs::common::S3Info newS3Info(oldS3Info);

Json::CharReaderBuilder builder;
Json::CharReaderBuilder::strictMode(&builder.settings_);
std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
Json::Value rootNode;
JSONCPP_STRING errormsg;
if (!reader->parse(value, value + strlen(value), &rootNode, &errormsg)) {
LOG(ERROR) << "Error parsing the input value ' "
<< value
<< " ': " << errormsg;
return fs->ReplyError(req, CURVEFS_ERROR::IO_ERROR);
}

FuseS3Client* g_S3ClientInstance =
dynamic_cast<FuseS3Client*>(g_ClientInstance);
if (!g_S3ClientInstance) {
LOG(ERROR) << "Dynamic cast from FuseClient to FuseS3Client failed";
return fs->ReplyError(req, CURVEFS_ERROR::INTERNAL);
}

if (rootNode.isMember("ak") &&
rootNode["ak"].asString() != oldS3Info.ak()) {
newS3Info.set_ak(rootNode["ak"].asString());
}

if (rootNode.isMember("sk") &&
rootNode["sk"].asString() != oldS3Info.sk()) {
newS3Info.set_sk(rootNode["sk"].asString());
}

if (rootNode.isMember("bucketname") &&
rootNode["bucketname"].asString() != oldS3Info.bucketname()) {
newS3Info.set_bucketname(rootNode["bucketname"].asString());
}

if (rootNode.isMember("endpoint") &&
rootNode["endpoint"].asString() != oldS3Info.endpoint()) {
newS3Info.set_endpoint(rootNode["endpoint"].asString());
}

if (oldS3Info.SerializeAsString() == newS3Info.SerializeAsString()) {
return fs->ReplyError(req, CURVEFS_ERROR::NODATA);
}

FsInfo fsInfo;
CURVEFS_ERROR updateStatusCode =
g_S3ClientInstance->UpdateS3Info(fsName, newS3Info, &fsInfo);
if (updateStatusCode != CURVEFS_ERROR::OK) {
return fs->ReplyError(req, updateStatusCode);
}

g_ClientInstance->SetFsInfo(std::make_shared<FsInfo>(fsInfo));

return fs->ReplyError(req, CURVEFS_ERROR::OK);
}

void TriggerWarmup(fuse_req_t req,
fuse_ino_t ino,
const char* name,
Expand Down Expand Up @@ -921,6 +994,8 @@ void FuseOpSetXattr(fuse_req_t req,

if (IsWarmupXAttr(name)) {
return TriggerWarmup(req, ino, name, value, size);
} else if (IsS3ConfigXAttr(name)) {
return UpdateS3Config(req, ino, name, value);
}
rc = client->FuseOpSetXattr(req, ino, name, value, size, flags);
return fs->ReplyError(req, rc);
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/curve_fuse_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <assert.h>
#include <json/json.h>

#include "curvefs/src/client/fuse_common.h"

Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/filesystem/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum class CURVEFS_ERROR {
NOSYS = -20,
END_OF_FILE = -21,
NOT_A_DIRECTORY = -22,
UPDATE_S3_INFO_FAILED = -23,
};

std::string StrErr(CURVEFS_ERROR code);
Expand Down
5 changes: 5 additions & 0 deletions curvefs/src/client/filesystem/xattr.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const char XATTR_DIR_RFBYTES[] = "curve.dir.rfbytes";
const char XATTR_DIR_PREFIX[] = "curve.dir";
const char XATTR_WARMUP_OP[] = "curvefs.warmup.op";
const char XATTR_WARMUP_OP_LIST[] = "curvefs.warmup.op.list";
const char XATTR_S3_CONFIG[] = "curvefs.s3.update.config";

inline bool IsSpecialXAttr(const std::string& key) {
static std::map<std::string, bool> xattrs {
Expand All @@ -69,6 +70,10 @@ inline bool IsListWarmupXAttr(const std::string& key) {
return key == XATTR_WARMUP_OP_LIST;
}

inline bool IsS3ConfigXAttr(const std::string& key) {
return key == XATTR_S3_CONFIG;
}

} // namespace filesystem
} // namespace client
} // namespace curvefs
Expand Down
29 changes: 29 additions & 0 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,35 @@ CURVEFS_ERROR FuseS3Client::Truncate(InodeWrapper *inode, uint64_t length) {
return s3Adaptor_->Truncate(inode, length);
}

CURVEFS_ERROR FuseS3Client::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
FsInfo* fsInfo) {
::curve::common::S3InfoOption s3InfoOption;
::curvefs::client::common::S3Info2FsS3Option(s3Info, &s3InfoOption);
FSStatusCode updateStatusCode =
mdsClient_->UpdateS3Info(fsName, s3Info, fsInfo);
if (updateStatusCode != FSStatusCode::OK) {
LOG(ERROR) << "Update s3 info error code: (FSStatusCode)"
<< updateStatusCode;
return CURVEFS_ERROR::UPDATE_S3_INFO_FAILED;
}

if (option_.s3Opt.s3AdaptrOpt.ak != s3Info.ak() ||
option_.s3Opt.s3AdaptrOpt.sk != s3Info.sk() ||
option_.s3Opt.s3AdaptrOpt.s3Address != s3Info.endpoint() ||
option_.s3Opt.s3AdaptrOpt.bucketName != s3Info.bucketname()) {

option_.s3Opt.s3AdaptrOpt.s3Address = s3Info.endpoint();
option_.s3Opt.s3AdaptrOpt.ak = s3Info.ak();
option_.s3Opt.s3AdaptrOpt.sk = s3Info.sk();
option_.s3Opt.s3AdaptrOpt.bucketName = s3Info.bucketname();

s3Adaptor_->GetS3Client()->Reinit(option_.s3Opt.s3AdaptrOpt);
}

return CURVEFS_ERROR::OK;
}

CURVEFS_ERROR FuseS3Client::FuseOpFlush(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) {
(void)req;
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ class FuseS3Client : public FuseClient {

CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override;

CURVEFS_ERROR UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
FsInfo* fsInfo);

private:
bool InitKVCache(const KVClientManagerOpt &opt);

Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/metric/client_metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace metric {
struct MDSClientMetric {
static const std::string prefix;

InterfaceMetric updateS3Info;
InterfaceMetric mountFs;
InterfaceMetric umountFs;
InterfaceMetric getFsInfo;
Expand All @@ -61,6 +62,7 @@ struct MDSClientMetric {
MDSClientMetric()
: mountFs(prefix, "mountFs"),
umountFs(prefix, "umountFs"),
updateS3Info(prefix, "updateS3Info"),
getFsInfo(prefix, "getFsInfo"),
getMetaServerInfo(prefix, "getMetaServerInfo"),
getMetaServerListInCopysets(prefix, "getMetaServerListInCopysets"),
Expand Down
12 changes: 12 additions & 0 deletions curvefs/src/client/rpcclient/base_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ namespace rpcclient {
using ::curvefs::mds::space::SpaceService_Stub;


void MDSBaseClient::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
UpdateS3InfoResponse* response,
brpc::Controller* cntl,
brpc::Channel* channel) {
UpdateS3InfoRequest request;
request.set_fsname(fsName);
request.mutable_s3info()->CopyFrom(s3Info);
curvefs::mds::MdsService_Stub stub(channel);
stub.UpdateS3Info(cntl, &request, response, nullptr);
}

void MDSBaseClient::MountFs(const std::string& fsName,
const Mountpoint& mountPt,
MountFsResponse* response, brpc::Controller* cntl,
Expand Down
8 changes: 8 additions & 0 deletions curvefs/src/client/rpcclient/base_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ using curvefs::mds::FsInfo;
using curvefs::mds::FsStatus;
using curvefs::mds::GetFsInfoRequest;
using curvefs::mds::GetFsInfoResponse;
using curvefs::mds::UpdateS3InfoRequest;
using curvefs::mds::UpdateS3InfoResponse;
using curvefs::mds::MountFsRequest;
using curvefs::mds::MountFsResponse;
using curvefs::mds::GetLatestTxIdRequest;
Expand Down Expand Up @@ -143,6 +145,12 @@ class MDSBaseClient {
public:
virtual ~MDSBaseClient() = default;

virtual void UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
UpdateS3InfoResponse* response,
brpc::Controller* cntl,
brpc::Channel* channel);

virtual void MountFs(const std::string& fsName, const Mountpoint& mountPt,
MountFsResponse* response, brpc::Controller* cntl,
brpc::Channel* channel);
Expand Down
33 changes: 33 additions & 0 deletions curvefs/src/client/rpcclient/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,39 @@ MdsClientImpl::Init(const ::curve::client::MetaServerOption &mdsOpt,
[&](int addrindex, uint64_t rpctimeoutMS, brpc::Channel *channel, \
brpc::Controller *cntl) -> int

FSStatusCode MdsClientImpl::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
FsInfo* fsInfo) {
auto task = RPCTask {
(void)addrindex;
(void)rpctimeoutMS;
mdsClientMetric_.updateS3Info.qps.count << 1;
LatencyUpdater updater(&mdsClientMetric_.updateS3Info.latency);
UpdateS3InfoResponse response;
mdsbasecli_->UpdateS3Info(fsName, s3Info, &response, cntl, channel);
if (cntl->Failed()) {
mdsClientMetric_.updateS3Info.eps.count << 1;
LOG(WARNING) << "UpdateS3Info Failed, errorcode = "
<< cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}

FSStatusCode ret = response.statuscode();
if (ret != FSStatusCode::OK) {
LOG(WARNING) << "UpdateS3Info: fsname = " << fsName
<< ", s3Info = " << s3Info.ShortDebugString()
<< ", errcode = " << ret
<< ", errmsg = " << FSStatusCode_Name(ret);
} else if (response.has_fsinfo()) {
fsInfo->CopyFrom(response.fsinfo());
}
return ret;
};
return ReturnError(rpcexcutor_.DoRPCTask(task, mdsOpt_.mdsMaxRetryMS));
}

FSStatusCode MdsClientImpl::MountFs(const std::string& fsName,
const Mountpoint& mountPt, FsInfo* fsInfo) {
auto task = RPCTask {
Expand Down
7 changes: 7 additions & 0 deletions curvefs/src/client/rpcclient/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ namespace curvefs {
namespace client {
namespace rpcclient {

using curvefs::common::S3Info;
using curvefs::mds::GetLatestTxIdRequest;
using curvefs::mds::GetLatestTxIdResponse;
using curvefs::mds::CommitTxRequest;
Expand All @@ -75,6 +76,9 @@ class MdsClient {
virtual FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt,
MDSBaseClient *baseclient) = 0;

virtual FSStatusCode UpdateS3Info(const std::string& fsName,
const S3Info& s3Info, FsInfo* fsInfo) = 0;

virtual FSStatusCode MountFs(const std::string& fsName,
const Mountpoint& mountPt, FsInfo* fsInfo) = 0;

Expand Down Expand Up @@ -171,6 +175,9 @@ class MdsClientImpl : public MdsClient {
FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt,
MDSBaseClient *baseclient) override;

FSStatusCode UpdateS3Info(const std::string& fsName, const S3Info& s3Info,
FsInfo* fsInfo) override;

FSStatusCode MountFs(const std::string& fsName, const Mountpoint& mountPt,
FsInfo* fsInfo) override;

Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/s3/client_s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ void S3ClientImpl::Init(const curve::common::S3AdapterOption &option) {
s3Adapter_->Init(option);
}

void S3ClientImpl::Reinit(const curve::common::S3AdapterOption &option) {
s3Adapter_->Reinit(option);
}

void S3ClientImpl::Deinit() {
s3Adapter_->Deinit();
}
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/s3/client_s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class S3Client {
S3Client() {}
virtual ~S3Client() {}
virtual void Init(const curve::common::S3AdapterOption& option) = 0;
virtual void Reinit(const curve::common::S3AdapterOption& option) = 0;
virtual void Deinit() = 0;
virtual int Upload(const std::string& name, const char* buf,
uint64_t length) = 0;
Expand All @@ -65,6 +66,7 @@ class S3ClientImpl : public S3Client {
}
virtual ~S3ClientImpl() {}
void Init(const curve::common::S3AdapterOption& option);
void Reinit(const curve::common::S3AdapterOption& option);
void Deinit();
int Upload(const std::string& name, const char* buf, uint64_t length);
void UploadAsync(std::shared_ptr<PutObjectAsyncContext> context);
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2524,6 +2524,10 @@ void DataCache::FlushTaskExecute(

LOG(WARNING) << "Put object failed, key: " << context->key;
// Retry using s3 no matter what the original was
if (--(context->retries) <= 0) {
s3TaskEvent.Signal();
return;
}
context->type = curve::common::ContextType::S3;
s3ClientAdaptor_->GetS3Client()->UploadAsync(context);
};
Expand Down
Loading