Skip to content

Cleanup stake_ci uses, factor out multi-epoch leader #5211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions src/app/rpcserver/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "sham_link.h"

#define SHAM_LINK_CONTEXT fd_rpc_ctx_t
#define SHAM_LINK_STATE fd_stake_ci_t
#define SHAM_LINK_STATE fd_multi_epoch_leaders_t
#define SHAM_LINK_NAME stake_sham_link
#include "sham_link.h"

Expand Down Expand Up @@ -61,11 +61,8 @@ init_args( int * argc, char *** argv, fd_rpcserver_args_t * args ) {
FD_LOG_NOTICE(( "blockstore has slot root=%lu", args->blockstore->shmem->wmk ));
fd_wksp_mprotect( wksp, 1 );

fd_pubkey_t identity_key[1]; /* Just the public key */
memset( identity_key, 0xa5, sizeof(fd_pubkey_t) );
args->stake_ci = fd_stake_ci_join( fd_stake_ci_new( aligned_alloc( fd_stake_ci_align(), fd_stake_ci_footprint() ), identity_key ) );

args->port = (ushort)fd_env_strip_cmdline_ulong( argc, argv, "--port", NULL, 8899 );
args->leaders = fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( aligned_alloc( fd_multi_epoch_leaders_align(), fd_multi_epoch_leaders_footprint() ) ) );
args->port = (ushort)fd_env_strip_cmdline_ulong( argc, argv, "--port", NULL, 8899 );

args->params.max_connection_cnt = fd_env_strip_cmdline_ulong( argc, argv, "--max-connection-cnt", NULL, 30 );
args->params.max_ws_connection_cnt = fd_env_strip_cmdline_ulong( argc, argv, "--max-ws-connection-cnt", NULL, 10 );
Expand Down Expand Up @@ -221,7 +218,7 @@ int main( int argc, char ** argv ) {
fd_replay_notif_msg_t msg;
replay_sham_link_poll( rep_notify, ctx, &msg );

stake_sham_link_poll( stake_notify, ctx, args.stake_ci );
stake_sham_link_poll( stake_notify, ctx, args.leaders );

fd_rpc_ws_poll( ctx );
}
Expand All @@ -241,11 +238,11 @@ replay_sham_link_after_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * msg) {
}

static void
stake_sham_link_during_frag(fd_rpc_ctx_t * ctx, fd_stake_ci_t * state, void const * msg, int sz) {
stake_sham_link_during_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state, void const * msg, int sz) {
fd_rpc_stake_during_frag( ctx, state, msg, sz );
}

static void
stake_sham_link_after_frag(fd_rpc_ctx_t * ctx, fd_stake_ci_t * state) {
stake_sham_link_after_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state) {
fd_rpc_stake_after_frag( ctx, state );
}
2 changes: 0 additions & 2 deletions src/disco/pack/fd_pack_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
#define IN_KIND_BANK (2UL)
#define IN_KIND_SIGN (3UL)

#define MAX_SLOTS_PER_EPOCH 432000UL

/* Pace microblocks, but only slightly. This helps keep performance
more stable. This limit is 2,000 microblocks/second/bank. At 31
transactions/microblock, that's 62k txn/sec/bank. */
Expand Down
8 changes: 1 addition & 7 deletions src/disco/shred/fd_shred_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@
have a max and use statically sized arrays than alloca. */
#define MAX_BANK_CNT 64UL

/* MAX_SHRED_DESTS indicates the maximum number of destinations (i.e. a
pubkey -> ip, port) that the shred tile can keep track of. */
#define MAX_SHRED_DESTS 40200UL

#define FD_SHRED_TILE_SCRATCH_ALIGN 128UL

#define IN_KIND_CONTACT (0UL)
Expand All @@ -102,8 +98,6 @@
#define NET_OUT_IDX 1
#define SIGN_OUT_IDX 2

#define MAX_SLOTS_PER_EPOCH 432000UL

#define DCACHE_ENTRIES_PER_FEC_SET (4UL)
FD_STATIC_ASSERT( sizeof(fd_shred34_t) < USHORT_MAX, shred_34 );
FD_STATIC_ASSERT( 34*DCACHE_ENTRIES_PER_FEC_SET >= FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX, shred_34 );
Expand Down Expand Up @@ -397,7 +391,7 @@ during_frag( fd_shred_ctx_t * ctx,
ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
fd_stake_ci_stake_msg_init( ctx->stake_ci, dcache_entry );
fd_stake_ci_stake_msg_init( ctx->stake_ci, fd_type_pun_const( dcache_entry ) );
return;
}

Expand Down
28 changes: 10 additions & 18 deletions src/disco/shred/fd_stake_ci.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,19 @@ void * fd_stake_ci_delete( void * mem ) { return mem; }


