diff --git a/src/app/shared_dev/commands/quic_trace/fd_quic_trace_log_tile.c b/src/app/shared_dev/commands/quic_trace/fd_quic_trace_log_tile.c index 2ebdf453673..f4faad8515a 100644 --- a/src/app/shared_dev/commands/quic_trace/fd_quic_trace_log_tile.c +++ b/src/app/shared_dev/commands/quic_trace/fd_quic_trace_log_tile.c @@ -78,6 +78,8 @@ fd_quic_trace_log_tile( fd_frag_meta_t const * in_mcache ) { /* cons_cnt */ 0UL, /* cons_out */ NULL, /* cons_fseq */ NULL, + NULL, + NULL, /* stem_burst */ 1UL, /* stem_lazy */ 0L, /* rng */ rng, diff --git a/src/app/shared_dev/commands/quic_trace/fd_quic_trace_rx_tile.c b/src/app/shared_dev/commands/quic_trace/fd_quic_trace_rx_tile.c index a8463e4423e..776cce48547 100644 --- a/src/app/shared_dev/commands/quic_trace/fd_quic_trace_rx_tile.c +++ b/src/app/shared_dev/commands/quic_trace/fd_quic_trace_rx_tile.c @@ -547,6 +547,8 @@ fd_quic_trace_rx_tile( fd_quic_trace_ctx_t * trace_ctx, /* cons_cnt */ 0UL, /* cons_out */ NULL, /* cons_fseq */ NULL, + NULL, + NULL, /* stem_burst */ 1UL, /* stem_lazy */ 0L, /* rng */ rng, diff --git a/src/disco/stem/fd_stem.c b/src/disco/stem/fd_stem.c index 26ee58d6510..b637e33ff5d 100644 --- a/src/disco/stem/fd_stem.c +++ b/src/disco/stem/fd_stem.c @@ -178,9 +178,17 @@ #endif #define STEM_(n) FD_EXPAND_THEN_CONCAT3(STEM_NAME,_,n) +#ifndef STEM_DO_LINK_BURST +#define STEM_DO_LINK_BURST (0) +#endif + #ifndef STEM_BURST +#if STEM_DO_LINK_BURST +#define STEM_BURST (0UL) +#else #error "STEM_BURST must be defined" #endif +#endif #ifndef STEM_CALLBACK_CONTEXT_TYPE #error "STEM_CALLBACK_CONTEXT_TYPE must be defined" @@ -241,11 +249,15 @@ STEM_(run1)( ulong in_cnt, ulong cons_cnt, ulong * _cons_out, ulong ** _cons_fseq, + int * link_kind, + ulong * link_burst, ulong burst, long lazy, fd_rng_t * rng, void * scratch, STEM_CALLBACK_CONTEXT_TYPE * ctx ) { + (void)link_kind; + (void)link_burst; /* in frag stream state */ ulong in_seq; /* current position in input poll sequence, in [0,in_cnt) */ fd_stem_tile_in_t * in; /* in[in_seq] for in_seq in [0,in_cnt) has information about input fragment stream currently at @@ -505,6 +517,8 @@ STEM_(run1)( ulong in_cnt, .cr_avail = cr_avail, .min_cr_avail = &min_cr_avail, .cr_decrement_amount = fd_ulong_if( out_cnt>0UL, 1UL, 0UL ), + .link_kind = link_kind, + .burst = link_burst, }; #endif @@ -740,6 +754,12 @@ STEM_(run)( fd_topo_t * topo, FD_TEST( out_mcache[ i ] ); } + ulong link_burst[ FD_TOPO_MAX_LINKS ]; + for( ulong i=0UL; iout_cnt; i++ ) { + link_burst[ i ] = topo->links[ tile->out_link_id[ i ] ].burst; + } + + int link_kind[ FD_TOPO_MAX_LINKS ]; ulong reliable_cons_cnt = 0UL; ulong cons_out[ FD_TOPO_MAX_LINKS ]; ulong * cons_fseq[ FD_TOPO_MAX_LINKS ]; @@ -750,6 +770,7 @@ STEM_(run)( fd_topo_t * topo, if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ k ] && consumer_tile->in_link_reliable[ j ] ) ) { cons_out[ reliable_cons_cnt ] = k; cons_fseq[ reliable_cons_cnt ] = consumer_tile->in_link_fseq[ j ]; + link_kind[ k ] = RELIABLE_LINK; FD_TEST( cons_fseq[ reliable_cons_cnt ] ); reliable_cons_cnt++; /* Need to test this, since each link may connect to many outs, @@ -774,6 +795,8 @@ STEM_(run)( fd_topo_t * topo, reliable_cons_cnt, cons_out, cons_fseq, + link_kind, + link_burst, STEM_BURST, STEM_LAZY, rng, diff --git a/src/disco/stem/fd_stem.h b/src/disco/stem/fd_stem.h index d8e9777fab8..4c25b87cf8e 100644 --- a/src/disco/stem/fd_stem.h +++ b/src/disco/stem/fd_stem.h @@ -5,14 +5,19 @@ #define FD_STEM_SCRATCH_ALIGN (128UL) +#define RELIABLE_LINK (0) +#define UNRELIABLE_LINK (1) + struct fd_stem_context { - fd_frag_meta_t ** mcaches; - ulong * seqs; - ulong * depths; + fd_frag_meta_t ** mcaches; + ulong * seqs; + ulong * depths; - ulong * cr_avail; - ulong * min_cr_avail; - ulong cr_decrement_amount; + ulong * cr_avail; + ulong * min_cr_avail; + ulong cr_decrement_amount; + int * link_kind; + ulong * burst; }; typedef struct fd_stem_context fd_stem_context_t; @@ -31,7 +36,19 @@ struct __attribute__((aligned(64))) fd_stem_tile_in { typedef struct fd_stem_tile_in fd_stem_tile_in_t; -static inline void +static inline int +fd_stem_link_ready( fd_stem_context_t * stem, + ulong out_idx ) { + return stem->link_kind[ out_idx ]==RELIABLE_LINK ? stem->cr_avail[ out_idx ]>=stem->burst[ out_idx ] : 1; +} + +static inline ulong +fd_stem_link_cr_avail( fd_stem_context_t * stem, + ulong out_idx ) { + return stem->link_kind[ out_idx ]==RELIABLE_LINK ? stem->cr_avail[ out_idx ] : ULONG_MAX; +} + +static inline int fd_stem_publish( fd_stem_context_t * stem, ulong out_idx, ulong sig, @@ -40,12 +57,20 @@ fd_stem_publish( fd_stem_context_t * stem, ulong ctl, ulong tsorig, ulong tspub ) { + if( FD_LIKELY( stem->link_kind[ out_idx ]==RELIABLE_LINK ) ) { + if( FD_UNLIKELY( stem->cr_avail[ out_idx ]cr_decrement_amount ) ) return -1; + } + ulong * seqp = &stem->seqs[ out_idx ]; ulong seq = *seqp; fd_mcache_publish( stem->mcaches[ out_idx ], stem->depths[ out_idx ], seq, sig, chunk, sz, ctl, tsorig, tspub ); - stem->cr_avail[ out_idx ] -= stem->cr_decrement_amount; + if( FD_LIKELY( stem->link_kind[ out_idx ]==RELIABLE_LINK ) ) { + FD_TEST( stem->cr_avail[ out_idx ]>=stem->cr_decrement_amount ); + stem->cr_avail[ out_idx ] -= stem->cr_decrement_amount; + } *stem->min_cr_avail = fd_ulong_min( stem->cr_avail[ out_idx ], *stem->min_cr_avail ); *seqp = fd_seq_inc( seq, 1UL ); + return 0; } static inline ulong