diff --git a/Cargo.lock b/Cargo.lock index 289cc1bc81..a308e37aaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1372,7 +1372,6 @@ dependencies = [ "pal_async", "scsi_buffers", "thiserror 2.0.16", - "tracelimit", "tracing", "vm_resource", ] diff --git a/vm/devices/storage/disk_striped/Cargo.toml b/vm/devices/storage/disk_striped/Cargo.toml index 741dd17903..3e27ee137e 100644 --- a/vm/devices/storage/disk_striped/Cargo.toml +++ b/vm/devices/storage/disk_striped/Cargo.toml @@ -19,7 +19,6 @@ inspect.workspace = true pal_async.workspace = true futures.workspace = true thiserror.workspace = true -tracelimit.workspace = true tracing.workspace = true vm_resource.workspace = true diff --git a/vm/devices/storage/disk_striped/src/lib.rs b/vm/devices/storage/disk_striped/src/lib.rs index 47d4e7750c..133edf27ec 100644 --- a/vm/devices/storage/disk_striped/src/lib.rs +++ b/vm/devices/storage/disk_striped/src/lib.rs @@ -15,7 +15,6 @@ use disk_backend::UnmapBehavior; use disk_backend::resolve::ResolveDiskParameters; use disk_backend::resolve::ResolvedDisk; use disk_backend_resources::StripedDiskHandle; -use futures::future::join_all; use futures::future::try_join_all; use inspect::Inspect; use scsi_buffers::RequestBuffers; @@ -96,35 +95,17 @@ pub enum NewDeviceError { } #[derive(Debug, Error)] -enum IoError { - #[error("cur_buf_offset-{cur_buf_offset} != buf_total_size -{buf_total_size}")] - InternalErrorBufferLengthMismatch { - cur_buf_offset: usize, - buf_total_size: usize, - }, - #[error("trimmed_sectors-{trimmed_sectors} != sector_count -{sector_count}")] - InternalErrorTrimLengthMismatch { - trimmed_sectors: u64, - sector_count: u64, - }, - #[error( - "Sector out of range: start_sector-{start_sector}, end_sector-{end_sector}, self.sector_count-{disk_sector_count}" - )] - IOInvalidSector { - start_sector: u64, - end_sector: u64, - disk_sector_count: u64, - }, - #[error("error in lower disk {index}")] - LowerError { - index: usize, - #[source] - err: DiskError, - }, +#[error("error in lower disk {index}")] +struct LowerError { + index: usize, + #[source] + err: DiskError, } -impl From for DiskError { - fn from(err: IoError) -> Self { +impl From for DiskError { + fn from(err: LowerError) -> Self { + // Treat all lower disk errors as IO errors--we don't currently handle + // specific errors from lower disks in a striped configuration. DiskError::Io(std::io::Error::other(err)) } } @@ -134,98 +115,63 @@ struct Chunk { disk_index: usize, // The chunk starting sector and offset on the disk. disk_sector_index: u64, - // The chunk length. It can be less the sector_count_per_chunk for the first and last chunk. + // The chunk length. It can be less than the sector_count_per_chunk for the + // first and last chunk. chunk_length_in_sectors: u32, } -#[derive(Debug, Clone)] -struct ChunkIter { - disk_count: usize, - sector_count_per_chunk: u32, - start_sector: u64, - end_sector: u64, - start_chunk_index: u64, - end_chunk_index: u64, - cur_chunk_index: u64, -} - -impl Iterator for ChunkIter { - type Item = Chunk; - - fn next(&mut self) -> Option { - // The valid range is [start_chunk_index, end_chunk_index). - assert!( - self.cur_chunk_index >= self.start_chunk_index, - "self.cur_chunk_index-[{}] < self.start_chunk_index-[{}] should never happen.", - self.cur_chunk_index, - self.start_chunk_index - ); - - if self.cur_chunk_index >= self.end_chunk_index { - return None; - } - - // The sector can be in middle of a chunk for the first chunk. - let sector_offset_in_chunk = if self.cur_chunk_index == self.start_chunk_index { - self.start_sector % self.sector_count_per_chunk as u64 - } else { - 0 - }; - - let disk_index = (self.cur_chunk_index % (self.disk_count as u64)) as usize; - let disk_sector_index = (self.cur_chunk_index / self.disk_count as u64) - * self.sector_count_per_chunk as u64 - + sector_offset_in_chunk; - - // The disk end offset can be in middle of the chunk for the last chunk. - let disk_end_offset_in_sectors = (self.cur_chunk_index / self.disk_count as u64) - * self.sector_count_per_chunk as u64 - + if self.cur_chunk_index == self.end_chunk_index - 1 { - self.end_sector - self.sector_count_per_chunk as u64 * self.cur_chunk_index - } else { - self.sector_count_per_chunk as u64 - }; - - // The chunk length can be less the sector_count_per_chunk for the first and last chunk. - let chunk_length_in_sectors = (disk_end_offset_in_sectors - disk_sector_index) as u32; - - self.cur_chunk_index += 1; - - Some(Chunk { - disk_index, - disk_sector_index, - chunk_length_in_sectors, - }) - } -} - impl StripedDisk { - fn get_chunk_iter(&self, start_sector: u64, end_sector: u64) -> Result { - // The valid range is [start_sector, end_sector). + fn get_chunk_iter( + &self, + start_sector: u64, + end_sector: u64, + ) -> Result, DiskError> { if end_sector > self.sector_count { - let err = IoError::IOInvalidSector { - start_sector, - end_sector, - disk_sector_count: self.sector_count, - }; - tracelimit::error_ratelimited!(err = ?err); - return Err(err.into()); + return Err(DiskError::IllegalBlock); } let start_chunk_index = start_sector / self.sector_count_per_chunk as u64; let end_chunk_index = end_sector.div_ceil(self.sector_count_per_chunk as u64); - let chunk_iter = ChunkIter { - disk_count: self.block_devices.len(), - sector_count_per_chunk: self.sector_count_per_chunk, - start_sector, - end_sector, - start_chunk_index, - end_chunk_index, - cur_chunk_index: start_chunk_index, - }; + // Use `Iterator::map` on `Range` so that the iterator implements + // `TrustedLen`, which ensures `Vec::from_iter` (i.e., `.collect()`) is + // optimized to use a single allocation and in-place construction. + let sector_count_per_chunk = self.sector_count_per_chunk; + let disk_count = self.block_devices.len(); + let iter = (start_chunk_index..end_chunk_index).map(move |i| { + // The sector can be in middle of a chunk for the first chunk. + let sector_offset_in_chunk = if i == start_chunk_index { + start_sector % sector_count_per_chunk as u64 + } else { + 0 + }; + + let disk_index = (i % (disk_count as u64)) as usize; + let disk_sector_index = + (i / disk_count as u64) * sector_count_per_chunk as u64 + sector_offset_in_chunk; + + // The disk end offset can be in middle of the chunk for the last + // chunk. + let disk_end_offset_in_sectors = (i / disk_count as u64) + * sector_count_per_chunk as u64 + + if i == end_chunk_index - 1 { + end_sector - sector_count_per_chunk as u64 * i + } else { + sector_count_per_chunk as u64 + }; + + // The chunk length can be less than the sector_count_per_chunk for + // the first and last chunk. + let chunk_length_in_sectors = (disk_end_offset_in_sectors - disk_sector_index) as u32; + + Chunk { + disk_index, + disk_sector_index, + chunk_length_in_sectors, + } + }); - Ok(chunk_iter) + Ok(iter) } } @@ -363,10 +309,7 @@ impl DiskIo for StripedDisk { } async fn eject(&self) -> Result<(), DiskError> { - let mut futures = Vec::new(); - for disk in &self.block_devices { - futures.push(disk.eject()); - } + let futures = self.block_devices.iter().map(|disk| disk.eject()).collect(); await_all_and_check(futures).await?; Ok(()) } @@ -378,39 +321,30 @@ impl DiskIo for StripedDisk { ) -> Result<(), DiskError> { let buf_total_size = buffers.len(); let end_sector = start_sector + ((buf_total_size as u64) >> self.sector_shift); - if end_sector > self.sector_count { - return Err(DiskError::IllegalBlock); - } let chunk_iter = self.get_chunk_iter(start_sector, end_sector)?; - let mut all_futures = Vec::new(); let mut cur_buf_offset: usize = 0; + let all_futures = chunk_iter + .map(|chunk| { + let disk = &self.block_devices[chunk.disk_index]; - for chunk in chunk_iter { - let disk = &self.block_devices[chunk.disk_index]; + let buf_len = (chunk.chunk_length_in_sectors as usize) << self.sector_shift; - let buf_len = (chunk.chunk_length_in_sectors as usize) << self.sector_shift; + let sub_buffers = buffers.subrange(cur_buf_offset, buf_len); + cur_buf_offset += buf_len; - let sub_buffers = buffers.subrange(cur_buf_offset, buf_len); - cur_buf_offset += buf_len; + async move { + disk.read_vectored(&sub_buffers, chunk.disk_sector_index) + .await + .map_err(|err| LowerError { + index: chunk.disk_index, + err, + }) + } + }) + .collect(); - all_futures.push(async move { - disk.read_vectored(&sub_buffers, chunk.disk_sector_index) - .await - .map_err(|err| IoError::LowerError { - index: chunk.disk_index, - err, - }) - }); - } - - if cur_buf_offset != buf_total_size { - return Err(IoError::InternalErrorBufferLengthMismatch { - cur_buf_offset, - buf_total_size, - } - .into()); - } + assert_eq!(cur_buf_offset, buf_total_size); await_all_and_check(all_futures).await?; Ok(()) @@ -424,54 +358,47 @@ impl DiskIo for StripedDisk { ) -> Result<(), DiskError> { let buf_total_size = buffers.len(); let end_sector = start_sector + ((buf_total_size as u64) >> self.sector_shift); - if end_sector > self.sector_count { - return Err(DiskError::IllegalBlock); - } let chunk_iter = self.get_chunk_iter(start_sector, end_sector)?; - let mut all_futures = Vec::new(); let mut cur_buf_offset: usize = 0; + let all_futures = chunk_iter + .map(|chunk| { + let disk = &self.block_devices[chunk.disk_index]; - for chunk in chunk_iter { - let disk = &self.block_devices[chunk.disk_index]; + let buf_len = (chunk.chunk_length_in_sectors as usize) << self.sector_shift; - let buf_len = (chunk.chunk_length_in_sectors as usize) << self.sector_shift; + let sub_buffers = buffers.subrange(cur_buf_offset, buf_len); + cur_buf_offset += buf_len; - let sub_buffers = buffers.subrange(cur_buf_offset, buf_len); - cur_buf_offset += buf_len; + async move { + disk.write_vectored(&sub_buffers, chunk.disk_sector_index, fua) + .await + .map_err(|err| LowerError { + index: chunk.disk_index, + err, + }) + } + }) + .collect(); - all_futures.push(async move { - disk.write_vectored(&sub_buffers, chunk.disk_sector_index, fua) - .await - .map_err(|err| IoError::LowerError { - index: chunk.disk_index, - err, - }) - }); - } - - if cur_buf_offset != buf_total_size { - return Err(IoError::InternalErrorBufferLengthMismatch { - cur_buf_offset, - buf_total_size, - } - .into()); - } + assert_eq!(cur_buf_offset, buf_total_size); await_all_and_check(all_futures).await?; Ok(()) } async fn sync_cache(&self) -> Result<(), DiskError> { - let mut all_futures = Vec::new(); - for (disk_index, disk) in self.block_devices.iter().enumerate() { - all_futures.push(async move { - disk.sync_cache().await.map_err(|err| IoError::LowerError { + let all_futures = self + .block_devices + .iter() + .enumerate() + .map(|(disk_index, disk)| async move { + disk.sync_cache().await.map_err(|err| LowerError { index: disk_index, err, }) - }); - } + }) + .collect(); await_all_and_check(all_futures).await?; Ok(()) } @@ -483,11 +410,6 @@ impl DiskIo for StripedDisk { block_level_only: bool, ) -> Result<(), DiskError> { let end_sector = start_sector + sector_count; - - if end_sector > self.sector_count { - return Err(DiskError::IllegalBlock); - } - let chunk_iter = match self.get_chunk_iter(start_sector, end_sector) { Ok(iter) => iter, Err(err) => { @@ -511,24 +433,24 @@ impl DiskIo for StripedDisk { trimmed_sectors += length; } - if trimmed_sectors != sector_count { - return Err(IoError::InternalErrorTrimLengthMismatch { - trimmed_sectors, - sector_count, - } - .into()); - } + assert_eq!(trimmed_sectors, sector_count); // Create a future for each disk's combined unmap operations - let mut all_futures = Vec::new(); + let all_futures = disk_sectors + .iter() + .enumerate() + .map(|(disk_index, &(start, length))| { + let disk = &self.block_devices[disk_index]; + async move { + if length > 0 { + disk.unmap(start, length, block_level_only).await + } else { + Ok(()) + } + } + }) + .collect(); - for (disk_index, &(start, length)) in disk_sectors.iter().enumerate() { - let disk = &self.block_devices[disk_index]; - // Check if the length is non-zero before pushing to all_futures - if length > 0 { - all_futures.push(async move { disk.unmap(start, length, block_level_only).await }); - } - } await_all_and_check(all_futures).await?; Ok(()) } @@ -546,15 +468,15 @@ impl DiskIo for StripedDisk { } } -async fn await_all_and_check(futures: T) -> Result<(), E> +/// Waits for all IOs to complete and checks for errors. +/// +/// Use `JoinAll` to wait for all IOs even if one fails. This is necessary to +/// avoid dropping IOs while they are in flight. +async fn await_all_and_check(futures: futures::future::JoinAll) -> Result<(), E> where - T: IntoIterator, - T::Item: Future>, + F: Future>, { - // Use join_all to wait for all IOs even if one fails. This is necessary to - // avoid dropping IOs while they are in flight. - let results = join_all(futures).await; - for result in results { + for result in futures.await { result?; } Ok(()) @@ -919,15 +841,8 @@ mod tests { devices.push(ramdisk); } - match StripedDisk::new(devices, None, None) { - Err(err) => { - println!( - "Expected failure since underlying files are not compatible: {}", - err - ); - } - Ok(strip_disk) => panic!("{:?}", strip_disk), - } + StripedDisk::new(devices, None, None) + .expect_err("Expected failure because of incompatible files"); // Creating striping disk using invalid chunk size shall fail. let mut block_devices = Vec::new(); @@ -936,12 +851,8 @@ mod tests { block_devices.push(ramdisk); } - match StripedDisk::new(block_devices, Some(4 * 1024 + 1), None) { - Err(err) => { - println!("Expected failure since chunk size is invalid: {}", err); - } - Ok(strip_disk) => panic!("{:?}", strip_disk), - } + StripedDisk::new(block_devices, Some(4 * 1024 + 1), None) + .expect_err("Expected failure since chunk size is invalid"); // Creating striping disk using invalid logic sector count shall fail. let mut block_devices = Vec::new(); @@ -950,19 +861,12 @@ mod tests { block_devices.push(ramdisk); } - match StripedDisk::new( + StripedDisk::new( block_devices, Some(4 * 1024), Some(1024 * 1024 * 2 / 512 + 1), - ) { - Err(err) => { - println!( - "Expected failure since logic sector count is invalid: {}", - err - ); - } - Ok(strip_disk) => panic!("{:?}", strip_disk), - } + ) + .expect_err("Expected failure since logic sector count is invalid"); // Create a simple striping disk. let mut block_devices = Vec::new(); @@ -971,10 +875,8 @@ mod tests { block_devices.push(ramdisk); } - let disk = match StripedDisk::new(block_devices, Some(8 * 1024), None) { - Err(err) => panic!("{}", err), - Ok(strip_disk) => strip_disk, - }; + let disk = StripedDisk::new(block_devices, Some(8 * 1024), None) + .expect("Failed to create striping disk"); assert_eq!(disk.sector_size, 512); assert_eq!(disk.sector_count_per_chunk, 8 * 1024 / 512); @@ -984,111 +886,28 @@ mod tests { let guest_mem = create_guest_mem(2 * HV_PAGE_SIZE as usize); let write_buffers = OwnedRequestBuffers::new(&[0]); let buf_sector_count = write_buffers.len().div_ceil(disk.sector_size as usize); - match disk - .write_vectored( - &write_buffers.buffer(&guest_mem), - disk.sector_count() - buf_sector_count as u64 + 1, - false, - ) - .await - { - Ok(_) => { - panic!("{:?}", disk); - } - Err(err) => { - println!("Expected write failure because of 1 sector off: {:?}", err); - } - } + disk.write_vectored( + &write_buffers.buffer(&guest_mem), + disk.sector_count() - buf_sector_count as u64 + 1, + false, + ) + .await + .expect_err("Expected write failure because of 1 sector off"); // read 1 sector off shall be caught. let guest_mem = create_guest_mem(2 * HV_PAGE_SIZE as usize); let read_buffers = OwnedRequestBuffers::new(&[1]); let buf_sector_count = read_buffers.len().div_ceil(disk.sector_size as usize); - match disk - .read_vectored( - &write_buffers.buffer(&guest_mem), - disk.sector_count() - buf_sector_count as u64 + 1, - ) - .await - { - Ok(_) => { - panic!("{:?}", disk); - } - Err(err) => { - println!("Expected read failure because of 1 sector off: {:?}", err); - } - } - - match disk - .unmap( - (disk.sector_count() - 2) * disk.sector_size as u64, - disk.sector_size as u64 * 3, - true, - ) - .await - { - Ok(_) => { - panic!("{:?}", disk); - } - Err(err) => { - println!("Expected failure because of 1 sector off: {:?}", err); - } - } - - // write 1 byte off shall be caught. - let write_buffers = - OwnedRequestBuffers::new_unaligned(&[0], 0, disk.sector_size as usize + 1); - let buf_sector_count = write_buffers.len().div_ceil(disk.sector_size as usize); - match disk - .write_vectored( - &write_buffers.buffer(&guest_mem), - disk.sector_count() - buf_sector_count as u64 + 1, - false, - ) - .await - { - Ok(_) => { - panic!("{:?}", disk); - } - Err(err) => { - println!("Expected failure because of write 1 byte off: {:?}", err); - } - } - - // read 1 byte off shall be caught. - let read_buffers = - OwnedRequestBuffers::new_unaligned(&[1], 0, disk.sector_size as usize + 1); - let buf_sector_count = read_buffers.len().div_ceil(disk.sector_size as usize); - match disk - .read_vectored( - &read_buffers.buffer(&guest_mem), - disk.sector_count() - buf_sector_count as u64 + 1, - ) - .await - { - Ok(_) => { - panic!("{:?}", disk); - } - Err(err) => { - println!("Expected failure because of read 1 byte off: {:?}", err); - } - } + disk.read_vectored( + &read_buffers.buffer(&guest_mem), + disk.sector_count() - buf_sector_count as u64 + 1, + ) + .await + .expect_err("Expected read failure because of 1 sector off"); - match disk - .unmap( - (disk.sector_count() - 2) * disk.sector_size as u64, - disk.sector_size as u64 * 2 + 1, - true, - ) + disk.unmap(disk.sector_count() - 2, 3, true) .await - { - Ok(_) => { - panic!("{:?}", disk); - } - Err(err) => { - println!("Expected failure because of 1 byte off: {:?}", err); - } - } + .expect_err("Expected unmap failure because of 1 sector off"); } #[async_test]