Skip to content

Add iceberg stuff (#16652) (#17522) (#17903) #18429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions ydb/core/external_sources/external_source_builder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
#include "external_source_builder.h"
#include "validation_functions.h"

#include <util/string/join.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>

namespace NKikimr::NExternalSource {
namespace {

class TValidatedExternalDataSource final : public IExternalSource {
public:
TValidatedExternalDataSource(
const TString& name,
const std::vector<TExternalSourceBuilder::TAuthHolder>& authMethods,
const std::unordered_map<TString, TExternalSourceBuilder::TConditionalValidator>& availableProperties,
const std::vector<TRegExMatch>& hostnamePatterns)
: Name_(name)
, AuthMethodsForCheck_(authMethods)
, AvailableProperties_(availableProperties)
, HostnamePatterns_(hostnamePatterns)
{

}

virtual TString Pack(const NKikimrExternalSources::TSchema&,
const NKikimrExternalSources::TGeneral&) const override {
ythrow TExternalSourceException() << "Internal error. Only external table supports pack operation";
}

virtual TString GetName() const override {
return Name_;
}

virtual bool HasExternalTable() const override {
return false;
}

virtual TVector<TString> GetAuthMethods() const override {
TVector<TString> result;

for (auto a : AuthMethodsForCheck_) {
result.push_back(a.Auth);
}

return result;
}

TVector<TString> GetAuthMethods(const TString& externalDataSourceDescription) const {
NKikimrSchemeOp::TExternalDataSourceDescription proto;

if (!proto.ParseFromString(externalDataSourceDescription)) {
ythrow TExternalSourceException()
<< "Internal error. "
<< "Couldn't parse protobuf with external data source description";
}

TVector<TString> result;

for (auto a : AuthMethodsForCheck_) {
if (a.UseCondition(proto.GetProperties().GetProperties())) {
result.push_back(a.Auth);
}
}

return result;
}

virtual TMap<TString, TVector<TString>> GetParameters(const TString&) const override {
ythrow TExternalSourceException() << "Internal error. Only external table supports parameters";
}

virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override {
NKikimrSchemeOp::TExternalDataSourceDescription proto;

if (!proto.ParseFromString(externalDataSourceDescription)) {
ythrow TExternalSourceException()
<< "Internal error. "
<< "Couldn't parse protobuf with external data source description";
}

auto properties = proto.GetProperties().GetProperties();
std::unordered_set<TString> validatedProperties;

for (const auto& [key, value] : properties) {
auto p = AvailableProperties_.find(key);

if (AvailableProperties_.end() == p) {
throw TExternalSourceException() << "Unsupported property: " << key;
}

// validate property value
if (p->second.ApplyCondition(properties)) {
p->second.Validator(key, value);
}

validatedProperties.emplace(key);
}

// validate properties that has been left
for (const auto& [property, validator] : AvailableProperties_) {
if (validatedProperties.contains(property)) {
continue;
}

if (validator.ApplyCondition(properties)) {
validator.Validator(property, "");
}
}

ValidateHostname(HostnamePatterns_, proto.GetLocation());
}

virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
return NThreading::MakeFuture(std::move(meta));
}

virtual bool CanLoadDynamicMetadata() const override {
return false;
}

private:
const TString Name_;
const std::vector<TExternalSourceBuilder::TAuthHolder> AuthMethodsForCheck_;
const std::unordered_map<TString, TExternalSourceBuilder::TConditionalValidator> AvailableProperties_;
const std::vector<TRegExMatch> HostnamePatterns_;
};

} // unnamed

/// Starts a builder for an external data source with the given name; no auth
/// methods, properties or hostname patterns are registered yet.
TExternalSourceBuilder::TExternalSourceBuilder(const TString& name)
    : Name_(name)
{}

///
/// Registers each of the given auth methods with the same condition.
///
/// @param authMethods names of auth methods to add
/// @param condition   condition stored alongside every added method
/// @return this builder, to allow call chaining
///
TExternalSourceBuilder& TExternalSourceBuilder::Auth(const TVector<TString>& authMethods, TCondition condition) {
    // Iterate by reference: the holder below makes the only copy that is needed.
    for (const auto& method : authMethods) {
        AuthMethodsForCheck_.push_back(TExternalSourceBuilder::TAuthHolder{method, condition});
    }

    return *this;
}

///
/// Registers a property with its validator and the condition under which the
/// validator is applied.
///
/// Uses std::unordered_map::emplace, so if a property with the same name was
/// registered earlier, the earlier registration is kept and this call has no effect.
///
/// @return this builder, to allow call chaining
///
TExternalSourceBuilder& TExternalSourceBuilder::Property(TString name, TValidator validator, TCondition condition) {
    // All three parameters are taken by value, so hand them over instead of copying again.
    AvailableProperties_.emplace(
        std::move(name),
        TExternalSourceBuilder::TConditionalValidator{std::move(validator), std::move(condition)});
    return *this;
}

///
/// Registers every property from the set with the same validator and condition,
/// by calling Property() for each name.
///
/// @return this builder, to allow call chaining
///
TExternalSourceBuilder& TExternalSourceBuilder::Properties(const TSet<TString>& availableProperties, TValidator validator, TCondition condition) {
    // Iterate by reference: Property() takes the name by value and makes its own copy.
    for (const auto& propertyName : availableProperties) {
        Property(propertyName, validator, condition);
    }

    return *this;
}

///
/// Appends the given patterns after the ones already registered.
///
/// @return this builder, to allow call chaining
///
TExternalSourceBuilder& TExternalSourceBuilder::HostnamePatterns(const std::vector<TRegExMatch>& patterns) {
    auto& registered = HostnamePatterns_;
    registered.insert(registered.end(), patterns.begin(), patterns.end());

    return *this;
}

///
/// Creates the external data source from everything registered so far.
///
IExternalSource::TPtr TExternalSourceBuilder::Build() {
    return MakeIntrusive<TValidatedExternalDataSource>(
        std::move(Name_),
        std::move(AuthMethodsForCheck_),
        std::move(AvailableProperties_),
        std::move(HostnamePatterns_));
}

///
/// Builds a condition that holds when the property map contains `property`
/// and its value equals `value`. Both arguments are copied into the condition.
///
TCondition GetHasSettingCondition(const TString& property, const TString& value) {
    return [property, value](const ::google::protobuf::Map<TProtoStringType, TProtoStringType>& properties) -> bool {
        const auto found = properties.find(property);

        if (properties.end() == found) {
            return false;
        }

        return value == found->second;
    };
}

///
/// Builds a validator that throws TExternalSourceException when the property
/// value is empty and accepts any non-empty value.
///
TValidator GetRequiredValidator() {
    return [](const TString& property, const TString& value) {
        if (value.empty()) {
            throw TExternalSourceException() << "required property: " << property << " is not set";
        }
    };
}

///
/// Builds a validator that accepts only values from `values`.
///
/// An empty value is rejected when `required` is true and accepted otherwise.
/// The list of allowed values for the error message is joined once, up front.
///
TValidator GetIsInListValidator(const std::unordered_set<TString>& values, bool required) {
    auto allowedList = JoinSeq(", ", values);

    return [values, required, allowedList](const TString& property, const TString& value) {
        if (value.empty()) {
            if (required) {
                throw TExternalSourceException() << " required property: " << property << " is not set";
            }

            // An optional property without a value has nothing to check.
            return;
        }

        if (values.contains(value)) {
            return;
        }

        throw TExternalSourceException()
            << " property: " << property
            << " has wrong value: " << value
            << " allowed values: " << allowedList;
    };
}

} // NKikimr::NExternalSource
118 changes: 118 additions & 0 deletions ydb/core/external_sources/external_source_builder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#pragma once

