Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ mockito = "1"
motore-macros = "0.4.3"
murmur3 = "0.5.2"
num-bigint = "0.4.6"
num_cpus = "1.17"
once_cell = "1.20"
opendal = "0.54.0"
ordered-float = "4"
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ itertools = { workspace = true }
moka = { version = "0.12.10", features = ["future"] }
murmur3 = { workspace = true }
num-bigint = { workspace = true }
num_cpus = { workspace = true }
once_cell = { workspace = true }
opendal = { workspace = true }
ordered-float = { workspace = true }
Expand Down
161 changes: 157 additions & 4 deletions crates/iceberg/src/writer/file_writer/rolling_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

use arrow_array::RecordBatch;
use num_cpus;
use tokio::task::JoinSet;

use crate::spec::DataFileBuilder;
use crate::writer::CurrentFileStatus;
Expand All @@ -24,14 +26,18 @@ use crate::{Error, ErrorKind, Result};

/// Builder for creating a `RollingFileWriter` that rolls over to a new file
/// when the data size exceeds a target threshold.
/// max_concurrent_close_tasks is the max number of file writers to close in parallel running in the background.
#[derive(Clone)]
pub struct RollingFileWriterBuilder<B: FileWriterBuilder> {
inner_builder: B,
target_file_size: usize,
max_concurrent_close_tasks: usize,
}

