Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions src/paimon/common/data/blob_descriptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,27 @@ namespace paimon {

Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Create(const std::string& uri,
int64_t offset, int64_t length) {
return Create(kCurrentVersion, uri, offset, length);
}

Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Create(int8_t version,
const std::string& uri,
int64_t offset, int64_t length) {
if (offset < 0) {
return Status::Invalid(fmt::format("offset {} is less than 0", offset));
}
// length == -1 means it's dynamic length
if (length < -1) {
return Status::Invalid(fmt::format("length {} is less than -1", length));
}
return std::unique_ptr<BlobDescriptor>(new BlobDescriptor(uri, offset, length));
return std::unique_ptr<BlobDescriptor>(new BlobDescriptor(version, uri, offset, length));
}

PAIMON_UNIQUE_PTR<Bytes> BlobDescriptor::Serialize(const std::shared_ptr<MemoryPool>& pool) const {
MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
out.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
out.WriteValue<int8_t>(version_);
out.WriteValue<int64_t>(kMagic);
out.WriteValue<int32_t>(static_cast<int32_t>(uri_.size()));

auto uri_bytes = std::make_shared<Bytes>(uri_, pool.get());
Expand All @@ -60,16 +67,40 @@ Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Deserialize(const char*
DataInputStream in(std::move(input_stream));
in.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
PAIMON_ASSIGN_OR_RAISE(int8_t version, in.ReadValue<int8_t>());
if (version != CURRENT_VERSION) {
if (version > kCurrentVersion) {
return Status::Invalid(fmt::format(
"Expecting BlobDescriptor version to be {}, but found {}.", CURRENT_VERSION, version));
"Expecting BlobDescriptor version to be less than or equal to {}, but found {}.",
kCurrentVersion, version));
}
if (version > 1) {
PAIMON_ASSIGN_OR_RAISE(int64_t magic, in.ReadValue<int64_t>());
if (kMagic != magic) {
return Status::Invalid(fmt::format(
"Invalid BlobDescriptor: missing magic header. Expected magic: {}, but found {}",
kMagic, magic));
}
}
PAIMON_ASSIGN_OR_RAISE(int32_t uri_length, in.ReadValue<int32_t>());
std::string uri(uri_length, '\0');
PAIMON_RETURN_NOT_OK(in.Read(uri.data(), uri.size()));
PAIMON_ASSIGN_OR_RAISE(int64_t offset, in.ReadValue<int64_t>());
PAIMON_ASSIGN_OR_RAISE(int64_t length, in.ReadValue<int64_t>());
return BlobDescriptor::Create(uri, offset, length);
return BlobDescriptor::Create(version, uri, offset, length);
}

Result<bool> BlobDescriptor::IsBlobDescriptor(const char* buffer, uint64_t size) {
if (size < kMinDescriptorLength) {
return false;
Comment thread
lszskye marked this conversation as resolved.
}
auto input_stream = std::make_shared<ByteArrayInputStream>(buffer, size);
DataInputStream in(std::move(input_stream));
Comment thread
lszskye marked this conversation as resolved.
in.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
PAIMON_ASSIGN_OR_RAISE(int8_t version, in.ReadValue<int8_t>());
if (version > kCurrentVersion) {
return false;
}
PAIMON_ASSIGN_OR_RAISE(int64_t magic, in.ReadValue<int64_t>());
return kMagic == magic;
}

std::string BlobDescriptor::ToString() const {
Expand Down
32 changes: 27 additions & 5 deletions src/paimon/common/data/blob_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,39 @@
#include "paimon/result.h"

namespace paimon {
/// Blob descriptor to describe a blob reference.
/// Memory Layout Description: All multi-byte numerical values (int/long) are stored using Little
/// Endian byte order.
///
/// | Offset | Field Name | Type | Size |
/// |--------|---------------|-----------|------|
/// | 0 | version | byte | 1 |
/// | 1 | magic_number | long | 8 |
/// | 9 | uri_length | int | 4 |
Comment thread
lszskye marked this conversation as resolved.
/// | 13 | uri_bytes | byte[N] | N |
/// | 13 + N | offset | long | 8 |
/// | 21 + N | length | long | 8 |

class BlobDescriptor {
public:
static Result<std::unique_ptr<BlobDescriptor>> Create(const std::string& uri, int64_t offset,
int64_t length);

~BlobDescriptor() = default;
static Result<std::unique_ptr<BlobDescriptor>> Create(int8_t version, const std::string& uri,
int64_t offset, int64_t length);

static Result<std::unique_ptr<BlobDescriptor>> Deserialize(const char* buffer, uint64_t size);

static Result<bool> IsBlobDescriptor(const char* buffer, uint64_t size);

PAIMON_UNIQUE_PTR<Bytes> Serialize(const std::shared_ptr<MemoryPool>& pool) const;

std::string ToString() const;

int8_t Version() const {
return version_;
}

const std::string& Uri() const {
return uri_;
}
Expand All @@ -52,13 +71,16 @@ class BlobDescriptor {
}

private:
BlobDescriptor(const std::string& uri, int64_t offset, int64_t length)
: uri_(uri), offset_(offset), length_(length) {}
BlobDescriptor(int8_t version, const std::string& uri, int64_t offset, int64_t length)
: version_(version), uri_(uri), offset_(offset), length_(length) {}

private:
static constexpr int8_t CURRENT_VERSION = 1;
static constexpr int64_t kMagic = 0x424C4F4244455343l;
/// one byte for version, eight bytes for magic number.
static constexpr uint64_t kMinDescriptorLength = 9;
static constexpr int8_t kCurrentVersion = 2;

const int8_t version_ = CURRENT_VERSION;
const int8_t version_ = kCurrentVersion;
std::string uri_;
int64_t offset_ = 0;
int64_t length_ = -1;
Expand Down
118 changes: 100 additions & 18 deletions src/paimon/common/data/blob_descriptor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,11 @@ class BlobDescriptorTest : public testing::Test {
pool_ = GetDefaultPool();
ASSERT_OK_AND_ASSIGN(descriptor_,
BlobDescriptor::Create("test_uri", /*offset=*/1024, /*length=*/2048));

std::vector<char> bytes = {1, 8, 0, 0, 0, 116, 101, 115, 116, 95, 117, 114, 105, 0, 4,
0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0};
java_serialized_ = std::string(bytes.data(), bytes.size());
}

private:
std::shared_ptr<MemoryPool> pool_;
std::unique_ptr<BlobDescriptor> descriptor_;
std::string java_serialized_;
};

TEST_F(BlobDescriptorTest, TestConstructorAndGetters) {
Expand All @@ -50,16 +45,35 @@ TEST_F(BlobDescriptorTest, TestConstructorAndGetters) {
ASSERT_EQ(descriptor_->Length(), 2048);
}

TEST_F(BlobDescriptorTest, TestSerializeDeserializeAndCompatibilityWithJava) {
auto serialized = descriptor_->Serialize(pool_);
std::string serialized_str(serialized->data(), serialized->size());
ASSERT_EQ(serialized_str, java_serialized_);
TEST_F(BlobDescriptorTest, TestDeserializeCompatibilityForJavaWithVersion1) {
std::vector<char> bytes = {1, 8, 0, 0, 0, 116, 101, 115, 116, 95, 117, 114, 105, 0, 4,
0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0};
auto java_serialized = std::string(bytes.data(), bytes.size());

ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Deserialize(java_serialized.data(),
java_serialized.size()));
ASSERT_EQ(descriptor->Version(), (int8_t)1);
ASSERT_EQ(descriptor->Uri(), "test_uri");
ASSERT_EQ(descriptor->Offset(), 1024);
ASSERT_EQ(descriptor->Length(), 2048);
}

ASSERT_OK_AND_ASSIGN(auto restored_descriptor,
BlobDescriptor::Deserialize(serialized->data(), serialized->size()));
ASSERT_EQ(restored_descriptor->Uri(), "test_uri");
ASSERT_EQ(restored_descriptor->Offset(), 1024);
ASSERT_EQ(restored_descriptor->Length(), 2048);
TEST_F(BlobDescriptorTest, TestDeserializeCompatibilityForJavaWithVersion2) {
std::vector<char> bytes = {2, 67, 83, 69, 68, 66, 79, 76, 66, 8, 0, 0, 0,
116, 101, 115, 116, 95, 117, 114, 105, 0, 4, 0, 0, 0,
0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0};
auto java_serialized = std::string(bytes.data(), bytes.size());

ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Deserialize(java_serialized.data(),
java_serialized.size()));
ASSERT_EQ(descriptor->Version(), (int8_t)2);
ASSERT_EQ(descriptor->Uri(), "test_uri");
ASSERT_EQ(descriptor->Offset(), 1024);
ASSERT_EQ(descriptor->Length(), 2048);

PAIMON_UNIQUE_PTR<Bytes> cpp_serialized = descriptor->Serialize(pool_);
auto cpp_serialized_string = std::string(cpp_serialized->data(), cpp_serialized->size());
ASSERT_EQ(cpp_serialized_string, java_serialized);
}
Comment thread
lszskye marked this conversation as resolved.

TEST_F(BlobDescriptorTest, TestSerializeDeserializeWithEmptyUri) {
Expand Down Expand Up @@ -92,9 +106,10 @@ TEST_F(BlobDescriptorTest, TestInvalidParameters) {
ASSERT_OK_AND_ASSIGN(std::unique_ptr<BlobDescriptor> descriptor,
BlobDescriptor::Create(/*uri=*/"test", /*offset=*/1, /*length=*/2));
auto serialized = descriptor->Serialize(pool_);
(*serialized)[0] = '\x02';
ASSERT_NOK_WITH_MSG(BlobDescriptor::Deserialize(serialized->data(), serialized->size()),
"Expecting BlobDescriptor version to be 1, but found 2");
(*serialized)[0] = '\x03';
ASSERT_NOK_WITH_MSG(
BlobDescriptor::Deserialize(serialized->data(), serialized->size()),
"Expecting BlobDescriptor version to be less than or equal to 2, but found 3");
}
// Test deserialize invalid buffer size
{
Expand All @@ -118,7 +133,7 @@ TEST_F(BlobDescriptorTest, TestInvalidParameters) {
TEST_F(BlobDescriptorTest, TestToString) {
std::string debug_str = descriptor_->ToString();
ASSERT_FALSE(debug_str.empty());
ASSERT_TRUE(debug_str.find("version=1") != std::string::npos);
ASSERT_TRUE(debug_str.find("version=2") != std::string::npos);
ASSERT_TRUE(debug_str.find("uri='test_uri'") != std::string::npos);
ASSERT_TRUE(debug_str.find("offset=1024") != std::string::npos);
ASSERT_TRUE(debug_str.find("length=2048") != std::string::npos);
Expand All @@ -140,4 +155,71 @@ TEST_F(BlobDescriptorTest, TestRoundTripConsistency) {
ASSERT_EQ(second_restored->Length(), 2048);
}

TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithValidDescriptor) {
// A valid v2 descriptor should be recognized
auto serialized = descriptor_->Serialize(pool_);
ASSERT_OK_AND_ASSIGN(bool result,
BlobDescriptor::IsBlobDescriptor(serialized->data(), serialized->size()));
ASSERT_TRUE(result);
}

TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithTooShortBuffer) {
// Buffer shorter than 9 bytes should return false
std::vector<char> short_buffer = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F, 0x4C};
ASSERT_OK_AND_ASSIGN(
bool result, BlobDescriptor::IsBlobDescriptor(short_buffer.data(), short_buffer.size()));
ASSERT_FALSE(result);

// Empty buffer
ASSERT_OK_AND_ASSIGN(bool empty_result, BlobDescriptor::IsBlobDescriptor(nullptr, 0));
ASSERT_FALSE(empty_result);
}

TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithFutureVersion) {
// Version > CURRENT_VERSION should return false (not an error)
auto serialized = descriptor_->Serialize(pool_);
(*serialized)[0] = '\x03'; // set version to 3 (> CURRENT_VERSION)
ASSERT_OK_AND_ASSIGN(bool result,
BlobDescriptor::IsBlobDescriptor(serialized->data(), serialized->size()));
ASSERT_FALSE(result);
}

TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithWrongMagic) {
// Wrong magic number should return false
auto serialized = descriptor_->Serialize(pool_);
// Corrupt the magic bytes (bytes 1-8)
(*serialized)[1] = '\x00';
(*serialized)[2] = '\x00';
ASSERT_OK_AND_ASSIGN(bool result,
BlobDescriptor::IsBlobDescriptor(serialized->data(), serialized->size()));
ASSERT_FALSE(result);
}

TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithRandomData) {
// Random data that doesn't match blob descriptor format
std::vector<char> random_data = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09};
ASSERT_OK_AND_ASSIGN(bool result,
BlobDescriptor::IsBlobDescriptor(random_data.data(), random_data.size()));
ASSERT_FALSE(result);
}

TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithVersion1Data) {
// v1 data: version=1, followed by uri_length (not magic), should return false
// because reading bytes 1-8 as magic won't match MAGIC constant
std::vector<char> v1_bytes = {1, 8, 0, 0, 0, 116, 101, 115, 116, 95, 117, 114, 105, 0, 4,
0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0};
ASSERT_OK_AND_ASSIGN(bool result,
BlobDescriptor::IsBlobDescriptor(v1_bytes.data(), v1_bytes.size()));
ASSERT_FALSE(result);
Comment thread
lszskye marked this conversation as resolved.
}

TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithExactly9Bytes) {
// Exactly 9 bytes with valid version and magic should return true
// version=2, magic=0x424C4F4244455343 in little-endian
std::vector<char> minimal = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F, 0x4C, 0x42};
ASSERT_OK_AND_ASSIGN(bool result,
BlobDescriptor::IsBlobDescriptor(minimal.data(), minimal.size()));
ASSERT_TRUE(result);
}

} // namespace paimon::test
Loading
Loading