
keep knowledge of ongoing merges across merge pipelines #5633


Open
wants to merge 11 commits into base: main
1 change: 1 addition & 0 deletions quickwit/Cargo.lock


1 change: 1 addition & 0 deletions quickwit/Cargo.toml
@@ -90,6 +90,7 @@ binggan = { version = "0.14" }
bytes = { version = "1", features = ["serde"] }
bytesize = { version = "1.3.0", features = ["serde"] }
bytestring = "1.3.0"
census = "0.4.2"
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "54cbc70" }
chrono = { version = "0.4", default-features = false, features = [
"clock",
1 change: 1 addition & 0 deletions quickwit/quickwit-common/Cargo.toml
@@ -15,6 +15,7 @@ anyhow = { workspace = true }
async-speed-limit = { workspace = true }
async-trait = { workspace = true }
bytesize = { workspace = true }
census = { workspace = true }
coarsetime = { workspace = true }
dyn-clone = { workspace = true }
env_logger = { workspace = true }
1 change: 1 addition & 0 deletions quickwit/quickwit-common/src/lib.rs
@@ -40,6 +40,7 @@ pub mod temp_dir;
pub mod test_utils;
pub mod thread_pool;
pub mod tower;
pub mod tracker;
pub mod type_map;
pub mod uri;

208 changes: 208 additions & 0 deletions quickwit/quickwit-common/src/tracker.rs
@@ -0,0 +1,208 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

use census::{Inventory, TrackedObject as InventoredObject};

/// A resource tracker
///
/// This is used to track whether an object is alive (still in use) or dead (no longer
/// used, but not yet acknowledged). It keeps no trace of objects that were alive but
/// have since been acknowledged.
#[derive(Clone)]
pub struct Tracker<T: Clone> {
inner_inventory: Inventory<T>,
unacknowledged_drop_receiver: Arc<Mutex<Receiver<T>>>,
return_channel: Sender<T>,
}

/// A single tracked object
#[derive(Debug)]
pub struct TrackedObject<T: Clone> {
inner: Option<InventoredObject<T>>,
return_channel: Sender<T>,
}

impl<T: Clone> TrackedObject<T> {
/// Acknowledge an object
pub fn acknowledge(mut self) {
self.inner.take();
}

/// Create an untracked object, mostly for tests
pub fn untracked(value: T) -> Self {
Tracker::new().track(value)
}

/// Create an object which is tracked only as long as it's alive,
/// but not once it's dead.
/// The object is tracked through the provided census inventory
pub fn track_alive_in(value: T, inventory: &Inventory<T>) -> Self {
TrackedObject {
inner: Some(inventory.track(value)),
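// the Receiver half of channel() is dropped immediately, so drop
// notifications for this object are intentionally discarded (see Drop impl)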
return_channel: channel().0,
}
}
}

impl<T: Clone> AsRef<T> for TrackedObject<T> {
fn as_ref(&self) -> &T {
self
}
}

impl<T: Clone> Deref for TrackedObject<T> {
type Target = T;
fn deref(&self) -> &T {
self.inner
.as_ref()
.expect("inner should only be None during drop")
}
}

impl<T: Clone> Drop for TrackedObject<T> {
fn drop(&mut self) {
if let Some(item) = self.inner.take() {
// if the send fails, no one cared about getting the notification; it's
// fine to drop the item
let _ = self.return_channel.send(item.as_ref().clone());
}
}
}

impl<T: Clone> Default for Tracker<T> {
fn default() -> Self {
Self::new()
}
}

impl<T: Clone> Tracker<T> {
/// Create a new tracker
pub fn new() -> Self {
let (sender, receiver) = channel();
Tracker {
inner_inventory: Inventory::new(),
unacknowledged_drop_receiver: Arc::new(Mutex::new(receiver)),
return_channel: sender,
}
}

/// Return whether it is safe to recreate this tracker.
///
/// A tracker is considered safe to recreate if this is the only instance left,
/// and it contains no alive object (it may contain dead objects though).
///
/// Once this returns true, it will stay that way until [Tracker::track] or [Tracker::clone]
/// is called.
pub fn safe_to_recreate(&self) -> bool {
Arc::strong_count(&self.unacknowledged_drop_receiver) == 1
&& self.inner_inventory.len() == 0
}

/// List objects which are considered alive
pub fn list_ongoing(&self) -> Vec<InventoredObject<T>> {
self.inner_inventory.list()
}

/// Take away the list of objects considered dead
pub fn take_dead(&self) -> Vec<T> {
let mut res = Vec::new();
let receiver = self.unacknowledged_drop_receiver.lock().unwrap();
while let Ok(dead_entry) = receiver.try_recv() {
res.push(dead_entry);
}
res
}

/// Track a new object.
pub fn track(&self, value: T) -> TrackedObject<T> {
TrackedObject {
inner: Some(self.inner_inventory.track(value)),
return_channel: self.return_channel.clone(),
}
}
}

#[cfg(test)]
mod tests {
use super::{InventoredObject, Tracker};

#[track_caller]
fn assert_tracked_eq<T: PartialEq + std::fmt::Debug>(
got: Vec<InventoredObject<T>>,
expected: Vec<T>,
) {
assert_eq!(
got.len(),
expected.len(),
"expected vec of same lenght, {} != {}",
got.len(),
expected.len()
);
for (got_item, expected_item) in got.into_iter().zip(expected) {
assert_eq!(*got_item, expected_item);
}
}

#[test]
fn test_single_tracker() {
let tracker = Tracker::<u32>::new();

assert!(tracker.list_ongoing().is_empty());
assert!(tracker.take_dead().is_empty());
assert!(tracker.safe_to_recreate());

{
let tracked_1 = tracker.track(1);
assert_tracked_eq(tracker.list_ongoing(), vec![1]);
assert!(tracker.take_dead().is_empty());
assert!(!tracker.safe_to_recreate());
std::mem::drop(tracked_1); // done for clarity and to silence the unused-variable warning
}

assert!(tracker.list_ongoing().is_empty());
assert!(tracker.safe_to_recreate());
assert_eq!(tracker.take_dead(), vec![1]);
assert!(tracker.safe_to_recreate());
}

#[test]
fn test_two_tracker() {
let tracker = Tracker::<u32>::new();
let tracker2 = tracker.clone();

assert!(tracker.list_ongoing().is_empty());
assert!(tracker.take_dead().is_empty());
assert!(!tracker.safe_to_recreate());

{
let tracked_1 = tracker.track(1);
assert_tracked_eq(tracker.list_ongoing(), vec![1]);
assert_tracked_eq(tracker2.list_ongoing(), vec![1]);
assert!(tracker.take_dead().is_empty());
assert!(tracker2.take_dead().is_empty());
assert!(!tracker.safe_to_recreate());
std::mem::drop(tracked_1); // done for clarity and to silence the unused-variable warning
}

assert!(tracker.list_ongoing().is_empty());
assert!(tracker2.list_ongoing().is_empty());
assert_eq!(tracker2.take_dead(), vec![1]);
// we took away the dead from tracker2, so they don't show up in tracker
assert!(tracker.take_dead().is_empty());
}
}
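For reference, a minimal usage sketch of the new tracker API; it relies only on the functions defined in this file and restates the semantics exercised by the tests above:

use quickwit_common::tracker::Tracker;

fn tracker_lifecycle_sketch() {
    let tracker: Tracker<u32> = Tracker::new();

    // an object is "alive" from track() until it is dropped or acknowledged
    let op = tracker.track(42);
    assert_eq!(tracker.list_ongoing().len(), 1);

    // dropping without acknowledging marks the object as "dead":
    // it is handed back exactly once through take_dead()
    drop(op);
    assert_eq!(tracker.take_dead(), vec![42]);

    // an acknowledged object leaves no trace at all
    tracker.track(7).acknowledge();
    assert!(tracker.list_ongoing().is_empty());
    assert!(tracker.take_dead().is_empty());
    assert!(tracker.safe_to_recreate());
}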
126 changes: 124 additions & 2 deletions quickwit/quickwit-indexing/failpoints/mod.rs
@@ -31,6 +31,7 @@
//! Below we test panics at different steps in the indexing pipeline.

use std::path::Path;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::time::Duration;

@@ -42,15 +43,17 @@ use quickwit_common::split_file;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_indexing::actors::MergeExecutor;
use quickwit_indexing::merge_policy::{MergeOperation, MergeTask};
use quickwit_indexing::models::MergeScratch;
use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, MergeScratch, SpawnPipeline,
};
use quickwit_indexing::{get_tantivy_directory_from_split_bundle, TestSandbox};
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata,
SplitState,
};
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService};
use quickwit_proto::types::{IndexUid, NodeId};
use quickwit_proto::types::{IndexUid, NodeId, PipelineUid};
use serde_json::Value as JsonValue;
use tantivy::Directory;

@@ -346,3 +349,122 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul

Ok(())
}

#[tokio::test]
async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let doc_mapper_yaml = r#"
field_mappings:
- name: body
type: text
- name: ts
type: datetime
fast: true
timestamp_field: ts
"#;
let indexing_setting_yaml = r#"
split_num_docs_target: 2500
merge_policy:
type: "limit_merge"
max_merge_ops: 1
merge_factor: 4
max_merge_factor: 4
max_finalize_merge_operations: 1
"#;
let search_fields = ["body"];
let index_id = "test-index-merge-duplication";
let mut test_index_builder = TestSandbox::create(
index_id,
doc_mapper_yaml,
indexing_setting_yaml,
&search_fields,
)
.await?;

// 0: start
// 1: 1st merge reached the failpoint
// 11: 1st merge failed
// 12: 2nd merge reached the failpoint
// 22: 2nd merge failed (we don't care about this state)
let state = Arc::new(AtomicU32::new(0));
let state_clone = state.clone();

fail::cfg_callback("before-merge-split", move || {
use std::sync::atomic::Ordering;
state_clone.fetch_add(1, Ordering::Relaxed);
std::thread::sleep(std::time::Duration::from_millis(300));
state_clone.fetch_add(10, Ordering::Relaxed);
panic!("kill merge pipeline");
})
.unwrap();

let batch: Vec<JsonValue> =
std::iter::repeat_with(|| serde_json::json!({"body ": TEST_TEXT, "ts": 1631072713 }))
.take(500)
.collect();
// this sometimes fails because the ingest API isn't aware of the index yet?!
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
for _ in 0..4 {
test_index_builder
.add_documents_through_api(batch.clone())
.await?;
}

let (indexing_pipeline, merge_pipeline) = test_index_builder
.take_indexing_and_merge_pipeline()
.await?;

// stop the pipeline
indexing_pipeline.kill().await;
merge_pipeline
.mailbox()
.ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline)
.await?;

tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let pipeline_id = test_index_builder
.indexing_service()
.ask_for_res(SpawnPipeline {
index_id: index_id.to_string(),
source_config: quickwit_config::SourceConfig::ingest_api_default(),
pipeline_uid: PipelineUid::for_test(1u128),
})
.await?;

tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// we shouldn't have had a 2nd merge run yet (the 1st one hasn't panicked just yet)
assert_eq!(state.load(Ordering::Relaxed), 1);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert_eq!(state.load(Ordering::Relaxed), 11);

let merge_pipeline_id = pipeline_id.merge_pipeline_id();
let indexing_pipeline = test_index_builder
.indexing_service()
.ask_for_res(DetachIndexingPipeline { pipeline_id })
.await?;
let merge_pipeline = test_index_builder
.indexing_service()
.ask_for_res(DetachMergePipeline {
pipeline_id: merge_pipeline_id,
})
.await?;

indexing_pipeline.kill().await;
merge_pipeline
.mailbox()
.ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline)
.await?;

// stopping the merge pipeline makes it recheck for possible dead merges
// (alternatively, it does that sooner when rebuilding the known split list)
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// timing-wise, we can't have reached 22, but it would be logically correct to get that state
assert_eq!(state.load(Ordering::Relaxed), 12);

let universe = test_index_builder.universe();
universe.kill();
fail::cfg("before-merge-split", "off").unwrap();
universe.quit().await;

Ok(())
}
13 changes: 9 additions & 4 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -23,7 +23,7 @@ use futures::TryStreamExt;
use itertools::Itertools;
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz, Mailbox,
Observation,
Observation, SendError,
};
use quickwit_cluster::Cluster;
use quickwit_common::fs::get_cache_directory_path;
@@ -539,12 +539,17 @@ impl IndexingService {
// The queue capacity of the merge pipeline is unbounded, so `.send_message(...)`
// should not block.
// We avoid using `.quit()` here because it waits for the actor to exit.
merge_pipeline_handle
// In some cases the pipeline could already be shutting down, in which case we
// may receive a Disconnected error
match merge_pipeline_handle
.handle
.mailbox()
.send_message(FinishPendingMergesAndShutdownPipeline)
.await
.expect("merge pipeline mailbox should not be full");
{
Ok(_) | Err(SendError::Disconnected) => (),
Err(SendError::Full) => panic!("merge pipeline mailbox should not be full"),
}
}
}
// Finally, we remove the completed or failed merge pipelines.
@@ -1601,7 +1606,7 @@ mod tests {
let observation = indexing_server_handle.process_pending_and_observe().await;
assert_eq!(observation.num_running_pipelines, 0);
assert_eq!(observation.num_running_merge_pipelines, 0);
universe.sleep(*HEARTBEAT).await;
universe.sleep(2 * *HEARTBEAT).await;
// Check that the merge pipeline is also shut down as there are no more indexing
// pipelines on the index.
assert!(universe.get_one::<MergePipeline>().is_none());
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -350,7 +350,8 @@ impl MergePipeline {
self.params.merge_policy.clone(),
merge_split_downloader_mailbox,
self.params.merge_scheduler_service.clone(),
);
)
.await?;
let (_, merge_planner_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
77 changes: 54 additions & 23 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
@@ -18,16 +18,16 @@ use std::time::Instant;

use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::tracker::Tracker;
use quickwit_metastore::SplitMetadata;
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::types::DocMappingUid;
use serde::Serialize;
use tantivy::Inventory;
use time::OffsetDateTime;
use tracing::{info, warn};

use super::MergeSchedulerService;
use crate::actors::merge_scheduler_service::schedule_merge;
use crate::actors::merge_scheduler_service::{schedule_merge, GetOperationTracker};
use crate::actors::MergeSplitDownloader;
use crate::merge_policy::MergeOperation;
use crate::models::NewSplits;
@@ -80,11 +80,15 @@ pub struct MergePlanner {
merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,

/// Inventory of ongoing merge operations. If everything goes well,
/// a merge operation is dropped after the publish of the merged split.
/// Track ongoing and failed merge operations for this index
///
/// It is used to GC the known_split_ids set.
ongoing_merge_operations_inventory: Inventory<MergeOperation>,
/// We don't want to emit a new merge for splits already in the process of
/// being merged, but we want to keep track of failed merges so we can
/// reschedule them.
// TODO currently the MergePlanner is torn down when a merge fails, so this
// mechanism is only useful when there are some merges left from a previous
// pipeline. We could only tear down the rest of the pipeline on error.
ongoing_merge_operations_tracker: Tracker<MergeOperation>,

/// We use the actor start_time as a way to identify incarnations.
///
@@ -134,8 +138,15 @@ impl Handler<RunFinalizeMergePolicyAndQuit> for MergePlanner {
_plan_merge: RunFinalizeMergePolicyAndQuit,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
// Note we ignore messages that could be coming from a different incarnation.
// (See comment on `Self::incarnation_start_at`.)
// consume failed merges so that we may try to reschedule them one last time
for failed_merge in self.ongoing_merge_operations_tracker.take_dead() {
for split in failed_merge.splits {
// splits from a dead merge are always recorded: they are likely already
// part of our known splits, and we don't want to rebuild the known split
// list, as that would likely log about not halving its size.
self.record_split(split);
}
}
self.send_merge_ops(true, ctx).await?;
Err(ActorExitStatus::Success)
}
@@ -183,30 +194,42 @@ impl MergePlanner {
QueueCapacity::Bounded(1)
}

pub fn new(
pub async fn new(
pipeline_id: &MergePipelineId,
immature_splits: Vec<SplitMetadata>,
merge_policy: Arc<dyn MergePolicy>,
merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
) -> MergePlanner {
) -> anyhow::Result<MergePlanner> {
let immature_splits: Vec<SplitMetadata> = immature_splits
.into_iter()
.filter(|split_metadata| belongs_to_pipeline(pipeline_id, split_metadata))
.collect();
let ongoing_merge_operations_tracker = merge_scheduler_service
.ask(GetOperationTracker(pipeline_id.index_uid.clone()))
.await?;

let mut known_split_ids: HashSet<String> = HashSet::new();
let ongoing_merge_operations = ongoing_merge_operations_tracker.list_ongoing();
for merge_op in ongoing_merge_operations {
for split in &merge_op.splits {
known_split_ids.insert(split.split_id().to_string());
}
}

let mut merge_planner = MergePlanner {
known_split_ids: Default::default(),
known_split_ids,
known_split_ids_recompute_attempt_id: 0,
partitioned_young_splits: Default::default(),
merge_policy,
merge_split_downloader_mailbox,
merge_scheduler_service,
ongoing_merge_operations_inventory: Inventory::default(),
ongoing_merge_operations_tracker,

incarnation_started_at: Instant::now(),
};
merge_planner.record_splits_if_necessary(immature_splits);
merge_planner
Ok(merge_planner)
}

fn rebuild_known_split_ids(&self) -> HashSet<String> {
@@ -217,7 +240,7 @@ impl MergePlanner {
known_split_ids.insert(split.split_id().to_string());
}
}
let ongoing_merge_operations = self.ongoing_merge_operations_inventory.list();
let ongoing_merge_operations = self.ongoing_merge_operations_tracker.list_ongoing();
// Add splits that are known as in merge.
for merge_op in ongoing_merge_operations {
for split in &merge_op.splits {
@@ -237,7 +260,7 @@ impl MergePlanner {

/// Updates `known_split_ids` and returns true if the split was not
/// previously known and should be recorded.
fn acknownledge_split(&mut self, split_id: &str) -> bool {
fn acknowledge_split(&mut self, split_id: &str) -> bool {
if self.known_split_ids.contains(split_id) {
return false;
}
@@ -251,6 +274,10 @@ impl MergePlanner {
if self.known_split_ids_recompute_attempt_id % 100 == 0 {
self.known_split_ids = self.rebuild_known_split_ids();
self.known_split_ids_recompute_attempt_id = 0;

for failed_merge in self.ongoing_merge_operations_tracker.take_dead() {
self.record_splits_if_necessary(failed_merge.splits);
}
}
}

@@ -280,12 +307,13 @@ impl MergePlanner {
// a split already in store to be received.
//
// See `known_split_ids`.
if !self.acknownledge_split(new_split.split_id()) {
if !self.acknowledge_split(new_split.split_id()) {
continue;
}
self.record_split(new_split);
}
}

async fn compute_merge_ops(
&mut self,
is_finalize: bool,
@@ -323,9 +351,8 @@ impl MergePlanner {
let merge_ops = self.compute_merge_ops(is_finalize, ctx).await?;
for merge_operation in merge_ops {
info!(merge_operation=?merge_operation, "schedule merge operation");
let tracked_merge_operation = self
.ongoing_merge_operations_inventory
.track(merge_operation);
let tracked_merge_operation =
self.ongoing_merge_operations_tracker.track(merge_operation);
schedule_merge(
&self.merge_scheduler_service,
tracked_merge_operation,
@@ -435,7 +462,8 @@ mod tests {
merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
)
.await?;
let (merge_planner_mailbox, merge_planner_handle) =
universe.spawn_builder().spawn(merge_planner);
{
@@ -560,7 +588,8 @@ mod tests {
merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
)
.await?;
let (merge_planner_mailbox, merge_planner_handle) =
universe.spawn_builder().spawn(merge_planner);

@@ -652,7 +681,8 @@ mod tests {
merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
)
.await?;
let (merge_planner_mailbox, merge_planner_handle) =
universe.spawn_builder().spawn(merge_planner);
universe.sleep(Duration::from_secs(10)).await;
@@ -717,7 +747,8 @@ mod tests {
merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
)
.await?;
// We create a fake old mailbox that contains two new splits and a PlanMerge message from an
// old incarnation. This could happen in real life if the merge pipeline failed
// right after a `PlanMerge` was pushed to the pipeline. Note that #3847 did not
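Taken together, the tracker gives the merge planner the following lifecycle. Here is a hedged sketch with simplified names (the real wiring goes through MergeSplitDownloader, MergeExecutor, and the publisher):

use quickwit_common::tracker::{TrackedObject, Tracker};

#[derive(Clone)]
struct MergeOp {
    split_ids: Vec<String>,
}

// scheduling: the planner tracks the operation before sending it downstream
fn schedule(tracker: &Tracker<MergeOp>, op: MergeOp) -> TrackedObject<MergeOp> {
    tracker.track(op)
}

// success path: once the merged split is published, the operation is
// acknowledged and disappears from the tracker for good
fn on_merge_published(op: TrackedObject<MergeOp>) {
    op.acknowledge();
}

// restart path: a new planner incarnation re-learns in-flight splits from
// list_ongoing() and reclaims splits from merges that died unacknowledged
fn on_planner_restart(tracker: &Tracker<MergeOp>) -> Vec<String> {
    let mut splits = Vec::new();
    for ongoing in tracker.list_ongoing() {
        splits.extend(ongoing.split_ids.iter().cloned());
    }
    for dead in tracker.take_dead() {
        splits.extend(dead.split_ids);
    }
    splits
}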
43 changes: 39 additions & 4 deletions quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
@@ -14,13 +14,14 @@

use std::cmp::Reverse;
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use std::collections::{BinaryHeap, HashMap};
use std::sync::Arc;

use anyhow::Context;
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
use tantivy::TrackedObject;
use quickwit_common::tracker::{TrackedObject, Tracker};
use quickwit_proto::types::IndexUid;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::error;

@@ -118,6 +119,8 @@ pub struct MergeSchedulerService {
pending_merge_queue: BinaryHeap<ScheduledMerge>,
next_merge_id: u64,
pending_merge_bytes: u64,
tracked_operations: HashMap<IndexUid, Tracker<MergeOperation>>,
gc_sequence_id: usize,
}

impl Default for MergeSchedulerService {
@@ -135,6 +138,8 @@ impl MergeSchedulerService {
pending_merge_queue: BinaryHeap::default(),
next_merge_id: 0,
pending_merge_bytes: 0,
tracked_operations: HashMap::new(),
gc_sequence_id: 0,
}
}

@@ -189,6 +194,14 @@ impl MergeSchedulerService {
.ongoing_merge_operations
.set(num_merges);
}

fn maybe_gc_trackers(&mut self) {
self.gc_sequence_id += 1;
if self.gc_sequence_id % 100 == 0 {
self.tracked_operations
.retain(|_k, tracker| !tracker.safe_to_recreate())
}
}
}

#[async_trait]
@@ -289,17 +302,39 @@ impl Handler<PermitReleased> for MergeSchedulerService {
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.schedule_pending_merges(ctx);
self.maybe_gc_trackers();
Ok(())
}
}

#[derive(Debug)]
pub(crate) struct GetOperationTracker(pub IndexUid);

#[async_trait]
impl Handler<GetOperationTracker> for MergeSchedulerService {
type Reply = Tracker<MergeOperation>;

async fn handle(
&mut self,
get_operation_tracker: GetOperationTracker,
_ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let tracker = self
.tracked_operations
.entry(get_operation_tracker.0)
.or_default()
.clone();
Ok(tracker)
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use quickwit_actors::Universe;
use quickwit_common::tracker::Tracker;
use quickwit_metastore::SplitMetadata;
use tantivy::Inventory;
use tokio::time::timeout;

use super::*;
@@ -339,7 +374,7 @@ mod tests {
let (merge_scheduler_service, _) = universe
.spawn_builder()
.spawn(MergeSchedulerService::new(2));
let inventory = Inventory::new();
let inventory = Tracker::new();

let (merge_split_downloader_mailbox, merge_split_downloader_inbox) =
universe.create_test_mailbox();
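Since the scheduler now owns one Tracker per IndexUid and hands out clones, a (re)starting merge pipeline can fetch the tracker for its index and see merges left over from the previous incarnation. A sketch mirroring the ask performed in merge_planner.rs (the helper name is hypothetical; GetOperationTracker is crate-private):

use quickwit_actors::Mailbox;
use quickwit_common::tracker::Tracker;
use quickwit_proto::types::IndexUid;

async fn fetch_operation_tracker(
    scheduler: &Mailbox<MergeSchedulerService>,
    index_uid: IndexUid,
) -> anyhow::Result<Tracker<MergeOperation>> {
    // the same Tracker instance is returned for a given IndexUid, so merges
    // tracked by a previous pipeline incarnation stay visible to the new one
    let tracker = scheduler.ask(GetOperationTracker(index_uid)).await?;
    Ok(tracker)
}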
18 changes: 11 additions & 7 deletions quickwit/quickwit-indexing/src/merge_policy/mod.rs
@@ -23,13 +23,13 @@ use std::sync::Arc;
pub(crate) use const_write_amplification::ConstWriteAmplificationMergePolicy;
use itertools::Itertools;
pub use nop_merge_policy::NopMergePolicy;
use quickwit_common::tracker::TrackedObject;
use quickwit_config::merge_policy_config::MergePolicyConfig;
use quickwit_config::IndexingSettings;
use quickwit_metastore::{SplitMaturity, SplitMetadata};
use quickwit_proto::types::SplitId;
use serde::Serialize;
pub(crate) use stable_log_merge_policy::StableLogMergePolicy;
use tantivy::TrackedObject;
use tracing::{info_span, Span};

use crate::actors::MergePermit;
@@ -55,8 +55,8 @@ pub struct MergeTask {
impl MergeTask {
#[cfg(any(test, feature = "testsuite"))]
pub fn from_merge_operation_for_test(merge_operation: MergeOperation) -> MergeTask {
let inventory = tantivy::Inventory::default();
let tracked_merge_operation = inventory.track(merge_operation);
let tracker = quickwit_common::tracker::Tracker::new();
let tracked_merge_operation = tracker.track(merge_operation);
MergeTask {
merge_operation: tracked_merge_operation,
_merge_permit: MergePermit::for_test(),
@@ -397,13 +397,14 @@ pub mod tests {
fn apply_merge(
merge_policy: &Arc<dyn MergePolicy>,
split_index: &mut HashMap<String, SplitMetadata>,
merge_op: &MergeOperation,
merge_op: TrackedObject<MergeOperation>,
) -> SplitMetadata {
for split in merge_op.splits_as_slice() {
assert!(split_index.remove(split.split_id()).is_some());
}
let merged_split = fake_merge(merge_policy, merge_op.splits_as_slice());
split_index.insert(merged_split.split_id().to_string(), merged_split.clone());
merge_op.acknowledge();
merged_split
}

@@ -427,7 +428,8 @@ pub mod tests {
merge_policy.clone(),
merge_task_mailbox,
universe.get_or_spawn_one::<MergeSchedulerService>(),
);
)
.await?;
let mut split_index: HashMap<String, SplitMetadata> = HashMap::default();
let (merge_planner_mailbox, merge_planner_handler) =
universe.spawn_builder().spawn(merge_planner);
@@ -448,7 +450,9 @@ pub mod tests {
}
let new_splits: Vec<SplitMetadata> = merge_tasks
.into_iter()
.map(|merge_op| apply_merge(&merge_policy, &mut split_index, &merge_op))
.map(|merge_op| {
apply_merge(&merge_policy, &mut split_index, merge_op.merge_operation)
})
.collect();
merge_planner_mailbox
.send_message(NewSplits { new_splits })
@@ -468,7 +472,7 @@ pub mod tests {

let merge_tasks = merge_task_inbox.drain_for_test_typed::<MergeTask>();
for merge_task in merge_tasks {
apply_merge(&merge_policy, &mut split_index, &merge_task);
apply_merge(&merge_policy, &mut split_index, merge_task.merge_operation);
}

let split_metadatas: Vec<SplitMetadata> = split_index.values().cloned().collect();
74 changes: 71 additions & 3 deletions quickwit/quickwit-indexing/src/test_utils.rs
@@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use bytes::Bytes;
use quickwit_actors::{Mailbox, Universe};
use quickwit_actors::{ActorHandle, Mailbox, Universe};
use quickwit_cluster::{create_cluster_for_test, ChannelTransport};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::rand::append_random_suffix;
@@ -32,13 +32,16 @@ use quickwit_ingest::{init_ingest_api, IngesterPool, QUEUES_DIR_NAME};
use quickwit_metastore::{
CreateIndexRequestExt, MetastoreResolver, Split, SplitMetadata, SplitState,
};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::{CreateIndexRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId, PipelineUid, SourceId};
use quickwit_storage::{Storage, StorageResolver};
use serde_json::Value as JsonValue;

use crate::actors::IndexingService;
use crate::models::{DetachIndexingPipeline, IndexingStatistics, SpawnPipeline};
use crate::actors::{IndexingPipeline, IndexingService, MergePipeline};
use crate::models::{
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};

/// Creates a Test environment.
///
@@ -56,6 +59,7 @@ pub struct TestSandbox {
storage: Arc<dyn Storage>,
add_docs_id: AtomicUsize,
universe: Universe,
indexing_pipeline_id: Option<IndexingPipelineId>,
_temp_dir: tempfile::TempDir,
}

@@ -130,6 +134,14 @@ impl TestSandbox {
.await?;
let (indexing_service, _indexing_service_handle) =
universe.spawn_builder().spawn(indexing_service_actor);

let indexing_pipeline_id = indexing_service
.ask_for_res(SpawnPipeline {
index_id: index_uid.index_id.to_string(),
source_config,
pipeline_uid: PipelineUid::for_test(1u128),
})
.await?;
Ok(TestSandbox {
node_id,
index_uid,
@@ -141,6 +153,7 @@ impl TestSandbox {
storage,
add_docs_id: AtomicUsize::default(),
universe,
indexing_pipeline_id: Some(indexing_pipeline_id),
_temp_dir: temp_dir,
})
}
@@ -189,6 +202,56 @@ impl TestSandbox {
Ok(pipeline_statistics)
}

/// Adds documents and waits for them to be indexed (creating a separate split).
///
/// The documents are expected to be `JsonValue`.
/// They can be created using the `serde_json::json!` macro.
pub async fn add_documents_through_api<I>(&self, json_docs: I) -> anyhow::Result<()>
where
I: IntoIterator<Item = JsonValue> + 'static,
I::IntoIter: Send,
{
let ingest_api_service_mailbox = self
.universe
.get_one::<quickwit_ingest::IngestApiService>()
.unwrap();

let batch_builder =
quickwit_ingest::DocBatchBuilder::new(self.index_uid.index_id.to_string());
let mut json_writer = batch_builder.json_writer();
for doc in json_docs {
json_writer.ingest_doc(doc)?;
}
let batch = json_writer.build();
let ingest_request = quickwit_ingest::IngestRequest {
doc_batches: vec![batch],
commit: quickwit_ingest::CommitType::WaitFor as i32,
};
ingest_api_service_mailbox
.ask_for_res(ingest_request)
.await?;
Ok(())
}

pub async fn take_indexing_and_merge_pipeline(
&mut self,
) -> anyhow::Result<(ActorHandle<IndexingPipeline>, ActorHandle<MergePipeline>)> {
let pipeline_id = self.indexing_pipeline_id.take().unwrap();
let merge_pipeline_id = pipeline_id.merge_pipeline_id();
let indexing_pipeline = self
.indexing_service
.ask_for_res(DetachIndexingPipeline { pipeline_id })
.await?;
let merge_pipeline = self
.indexing_service
.ask_for_res(DetachMergePipeline {
pipeline_id: merge_pipeline_id,
})
.await?;

Ok((indexing_pipeline, merge_pipeline))
}

/// Returns the metastore of the TestSandbox.
///
/// The metastore is a file-backed metastore.
@@ -233,6 +296,11 @@ impl TestSandbox {
&self.universe
}

/// Returns a mailbox for the indexing service.
pub fn indexing_service(&self) -> Mailbox<IndexingService> {
self.indexing_service.clone()
}

/// Gracefully quits all registered actors in the underlying universe and asserts that none of
/// them panicked.
///
@@ -101,7 +101,7 @@ async fn validate_search_across_doc_mapping_updates(
)
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(50)).await;
sandbox
.local_ingest(index_id, ingest_after_update)
.await
10 changes: 6 additions & 4 deletions quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
@@ -21,6 +21,7 @@ use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::extract_time_range;
use quickwit_common::tracker::TrackedObject;
use quickwit_common::uri::Uri;
use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_indexing::actors::{schedule_merge, MergeSchedulerService, MergeSplitDownloader};
@@ -95,7 +96,7 @@ impl Actor for DeleteTaskPlanner {
.ongoing_delete_operations_inventory
.list()
.iter()
.map(|tracked_operation| tracked_operation.as_ref().clone())
.map(|tracked_operation| (**tracked_operation).clone())
.collect_vec();
DeleteTaskPlannerState {
ongoing_delete_operations,
@@ -195,9 +196,10 @@ impl DeleteTaskPlanner {
split_with_deletes.split_metadata,
);
info!(delete_operation=?delete_operation, "planned delete operation");
let tracked_delete_operation = self
.ongoing_delete_operations_inventory
.track(delete_operation);
let tracked_delete_operation = TrackedObject::track_alive_in(
delete_operation,
&self.ongoing_delete_operations_inventory,
);
schedule_merge(
&self.merge_scheduler_service,
tracked_delete_operation,