diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index efdf2e80..e6f9619b 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -26,8 +26,8 @@ static void check(const char* step, const fluss::Result& r) { if (!r.Ok()) { - std::cerr << step << " failed: code=" << r.error_code - << " msg=" << r.error_message << std::endl; + std::cerr << step << " failed: code=" << r.error_code << " msg=" << r.error_message + << std::endl; std::exit(1); } } @@ -54,15 +54,15 @@ int main() { // 3) Schema with scalar and temporal columns auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int()) - .AddColumn("name", fluss::DataType::String()) - .AddColumn("score", fluss::DataType::Float()) - .AddColumn("age", fluss::DataType::Int()) - .AddColumn("event_date", fluss::DataType::Date()) - .AddColumn("event_time", fluss::DataType::Time()) - .AddColumn("created_at", fluss::DataType::Timestamp()) - .AddColumn("updated_at", fluss::DataType::TimestampLtz()) - .Build(); + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("score", fluss::DataType::Float()) + .AddColumn("age", fluss::DataType::Int()) + .AddColumn("event_date", fluss::DataType::Date()) + .AddColumn("event_time", fluss::DataType::Time()) + .AddColumn("created_at", fluss::DataType::Timestamp()) + .AddColumn("updated_at", fluss::DataType::TimestampLtz()) + .Build(); auto descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) @@ -94,17 +94,13 @@ int main() { auto tp_now = std::chrono::system_clock::now(); std::vector rows = { - {1, "Alice", 95.2f, 25, - fluss::Date::FromYMD(2024, 6, 15), fluss::Time::FromHMS(14, 30, 45), - fluss::Timestamp::FromTimePoint(tp_now), - fluss::Timestamp::FromMillis(1718467200000)}, - {2, "Bob", 87.2f, 30, - fluss::Date::FromYMD(2025, 1, 1), fluss::Time::FromHMS(0, 0, 0), + {1, "Alice", 95.2f, 25, fluss::Date::FromYMD(2024, 6, 15), fluss::Time::FromHMS(14, 30, 45), + fluss::Timestamp::FromTimePoint(tp_now), fluss::Timestamp::FromMillis(1718467200000)}, + {2, "Bob", 87.2f, 30, fluss::Date::FromYMD(2025, 1, 1), fluss::Time::FromHMS(0, 0, 0), fluss::Timestamp::FromMillis(1735689600000), fluss::Timestamp::FromMillisNanos(1735689600000, 500000)}, - {3, "Charlie", 92.1f, 35, - fluss::Date::FromYMD(1999, 12, 31), fluss::Time::FromHMS(23, 59, 59), - fluss::Timestamp::FromMillis(946684799999), + {3, "Charlie", 92.1f, 35, fluss::Date::FromYMD(1999, 12, 31), + fluss::Time::FromHMS(23, 59, 59), fluss::Timestamp::FromMillis(946684799999), fluss::Timestamp::FromMillis(946684799999)}, }; @@ -160,23 +156,23 @@ int main() { const auto& f = rec.row.fields; if (f[4].type != fluss::DatumType::Date) { - std::cerr << "ERROR: field 4 expected Date, got " - << static_cast(f[4].type) << std::endl; + std::cerr << "ERROR: field 4 expected Date, got " << static_cast(f[4].type) + << std::endl; scan_ok = false; } if (f[5].type != fluss::DatumType::Time) { - std::cerr << "ERROR: field 5 expected Time, got " - << static_cast(f[5].type) << std::endl; + std::cerr << "ERROR: field 5 expected Time, got " << static_cast(f[5].type) + << std::endl; scan_ok = false; } if (f[6].type != fluss::DatumType::TimestampNtz) { - std::cerr << "ERROR: field 6 expected TimestampNtz, got " - << static_cast(f[6].type) << std::endl; + std::cerr << "ERROR: field 6 expected TimestampNtz, got " << static_cast(f[6].type) + << std::endl; scan_ok = false; } if (f[7].type != fluss::DatumType::TimestampLtz) { - std::cerr << "ERROR: 
field 7 expected TimestampLtz, got " - << static_cast(f[7].type) << std::endl; + std::cerr << "ERROR: field 7 expected TimestampLtz, got " << static_cast(f[7].type) + << std::endl; scan_ok = false; } @@ -185,15 +181,11 @@ int main() { auto ts_ntz = f[6].GetTimestamp(); auto ts_ltz = f[7].GetTimestamp(); - std::cout << " id=" << f[0].i32_val - << " name=" << f[1].string_val - << " score=" << f[2].f32_val - << " age=" << f[3].i32_val - << " date=" << date.Year() << "-" << date.Month() << "-" << date.Day() - << " time=" << time.Hour() << ":" << time.Minute() << ":" << time.Second() - << " ts_ntz=" << ts_ntz.epoch_millis - << " ts_ltz=" << ts_ltz.epoch_millis - << "+" << ts_ltz.nano_of_millisecond << "ns" + std::cout << " id=" << f[0].i32_val << " name=" << f[1].string_val + << " score=" << f[2].f32_val << " age=" << f[3].i32_val << " date=" << date.Year() + << "-" << date.Month() << "-" << date.Day() << " time=" << time.Hour() << ":" + << time.Minute() << ":" << time.Second() << " ts_ntz=" << ts_ntz.epoch_millis + << " ts_ltz=" << ts_ltz.epoch_millis << "+" << ts_ltz.nano_of_millisecond << "ns" << std::endl; } @@ -237,9 +229,8 @@ int main() { } auto ts = f[1].GetTimestamp(); - std::cout << " id=" << f[0].i32_val - << " updated_at=" << ts.epoch_millis - << "+" << ts.nano_of_millisecond << "ns" << std::endl; + std::cout << " id=" << f[0].i32_val << " updated_at=" << ts.epoch_millis << "+" + << ts.nano_of_millisecond << "ns" << std::endl; } if (scan_ok) { @@ -260,19 +251,16 @@ int main() { std::unordered_map earliest_offsets; check("list_earliest_offsets", - admin.ListOffsets(table_path, all_bucket_ids, - fluss::OffsetQuery::Earliest(), - earliest_offsets)); + admin.ListOffsets(table_path, all_bucket_ids, fluss::OffsetQuery::Earliest(), + earliest_offsets)); std::cout << "Earliest offsets:" << std::endl; for (const auto& [bucket_id, offset] : earliest_offsets) { std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; } std::unordered_map latest_offsets; - check("list_latest_offsets", - admin.ListOffsets(table_path, all_bucket_ids, - fluss::OffsetQuery::Latest(), - latest_offsets)); + check("list_latest_offsets", admin.ListOffsets(table_path, all_bucket_ids, + fluss::OffsetQuery::Latest(), latest_offsets)); std::cout << "Latest offsets:" << std::endl; for (const auto& [bucket_id, offset] : latest_offsets) { std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; @@ -280,14 +268,14 @@ int main() { auto now = std::chrono::system_clock::now(); auto one_hour_ago = now - std::chrono::hours(1); - auto timestamp_ms = std::chrono::duration_cast( - one_hour_ago.time_since_epoch()).count(); + auto timestamp_ms = + std::chrono::duration_cast(one_hour_ago.time_since_epoch()) + .count(); std::unordered_map timestamp_offsets; check("list_timestamp_offsets", admin.ListOffsets(table_path, all_bucket_ids, - fluss::OffsetQuery::FromTimestamp(timestamp_ms), - timestamp_offsets)); + fluss::OffsetQuery::FromTimestamp(timestamp_ms), timestamp_offsets)); std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" << std::endl; for (const auto& [bucket_id, offset] : timestamp_offsets) { std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; @@ -301,8 +289,8 @@ int main() { std::vector subscriptions; for (const auto& [bucket_id, offset] : earliest_offsets) { subscriptions.push_back({bucket_id, offset}); - std::cout << "Preparing subscription: bucket=" << bucket_id - << ", offset=" << offset << std::endl; + std::cout << "Preparing subscription: 
bucket=" << bucket_id << ", offset=" << offset + << std::endl; } check("subscribe_buckets", batch_scanner.Subscribe(subscriptions)); @@ -311,12 +299,12 @@ int main() { fluss::ScanRecords batch_records; check("poll_batch", batch_scanner.Poll(5000, batch_records)); - std::cout << "Scanned " << batch_records.Size() << " records from batch subscription" << std::endl; + std::cout << "Scanned " << batch_records.Size() << " records from batch subscription" + << std::endl; for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) { const auto& rec = batch_records[i]; std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id - << ", offset=" << rec.offset - << ", timestamp=" << rec.timestamp << std::endl; + << ", offset=" << rec.offset << ", timestamp=" << rec.timestamp << std::endl; } if (batch_records.Size() > 5) { std::cout << " ... and " << (batch_records.Size() - 5) << " more records" << std::endl; @@ -339,7 +327,8 @@ int main() { for (size_t i = 0; i < arrow_batches.Size(); ++i) { const auto& batch = arrow_batches[i]; if (batch->Available()) { - std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl; + std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() + << " rows" << std::endl; } else { std::cout << " Batch " << i << ": not available" << std::endl; } @@ -350,20 +339,25 @@ int main() { fluss::LogScanner projected_arrow_scanner; check("new_record_batch_log_scanner_with_projection", - table.NewScan().Project(projected_columns).CreateRecordBatchScanner(projected_arrow_scanner)); + table.NewScan() + .Project(projected_columns) + .CreateRecordBatchScanner(projected_arrow_scanner)); for (int b = 0; b < buckets; ++b) { check("subscribe_projected_arrow", projected_arrow_scanner.Subscribe(b, 0)); } fluss::ArrowRecordBatches projected_arrow_batches; - check("poll_projected_record_batch", projected_arrow_scanner.PollRecordBatch(5000, projected_arrow_batches)); + check("poll_projected_record_batch", + projected_arrow_scanner.PollRecordBatch(5000, projected_arrow_batches)); - std::cout << "Polled " << projected_arrow_batches.Size() << " projected Arrow record batches" << std::endl; + std::cout << "Polled " << projected_arrow_batches.Size() << " projected Arrow record batches" + << std::endl; for (size_t i = 0; i < projected_arrow_batches.Size(); ++i) { const auto& batch = projected_arrow_batches[i]; if (batch->Available()) { - std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl; + std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() + << " rows" << std::endl; } else { std::cout << " Batch " << i << ": not available" << std::endl; } @@ -380,8 +374,8 @@ int main() { // Create schema with decimal columns auto decimal_schema = fluss::Schema::NewBuilder() .AddColumn("id", fluss::DataType::Int()) - .AddColumn("price", fluss::DataType::Decimal(10, 2)) // compact - .AddColumn("amount", fluss::DataType::Decimal(28, 8)) // i128 + .AddColumn("price", fluss::DataType::Decimal(10, 2)) // compact + .AddColumn("amount", fluss::DataType::Decimal(28, 8)) // i128 .Build(); auto decimal_descriptor = fluss::TableDescriptor::NewBuilder() @@ -403,8 +397,8 @@ int main() { { fluss::GenericRow row; row.SetInt32(0, 1); - row.SetDecimal(1, "123.45"); // Rust knows DECIMAL(10,2) - row.SetDecimal(2, "1.00000000"); // Rust knows DECIMAL(28,8) + row.SetDecimal(1, "123.45"); // Rust knows DECIMAL(10,2) + row.SetDecimal(2, "1.00000000"); // Rust knows DECIMAL(28,8) 
check("append_decimal", decimal_writer.Append(row)); } { @@ -436,13 +430,134 @@ int main() { for (const auto& rec : decimal_records) { auto& price = rec.row.fields[1]; auto& amount = rec.row.fields[2]; - std::cout << " id=" << rec.row.fields[0].i32_val - << " price=" << price.DecimalToString() + std::cout << " id=" << rec.row.fields[0].i32_val << " price=" << price.DecimalToString() << " (raw=" << price.i64_val << ")" - << " amount=" << amount.DecimalToString() - << " is_decimal=" << price.IsDecimal() + << " amount=" << amount.DecimalToString() << " is_decimal=" << price.IsDecimal() << std::endl; } + // 13) Partitioned table example + std::cout << "\n=== Partitioned Table Example ===" << std::endl; + + fluss::TablePath partitioned_table_path("fluss", "partitioned_table_cpp_v1"); + + // Drop if exists + check("drop_partitioned_table_if_exists", admin.DropTable(partitioned_table_path, true)); + + // Create a partitioned table with a "region" partition key + auto partitioned_schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("region", fluss::DataType::String()) + .AddColumn("value", fluss::DataType::BigInt()) + .Build(); + + auto partitioned_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(partitioned_schema) + .SetPartitionKeys({"region"}) + .SetBucketCount(1) + .SetComment("cpp partitioned table example") + .Build(); + + check("create_partitioned_table", + admin.CreateTable(partitioned_table_path, partitioned_descriptor, false)); + std::cout << "Created partitioned table" << std::endl; + + // Create partitions + check("create_partition_US", + admin.CreatePartition(partitioned_table_path, {{"region", "US"}}, true)); + check("create_partition_EU", + admin.CreatePartition(partitioned_table_path, {{"region", "EU"}}, true)); + std::cout << "Created partitions: US, EU" << std::endl; + + // List partitions + std::vector partition_infos; + check("list_partition_infos", + admin.ListPartitionInfos(partitioned_table_path, partition_infos)); + for (const auto& pi : partition_infos) { + std::cout << " Partition: " << pi.partition_name << " (id=" << pi.partition_id << ")" + << std::endl; + } + + // Write data to partitioned table + fluss::Table partitioned_table; + check("get_partitioned_table", conn.GetTable(partitioned_table_path, partitioned_table)); + + fluss::AppendWriter partitioned_writer; + check("new_partitioned_writer", partitioned_table.NewAppendWriter(partitioned_writer)); + + struct PartitionedRow { + int id; + const char* region; + int64_t value; + }; + + std::vector partitioned_rows = { + {1, "US", 100}, + {2, "US", 200}, + {3, "EU", 300}, + {4, "EU", 400}, + }; + + for (const auto& r : partitioned_rows) { + fluss::GenericRow row; + row.SetInt32(0, r.id); + row.SetString(1, r.region); + row.SetInt64(2, r.value); + check("append_partitioned", partitioned_writer.Append(row)); + } + check("flush_partitioned", partitioned_writer.Flush()); + std::cout << "Wrote " << partitioned_rows.size() << " rows to partitioned table" << std::endl; + + // 13.1) subscribe_partition_buckets: subscribe to each partition individually + std::cout << "\n--- Testing SubscribePartitionBuckets ---" << std::endl; + fluss::LogScanner partition_scanner; + check("new_partition_scanner", partitioned_table.NewScan().CreateLogScanner(partition_scanner)); + + for (const auto& pi : partition_infos) { + check("subscribe_partition_buckets", + partition_scanner.SubscribePartitionBuckets(pi.partition_id, 0, 0)); + std::cout << "Subscribed to partition " << pi.partition_name 
<< std::endl; + } + + fluss::ScanRecords partition_records; + check("poll_partitioned", partition_scanner.Poll(5000, partition_records)); + std::cout << "Scanned " << partition_records.Size() << " records from partitioned table" + << std::endl; + for (size_t i = 0; i < partition_records.Size(); ++i) { + const auto& rec = partition_records[i]; + std::cout << " Record " << i << ": id=" << rec.row.fields[0].i32_val + << ", region=" << rec.row.fields[1].string_val + << ", value=" << rec.row.fields[2].i64_val << std::endl; + } + + // 13.2) subscribe_partition_buckets: batch subscribe to all partitions at once + std::cout << "\n--- Testing SubscribePartitionBuckets (batch) ---" << std::endl; + fluss::LogScanner partition_batch_scanner; + check("new_partition_batch_scanner", + partitioned_table.NewScan().CreateLogScanner(partition_batch_scanner)); + + std::vector partition_subs; + for (const auto& pi : partition_infos) { + partition_subs.push_back({pi.partition_id, 0, 0}); + } + check("subscribe_partition_buckets", + partition_batch_scanner.SubscribePartitionBuckets(partition_subs)); + std::cout << "Batch subscribed to " << partition_subs.size() << " partition+bucket combinations" + << std::endl; + + fluss::ScanRecords partition_batch_records; + check("poll_partition_batch", partition_batch_scanner.Poll(5000, partition_batch_records)); + std::cout << "Scanned " << partition_batch_records.Size() + << " records from batch partition subscription" << std::endl; + for (size_t i = 0; i < partition_batch_records.Size(); ++i) { + const auto& rec = partition_batch_records[i]; + std::cout << " Record " << i << ": id=" << rec.row.fields[0].i32_val + << ", region=" << rec.row.fields[1].string_val + << ", value=" << rec.row.fields[2].i64_val << std::endl; + } + + // Cleanup + check("drop_partitioned_table", admin.DropTable(partitioned_table_path, true)); + std::cout << "Dropped partitioned table" << std::endl; return 0; } diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 8125c49c..3a104455 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -28,18 +28,18 @@ // Forward declare Arrow classes to avoid including heavy Arrow headers in header namespace arrow { - class RecordBatch; +class RecordBatch; } namespace fluss { namespace ffi { - struct Connection; - struct Admin; - struct Table; - struct AppendWriter; - struct WriteResult; - struct LogScanner; +struct Connection; +struct Admin; +struct Table; +struct AppendWriter; +struct WriteResult; +struct LogScanner; } // namespace ffi struct Date { @@ -62,8 +62,8 @@ struct Time { static Time FromMillis(int32_t ms) { return {ms}; } static Time FromHMS(int hour, int minute, int second, int millis = 0) { - return {hour * kMillisPerHour + minute * kMillisPerMinute + - second * kMillisPerSecond + millis}; + return {hour * kMillisPerHour + minute * kMillisPerMinute + second * kMillisPerSecond + + millis}; } int Hour() const { return millis_since_midnight / kMillisPerHour; } @@ -87,9 +87,7 @@ struct Timestamp { } static Timestamp FromTimePoint(std::chrono::system_clock::time_point tp) { auto duration = tp.time_since_epoch(); - auto ns = - std::chrono::duration_cast(duration) - .count(); + auto ns = std::chrono::duration_cast(duration).count(); auto ms = ns / kNanosPerMilli; auto nano_of_ms = static_cast(ns % kNanosPerMilli); if (nano_of_ms < 0) { @@ -118,7 +116,7 @@ enum class TypeId { }; class DataType { -public: + public: explicit DataType(TypeId id, int32_t p = 0, int32_t s = 0) : id_(id), precision_(p), 
scale_(s) {} @@ -147,7 +145,7 @@ class DataType { int32_t precision() const { return precision_; } int32_t scale() const { return scale_; } -private: + private: TypeId id_; int32_t precision_{0}; int32_t scale_{0}; @@ -218,9 +216,8 @@ struct Schema { std::vector primary_keys; class Builder { - public: - Builder& AddColumn(std::string name, DataType type, - std::string comment = "") { + public: + Builder& AddColumn(std::string name, DataType type, std::string comment = "") { columns_.push_back({std::move(name), std::move(type), std::move(comment)}); return *this; } @@ -230,11 +227,9 @@ struct Schema { return *this; } - Schema Build() { - return Schema{std::move(columns_), std::move(primary_keys_)}; - } + Schema Build() { return Schema{std::move(columns_), std::move(primary_keys_)}; } - private: + private: std::vector columns_; std::vector primary_keys_; }; @@ -251,7 +246,7 @@ struct TableDescriptor { std::string comment; class Builder { - public: + public: Builder& SetSchema(Schema s) { schema_ = std::move(s); return *this; @@ -283,15 +278,12 @@ struct TableDescriptor { } TableDescriptor Build() { - return TableDescriptor{std::move(schema_), - std::move(partition_keys_), - bucket_count_, - std::move(bucket_keys_), - std::move(properties_), - std::move(comment_)}; + return TableDescriptor{std::move(schema_), std::move(partition_keys_), + bucket_count_, std::move(bucket_keys_), + std::move(properties_), std::move(comment_)}; } - private: + private: Schema schema_; std::vector partition_keys_; int32_t bucket_count_{0}; @@ -417,16 +409,17 @@ struct Datum { fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; } bool IsDecimal() const { - return type == DatumType::DecimalI64 || type == DatumType::DecimalI128 - || type == DatumType::DecimalString; + return type == DatumType::DecimalI64 || type == DatumType::DecimalI128 || + type == DatumType::DecimalString; } std::string DecimalToString() const { if (type == DatumType::DecimalI64) { return FormatUnscaled64(i64_val, decimal_scale); } else if (type == DatumType::DecimalI128) { - unsigned __int128 uval = (static_cast(static_cast(i128_hi)) << 64) | - static_cast(static_cast(i128_lo)); + unsigned __int128 uval = + (static_cast(static_cast(i128_hi)) << 64) | + static_cast(static_cast(i128_lo)); __int128 val = static_cast<__int128>(uval); return FormatUnscaled128(val, decimal_scale); } else if (type == DatumType::DecimalString) { @@ -435,10 +428,11 @@ struct Datum { return ""; } -private: + private: static std::string FormatUnscaled64(int64_t unscaled, int32_t scale) { bool negative = unscaled < 0; - uint64_t abs_val = negative ? -static_cast(unscaled) : static_cast(unscaled); + uint64_t abs_val = + negative ? -static_cast(unscaled) : static_cast(unscaled); std::string digits = std::to_string(abs_val); if (scale <= 0) { return (negative ? "-" : "") + digits; @@ -452,8 +446,8 @@ struct Datum { static std::string FormatUnscaled128(__int128 val, int32_t scale) { bool negative = val < 0; - unsigned __int128 abs_val = negative ? -static_cast(val) - : static_cast(val); + unsigned __int128 abs_val = + negative ? 
-static_cast(val) : static_cast(val); std::string digits; if (abs_val == 0) { digits = "0"; @@ -542,7 +536,7 @@ struct GenericRow { fields[idx] = Datum::DecimalString(value); } -private: + private: void EnsureSize(size_t idx) { if (fields.size() <= idx) { fields.resize(idx + 1); @@ -569,15 +563,14 @@ struct ScanRecords { }; class ArrowRecordBatch { -public: - + public: std::shared_ptr GetArrowRecordBatch() const { return batch_; } bool Available() const; // Get number of rows in the batch int64_t NumRows() const; - + // Get ScanBatch metadata int64_t GetTableId() const; int64_t GetPartitionId() const; @@ -585,14 +578,11 @@ class ArrowRecordBatch { int64_t GetBaseOffset() const; int64_t GetLastOffset() const; -private: + private: friend class LogScanner; - explicit ArrowRecordBatch( - std::shared_ptr batch, - int64_t table_id, - int64_t partition_id, - int32_t bucket_id, - int64_t base_offset) noexcept; + explicit ArrowRecordBatch(std::shared_ptr batch, int64_t table_id, + int64_t partition_id, int32_t bucket_id, + int64_t base_offset) noexcept; std::shared_ptr batch_{nullptr}; @@ -602,7 +592,6 @@ class ArrowRecordBatch { int64_t base_offset_; }; - struct ArrowRecordBatches { std::vector> batches; @@ -626,6 +615,12 @@ struct BucketSubscription { int64_t offset; }; +struct PartitionBucketSubscription { + int64_t partition_id; + int32_t bucket_id; + int64_t offset; +}; + struct LakeSnapshot { int64_t snapshot_id; std::vector bucket_offsets; @@ -644,7 +639,7 @@ class Table; class TableScan; class Connection { -public: + public: Connection() noexcept; ~Connection() noexcept; @@ -660,13 +655,13 @@ class Connection { Result GetAdmin(Admin& out); Result GetTable(const TablePath& table_path, Table& out); -private: + private: void Destroy() noexcept; ffi::Connection* conn_{nullptr}; }; class Admin { -public: + public: Admin() noexcept; ~Admin() noexcept; @@ -677,8 +672,7 @@ class Admin { bool Available() const; - Result CreateTable(const TablePath& table_path, - const TableDescriptor& descriptor, + Result CreateTable(const TablePath& table_path, const TableDescriptor& descriptor, bool ignore_if_exists = false); Result DropTable(const TablePath& table_path, bool ignore_if_not_exists = false); @@ -687,26 +681,24 @@ class Admin { Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out); - Result ListOffsets(const TablePath& table_path, - const std::vector& bucket_ids, - const OffsetQuery& offset_query, - std::unordered_map& out); + Result ListOffsets(const TablePath& table_path, const std::vector& bucket_ids, + const OffsetQuery& offset_query, std::unordered_map& out); + + Result ListPartitionOffsets(const TablePath& table_path, const std::string& partition_name, + const std::vector& bucket_ids, + const OffsetQuery& offset_query, + std::unordered_map& out); - Result ListPartitionOffsets(const TablePath& table_path, - const std::string& partition_name, - const std::vector& bucket_ids, - const OffsetQuery& offset_query, - std::unordered_map& out); + Result ListPartitionInfos(const TablePath& table_path, std::vector& out); - Result ListPartitionInfos(const TablePath& table_path, - std::vector& out); + Result CreatePartition(const TablePath& table_path, + const std::unordered_map& partition_spec, + bool ignore_if_exists = false); -private: - Result DoListOffsets(const TablePath& table_path, - const std::vector& bucket_ids, - const OffsetQuery& offset_query, - std::unordered_map& out, - const std::string* partition_name = nullptr); + private: + Result DoListOffsets(const TablePath& 
table_path, const std::vector& bucket_ids, + const OffsetQuery& offset_query, std::unordered_map& out, + const std::string* partition_name = nullptr); friend class Connection; Admin(ffi::Admin* admin) noexcept; @@ -716,7 +708,7 @@ class Admin { }; class Table { -public: + public: Table() noexcept; ~Table() noexcept; @@ -734,7 +726,7 @@ class Table { TablePath GetTablePath() const; bool HasPrimaryKey() const; -private: + private: friend class Connection; friend class TableScan; Table(ffi::Table* table) noexcept; @@ -744,7 +736,7 @@ class Table { }; class TableScan { -public: + public: TableScan(const TableScan&) = delete; TableScan& operator=(const TableScan&) = delete; TableScan(TableScan&&) noexcept = default; @@ -755,7 +747,7 @@ class TableScan { Result CreateLogScanner(LogScanner& out); Result CreateRecordBatchScanner(LogScanner& out); -private: + private: friend class Table; explicit TableScan(ffi::Table* table) noexcept; @@ -764,7 +756,7 @@ class TableScan { }; class WriteResult { -public: + public: WriteResult() noexcept; ~WriteResult() noexcept; @@ -779,7 +771,7 @@ class WriteResult { /// For fire-and-forget, simply let the WriteResult go out of scope. Result Wait(); -private: + private: friend class AppendWriter; WriteResult(ffi::WriteResult* inner) noexcept; @@ -788,7 +780,7 @@ class WriteResult { }; class AppendWriter { -public: + public: AppendWriter() noexcept; ~AppendWriter() noexcept; @@ -803,7 +795,7 @@ class AppendWriter { Result Append(const GenericRow& row, WriteResult& out); Result Flush(); -private: + private: friend class Table; AppendWriter(ffi::AppendWriter* writer) noexcept; @@ -812,7 +804,7 @@ class AppendWriter { }; class LogScanner { -public: + public: LogScanner() noexcept; ~LogScanner() noexcept; @@ -825,12 +817,13 @@ class LogScanner { Result Subscribe(int32_t bucket_id, int64_t start_offset); Result Subscribe(const std::vector& bucket_offsets); - Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset); + Result SubscribePartitionBuckets(int64_t partition_id, int32_t bucket_id, int64_t start_offset); + Result SubscribePartitionBuckets(const std::vector& subscriptions); Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id); Result Poll(int64_t timeout_ms, ScanRecords& out); Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out); -private: + private: friend class Table; friend class TableScan; LogScanner(ffi::LogScanner* scanner) noexcept; diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp index d19e444c..4aed78db 100644 --- a/bindings/cpp/src/admin.cpp +++ b/bindings/cpp/src/admin.cpp @@ -183,4 +183,25 @@ Result Admin::ListPartitionInfos(const TablePath& table_path, return result; } +Result Admin::CreatePartition(const TablePath& table_path, + const std::unordered_map& partition_spec, + bool ignore_if_exists) { + if (!Available()) { + return utils::make_error(1, "Admin not available"); + } + + auto ffi_path = utils::to_ffi_table_path(table_path); + + rust::Vec rust_spec; + for (const auto& [key, value] : partition_spec) { + ffi::FfiPartitionKeyValue kv; + kv.key = rust::String(key); + kv.value = rust::String(value); + rust_spec.push_back(std::move(kv)); + } + + auto ffi_result = admin_->create_partition(ffi_path, std::move(rust_spec), ignore_if_exists); + return utils::from_ffi_result(ffi_result); +} + } // namespace fluss diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 4aeb13db..235d282f 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ 
-166,6 +166,12 @@ mod ffi {
         offset: i64,
     }
 
+    struct FfiPartitionBucketSubscription {
+        partition_id: i64,
+        bucket_id: i32,
+        offset: i64,
+    }
+
     struct FfiBucketOffsetPair {
         bucket_id: i32,
         offset: i64,
@@ -181,6 +187,11 @@ mod ffi {
         lake_snapshot: FfiLakeSnapshot,
     }
 
+    struct FfiPartitionKeyValue {
+        key: String,
+        value: String,
+    }
+
     struct FfiPartitionInfo {
         partition_id: i64,
         partition_name: String,
@@ -240,6 +251,12 @@ mod ffi {
             self: &Admin,
             table_path: &FfiTablePath,
         ) -> FfiListPartitionInfosResult;
+        fn create_partition(
+            self: &Admin,
+            table_path: &FfiTablePath,
+            partition_spec: Vec<FfiPartitionKeyValue>,
+            ignore_if_exists: bool,
+        ) -> FfiResult;
 
         // Table
         unsafe fn delete_table(table: *mut Table);
@@ -279,6 +296,10 @@ mod ffi {
             bucket_id: i32,
             start_offset: i64,
         ) -> FfiResult;
+        fn subscribe_partition_buckets(
+            self: &LogScanner,
+            subscriptions: Vec<FfiPartitionBucketSubscription>,
+        ) -> FfiResult;
         fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32) -> FfiResult;
 
         fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
@@ -613,6 +634,33 @@ impl Admin {
             },
         }
     }
+    fn create_partition(
+        &self,
+        table_path: &ffi::FfiTablePath,
+        partition_spec: Vec<ffi::FfiPartitionKeyValue>,
+        ignore_if_exists: bool,
+    ) -> ffi::FfiResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+        let spec_map: std::collections::HashMap<String, String> = partition_spec
+            .into_iter()
+            .map(|kv| (kv.key, kv.value))
+            .collect();
+        let partition_spec = fcore::metadata::PartitionSpec::new(spec_map);
+
+        let result = RUNTIME.block_on(async {
+            self.inner
+                .create_partition(&path, &partition_spec, ignore_if_exists)
+                .await
+        });
+
+        match result {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(1, e.to_string()),
+        }
+    }
 }
 
 // Table implementation
@@ -939,6 +987,41 @@ impl LogScanner {
         self.do_subscribe(Some(partition_id), bucket_id, start_offset)
     }
 
+    fn subscribe_partition_buckets(
+        &self,
+        subscriptions: Vec<ffi::FfiPartitionBucketSubscription>,
+    ) -> ffi::FfiResult {
+        use std::collections::HashMap;
+        let mut partition_bucket_offsets: HashMap<(PartitionId, i32), i64> = HashMap::new();
+        for sub in subscriptions {
+            partition_bucket_offsets.insert((sub.partition_id, sub.bucket_id), sub.offset);
+        }
+
+        if let Some(ref inner) = self.inner {
+            let result = RUNTIME.block_on(async {
+                inner
+                    .subscribe_partition_buckets(&partition_bucket_offsets)
+                    .await
+            });
+            match result {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else if let Some(ref inner_batch) = self.inner_batch {
+            let result = RUNTIME.block_on(async {
+                inner_batch
+                    .subscribe_partition_buckets(&partition_bucket_offsets)
+                    .await
+            });
+            match result {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else {
+            err_result(1, "LogScanner not initialized".to_string())
+        }
+    }
+
     fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult {
         if let Some(ref inner) = self.inner {
             match RUNTIME
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index 04d1846c..4425b5fb 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -17,14 +17,17 @@
  * under the License.
  */
 
+#include
+
+#include
+
+#include "ffi_converter.hpp"
 #include "fluss.hpp"
 #include "lib.rs.h"
-#include "ffi_converter.hpp"
 #include "rust/cxx.h"
-#include
-#include  // todo: bindings/cpp/BUILD.bazel still doesn’t declare Arrow include/link dependencies.
-// In environments where Bazel does not already have Arrow available, this will fail at compile/link time.
+// In environments where Bazel does not already have Arrow available, this will fail at compile/link
+// time.
+#include
 
 namespace fluss {
@@ -89,9 +92,7 @@ void Table::Destroy() noexcept {
     }
 }
 
-Table::Table(Table&& other) noexcept : table_(other.table_) {
-    other.table_ = nullptr;
-}
+Table::Table(Table&& other) noexcept : table_(other.table_) { other.table_ = nullptr; }
 
 Table& Table::operator=(Table&& other) noexcept {
     if (this != &other) {
@@ -119,9 +120,7 @@ Result Table::NewAppendWriter(AppendWriter& out) {
     }
 }
 
-TableScan Table::NewScan() {
-    return TableScan(table_);
-}
+TableScan Table::NewScan() { return TableScan(table_); }
 
 // TableScan implementation
 TableScan::TableScan(ffi::Table* table) noexcept : table_(table) {}
@@ -167,7 +166,8 @@ Result TableScan::CreateRecordBatchScanner(LogScanner& out) {
             for (size_t idx : projection_) {
                 rust_indices.push_back(idx);
             }
-            out.scanner_ = table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
+            out.scanner_ =
+                table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
         }
         return utils::make_ok();
     } catch (const rust::Error& e) {
@@ -354,7 +354,8 @@ Result LogScanner::Subscribe(const std::vector<BucketSubscription>& bucket_offse
     return utils::from_ffi_result(ffi_result);
 }
 
-Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset) {
+Result LogScanner::SubscribePartitionBuckets(int64_t partition_id, int32_t bucket_id,
+                                             int64_t start_offset) {
     if (!Available()) {
         return utils::make_error(1, "LogScanner not available");
     }
@@ -363,6 +364,25 @@ Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, i
     return utils::from_ffi_result(ffi_result);
 }
 
+Result LogScanner::SubscribePartitionBuckets(
+    const std::vector<PartitionBucketSubscription>& subscriptions) {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    rust::Vec<ffi::FfiPartitionBucketSubscription> rust_subs;
+    for (const auto& sub : subscriptions) {
+        ffi::FfiPartitionBucketSubscription ffi_sub;
+        ffi_sub.partition_id = sub.partition_id;
+        ffi_sub.bucket_id = sub.bucket_id;
+        ffi_sub.offset = sub.offset;
+        rust_subs.push_back(ffi_sub);
+    }
+
+    auto ffi_result = scanner_->subscribe_partition_buckets(std::move(rust_subs));
+    return utils::from_ffi_result(ffi_result);
+}
+
 Result LogScanner::UnsubscribePartition(int64_t partition_id, int32_t bucket_id) {
     if (!Available()) {
         return utils::make_error(1, "LogScanner not available");
@@ -387,12 +407,9 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
     return utils::make_ok();
 }
 
-ArrowRecordBatch::ArrowRecordBatch(
-    std::shared_ptr<arrow::RecordBatch> batch,
-    int64_t table_id,
-    int64_t partition_id,
-    int32_t bucket_id,
-    int64_t base_offset) noexcept
+ArrowRecordBatch::ArrowRecordBatch(std::shared_ptr<arrow::RecordBatch> batch, int64_t table_id,
+                                   int64_t partition_id, int32_t bucket_id,
+                                   int64_t base_offset) noexcept
     : batch_(std::move(batch)),
       table_id_(table_id),
       partition_id_(partition_id),
@@ -406,7 +423,6 @@ int64_t ArrowRecordBatch::NumRows() const {
     return batch_->num_rows();
 }
 
-
 int64_t ArrowRecordBatch::GetTableId() const {
     if (!Available()) return 0;
     return this->table_id_;
@@ -453,26 +469,23 @@ Result LogScanner::PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out)
         if (import_result.ok()) {
            auto batch_ptr = import_result.ValueOrDie();
            auto batch_wrapper = std::unique_ptr<ArrowRecordBatch>(new ArrowRecordBatch(
-                std::move(batch_ptr),
-                ffi_batch.table_id,
-                ffi_batch.partition_id,
-                ffi_batch.bucket_id,
-                ffi_batch.base_offset
-            ));
+                std::move(batch_ptr), ffi_batch.table_id, ffi_batch.partition_id,
+
ffi_batch.bucket_id, ffi_batch.base_offset)); out.batches.push_back(std::move(batch_wrapper)); - + // Free the container structures that were allocated in Rust after successful import ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, ffi_batch.schema_ptr); } else { // Import failed, free the container structures to avoid leaks and return error ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, ffi_batch.schema_ptr); - + // Return an error indicating that the import failed - std::string error_msg = "Failed to import Arrow record batch: " + import_result.status().ToString(); + std::string error_msg = + "Failed to import Arrow record batch: " + import_result.status().ToString(); return utils::make_error(1, error_msg); } } - + return utils::make_ok(); } diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index dd7f1b14..732b7dff 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -717,6 +717,18 @@ async def main(): print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:") print(partitioned_arrow.to_pandas()) + # Demo: subscribe_partition_buckets for batch subscribing to multiple partitions at once + print("\n--- Testing subscribe_partition_buckets + to_arrow() ---") + partitioned_scanner_batch = await partitioned_table.new_scan().create_batch_scanner() + partition_bucket_offsets = { + (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos + } + partitioned_scanner_batch.subscribe_partition_buckets(partition_bucket_offsets) + print(f"Batch subscribed to {len(partition_bucket_offsets)} partition+bucket combinations") + partitioned_batch_arrow = partitioned_scanner_batch.to_arrow() + print(f"to_arrow() returned {partitioned_batch_arrow.num_rows} records:") + print(partitioned_batch_arrow.to_pandas()) + # Demo: unsubscribe_partition - unsubscribe from one partition, read remaining print("\n--- Testing unsubscribe_partition ---") partitioned_scanner3 = await partitioned_table.new_scan().create_batch_scanner() diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 526dad78..ceef1553 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -420,6 +420,16 @@ class LogScanner: start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning) """ ... + def subscribe_partition_buckets( + self, partition_bucket_offsets: Dict[Tuple[int, int], int] + ) -> None: + """Subscribe to multiple partition+bucket combinations at once (partitioned tables only). + + Args: + partition_bucket_offsets: Dict mapping (partition_id, bucket_id) tuples to start_offsets. + Example: {(partition_id_1, 0): EARLIEST_OFFSET, (partition_id_2, 1): 100} + """ + ... def unsubscribe_partition(self, partition_id: int, bucket_id: int) -> None: """Unsubscribe from a specific partition bucket (partitioned tables only). diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 1a7dbdce..d9265963 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -1657,6 +1657,26 @@ impl LogScanner { }) } + /// Subscribe to multiple partition+bucket combinations at once (partitioned tables only). 
+ /// + /// Args: + /// partition_bucket_offsets: A dict mapping (partition_id, bucket_id) tuples to start_offsets + fn subscribe_partition_buckets( + &self, + py: Python, + partition_bucket_offsets: HashMap<(i64, i32), i64>, + ) -> PyResult<()> { + py.detach(|| { + TOKIO_RUNTIME.block_on(async { + with_scanner!( + &self.scanner, + subscribe_partition_buckets(&partition_bucket_offsets) + ) + .map_err(|e| FlussError::new_err(e.to_string())) + }) + }) + } + /// Unsubscribe from a specific partition bucket (partitioned tables only). /// /// Args: @@ -1813,7 +1833,7 @@ impl LogScanner { /// Reads from currently subscribed buckets until reaching their latest offsets. /// Works for both partitioned and non-partitioned tables. /// - /// You must call subscribe(), subscribe_buckets(), or subscribe_partition() first. + /// You must call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first. /// /// Returns: /// PyArrow Table containing all data from subscribed buckets @@ -1822,7 +1842,7 @@ impl LogScanner { let subscribed = scanner.get_subscribed_buckets(); if subscribed.is_empty() { return Err(FlussError::new_err( - "No buckets subscribed. Call subscribe(), subscribe_buckets(), or subscribe_partition() first.", + "No buckets subscribed. Call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.", )); } @@ -1838,7 +1858,7 @@ impl LogScanner { /// Reads from currently subscribed buckets until reaching their latest offsets. /// Works for both partitioned and non-partitioned tables. /// - /// You must call subscribe(), subscribe_buckets(), or subscribe_partition() first. + /// You must call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first. /// /// Returns: /// Pandas DataFrame containing all data from subscribed buckets diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 26f54da8..57f90a1e 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -361,29 +361,17 @@ impl LogScannerInner { if self.is_partitioned_table { return Err(Error::UnsupportedOperation { message: - "The table is a partitioned table, subscribe_buckets is not supported currently." + "The table is a partitioned table, please use \"subscribe_partition_buckets\" instead." .to_string(), }); } - self.metadata - .check_and_update_table_metadata(from_ref(&self.table_path)) - .await?; - if bucket_offsets.is_empty() { - return Err(Error::UnexpectedError { - message: "Bucket offsets are empty.".to_string(), - source: None, - }); - } let mut scan_bucket_offsets = HashMap::new(); for (bucket_id, offset) in bucket_offsets { let table_bucket = TableBucket::new(self.table_id, *bucket_id); scan_bucket_offsets.insert(table_bucket, *offset); } - - self.log_scanner_status - .assign_scan_buckets(scan_bucket_offsets); - Ok(()) + self.do_subscribe_buckets(scan_bucket_offsets).await } async fn subscribe_partition( @@ -409,6 +397,43 @@ impl LogScannerInner { Ok(()) } + async fn subscribe_partition_buckets( + &self, + partition_bucket_offsets: &HashMap<(PartitionId, i32), i64>, + ) -> Result<()> { + if !self.is_partitioned_table { + return Err(UnsupportedOperation { + message: "The table is not a partitioned table, please use \"subscribe_buckets\" \ + to subscribe to non-partitioned buckets instead." 
+ .to_string(), + }); + } + + let mut scan_bucket_offsets = HashMap::new(); + for (&(partition_id, bucket_id), &offset) in partition_bucket_offsets { + let table_bucket = + TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket_id); + scan_bucket_offsets.insert(table_bucket, offset); + } + self.do_subscribe_buckets(scan_bucket_offsets).await + } + + async fn do_subscribe_buckets(&self, bucket_offsets: HashMap) -> Result<()> { + if bucket_offsets.is_empty() { + return Err(Error::UnexpectedError { + message: "Bucket offsets are empty.".to_string(), + source: None, + }); + } + + self.metadata + .check_and_update_table_metadata(from_ref(&self.table_path)) + .await?; + + self.log_scanner_status.assign_scan_buckets(bucket_offsets); + Ok(()) + } + async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> { if !self.is_partitioned_table { return Err(Error::UnsupportedOperation { @@ -501,6 +526,15 @@ impl LogScanner { .await } + pub async fn subscribe_partition_buckets( + &self, + partition_bucket_offsets: &HashMap<(PartitionId, i32), i64>, + ) -> Result<()> { + self.inner + .subscribe_partition_buckets(partition_bucket_offsets) + .await + } + pub async fn unsubscribe_partition( &self, partition_id: PartitionId, @@ -546,6 +580,15 @@ impl RecordBatchLogScanner { self.inner.log_scanner_status.get_all_subscriptions() } + pub async fn subscribe_partition_buckets( + &self, + partition_bucket_offsets: &HashMap<(PartitionId, i32), i64>, + ) -> Result<()> { + self.inner + .subscribe_partition_buckets(partition_bucket_offsets) + .await + } + pub async fn unsubscribe_partition( &self, partition_id: PartitionId, diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 82f81352..d80ce791 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -1200,6 +1200,55 @@ mod table_test { records_after_unsubscribe.len() ); + // Test subscribe_partition_buckets: batch subscribe to all partitions at once + let log_scanner_batch = table + .new_scan() + .create_log_scanner() + .expect("Failed to create log scanner for batch partition subscribe test"); + let partition_infos = admin + .list_partition_infos(&table_path) + .await + .expect("Failed to list partition infos"); + let partition_bucket_offsets: HashMap<(i64, i32), i64> = partition_infos + .iter() + .map(|p| ((p.get_partition_id(), 0), 0i64)) + .collect(); + log_scanner_batch + .subscribe_partition_buckets(&partition_bucket_offsets) + .await + .expect("Failed to batch subscribe to partitions"); + + let mut batch_collected: Vec<(i32, String, i64)> = Vec::new(); + let batch_start = std::time::Instant::now(); + while batch_collected.len() < expected_records.len() + && batch_start.elapsed() < Duration::from_secs(10) + { + let records = log_scanner_batch + .poll(Duration::from_millis(500)) + .await + .expect("Failed to poll after batch partition subscribe"); + for rec in records { + let row = rec.row(); + batch_collected.push(( + row.get_int(0), + row.get_string(1).to_string(), + row.get_long(2), + )); + } + } + assert_eq!( + batch_collected.len(), + expected_records.len(), + "Did not receive all records in time, expect receive {} records, but got {} records", + expected_records.len(), + batch_collected.len() + ); + batch_collected.sort_by_key(|r| r.0); + assert_eq!( + batch_collected, expected_records, + "subscribe_partition_buckets should receive the same records as subscribe_partition loop" + ); + admin 
.drop_table(&table_path, false) .await