Skip to content

Commit

Permalink
Add mmap example
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Feb 6, 2025
1 parent 96b454f commit 7527f8e
Showing 1 changed file with 90 additions and 3 deletions.
93 changes: 90 additions & 3 deletions arrow-ipc/benches/ipc_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
use std::io::Cursor;
use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_ipc::reader::{FileReader, StreamReader};
use arrow_ipc::reader::{read_footer_length, FileDecoder, FileReader, StreamReader};
use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
use arrow_ipc::CompressionType;
use arrow_ipc::{root_as_footer, Block, CompressionType};
use arrow_schema::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use std::sync::Arc;
use tempfile::{tempdir};
use arrow_buffer::Buffer;
use arrow_ipc::convert::fb_to_schema;

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("arrow_ipc_stream_writer");
Expand Down Expand Up @@ -91,9 +94,93 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

// mmap file read
group.bench_function("FileReader/read_10/mmap", |b| {
let batch = create_batch(8192, true);
// write to an actual file
let dir = tempdir().unwrap();
let path = dir.path().join("test.arrow");
let file = std::fs::File::create(&path).unwrap();
let mut writer = FileWriter::try_new(file, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();

b.iter(move || {
let ipc_file = std::fs::File::open(&path).expect("failed to open file");
let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap file") };

// Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
let bytes = bytes::Bytes::from_owner(mmap);
let buffer = Buffer::from(bytes);
let decoder = IPCBufferDecoder::new(buffer);
assert_eq!(decoder.num_batches(), 10);

for i in 0..decoder.num_batches() {
decoder.get_batch(i);
}
})
});
}

// copied from the zero_copy_ipc example.
// should we move this to an actual API?
/// Wrapper around the example in the `FileDecoder` which handles the
/// low level interaction with the Arrow IPC format.
struct IPCBufferDecoder {
/// Memory (or memory mapped) Buffer with the data
buffer: Buffer,
/// Decoder that reads Arrays that refers to the underlying buffers
decoder: FileDecoder,
/// Location of the batches within the buffer
batches: Vec<Block>,
}

impl IPCBufferDecoder {
fn new(buffer: Buffer) -> Self {
let trailer_start = buffer.len() - 10;
let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

let schema = fb_to_schema(footer.schema().unwrap());

let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());

// Read dictionaries
for block in footer.dictionaries().iter().flatten() {
let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
let data = buffer.slice_with_length(block.offset() as _, block_len);
decoder.read_dictionary(block, &data).unwrap();
}

// convert to Vec from the flatbuffers Vector to avoid having a direct dependency on flatbuffers
let batches = footer
.recordBatches()
.map(|b| b.iter().copied().collect())
.unwrap_or_default();

Self {
buffer,
decoder,
batches,
}
}

fn num_batches(&self) -> usize {
self.batches.len()
}

fn get_batch(&self, i: usize) -> RecordBatch {
let block = &self.batches[i];
let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
let data = self
.buffer
.slice_with_length(block.offset() as _, block_len);
self.decoder.read_record_batch(block, &data).unwrap().unwrap()
}
}


fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::Int32, true),
Expand Down

0 comments on commit 7527f8e

Please sign in to comment.