Skip to content

Commit 79917cc

Browse files
feat: add audit_log and binlog system tables (#268)
1 parent 5af2437 commit 79917cc

35 files changed

Lines changed: 1355 additions & 169 deletions

cmake_modules/SetupCxxFlags.cmake

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,10 @@ if("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
8181
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unknown-warning-option")
8282
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-constant-logical-operand")
8383
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations")
84-
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-builtins")
8584
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
8685
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall")
8786
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-conversion")
8887
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations")
89-
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-builtins")
9088
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-sign-conversion")
9189
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unused-variable")
9290
else()
@@ -192,7 +190,6 @@ elseif(CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang" OR CMAKE_CXX_COMPILER_ID STRE
192190
# Don't complain about optimization passes that were not possible
193191
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-pass-failed")
194192
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations")
195-
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-builtins")
196193

197194
if(CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang")
198195
# Depending on the default OSX_DEPLOYMENT_TARGET (< 10.9), libstdc++ may be

include/paimon/defs.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,14 @@ struct PAIMON_EXPORT Options {
374374
/// "aggregation.remove-record-on-delete" - Whether to remove the whole row in aggregation
375375
/// engine when delete records are received. Default value is "false".
376376
static const char AGGREGATION_REMOVE_RECORD_ON_DELETE[];
377+
/// "table-read.sequence-number.enabled" - Whether to include the _SEQUENCE_NUMBER field when
378+
/// reading the audit_log or binlog system tables. This is only valid for primary key tables.
379+
/// Default value is "false".
380+
static const char TABLE_READ_SEQUENCE_NUMBER_ENABLED[];
381+
/// "key-value.sequence-number.enabled" - Whether to include the _SEQUENCE_NUMBER field when
382+
/// reading key-value data. This is an internal option used by AuditLogTable and BinlogTable
383+
/// when table-read.sequence-number.enabled is set to true. Default value is "false".
384+
static const char KEY_VALUE_SEQUENCE_NUMBER_ENABLED[];
377385

378386
/// "scan.timestamp-millis" - Optional timestamp used in case of "from-timestamp" scan mode.
379387
/// For batch sources, produces the latest snapshot earlier than or equal to the timestamp.

include/paimon/utils/special_field_ids.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,20 @@ namespace paimon {
2525
class SpecialFieldIds {
2626
protected:
2727
/// System defined constant for field id boundary. Value: INT32_MAX - 10000
28-
static const int32_t CPP_FIELD_ID_END;
28+
static const int32_t CPP_FIELD_ID_END = std::numeric_limits<int32_t>::max() - 10000;
2929

3030
public:
3131
/// Special field ID reserved for sequence number. Value: INT32_MAX - 1
32-
static const int32_t SEQUENCE_NUMBER;
32+
static const int32_t SEQUENCE_NUMBER = std::numeric_limits<int32_t>::max() - 1;
3333
/// Special field ID reserved for value kind. Value: INT32_MAX - 2
34-
static const int32_t VALUE_KIND;
34+
static const int32_t VALUE_KIND = std::numeric_limits<int32_t>::max() - 2;
35+
/// Special field ID reserved for row kind. Value: INT32_MAX - 3
36+
static const int32_t ROW_KIND = std::numeric_limits<int32_t>::max() - 3;
3537
/// Special field ID reserved for row ID. Value: INT32_MAX - 5
36-
static const int32_t ROW_ID;
38+
static const int32_t ROW_ID = std::numeric_limits<int32_t>::max() - 5;
3739

3840
/// Special field ID reserved for index score. Value: CPP_FIELD_ID_END - 1
39-
static const int32_t INDEX_SCORE;
41+
static const int32_t INDEX_SCORE = CPP_FIELD_ID_END - 1;
4042
};
4143

4244
} // namespace paimon

src/paimon/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,9 +313,10 @@ set(PAIMON_CORE_SRCS
313313
core/table/source/table_read.cpp
314314
core/table/source/table_scan.cpp
315315
core/table/source/data_evolution_batch_scan.cpp
316+
core/table/system/audit_log_system_table.cpp
317+
core/table/system/binlog_system_table.cpp
316318
core/table/system/options_system_table.cpp
317319
core/table/system/system_table.cpp
318-
core/table/system/system_table_read.cpp
319320
core/table/system/system_table_scan.cpp
320321
core/table/system/system_table_schema.cpp
321322
core/tag/tag.cpp
@@ -695,6 +696,7 @@ if(PAIMON_BUILD_TESTS)
695696
core/table/source/split_generator_test.cpp
696697
core/table/source/startup_mode_test.cpp
697698
core/table/source/table_scan_test.cpp
699+
core/table/system/system_table_test.cpp
698700
core/tag/tag_test.cpp
699701
core/utils/branch_manager_test.cpp
700702
core/utils/file_store_path_factory_cache_test.cpp

src/paimon/common/defs.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
9494
const char Options::GLOBAL_INDEX_THREAD_NUM[] = "global-index.thread-num";
9595
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
9696
const char Options::AGGREGATION_REMOVE_RECORD_ON_DELETE[] = "aggregation.remove-record-on-delete";
97+
const char Options::TABLE_READ_SEQUENCE_NUMBER_ENABLED[] = "table-read.sequence-number.enabled";
98+
const char Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED[] = "key-value.sequence-number.enabled";
9799
const char Options::SCAN_TIMESTAMP_MILLIS[] = "scan.timestamp-millis";
98100
const char Options::SCAN_TIMESTAMP[] = "scan.timestamp";
99101
const char Options::SCAN_TAG_NAME[] = "scan.tag-name";

src/paimon/common/table/special_fields.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ struct SpecialFields {
4545
return data_field;
4646
}
4747

48+
static const DataField& RowKind() {
49+
static const DataField data_field =
50+
DataField(SpecialFieldIds::ROW_KIND, arrow::field("rowkind", arrow::utf8()));
51+
return data_field;
52+
}
53+
4854
static const DataField& RowId() {
4955
static const DataField data_field =
5056
DataField(SpecialFieldIds::ROW_ID, arrow::field("_ROW_ID", arrow::int64()));

src/paimon/common/table/special_fields_test.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ TEST(SpecialFieldsTest, TestValueKindField) {
3535
ASSERT_EQ(SpecialFields::ValueKind().Type()->id(), arrow::Type::INT8);
3636
}
3737

38+
TEST(SpecialFieldsTest, TestRowKindField) {
39+
ASSERT_EQ(SpecialFields::RowKind().Id(), std::numeric_limits<int32_t>::max() - 3);
40+
ASSERT_EQ(SpecialFields::RowKind().Name(), "rowkind");
41+
ASSERT_EQ(SpecialFields::RowKind().Type()->id(), arrow::Type::STRING);
42+
}
43+
3844
TEST(SpecialFieldsTest, TestRowIdField) {
3945
ASSERT_EQ(SpecialFields::RowId().Id(), std::numeric_limits<int32_t>::max() - 5);
4046
ASSERT_EQ(SpecialFields::RowId().Name(), "_ROW_ID");

src/paimon/core/catalog/file_system_catalog.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,11 +272,18 @@ Result<std::shared_ptr<Schema>> FileSystemCatalog::LoadTableSchema(
272272
if (!latest_schema) {
273273
return Status::NotExist(fmt::format("{} not exist", data_identifier.ToString()));
274274
}
275-
PAIMON_ASSIGN_OR_RAISE(
276-
std::shared_ptr<SystemTable> system_table,
277-
SystemTableLoader::Load(system_table_name.value(), fs_, GetTableLocation(identifier),
278-
latest_schema.value()));
279-
return std::make_shared<SystemTableSchema>(system_table->ArrowSchema());
275+
std::map<std::string, std::string> dynamic_options;
276+
PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> branch, identifier.GetBranchName());
277+
if (branch) {
278+
dynamic_options[Options::BRANCH] = branch.value();
279+
}
280+
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<SystemTable> system_table,
281+
SystemTableLoader::Load(system_table_name.value(), fs_,
282+
GetTableLocation(data_identifier),
283+
latest_schema.value(), dynamic_options));
284+
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> arrow_schema,
285+
system_table->ArrowSchema());
286+
return std::make_shared<SystemTableSchema>(std::move(arrow_schema));
280287
}
281288
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
282289
TableSchemaExists(identifier));

src/paimon/core/catalog/file_system_catalog_test.cpp

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,77 @@ TEST(FileSystemCatalogTest, TestOptionsSystemTableCatalog) {
210210
"Cannot rename system table");
211211
}
212212

213+
TEST(FileSystemCatalogTest, TestAuditLogAndBinlogSystemTableCatalog) {
214+
std::map<std::string, std::string> options;
215+
options[Options::FILE_SYSTEM] = "local";
216+
options[Options::FILE_FORMAT] = "orc";
217+
ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options));
218+
auto dir = UniqueTestDirectory::Create();
219+
ASSERT_TRUE(dir);
220+
FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str());
221+
ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true));
222+
223+
auto typed_schema =
224+
arrow::schema({arrow::field("pk", arrow::utf8()), arrow::field("v", arrow::int32(), true)});
225+
::ArrowSchema schema;
226+
ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok());
227+
ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema,
228+
/*partition_keys=*/{}, /*primary_keys=*/{"pk"}, options,
229+
/*ignore_if_exists=*/false));
230+
ArrowSchemaRelease(&schema);
231+
232+
Identifier audit_log_identifier("db1", "tbl1$audit_log");
233+
Identifier binlog_identifier("db1", "tbl1$binlog");
234+
ASSERT_OK_AND_ASSIGN(bool exists, catalog.TableExists(audit_log_identifier));
235+
ASSERT_TRUE(exists);
236+
ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(binlog_identifier));
237+
ASSERT_TRUE(exists);
238+
ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "tbl1$unknown")));
239+
ASSERT_FALSE(exists);
240+
241+
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Schema> audit_log_system_schema,
242+
catalog.LoadTableSchema(audit_log_identifier));
243+
ASSERT_TRUE(std::dynamic_pointer_cast<SystemTableSchema>(audit_log_system_schema) != nullptr);
244+
ASSERT_OK_AND_ASSIGN(auto audit_log_c_schema, audit_log_system_schema->GetArrowSchema());
245+
auto audit_log_schema_result = arrow::ImportSchema(audit_log_c_schema.get());
246+
ASSERT_TRUE(audit_log_schema_result.ok()) << audit_log_schema_result.status().ToString();
247+
auto audit_log_schema = audit_log_schema_result.ValueUnsafe();
248+
ASSERT_EQ(audit_log_schema->field_names(), (std::vector<std::string>{"rowkind", "pk", "v"}));
249+
ASSERT_EQ(audit_log_schema->field(0)->type()->id(), arrow::Type::STRING);
250+
ASSERT_EQ(audit_log_schema->field(1)->type()->id(), arrow::Type::STRING);
251+
ASSERT_EQ(audit_log_schema->field(2)->type()->id(), arrow::Type::INT32);
252+
253+
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Schema> binlog_system_schema,
254+
catalog.LoadTableSchema(binlog_identifier));
255+
ASSERT_TRUE(std::dynamic_pointer_cast<SystemTableSchema>(binlog_system_schema) != nullptr);
256+
ASSERT_OK_AND_ASSIGN(auto binlog_c_schema, binlog_system_schema->GetArrowSchema());
257+
auto binlog_schema_result = arrow::ImportSchema(binlog_c_schema.get());
258+
ASSERT_TRUE(binlog_schema_result.ok()) << binlog_schema_result.status().ToString();
259+
auto binlog_schema = binlog_schema_result.ValueUnsafe();
260+
ASSERT_EQ(binlog_schema->field_names(), (std::vector<std::string>{"rowkind", "pk", "v"}));
261+
ASSERT_EQ(binlog_schema->field(0)->type()->id(), arrow::Type::STRING);
262+
ASSERT_EQ(binlog_schema->field(1)->type()->id(), arrow::Type::LIST);
263+
ASSERT_EQ(binlog_schema->field(2)->type()->id(), arrow::Type::LIST);
264+
auto binlog_pk_type =
265+
std::dynamic_pointer_cast<arrow::ListType>(binlog_schema->field(1)->type());
266+
auto binlog_v_type =
267+
std::dynamic_pointer_cast<arrow::ListType>(binlog_schema->field(2)->type());
268+
ASSERT_TRUE(binlog_pk_type);
269+
ASSERT_TRUE(binlog_v_type);
270+
ASSERT_EQ(binlog_pk_type->value_type()->id(), arrow::Type::STRING);
271+
ASSERT_EQ(binlog_v_type->value_type()->id(), arrow::Type::INT32);
272+
273+
::ArrowSchema system_create_schema;
274+
ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &system_create_schema).ok());
275+
ASSERT_NOK_WITH_MSG(
276+
catalog.CreateTable(audit_log_identifier, &system_create_schema, {}, {}, options, false),
277+
"Cannot create table for system table");
278+
ArrowSchemaRelease(&system_create_schema);
279+
ASSERT_NOK_WITH_MSG(catalog.DropTable(binlog_identifier, false), "Cannot drop system table");
280+
ASSERT_NOK_WITH_MSG(catalog.RenameTable(audit_log_identifier, Identifier("db1", "tbl2"), false),
281+
"Cannot rename system table");
282+
}
283+
213284
TEST(FileSystemCatalogTest, TestCreateTableWithBlob) {
214285
std::map<std::string, std::string> options;
215286
options[Options::FILE_SYSTEM] = "local";

src/paimon/core/core_options.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,8 @@ struct CoreOptions::Impl {
424424
bool lookup_wait = true;
425425
bool partial_update_remove_record_on_delete = false;
426426
bool aggregation_remove_record_on_delete = false;
427+
bool table_read_sequence_number_enabled = false;
428+
bool key_value_sequence_number_enabled = false;
427429
bool file_index_read_enabled = true;
428430
bool enable_adaptive_prefetch_strategy = true;
429431
bool index_file_in_data_file_dir = false;
@@ -629,6 +631,12 @@ struct CoreOptions::Impl {
629631
// Parse aggregation_remove_record_on_delete
630632
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::AGGREGATION_REMOVE_RECORD_ON_DELETE,
631633
&aggregation_remove_record_on_delete));
634+
// Parse table-read.sequence-number.enabled - expose sequence number in system tables
635+
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::TABLE_READ_SEQUENCE_NUMBER_ENABLED,
636+
&table_read_sequence_number_enabled));
637+
// Parse key-value.sequence-number.enabled - internal sequence number read switch
638+
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED,
639+
&key_value_sequence_number_enabled));
632640
// Parse partial-update.remove-record-on-sequence-group
633641
PAIMON_RETURN_NOT_OK(parser.ParseList<std::string>(
634642
Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP, Options::FIELDS_SEPARATOR,
@@ -1201,6 +1209,14 @@ bool CoreOptions::AggregationRemoveRecordOnDelete() const {
12011209
return impl_->aggregation_remove_record_on_delete;
12021210
}
12031211

1212+
bool CoreOptions::TableReadSequenceNumberEnabled() const {
1213+
return impl_->table_read_sequence_number_enabled;
1214+
}
1215+
1216+
bool CoreOptions::KeyValueSequenceNumberEnabled() const {
1217+
return impl_->key_value_sequence_number_enabled;
1218+
}
1219+
12041220
std::vector<std::string> CoreOptions::GetPartialUpdateRemoveRecordOnSequenceGroup() const {
12051221
return impl_->remove_record_on_sequence_group;
12061222
}

0 commit comments

Comments
 (0)