Skip to content

Commit 99f16e3

Browse files
committed
feat(actions): time in enabling conditions
1 parent 05ef952 commit 99f16e3

File tree

6 files changed

+16
-63
lines changed

6 files changed

+16
-63
lines changed

node/src/p2p/channels/rpc/p2p_channels_rpc_actions.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,6 @@ use super::*;
22

33
impl redux::EnablingCondition<crate::State> for P2pChannelsRpcAction {
44
fn is_enabled(&self, state: &crate::State) -> bool {
5-
match self {
6-
Self::Timeout { peer_id, id } => {
7-
self.is_enabled(&state.p2p)
8-
&& state.p2p.is_peer_rpc_timed_out(peer_id, *id, state.time())
9-
}
10-
_ => self.is_enabled(&state.p2p),
11-
}
5+
self.is_enabled(&state.p2p)
126
}
137
}

node/src/p2p/connection/incoming/p2p_connection_incoming_actions.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,6 @@ use super::*;
22

33
impl redux::EnablingCondition<crate::State> for P2pConnectionIncomingAction {
44
fn is_enabled(&self, state: &crate::State) -> bool {
5-
match self {
6-
Self::Timeout { peer_id } => {
7-
let peer = state.p2p.peers.get(peer_id);
8-
let timed_out = peer
9-
.and_then(|peer| peer.status.as_connecting()?.as_incoming())
10-
.map_or(false, |s| s.is_timed_out(state.time()));
11-
timed_out && self.is_enabled(&state.p2p)
12-
}
13-
_ => self.is_enabled(&state.p2p),
14-
}
5+
self.is_enabled(&state.p2p)
156
}
167
}
Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,7 @@
1-
use std::time::Duration;
2-
3-
use crate::p2p::connection::P2pConnectionState;
4-
use crate::p2p::P2pPeerStatus;
5-
61
use super::*;
72

83
impl redux::EnablingCondition<crate::State> for P2pConnectionOutgoingAction {
94
fn is_enabled(&self, state: &crate::State) -> bool {
10-
match self {
11-
P2pConnectionOutgoingAction::Reconnect { opts, .. } => {
12-
if !self.is_enabled(&state.p2p) {
13-
return false;
14-
}
15-
16-
let Some(peer) = state.p2p.peers.get(opts.peer_id()) else {
17-
return false;
18-
};
19-
let delay_passed = match &peer.status {
20-
P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(
21-
P2pConnectionOutgoingState::Error { time, .. },
22-
))
23-
| P2pPeerStatus::Disconnected { time, .. } => {
24-
state.time().checked_sub(*time) >= Some(Duration::from_secs(30))
25-
}
26-
_ => true,
27-
};
28-
delay_passed
29-
}
30-
P2pConnectionOutgoingAction::Timeout { peer_id } => {
31-
let peer = state.p2p.peers.get(peer_id);
32-
let timed_out = peer
33-
.and_then(|peer| peer.status.as_connecting()?.as_outgoing())
34-
.map_or(false, |s| s.is_timed_out(state.time()));
35-
timed_out && self.is_enabled(&state.p2p)
36-
}
37-
_ => self.is_enabled(&state.p2p),
38-
}
5+
self.is_enabled(&state.p2p)
396
}
407
}

p2p/src/channels/rpc/p2p_channels_rpc_actions.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use redux::Timestamp;
12
use serde::{Deserialize, Serialize};
23

34
use crate::{P2pState, PeerId};
@@ -61,7 +62,7 @@ impl P2pChannelsRpcAction {
6162
}
6263

