@@ -733,6 +733,7 @@ mod tests {
733733 use crate :: execution_context:: Workload ;
734734 use crate :: host:: module_host:: { DatabaseUpdate , EventStatus , ModuleEvent , ModuleFunctionCall } ;
735735 use crate :: messages:: websocket as ws;
736+ use crate :: sql:: execute:: run;
736737 use crate :: subscription:: module_subscription_manager:: SubscriptionManager ;
737738 use crate :: subscription:: query:: compile_read_only_query;
738739 use crate :: subscription:: TableUpdateType ;
@@ -1529,6 +1530,51 @@ mod tests {
15291530 Ok ( ( ) )
15301531 }
15311532
1533+ /// Test that we receive subscription updates for DML
1534+ #[ tokio:: test]
1535+ async fn test_updates_for_dml ( ) -> anyhow:: Result < ( ) > {
1536+ // Establish a client connection
1537+ let ( tx, mut rx) = client_connection ( client_id_from_u8 ( 1 ) ) ;
1538+
1539+ let db = relational_db ( ) ?;
1540+ let subs = module_subscriptions ( db. clone ( ) ) ;
1541+ let schema = [ ( "x" , AlgebraicType :: U8 ) , ( "y" , AlgebraicType :: U8 ) ] ;
1542+ let t_id = db. create_table_for_test ( "t" , & schema, & [ ] ) ?;
1543+
1544+ // Subscribe to `t`
1545+ subscribe_multi ( & subs, & [ "select * from t" ] , tx, & mut 0 ) ?;
1546+
1547+ // Wait to receive the initial subscription message
1548+ assert ! ( matches!( rx. recv( ) . await , Some ( SerializableMessage :: Subscription ( _) ) ) ) ;
1549+
1550+ let schema = ProductType :: from ( [ AlgebraicType :: U8 , AlgebraicType :: U8 ] ) ;
1551+
1552+ // Only the owner can invoke DML commands
1553+ let auth = AuthCtx :: new ( identity_from_u8 ( 0 ) , identity_from_u8 ( 0 ) ) ;
1554+
1555+ run (
1556+ & db,
1557+ "INSERT INTO t (x, y) VALUES (0, 1)" ,
1558+ auth,
1559+ Some ( & subs) ,
1560+ & mut vec ! [ ] ,
1561+ ) ?;
1562+
1563+ // Client should receive insert
1564+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ product ! [ 0_u8 , 1_u8 ] ] , [ ] ) . await ;
1565+
1566+ run ( & db, "UPDATE t SET y=2 WHERE x=0" , auth, Some ( & subs) , & mut vec ! [ ] ) ?;
1567+
1568+ // Client should receive update
1569+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ product ! [ 0_u8 , 2_u8 ] ] , [ product ! [ 0_u8 , 1_u8 ] ] ) . await ;
1570+
1571+ run ( & db, "DELETE FROM t WHERE x=0" , auth, Some ( & subs) , & mut vec ! [ ] ) ?;
1572+
1573+ // Client should receive delete
1574+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ ] , [ product ! [ 0_u8 , 2_u8 ] ] ) . await ;
1575+ Ok ( ( ) )
1576+ }
1577+
15321578 /// Test that we do not compress within a [TransactionUpdateMessage].
15331579 /// The message itself is compressed before being sent over the wire,
15341580 /// but we don't care about that for this test.
0 commit comments