Skip to content

Commit

Permalink
remove table from notify
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Oct 24, 2024
1 parent 00b1282 commit 984f79e
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 10 deletions.
12 changes: 6 additions & 6 deletions crates/corro-agent/src/api/public/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,12 +1151,12 @@ mod tests {

assert_eq!(
notify_rows.recv().await.unwrap().unwrap(),
QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-3".into()],)
QueryEvent::Notify(ChangeType::Update, vec!["service-id-3".into()],)
);

assert_eq!(
notify_rows.recv().await.unwrap().unwrap(),
QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-4".into()],)
QueryEvent::Notify(ChangeType::Update, vec!["service-id-4".into()],)
);

let mut res = api_v1_subs(
Expand Down Expand Up @@ -1243,7 +1243,7 @@ mod tests {
assert_eq!(rows_from.recv().await.unwrap().unwrap(), query_evt);

let notify_evt =
QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-5".into()]);
QueryEvent::Notify(ChangeType::Update, vec!["service-id-5".into()]);

assert_eq!(notify_rows.recv().await.unwrap().unwrap(), notify_evt);

Expand Down Expand Up @@ -1341,12 +1341,12 @@ mod tests {

assert_eq!(
notify_rows.recv().await.unwrap().unwrap(),
QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-6".into()],)
QueryEvent::Notify(ChangeType::Update, vec!["service-id-6".into()],)
);

assert_eq!(
notify_rows.recv().await.unwrap().unwrap(),
QueryEvent::Notify(TableName("tests".into()), ChangeType::Delete, vec!["service-id-6".into()],)
QueryEvent::Notify(ChangeType::Delete, vec!["service-id-6".into()],)
);

}
Expand Down Expand Up @@ -1712,7 +1712,7 @@ mod tests {
let notify_res = timeout(Duration::from_secs(5), notify_rows.recv()).await?;
assert_eq!(
notify_res.unwrap().unwrap(),
QueryEvent::Notify(TableName("buftests".into()), ChangeType::Update, vec![Integer(2)],)
QueryEvent::Notify(ChangeType::Update, vec![Integer(2)],)
);

tripwire_tx.send(()).await.ok();
Expand Down
4 changes: 2 additions & 2 deletions crates/corro-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum TypedQueryEvent<T> {
},
Change(ChangeType, RowId, T, ChangeId),
Error(CompactString),
Notify(TableName, ChangeType, T)
Notify(ChangeType, T)
}

impl<T> TypedQueryEvent<T> {
Expand All @@ -46,7 +46,7 @@ impl<T> TypedQueryEvent<T> {
TypedQueryEvent::EndOfQuery { change_id, .. } => QueryEventMeta::EndOfQuery(*change_id),
TypedQueryEvent::Change(_, _, _, id) => QueryEventMeta::Change(*id),
TypedQueryEvent::Error(_) => QueryEventMeta::Error,
TypedQueryEvent::Notify(_, _, _) => QueryEventMeta::Notify,
TypedQueryEvent::Notify(_, _) => QueryEventMeta::Notify,
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions crates/corro-types/src/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ fn handle_candidates(
candidates.keys().collect::<Vec<_>>()
);

for (table, pks) in candidates {
for (_, pks) in candidates {
let pks = pks
.iter()
.map(|(pk, cl)| unpack_columns(pk).and_then(|x| Ok((x, cl.clone()))))
Expand All @@ -261,7 +261,6 @@ fn handle_candidates(
change_type = ChangeType::Delete
}
if let Err(e) = evt_tx.blocking_send(QueryEvent::Notify(
table.clone(),
change_type,
pk.iter().map(|x| x.to_owned()).collect::<Vec<_>>(),
)) {
Expand Down

0 comments on commit 984f79e

Please sign in to comment.