#include "external_source.h"

#include <library/cpp/regex/pcre/regexp.h>
#include <util/generic/set.h>

namespace NKikimr::NExternalSource {

typedef std::function<void(const TString&, const TString&)> TValidator;
typedef std::function<bool(const ::google::protobuf::Map<TProtoStringType, TProtoStringType>&)> TCondition;

///
/// Builder to create an external data source with validations
///
class TExternalSourceBuilder {
public:
struct TAuthHolder {
TString Auth;

// When auth has to be used
TCondition UseCondition;
};

struct TConditionalValidator {
TValidator Validator;

// When validator has to be applied
TCondition ApplyCondition;
};

public:
explicit TExternalSourceBuilder(const TString& name);

~TExternalSourceBuilder() = default;

///
/// Add auth methods which are returned from the "source" only if a condition is true.
/// A condition is applied to source's ddl in @sa IExternalSource::GetAuthMethods
/// call.
///
TExternalSourceBuilder& Auth(const TVector<TString>& authMethods, TCondition condition);

TExternalSourceBuilder& Auth(const TVector<TString>& authMethods) {
return Auth(authMethods, [](const ::google::protobuf::Map<TProtoStringType, TProtoStringType>&){
return true;
});
}

///
/// Add property which can be in a "source".
///
/// @param name name of a property
/// @param validator validator which is applied to a property from a source's ddl
/// in @sa IExternalSource::ValidateExternalDataSource call
/// @param condition condition that defines to use validator or not, if condition returns true
/// for source's ddl then validator is applied; otherwise, validator is skiped;
/// condition is executed in @sa IExternalSource::ValidateExternalDataSource call
/// before validator
///
TExternalSourceBuilder& Property(const TString name, TValidator validator, TCondition condition);

TExternalSourceBuilder& Properties(const TSet<TString>& properties, TValidator validator, TCondition condition);

TExternalSourceBuilder& HostnamePatterns(const std::vector<TRegExMatch>& patterns);

///
/// Create external data source
///
IExternalSource::TPtr Build();

TExternalSourceBuilder& Property(const TString name, TValidator validator) {
return Property(name, validator, [](const ::google::protobuf::Map<TProtoStringType, TProtoStringType>&){
return true;
});
}

TExternalSourceBuilder& Property(const TString name) {
return Property(name, [](const TString&, const TString&){});
}

TExternalSourceBuilder& Properties(const TSet<TString>& properties, TValidator validator) {
return Properties(properties, validator, [](const ::google::protobuf::Map<TProtoStringType, TProtoStringType>&){
return true;
});
}

TExternalSourceBuilder& Properties(const TSet<TString>& properties) {
return Properties(properties, [](const TString&, const TString&){});
}

private:
TString Name_;
std::vector<TAuthHolder> AuthMethodsForCheck_;
std::unordered_map<TString, TConditionalValidator> AvailableProperties_;
std::vector<TRegExMatch> HostnamePatterns_;
};

///
/// Create a condition that returns "true" if a source's ddl has
/// property "p" with a value equal to "v"; returns "false" if the
/// property is absent or has a different value
///
TCondition GetHasSettingCondition(const TString& p, const TString& v);

///
/// Create a validator which checks that a property has a non-empty value;
/// the validator throws TExternalSourceException when the value is empty
///
TValidator GetRequiredValidator();

///
/// Create a validator which checks that a property has a value from a list;
/// the validator throws TExternalSourceException when the check fails
///
/// @param values list of allowed values
/// @param required if true, an empty value is rejected; if false, an empty value
/// is accepted without being checked against the list
///
TValidator GetIsInListValidator(const std::unordered_set<TString>& values, bool required);

} // NKikimr::NExternalSource
Loading
Loading