Skip to content

Commit 6cd2982

Browse files
authored
refactor: use iceberg-rust rolling writer directly (#134)
1 parent 008f7e7 commit 6cd2982

8 files changed

Lines changed: 70 additions & 1103 deletions

File tree

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,17 @@ datafusion = "52.2"
1818

1919
# Local workspace members
2020
futures = "0.3.17"
21-
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "72b0729b94435a958554e009a940502a3ebeb88a", features = [
21+
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "baaa9c7b2deb3e744db21712e4b6ced5891a6012", features = [
2222
"storage-s3",
2323
"storage-gcs",
2424
"storage-azblob",
2525
"storage-azdls",
2626
] }
27-
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "72b0729b94435a958554e009a940502a3ebeb88a" }
28-
iceberg-catalog-memory = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "72b0729b94435a958554e009a940502a3ebeb88a" }
29-
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "72b0729b94435a958554e009a940502a3ebeb88a" }
27+
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "baaa9c7b2deb3e744db21712e4b6ced5891a6012" }
28+
iceberg-catalog-memory = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "baaa9c7b2deb3e744db21712e4b6ced5891a6012" }
29+
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "baaa9c7b2deb3e744db21712e4b6ced5891a6012" }
3030
iceberg-compaction-core = { path = "./core" }
31-
iceberg-datafusion = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "72b0729b94435a958554e009a940502a3ebeb88a" }
31+
iceberg-datafusion = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "baaa9c7b2deb3e744db21712e4b6ced5891a6012" }
3232

3333
parquet = { version = "57.1", features = ["async"] }
3434
port_scanner = "0.1.5"

core/src/config/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,10 +360,32 @@ pub struct CompactionExecutionConfig {
360360
#[builder(default = "DEFAULT_NORMALIZED_COLUMN_IDENTIFIERS")]
361361
pub enable_normalized_column_identifiers: bool,
362362

363+
/// Deprecated: this setting is no longer used after switching to the upstream
364+
/// `RollingFileWriter`.
365+
///
366+
/// It remains temporarily for backward compatibility and will be removed in a
367+
/// future change.
368+
#[deprecated(
369+
note = "unused after switching to the upstream RollingFileWriter; this field is now a no-op and will be removed in a future change"
370+
)]
363371
#[builder(default = "DEFAULT_ENABLE_DYNAMIC_SIZE_ESTIMATION")]
372+
#[builder_setter_attr(deprecated(
373+
note = "unused after switching to the upstream RollingFileWriter; this setter is now a no-op and will be removed in a future change"
374+
))]
364375
pub enable_dynamic_size_estimation: bool,
365376

377+
/// Deprecated: this setting is no longer used after switching to the upstream
378+
/// `RollingFileWriter`.
379+
///
380+
/// It remains temporarily for backward compatibility and will be removed in a
381+
/// future change.
382+
#[deprecated(
383+
note = "unused after switching to the upstream RollingFileWriter; this field is now a no-op and will be removed in a future change"
384+
)]
366385
#[builder(default = "DEFAULT_SIZE_ESTIMATION_SMOOTHING_FACTOR")]
386+
#[builder_setter_attr(deprecated(
387+
note = "unused after switching to the upstream RollingFileWriter; this setter is now a no-op and will be removed in a future change"
388+
))]
367389
pub size_estimation_smoothing_factor: f64,
368390

369391
/// Maximum concurrent compaction plans in `compact()` method.

core/src/executor/datafusion/mod.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use super::{CompactionExecutor, RewriteFilesStat};
3838
use crate::CompactionError;
3939
use crate::config::CompactionExecutionConfig;
4040
use crate::error::Result;
41-
use crate::executor::iceberg_writer::rolling_iceberg_writer;
4241
pub mod datafusion_processor;
4342
use super::{RewriteFilesRequest, RewriteFilesResponse};
4443
pub mod file_scan_task_table_provider;
@@ -167,6 +166,14 @@ pub fn build_iceberg_data_file_writer(
167166
partition_spec: Arc<PartitionSpec>,
168167
execution_config: Arc<CompactionExecutionConfig>,
169168
) -> Result<Box<dyn IcebergWriter>> {
169+
let target_file_size =
170+
usize::try_from(execution_config.target_file_size_bytes).map_err(|_| {
171+
CompactionError::Config(format!(
172+
"target_file_size_bytes {} exceeds platform usize",
173+
execution_config.target_file_size_bytes
174+
))
175+
})?;
176+
170177
let data_file_builder = {
171178
let parquet_writer_builder = ParquetWriterBuilder::new(
172179
execution_config.write_parquet_properties.clone(),
@@ -180,27 +187,18 @@ pub fn build_iceberg_data_file_writer(
180187
iceberg::spec::DataFileFormat::Parquet,
181188
);
182189

183-
// Noop wrapper for `DataFileWriterBuilder`
184190
let rolling_writer_builder = RollingFileWriterBuilder::new(
185191
parquet_writer_builder,
186-
usize::MAX, // No rolling based on row count
192+
target_file_size,
187193
file_io,
188194
location_generator,
189195
file_name_generator,
190-
);
196+
)
197+
.with_max_concurrent_closes(execution_config.max_concurrent_closes);
191198

192199
DataFileWriterBuilder::new(rolling_writer_builder)
193200
};
194201

195-
let rolling_iceberg_writer_builder =
196-
rolling_iceberg_writer::RollingIcebergWriterBuilder::new(data_file_builder)
197-
.with_target_file_size(execution_config.target_file_size_bytes)
198-
.with_max_concurrent_closes(execution_config.max_concurrent_closes)
199-
.with_dynamic_size_estimation(execution_config.enable_dynamic_size_estimation)
200-
.with_size_estimation_smoothing_factor(
201-
execution_config.size_estimation_smoothing_factor,
202-
);
203-
204202
let partition_splitter = if partition_spec.is_unpartitioned() {
205203
None
206204
} else {
@@ -211,7 +209,7 @@ pub fn build_iceberg_data_file_writer(
211209
};
212210

213211
let iceberg_task_writer = TaskWriter::new_with_partition_splitter(
214-
rolling_iceberg_writer_builder,
212+
data_file_builder,
215213
true,
216214
schema,
217215
partition_spec,

core/src/executor/iceberg_writer/mod.rs

Lines changed: 0 additions & 17 deletions
This file was deleted.

0 commit comments

Comments
 (0)