Skip to content

Commit e283d60

Browse files
committed
fix ci
1 parent 2d8e7cc commit e283d60

34 files changed

Lines changed: 468 additions & 26 deletions

src/paimon/common/data/blob_descriptor.cpp

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,27 @@ namespace paimon {
3131

3232
/// Creates a descriptor tagged with CURRENT_VERSION.
///
/// Delegates to the versioned overload, so the same argument validation applies.
Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Create(const std::string& uri,
                                                               int64_t offset, int64_t length) {
    return Create(CURRENT_VERSION, uri, offset, length);
}

/// Creates a descriptor with an explicit format version.
///
/// @param version format version; must be in [1, CURRENT_VERSION].
/// @param uri     location of the blob.
/// @param offset  start position inside the blob; must be >= 0.
/// @param length  number of bytes, or -1 for a dynamic length; must be >= -1.
/// @return the new descriptor, or Status::Invalid when any argument is out of range.
Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Create(int64_t version,
                                                               const std::string& uri,
                                                               int64_t offset, int64_t length) {
    // The version is stored in an int8_t member; reject unknown values up front so an
    // out-of-range argument is never silently truncated into a seemingly valid version.
    if (version < 1 || version > CURRENT_VERSION) {
        return Status::Invalid(fmt::format("version {} is not in the supported range [1, {}]",
                                           version, CURRENT_VERSION));
    }
    if (offset < 0) {
        return Status::Invalid(fmt::format("offset {} is less than 0", offset));
    }
    // length == -1 means it's dynamic length
    if (length < -1) {
        return Status::Invalid(fmt::format("length {} is less than -1", length));
    }
    return std::unique_ptr<BlobDescriptor>(new BlobDescriptor(version, uri, offset, length));
}
4349

4450
PAIMON_UNIQUE_PTR<Bytes> BlobDescriptor::Serialize(const std::shared_ptr<MemoryPool>& pool) const {
4551
MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
4652
out.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
4753
out.WriteValue<int8_t>(version_);
54+
out.WriteValue<int64_t>(MAGIC);
4855
out.WriteValue<int32_t>(static_cast<int32_t>(uri_.size()));
4956

5057
auto uri_bytes = std::make_shared<Bytes>(uri_, pool.get());
@@ -60,16 +67,40 @@ Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Deserialize(const char*
6067
DataInputStream in(std::move(input_stream));
6168
in.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
6269
PAIMON_ASSIGN_OR_RAISE(int8_t version, in.ReadValue<int8_t>());
63-
if (version != CURRENT_VERSION) {
70+
if (version > CURRENT_VERSION) {
6471
return Status::Invalid(fmt::format(
65-
"Expecting BlobDescriptor version to be {}, but found {}.", CURRENT_VERSION, version));
72+
"Expecting BlobDescriptor version to be less than or equal to {}, but found {}.",
73+
CURRENT_VERSION, version));
74+
}
75+
if (version > 1) {
76+
PAIMON_ASSIGN_OR_RAISE(int64_t magic, in.ReadValue<int64_t>());
77+
if (MAGIC != magic) {
78+
return Status::Invalid(
79+
"Invalid BlobDescriptor: missing magic header. Expected magic: {}, but found {}",
80+
MAGIC, magic);
81+
}
6682
}
6783
PAIMON_ASSIGN_OR_RAISE(int32_t uri_length, in.ReadValue<int32_t>());
6884
std::string uri(uri_length, '\0');
6985
PAIMON_RETURN_NOT_OK(in.Read(uri.data(), uri.size()));
7086
PAIMON_ASSIGN_OR_RAISE(int64_t offset, in.ReadValue<int64_t>());
7187
PAIMON_ASSIGN_OR_RAISE(int64_t length, in.ReadValue<int64_t>());
72-
return BlobDescriptor::Create(uri, offset, length);
88+
return BlobDescriptor::Create(version, uri, offset, length);
89+
}
90+
91+
/// Probes whether a buffer starts with a blob descriptor header.
///
/// Only the leading version byte and the 8-byte little-endian magic number are inspected;
/// the rest of the buffer is not parsed.
///
/// @param buffer start of the candidate bytes.
/// @param size   number of readable bytes in `buffer`.
/// @return false when the buffer is shorter than the header or the version byte is greater
///         than CURRENT_VERSION; otherwise whether the magic number equals MAGIC.
Result<bool> BlobDescriptor::IsBlobDescriptor(const char* buffer, uint64_t size) {
    // Header = 1 version byte followed by the 8-byte magic number.
    constexpr uint64_t kHeaderSize = sizeof(int8_t) + sizeof(int64_t);
    if (size < kHeaderSize) {
        return false;
    }

    DataInputStream reader(std::make_shared<ByteArrayInputStream>(buffer, size));
    reader.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);

    PAIMON_ASSIGN_OR_RAISE(int8_t header_version, reader.ReadValue<int8_t>());
    if (header_version > CURRENT_VERSION) {
        return false;
    }

    PAIMON_ASSIGN_OR_RAISE(int64_t header_magic, reader.ReadValue<int64_t>());
    return header_magic == MAGIC;
}
74105