void
fd_stake_ci_stake_msg_init( fd_stake_ci_t * info,
uchar const * new_message ) {
ulong const * hdr = fd_type_pun_const( new_message );

ulong epoch = hdr[ 0 ];
ulong staked_cnt = hdr[ 1 ];
ulong start_slot = hdr[ 2 ];
ulong slot_cnt = hdr[ 3 ];
ulong excluded_stake = hdr[ 4 ];

if( FD_UNLIKELY( staked_cnt > MAX_SHRED_DESTS ) )
fd_stake_ci_stake_msg_init( fd_stake_ci_t * info,
fd_stake_weight_msg_t const * msg ) {
if( FD_UNLIKELY( msg->staked_cnt > MAX_SHRED_DESTS ) )
FD_LOG_ERR(( "The stakes -> Firedancer splice sent a malformed update with %lu stakes in it,"
" but the maximum allowed is %lu", staked_cnt, MAX_SHRED_DESTS ));
" but the maximum allowed is %lu", msg->staked_cnt, MAX_SHRED_DESTS ));

info->scratch->epoch = epoch;
info->scratch->start_slot = start_slot;
info->scratch->slot_cnt = slot_cnt;
info->scratch->staked_cnt = staked_cnt;
info->scratch->excluded_stake = excluded_stake;
info->scratch->epoch = msg->epoch;
info->scratch->start_slot = msg->start_slot;
info->scratch->slot_cnt = msg->slot_cnt;
info->scratch->staked_cnt = msg->staked_cnt;
info->scratch->excluded_stake = msg->excluded_stake;

fd_memcpy( info->stake_weight, hdr+5UL, sizeof(fd_stake_weight_t)*staked_cnt );
fd_memcpy( info->stake_weight, msg->weights, msg->staked_cnt*sizeof(fd_stake_weight_t) );
}

static inline void
Expand Down
22 changes: 10 additions & 12 deletions src/disco/shred/fd_stake_ci.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
#include "fd_shred_dest.h"
#include "../../flamenco/leaders/fd_leaders.h"

#define MAX_SHRED_DESTS 40200UL
#define MAX_SLOTS_PER_EPOCH 432000UL
#define MAX_SHRED_DESTS MAX_STAKED_LEADERS
/* staked+unstaked <= MAX_SHRED_DESTS implies
MAX_SHRED_DEST_FOOTPRINT>=fd_shred_dest_footprint( staked, unstaked )
This is asserted in the tests. The size of fd_shred_dest_t, varies
Expand Down Expand Up @@ -119,12 +118,11 @@ void * fd_stake_ci_delete( void * mem );
need to cancel an operation that begun but didn't finish. Calling
init multiple times without calling fini will not leak any resources.

new_message should be a pointer to the first byte of the dcache entry
containing the stakes update. new_message will be accessed
new_message[i] for i in [0, FD_STAKE_CI_STAKE_MSG_SZ). new_message
must contain at least one staked pubkey, and the pubkeys must be
sorted in the usual way (by stake descending, ties broken by pubkey
ascending).
msg should be a pointer to the first byte of the dcache entry
containing the stakes update. msg will be accessed msg->weights[i]
for i in [0, msg->staked_cnt). msg must contain at least one
staked pubkey, and the pubkeys must be sorted in the usual way (by
stake descending, ties broken by pubkey ascending).

fd_stake_ci_dest_add_init behaves slightly differently and returns a
pointer to the first element of an array of size MAX_SHRED_DESTS-1 to
Expand All @@ -148,10 +146,10 @@ void * fd_stake_ci_delete( void * mem );
contact info will be preserved. If a stake message doesn't have
contact info for an unstaked node, on the other hand, that node will
be deleted from the list. */
void fd_stake_ci_stake_msg_init( fd_stake_ci_t * info, uchar const * new_message );
void fd_stake_ci_stake_msg_fini( fd_stake_ci_t * info );
fd_shred_dest_weighted_t * fd_stake_ci_dest_add_init ( fd_stake_ci_t * info );
void fd_stake_ci_dest_add_fini ( fd_stake_ci_t * info, ulong cnt );
void fd_stake_ci_stake_msg_init( fd_stake_ci_t * info, fd_stake_weight_msg_t const * msg );
void fd_stake_ci_stake_msg_fini( fd_stake_ci_t * info );
fd_shred_dest_weighted_t * fd_stake_ci_dest_add_init ( fd_stake_ci_t * info );
void fd_stake_ci_dest_add_fini ( fd_stake_ci_t * info, ulong cnt );


/* fd_stake_ci_set_identity changes the identity of the locally running
Expand Down
30 changes: 10 additions & 20 deletions src/disco/shred/test_stake_ci.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "fd_stake_ci.h"

#define SLOTS_PER_EPOCH 1000 /* Just for testing */