impl<B: FileWriterBuilder> RollingFileWriterBuilder<B> {
/// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size.
/// By default, this builder is configured to close one file at a time.
/// To enable parallel file closing, use the `with_concurrency` constructor.
///
/// # Arguments
///
Expand All @@ -42,9 +48,29 @@ impl<B: FileWriterBuilder> RollingFileWriterBuilder<B> {
/// This is because the input size is based on the Arrow in-memory format and cannot precisely control rollover behavior.
/// The actual file size on disk is expected to be slightly larger than `target_file_size`.
pub fn new(inner_builder: B, target_file_size: usize) -> Self {
Self::with_concurrency(inner_builder, target_file_size, Some(1))
}

/// Creates a new `RollingFileWriterBuilder` with the specified concurrency for closing file writers in parallel.
///
/// # Arguments
///
/// * `inner_builder` - The builder for the underlying file writer
/// * `target_file_size` - The target size in bytes before rolling over to a new file
/// * `max_concurrent_close_tasks` - The max number of file writers to close in parallel running in the background.
/// If not provided, it will use the number of CPU cores as the default with a maximum of 4.
pub fn with_concurrency(
inner_builder: B,
target_file_size: usize,
max_concurrent_close_tasks: Option<usize>,
) -> Self {
let default_concurrency: usize = num_cpus::get().clamp(1, 4);
let max_concurrent_close_tasks: usize =
max_concurrent_close_tasks.unwrap_or(default_concurrency);
Self {
inner_builder,
target_file_size,
max_concurrent_close_tasks,
}
}
}
Expand All @@ -58,6 +84,8 @@ impl<B: FileWriterBuilder> FileWriterBuilder for RollingFileWriterBuilder<B> {
inner_builder: self.inner_builder,
target_file_size: self.target_file_size,
data_file_builders: vec![],
max_concurrent_close_tasks: self.max_concurrent_close_tasks,
closing_tasks: JoinSet::new(),
})
}
}
Expand All @@ -68,11 +96,16 @@ impl<B: FileWriterBuilder> FileWriterBuilder for RollingFileWriterBuilder<B> {
/// This writer wraps another file writer that tracks the amount of data written.
/// When the data size exceeds the target size, it closes the current file and
/// starts writing to a new one.
///
/// max_concurrent_close_tasks is the max number of file writers to close in parallel.
/// closing_tasks is a set of tasks that are closing the file writers.
pub struct RollingFileWriter<B: FileWriterBuilder> {
inner: Option<B::R>,
inner_builder: B,
target_file_size: usize,
data_file_builders: Vec<DataFileBuilder>,
max_concurrent_close_tasks: usize,
closing_tasks: JoinSet<Result<Vec<DataFileBuilder>>>,
}

impl<B: FileWriterBuilder> RollingFileWriter<B> {
Expand All @@ -82,7 +115,11 @@ impl<B: FileWriterBuilder> RollingFileWriter<B> {
///
/// `true` if a new file should be started, `false` otherwise
fn should_roll(&self) -> bool {
self.current_written_size() > self.target_file_size
if let Some(writer) = &self.inner {
writer.current_written_size() > self.target_file_size
} else {
false
}
}
}

Expand All @@ -95,8 +132,25 @@ impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {

if self.should_roll() {
if let Some(inner) = self.inner.take() {
// close the current writer, roll to a new file
self.data_file_builders.extend(inner.close().await?);
if self.closing_tasks.len() >= self.max_concurrent_close_tasks {
// Wait for one task to close (non-blocking rollover)
if let Some(result) = self.closing_tasks.join_next().await {
match result {
Ok(Ok(builders)) => self.data_file_builders.extend(builders),
Ok(Err(e)) => return Err(e),
Err(e) => {
return Err(Error::new(
ErrorKind::Unexpected,
"Closing task panicked inside the writer",
)
.with_source(e));
}
}
}
}

// Spawn background close task
self.closing_tasks.spawn(async move { inner.close().await });

// start a new writer
self.inner = Some(self.inner_builder.clone().build().await?);
Expand All @@ -115,12 +169,29 @@ impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
}

async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
// close the current writer and merge the output
// Close current writer
if let Some(current_writer) = self.inner {
self.data_file_builders
.extend(current_writer.close().await?);
}

// Wait for all closing tasks to complete
let mut remaining_builders = Vec::new();
while let Some(result) = self.closing_tasks.join_next().await {
match result {
Ok(Ok(builders)) => remaining_builders.extend(builders),
Ok(Err(e)) => return Err(e),
Err(e) => {
return Err(Error::new(
ErrorKind::Unexpected,
"Closing task panicked inside the writer",
)
.with_source(e));
}
}
}

self.data_file_builders.extend(remaining_builders);
Ok(self.data_file_builders)
}
}
Expand Down Expand Up @@ -322,4 +393,86 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_rolling_writer_with_rolling_with_default_concurrent_should_roll_over_with_default_concurrent_limit()
-> Result<()> {
let temp_dir = TempDir::new()?;
let file_io = FileIOBuilder::new_fs_io().build()?;
let location_gen =
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

// Create schema
let schema = make_test_schema()?;

// Create writer builders
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(schema),
file_io.clone(),
location_gen,
file_name_gen,
);

// Set at None to use the default concurrency
let rolling_writer_builder =
RollingFileWriterBuilder::with_concurrency(parquet_writer_builder, 1024, None);

let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0);

// Create writer
let mut writer = data_file_writer_builder.build().await?;

// Create test data
let arrow_schema = make_test_arrow_schema();
let arrow_schema_ref = Arc::new(arrow_schema.clone());

let names = vec![
"Alice", "Bob", "Charlie", "Dave", "Eve", "Frank", "Grace", "Heidi", "Ivan", "Judy",
"Kelly", "Larry", "Mallory", "Shawn",
];

let mut rng = rand::thread_rng();
let batch_num = 10;
let batch_rows = 100;
let expected_rows = batch_num * batch_rows;

for i in 0..batch_num {
let int_values: Vec<i32> = (0..batch_rows).map(|row| i * batch_rows + row).collect();
let str_values: Vec<&str> = (0..batch_rows)
.map(|_| *names.iter().choose(&mut rng).unwrap())
.collect();

let int_array = Arc::new(Int32Array::from(int_values)) as ArrayRef;
let str_array = Arc::new(StringArray::from(str_values)) as ArrayRef;

let batch =
RecordBatch::try_new(Arc::clone(&arrow_schema_ref), vec![int_array, str_array])
.expect("Failed to create RecordBatch");

writer.write(batch).await?;
}

// Close writer and get data files
let data_files = writer.close().await?;

// Verify multiple files were created (at least 4)
assert!(
data_files.len() > 4,
"Expected at least 4 data files to be created, but got {}",
data_files.len()
);

// Verify total record count across all files
let total_records: u64 = data_files.iter().map(|file| file.record_count).sum();
assert_eq!(
total_records, expected_rows as u64,
"Expected {} total records across all files",
expected_rows
);

Ok(())
}
}
Loading