Skip to content

Commit

Permalink
[feat] curvefs: change s3 info
Browse files Browse the repository at this point in the history
Signed-off-by: Ken Han <[email protected]>
  • Loading branch information
ken90242 committed Jan 30, 2024
1 parent 88a6091 commit 03f368e
Show file tree
Hide file tree
Showing 27 changed files with 513 additions and 4 deletions.
13 changes: 12 additions & 1 deletion curvefs/proto/mds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ enum FsStatus {

message FsDetail {
optional common.Volume volume = 1;
optional common.S3Info s3Info = 2;
optional common.S3Info s3info = 2;
}

message Mountpoint {
Expand Down Expand Up @@ -114,6 +114,16 @@ message GetFsInfoResponse {
optional FsInfo fsInfo = 2;
}

message UpdateS3InfoRequest {
required string fsName = 1;
required common.S3Info s3Info = 2;
}

message UpdateS3InfoResponse {
required FSStatusCode statusCode = 1;
optional FsInfo fsInfo = 2;
}

message CreateFsRequest {
required string fsName = 1;
required uint64 blockSize = 2;
Expand Down Expand Up @@ -255,6 +265,7 @@ message TsoResponse {
service MdsService {
// fs interface
rpc CreateFs(CreateFsRequest) returns (CreateFsResponse);
rpc UpdateS3Info(UpdateS3InfoRequest) returns (UpdateS3InfoResponse);
rpc MountFs(MountFsRequest) returns (MountFsResponse);
rpc UmountFs(UmountFsRequest) returns (UmountFsResponse);
// TODO(chengyi01): move to GetFssInfo
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ cc_library(
"//external:brpc",
"//external:gflags",
"//external:glog",
"//external:json",
"//src/client:curve_client",
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
Expand Down
75 changes: 75 additions & 0 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ using ::curvefs::client::filesystem::EntryOut;
using ::curvefs::client::filesystem::FileOut;
using ::curvefs::client::filesystem::IsListWarmupXAttr;
using ::curvefs::client::filesystem::IsWarmupXAttr;
using ::curvefs::client::filesystem::IsS3ConfigXAttr;
using ::curvefs::client::filesystem::StrAttr;
using ::curvefs::client::filesystem::StrEntry;
using ::curvefs::client::filesystem::StrMode;
Expand Down Expand Up @@ -423,6 +424,78 @@ FuseClient* Client() {
return g_ClientInstance;
}


void UpdateS3Config(fuse_req_t req,
fuse_ino_t ino,
const char* name,
const char* value) {
auto fs = g_ClientInstance->GetFileSystem();

if (g_ClientInstance->GetFsInfo()->fstype() != FSType::TYPE_S3) {
LOG(ERROR) << "updating s3 config only works for s3";
return fs->ReplyError(req, CURVEFS_ERROR::NOT_SUPPORT);
}

const std::string fsName = g_ClientInstance->GetFsInfo()->fsname();
const curvefs::common::S3Info oldS3Info =
g_ClientInstance->GetFsInfo()->detail().s3info();
curvefs::common::S3Info newS3Info(oldS3Info);

Json::CharReaderBuilder builder;
Json::CharReaderBuilder::strictMode(&builder.settings_);
std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
Json::Value rootNode;
JSONCPP_STRING errormsg;
if (!reader->parse(value, value + strlen(value), &rootNode, &errormsg)) {
LOG(ERROR) << "Error parsing the input value ' "
<< value
<< " ': " << errormsg;
return fs->ReplyError(req, CURVEFS_ERROR::IO_ERROR);
}

FuseS3Client* g_S3ClientInstance =
dynamic_cast<FuseS3Client*>(g_ClientInstance);
if (!g_S3ClientInstance) {
LOG(ERROR) << "Dynamic cast from FuseClient to FuseS3Client failed";
return fs->ReplyError(req, CURVEFS_ERROR::INTERNAL);
}

if (rootNode.isMember("ak") &&
rootNode["ak"].asString() != oldS3Info.ak()) {
newS3Info.set_ak(rootNode["ak"].asString());
}

if (rootNode.isMember("sk") &&
rootNode["sk"].asString() != oldS3Info.sk()) {
newS3Info.set_sk(rootNode["sk"].asString());
}

if (rootNode.isMember("bucketname") &&
rootNode["bucketname"].asString() != oldS3Info.bucketname()) {
newS3Info.set_bucketname(rootNode["bucketname"].asString());
}

if (rootNode.isMember("endpoint") &&
rootNode["endpoint"].asString() != oldS3Info.endpoint()) {
newS3Info.set_endpoint(rootNode["endpoint"].asString());
}

if (oldS3Info.SerializeAsString() == newS3Info.SerializeAsString()) {
return fs->ReplyError(req, CURVEFS_ERROR::NODATA);
}

FsInfo fsInfo;
CURVEFS_ERROR updateStatusCode =
g_S3ClientInstance->UpdateS3Info(fsName, newS3Info, &fsInfo);
if (updateStatusCode != CURVEFS_ERROR::OK) {
return fs->ReplyError(req, updateStatusCode);
}

g_ClientInstance->SetFsInfo(std::make_shared<FsInfo>(fsInfo));

return fs->ReplyError(req, CURVEFS_ERROR::OK);
}

void TriggerWarmup(fuse_req_t req,
fuse_ino_t ino,
const char* name,
Expand Down Expand Up @@ -921,6 +994,8 @@ void FuseOpSetXattr(fuse_req_t req,

if (IsWarmupXAttr(name)) {
return TriggerWarmup(req, ino, name, value, size);
} else if (IsS3ConfigXAttr(name)) {
return UpdateS3Config(req, ino, name, value);
}
rc = client->FuseOpSetXattr(req, ino, name, value, size, flags);
return fs->ReplyError(req, rc);
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/curve_fuse_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <assert.h>
#include <json/json.h>

#include "curvefs/src/client/fuse_common.h"

Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/filesystem/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum class CURVEFS_ERROR {
NOSYS = -20,
END_OF_FILE = -21,
NOT_A_DIRECTORY = -22,
UPDATE_S3_INFO_FAILED = -23,
};

std::string StrErr(CURVEFS_ERROR code);
Expand Down
5 changes: 5 additions & 0 deletions curvefs/src/client/filesystem/xattr.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const char XATTR_DIR_RFBYTES[] = "curve.dir.rfbytes";
const char XATTR_DIR_PREFIX[] = "curve.dir";
const char XATTR_WARMUP_OP[] = "curvefs.warmup.op";
const char XATTR_WARMUP_OP_LIST[] = "curvefs.warmup.op.list";
const char XATTR_S3_CONFIG[] = "curvefs.s3.update.config";

inline bool IsSpecialXAttr(const std::string& key) {
static std::map<std::string, bool> xattrs {
Expand All @@ -69,6 +70,10 @@ inline bool IsListWarmupXAttr(const std::string& key) {
return key == XATTR_WARMUP_OP_LIST;
}

inline bool IsS3ConfigXAttr(const std::string& key) {
return key == XATTR_S3_CONFIG;
}

} // namespace filesystem
} // namespace client
} // namespace curvefs
Expand Down
29 changes: 29 additions & 0 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,35 @@ CURVEFS_ERROR FuseS3Client::Truncate(InodeWrapper *inode, uint64_t length) {
return s3Adaptor_->Truncate(inode, length);
}

CURVEFS_ERROR FuseS3Client::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
FsInfo* fsInfo) {
::curve::common::S3InfoOption s3InfoOption;
::curvefs::client::common::S3Info2FsS3Option(s3Info, &s3InfoOption);
FSStatusCode updateStatusCode =
mdsClient_->UpdateS3Info(fsName, s3Info, fsInfo);
if (updateStatusCode != FSStatusCode::OK) {
LOG(ERROR) << "Update s3 info error code: (FSStatusCode)"
<< updateStatusCode;
return CURVEFS_ERROR::UPDATE_S3_INFO_FAILED;
}

if (option_.s3Opt.s3AdaptrOpt.ak != s3Info.ak() ||
option_.s3Opt.s3AdaptrOpt.sk != s3Info.sk() ||
option_.s3Opt.s3AdaptrOpt.s3Address != s3Info.endpoint() ||
option_.s3Opt.s3AdaptrOpt.bucketName != s3Info.bucketname()) {

option_.s3Opt.s3AdaptrOpt.s3Address = s3Info.endpoint();
option_.s3Opt.s3AdaptrOpt.ak = s3Info.ak();
option_.s3Opt.s3AdaptrOpt.sk = s3Info.sk();
option_.s3Opt.s3AdaptrOpt.bucketName = s3Info.bucketname();

s3Adaptor_->GetS3Client()->Reinit(option_.s3Opt.s3AdaptrOpt);
}

return CURVEFS_ERROR::OK;
}

CURVEFS_ERROR FuseS3Client::FuseOpFlush(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) {
(void)req;
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ class FuseS3Client : public FuseClient {

CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override;

CURVEFS_ERROR UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
FsInfo* fsInfo);

private:
bool InitKVCache(const KVClientManagerOpt &opt);

Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/metric/client_metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace metric {
struct MDSClientMetric {
static const std::string prefix;

InterfaceMetric updateS3Info;
InterfaceMetric mountFs;
InterfaceMetric umountFs;
InterfaceMetric getFsInfo;
Expand All @@ -61,6 +62,7 @@ struct MDSClientMetric {
MDSClientMetric()
: mountFs(prefix, "mountFs"),
umountFs(prefix, "umountFs"),
updateS3Info(prefix, "updateS3Info"),
getFsInfo(prefix, "getFsInfo"),
getMetaServerInfo(prefix, "getMetaServerInfo"),
getMetaServerListInCopysets(prefix, "getMetaServerListInCopysets"),
Expand Down
12 changes: 12 additions & 0 deletions curvefs/src/client/rpcclient/base_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ namespace rpcclient {
using ::curvefs::mds::space::SpaceService_Stub;


void MDSBaseClient::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
UpdateS3InfoResponse* response,
brpc::Controller* cntl,
brpc::Channel* channel) {
UpdateS3InfoRequest request;
request.set_fsname(fsName);
request.mutable_s3info()->CopyFrom(s3Info);
curvefs::mds::MdsService_Stub stub(channel);
stub.UpdateS3Info(cntl, &request, response, nullptr);
}

void MDSBaseClient::MountFs(const std::string& fsName,
const Mountpoint& mountPt,
MountFsResponse* response, brpc::Controller* cntl,
Expand Down
8 changes: 8 additions & 0 deletions curvefs/src/client/rpcclient/base_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ using curvefs::mds::FsInfo;
using curvefs::mds::FsStatus;
using curvefs::mds::GetFsInfoRequest;
using curvefs::mds::GetFsInfoResponse;
using curvefs::mds::UpdateS3InfoRequest;
using curvefs::mds::UpdateS3InfoResponse;
using curvefs::mds::MountFsRequest;
using curvefs::mds::MountFsResponse;
using curvefs::mds::GetLatestTxIdRequest;
Expand Down Expand Up @@ -143,6 +145,12 @@ class MDSBaseClient {
public:
virtual ~MDSBaseClient() = default;

virtual void UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
UpdateS3InfoResponse* response,
brpc::Controller* cntl,
brpc::Channel* channel);

virtual void MountFs(const std::string& fsName, const Mountpoint& mountPt,
MountFsResponse* response, brpc::Controller* cntl,
brpc::Channel* channel);
Expand Down
33 changes: 33 additions & 0 deletions curvefs/src/client/rpcclient/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,39 @@ MdsClientImpl::Init(const ::curve::client::MetaServerOption &mdsOpt,
[&](int addrindex, uint64_t rpctimeoutMS, brpc::Channel *channel, \
brpc::Controller *cntl) -> int

FSStatusCode MdsClientImpl::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
FsInfo* fsInfo) {
auto task = RPCTask {
(void)addrindex;
(void)rpctimeoutMS;
mdsClientMetric_.updateS3Info.qps.count << 1;
LatencyUpdater updater(&mdsClientMetric_.updateS3Info.latency);
UpdateS3InfoResponse response;
mdsbasecli_->UpdateS3Info(fsName, s3Info, &response, cntl, channel);
if (cntl->Failed()) {
mdsClientMetric_.updateS3Info.eps.count << 1;
LOG(WARNING) << "UpdateS3Info Failed, errorcode = "
<< cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}

FSStatusCode ret = response.statuscode();
if (ret != FSStatusCode::OK) {
LOG(WARNING) << "UpdateS3Info: fsname = " << fsName
<< ", s3Info = " << s3Info.ShortDebugString()
<< ", errcode = " << ret
<< ", errmsg = " << FSStatusCode_Name(ret);
} else if (response.has_fsinfo()) {
fsInfo->CopyFrom(response.fsinfo());
}
return ret;
};
return ReturnError(rpcexcutor_.DoRPCTask(task, mdsOpt_.mdsMaxRetryMS));
}

FSStatusCode MdsClientImpl::MountFs(const std::string& fsName,
const Mountpoint& mountPt, FsInfo* fsInfo) {
auto task = RPCTask {
Expand Down
7 changes: 7 additions & 0 deletions curvefs/src/client/rpcclient/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ namespace curvefs {
namespace client {
namespace rpcclient {

using curvefs::common::S3Info;
using curvefs::mds::GetLatestTxIdRequest;
using curvefs::mds::GetLatestTxIdResponse;
using curvefs::mds::CommitTxRequest;
Expand All @@ -75,6 +76,9 @@ class MdsClient {
virtual FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt,
MDSBaseClient *baseclient) = 0;

virtual FSStatusCode UpdateS3Info(const std::string& fsName,
const S3Info& s3Info, FsInfo* fsInfo) = 0;

virtual FSStatusCode MountFs(const std::string& fsName,
const Mountpoint& mountPt, FsInfo* fsInfo) = 0;

Expand Down Expand Up @@ -171,6 +175,9 @@ class MdsClientImpl : public MdsClient {
FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt,
MDSBaseClient *baseclient) override;

FSStatusCode UpdateS3Info(const std::string& fsName, const S3Info& s3Info,
FsInfo* fsInfo) override;

FSStatusCode MountFs(const std::string& fsName, const Mountpoint& mountPt,
FsInfo* fsInfo) override;

Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/s3/client_s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ void S3ClientImpl::Init(const curve::common::S3AdapterOption &option) {
s3Adapter_->Init(option);
}

void S3ClientImpl::Reinit(const curve::common::S3AdapterOption &option) {
s3Adapter_->Reinit(option);
}

void S3ClientImpl::Deinit() {
s3Adapter_->Deinit();
}
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/s3/client_s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class S3Client {
S3Client() {}
virtual ~S3Client() {}
virtual void Init(const curve::common::S3AdapterOption& option) = 0;
virtual void Reinit(const curve::common::S3AdapterOption& option) = 0;
virtual void Deinit() = 0;
virtual int Upload(const std::string& name, const char* buf,
uint64_t length) = 0;
Expand All @@ -65,6 +66,7 @@ class S3ClientImpl : public S3Client {
}
virtual ~S3ClientImpl() {}
void Init(const curve::common::S3AdapterOption& option);
void Reinit(const curve::common::S3AdapterOption& option);
void Deinit();
int Upload(const std::string& name, const char* buf, uint64_t length);
void UploadAsync(std::shared_ptr<PutObjectAsyncContext> context);
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2524,6 +2524,10 @@ void DataCache::FlushTaskExecute(

LOG(WARNING) << "Put object failed, key: " << context->key;
// Retry using s3 no matter what the original was
if (--(context->retries) <= 0) {
s3TaskEvent.Signal();
return;
}
context->type = curve::common::ContextType::S3;
s3ClientAdaptor_->GetS3Client()->UploadAsync(context);
};
Expand Down
Loading

0 comments on commit 03f368e

Please sign in to comment.