fd_stake_ci_t _info[1];
Expand All @@ -8,20 +7,11 @@ uchar stake_msg[ FD_STAKE_CI_STAKE_MSG_SZ ];

fd_pubkey_t identity_key[1];

typedef struct {
ulong epoch;
ulong staked_cnt;
ulong start_slot;
ulong slot_cnt;
ulong excluded_stake;
fd_stake_weight_t weights[];
} stake_msg_hdr_t;

static uchar *
static fd_stake_weight_msg_t *
generate_stake_msg( uchar * _buf,
ulong epoch,
char const * stakers ) {
stake_msg_hdr_t *buf = (stake_msg_hdr_t *)_buf;
fd_stake_weight_msg_t *buf = fd_type_pun( _buf );

buf->epoch = epoch;
buf->start_slot = epoch * SLOTS_PER_EPOCH;
Expand All @@ -34,7 +24,7 @@ generate_stake_msg( uchar * _buf,
memset( buf->weights[i].key.uc, *stakers, sizeof(fd_pubkey_t) );
buf->weights[i].stake = 1000UL/(i+1UL);
}
return _buf;
return fd_type_pun( _buf );
}

static ulong
Expand Down Expand Up @@ -346,12 +336,12 @@ test_limits( void ) {
fd_stake_ci_t * info = fd_stake_ci_join( fd_stake_ci_new( _info, identity_key ) );

for( ulong stake_weight_cnt=40198UL; stake_weight_cnt<=40201UL; stake_weight_cnt++ ) {
stake_msg_hdr_t * buf = (stake_msg_hdr_t *)stake_msg;
buf->epoch = stake_weight_cnt;
buf->start_slot = stake_weight_cnt * SLOTS_PER_EPOCH;
buf->slot_cnt = SLOTS_PER_EPOCH;
buf->staked_cnt = 0UL;
buf->excluded_stake = 0UL;
fd_stake_weight_msg_t * buf = fd_type_pun( stake_msg );
buf->epoch = stake_weight_cnt;
buf->start_slot = stake_weight_cnt * SLOTS_PER_EPOCH;
buf->slot_cnt = SLOTS_PER_EPOCH;
buf->staked_cnt = 0UL;
buf->excluded_stake = 0UL;

for( ulong i=0UL; i<stake_weight_cnt; i++ ) {
ulong stake = 2000000000UL/(i+1UL);
Expand All @@ -364,7 +354,7 @@ test_limits( void ) {
buf->excluded_stake += stake;
}
}
fd_stake_ci_stake_msg_init( info, stake_msg );
fd_stake_ci_stake_msg_init( info, buf );
fd_stake_ci_stake_msg_fini( info );

for( ulong cluster_info_cnt=40198UL; cluster_info_cnt<=40201UL; cluster_info_cnt++ ) {
Expand Down
43 changes: 13 additions & 30 deletions src/discof/poh/fd_poh_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,11 @@
#include "../../disco/metrics/fd_metrics.h"
#include "../../util/pod/fd_pod_format.h"
#include "../../disco/shred/fd_shredder.h"
#include "../../disco/shred/fd_stake_ci.h"
#include "../../disco/keyguard/fd_keyload.h"
#include "../../disco/keyguard/fd_keyswitch.h"
#include "../../disco/metrics/generated/fd_metrics_poh.h"
#include "../../disco/plugin/fd_plugin.h"
#include "../../flamenco/leaders/fd_leaders.h"
#include "../../flamenco/leaders/fd_multi_epoch_leaders.h"

#include <string.h>

Expand Down Expand Up @@ -501,7 +500,7 @@ typedef struct {

fd_sha256_t * sha256;

fd_stake_ci_t * stake_ci;
fd_multi_epoch_leaders_t * mleaders;

/* The last sequence number of an outgoing fragment to the shred tile,
or ULONG max if no such fragment. See fd_keyswitch.h for details
Expand Down Expand Up @@ -548,6 +547,8 @@ typedef struct {

ulong parent_slot;
uchar parent_block_id[ 32 ];

uchar __attribute__((aligned(FD_MULTI_EPOCH_LEADERS_ALIGN))) mleaders_mem[ FD_MULTI_EPOCH_LEADERS_FOOTPRINT ];
} fd_poh_ctx_t;

/* The PoH recorder is implemented in Firedancer but for now needs to
Expand Down Expand Up @@ -1136,19 +1137,7 @@ next_leader_slot( fd_poh_ctx_t * ctx ) {
/* If we have published anything in a particular slot, then we
should never become leader for that slot again. */
ulong min_leader_slot = fd_ulong_max( ctx->slot, fd_ulong_if( ctx->highwater_leader_slot==ULONG_MAX, 0UL, ctx->highwater_leader_slot ) );

for(;;) {
fd_epoch_leaders_t * leaders = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, min_leader_slot ); /* Safe to call from Rust */
if( FD_UNLIKELY( !leaders ) ) break;

while( min_leader_slot<(leaders->slot0+leaders->slot_cnt) ) {
fd_pubkey_t const * leader = fd_epoch_leaders_get( leaders, min_leader_slot ); /* Safe to call from Rust */
if( FD_UNLIKELY( !memcmp( leader->key, ctx->identity_key.key, 32UL ) ) ) return min_leader_slot;
min_leader_slot++;
}
}

return ULONG_MAX;
return fd_multi_epoch_leaders_get_next_slot( ctx->mleaders, min_leader_slot, &ctx->identity_key );
}

extern int
Expand Down Expand Up @@ -1180,7 +1169,6 @@ maybe_change_identity( fd_poh_ctx_t * ctx,
}

memcpy( ctx->identity_key.uc, ctx->keyswitch->bytes+32UL, 32UL );
fd_stake_ci_set_identity( ctx->stake_ci, &ctx->identity_key );

/* When we switch key, we might have ticked part way through a slot
that we are now leader in. This violates the contract of the
Expand Down Expand Up @@ -1350,15 +1338,12 @@ fd_ext_poh_get_leader_after_n_slots( ulong n,
uchar out_pubkey[ static 32 ] ) {
fd_poh_ctx_t * ctx = fd_ext_poh_write_lock();
ulong slot = ctx->slot + n;
fd_epoch_leaders_t * leaders = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, slot ); /* Safe to call from Rust */
fd_pubkey_t const * leader = fd_multi_epoch_leaders_get_leader_for_slot( ctx->mleaders, slot );

int copied = 0;
if( FD_LIKELY( leaders ) ) {
fd_pubkey_t const * leader = fd_epoch_leaders_get( leaders, slot ); /* Safe to call from Rust */
if( FD_LIKELY( leader ) ) {
memcpy( out_pubkey, leader, 32UL );
copied = 1;
}
if( FD_LIKELY( leader ) ) {
memcpy( out_pubkey, leader, 32UL );
copied = 1;
}
fd_ext_poh_write_unlock();
return copied;
Expand All @@ -1374,7 +1359,6 @@ scratch_footprint( fd_topo_tile_t const * tile ) {
(void)tile;
ulong l = FD_LAYOUT_INIT;
l = FD_LAYOUT_APPEND( l, alignof( fd_poh_ctx_t ), sizeof( fd_poh_ctx_t ) );
l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
l = FD_LAYOUT_APPEND( l, FD_SHA256_ALIGN, FD_SHA256_FOOTPRINT );
return FD_LAYOUT_FINI( l, scratch_align() );
}
Expand Down Expand Up @@ -1821,7 +1805,7 @@ during_frag( fd_poh_ctx_t * ctx,
ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
fd_stake_ci_stake_msg_init( ctx->stake_ci, dcache_entry );
fd_multi_epoch_leaders_stake_msg_init( ctx->mleaders, fd_type_pun_const( dcache_entry ) );
return;
}

Expand Down Expand Up @@ -1957,7 +1941,7 @@ after_frag( fd_poh_ctx_t * ctx,
if( FD_UNLIKELY( ctx->skip_frag ) ) return;

if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
fd_stake_ci_stake_msg_fini( ctx->stake_ci );
fd_multi_epoch_leaders_stake_msg_fini( ctx->mleaders );
/* It might seem like we do not need to do state transitions in and
out of being the leader here, since leader schedule updates are
always one epoch in advance (whether we are leader or not would
Expand Down Expand Up @@ -2215,16 +2199,15 @@ unprivileged_init( fd_topo_t * topo,

FD_SCRATCH_ALLOC_INIT( l, scratch );
fd_poh_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_poh_ctx_t ), sizeof( fd_poh_ctx_t ) );
void * stake_ci = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
void * sha256 = FD_SCRATCH_ALLOC_APPEND( l, FD_SHA256_ALIGN, FD_SHA256_FOOTPRINT );

#define NONNULL( x ) (__extension__({ \
__typeof__((x)) __x = (x); \
if( FD_UNLIKELY( !__x ) ) FD_LOG_ERR(( #x " was unexpectedly NULL" )); \
__x; }))

ctx->stake_ci = NONNULL( fd_stake_ci_join( fd_stake_ci_new( stake_ci, &ctx->identity_key ) ) );
ctx->sha256 = NONNULL( fd_sha256_join( fd_sha256_new( sha256 ) ) );
ctx->mleaders = NONNULL( fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( ctx->mleaders_mem ) ) );
ctx->sha256 = NONNULL( fd_sha256_join( fd_sha256_new( sha256 ) ) );
ctx->current_leader_bank = NULL;
ctx->signal_leader_change = NULL;

Expand Down
Loading
Loading