@@ -4,6 +4,8 @@ use std::{
44} ;
55
66use ethlambda_blockchain:: { BlockChain , OutboundGossip } ;
7+ use ethlambda_storage:: Store ;
8+ use ethlambda_types:: state:: Checkpoint ;
79use ethrex_common:: H264 ;
810use ethrex_p2p:: types:: NodeRecord ;
911use ethrex_rlp:: decode:: RLPDecode ;
@@ -41,6 +43,7 @@ pub async fn start_p2p(
4143 listening_socket : SocketAddr ,
4244 blockchain : BlockChain ,
4345 p2p_rx : mpsc:: UnboundedReceiver < OutboundGossip > ,
46+ store : Store ,
4447) {
4548 let config = libp2p:: gossipsub:: ConfigBuilder :: default ( )
4649 // d
@@ -142,7 +145,15 @@ pub async fn start_p2p(
142145
143146 info ! ( "P2P node started on {listening_socket}" ) ;
144147
145- event_loop ( swarm, blockchain, p2p_rx, attestation_topic, block_topic) . await ;
148+ event_loop (
149+ swarm,
150+ blockchain,
151+ p2p_rx,
152+ attestation_topic,
153+ block_topic,
154+ store,
155+ )
156+ . await ;
146157}
147158
148159/// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours
@@ -160,6 +171,7 @@ async fn event_loop(
160171 mut p2p_rx : mpsc:: UnboundedReceiver < OutboundGossip > ,
161172 attestation_topic : libp2p:: gossipsub:: IdentTopic ,
162173 block_topic : libp2p:: gossipsub:: IdentTopic ,
174+ store : Store ,
163175) {
164176 loop {
165177 tokio:: select! {
@@ -179,7 +191,7 @@ async fn event_loop(
179191 SwarmEvent :: Behaviour ( BehaviourEvent :: ReqResp (
180192 message @ request_response:: Event :: Message { .. } ,
181193 ) ) => {
182- handle_req_resp_message( & mut swarm, message) . await ;
194+ handle_req_resp_message( & mut swarm, message, & store ) . await ;
183195 }
184196 SwarmEvent :: Behaviour ( BehaviourEvent :: Gossipsub (
185197 message @ libp2p:: gossipsub:: Event :: Message { .. } ,
@@ -195,8 +207,13 @@ async fn event_loop(
195207 let direction = connection_direction( & endpoint) ;
196208 if num_established. get( ) == 1 {
197209 metrics:: notify_peer_connected( & Some ( peer_id) , direction, "success" ) ;
210+ // Send status request on first connection to this peer
211+ let our_status = build_status( & store) ;
212+ info!( %peer_id, %direction, finalized_slot=%our_status. finalized. slot, head_slot=%our_status. head. slot, "Added connection to new peer, sending status request" ) ;
213+ swarm. behaviour_mut( ) . req_resp. send_request( & peer_id, our_status) ;
214+ } else {
215+ info!( %peer_id, %direction, "Added peer connection" ) ;
198216 }
199- info!( %peer_id, %direction, "Peer connected" ) ;
200217 }
201218 SwarmEvent :: ConnectionClosed {
202219 peer_id,
@@ -296,6 +313,7 @@ async fn handle_outgoing_gossip(
296313async fn handle_req_resp_message (
297314 swarm : & mut libp2p:: Swarm < Behaviour > ,
298315 event : request_response:: Event < Status , Status > ,
316+ store : & Store ,
299317) {
300318 let request_response:: Event :: Message {
301319 peer,
@@ -312,13 +330,12 @@ async fn handle_req_resp_message(
312330 channel,
313331 } => {
314332 info ! ( finalized_slot=%request. finalized. slot, head_slot=%request. head. slot, "Received status request from peer {peer}" ) ;
315- // TODO: send real status
333+ let our_status = build_status ( store ) ;
316334 swarm
317335 . behaviour_mut ( )
318336 . req_resp
319- . send_response ( channel, request . clone ( ) )
337+ . send_response ( channel, our_status )
320338 . unwrap ( ) ;
321- swarm. behaviour_mut ( ) . req_resp . send_request ( & peer, request) ;
322339 }
323340 request_response:: Message :: Response {
324341 request_id : _,
@@ -377,6 +394,20 @@ fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str
377394 }
378395}
379396
397+ /// Build a Status message from the current Store state.
398+ fn build_status ( store : & Store ) -> Status {
399+ let finalized = store. latest_finalized ( ) ;
400+ let head_root = store. head ( ) ;
401+ let head_slot = store. get_block ( & head_root) . expect ( "head block exists" ) . slot ;
402+ Status {
403+ finalized,
404+ head : Checkpoint {
405+ root : head_root,
406+ slot : head_slot,
407+ } ,
408+ }
409+ }
410+
380411fn compute_message_id ( message : & libp2p:: gossipsub:: Message ) -> libp2p:: gossipsub:: MessageId {
381412 const MESSAGE_DOMAIN_INVALID_SNAPPY : [ u8 ; 4 ] = [ 0x00 , 0x00 , 0x00 , 0x00 ] ;
382413 const MESSAGE_DOMAIN_VALID_SNAPPY : [ u8 ; 4 ] = [ 0x01 , 0x00 , 0x00 , 0x00 ] ;
0 commit comments