diff --git a/crates/corro-agent/src/api/public/pubsub.rs b/crates/corro-agent/src/api/public/pubsub.rs index be69e254..aa15f623 100644 --- a/crates/corro-agent/src/api/public/pubsub.rs +++ b/crates/corro-agent/src/api/public/pubsub.rs @@ -1151,12 +1151,12 @@ mod tests { assert_eq!( notify_rows.recv().await.unwrap().unwrap(), - QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-3".into()],) + QueryEvent::Notify(ChangeType::Update, vec!["service-id-3".into()],) ); assert_eq!( notify_rows.recv().await.unwrap().unwrap(), - QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-4".into()],) + QueryEvent::Notify(ChangeType::Update, vec!["service-id-4".into()],) ); let mut res = api_v1_subs( @@ -1243,7 +1243,7 @@ mod tests { assert_eq!(rows_from.recv().await.unwrap().unwrap(), query_evt); let notify_evt = - QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-5".into()]); + QueryEvent::Notify(ChangeType::Update, vec!["service-id-5".into()]); assert_eq!(notify_rows.recv().await.unwrap().unwrap(), notify_evt); @@ -1341,12 +1341,12 @@ mod tests { assert_eq!( notify_rows.recv().await.unwrap().unwrap(), - QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-6".into()],) + QueryEvent::Notify(ChangeType::Update, vec!["service-id-6".into()],) ); assert_eq!( notify_rows.recv().await.unwrap().unwrap(), - QueryEvent::Notify(TableName("tests".into()), ChangeType::Delete, vec!["service-id-6".into()],) + QueryEvent::Notify(ChangeType::Delete, vec!["service-id-6".into()],) ); } @@ -1712,7 +1712,7 @@ mod tests { let notify_res = timeout(Duration::from_secs(5), notify_rows.recv()).await?; assert_eq!( notify_res.unwrap().unwrap(), - QueryEvent::Notify(TableName("buftests".into()), ChangeType::Update, vec![Integer(2)],) + QueryEvent::Notify(ChangeType::Update, vec![Integer(2)],) ); tripwire_tx.send(()).await.ok(); diff --git a/crates/corro-api-types/src/lib.rs b/crates/corro-api-types/src/lib.rs index aaf2e62f..03d6247d 100644 --- a/crates/corro-api-types/src/lib.rs +++ b/crates/corro-api-types/src/lib.rs @@ -35,7 +35,7 @@ pub enum TypedQueryEvent { }, Change(ChangeType, RowId, T, ChangeId), Error(CompactString), - Notify(TableName, ChangeType, T) + Notify(ChangeType, T) } impl TypedQueryEvent { @@ -46,7 +46,7 @@ impl TypedQueryEvent { TypedQueryEvent::EndOfQuery { change_id, .. } => QueryEventMeta::EndOfQuery(*change_id), TypedQueryEvent::Change(_, _, _, id) => QueryEventMeta::Change(*id), TypedQueryEvent::Error(_) => QueryEventMeta::Error, - TypedQueryEvent::Notify(_, _, _) => QueryEventMeta::Notify, + TypedQueryEvent::Notify(_, _) => QueryEventMeta::Notify, } } } diff --git a/crates/corro-types/src/updates.rs b/crates/corro-types/src/updates.rs index b88e4335..1bbb6e44 100644 --- a/crates/corro-types/src/updates.rs +++ b/crates/corro-types/src/updates.rs @@ -249,7 +249,7 @@ fn handle_candidates( candidates.keys().collect::>() ); - for (table, pks) in candidates { + for (_, pks) in candidates { let pks = pks .iter() .map(|(pk, cl)| unpack_columns(pk).and_then(|x| Ok((x, cl.clone())))) @@ -261,7 +261,6 @@ fn handle_candidates( change_type = ChangeType::Delete } if let Err(e) = evt_tx.blocking_send(QueryEvent::Notify( - table.clone(), change_type, pk.iter().map(|x| x.to_owned()).collect::>(), )) {