Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions rust/lance-encoding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::ops::Range;
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, TryFutureExt};

use lance_core::Result;
use lance_core::{Error, Result};
use snafu::location;

pub mod buffer;
pub mod compression;
Expand Down Expand Up @@ -88,8 +89,21 @@ impl BufferScheduler {
Self { data }
}

fn satisfy_request(&self, req: Range<u64>) -> Bytes {
self.data.slice(req.start as usize..req.end as usize)
fn satisfy_request(&self, req: Range<u64>) -> Result<Bytes> {
let start = req.start as usize;
let end = req.end as usize;
if end > self.data.len() {
return Err(Error::io(
format!(
"byte range {}..{} out of bounds for buffer of size {}",
start,
end,
self.data.len()
),
location!(),
));
}
Ok(self.data.slice(start..end))
}
}

Expand All @@ -99,10 +113,12 @@ impl EncodingsIo for BufferScheduler {
ranges: Vec<Range<u64>>,
_priority: u64,
) -> BoxFuture<'static, Result<Vec<Bytes>>> {
std::future::ready(Ok(ranges
.into_iter()
.map(|range| self.satisfy_request(range))
.collect::<Vec<_>>()))
std::future::ready(
ranges
.into_iter()
.map(|range| self.satisfy_request(range))
.collect::<Result<Vec<_>>>(),
)
.boxed()
}
}
27 changes: 22 additions & 5 deletions rust/lance-io/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,21 @@ fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
range1.start < range2.end && range2.start < range1.end
}

fn checked_slice(data: &Bytes, range: Range<usize>) -> Result<Bytes> {
if range.end > data.len() {
return Err(Error::io(
format!(
"byte range {}..{} out of bounds for buffer of size {}",
range.start,
range.end,
data.len()
),
location!(),
));
}
Ok(data.slice(range))
}

impl FileScheduler {
/// Submit a batch of I/O requests to the reader
///
Expand Down Expand Up @@ -893,26 +908,28 @@ impl FileScheduler {
if is_overlapping(updated_range, orig_range) {
// We need to undo the coalescing and splitting done earlier
let start = orig_range.start as usize - byte_offset;
let data = &bytes_vec[updated_index];
if orig_range.end <= updated_range.end {
// The original range is fully contained in the updated range, can do
// zero-copy slice
let end = orig_range.end as usize - byte_offset;
final_bytes.push(bytes_vec[updated_index].slice(start..end));
final_bytes.push(checked_slice(data, start..end)?);
} else {
// The original read was split into multiple requests, need to copy
// back into a single buffer
let orig_size = orig_range.end - orig_range.start;
let mut merged_bytes = Vec::with_capacity(orig_size as usize);
merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
merged_bytes.extend_from_slice(&checked_slice(data, start..data.len())?);
let mut copy_offset = merged_bytes.len() as u64;
while copy_offset < orig_size {
updated_index += 1;
let next_range = &updated_requests[updated_index];
let bytes_to_take =
(orig_size - copy_offset).min(next_range.end - next_range.start);
merged_bytes.extend_from_slice(
&bytes_vec[updated_index].slice(0..bytes_to_take as usize),
);
merged_bytes.extend_from_slice(&checked_slice(
&bytes_vec[updated_index],
0..bytes_to_take as usize,
)?);
copy_offset += bytes_to_take;
}
final_bytes.push(Bytes::from(merged_bytes));
Expand Down
Loading