diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs index 90bc99c01ad..9051caa7543 100644 --- a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs +++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs @@ -22,7 +22,7 @@ use quickwit_common::Progress; use quickwit_common::uri::Uri; use quickwit_metastore::checkpoint::PartitionId; use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::Position; +use quickwit_proto::types::{Offset, Position}; use quickwit_storage::StorageResolver; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; @@ -146,8 +146,13 @@ impl DocFileReader { pub struct ObjectUriBatchReader { partition_id: PartitionId, reader: DocFileReader, - current_offset: usize, - is_eof: bool, + current_position: Position, +} + +fn parse_offset(offset: &Offset) -> anyhow::Result { + offset + .as_usize() + .context("file offset should be stored as usize") } impl ObjectUriBatchReader { @@ -157,26 +162,22 @@ impl ObjectUriBatchReader { uri: &Uri, position: Position, ) -> anyhow::Result { - let current_offset = match position { - Position::Beginning => 0, - Position::Offset(offset) => offset - .as_usize() - .context("file offset should be stored as usize")?, + let current_offset = match &position { Position::Eof(_) => { return Ok(ObjectUriBatchReader { partition_id, reader: DocFileReader::empty(), - current_offset: 0, - is_eof: true, + current_position: position, }); } + Position::Beginning => 0, + Position::Offset(offset) => parse_offset(offset)?, }; let reader = DocFileReader::from_uri(storage_resolver, uri, current_offset).await?; Ok(ObjectUriBatchReader { partition_id, reader, - current_offset, - is_eof: false, + current_position: position, }) } @@ -186,11 +187,15 @@ impl ObjectUriBatchReader { source_type: SourceType, ) -> anyhow::Result { let mut batch_builder = BatchBuilder::new(source_type); - if self.is_eof { - return Ok(batch_builder); - } - let limit_num_bytes = self.current_offset + BATCH_NUM_BYTES_LIMIT as usize; - let mut new_offset = self.current_offset; + let current_offset = match &self.current_position { + Position::Eof(_) => return Ok(batch_builder), + Position::Beginning => 0, + Position::Offset(offset) => parse_offset(offset)?, + }; + + let limit_num_bytes = current_offset + BATCH_NUM_BYTES_LIMIT as usize; + let mut new_offset = current_offset; + let mut eof_position: Option = None; while new_offset < limit_num_bytes { if let Some(record) = source_progress .protect_future(self.reader.next_record()) @@ -199,30 +204,26 @@ impl ObjectUriBatchReader { new_offset = record.next_offset as usize; batch_builder.add_doc(record.doc); if record.is_last { - self.is_eof = true; + eof_position = Some(Position::eof(new_offset)); break; } } else { - self.is_eof = true; + eof_position = Some(Position::eof(new_offset)); break; } } - let to_position = if self.is_eof { - Position::eof(new_offset) - } else { - Position::offset(new_offset) - }; + let to_position = eof_position.unwrap_or(Position::offset(new_offset)); batch_builder.checkpoint_delta.record_partition_delta( self.partition_id.clone(), - Position::offset(self.current_offset), - to_position, + self.current_position.clone(), + to_position.clone(), )?; - self.current_offset = new_offset; + self.current_position = to_position; Ok(batch_builder) } pub fn is_eof(&self) -> bool { - self.is_eof + self.current_position.is_eof() } } diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs index fadb4282c37..ca69dca4d91 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs @@ -564,6 +564,52 @@ mod tests { assert!(coordinator.local_state.is_awaiting_commit(&partition_id_2)); } + #[tokio::test] + async fn test_checkpoint_delta_of_existing_messages() { + let (dummy_doc_file_1, _) = generate_dummy_doc_file(false, 10).await; + let test_uri_1 = Uri::from_str(dummy_doc_file_1.path().to_str().unwrap()).unwrap(); + let partition_id_1 = PreProcessedPayload::ObjectUri(test_uri_1.clone()).partition_id(); + + let (dummy_doc_file_2, _) = generate_dummy_doc_file(false, 10).await; + let test_uri_2 = Uri::from_str(dummy_doc_file_2.path().to_str().unwrap()).unwrap(); + let partition_id_2 = PreProcessedPayload::ObjectUri(test_uri_2.clone()).partition_id(); + + let queue = Arc::new(MemoryQueueForTests::new()); + let shared_state = init_state( + "test-index", + &[ + ( + partition_id_1.clone(), + ("existing_token_1".to_string(), Position::Beginning, true), + ), + ( + partition_id_2.clone(), + ( + "existing_token_2".to_string(), + Position::offset((DUMMY_DOC.len() + 1) * 2), + true, + ), + ), + ], + ); + let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone()); + let batches = process_messages( + &mut coordinator, + queue, + &[(&test_uri_1, "ack-id-1"), (&test_uri_2, "ack-id-2")], + ) + .await; + assert_eq!(batches.len(), 2); + let deltas = batches[0].checkpoint_delta.iter().collect::>(); + assert_eq!(deltas.len(), 1); + assert_eq!(deltas[0].1.from, Position::Beginning); + assert_eq!(deltas[0].1.to, Position::eof(350u64)); + let deltas = batches[1].checkpoint_delta.iter().collect::>(); + assert_eq!(deltas.len(), 1); + assert_eq!(deltas[0].1.from, Position::Offset(70u64.into())); + assert_eq!(deltas[0].1.to, Position::eof(350u64)); + } + #[tokio::test] async fn test_process_multiple_coordinator() { let queue = Arc::new(MemoryQueueForTests::new()); diff --git a/quickwit/quickwit-proto/src/types/mod.rs b/quickwit/quickwit-proto/src/types/mod.rs index f103e5c60f2..c96019dab87 100644 --- a/quickwit/quickwit-proto/src/types/mod.rs +++ b/quickwit/quickwit-proto/src/types/mod.rs @@ -34,7 +34,7 @@ pub use doc_mapping_uid::DocMappingUid; pub use doc_uid::{DocUid, DocUidGenerator}; pub use index_uid::IndexUid; pub use pipeline_uid::PipelineUid; -pub use position::Position; +pub use position::{Offset, Position}; pub use shard_id::ShardId; /// The size of an ULID in bytes. Use `ULID_LEN` for the length of Base32 encoded ULID strings.