Skip to content

Commit a3ceed2

Browse files
committed
Revert "repair: queue-based repair replacement for tree-traversal"
This reverts commit fbff822.
1 parent 44f65be commit a3ceed2

File tree

8 files changed

+401
-578
lines changed

8 files changed

+401
-578
lines changed

src/discof/forest/fd_forest.c

Lines changed: 119 additions & 174 deletions
Large diffs are not rendered by default.

src/discof/forest/fd_forest.h

Lines changed: 100 additions & 143 deletions
Large diffs are not rendered by default.

src/discof/forest/test_forest.c

Lines changed: 85 additions & 173 deletions
Large diffs are not rendered by default.

src/discof/repair/fd_inflight.c

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ fd_inflights_join( void * shmem ) {
5151
}
5252

5353
void
54-
fd_inflights_request_insert( fd_inflights_t * table, ulong nonce, fd_pubkey_t const * pubkey, ulong slot, ulong shred_idx ) {
54+
fd_inflights_request_insert( fd_inflights_t * table, ulong nonce, fd_pubkey_t const * pubkey ) {
5555
if( FD_UNLIKELY( !fd_inflight_pool_free( table->pool ) ) ) {
5656
fd_inflight_t * evict = fd_inflight_dlist_ele_pop_head( table->dlist, table->pool );
5757
fd_inflight_map_ele_remove( table->map, &evict->nonce, NULL, table->pool );
@@ -62,10 +62,8 @@ fd_inflights_request_insert( fd_inflights_t * table, ulong nonce, fd_pubkey_t co
6262
inflight_req->nonce = nonce;
6363
inflight_req->timestamp_ns = fd_log_wallclock();
6464
inflight_req->pubkey = *pubkey;
65-
inflight_req->slot = slot;
66-
inflight_req->shred_idx = shred_idx;
6765

68-
fd_inflight_map_ele_insert ( table->map, inflight_req, table->pool );
66+
fd_inflight_map_ele_insert( table->map, inflight_req, table->pool );
6967
fd_inflight_dlist_ele_push_tail( table->dlist, inflight_req, table->pool );
7068
}
7169

@@ -86,16 +84,6 @@ fd_inflights_request_remove( fd_inflights_t * table, ulong nonce, fd_pubkey_t *
8684
return 0;
8785
}
8886

89-
void
90-
fd_inflights_request_pop( fd_inflights_t * table, ulong * nonce_out, ulong * slot_out, ulong * shred_idx_out ) {
91-
fd_inflight_t * inflight_req = fd_inflight_dlist_ele_pop_head( table->dlist, table->pool );
92-
fd_inflight_map_ele_remove( table->map, &inflight_req->nonce, NULL, table->pool );
93-
*nonce_out = inflight_req->nonce;
94-
*slot_out = inflight_req->slot;
95-
*shred_idx_out = inflight_req->shred_idx;
96-
fd_inflight_pool_ele_release( table->pool, inflight_req );
97-
}
98-
9987
fd_inflight_t *
10088
fd_inflights_request_query( fd_inflights_t * table, ulong nonce ) {
10189
return fd_inflight_map_ele_query( table->map, &nonce, NULL, table->pool );

src/discof/repair/fd_inflight.h

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ struct __attribute__((aligned(128UL))) fd_inflight {
2121
long timestamp_ns; /* timestamp when request was created (nanoseconds) */
2222
fd_pubkey_t pubkey; /* public key of the peer */
2323

24-
ulong slot; /* slot of the request */
25-
ulong shred_idx; /* shred index of the request */
26-
2724
/* Reserved for DLL eviction */
2825
ulong prevll; /* pool index of previous element in DLL */
2926
ulong nextll; /* pool index of next element in DLL */
@@ -77,30 +74,11 @@ fd_inflights_t *
7774
fd_inflights_join( void * shmem );
7875

7976
void
80-
fd_inflights_request_insert( fd_inflights_t * table, ulong nonce, fd_pubkey_t const * pubkey, ulong slot, ulong shred_idx );
77+
fd_inflights_request_insert( fd_inflights_t * table, ulong nonce, fd_pubkey_t const * pubkey );
8178

8279
long
8380
fd_inflights_request_remove( fd_inflights_t * table, ulong nonce, fd_pubkey_t * peer_out );
8481

85-
/* Important! Caller must guarantee that the request list is not empty.
86-
This function cannot fail and will always try to populate the output
87-
parameters. Typical use should only call this after
88-
fd_inflights_should_drain returns true. */
89-
90-
void
91-
fd_inflights_request_pop( fd_inflights_t * table, ulong * nonce_out, ulong * slot_out, ulong * shred_idx_out );
92-
93-
static inline int
94-
fd_inflights_should_drain( fd_inflights_t * table, long now ) {
95-
/* peek at head */
96-
if( FD_UNLIKELY( fd_inflight_dlist_is_empty( table->dlist, table->pool ) ) ) return 0;
97-
98-
fd_inflight_t * inflight_req = fd_inflight_dlist_ele_peek_head( table->dlist, table->pool );
99-
if( FD_UNLIKELY( inflight_req->timestamp_ns + 100e6L < now ) ) return 1;
100-
return 0;
101-
}
102-
103-
10482
fd_inflight_t *
10583
fd_inflights_request_query ( fd_inflights_t * table, ulong nonce );
10684

src/discof/repair/fd_policy.c

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed ) {
4141
policy->peers.pool = fd_peer_pool_new ( peers_pool, peer_max );
4242
policy->peers.fast = fd_peer_dlist_new ( peers_fast );
4343
policy->peers.slow = fd_peer_dlist_new ( peers_slow );
44+
policy->iterf.ele_idx = ULONG_MAX;
4445
policy->turbine_slot0 = ULONG_MAX;
4546
policy->tsreset = 0;
4647
policy->nonce = 1;
@@ -201,13 +202,18 @@ fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair
201202
}
202203
}
203204

204-
/**********************/
205-
/* ADVANCE ITERATOR */
206-
/**********************/
205+
/* Every so often we'll need to reset the frontier iterator to the
206+
head of frontier, because we could end up traversing down a very
207+
long tree if we are far behind. */
207208

208-
fd_forest_iter_next( forest );
209-
if( FD_UNLIKELY( fd_forest_iter_done( forest ) ) ) {
210-
// This happens when we have already requested all the shreds we know about.
209+
if( FD_UNLIKELY( now_ms - policy->tsreset > 100UL /* ms */ ||
210+
policy->iterf.frontier_ver != fd_fseq_query( fd_forest_ver_const( forest ) ) ) ) {
211+
fd_policy_reset( policy, forest );
212+
}
213+
214+
fd_forest_blk_t * ele = fd_forest_pool_ele( pool, policy->iterf.ele_idx );
215+
if( FD_UNLIKELY( !ele ) ) {
216+
// This happens when we are fully caught up i.e. we have all the shreds of every slot we know about.
211217
return NULL;
212218
}
213219

@@ -225,34 +231,53 @@ fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair
225231
next valid requestable element. */
226232

227233
int req_made = 0;
234+
while( !req_made ) {
235+
ele = fd_forest_pool_ele( pool, policy->iterf.ele_idx );
236+
237+
if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
238+
/* We are not ready to repair this slot yet. But it's possible we
239+
have another fork that we need to repair... so we just
240+
should skip to the next SLOT in the consumed iterator. The
241+
likelihood that this ele is the head of turbine is high, which
242+
means that the shred_idx of the iterf is likely to be UINT_MAX,
243+
which means calling fd_forest_iter_next will advance the iterf
244+
to the next slot. */
245+
policy->iterf.shred_idx = UINT_MAX; // heinous... i'm sorry
246+
policy->iterf = fd_forest_iter_next( policy->iterf, forest );
247+
if( FD_UNLIKELY( fd_forest_iter_done( policy->iterf, forest ) ) ) {
248+
policy->iterf = fd_forest_iter_init( forest );
249+
break;
250+
}
251+
continue;
252+
}
228253

229-
fd_forest_blk_t * ele = fd_forest_pool_ele( pool, forest->iter.ele_idx );
230-
if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
231-
/* We are not ready to repair this slot yet. But it's possible we
232-
have another fork that we need to repair... so we just
233-
should skip to the next SLOT in the consumed iterator. The
234-
likelihood that this ele is the head of turbine is high, which
235-
means that the shred_idx of the iterf is likely to be UINT_MAX,
236-
which means calling fd_forest_iter_next will advance the iterf
237-
to the next slot. */
238-
//forest->iter.shred_idx = UINT_MAX; // heinous... i'm sorry
239-
//fd_forest_iter_next( forest );
240-
//if( FD_UNLIKELY( fd_forest_iter_done( forest->iter, forest ) ) ) break;
241-
//continue;
242-
}
254+
if( FD_UNLIKELY( policy->iterf.shred_idx == UINT_MAX ) ) {
255+
ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_HIGHEST_SHRED, ele->slot, 0 );
256+
if( FD_UNLIKELY( ele->slot < highest_known_slot && !dedup_next( policy, key, now ) ) ) {
257+
// We'll never know the highest shred for the current turbine slot, so there's no point in requesting it.
258+
out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, 0 );
259+
policy->nonce++;
260+
req_made = 1;
261+
}
262+
} else {
263+
ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_SHRED, ele->slot, policy->iterf.shred_idx );
264+
if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
265+
out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, policy->iterf.shred_idx );
266+
policy->nonce++;
267+
if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
268+
req_made = 1;
269+
}
270+
}
243271

