11use anyhow:: * ;
2+ use base64:: Engine ;
3+ use base64:: engine:: general_purpose:: STANDARD as BASE64 ;
24use epoxy_protocol:: protocol:: { self , Path , Payload , ReplicaId } ;
35use gas:: prelude:: * ;
46use rivet_api_builder:: prelude:: * ;
@@ -68,15 +70,7 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
6870
6971 match path {
7072 Path :: PathFast ( protocol:: PathFast { payload } ) => {
71- commit (
72- ctx,
73- & config,
74- replica_id,
75- & quorum_members,
76- payload,
77- input. purge_cache ,
78- )
79- . await
73+ commit ( ctx, & config, replica_id, payload, input. purge_cache ) . await
8074 }
8175 Path :: PathSlow ( protocol:: PathSlow { payload } ) => {
8276 run_paxos_accept (
@@ -125,15 +119,7 @@ pub async fn run_paxos_accept(
125119
126120 // EPaxos Step 20
127121 if quorum >= utils:: calculate_quorum ( quorum_members. len ( ) , utils:: QuorumType :: Slow ) {
128- commit (
129- ctx,
130- & config,
131- replica_id,
132- & quorum_members,
133- payload_for_accepts,
134- purge_cache,
135- )
136- . await
122+ commit ( ctx, & config, replica_id, payload_for_accepts, purge_cache) . await
137123 } else {
138124 Ok ( ProposalResult :: ConsensusFailed )
139125 }
@@ -144,7 +130,6 @@ pub async fn commit(
144130 ctx : & OperationCtx ,
145131 config : & protocol:: ClusterConfig ,
146132 replica_id : ReplicaId ,
147- quorum_members : & [ ReplicaId ] ,
148133 payload : Payload ,
149134 purge_cache : bool ,
150135) -> Result < ProposalResult > {
@@ -183,6 +168,27 @@ pub async fn commit(
183168 }
184169 } ) ;
185170
171+ if purge_cache {
172+ let keys = payload
173+ . proposal
174+ . commands
175+ . iter ( )
176+ . map ( replica:: utils:: extract_key_from_command)
177+ . flatten ( )
178+ . map ( |key| BASE64 . encode ( key) )
179+ . collect :: < Vec < _ > > ( ) ;
180+
181+ // Purge optimistic cache for all dcs
182+ if !keys. is_empty ( ) {
183+ let ctx = ctx. clone ( ) ;
184+ tokio:: spawn ( async move {
185+ if let Err ( err) = purge_optimistic_cache ( ctx, keys) . await {
186+ tracing:: error!( ?err, "failed purging optimistic cache" ) ;
187+ }
188+ } ) ;
189+ }
190+ }
191+
186192 if let Some ( cmd_err) = cmd_err {
187193 Ok ( ProposalResult :: CommandError ( cmd_err) )
188194 } else {
@@ -325,3 +331,22 @@ async fn send_commits(
325331
326332 Ok ( ( ) )
327333}
334+
335+ async fn purge_optimistic_cache ( ctx : OperationCtx , keys : Vec < String > ) -> Result < ( ) > {
336+ for dc in & ctx. config ( ) . topology ( ) . datacenters {
337+ let workflow_id = ctx
338+ . workflow ( crate :: workflows:: purger:: Input {
339+ replica_id : dc. datacenter_label as u64 ,
340+ } )
341+ . tag ( "replica_id" , dc. datacenter_label as u64 )
342+ . unique ( )
343+ . dispatch ( )
344+ . await ?;
345+ ctx. signal ( crate :: workflows:: purger:: Purge { keys : keys. clone ( ) } )
346+ . to_workflow_id ( workflow_id)
347+ . send ( )
348+ . await ?;
349+ }
350+
351+ Ok ( ( ) )
352+ }
0 commit comments