Skip to content

feat: implement async packet handler to prevent keep-alive starvation #1708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a68c35f
feat: enhance release script with CI failure diagnostics
sanity Jul 16, 2025
f47d397
Merge branch 'main' of github.com:freenet/freenet-core
sanity Jul 16, 2025
964e5b6
feat: implement async packet handler to prevent keep-alive starvation
sanity Jul 17, 2025
9ca409d
feat: implement async packet handler to prevent keep-alive starvation
sanity Jul 17, 2025
7b5c545
cleanup: remove dead code to fix clippy warnings
sanity Jul 17, 2025
70896c4
fix: cargo fmt formatting
sanity Jul 17, 2025
c61e538
fix: remove remaining dead code references and broken tests
sanity Jul 18, 2025
9f3269d
fix: cargo fmt formatting
sanity Jul 18, 2025
0416e53
fix: restore test-only methods with cfg(test) annotations
sanity Jul 18, 2025
e5f22f0
fix: cargo fmt formatting
sanity Jul 18, 2025
84cd2b9
fix: resolve race condition in async packet handler causing test hangs
sanity Jul 18, 2025
4c130a9
fix: restore stream handling and fix race condition in async packet h…
sanity Jul 18, 2025
92320e0
fix: rate limit channel backlog warnings to prevent log spam
sanity Jul 18, 2025
191830b
fix: replace problematic wait_for_next_completed with proper async pa…
sanity Jul 18, 2025
1f34a36
debug: test async packet handler timing fix for NAT traversal
sanity Jul 18, 2025
1368e03
fix: resolve async packet handler race condition with polling frequency
sanity Jul 18, 2025
dfb8bd5
fix: increase async handler polling interval to 50ms for stable opera…
sanity Jul 18, 2025
d60613e
fix: use adaptive polling interval for packet handler checking
sanity Jul 18, 2025
b254b04
fix: resolve deadlock in contract operations (PUT/GET/UPDATE)
sanity Jul 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 34 additions & 20 deletions crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ where
client_request = client_events.recv() => {
let req = match client_request {
Ok(request) => {
tracing::debug!(%request, "got client request event");
tracing::info!(%request, "NODE: Received client request event");
request
}
Err(error) if matches!(error.kind(), ErrorKind::Shutdown) => {
Expand Down Expand Up @@ -347,12 +347,15 @@ async fn process_open_request(
return Err(Error::Disconnected);
};

tracing::debug!(
let contract_key = contract.key();
tracing::info!(
this_peer = %peer_id,
"Received put from user event",
contract_key = %contract_key,
state_size = state.size(),
subscribe = subscribe,
"NODE: Processing PUT request from client",
);

let contract_key = contract.key();
let op = put::start_op(
contract,
related_contracts,
Expand All @@ -362,6 +365,17 @@ async fn process_open_request(
);
let op_id = op.id;

tracing::info!(
op_id = %op_id,
contract_key = %contract_key,
"NODE: Created PUT operation"
);

// Start the PUT operation BEFORE waiting for result to avoid deadlock
if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Put request error: {}", err);
}

op_manager
.ch_outbound
.waiting_for_transaction_result(op_id, client_id)
Expand All @@ -370,10 +384,6 @@ async fn process_open_request(
tracing::error!("Error waiting for transaction result: {}", err);
})?;

if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Put request error: {}", err);
}

// Register subscription listener if subscribe=true
if subscribe {
if let Some(subscription_listener) = subscription_listener {
Expand Down Expand Up @@ -465,18 +475,20 @@ async fn process_open_request(
"Sending update op",
);
let op = update::start_op(key, new_state, related_contracts);
let op_id = op.id;

// Start the UPDATE operation BEFORE waiting for result to avoid deadlock
if let Err(err) = update::request_update(&op_manager, op).await {
tracing::error!("request update error {}", err)
}

op_manager
.ch_outbound
.waiting_for_transaction_result(op.id, client_id)
.waiting_for_transaction_result(op_id, client_id)
.await
.inspect_err(|err| {
tracing::error!("Error waiting for transaction result: {}", err);
})?;

if let Err(err) = update::request_update(&op_manager, op).await {
tracing::error!("request update error {}", err)
}
}
ContractRequest::Get {
key,
Expand Down Expand Up @@ -578,23 +590,25 @@ async fn process_open_request(
);

let op = get::start_op(key, return_contract_code, subscribe);
let op_id = op.id;

// Start the GET operation BEFORE waiting for result to avoid deadlock
if let Err(err) =
get::request_get(&op_manager, op, HashSet::new()).await
{
tracing::error!("get::request_get error: {}", err);
}

op_manager
.ch_outbound
.waiting_for_transaction_result(op.id, client_id)
.waiting_for_transaction_result(op_id, client_id)
.await
.inspect_err(|err| {
tracing::error!(
"Error waiting for transaction result (get): {}",
err
);
})?;

if let Err(err) =
get::request_get(&op_manager, op, HashSet::new()).await
{
tracing::error!("get::request_get error: {}", err);
}
}
}
ContractRequest::Subscribe { key, summary } => {
Expand Down
45 changes: 43 additions & 2 deletions crates/core/src/client_events/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ impl WebSocketProxy {
auth_token,
attested_contract,
} => {
tracing::info!(
"SERVER: Processing request in WebSocketProxy - client_id: {}, request_type: {}",
client_id,
match req.as_ref() {
ClientRequest::ContractOp(ContractRequest::Put { .. }) => "PUT",
ClientRequest::ContractOp(ContractRequest::Get { .. }) => "GET",
ClientRequest::ContractOp(ContractRequest::Subscribe { .. }) => "SUBSCRIBE",
ClientRequest::ContractOp(ContractRequest::Update { .. }) => "UPDATE",
_ => "OTHER",
}
);
let open_req = match &*req {
ClientRequest::ContractOp(ContractRequest::Subscribe { key, .. }) => {
tracing::debug!(%client_id, contract = %key, "subscribing to contract");
Expand Down Expand Up @@ -521,7 +532,25 @@ async fn process_client_request(
*auth_token = Some(AuthToken::from(token.clone()));
}

tracing::debug!(req = %req, "received client request");
tracing::info!(req = %req, client_id = %client_id, "SERVER: Received client request");

// Log specific details for PUT requests
if let ClientRequest::ContractOp(ContractRequest::Put {
contract,
state,
subscribe,
..
}) = &req
{
tracing::info!(
"SERVER: PUT request details - contract_key: {}, state_size: {} bytes, subscribe: {}",
contract.key(),
state.size(),
subscribe
);
}

let send_start = std::time::Instant::now();
request_sender
.send(ClientConnection::Request {
client_id,
Expand All @@ -531,6 +560,11 @@ async fn process_client_request(
})
.await
.map_err(|err| Some(err.into()))?;

tracing::info!(
"SERVER: Request forwarded to node - elapsed: {:?}",
send_start.elapsed()
);
Ok(None)
}

Expand All @@ -552,7 +586,14 @@ async fn process_host_response(
HostResponse::Ok => "HostResponse::Ok",
_ => "Unknown",
};
tracing::debug!(response = %res, response_type, cli_id = %id, "sending response");
tracing::info!(response = %res, response_type, cli_id = %id, "SERVER: Sending response to client");

// Log specific details for PUT responses
if let HostResponse::ContractResponse(ContractResponse::PutResponse { key }) =
&res
{
tracing::info!("SERVER: PUT response - contract_key: {key}");
}
match res {
HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
Expand Down
18 changes: 15 additions & 3 deletions crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use anyhow::Context;
use either::Either;
use freenet_stdlib::{
client_api::{ClientRequest, ErrorKind},
client_api::{ClientRequest, ContractResponse, ErrorKind, HostResponse},
prelude::ContractKey,
};
use std::{
Expand Down Expand Up @@ -377,8 +377,20 @@ async fn report_result(
Ok(Some(op_res)) => {
if let Some((client_ids, cb)) = client_req_handler_callback {
for client_id in client_ids {
tracing::debug!(?tx, %client_id, "Sending response to client");
let _ = cb.send((client_id, op_res.to_host_result()));
let host_result = op_res.to_host_result();
tracing::info!(
?tx,
%client_id,
op_type = ?op_res.id().transaction_type(),
"NODE: Sending operation result to client - {:?}",
match &host_result {
Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key })) =>
format!("PUT response for key: {key}"),
Ok(_) => "Other response".to_string(),
Err(e) => format!("Error: {e:?}")
}
);
let _ = cb.send((client_id, host_result));
}
}
// check operations.rs:handle_op_result to see what's the meaning of each state
Expand Down
63 changes: 58 additions & 5 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,27 @@ impl HandshakeHandler {
tracing::debug!("Unconfirmed connection event: {:?}", event);
match event {
InternalEvent::InboundGwJoinRequest(mut req) => {
tracing::info!(
"GATEWAY_JOIN_REQ: Received StartJoinReq from {} (tx: {})",
req.joiner.addr,
req.id
);
let location = if let Some((_, other)) = self.this_location.zip(req.location) {
other
} else {
Location::from_address(&req.conn.remote_addr())
};
tracing::info!(
"GATEWAY_JOIN_REQ: Checking should_accept for peer {} at location {}",
req.joiner.addr,
location
);
let should_accept = self.connection_manager.should_accept(location, &req.joiner);
tracing::info!(
"GATEWAY_JOIN_REQ: should_accept returned {} for peer {}",
should_accept,
req.joiner.addr
);
if should_accept {
let accepted_msg = NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response {
id: req.id,
Expand Down Expand Up @@ -518,6 +533,10 @@ impl HandshakeHandler {
})

} else {
tracing::warn!(
"GATEWAY_JOIN_REQ: Rejected join request from {} - gateway NOT accepting new connections",
req.joiner.addr
);
let InboundGwJoinRequest {
mut conn,
id,
Expand Down Expand Up @@ -974,6 +993,11 @@ async fn wait_for_gw_confirmation(
mut tracker: AcceptedTracker,
) -> OutboundConnResult {
let gw_peer_id = tracker.gw_peer.peer.clone();
tracing::info!(
"HANDSHAKE_JOIN_REQ: Preparing StartJoinReq to gateway {}, tx: {}",
gw_peer_id.addr,
tracker.tx
);
let msg = NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
id: tracker.tx,
target: tracker.gw_peer.clone(),
Expand All @@ -993,11 +1017,22 @@ async fn wait_for_gw_confirmation(
msg = ?msg,
"Sending initial connection message to gw"
);
tracker
.gw_conn
.send(msg)
.await
.map_err(|err| (gw_peer_id.clone(), HandshakeError::TransportError(err)))?;
tracing::info!(
"HANDSHAKE_JOIN_REQ: Sending StartJoinReq message to gateway {}",
tracker.gw_conn.remote_addr()
);
tracker.gw_conn.send(msg).await.map_err(|err| {
tracing::error!(
"HANDSHAKE_JOIN_REQ: Failed to send message to gateway {}: {:?}",
tracker.gw_conn.remote_addr(),
err
);
(gw_peer_id.clone(), HandshakeError::TransportError(err))
})?;
tracing::info!(
"HANDSHAKE_JOIN_REQ: Message sent successfully to gateway {}, now waiting for response",
tracker.gw_conn.remote_addr()
);
tracing::debug!(
at=?tracker.gw_conn.my_address(),
from=%tracker.gw_conn.remote_addr(),
Expand All @@ -1024,12 +1059,22 @@ async fn wait_for_gw_confirmation(
async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResult {
let remote_addr = tracker.gw_conn.remote_addr();
let gw_peer_id = tracker.gw_peer.peer.clone();
tracing::info!(
"HANDSHAKE_RESPONSE_WAIT: Starting to wait for response from gateway {}, remaining_checks: {}",
remote_addr,
tracker.remaining_checks
);
tracing::debug!(
at=?tracker.gw_conn.my_address(),
from=%tracker.gw_conn.remote_addr(),
"Checking for remaining hops, left: {}", tracker.remaining_checks
);
while tracker.remaining_checks > 0 {
tracing::info!(
"HANDSHAKE_RESPONSE_WAIT: Waiting for message #{} from gateway {}",
tracker.remaining_checks,
remote_addr
);
let msg = tokio::time::timeout(
TIMEOUT,
tracker
Expand All @@ -1038,13 +1083,21 @@ async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResul
.map_err(|err| (gw_peer_id.clone(), HandshakeError::TransportError(err))),
)
.map_err(|_| {
tracing::error!(
"HANDSHAKE_RESPONSE_WAIT: TIMEOUT waiting for response from gateway {}, no response received",
remote_addr
);
tracing::debug!(from = %gw_peer_id, "Timed out waiting for response from gw");
(
gw_peer_id.clone(),
HandshakeError::ConnectionClosed(remote_addr),
)
})
.await??;
tracing::info!(
"HANDSHAKE_RESPONSE_WAIT: Received message from gateway {}",
remote_addr
);
let msg = decode_msg(&msg).map_err(|e| (gw_peer_id.clone(), e))?;
match msg {
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response {
Expand Down
11 changes: 11 additions & 0 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,11 +727,22 @@ pub(crate) async fn initial_join_procedure(
number_of_parallel_connections.min(unconnected_count)
);
let select_all = futures::stream::FuturesUnordered::new();

// Get own peer key safely - might not be set during initial connection
let own_peer_key = op_manager.ring.connection_manager.get_peer_key();

for gateway in unconnected_gateways
.into_iter()
.shuffle()
.take(number_of_parallel_connections)
{
// Skip self-connections to prevent routing conflicts
if let Some(ref own_peer) = own_peer_key {
if gateway.peer.addr == own_peer.addr {
tracing::debug!(%gateway, "Skipping self-connection to prevent routing conflicts");
continue;
}
}
tracing::info!(%gateway, "Attempting connection to gateway");
let op_manager = op_manager.clone();
select_all.push(async move {
Expand Down
Loading
Loading