diff --git a/Cargo.lock b/Cargo.lock index 2b09c3c39a..7bb28f88a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3234,15 +3234,15 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" -version = "0.3.9" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" [[package]] name = "hermit-abi" -version = "0.4.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" [[package]] name = "hex" @@ -3528,6 +3528,7 @@ dependencies = [ "moka", "murmur3", "num-bigint", + "num_cpus", "once_cell", "opendal", "ordered-float 4.6.0", @@ -4207,7 +4208,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -4712,11 +4713,11 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" dependencies = [ - "hermit-abi 0.3.9", + "hermit-abi 0.5.2", "libc", ] diff --git a/Cargo.toml b/Cargo.toml index dbd117855b..424e59bfe4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,7 @@ mockito = "1" motore-macros = "0.4.3" murmur3 = "0.5.2" num-bigint = "0.4.6" +num_cpus = "1.17" once_cell = "1.20" opendal = "0.54.0" ordered-float = "4" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index fe4fdf73d6..678483dc46 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -70,6 +70,7 @@ itertools = { workspace = true } moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } num-bigint = { workspace = true } +num_cpus = { workspace = true } once_cell = { workspace = true } opendal = { workspace = true } ordered-float = { workspace = true } diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 93fa975ce1..ed4db88f78 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -16,6 +16,8 @@ // under the License. use arrow_array::RecordBatch; +use num_cpus; +use tokio::task::JoinSet; use crate::spec::DataFileBuilder; use crate::writer::CurrentFileStatus; @@ -24,14 +26,18 @@ use crate::{Error, ErrorKind, Result}; /// Builder for creating a `RollingFileWriter` that rolls over to a new file /// when the data size exceeds a target threshold. +/// max_concurrent_close_tasks is the max number of file writers to close in parallel running in the background. #[derive(Clone)] pub struct RollingFileWriterBuilder { inner_builder: B, target_file_size: usize, + max_concurrent_close_tasks: usize, } impl RollingFileWriterBuilder { /// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size. + /// By default, this builder is configured to close one file at a time. + /// To enable parallel file closing, use the `with_concurrency` constructor. /// /// # Arguments /// @@ -42,9 +48,29 @@ impl RollingFileWriterBuilder { /// This is because the input size is based on the Arrow in-memory format and cannot precisely control rollover behavior. /// The actual file size on disk is expected to be slightly larger than `target_file_size`. pub fn new(inner_builder: B, target_file_size: usize) -> Self { + Self::with_concurrency(inner_builder, target_file_size, Some(1)) + } + + /// Creates a new `RollingFileWriterBuilder` with the specified concurrency for closing file writers in parallel. + /// + /// # Arguments + /// + /// * `inner_builder` - The builder for the underlying file writer + /// * `target_file_size` - The target size in bytes before rolling over to a new file + /// * `max_concurrent_close_tasks` - The max number of file writers to close in parallel running in the background. + /// If not provided, it will use the number of CPU cores as the default with a maximum of 4. + pub fn with_concurrency( + inner_builder: B, + target_file_size: usize, + max_concurrent_close_tasks: Option, + ) -> Self { + let default_concurrency: usize = num_cpus::get().clamp(1, 4); + let max_concurrent_close_tasks: usize = + max_concurrent_close_tasks.unwrap_or(default_concurrency); Self { inner_builder, target_file_size, + max_concurrent_close_tasks, } } } @@ -58,6 +84,8 @@ impl FileWriterBuilder for RollingFileWriterBuilder { inner_builder: self.inner_builder, target_file_size: self.target_file_size, data_file_builders: vec![], + max_concurrent_close_tasks: self.max_concurrent_close_tasks, + closing_tasks: JoinSet::new(), }) } } @@ -68,11 +96,16 @@ impl FileWriterBuilder for RollingFileWriterBuilder { /// This writer wraps another file writer that tracks the amount of data written. /// When the data size exceeds the target size, it closes the current file and /// starts writing to a new one. +/// +/// max_concurrent_close_tasks is the max number of file writers to close in parallel. +/// closing_tasks is a set of tasks that are closing the file writers. pub struct RollingFileWriter { inner: Option, inner_builder: B, target_file_size: usize, data_file_builders: Vec, + max_concurrent_close_tasks: usize, + closing_tasks: JoinSet>>, } impl RollingFileWriter { @@ -82,7 +115,11 @@ impl RollingFileWriter { /// /// `true` if a new file should be started, `false` otherwise fn should_roll(&self) -> bool { - self.current_written_size() > self.target_file_size + if let Some(writer) = &self.inner { + writer.current_written_size() > self.target_file_size + } else { + false + } } } @@ -95,8 +132,25 @@ impl FileWriter for RollingFileWriter { if self.should_roll() { if let Some(inner) = self.inner.take() { - // close the current writer, roll to a new file - self.data_file_builders.extend(inner.close().await?); + if self.closing_tasks.len() >= self.max_concurrent_close_tasks { + // Wait for one task to close (non-blocking rollover) + if let Some(result) = self.closing_tasks.join_next().await { + match result { + Ok(Ok(builders)) => self.data_file_builders.extend(builders), + Ok(Err(e)) => return Err(e), + Err(e) => { + return Err(Error::new( + ErrorKind::Unexpected, + "Closing task panicked inside the writer", + ) + .with_source(e)); + } + } + } + } + + // Spawn background close task + self.closing_tasks.spawn(async move { inner.close().await }); // start a new writer self.inner = Some(self.inner_builder.clone().build().await?); @@ -115,12 +169,29 @@ impl FileWriter for RollingFileWriter { } async fn close(mut self) -> Result> { - // close the current writer and merge the output + // Close current writer if let Some(current_writer) = self.inner { self.data_file_builders .extend(current_writer.close().await?); } + // Wait for all closing tasks to complete + let mut remaining_builders = Vec::new(); + while let Some(result) = self.closing_tasks.join_next().await { + match result { + Ok(Ok(builders)) => remaining_builders.extend(builders), + Ok(Err(e)) => return Err(e), + Err(e) => { + return Err(Error::new( + ErrorKind::Unexpected, + "Closing task panicked inside the writer", + ) + .with_source(e)); + } + } + } + + self.data_file_builders.extend(remaining_builders); Ok(self.data_file_builders) } } @@ -322,4 +393,86 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_rolling_writer_with_rolling_with_default_concurrent_should_roll_over_with_default_concurrent_limit() + -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = make_test_schema()?; + + // Create writer builders + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(schema), + file_io.clone(), + location_gen, + file_name_gen, + ); + + // Set at None to use the default concurrency + let rolling_writer_builder = + RollingFileWriterBuilder::with_concurrency(parquet_writer_builder, 1024, None); + + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0); + + // Create writer + let mut writer = data_file_writer_builder.build().await?; + + // Create test data + let arrow_schema = make_test_arrow_schema(); + let arrow_schema_ref = Arc::new(arrow_schema.clone()); + + let names = vec![ + "Alice", "Bob", "Charlie", "Dave", "Eve", "Frank", "Grace", "Heidi", "Ivan", "Judy", + "Kelly", "Larry", "Mallory", "Shawn", + ]; + + let mut rng = rand::thread_rng(); + let batch_num = 10; + let batch_rows = 100; + let expected_rows = batch_num * batch_rows; + + for i in 0..batch_num { + let int_values: Vec = (0..batch_rows).map(|row| i * batch_rows + row).collect(); + let str_values: Vec<&str> = (0..batch_rows) + .map(|_| *names.iter().choose(&mut rng).unwrap()) + .collect(); + + let int_array = Arc::new(Int32Array::from(int_values)) as ArrayRef; + let str_array = Arc::new(StringArray::from(str_values)) as ArrayRef; + + let batch = + RecordBatch::try_new(Arc::clone(&arrow_schema_ref), vec![int_array, str_array]) + .expect("Failed to create RecordBatch"); + + writer.write(batch).await?; + } + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify multiple files were created (at least 4) + assert!( + data_files.len() > 4, + "Expected at least 4 data files to be created, but got {}", + data_files.len() + ); + + // Verify total record count across all files + let total_records: u64 = data_files.iter().map(|file| file.record_count).sum(); + assert_eq!( + total_records, expected_rows as u64, + "Expected {} total records across all files", + expected_rows + ); + + Ok(()) + } }