Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ fd_quic_trace_log_tile( fd_frag_meta_t const * in_mcache ) {
/* cons_cnt */ 0UL,
/* cons_out */ NULL,
/* cons_fseq */ NULL,
NULL,
NULL,
/* stem_burst */ 1UL,
/* stem_lazy */ 0L,
/* rng */ rng,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,8 @@ fd_quic_trace_rx_tile( fd_quic_trace_ctx_t * trace_ctx,
/* cons_cnt */ 0UL,
/* cons_out */ NULL,
/* cons_fseq */ NULL,
NULL,
NULL,
/* stem_burst */ 1UL,
/* stem_lazy */ 0L,
/* rng */ rng,
Expand Down
23 changes: 23 additions & 0 deletions src/disco/stem/fd_stem.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,17 @@
#endif
#define STEM_(n) FD_EXPAND_THEN_CONCAT3(STEM_NAME,_,n)

#ifndef STEM_DO_LINK_BURST
#define STEM_DO_LINK_BURST (0)
#endif

#ifndef STEM_BURST
#if STEM_DO_LINK_BURST
#define STEM_BURST (0UL)
#else
#error "STEM_BURST must be defined"
#endif
#endif

#ifndef STEM_CALLBACK_CONTEXT_TYPE
#error "STEM_CALLBACK_CONTEXT_TYPE must be defined"
Expand Down Expand Up @@ -241,11 +249,15 @@ STEM_(run1)( ulong in_cnt,
ulong cons_cnt,
ulong * _cons_out,
ulong ** _cons_fseq,
int * link_kind,
ulong * link_burst,
ulong burst,
long lazy,
fd_rng_t * rng,
void * scratch,
STEM_CALLBACK_CONTEXT_TYPE * ctx ) {
(void)link_kind;
(void)link_burst;
/* in frag stream state */
ulong in_seq; /* current position in input poll sequence, in [0,in_cnt) */
fd_stem_tile_in_t * in; /* in[in_seq] for in_seq in [0,in_cnt) has information about input fragment stream currently at
Expand Down Expand Up @@ -505,6 +517,8 @@ STEM_(run1)( ulong in_cnt,
.cr_avail = cr_avail,
.min_cr_avail = &min_cr_avail,
.cr_decrement_amount = fd_ulong_if( out_cnt>0UL, 1UL, 0UL ),
.link_kind = link_kind,
.burst = link_burst,
};
#endif

Expand Down Expand Up @@ -740,6 +754,12 @@ STEM_(run)( fd_topo_t * topo,
FD_TEST( out_mcache[ i ] );
}

ulong link_burst[ FD_TOPO_MAX_LINKS ];
for( ulong i=0UL; i<tile->out_cnt; i++ ) {
link_burst[ i ] = topo->links[ tile->out_link_id[ i ] ].burst;
}

int link_kind[ FD_TOPO_MAX_LINKS ];
ulong reliable_cons_cnt = 0UL;
ulong cons_out[ FD_TOPO_MAX_LINKS ];
ulong * cons_fseq[ FD_TOPO_MAX_LINKS ];
Expand All @@ -750,6 +770,7 @@ STEM_(run)( fd_topo_t * topo,
if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ k ] && consumer_tile->in_link_reliable[ j ] ) ) {
cons_out[ reliable_cons_cnt ] = k;
cons_fseq[ reliable_cons_cnt ] = consumer_tile->in_link_fseq[ j ];
link_kind[ k ] = RELIABLE_LINK;
FD_TEST( cons_fseq[ reliable_cons_cnt ] );
reliable_cons_cnt++;
/* Need to test this, since each link may connect to many outs,
Expand All @@ -774,6 +795,8 @@ STEM_(run)( fd_topo_t * topo,
reliable_cons_cnt,
cons_out,
cons_fseq,
link_kind,
link_burst,
STEM_BURST,
STEM_LAZY,
rng,
Expand Down
41 changes: 33 additions & 8 deletions src/disco/stem/fd_stem.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

#define FD_STEM_SCRATCH_ALIGN (128UL)

#define RELIABLE_LINK (0)
#define UNRELIABLE_LINK (1)

struct fd_stem_context {
fd_frag_meta_t ** mcaches;
ulong * seqs;
ulong * depths;
fd_frag_meta_t ** mcaches;
ulong * seqs;
ulong * depths;

ulong * cr_avail;
ulong * min_cr_avail;
ulong cr_decrement_amount;
ulong * cr_avail;
ulong * min_cr_avail;
ulong cr_decrement_amount;
int * link_kind;
ulong * burst;
};

typedef struct fd_stem_context fd_stem_context_t;
Expand All @@ -31,7 +36,19 @@ struct __attribute__((aligned(64))) fd_stem_tile_in {

typedef struct fd_stem_tile_in fd_stem_tile_in_t;

static inline void
static inline int
fd_stem_link_ready( fd_stem_context_t * stem,
ulong out_idx ) {
return stem->link_kind[ out_idx ]==RELIABLE_LINK ? stem->cr_avail[ out_idx ]>=stem->burst[ out_idx ] : 1;
}

static inline ulong
fd_stem_link_cr_avail( fd_stem_context_t * stem,
ulong out_idx ) {
return stem->link_kind[ out_idx ]==RELIABLE_LINK ? stem->cr_avail[ out_idx ] : ULONG_MAX;
}

static inline int
fd_stem_publish( fd_stem_context_t * stem,
ulong out_idx,
ulong sig,
Expand All @@ -40,12 +57,20 @@ fd_stem_publish( fd_stem_context_t * stem,
ulong ctl,
ulong tsorig,
ulong tspub ) {
if( FD_LIKELY( stem->link_kind[ out_idx ]==RELIABLE_LINK ) ) {
if( FD_UNLIKELY( stem->cr_avail[ out_idx ]<stem->cr_decrement_amount ) ) return -1;
}

ulong * seqp = &stem->seqs[ out_idx ];
ulong seq = *seqp;
fd_mcache_publish( stem->mcaches[ out_idx ], stem->depths[ out_idx ], seq, sig, chunk, sz, ctl, tsorig, tspub );
stem->cr_avail[ out_idx ] -= stem->cr_decrement_amount;
if( FD_LIKELY( stem->link_kind[ out_idx ]==RELIABLE_LINK ) ) {
FD_TEST( stem->cr_avail[ out_idx ]>=stem->cr_decrement_amount );
stem->cr_avail[ out_idx ] -= stem->cr_decrement_amount;
}
*stem->min_cr_avail = fd_ulong_min( stem->cr_avail[ out_idx ], *stem->min_cr_avail );
*seqp = fd_seq_inc( seq, 1UL );
return 0;
}

static inline ulong
Expand Down
Loading