@@ -733,12 +733,14 @@ mod tests {
733733 use crate :: execution_context:: Workload ;
734734 use crate :: host:: module_host:: { DatabaseUpdate , EventStatus , ModuleEvent , ModuleFunctionCall } ;
735735 use crate :: messages:: websocket as ws;
736+ use crate :: sql:: execute:: run;
736737 use crate :: subscription:: module_subscription_manager:: SubscriptionManager ;
737738 use crate :: subscription:: query:: compile_read_only_query;
738739 use crate :: subscription:: TableUpdateType ;
739740 use hashbrown:: HashMap ;
740741 use itertools:: Itertools ;
741742 use parking_lot:: RwLock ;
743+ use pretty_assertions:: assert_matches;
742744 use spacetimedb_client_api_messages:: energy:: EnergyQuanta ;
743745 use spacetimedb_client_api_messages:: websocket:: {
744746 CompressableQueryUpdate , Compression , FormatSwitch , QueryId , Subscribe , SubscribeMulti , SubscribeSingle ,
@@ -1529,6 +1531,51 @@ mod tests {
15291531 Ok ( ( ) )
15301532 }
15311533
1534+ /// Test that we receive subscription updates for DML
1535+ #[ tokio:: test]
1536+ async fn test_updates_for_dml ( ) -> anyhow:: Result < ( ) > {
1537+ // Establish a client connection
1538+ let ( tx, mut rx) = client_connection ( client_id_from_u8 ( 1 ) ) ;
1539+
1540+ let db = relational_db ( ) ?;
1541+ let subs = module_subscriptions ( db. clone ( ) ) ;
1542+ let schema = [ ( "x" , AlgebraicType :: U8 ) , ( "y" , AlgebraicType :: U8 ) ] ;
1543+ let t_id = db. create_table_for_test ( "t" , & schema, & [ ] ) ?;
1544+
1545+ // Subscribe to `t`
1546+ subscribe_multi ( & subs, & [ "select * from t" ] , tx, & mut 0 ) ?;
1547+
1548+ // Wait to receive the initial subscription message
1549+ assert_matches ! ( rx. recv( ) . await , Some ( SerializableMessage :: Subscription ( _) ) ) ;
1550+
1551+ let schema = ProductType :: from ( [ AlgebraicType :: U8 , AlgebraicType :: U8 ] ) ;
1552+
1553+ // Only the owner can invoke DML commands
1554+ let auth = AuthCtx :: new ( identity_from_u8 ( 0 ) , identity_from_u8 ( 0 ) ) ;
1555+
1556+ run (
1557+ & db,
1558+ "INSERT INTO t (x, y) VALUES (0, 1)" ,
1559+ auth,
1560+ Some ( & subs) ,
1561+ & mut vec ! [ ] ,
1562+ ) ?;
1563+
1564+ // Client should receive insert
1565+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ product ! [ 0_u8 , 1_u8 ] ] , [ ] ) . await ;
1566+
1567+ run ( & db, "UPDATE t SET y=2 WHERE x=0" , auth, Some ( & subs) , & mut vec ! [ ] ) ?;
1568+
1569+ // Client should receive update
1570+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ product ! [ 0_u8 , 2_u8 ] ] , [ product ! [ 0_u8 , 1_u8 ] ] ) . await ;
1571+
1572+ run ( & db, "DELETE FROM t WHERE x=0" , auth, Some ( & subs) , & mut vec ! [ ] ) ?;
1573+
1574+ // Client should receive delete
1575+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ ] , [ product ! [ 0_u8 , 2_u8 ] ] ) . await ;
1576+ Ok ( ( ) )
1577+ }
1578+
15321579 /// Test that we do not compress within a [TransactionUpdateMessage].
15331580 /// The message itself is compressed before being sent over the wire,
15341581 /// but we don't care about that for this test.
0 commit comments