6364
impl redux::EnablingCondition<P2pState> for P2pChannelsRpcAction {
64-
fn is_enabled(&self, state: &P2pState) -> bool {
65+
fn is_enabled_with_time(&self, state: &P2pState, time: Timestamp) -> bool {
6566
match self {
6667
P2pChannelsRpcAction::Init { peer_id } => {
6768
state.get_ready_peer(peer_id).map_or(false, |p| {
@@ -93,12 +94,8 @@ impl redux::EnablingCondition<P2pState> for P2pChannelsRpcAction {
9394
})
9495
},
9596
P2pChannelsRpcAction::Timeout { peer_id, id } => {
96-
state.get_ready_peer(peer_id).map_or(false, |p| match &p.channels.rpc {
97-
P2pChannelsRpcState::Ready { local, .. } => {
98-
matches!(local, P2pRpcLocalState::Requested { id: rpc_id, .. } if rpc_id == id)
99-
},
100-
_ => false,
101-
})
97+
state.get_ready_peer(peer_id).map_or(false, |p| matches!(&p.channels.rpc, P2pChannelsRpcState::Ready { local: P2pRpcLocalState::Requested { id: rpc_id, .. }, .. } if rpc_id == id))
98+
&& state.is_peer_rpc_timed_out(peer_id, *id, time)
10299
},
103100
P2pChannelsRpcAction::ResponseReceived { peer_id, id, .. } => {
104101
// TODO(binier): use consensus to enforce that peer doesn't send

p2p/src/connection/incoming/p2p_connection_incoming_actions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl P2pConnectionIncomingAction {
7979
}
8080

8181
impl redux::EnablingCondition<P2pState> for P2pConnectionIncomingAction {
82-
fn is_enabled(&self, state: &P2pState) -> bool {
82+
fn is_enabled_with_time(&self, state: &P2pState, time: redux::Timestamp) -> bool {
8383
match self {
8484
P2pConnectionIncomingAction::Init { opts, .. } => {
8585
state.incoming_accept(opts.peer_id, &opts.offer).is_ok()
@@ -160,7 +160,7 @@ impl redux::EnablingCondition<P2pState> for P2pConnectionIncomingAction {
160160
.peers
161161
.get(peer_id)
162162
.and_then(|peer| peer.status.as_connecting()?.as_incoming())
163-
.is_some(),
163+
.map_or(false, |s| s.is_timed_out(time)),
164164
P2pConnectionIncomingAction::Error { peer_id, error } => state
165165
.peers
166166
.get(peer_id)

p2p/src/connection/outgoing/p2p_connection_outgoing_actions.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Duration;
2+
13
use serde::{Deserialize, Serialize};
24

35
use openmina_core::requests::RpcId;
@@ -97,7 +99,7 @@ impl P2pConnectionOutgoingAction {
9799
}
98100

99101
impl redux::EnablingCondition<P2pState> for P2pConnectionOutgoingAction {
100-
fn is_enabled(&self, state: &P2pState) -> bool {
102+
fn is_enabled_with_time(&self, state: &P2pState, time: redux::Timestamp) -> bool {
101103
match self {
102104
P2pConnectionOutgoingAction::RandomInit => {
103105
!state.already_has_min_peers() && !state.initial_unused_peers().is_empty()
@@ -130,7 +132,9 @@ impl redux::EnablingCondition<P2pState> for P2pConnectionOutgoingAction {
130132
.min_by_key(|(time, ..)| *time)
131133
.filter(|(_, id, _)| *id == opts.peer_id())
132134
.filter(|(.., peer_opts)| peer_opts.as_ref().map_or(true, |o| o == opts))
133-
.is_some()
135+
.map_or(false, |(t, ..)| {
136+
time.checked_sub(t) >= Some(Duration::from_secs(30))
137+
})
134138
}
135139
P2pConnectionOutgoingAction::OfferSdpCreatePending { peer_id } => state
136140
.peers
@@ -237,7 +241,7 @@ impl redux::EnablingCondition<P2pState> for P2pConnectionOutgoingAction {
237241
.peers
238242
.get(peer_id)
239243
.and_then(|peer| peer.status.as_connecting()?.as_outgoing())
240-
.is_some(),
244+
.map_or(false, |s| s.is_timed_out(time)),
241245
P2pConnectionOutgoingAction::Error { peer_id, error } => state
242246
.peers
243247
.get(peer_id)

0 commit comments

Comments
 (0)