Skip to content

Commit

Permalink
Remembering indexes while scanning for state changes (#1195)
Browse files Browse the repository at this point in the history
* Remembering indexes while scanning for state changes

* reversing the state changes to process oldest first
  • Loading branch information
diptanu authored Jan 29, 2025
1 parent 38bc455 commit 359ae85
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 62 deletions.
2 changes: 1 addition & 1 deletion server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "indexify-server"
version = "0.2.21"
version = "0.2.22"
edition = "2021"
authors = ["Tensorlake Inc. <[email protected]>"]
license = "Apache-2.0"
Expand Down
7 changes: 7 additions & 0 deletions server/data_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,13 @@ impl Display for StateChangeId {
}
}

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct UnprocessedStateChanges {
pub changes: Vec<StateChange>,
pub last_global_state_change_cursor: Option<Vec<u8>>,
pub last_namespace_state_change_cursor: Option<Vec<u8>>,
}

#[derive(Clone, Serialize, Deserialize, Debug, Builder)]
pub struct StateChange {
pub id: StateChangeId,
Expand Down
36 changes: 26 additions & 10 deletions server/processor/src/graph_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ impl GraphProcessor {
pub async fn start(&self, mut shutdown_rx: tokio::sync::watch::Receiver<()>) {
let mut cached_state_changes: Vec<StateChange> = vec![];
let mut change_events_rx = self.indexify_state.change_events_rx.clone();
let mut last_global_state_change_cursor: Option<Vec<u8>> = None;
let mut last_namespace_state_change_cursor: Option<Vec<u8>> = None;
// Used to run the loop when there are more than 1 change events queued up
// The watch from the state store only notifies that there are N number of state
// changes but if we only process one event from the queue then the
Expand All @@ -55,13 +57,13 @@ impl GraphProcessor {
tokio::select! {
_ = change_events_rx.changed() => {
change_events_rx.borrow_and_update();
if let Err(err) = self.write_sm_update(&mut cached_state_changes, &notify).await {
if let Err(err) = self.write_sm_update(&mut cached_state_changes, &mut last_global_state_change_cursor, &mut last_namespace_state_change_cursor, &notify).await {
error!("error processing state change: {:?}", err);
continue
}
},
_ = notify.notified() => {
if let Err(err) = self.write_sm_update(&mut cached_state_changes, &notify).await {
if let Err(err) = self.write_sm_update(&mut cached_state_changes, &mut last_global_state_change_cursor, &mut last_namespace_state_change_cursor, &notify).await {
error!("error processing state change: {:?}", err);
continue
}
Expand All @@ -77,19 +79,33 @@ impl GraphProcessor {
pub async fn write_sm_update(
&self,
cached_state_changes: &mut Vec<StateChange>,
last_global_state_change_cursor: &mut Option<Vec<u8>>,
last_namespace_state_change_cursor: &mut Option<Vec<u8>>,
notify: &Arc<Notify>,
) -> Result<()> {
// 1. First load 100 state changes. Process the `global` state changes first
// and then the `ns_` state changes
if cached_state_changes.is_empty() {
cached_state_changes.extend(
self.indexify_state
.reader()
.unprocessed_state_changes()?
// reversing the vec to process the oldest state changes first when using pop.
.into_iter()
.rev(),
);
let unprocessed_state_changes =
self.indexify_state.reader().unprocessed_state_changes(
&last_global_state_change_cursor,
&last_namespace_state_change_cursor,
)?;
let _ = match unprocessed_state_changes.last_global_state_change_cursor {
Some(cursor) => {
last_global_state_change_cursor.replace(cursor);
}
None => {}
};
let _ = match unprocessed_state_changes.last_namespace_state_change_cursor {
Some(cursor) => {
last_namespace_state_change_cursor.replace(cursor);
}
None => {}
};
let mut state_changes = unprocessed_state_changes.changes;
state_changes.reverse();
cached_state_changes.extend(state_changes);
}
// 2. If there are no state changes to process, return
// and wait for the scheduler to wake us up again when there are state changes
Expand Down
44 changes: 29 additions & 15 deletions server/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ mod tests {
let invocation_id = test_state_store::with_simple_graph(&indexify_state).await;

// Should have 1 unprocessed state - one task created event
let unprocessed_state_changes = indexify_state.reader().unprocessed_state_changes()?;
let unprocessed_state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
assert_eq!(
1,
unprocessed_state_changes.len(),
unprocessed_state_changes.changes.len(),
"{:?}",
unprocessed_state_changes
);
Expand All @@ -74,9 +76,11 @@ mod tests {
assert_eq!(tasks.len(), 1);

// Should have 0 unprocessed state - one task it would be unallocated
let unprocessed_state_changes = indexify_state.reader().unprocessed_state_changes()?;
let unprocessed_state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
assert_eq!(
unprocessed_state_changes.len(),
unprocessed_state_changes.changes.len(),
0,
"{:#?}",
unprocessed_state_changes
Expand Down Expand Up @@ -356,12 +360,14 @@ mod tests {
.unwrap()
.0;
assert_eq!(tasks.len(), 2);
let unprocessed_state_changes =
indexify_state.reader().unprocessed_state_changes().unwrap();
let unprocessed_state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)
.unwrap();

// has task created state change in it.
assert_eq!(
unprocessed_state_changes.len(),
unprocessed_state_changes.changes.len(),
0,
"{:?}",
unprocessed_state_changes
Expand Down Expand Up @@ -604,8 +610,10 @@ mod tests {
}

{
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
assert_eq!(state_changes.changes.len(), 0);

let graph_ctx = indexify_state
.reader()
Expand Down Expand Up @@ -913,8 +921,10 @@ mod tests {
}

{
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
assert_eq!(state_changes.changes.len(), 0);

let graph_ctx = indexify_state
.reader()
Expand Down Expand Up @@ -1186,8 +1196,10 @@ mod tests {
}

{
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
assert_eq!(state_changes.changes.len(), 0);

let graph_ctx = indexify_state
.reader()
Expand Down Expand Up @@ -1409,8 +1421,10 @@ mod tests {

// Expect no more tasks and a completed graph
{
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
assert_eq!(state_changes.changes.len(), 0);

let graph_ctx = indexify_state
.reader()
Expand Down
39 changes: 27 additions & 12 deletions server/src/system_task_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ mod tests {

test_srv.process_all_state_changes().await?;

let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
assert_eq!(state_changes.changes.len(), 0);

let graph_ctx = indexify_state.reader().invocation_ctx(
&graph.namespace,
Expand Down Expand Up @@ -228,8 +230,10 @@ mod tests {
system_tasks_executor.lock().await.run().await?;

// Since graph version is the same it should generate new tasks
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
assert_eq!(state_changes.changes.len(), 0);

let system_tasks = indexify_state.reader().get_system_tasks(None).unwrap().0;
assert_eq!(system_tasks.len(), 0);
Expand Down Expand Up @@ -283,8 +287,10 @@ mod tests {
assert_eq!(system_tasks.len(), 1);

// Since graph version is different new changes should be generated
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 1);
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
assert_eq!(state_changes.changes.len(), 1);

// Number of pending system tasks should be incremented
let num_pending_tasks = indexify_state.reader().get_pending_system_tasks()?;
Expand Down Expand Up @@ -371,8 +377,10 @@ mod tests {

test_srv.process_all_state_changes().await?;

let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
assert_eq!(state_changes.changes.len(), 0);

// Number of pending system tasks should be decremented after graph completion
let num_pending_tasks = indexify_state.reader().get_pending_system_tasks()?;
Expand Down Expand Up @@ -488,8 +496,10 @@ mod tests {
let incomplete_tasks = tasks
.iter()
.filter(|t: &&data_model::Task| t.outcome == TaskOutcome::Unknown);
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
if state_changes.is_empty() && incomplete_tasks.count() == 0 {
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
if state_changes.changes.is_empty() && incomplete_tasks.count() == 0 {
break;
}
}
Expand Down Expand Up @@ -563,8 +573,13 @@ mod tests {

let system_tasks = indexify_state.reader().get_system_tasks(None).unwrap().0;

let state_changes = indexify_state.reader().unprocessed_state_changes()?;
if state_changes.is_empty() && num_incomplete_tasks == 0 && system_tasks.is_empty() {
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)?;
if state_changes.changes.is_empty() &&
num_incomplete_tasks == 0 &&
system_tasks.is_empty()
{
break;
}
}
Expand Down
11 changes: 6 additions & 5 deletions server/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use data_model::{
test_objects::tests::{mock_executor, mock_node_fn_output},
ExecutorId,
ExecutorMetadata,
StateChange,
Task,
TaskOutcome,
};
Expand Down Expand Up @@ -62,7 +61,8 @@ impl TestService {
.service
.indexify_state
.reader()
.unprocessed_state_changes()?
.unprocessed_state_changes(&None, &None)?
.changes
.is_empty()
{
self.process_graph_processor().await?;
Expand All @@ -72,15 +72,16 @@ impl TestService {

pub async fn process_graph_processor(&self) -> Result<()> {
let notify = Arc::new(tokio::sync::Notify::new());
let mut cached_state_changes: Vec<StateChange> = self
let mut cached_state_changes = self
.service
.indexify_state
.reader()
.unprocessed_state_changes()?;
.unprocessed_state_changes(&None, &None)?
.changes;
while !cached_state_changes.is_empty() {
self.service
.graph_processor
.write_sm_update(&mut cached_state_changes, &notify)
.write_sm_update(&mut cached_state_changes, &mut None, &mut None, &notify)
.await?;
}
Ok(())
Expand Down
13 changes: 8 additions & 5 deletions server/state_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,14 +695,17 @@ mod tests {
.unwrap();
state_machine::save_state_changes(indexify_state.db.clone(), &tx, &state_change_3).unwrap();
tx.commit().unwrap();
let state_changes = indexify_state.reader().unprocessed_state_changes().unwrap();
assert_eq!(state_changes.len(), 3);
let state_changes = indexify_state
.reader()
.unprocessed_state_changes(&None, &None)
.unwrap();
assert_eq!(state_changes.changes.len(), 3);
// global state_change_2
assert_eq!(state_changes[0].id, StateChangeId::new(1));
assert_eq!(state_changes.changes[0].id, StateChangeId::new(1));
// state_change_1
assert_eq!(state_changes[1].id, StateChangeId::new(0));
assert_eq!(state_changes.changes[1].id, StateChangeId::new(0));
// state_change_3
assert_eq!(state_changes[2].id, StateChangeId::new(2));
assert_eq!(state_changes.changes[2].id, StateChangeId::new(2));
Ok(())
}

Expand Down
Loading

0 comments on commit 359ae85

Please sign in to comment.