244-
if( FD_UNLIKELY( forest->iter.shred_idx == UINT_MAX ) ) {
245-
if( FD_UNLIKELY( ele->slot < highest_known_slot ) ) {
246-
// We'll never know the highest shred for the current turbine slot, so there's no point in requesting it.
247-
out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, 0 );
248-
policy->nonce++;
249-
req_made = 1;
272+
/* Even if we have a request ready, we need to advance the iterator.
273+
Otherwise on the next call of policy_next, we'll try to re-request the
274+
same shred and it will get deduped. */
275+
276+
policy->iterf = fd_forest_iter_next( policy->iterf, forest );
277+
if( FD_UNLIKELY( fd_forest_iter_done( policy->iterf, forest ) ) ) {
278+
policy->iterf = fd_forest_iter_init( forest );
279+
break;
250280
}
251-
} else {
252-
out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, forest->iter.shred_idx );
253-
policy->nonce++;
254-
if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
255-
req_made = 1;
256281
}
257282

258283
if( FD_UNLIKELY( !req_made ) ) return NULL;
@@ -340,6 +365,12 @@ fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, lo
340365
}
341366
}
342367

368+
void
369+
fd_policy_reset( fd_policy_t * policy, fd_forest_t * forest ) {
370+
policy->iterf = fd_forest_iter_init( forest );
371+
policy->tsreset = ts_ms( fd_log_wallclock() );
372+
}
373+
343374
void
344375
fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot ) {
345376
policy->turbine_slot0 = slot;

src/discof/repair/fd_policy.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ struct fd_policy {
180180
long tsmax; /* maximum time for an iteration before resetting the DFS to root */
181181
long tsref; /* reference timestamp for resetting DFS */
182182

183+
fd_forest_iter_t iterf; /* forest iterator */
183184
ulong tsreset; /* ms timestamp of last reset of iterf */
184185

185186
ulong turbine_slot0;
@@ -286,4 +287,7 @@ fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, lo
286287
void
287288
fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot );
288289

290+
void
291+
fd_policy_reset( fd_policy_t * policy, fd_forest_t * forest );
292+
289293
#endif /* HEADER_fd_src_discof_repair_fd_policy_h */

src/discof/repair/fd_repair_tile.c

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ struct ctx {
310310

311311
/* Slot-level metrics */
312312
fd_repair_metrics_t * slot_metrics;
313+
313314
ulong turbine_slot0; // catchup considered complete after this slot
314315
};
315316
typedef struct ctx ctx_t;
@@ -619,8 +620,8 @@ after_sign( ctx_t * ctx,
619620
}
620621

621622
int is_regular_request = pending->msg.kind != FD_REPAIR_KIND_PONG && pending->msg.shred.nonce > 0;
622-
if( FD_LIKELY( is_regular_request && pending->msg.kind == FD_REPAIR_KIND_SHRED ) ) {
623-
fd_inflights_request_insert( ctx->inflight, pending->msg.shred.nonce, &pending->msg.shred.to, pending->msg.shred.slot, pending->msg.shred.shred_idx );
623+
if( FD_LIKELY( is_regular_request ) ) {
624+
fd_inflights_request_insert( ctx->inflight, pending->msg.shred.nonce, &pending->msg.shred.to );
624625
fd_policy_peer_request_update( ctx->policy, &pending->msg.shred.to );
625626
}
626627
send_packet( ctx, stem, 1, active->ip4, active->port, src_ip4, pending->buf, pending->buflen, fd_frag_meta_ts_comp( fd_tickcount() ) );
@@ -649,6 +650,12 @@ after_shred( ctx_t * ctx,
649650
int ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
650651
fd_forest_blk_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off );
651652
fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, shred->idx, shred->fec_set_idx, slot_complete, ref_tick, src );
653+
654+
/* Check if there are FECs to force complete. Algorithm: window
655+
through the idxs in interval [i, j). If j = next fec_set_idx
656+
then we know we can force complete the FEC set interval [i, j)
657+
(assuming it wasn't already completed based on `cmpl`). */
658+
652659
} else {
653660
fd_forest_code_shred_insert( ctx->forest, shred->slot, shred->idx );
654661
}
@@ -744,6 +751,7 @@ after_frag( ctx_t * ctx,
744751
uint in_kind = ctx->in_kind[ in_idx ];
745752
if( FD_UNLIKELY( in_kind==IN_KIND_GENESIS ) ) {
746753
fd_forest_init( ctx->forest, 0 );
754+
fd_policy_reset( ctx->policy, ctx->forest );
747755
return;
748756
}
749757

@@ -759,7 +767,10 @@ after_frag( ctx_t * ctx,
759767

760768
if( FD_UNLIKELY( in_kind==IN_KIND_TOWER ) ) {
761769
fd_tower_slot_done_t const * msg = (fd_tower_slot_done_t const *)fd_type_pun_const( ctx->buffer );
762-
if( FD_LIKELY( msg->new_root ) ) fd_forest_publish( ctx->forest, msg->root_slot );
770+
if( FD_LIKELY( msg->new_root ) ) {
771+
fd_forest_publish( ctx->forest, msg->root_slot );
772+
fd_policy_reset ( ctx->policy, ctx->forest );
773+
}
763774
return;
764775
}
765776

@@ -849,7 +860,19 @@ after_frag( ctx_t * ctx,
849860
}
850861
}
851862
}
852-
/* update metrics */
863+
864+
ulong max_repaired_slot = 0;
865+
fd_forest_conslist_t const * conslist = fd_forest_conslist_const( ctx->forest );
866+
fd_forest_cns_t const * conspool = fd_forest_conspool_const( ctx->forest );
867+
fd_forest_blk_t const * pool = fd_forest_pool_const( ctx->forest );
868+
for( fd_forest_conslist_iter_t iter = fd_forest_conslist_iter_fwd_init( conslist, conspool );
869+
!fd_forest_conslist_iter_done( iter, conslist, conspool );
870+
iter = fd_forest_conslist_iter_fwd_next( iter, conslist, conspool ) ) {
871+
fd_forest_cns_t const * ele = fd_forest_conslist_iter_ele_const( iter, conslist, conspool );
872+
fd_forest_blk_t const * ele_ = fd_forest_pool_ele_const( pool, ele->forest_pool_idx );
873+
if( ele_->slot > max_repaired_slot ) max_repaired_slot = ele_->slot;
874+
}
875+
ctx->metrics->repaired_slots = max_repaired_slot;
853876
return;
854877
}
855878

