Skip to content

Commit 8ccd283

Browse files
feat: support temporal types in CPP (#266)
1 parent 499fa8c commit 8ccd283

5 files changed

Lines changed: 422 additions & 143 deletions

File tree

bindings/cpp/examples/example.cpp

Lines changed: 139 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ int main() {
4242
check("get_admin", conn.GetAdmin(admin));
4343

4444
fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
45-
45+
4646
// 2.1) Drop table if exists
4747
std::cout << "Dropping table if exists..." << std::endl;
4848
auto drop_result = admin.DropTable(table_path, true);
@@ -52,12 +52,16 @@ int main() {
5252
std::cout << "Table drop result: " << drop_result.error_message << std::endl;
5353
}
5454

55-
// 3) Schema & descriptor
55+
// 3) Schema with scalar and temporal columns
5656
auto schema = fluss::Schema::NewBuilder()
5757
.AddColumn("id", fluss::DataType::Int)
5858
.AddColumn("name", fluss::DataType::String)
5959
.AddColumn("score", fluss::DataType::Float)
6060
.AddColumn("age", fluss::DataType::Int)
61+
.AddColumn("event_date", fluss::DataType::Date)
62+
.AddColumn("event_time", fluss::DataType::Time)
63+
.AddColumn("created_at", fluss::DataType::Timestamp)
64+
.AddColumn("updated_at", fluss::DataType::TimestampLtz)
6165
.Build();
6266

6367
auto descriptor = fluss::TableDescriptor::NewBuilder()
@@ -66,15 +70,14 @@ int main() {
6670
.SetComment("cpp example table with 3 buckets")
6771
.Build();
6872

69-
// 3.1) Create table with 3 buckets
7073
std::cout << "Creating table with 3 buckets..." << std::endl;
7174
check("create_table", admin.CreateTable(table_path, descriptor, false));
7275

7376
// 4) Get table
7477
fluss::Table table;
7578
check("get_table", conn.GetTable(table_path, table));
7679

77-
// 5) Writer
80+
// 5) Write rows with scalar and temporal values
7881
fluss::AppendWriter writer;
7982
check("new_append_writer", table.NewAppendWriter(writer));
8083

@@ -83,12 +86,26 @@ int main() {
8386
const char* name;
8487
float score;
8588
int age;
89+
fluss::Date date;
90+
fluss::Time time;
91+
fluss::Timestamp ts_ntz;
92+
fluss::Timestamp ts_ltz;
8693
};
8794

95+
auto tp_now = std::chrono::system_clock::now();
8896
std::vector<RowData> rows = {
89-
{1, "Alice", 95.2f, 25},
90-
{2, "Bob", 87.2f, 30},
91-
{3, "Charlie", 92.1f, 35},
97+
{1, "Alice", 95.2f, 25,
98+
fluss::Date::FromYMD(2024, 6, 15), fluss::Time::FromHMS(14, 30, 45),
99+
fluss::Timestamp::FromTimePoint(tp_now),
100+
fluss::Timestamp::FromMillis(1718467200000)},
101+
{2, "Bob", 87.2f, 30,
102+
fluss::Date::FromYMD(2025, 1, 1), fluss::Time::FromHMS(0, 0, 0),
103+
fluss::Timestamp::FromMillis(1735689600000),
104+
fluss::Timestamp::FromMillisNanos(1735689600000, 500000)},
105+
{3, "Charlie", 92.1f, 35,
106+
fluss::Date::FromYMD(1999, 12, 31), fluss::Time::FromHMS(23, 59, 59),
107+
fluss::Timestamp::FromMillis(946684799999),
108+
fluss::Timestamp::FromMillis(946684799999)},
92109
};
93110

94111
// Fire-and-forget: queue rows, flush at end
@@ -98,6 +115,10 @@ int main() {
98115
row.SetString(1, r.name);
99116
row.SetFloat32(2, r.score);
100117
row.SetInt32(3, r.age);
118+
row.SetDate(4, r.date);
119+
row.SetTime(5, r.time);
120+
row.SetTimestampNtz(6, r.ts_ntz);
121+
row.SetTimestampLtz(7, r.ts_ltz);
101122
check("append", writer.Append(row));
102123
}
103124
check("flush", writer.Flush());
@@ -116,7 +137,7 @@ int main() {
116137
std::cout << "Row acknowledged by server" << std::endl;
117138
}
118139

119-
// 6) Scan
140+
// 6) Full scan — verify all column types including temporal
120141
fluss::LogScanner scanner;
121142
check("new_log_scanner", table.NewScan().CreateLogScanner(scanner));
122143

@@ -129,188 +150,216 @@ int main() {
129150
fluss::ScanRecords records;
130151
check("poll", scanner.Poll(5000, records));
131152

132-
std::cout << "Scanned records: " << records.records.size() << std::endl;
153+
std::cout << "Scanned records: " << records.Size() << std::endl;
154+
bool scan_ok = true;
133155
for (const auto& rec : records.records) {
134-
std::cout << " offset=" << rec.offset << " id=" << rec.row.fields[0].i32_val
135-
<< " name=" << rec.row.fields[1].string_val
136-
<< " score=" << rec.row.fields[2].f32_val << " age=" << rec.row.fields[3].i32_val
137-
<< " ts=" << rec.timestamp << std::endl;
156+
const auto& f = rec.row.fields;
157+
158+
if (f[4].type != fluss::DatumType::Date) {
159+
std::cerr << "ERROR: field 4 expected Date, got "
160+
<< static_cast<int>(f[4].type) << std::endl;
161+
scan_ok = false;
162+
}
163+
if (f[5].type != fluss::DatumType::Time) {
164+
std::cerr << "ERROR: field 5 expected Time, got "
165+
<< static_cast<int>(f[5].type) << std::endl;
166+
scan_ok = false;
167+
}
168+
if (f[6].type != fluss::DatumType::TimestampNtz) {
169+
std::cerr << "ERROR: field 6 expected TimestampNtz, got "
170+
<< static_cast<int>(f[6].type) << std::endl;
171+
scan_ok = false;
172+
}
173+
if (f[7].type != fluss::DatumType::TimestampLtz) {
174+
std::cerr << "ERROR: field 7 expected TimestampLtz, got "
175+
<< static_cast<int>(f[7].type) << std::endl;
176+
scan_ok = false;
177+
}
178+
179+
auto date = f[4].GetDate();
180+
auto time = f[5].GetTime();
181+
auto ts_ntz = f[6].GetTimestamp();
182+
auto ts_ltz = f[7].GetTimestamp();
183+
184+
std::cout << " id=" << f[0].i32_val
185+
<< " name=" << f[1].string_val
186+
<< " score=" << f[2].f32_val
187+
<< " age=" << f[3].i32_val
188+
<< " date=" << date.Year() << "-" << date.Month() << "-" << date.Day()
189+
<< " time=" << time.Hour() << ":" << time.Minute() << ":" << time.Second()
190+
<< " ts_ntz=" << ts_ntz.epoch_millis
191+
<< " ts_ltz=" << ts_ltz.epoch_millis
192+
<< "+" << ts_ltz.nano_of_millisecond << "ns"
193+
<< std::endl;
194+
}
195+
196+
if (!scan_ok) {
197+
std::cerr << "Full scan type verification FAILED!" << std::endl;
198+
std::exit(1);
138199
}
139-
140-
// 7) Project only id (0) and name (1) columns
141-
std::vector<size_t> projected_columns = {0, 1};
200+
201+
// 7) Projected scan — project [id, updated_at(TimestampLtz)] to verify
202+
// NTZ/LTZ disambiguation works with column index remapping
203+
std::vector<size_t> projected_columns = {0, 7};
142204
fluss::LogScanner projected_scanner;
143205
check("new_log_scanner_with_projection",
144206
table.NewScan().Project(projected_columns).CreateLogScanner(projected_scanner));
145-
207+
146208
for (int b = 0; b < buckets; ++b) {
147209
check("subscribe_projected", projected_scanner.Subscribe(b, 0));
148210
}
149-
211+
150212
fluss::ScanRecords projected_records;
151213
check("poll_projected", projected_scanner.Poll(5000, projected_records));
152-
153-
std::cout << "Projected records: " << projected_records.records.size() << std::endl;
154-
155-
bool projection_verified = true;
156-
for (size_t i = 0; i < projected_records.records.size(); ++i) {
157-
const auto& rec = projected_records.records[i];
158-
const auto& row = rec.row;
159-
160-
if (row.fields.size() != projected_columns.size()) {
161-
std::cerr << "ERROR: Record " << i << " has " << row.fields.size()
162-
<< " fields, expected " << projected_columns.size() << std::endl;
163-
projection_verified = false;
214+
215+
std::cout << "Projected records: " << projected_records.Size() << std::endl;
216+
for (const auto& rec : projected_records.records) {
217+
const auto& f = rec.row.fields;
218+
219+
if (f.size() != 2) {
220+
std::cerr << "ERROR: expected 2 fields, got " << f.size() << std::endl;
221+
scan_ok = false;
164222
continue;
165223
}
166-
167-
// Verify field types match expected columns
168-
// Column 0 (id) should be Int32, Column 1 (name) should be String
169-
if (row.fields[0].type != fluss::DatumType::Int32) {
170-
std::cerr << "ERROR: Record " << i << " field 0 type mismatch, expected Int32" << std::endl;
171-
projection_verified = false;
224+
if (f[0].type != fluss::DatumType::Int32) {
225+
std::cerr << "ERROR: projected field 0 expected Int32, got "
226+
<< static_cast<int>(f[0].type) << std::endl;
227+
scan_ok = false;
172228
}
173-
if (row.fields[1].type != fluss::DatumType::String) {
174-
std::cerr << "ERROR: Record " << i << " field 1 type mismatch, expected String" << std::endl;
175-
projection_verified = false;
176-
}
177-
178-
// Print projected data
179-
if (row.fields[0].type == fluss::DatumType::Int32 &&
180-
row.fields[1].type == fluss::DatumType::String) {
181-
std::cout << " Record " << i << ": id=" << row.fields[0].i32_val
182-
<< ", name=" << row.fields[1].string_val << std::endl;
229+
if (f[1].type != fluss::DatumType::TimestampLtz) {
230+
std::cerr << "ERROR: projected field 1 expected TimestampLtz, got "
231+
<< static_cast<int>(f[1].type) << std::endl;
232+
scan_ok = false;
183233
}
234+
235+
auto ts = f[1].GetTimestamp();
236+
std::cout << " id=" << f[0].i32_val
237+
<< " updated_at=" << ts.epoch_millis
238+
<< "+" << ts.nano_of_millisecond << "ns" << std::endl;
184239
}
185-
186-
if (projection_verified) {
187-
std::cout << "Column pruning verification passed!" << std::endl;
240+
241+
if (scan_ok) {
242+
std::cout << "Scan verification passed!" << std::endl;
188243
} else {
189-
std::cerr << "Column pruning verification failed!" << std::endl;
244+
std::cerr << "Scan verification FAILED!" << std::endl;
190245
std::exit(1);
191246
}
192247

193248
// 8) List offsets examples
194249
std::cout << "\n=== List Offsets Examples ===" << std::endl;
195-
196-
// 8.1) Query earliest offsets for all buckets
250+
197251
std::vector<int32_t> all_bucket_ids;
198252
all_bucket_ids.reserve(buckets);
199253
for (int b = 0; b < buckets; ++b) {
200254
all_bucket_ids.push_back(b);
201255
}
202-
256+
203257
std::unordered_map<int32_t, int64_t> earliest_offsets;
204-
check("list_earliest_offsets",
205-
admin.ListOffsets(table_path, all_bucket_ids,
206-
fluss::OffsetQuery::Earliest(),
258+
check("list_earliest_offsets",
259+
admin.ListOffsets(table_path, all_bucket_ids,
260+
fluss::OffsetQuery::Earliest(),
207261
earliest_offsets));
208262
std::cout << "Earliest offsets:" << std::endl;
209263
for (const auto& [bucket_id, offset] : earliest_offsets) {
210264
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
211265
}
212-
213-
// 8.2) Query latest offsets for all buckets
266+
214267
std::unordered_map<int32_t, int64_t> latest_offsets;
215-
check("list_latest_offsets",
216-
admin.ListOffsets(table_path, all_bucket_ids,
217-
fluss::OffsetQuery::Latest(),
268+
check("list_latest_offsets",
269+
admin.ListOffsets(table_path, all_bucket_ids,
270+
fluss::OffsetQuery::Latest(),
218271
latest_offsets));
219272
std::cout << "Latest offsets:" << std::endl;
220273
for (const auto& [bucket_id, offset] : latest_offsets) {
221274
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
222275
}
223-
224-
// 8.3) Query offsets for a specific timestamp (current time - 1 hour)
276+
225277
auto now = std::chrono::system_clock::now();
226278
auto one_hour_ago = now - std::chrono::hours(1);
227279
auto timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
228280
one_hour_ago.time_since_epoch()).count();
229-
281+
230282
std::unordered_map<int32_t, int64_t> timestamp_offsets;
231-
check("list_timestamp_offsets",
232-
admin.ListOffsets(table_path, all_bucket_ids,
233-
fluss::OffsetQuery::FromTimestamp(timestamp_ms),
283+
check("list_timestamp_offsets",
284+
admin.ListOffsets(table_path, all_bucket_ids,
285+
fluss::OffsetQuery::FromTimestamp(timestamp_ms),
234286
timestamp_offsets));
235287
std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" << std::endl;
236288
for (const auto& [bucket_id, offset] : timestamp_offsets) {
237289
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
238290
}
239-
240-
// 8.4) Use batch subscribe with offsets from list_offsets
291+
292+
// 9) Batch subscribe
241293
std::cout << "\n=== Batch Subscribe Example ===" << std::endl;
242294
fluss::LogScanner batch_scanner;
243295
check("new_log_scanner_for_batch", table.NewScan().CreateLogScanner(batch_scanner));
244-
296+
245297
std::vector<fluss::BucketSubscription> subscriptions;
246298
for (const auto& [bucket_id, offset] : earliest_offsets) {
247299
subscriptions.push_back({bucket_id, offset});
248-
std::cout << "Preparing subscription: bucket=" << bucket_id
300+
std::cout << "Preparing subscription: bucket=" << bucket_id
249301
<< ", offset=" << offset << std::endl;
250302
}
251-
303+
252304
check("subscribe_buckets", batch_scanner.Subscribe(subscriptions));
253305
std::cout << "Batch subscribed to " << subscriptions.size() << " buckets" << std::endl;
254-
255-
// 8.5) Poll and verify bucket_id in records
306+
256307
fluss::ScanRecords batch_records;
257308
check("poll_batch", batch_scanner.Poll(5000, batch_records));
258-
309+
259310
std::cout << "Scanned " << batch_records.Size() << " records from batch subscription" << std::endl;
260311
for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) {
261312
const auto& rec = batch_records[i];
262-
std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id
263-
<< ", offset=" << rec.offset
313+
std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id
314+
<< ", offset=" << rec.offset
264315
<< ", timestamp=" << rec.timestamp << std::endl;
265316
}
266317
if (batch_records.Size() > 5) {
267318
std::cout << " ... and " << (batch_records.Size() - 5) << " more records" << std::endl;
268319
}
269320

270-
// 9) Test the new Arrow record batch polling functionality
321+
// 10) Arrow record batch polling
271322
std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;
272323

273324
fluss::LogScanner arrow_scanner;
274325
check("new_record_batch_log_scanner", table.NewScan().CreateRecordBatchScanner(arrow_scanner));
275-
276-
// Subscribe to all buckets starting from offset 0
326+
277327
for (int b = 0; b < buckets; ++b) {
278328
check("subscribe_arrow", arrow_scanner.Subscribe(b, 0));
279329
}
280-
330+
281331
fluss::ArrowRecordBatches arrow_batches;
282332
check("poll_record_batch", arrow_scanner.PollRecordBatch(5000, arrow_batches));
283-
333+
284334
std::cout << "Polled " << arrow_batches.Size() << " Arrow record batches" << std::endl;
285335
for (size_t i = 0; i < arrow_batches.Size(); ++i) {
286336
const auto& batch = arrow_batches[i];
287337
if (batch->Available()) {
288-
std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows. " << std::endl;
338+
std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl;
289339
} else {
290340
std::cout << " Batch " << i << ": not available" << std::endl;
291341
}
292342
}
293-
294-
// 10) Test the new Arrow record batch polling with projection
343+
344+
// 11) Arrow record batch polling with projection
295345
std::cout << "\n=== Testing Arrow Record Batch Polling with Projection ===" << std::endl;
296346

297347
fluss::LogScanner projected_arrow_scanner;
298348
check("new_record_batch_log_scanner_with_projection",
299349
table.NewScan().Project(projected_columns).CreateRecordBatchScanner(projected_arrow_scanner));
300-
301-
// Subscribe to all buckets starting from offset 0
350+
302351
for (int b = 0; b < buckets; ++b) {
303352
check("subscribe_projected_arrow", projected_arrow_scanner.Subscribe(b, 0));
304353
}
305-
354+
306355
fluss::ArrowRecordBatches projected_arrow_batches;
307356
check("poll_projected_record_batch", projected_arrow_scanner.PollRecordBatch(5000, projected_arrow_batches));
308-
357+
309358
std::cout << "Polled " << projected_arrow_batches.Size() << " projected Arrow record batches" << std::endl;
310359
for (size_t i = 0; i < projected_arrow_batches.Size(); ++i) {
311360
const auto& batch = projected_arrow_batches[i];
312361
if (batch->Available()) {
313-
std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows " << std::endl;
362+
std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl;
314363
} else {
315364
std::cout << " Batch " << i << ": not available" << std::endl;
316365
}

0 commit comments

Comments (0)