Be more resilient when opening too many files #48

Open
Kerollmops opened this issue Nov 7, 2023 · 0 comments
Labels
enhancement New feature or request

Kerollmops commented Nov 7, 2023

We use grenad in Meilisearch, and we often hit the `too many open files (os error 24)` error, which halts the whole indexing process. I want to propose a change to the way the grenad sorter currently works.

How does it work now?

  1. The sorter allocates a big in-memory buffer.
  2. If the in-memory buffer can hold a new entry, we insert it and repeat step 2.
  3. Otherwise, if there are fewer than 25 on-disk chunks, we create a new file, dump the buffer into it, and go back to step 2. Otherwise, we go to step 4.
  4. We create a new (26th) file, merge the 25 chunk files into it, delete the originals, and go back to step 2.
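The loop above can be sketched with a minimal in-memory model. This is illustrative only: the constants and the `Vec`-based "chunks" stand in for grenad's real byte-based buffer, on-disk chunk files, and user merge function.

```rust
// A minimal, in-memory model of the sorter loop (illustrative only).
const MAX_NB_CHUNKS: usize = 25;
const BUFFER_CAPACITY: usize = 4; // grenad's real threshold is byte-based

#[derive(Default)]
struct SorterModel {
    buffer: Vec<(String, String)>,      // the big in-memory buffer
    chunks: Vec<Vec<(String, String)>>, // each Vec models one on-disk chunk file
}

impl SorterModel {
    fn insert(&mut self, key: &str, val: &str) {
        if self.buffer.len() < BUFFER_CAPACITY {
            // Step 2: the entry fits in memory.
            self.buffer.push((key.to_owned(), val.to_owned()));
        } else {
            // Step 3: dump the sorted buffer into a new chunk "file".
            self.write_chunk();
            self.buffer.push((key.to_owned(), val.to_owned()));
            if self.chunks.len() >= MAX_NB_CHUNKS {
                // Step 4: merge all chunks into a single new one.
                self.merge_chunks();
            }
        }
    }

    fn write_chunk(&mut self) {
        let mut chunk = std::mem::take(&mut self.buffer);
        chunk.sort(); // entries must be ordered before hitting "disk"
        self.chunks.push(chunk);
    }

    fn merge_chunks(&mut self) {
        let mut merged: Vec<_> = self.chunks.drain(..).flatten().collect();
        merged.sort(); // a real merge would stream and resolve duplicate keys
        self.chunks.push(merged);
    }
}
```

Note that every `write_chunk` call maps to a new open file descriptor in the real implementation, which is exactly where error 24 appears.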

In this configuration, if an (os error 24) is raised, we cannot do anything other than return the error above: the in-memory buffer is full, so we cannot accept any new entry, and we cannot append the buffer's content to an existing chunk file because the resulting file would be unordered.

grenad/src/sorter.rs (lines 470 to 625 at 46e5e27):

```rust
pub fn insert<K, V>(&mut self, key: K, val: V) -> Result<(), Error<U>>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let key = key.as_ref();
    let val = val.as_ref();
    #[allow(clippy::branches_sharing_code)]
    if self.entries.fits(key, val) || (!self.threshold_exceeded() && self.allow_realloc) {
        self.entries.insert(key, val);
    } else {
        self.chunks_total_size += self.write_chunk()?;
        self.entries.insert(key, val);
        if self.chunks.len() >= self.max_nb_chunks {
            self.chunks_total_size = self.merge_chunks()?;
        }
    }
    Ok(())
}

fn threshold_exceeded(&self) -> bool {
    self.entries.memory_usage() >= self.dump_threshold
}

/// Returns the exact amount of bytes written to disk, the value can be trusted,
/// this is not an estimate.
///
/// Writes the in-memory entries to disk, using the specified settings
/// to compress the block and entries. It clears the in-memory entries.
fn write_chunk(&mut self) -> Result<u64, Error<U>> {
    let count_write_chunk = self
        .chunk_creator
        .create()
        .map_err(Into::into)
        .map_err(Error::convert_merge_error)
        .map(CountWrite::new)?;

    let mut writer_builder = WriterBuilder::new();
    if let Some(compression_type) = self.chunk_compression_type {
        writer_builder.compression_type(compression_type);
    }
    if let Some(compression_level) = self.chunk_compression_level {
        writer_builder.compression_level(compression_level);
    }
    if let Some(index_key_interval) = self.index_key_interval {
        writer_builder.index_key_interval(index_key_interval);
    }
    if let Some(block_size) = self.block_size {
        writer_builder.block_size(block_size);
    }
    if let Some(index_levels) = self.index_levels {
        writer_builder.index_levels(index_levels);
    }
    let mut writer = writer_builder.build(count_write_chunk);

    if self.sort_in_parallel {
        self.entries.par_sort_by_key(self.sort_algorithm);
    } else {
        self.entries.sort_by_key(self.sort_algorithm);
    }

    let mut current = None;
    for (key, value) in self.entries.iter() {
        match current.as_mut() {
            None => current = Some((key, vec![Cow::Borrowed(value)])),
            Some((current_key, vals)) => {
                if current_key != &key {
                    let merged_val = (self.merge)(current_key, vals).map_err(Error::Merge)?;
                    writer.insert(&current_key, &merged_val)?;
                    vals.clear();
                    *current_key = key;
                }
                vals.push(Cow::Borrowed(value));
            }
        }
    }

    if let Some((key, vals)) = current.take() {
        let merged_val = (self.merge)(key, &vals).map_err(Error::Merge)?;
        writer.insert(key, &merged_val)?;
    }

    // We retrieve the wrapped CountWrite and extract
    // the amount of bytes effectively written.
    let mut count_write_chunk = writer.into_inner()?;
    count_write_chunk.flush()?;
    let written_bytes = count_write_chunk.count();
    let chunk = count_write_chunk.into_inner()?;

    self.chunks.push(chunk);
    self.entries.clear();

    Ok(written_bytes)
}

/// Returns the exact amount of bytes written to disk, the value can be trusted,
/// this is not an estimate.
///
/// Merges all of the chunks into a final chunk that replaces them.
/// It uses the user provided merge function to resolve merge conflicts.
fn merge_chunks(&mut self) -> Result<u64, Error<U>> {
    let count_write_chunk = self
        .chunk_creator
        .create()
        .map_err(Into::into)
        .map_err(Error::convert_merge_error)
        .map(CountWrite::new)?;

    let mut writer_builder = WriterBuilder::new();
    if let Some(compression_type) = self.chunk_compression_type {
        writer_builder.compression_type(compression_type);
    }
    if let Some(compression_level) = self.chunk_compression_level {
        writer_builder.compression_level(compression_level);
    }
    if let Some(index_key_interval) = self.index_key_interval {
        writer_builder.index_key_interval(index_key_interval);
    }
    if let Some(block_size) = self.block_size {
        writer_builder.block_size(block_size);
    }
    if let Some(index_levels) = self.index_levels {
        writer_builder.index_levels(index_levels);
    }
    let mut writer = writer_builder.build(count_write_chunk);

    let sources: Result<Vec<_>, Error<U>> = self
        .chunks
        .drain(..)
        .map(|mut chunk| {
            chunk.seek(SeekFrom::Start(0))?;
            Reader::new(chunk).and_then(Reader::into_cursor).map_err(Error::convert_merge_error)
        })
        .collect();

    // Create a merger to merge all those chunks.
    let mut builder = Merger::builder(&self.merge);
    builder.extend(sources?);
    let merger = builder.build();

    let mut iter = merger.into_stream_merger_iter().map_err(Error::convert_merge_error)?;
    while let Some((key, val)) = iter.next()? {
        writer.insert(key, val)?;
    }

    let mut count_write_chunk = writer.into_inner()?;
    count_write_chunk.flush()?;
    let written_bytes = count_write_chunk.count();
    let chunk = count_write_chunk.into_inner()?;
    self.chunks.push(chunk);

    Ok(written_bytes)
}
```

How can we improve that?

  1. The sorter allocates a big in-memory buffer and one backup file.
  2. If the in-memory buffer can hold a new entry, we insert it and repeat step 2.
  3. Otherwise, if there are fewer than 25 on-disk chunks, we create a new file, dump the buffer into it, and go back to step 2. Otherwise, we go to step 4.
  4. We merge the 24 chunk files into the backup file, then delete all of them except one, which becomes the new backup file. Go back to step 2.

In this configuration, if an (os error 24) is raised, we can still merge the chunk files into the backup file, dump the buffer's content into one of the now-free chunk files, and keep another of the chunk files as the new backup file, dropping the rest. The only moment we are not resilient to the (os error 24) is at step 1, at creation time, when the buffer and backup file are first allocated.
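The fallback can be sketched as follows. This is a hypothetical model, not grenad's actual API: the `ResilientModel` type, the `fd_budget` field simulating the descriptor limit, and the `ChunkError` enum are all made up for the example.

```rust
// Hypothetical sketch of the proposed fallback: when creating a new chunk
// "file" fails (as os error 24 would), fold the open chunks into the backup
// file reserved at creation time instead of failing the whole insert.
#[derive(Debug, PartialEq)]
enum ChunkError {
    TooManyOpenFiles,
}

struct ResilientModel {
    backup: Vec<(String, String)>,      // the backup "file" allocated at step 1
    chunks: Vec<Vec<(String, String)>>, // one Vec per on-disk chunk file
    fd_budget: usize,                   // simulates the process descriptor limit
}

impl ResilientModel {
    fn create_chunk(&mut self) -> Result<(), ChunkError> {
        if self.chunks.len() >= self.fd_budget {
            Err(ChunkError::TooManyOpenFiles) // the simulated os error 24
        } else {
            self.chunks.push(Vec::new());
            Ok(())
        }
    }

    fn dump_buffer(&mut self, mut buffer: Vec<(String, String)>) {
        buffer.sort();
        if self.create_chunk().is_err() {
            // Fallback: fold every open chunk into the backup file,
            // freeing descriptors instead of propagating the error.
            let mut merged: Vec<_> = self.chunks.drain(..).flatten().collect();
            merged.append(&mut self.backup);
            merged.sort();
            self.backup = merged;
            self.create_chunk().expect("chunk slots were just freed");
        }
        // Either a fresh chunk or a freed slot is now available.
        *self.chunks.last_mut().unwrap() = buffer;
    }
}
```

The key property is that `dump_buffer` never returns an error: hitting the descriptor limit only changes where the merged data ends up.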

@Kerollmops Kerollmops added the enhancement New feature or request label Nov 7, 2023