@@ -64,7 +64,7 @@ pub(crate) struct RemoteMap {
6464 disco : DiscoState ,
6565 sender : TransportsSender ,
6666 discovery : ConcurrentDiscovery ,
67- actor_tasks : Mutex < JoinSet < Vec < RemoteStateMessage > > > ,
67+ actor_tasks : Mutex < JoinSet < ( EndpointId , Vec < RemoteStateMessage > ) > > ,
6868}
6969
7070impl RemoteMap {
@@ -105,7 +105,22 @@ impl RemoteMap {
105105 senders. retain ( |_eid, sender| !sender. is_closed ( ) ) ;
106106 while let Some ( result) = self . actor_tasks . lock ( ) . expect ( "poisoned" ) . try_join_next ( ) {
107107 match result {
108- Ok ( leftover_msgs) => debug ! ( ?leftover_msgs, "TODO: handle leftover messages" ) ,
108+ Ok ( ( eid, leftover_msgs) ) => {
109+ let entry = senders. entry ( eid) ;
110+ if leftover_msgs. is_empty ( ) {
111+ match entry {
112+ hash_map:: Entry :: Occupied ( occupied_entry) => occupied_entry. remove ( ) ,
113+ hash_map:: Entry :: Vacant ( _) => {
114+ panic ! ( "this should be impossible TODO(matheus23)" ) ;
115+ }
116+ } ;
117+ } else {
118+ // The remote actor got messages while it was closing, so we're restarting
119+ debug ! ( %eid, "restarting terminated remote state actor: messages received during shutdown" ) ;
120+ let sender = self . start_remote_state_actor ( eid, leftover_msgs) ;
121+ entry. insert_entry ( sender) ;
122+ }
123+ }
109124 Err ( err) => {
110125 if let Ok ( panic) = err. try_into_panic ( ) {
111126 error ! ( "RemoteStateActor panicked." ) ;
@@ -126,7 +141,7 @@ impl RemoteMap {
126141 match handles. entry ( eid) {
127142 hash_map:: Entry :: Occupied ( entry) => entry. get ( ) . clone ( ) ,
128143 hash_map:: Entry :: Vacant ( entry) => {
129- let sender = self . start_remote_state_actor ( eid) ;
144+ let sender = self . start_remote_state_actor ( eid, vec ! [ ] ) ;
130145 entry. insert ( sender. clone ( ) ) ;
131146 sender
132147 }
@@ -136,7 +151,11 @@ impl RemoteMap {
136151 /// Starts a new remote state actor and returns a handle and a sender.
137152 ///
138153 /// The handle is not inserted into the endpoint map, this must be done by the caller of this function.
139- fn start_remote_state_actor ( & self , eid : EndpointId ) -> mpsc:: Sender < RemoteStateMessage > {
154+ fn start_remote_state_actor (
155+ & self ,
156+ eid : EndpointId ,
157+ initial_msgs : Vec < RemoteStateMessage > ,
158+ ) -> mpsc:: Sender < RemoteStateMessage > {
140159 // Ensure there is a RemoteMappedAddr for this EndpointId.
141160 self . endpoint_mapped_addrs . get ( & eid) ;
142161 RemoteStateActor :: new (
@@ -149,7 +168,10 @@ impl RemoteMap {
149168 self . sender . clone ( ) ,
150169 self . discovery . clone ( ) ,
151170 )
152- . start ( self . actor_tasks . lock ( ) . expect ( "poisoned" ) . deref_mut ( ) )
171+ . start (
172+ initial_msgs,
173+ self . actor_tasks . lock ( ) . expect ( "poisoned" ) . deref_mut ( ) ,
174+ )
153175 }
154176
155177 pub ( super ) fn handle_ping ( & self , msg : disco:: Ping , sender : EndpointId , src : transports:: Addr ) {
0 commit comments