75106
std::string BlobDescriptor::ToString() const {

src/paimon/common/data/blob_descriptor.h

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,39 @@
2525
#include "paimon/result.h"
2626

2727
namespace paimon {
28+
/// Blob descriptor to describe a blob reference.
29+
/// Memory layout of a version 2 descriptor (version 1 has no magic_number field). All multi-byte
30+
/// numerical values (int/long) are stored in little-endian byte order.
31+
///
32+
/// | Offset | Field Name | Type | Size |
33+
/// |--------|---------------|-----------|------|
34+
/// | 0 | version | byte | 1 |
35+
/// | 1 | magic_number | long | 8 |
36+
/// | 9 | uri_length | int | 4 |
37+
/// | 13 | uri_bytes | byte[N] | N |
38+
/// | 13 + N | offset | long | 8 |
39+
/// | 21 + N | length | long | 8 |
2840

2941
class BlobDescriptor {
3042
public:
3143
static Result<std::unique_ptr<BlobDescriptor>> Create(const std::string& uri, int64_t offset,
3244
int64_t length);
3345

34-
~BlobDescriptor() = default;
46+
static Result<std::unique_ptr<BlobDescriptor>> Create(int64_t version, const std::string& uri,
47+
int64_t offset, int64_t length);
3548

3649
static Result<std::unique_ptr<BlobDescriptor>> Deserialize(const char* buffer, uint64_t size);
3750

51+
static Result<bool> IsBlobDescriptor(const char* buffer, uint64_t size);
52+
3853
PAIMON_UNIQUE_PTR<Bytes> Serialize(const std::shared_ptr<MemoryPool>& pool) const;
3954

4055
std::string ToString() const;
4156

57+
int8_t Version() const {
58+
return version_;
59+
}
60+
4261
const std::string& Uri() const {
4362
return uri_;
4463
}
@@ -52,11 +71,12 @@ class BlobDescriptor {
5271
}
5372

5473
private:
55-
BlobDescriptor(const std::string& uri, int64_t offset, int64_t length)
56-
: uri_(uri), offset_(offset), length_(length) {}
74+
BlobDescriptor(int64_t version, const std::string& uri, int64_t offset, int64_t length)
75+
: version_(version), uri_(uri), offset_(offset), length_(length) {}
5776

5877
private:
59-
static constexpr int8_t CURRENT_VERSION = 1;
78+
static constexpr int64_t MAGIC = 0x424C4F4244455343l;
79+
static constexpr int8_t CURRENT_VERSION = 2;
6080

6181
const int8_t version_ = CURRENT_VERSION;
6282
std::string uri_;

src/paimon/common/data/blob_descriptor_test.cpp

Lines changed: 100 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,11 @@ class BlobDescriptorTest : public testing::Test {
3232
pool_ = GetDefaultPool();
3333
ASSERT_OK_AND_ASSIGN(descriptor_,
3434
BlobDescriptor::Create("test_uri", /*offset=*/1024, /*length=*/2048));
35-
36-
std::vector<char> bytes = {1, 8, 0, 0, 0, 116, 101, 115, 116, 95, 117, 114, 105, 0, 4,
37-
0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0};
38-
java_serialized_ = std::string(bytes.data(), bytes.size());
3935
}
4036

4137
private:
4238
std::shared_ptr<MemoryPool> pool_;
4339
std::unique_ptr<BlobDescriptor> descriptor_;
44-
std::string java_serialized_;
4540
};
4641

4742
TEST_F(BlobDescriptorTest, TestConstructorAndGetters) {
@@ -50,16 +45,35 @@ TEST_F(BlobDescriptorTest, TestConstructorAndGetters) {
5045
ASSERT_EQ(descriptor_->Length(), 2048);
5146
}
5247

53-
TEST_F(BlobDescriptorTest, TestSerializeDeserializeAndCompatibilityWithJava) {
54-
auto serialized = descriptor_->Serialize(pool_);
55-
std::string serialized_str(serialized->data(), serialized->size());
56-
ASSERT_EQ(serialized_str, java_serialized_);
48+
// A version 1 payload (no magic header after the version byte) must still deserialize and
// keep reporting version 1.
TEST_F(BlobDescriptorTest, TestDeserializeCompatibilityForJavaWithVersion1) {
    // version | uri_length | "test_uri" | offset = 1024 | length = 2048, all little-endian
    const std::vector<char> v1_payload = {1, 8, 0, 0, 0, 116, 101, 115, 116, 95, 117, 114, 105, 0, 4,
                                          0, 0, 0, 0, 0,   0,   0,   8,   0,  0,   0,   0,   0, 0};
    const std::string java_serialized(v1_payload.data(), v1_payload.size());

    ASSERT_OK_AND_ASSIGN(auto restored, BlobDescriptor::Deserialize(java_serialized.data(),
                                                                    java_serialized.size()));
    ASSERT_EQ(restored->Version(), static_cast<int8_t>(1));
    ASSERT_EQ(restored->Uri(), "test_uri");
    ASSERT_EQ(restored->Offset(), 1024);
    ASSERT_EQ(restored->Length(), 2048);
}
5760

58-
ASSERT_OK_AND_ASSIGN(auto restored_descriptor,
59-
BlobDescriptor::Deserialize(serialized->data(), serialized->size()));
60-
ASSERT_EQ(restored_descriptor->Uri(), "test_uri");
61-
ASSERT_EQ(restored_descriptor->Offset(), 1024);
62-
ASSERT_EQ(restored_descriptor->Length(), 2048);
61+
// A version 2 payload must deserialize to the expected fields, and serializing the result
// again must reproduce the payload byte for byte.
TEST_F(BlobDescriptorTest, TestDeserializeCompatibilityForJavaWithVersion2) {
    // version | magic (8 bytes) | uri_length | "test_uri" | offset = 1024 | length = 2048,
    // all multi-byte values little-endian
    std::vector<char> bytes = {2,   67,  83,  69,  68, 66,  79,  76,  66, 8, 0, 0, 0,
                               116, 101, 115, 116, 95, 117, 114, 105, 0,  4, 0, 0, 0,
                               0,   0,   0,   0,   8,  0,   0,   0,   0,  0, 0};
    auto java_serialized = std::string(bytes.data(), bytes.size());

    ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Deserialize(java_serialized.data(),
                                                                      java_serialized.size()));
    ASSERT_EQ(descriptor->Version(), static_cast<int8_t>(2));
    ASSERT_EQ(descriptor->Uri(), "test_uri");
    ASSERT_EQ(descriptor->Offset(), 1024);
    ASSERT_EQ(descriptor->Length(), 2048);

    PAIMON_UNIQUE_PTR<Bytes> cpp_serialized = descriptor->Serialize(pool_);
    ASSERT_EQ(cpp_serialized->size(), bytes.size());
    // Compare the whole payload: dereferencing the two data() pointers would only check the
    // first byte (the version) and miss any divergence in the magic, uri, offset or length.
    ASSERT_EQ(std::string(cpp_serialized->data(), cpp_serialized->size()), java_serialized);
}
6478

6579
TEST_F(BlobDescriptorTest, TestSerializeDeserializeWithEmptyUri) {
@@ -92,9 +106,10 @@ TEST_F(BlobDescriptorTest, TestInvalidParameters) {
92106
ASSERT_OK_AND_ASSIGN(std::unique_ptr<BlobDescriptor> descriptor,
93107
BlobDescriptor::Create(/*uri=*/"test", /*offset=*/1, /*length=*/2));
94108
auto serialized = descriptor->Serialize(pool_);
95-
(*serialized)[0] = '\x02';
96-
ASSERT_NOK_WITH_MSG(BlobDescriptor::Deserialize(serialized->data(), serialized->size()),
97-
"Expecting BlobDescriptor version to be 1, but found 2");
109+
(*serialized)[0] = '\x03';
110+
ASSERT_NOK_WITH_MSG(
111+
BlobDescriptor::Deserialize(serialized->data(), serialized->size()),
112+
"Expecting BlobDescriptor version to be less than or equal to 2, but found 3");
98113
}
99114
// Test deserialize invalid buffer size
100115
{
@@ -118,7 +133,7 @@ TEST_F(BlobDescriptorTest, TestInvalidParameters) {
118133
TEST_F(BlobDescriptorTest, TestToString) {
119134
std::string debug_str = descriptor_->ToString();
120135
ASSERT_FALSE(debug_str.empty());
121-
ASSERT_TRUE(debug_str.find("version=1") != std::string::npos);
136+
ASSERT_TRUE(debug_str.find("version=2") != std::string::npos);
122137
ASSERT_TRUE(debug_str.find("uri='test_uri'") != std::string::npos);
123138
ASSERT_TRUE(debug_str.find("offset=1024") != std::string::npos);
124139
ASSERT_TRUE(debug_str.find("length=2048") != std::string::npos);
@@ -140,4 +155,71 @@ TEST_F(BlobDescriptorTest, TestRoundTripConsistency) {
140155
ASSERT_EQ(second_restored->Length(), 2048);
141156
}
142157

158+
// The fixture descriptor is created with the current version, so its serialized form carries
// the magic header and must be recognized.
TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithValidDescriptor) {
    auto encoded = descriptor_->Serialize(pool_);
    ASSERT_OK_AND_ASSIGN(bool recognized,
                         BlobDescriptor::IsBlobDescriptor(encoded->data(), encoded->size()));
    ASSERT_TRUE(recognized);
}
165+
166+
// Buffers shorter than the 9-byte header (version + magic) are reported as "not a descriptor"
// rather than as an error.
TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithTooShortBuffer) {
    // Eight bytes: a valid version and the first seven magic bytes only.
    const std::vector<char> truncated = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F, 0x4C};
    ASSERT_OK_AND_ASSIGN(bool truncated_result,
                         BlobDescriptor::IsBlobDescriptor(truncated.data(), truncated.size()));
    ASSERT_FALSE(truncated_result);

    // Null pointer with zero length.
    ASSERT_OK_AND_ASSIGN(bool null_result, BlobDescriptor::IsBlobDescriptor(nullptr, 0));
    ASSERT_FALSE(null_result);
}
177+
178+
// A version byte above CURRENT_VERSION yields false, not an error status.
TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithFutureVersion) {
    auto encoded = descriptor_->Serialize(pool_);
    // Overwrite the leading version byte with 3 (> CURRENT_VERSION); the magic stays intact.
    (*encoded)[0] = '\x03';
    ASSERT_OK_AND_ASSIGN(bool recognized,
                         BlobDescriptor::IsBlobDescriptor(encoded->data(), encoded->size()));
    ASSERT_FALSE(recognized);
}
186+
187+
// A valid version byte followed by a corrupted magic number must not be recognized.
TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithWrongMagic) {
    auto encoded = descriptor_->Serialize(pool_);
    // The magic occupies bytes 1-8; zeroing the first two is enough to break the match.
    (*encoded)[1] = '\x00';
    (*encoded)[2] = '\x00';
    ASSERT_OK_AND_ASSIGN(bool recognized,
                         BlobDescriptor::IsBlobDescriptor(encoded->data(), encoded->size()));
    ASSERT_FALSE(recognized);
}
197+
198+
// Ten arbitrary bytes that are long enough to probe but do not contain the magic number.
TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithRandomData) {
    const std::vector<char> arbitrary = {0x00, 0x01, 0x02, 0x03, 0x04,
                                         0x05, 0x06, 0x07, 0x08, 0x09};
    ASSERT_OK_AND_ASSIGN(bool recognized,
                         BlobDescriptor::IsBlobDescriptor(arbitrary.data(), arbitrary.size()));
    ASSERT_FALSE(recognized);
}
205+
206+
// In a version 1 payload the version byte is followed directly by uri_length, not by the
// magic number, so interpreting bytes 1-8 as the magic does not match MAGIC.
TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithVersion1Data) {
    const std::vector<char> v1_payload = {1, 8, 0, 0, 0, 116, 101, 115, 116, 95, 117, 114, 105, 0, 4,
                                          0, 0, 0, 0, 0,   0,   0,   8,   0,  0,   0,   0,   0, 0};
    ASSERT_OK_AND_ASSIGN(bool recognized,
                         BlobDescriptor::IsBlobDescriptor(v1_payload.data(), v1_payload.size()));
    ASSERT_FALSE(recognized);
}
215+
216+
// The shortest accepted input: exactly the header, with nothing after it.
TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithExactly9Bytes) {
    // version = 2, then magic 0x424C4F4244455343 written in little-endian byte order
    const std::vector<char> header_only = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F, 0x4C, 0x42};
    ASSERT_OK_AND_ASSIGN(bool recognized,
                         BlobDescriptor::IsBlobDescriptor(header_only.data(), header_only.size()));
    ASSERT_TRUE(recognized);
}
224+
143225
} // namespace paimon::test

0 commit comments

Comments
 (0)