@@ -875,6 +898,7 @@ after_credit( ctx_t * ctx,
875898
int * opt_poll_in FD_PARAM_UNUSED,
876899
int * charge_busy ) {
877900
long now = fd_log_wallclock();
901+
878902
*charge_busy = 1;
879903

880904
/* Verify that there is at least one sign tile with available credits.
@@ -890,20 +914,6 @@ after_credit( ctx_t * ctx,
890914
return;
891915
}
892916

893-
/* TODO make sure not to re-request inflights for stale slots */
894-
895-
if( FD_UNLIKELY( fd_inflights_should_drain( ctx->inflight, now ) ) ) {
896-
ulong nonce; ulong slot; ulong shred_idx;
897-
fd_inflights_request_pop( ctx->inflight, &nonce, &slot, &shred_idx );
898-
fd_forest_blk_t * blk = fd_forest_query( ctx->forest, slot );
899-
if( FD_UNLIKELY( !fd_forest_blk_idxs_test( blk->idxs, shred_idx ) ) ) {
900-
fd_pubkey_t const * peer = fd_policy_peer_select( ctx->policy );
901-
fd_repair_msg_t * msg = fd_repair_shred( ctx->protocol, peer, (ulong)((ulong)now / 1e6L), (uint)nonce, slot, shred_idx );
902-
fd_repair_send_sign_request( ctx, sign_out, msg, NULL );
903-
return;
904-
}
905-
}
906-
907917
fd_repair_msg_t const * cout = fd_policy_next( ctx->policy, ctx->forest, ctx->protocol, now, ctx->metrics->current_slot );
908918
if( FD_UNLIKELY( !cout ) ) return;
909919

@@ -1115,8 +1125,6 @@ populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
11151125

11161126
static inline void
11171127
metrics_write( ctx_t * ctx ) {
1118-
ctx->metrics->repaired_slots = fd_forest_highest_repaired_slot( ctx->forest );
1119-
11201128
FD_MCNT_SET( REPAIR, CURRENT_SLOT, ctx->metrics->current_slot );
11211129
FD_MCNT_SET( REPAIR, REPAIRED_SLOTS, ctx->metrics->repaired_slots );
11221130
FD_MCNT_SET( REPAIR, REQUEST_PEERS, fd_peer_pool_used( ctx->policy->peers.pool ) );

0 commit comments

Comments
 (0)