253 changes: 184 additions & 69 deletions bindings/cpp/examples/example.cpp

Large diffs are not rendered by default.

157 changes: 75 additions & 82 deletions bindings/cpp/include/fluss.hpp

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions bindings/cpp/src/admin.cpp
@@ -183,4 +183,25 @@ Result Admin::ListPartitionInfos(const TablePath& table_path,
return result;
}

Result Admin::CreatePartition(const TablePath& table_path,
const std::unordered_map<std::string, std::string>& partition_spec,
bool ignore_if_exists) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_path = utils::to_ffi_table_path(table_path);

rust::Vec<ffi::FfiPartitionKeyValue> rust_spec;
for (const auto& [key, value] : partition_spec) {
ffi::FfiPartitionKeyValue kv;
kv.key = rust::String(key);
kv.value = rust::String(value);
rust_spec.push_back(std::move(kv));
}

auto ffi_result = admin_->create_partition(ffi_path, std::move(rust_spec), ignore_if_exists);
return utils::from_ffi_result(ffi_result);
}

} // namespace fluss
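
Below is a minimal usage sketch of the new `Admin::CreatePartition` entry point added above, assuming an `Admin` handle obtained the same way as in the existing C++ example; the `TablePath` initialization, table/column names, and the helper function are illustrative assumptions, not code from this PR.

```cpp
// Hypothetical usage sketch; the admin setup and TablePath shape follow
// bindings/cpp/examples/example.cpp and may differ from the actual headers.
#include <string>
#include <unordered_map>

#include "fluss.hpp"

fluss::Result CreateDailyPartition(fluss::Admin& admin) {
    // Assumed to identify the partitioned table as (database, table).
    fluss::TablePath path{"fluss", "orders_partitioned"};

    // Partition spec: partition column name -> value, matching the
    // std::unordered_map<std::string, std::string> parameter added in this PR.
    std::unordered_map<std::string, std::string> spec{{"dt", "2024-01-01"}};

    // ignore_if_exists = true keeps the call idempotent if the partition already exists.
    return admin.CreatePartition(path, spec, /*ignore_if_exists=*/true);
}
```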
83 changes: 83 additions & 0 deletions bindings/cpp/src/lib.rs
@@ -166,6 +166,12 @@ mod ffi {
offset: i64,
}

struct FfiPartitionBucketSubscription {
partition_id: i64,
bucket_id: i32,
offset: i64,
}

struct FfiBucketOffsetPair {
bucket_id: i32,
offset: i64,
@@ -181,6 +187,11 @@
lake_snapshot: FfiLakeSnapshot,
}

struct FfiPartitionKeyValue {
key: String,
value: String,
}

struct FfiPartitionInfo {
partition_id: i64,
partition_name: String,
@@ -240,6 +251,12 @@
self: &Admin,
table_path: &FfiTablePath,
) -> FfiListPartitionInfosResult;
fn create_partition(
self: &Admin,
table_path: &FfiTablePath,
partition_spec: Vec<FfiPartitionKeyValue>,
ignore_if_exists: bool,
) -> FfiResult;

// Table
unsafe fn delete_table(table: *mut Table);
@@ -279,6 +296,10 @@
bucket_id: i32,
start_offset: i64,
) -> FfiResult;
fn subscribe_partition_buckets(
self: &LogScanner,
subscriptions: Vec<FfiPartitionBucketSubscription>,
) -> FfiResult;
fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32)
-> FfiResult;
fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
@@ -613,6 +634,33 @@ impl Admin {
},
}
}
fn create_partition(
&self,
table_path: &ffi::FfiTablePath,
partition_spec: Vec<ffi::FfiPartitionKeyValue>,
ignore_if_exists: bool,
) -> ffi::FfiResult {
let path = fcore::metadata::TablePath::new(
table_path.database_name.clone(),
table_path.table_name.clone(),
);
let spec_map: std::collections::HashMap<String, String> = partition_spec
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect();
let partition_spec = fcore::metadata::PartitionSpec::new(spec_map);

let result = RUNTIME.block_on(async {
self.inner
.create_partition(&path, &partition_spec, ignore_if_exists)
.await
});

match result {
Ok(_) => ok_result(),
Err(e) => err_result(1, e.to_string()),
}
}
}

// Table implementation
@@ -939,6 +987,41 @@ impl LogScanner {
self.do_subscribe(Some(partition_id), bucket_id, start_offset)
}

fn subscribe_partition_buckets(
&self,
subscriptions: Vec<ffi::FfiPartitionBucketSubscription>,
) -> ffi::FfiResult {
use std::collections::HashMap;
let mut partition_bucket_offsets: HashMap<(PartitionId, i32), i64> = HashMap::new();
for sub in subscriptions {
partition_bucket_offsets.insert((sub.partition_id, sub.bucket_id), sub.offset);
}

if let Some(ref inner) = self.inner {
let result = RUNTIME.block_on(async {
inner
.subscribe_partition_buckets(&partition_bucket_offsets)
.await
});
match result {
Ok(_) => ok_result(),
Err(e) => err_result(1, e.to_string()),
}
} else if let Some(ref inner_batch) = self.inner_batch {
let result = RUNTIME.block_on(async {
inner_batch
.subscribe_partition_buckets(&partition_bucket_offsets)
.await
});
match result {
Ok(_) => ok_result(),
Err(e) => err_result(1, e.to_string()),
}
} else {
err_result(1, "LogScanner not initialized".to_string())
}
}

fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult {
if let Some(ref inner) = self.inner {
match RUNTIME
71 changes: 42 additions & 29 deletions bindings/cpp/src/table.cpp
@@ -17,14 +17,17 @@
* under the License.
*/

#include <arrow/c/bridge.h>

#include <ctime>

#include "ffi_converter.hpp"
#include "fluss.hpp"
#include "lib.rs.h"
#include "ffi_converter.hpp"
#include "rust/cxx.h"
#include <arrow/c/bridge.h>
#include <ctime>
// todo: bindings/cpp/BUILD.bazel still doesn’t declare Arrow include/link dependencies.
// In environments where Bazel does not already have Arrow available, this will fail at compile/link time.
// In environments where Bazel does not already have Arrow available, this will fail at compile/link
// time.
#include <arrow/record_batch.h>

namespace fluss {
@@ -89,9 +92,7 @@ void Table::Destroy() noexcept {
}
}

Table::Table(Table&& other) noexcept : table_(other.table_) {
other.table_ = nullptr;
}
Table::Table(Table&& other) noexcept : table_(other.table_) { other.table_ = nullptr; }

Table& Table::operator=(Table&& other) noexcept {
if (this != &other) {
@@ -119,9 +120,7 @@ Result Table::NewAppendWriter(AppendWriter& out) {
}
}

TableScan Table::NewScan() {
return TableScan(table_);
}
TableScan Table::NewScan() { return TableScan(table_); }

// TableScan implementation
TableScan::TableScan(ffi::Table* table) noexcept : table_(table) {}
@@ -167,7 +166,8 @@ Result TableScan::CreateRecordBatchScanner(LogScanner& out) {
for (size_t idx : projection_) {
rust_indices.push_back(idx);
}
out.scanner_ = table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
out.scanner_ =
table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
}
return utils::make_ok();
} catch (const rust::Error& e) {
@@ -354,7 +354,8 @@ Result LogScanner::Subscribe(const std::vector<BucketSubscription>& bucket_offse
return utils::from_ffi_result(ffi_result);
}

Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset) {
Result LogScanner::SubscribePartitionBuckets(int64_t partition_id, int32_t bucket_id,
int64_t start_offset) {
if (!Available()) {
return utils::make_error(1, "LogScanner not available");
}
@@ -363,6 +364,25 @@ Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, i
return utils::from_ffi_result(ffi_result);
}

Result LogScanner::SubscribePartitionBuckets(
const std::vector<PartitionBucketSubscription>& subscriptions) {
if (!Available()) {
return utils::make_error(1, "LogScanner not available");
}

rust::Vec<ffi::FfiPartitionBucketSubscription> rust_subs;
for (const auto& sub : subscriptions) {
ffi::FfiPartitionBucketSubscription ffi_sub;
ffi_sub.partition_id = sub.partition_id;
ffi_sub.bucket_id = sub.bucket_id;
ffi_sub.offset = sub.offset;
rust_subs.push_back(ffi_sub);
}

auto ffi_result = scanner_->subscribe_partition_buckets(std::move(rust_subs));
return utils::from_ffi_result(ffi_result);
}

Result LogScanner::UnsubscribePartition(int64_t partition_id, int32_t bucket_id) {
if (!Available()) {
return utils::make_error(1, "LogScanner not available");
@@ -387,12 +407,9 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
return utils::make_ok();
}

ArrowRecordBatch::ArrowRecordBatch(
std::shared_ptr<arrow::RecordBatch> batch,
int64_t table_id,
int64_t partition_id,
int32_t bucket_id,
int64_t base_offset) noexcept
ArrowRecordBatch::ArrowRecordBatch(std::shared_ptr<arrow::RecordBatch> batch, int64_t table_id,
int64_t partition_id, int32_t bucket_id,
int64_t base_offset) noexcept
: batch_(std::move(batch)),
table_id_(table_id),
partition_id_(partition_id),
@@ -406,7 +423,6 @@ int64_t ArrowRecordBatch::NumRows() const {
return batch_->num_rows();
}


int64_t ArrowRecordBatch::GetTableId() const {
if (!Available()) return 0;
return this->table_id_;
@@ -453,26 +469,23 @@ Result LogScanner::PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out)
if (import_result.ok()) {
auto batch_ptr = import_result.ValueOrDie();
auto batch_wrapper = std::unique_ptr<ArrowRecordBatch>(new ArrowRecordBatch(
std::move(batch_ptr),
ffi_batch.table_id,
ffi_batch.partition_id,
ffi_batch.bucket_id,
ffi_batch.base_offset
));
std::move(batch_ptr), ffi_batch.table_id, ffi_batch.partition_id,
ffi_batch.bucket_id, ffi_batch.base_offset));
out.batches.push_back(std::move(batch_wrapper));

// Free the container structures that were allocated in Rust after successful import
ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, ffi_batch.schema_ptr);
} else {
// Import failed, free the container structures to avoid leaks and return error
ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, ffi_batch.schema_ptr);

// Return an error indicating that the import failed
std::string error_msg = "Failed to import Arrow record batch: " + import_result.status().ToString();
std::string error_msg =
"Failed to import Arrow record batch: " + import_result.status().ToString();
return utils::make_error(1, error_msg);
}
}

return utils::make_ok();
}

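As a companion to the table.cpp changes above, here is a hedged sketch of calling the new `LogScanner::SubscribePartitionBuckets` overload; the `PartitionBucketSubscription` field names come from the FFI conversion in this diff, while the scanner setup, struct initialization style, and start offsets are assumptions.

```cpp
// Hypothetical usage sketch; assumes `scanner` came from TableScan::CreateRecordBatchScanner
// and that partition ids were fetched via Admin::ListPartitionInfos beforehand.
#include <cstdint>
#include <vector>

#include "fluss.hpp"

fluss::Result SubscribeAllPartitions(fluss::LogScanner& scanner,
                                     const std::vector<int64_t>& partition_ids) {
    std::vector<fluss::PartitionBucketSubscription> subs;
    subs.reserve(partition_ids.size());
    for (int64_t pid : partition_ids) {
        fluss::PartitionBucketSubscription sub;  // assumed to be a plain struct with public fields
        sub.partition_id = pid;  // partition to read from
        sub.bucket_id = 0;       // bucket within that partition
        sub.offset = 0;          // start offset; a real caller would likely use an earliest-offset constant
        subs.push_back(sub);
    }
    // One call replaces a loop of per-partition SubscribePartition(...) invocations.
    return scanner.SubscribePartitionBuckets(subs);
}
```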
12 changes: 12 additions & 0 deletions bindings/python/example/example.py
@@ -717,6 +717,18 @@ async def main():
print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:")
print(partitioned_arrow.to_pandas())

# Demo: subscribe_partition_buckets for batch subscribing to multiple partitions at once
print("\n--- Testing subscribe_partition_buckets + to_arrow() ---")
partitioned_scanner_batch = await partitioned_table.new_scan().create_batch_scanner()
partition_bucket_offsets = {
(p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos
}
partitioned_scanner_batch.subscribe_partition_buckets(partition_bucket_offsets)
print(f"Batch subscribed to {len(partition_bucket_offsets)} partition+bucket combinations")
partitioned_batch_arrow = partitioned_scanner_batch.to_arrow()
print(f"to_arrow() returned {partitioned_batch_arrow.num_rows} records:")
print(partitioned_batch_arrow.to_pandas())

# Demo: unsubscribe_partition - unsubscribe from one partition, read remaining
print("\n--- Testing unsubscribe_partition ---")
partitioned_scanner3 = await partitioned_table.new_scan().create_batch_scanner()
10 changes: 10 additions & 0 deletions bindings/python/fluss/__init__.pyi
@@ -420,6 +420,16 @@ class LogScanner:
start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning)
"""
...
def subscribe_partition_buckets(
self, partition_bucket_offsets: Dict[Tuple[int, int], int]
) -> None:
"""Subscribe to multiple partition+bucket combinations at once (partitioned tables only).

Args:
partition_bucket_offsets: Dict mapping (partition_id, bucket_id) tuples to start_offsets.
Example: {(partition_id_1, 0): EARLIEST_OFFSET, (partition_id_2, 1): 100}
"""
...
def unsubscribe_partition(self, partition_id: int, bucket_id: int) -> None:
"""Unsubscribe from a specific partition bucket (partitioned tables only).

26 changes: 23 additions & 3 deletions bindings/python/src/table.rs
@@ -1657,6 +1657,26 @@ impl LogScanner {
})
}

/// Subscribe to multiple partition+bucket combinations at once (partitioned tables only).
///
/// Args:
/// partition_bucket_offsets: A dict mapping (partition_id, bucket_id) tuples to start_offsets
fn subscribe_partition_buckets(
&self,
py: Python,
partition_bucket_offsets: HashMap<(i64, i32), i64>,
) -> PyResult<()> {
py.detach(|| {
TOKIO_RUNTIME.block_on(async {
with_scanner!(
&self.scanner,
subscribe_partition_buckets(&partition_bucket_offsets)
)
.map_err(|e| FlussError::new_err(e.to_string()))
})
})
}

/// Unsubscribe from a specific partition bucket (partitioned tables only).
///
/// Args:
@@ -1813,7 +1833,7 @@ impl LogScanner {
/// Reads from currently subscribed buckets until reaching their latest offsets.
/// Works for both partitioned and non-partitioned tables.
///
/// You must call subscribe(), subscribe_buckets(), or subscribe_partition() first.
/// You must call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.
///
/// Returns:
/// PyArrow Table containing all data from subscribed buckets
@@ -1822,7 +1842,7 @@
let subscribed = scanner.get_subscribed_buckets();
if subscribed.is_empty() {
return Err(FlussError::new_err(
"No buckets subscribed. Call subscribe(), subscribe_buckets(), or subscribe_partition() first.",
"No buckets subscribed. Call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.",
));
}

@@ -1838,7 +1858,7 @@
/// Reads from currently subscribed buckets until reaching their latest offsets.
/// Works for both partitioned and non-partitioned tables.
///
/// You must call subscribe(), subscribe_buckets(), or subscribe_partition() first.
/// You must call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.
///
/// Returns:
/// Pandas DataFrame containing all data from subscribed buckets