Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
ken90242 committed Jan 25, 2024
1 parent 6af52d7 commit 399b2bb
Show file tree
Hide file tree
Showing 24 changed files with 326 additions and 44 deletions.
5 changes: 3 additions & 2 deletions curvefs/proto/mds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ enum FsStatus {

message FsDetail {
optional common.Volume volume = 1;
optional common.S3Info s3Info = 2;
optional common.S3Info s3info = 2;
}

message Mountpoint {
Expand Down Expand Up @@ -115,7 +115,8 @@ message GetFsInfoResponse {
}

message UpdateS3InfoRequest {
required common.S3Info s3Info = 1;
required string fsName = 1;
required common.S3Info s3Info = 2;
}

message UpdateS3InfoResponse {
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ cc_library(
"//external:brpc",
"//external:gflags",
"//external:glog",
"//external:json",
"//src/client:curve_client",
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
Expand Down
61 changes: 43 additions & 18 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,41 +428,66 @@ FuseClient* Client() {
void UpdateS3Config(fuse_req_t req,
fuse_ino_t ino,
const char* name,
const char* value,
size_t size) {
const char* value) {

auto fs = g_ClientInstance->GetFileSystem();

if (g_ClientInstance->GetFsInfo()->fstype() != FSType::TYPE_S3) {
LOG(ERROR) << "updating s3 config only works for s3";
return EOPNOTSUPP;
return fs->ReplyError(req, CURVEFS_ERROR::NOT_SUPPORT);
}

const std::string fsName = g_ClientInstance->GetFsInfo()->fsname();
const curvefs::common::S3Info oldS3Info = g_ClientInstance->GetFsInfo()->detail()->s3Info();
const curvefs::common::S3Info newS3Info(oldS3Info);
const curvefs::common::S3Info oldS3Info = g_ClientInstance->GetFsInfo()->detail().s3info();
curvefs::common::S3Info newS3Info(oldS3Info);

Json::CharReaderBuilder builder;
Json::CharReaderBuilder::strictMode(&builder.settings_);
std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
Json::Value rootNode;
JSONCPP_STRING errormsg;
if (reader->parse(value, value + strlen(value), &rootNode, &errormsg)) {
newS3Info.ak = rootNode["ak"].asString();
newS3Info.sk = rootNode["sk"].asString();
newS3Info.bucket = rootNode["bucket"].asString();
newS3Info.endpoint = rootNode["endpoint"].asString();

FsInfo fsInfo;
FSStatusCode statusCode = g_ClientInstance->mdsClient_->UpdateS3Info(fsName, newS3Info, &fsInfo);
if (statusCode == FSStatusCode::OK) {
g_ClientInstance->SetFsInfo(fsInfo);
}
} else {
if (!reader->parse(value, value + strlen(value), &rootNode, &errormsg)) {
LOG(ERROR) << "Error parsing the input value ' "
<< value
<< " ': " << errormsg;
return fs->ReplyError(req, CURVEFS_ERROR::IO_ERROR);
}

fuse_reply_err(req, statusCode);
FuseS3Client* g_S3ClientInstance = dynamic_cast<FuseS3Client*>(g_ClientInstance);
if (!g_S3ClientInstance) {
LOG(ERROR) << "Dynamic cast from FuseClient to FuseS3Client failed";
return fs->ReplyError(req, CURVEFS_ERROR::INTERNAL);
}

if (rootNode.isMember("ak") && rootNode["ak"].asString() != oldS3Info.ak()) {
newS3Info.set_ak(rootNode["ak"].asString());
}

if (rootNode.isMember("sk") && rootNode["sk"].asString() != oldS3Info.sk()) {
newS3Info.set_sk(rootNode["sk"].asString());
}

if (rootNode.isMember("bucketname") && rootNode["bucketname"].asString() != oldS3Info.bucketname()) {
newS3Info.set_bucketname(rootNode["bucketname"].asString());
}

if (rootNode.isMember("endpoint") && rootNode["endpoint"].asString() != oldS3Info.endpoint()) {
newS3Info.set_endpoint(rootNode["endpoint"].asString());
}

if (oldS3Info.SerializeAsString() == newS3Info.SerializeAsString()) {
return fs->ReplyError(req, CURVEFS_ERROR::NODATA);
}

FsInfo fsInfo;
CURVEFS_ERROR updateStatusCode = g_S3ClientInstance->UpdateS3Info(fsName, newS3Info, fsInfo);
if (updateStatusCode != CURVEFS_ERROR::OK) {
return fs->ReplyError(req, updateStatusCode);
}

g_ClientInstance->SetFsInfo(std::make_shared<FsInfo>(fsInfo));

return fs->ReplyError(req, CURVEFS_ERROR::OK);
}

void TriggerWarmup(fuse_req_t req,
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/filesystem/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum class CURVEFS_ERROR {
NOSYS = -20,
END_OF_FILE = -21,
NOT_A_DIRECTORY = -22,
UPDATE_S3_INFO_FAILED = -23,
};

std::string StrErr(CURVEFS_ERROR code);
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/filesystem/xattr.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const char XATTR_DIR_RFBYTES[] = "curve.dir.rfbytes";
const char XATTR_DIR_PREFIX[] = "curve.dir";
const char XATTR_WARMUP_OP[] = "curvefs.warmup.op";
const char XATTR_WARMUP_OP_LIST[] = "curvefs.warmup.op.list";
const char XATTR_S3_CONFIG[] = "curvefs.s3.config";
const char XATTR_S3_CONFIG[] = "curvefs.s3.update.config";

inline bool IsSpecialXAttr(const std::string& key) {
static std::map<std::string, bool> xattrs {
Expand Down
22 changes: 22 additions & 0 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,28 @@ CURVEFS_ERROR FuseS3Client::Truncate(InodeWrapper *inode, uint64_t length) {
return s3Adaptor_->Truncate(inode, length);
}

CURVEFS_ERROR FuseS3Client::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
FsInfo& fsInfo) {
::curve::common::S3InfoOption s3InfoOption;
::curvefs::client::common::S3Info2FsS3Option(s3Info, &s3InfoOption);
FSStatusCode updateStatusCode = mdsClient_->UpdateS3Info(fsName, s3Info, &fsInfo);
if (updateStatusCode != FSStatusCode::OK) {
LOG(ERROR) << "Update s3 info error code: (FSStatusCode)"
<< updateStatusCode;
return CURVEFS_ERROR::UPDATE_S3_INFO_FAILED;
}

if (option_.s3Opt.s3AdaptrOpt.ak != s3Info.ak() ||
option_.s3Opt.s3AdaptrOpt.sk != s3Info.sk() ||
option_.s3Opt.s3AdaptrOpt.s3Address != s3Info.endpoint() ||
option_.s3Opt.s3AdaptrOpt.bucketName != s3Info.bucketname()) {
s3Adaptor_->GetS3Client()->Reinit(option_.s3Opt.s3AdaptrOpt);
}

return CURVEFS_ERROR::OK;
}

CURVEFS_ERROR FuseS3Client::FuseOpFlush(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) {
(void)req;
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ class FuseS3Client : public FuseClient {

CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override;

CURVEFS_ERROR UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
FsInfo& fsInfo);

private:
bool InitKVCache(const KVClientManagerOpt &opt);

Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/metric/client_metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace metric {
struct MDSClientMetric {
static const std::string prefix;

InterfaceMetric updateS3Info;
InterfaceMetric mountFs;
InterfaceMetric umountFs;
InterfaceMetric getFsInfo;
Expand All @@ -61,6 +62,7 @@ struct MDSClientMetric {
MDSClientMetric()
: mountFs(prefix, "mountFs"),
umountFs(prefix, "umountFs"),
updateS3Info(prefix, "updateS3Info"),
getFsInfo(prefix, "getFsInfo"),
getMetaServerInfo(prefix, "getMetaServerInfo"),
getMetaServerListInCopysets(prefix, "getMetaServerListInCopysets"),
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/rpcclient/base_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void MDSBaseClient::UpdateS3Info(const std::string& fsName,
brpc::Channel* channel) {
UpdateS3InfoRequest request;
request.set_fsname(fsName);
request.set_s3Info(s3Info);
request.mutable_s3info()->CopyFrom(s3Info);
curvefs::mds::MdsService_Stub stub(channel);
stub.UpdateS3Info(cntl, &request, response, nullptr);
}
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/rpcclient/base_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ using curvefs::mds::FsInfo;
using curvefs::mds::FsStatus;
using curvefs::mds::GetFsInfoRequest;
using curvefs::mds::GetFsInfoResponse;
using curvefs::mds::UpdateS3InfoRequest;
using curvefs::mds::UpdateS3InfoResponse;
using curvefs::mds::MountFsRequest;
using curvefs::mds::MountFsResponse;
using curvefs::mds::GetLatestTxIdRequest;
Expand Down
10 changes: 5 additions & 5 deletions curvefs/src/client/rpcclient/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ FSStatusCode MdsClientImpl::UpdateS3Info(const std::string& fsName,
auto task = RPCTask {
(void)addrindex;
(void)rpctimeoutMS;
mdsClientMetric_.mountFs.qps.count << 1;
LatencyUpdater updater(&mdsClientMetric_.mountFs.latency);
mdsClientMetric_.updateS3Info.qps.count << 1;
LatencyUpdater updater(&mdsClientMetric_.updateS3Info.latency);
UpdateS3InfoResponse response;
mdsbasecli_->UpdateS3Info(fsName, s3Info, &response, cntl, channel);
if (cntl->Failed()) {
mdsClientMetric_.mountFs.eps.count << 1;
LOG(WARNING) << "MountFs Failed, errorcode = " << cntl->ErrorCode()
mdsClientMetric_.updateS3Info.eps.count << 1;
LOG(WARNING) << "UpdateS3Info Failed, errorcode = " << cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
Expand All @@ -80,7 +80,7 @@ FSStatusCode MdsClientImpl::UpdateS3Info(const std::string& fsName,
FSStatusCode ret = response.statuscode();
if (ret != FSStatusCode::OK) {
LOG(WARNING) << "UpdateS3Info: fsname = " << fsName
<< ", mountPt = " << mountPt.ShortDebugString()
<< ", s3Info = " << s3Info.ShortDebugString()
<< ", errcode = " << ret
<< ", errmsg = " << FSStatusCode_Name(ret);
} else if (response.has_fsinfo()) {
Expand Down
5 changes: 3 additions & 2 deletions curvefs/src/client/rpcclient/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ namespace curvefs {
namespace client {
namespace rpcclient {

using curvefs::common::S3Info;
using curvefs::mds::GetLatestTxIdRequest;
using curvefs::mds::GetLatestTxIdResponse;
using curvefs::mds::CommitTxRequest;
Expand All @@ -76,7 +77,7 @@ class MdsClient {
MDSBaseClient *baseclient) = 0;

virtual FSStatusCode UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info, FsInfo* fsInfo) = 0;
const S3Info& s3Info, FsInfo* fsInfo) = 0;

virtual FSStatusCode MountFs(const std::string& fsName,
const Mountpoint& mountPt, FsInfo* fsInfo) = 0;
Expand Down Expand Up @@ -174,7 +175,7 @@ class MdsClientImpl : public MdsClient {
FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt,
MDSBaseClient *baseclient) override;

FSStatusCode UpdateS3Info(const std::string& fsName, const curvefs::common::S3Info& s3Info,
FSStatusCode UpdateS3Info(const std::string& fsName, const S3Info& s3Info,
FsInfo* fsInfo) override;

FSStatusCode MountFs(const std::string& fsName, const Mountpoint& mountPt,
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/s3/client_s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ void S3ClientImpl::Init(const curve::common::S3AdapterOption &option) {
s3Adapter_->Init(option);
}

void S3ClientImpl::Reinit(const curve::common::S3AdapterOption &option) {
s3Adapter_->Reinit(option);
}

void S3ClientImpl::Deinit() {
s3Adapter_->Deinit();
}
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/s3/client_s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class S3Client {
S3Client() {}
virtual ~S3Client() {}
virtual void Init(const curve::common::S3AdapterOption& option) = 0;
virtual void Reinit(const curve::common::S3AdapterOption& option) = 0;
virtual void Deinit() = 0;
virtual int Upload(const std::string& name, const char* buf,
uint64_t length) = 0;
Expand All @@ -65,6 +66,7 @@ class S3ClientImpl : public S3Client {
}
virtual ~S3ClientImpl() {}
void Init(const curve::common::S3AdapterOption& option);
void Reinit(const curve::common::S3AdapterOption& option);
void Deinit();
int Upload(const std::string& name, const char* buf, uint64_t length);
void UploadAsync(std::shared_ptr<PutObjectAsyncContext> context);
Expand Down
5 changes: 3 additions & 2 deletions curvefs/src/mds/fs_info_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ namespace curvefs {
namespace mds {

using ::curvefs::common::FSType;
using ::curvefs::common::S3Info;

// A wrapper for proto FsInfo
class FsInfoWrapper {
Expand Down Expand Up @@ -67,8 +68,8 @@ class FsInfoWrapper {
}

void SetS3Info(S3Info s3info) {
const FsDetail fsdetail_ = GetFsDetail();
fsdetail_._set_s3info(s3info)
FsDetail* fsdetail_ = fsInfo_.mutable_detail();
fsdetail_->mutable_s3info()->CopyFrom(s3info);
}

void SetFsName(const std::string& name) {
Expand Down
9 changes: 3 additions & 6 deletions curvefs/src/mds/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ FSStatusCode FsManager::DeleteFs(const std::string& fsName) {
}

FSStatusCode FsManager::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info, FsInfo* fsInfo) {
const S3Info& s3Info, FsInfo* fsInfo) {
NameLockGuard lock(nameLock_, fsName);

// query fs
Expand All @@ -611,21 +611,18 @@ FSStatusCode FsManager::UpdateS3Info(const std::string& fsName,
return ret;
}

// insert mountpoint
// update s3Info
wrapper.SetS3Info(s3Info);
// for persistence consider
ret = fsStorage_->Update(wrapper);
if (ret != FSStatusCode::OK) {
LOG(WARNING) << "UpdateS3Info fail, update fs fail, fsName = " << fsName
<< ", mountpoint = " << mountpoint.ShortDebugString()
<< ", s3Info = " << s3Info.ShortDebugString()
<< ", errCode = " << FSStatusCode_Name(ret);
return ret;
}
// update client alive time
UpdateClientAliveTime(mountpoint, fsName, false);

// convert fs info
FsMetric::GetInstance().OnMount(wrapper.GetFsName(), mountpoint);
*fsInfo = std::move(wrapper).ProtoFsInfo();

return FSStatusCode::OK;
Expand Down
5 changes: 3 additions & 2 deletions curvefs/src/mds/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ using ::curve::common::Atomic;
using ::curve::common::InterruptibleSleeper;
using ::curve::common::S3Adapter;
using ::curve::common::Thread;

using ::curvefs::common::S3Info;
using ::curvefs::mds::topology::PartitionTxId;
using ::curvefs::mds::topology::Topology;
using ::curvefs::mds::topology::TopologyManager;

using ::curvefs::mds::dlock::DLock;
using ::curvefs::mds::Mountpoint;

Expand Down Expand Up @@ -130,7 +131,7 @@ class FsManager {
*
*/
FSStatusCode UpdateS3Info(const std::string& fsName,
const Mountpoint& mountpoint, FsInfo* fsInfo);
const S3Info& mountpoint, FsInfo* fsInfo);

/**
* @brief Mount fs, mount point can not repeat. It will increate
Expand Down
10 changes: 5 additions & 5 deletions curvefs/src/mds/mds_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ void MdsServiceImpl::UpdateS3Info(::google::protobuf::RpcController* controller,
const std::string &fsName = request->fsname();
const curvefs::common::S3Info &s3Info = request->s3info();

LOG(INFO) << "MountFs request, fsName = " << fsName
<< ", mountPoint = " << mount.ShortDebugString();
LOG(INFO) << "UpdateS3Info request, fsName = " << fsName
<< ", s3Info = " << s3Info.ShortDebugString();

FSStatusCode status =
fsManager_->UpdateS3Info(fsName, s3Info, response->mutable_fsinfo());
Expand All @@ -159,15 +159,15 @@ void MdsServiceImpl::UpdateS3Info(::google::protobuf::RpcController* controller,
response->clear_fsinfo();
response->set_statuscode(status);
LOG(ERROR) << "UpdateS3Info fail, fsName = " << fsName
<< ", mountPoint = " << mount.ShortDebugString()
<< ", s3Info = " << s3Info.ShortDebugString()
<< ", errCode = " << FSStatusCode_Name(status);
return;
}

response->set_statuscode(FSStatusCode::OK);
LOG(INFO) << "UpdateS3Info success, fsName = " << fsName
<< ", mountPoint = " << mount.ShortDebugString()
<< ", mps: " << response->mutable_fsinfo()->mountpoints_size();
<< ", s3Info in response = " << response->fsinfo().detail().s3info().ShortDebugString()
<< ", mps = " << response->mutable_fsinfo()->mountpoints_size();
}

void MdsServiceImpl::MountFs(::google::protobuf::RpcController* controller,
Expand Down
5 changes: 5 additions & 0 deletions curvefs/src/mds/mds_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class MdsServiceImpl : public MdsService {
CreateFsResponse* response,
::google::protobuf::Closure* done);

void UpdateS3Info(::google::protobuf::RpcController* controller,
const UpdateS3InfoRequest* request,
UpdateS3InfoResponse* response,
::google::protobuf::Closure* done);

void MountFs(::google::protobuf::RpcController* controller,
const MountFsRequest* request,
MountFsResponse* response,
Expand Down
Loading

0 comments on commit 399b2bb

Please sign in to comment.