Skip to content

Commit 5cd9523

Browse files
author
Dmitry Polyakovsky
committed
swapdb_event
Signed-off-by: Dmitry Polyakovsky <[email protected]>
1 parent cc684fc commit 5cd9523

File tree

4 files changed

+79
-5
lines changed

4 files changed

+79
-5
lines changed

examples/server_events.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use valkey_module_macros::{
1313
client_changed_event_handler, config_changed_event_handler, cron_event_handler,
1414
flush_event_handler, fork_child_event_handler, key_event_handler,
1515
master_link_change_event_handler, persistence_event_handler, repl_async_load_event_handler,
16-
replica_change_event_handler, shutdown_event_handler,
16+
replica_change_event_handler, shutdown_event_handler, swapdb_event_handler,
1717
};
1818

1919
static NUM_FLUSHES: AtomicI64 = AtomicI64::new(0);
@@ -27,6 +27,7 @@ static IS_MASTER_LINK_UP: AtomicBool = AtomicBool::new(false);
2727
static NUM_FORK_CHILD_EVENTS: AtomicI64 = AtomicI64::new(0);
2828
static NUM_REPLICA_CHANGE_EVENTS: AtomicI64 = AtomicI64::new(0);
2929
static NUM_REPL_ASYNC_LOAD_EVENTS: AtomicI64 = AtomicI64::new(0);
30+
static NUM_SWAP_DB_EVENTS: AtomicI64 = AtomicI64::new(0);
3031

3132
#[flush_event_handler]
3233
fn flushed_event_handler(_ctx: &Context, flush_event: FlushSubevent) {
@@ -179,6 +180,12 @@ fn repl_async_load_event_handler(ctx: &Context, repl_async_load_subevent: ReplAs
179180
NUM_REPL_ASYNC_LOAD_EVENTS.fetch_add(1, Ordering::SeqCst);
180181
}
181182

183+
#[swapdb_event_handler]
184+
fn swapdb_event_handler(ctx: &Context, _: u64) {
185+
NUM_SWAP_DB_EVENTS.fetch_add(1, Ordering::SeqCst);
186+
ctx.log_notice("Databases swapped");
187+
}
188+
182189
fn num_flushed(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
183190
Ok(ValkeyValue::Integer(NUM_FLUSHES.load(Ordering::SeqCst)))
184191
}
@@ -235,7 +242,13 @@ fn num_repl_async_load_events(_ctx: &Context, _args: Vec<ValkeyString>) -> Valke
235242
))
236243
}
237244

238-
fn init(ctx: &Context, args: &[ValkeyString]) -> Status {
245+
fn num_swapdb_events(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
246+
Ok(ValkeyValue::Integer(
247+
NUM_SWAP_DB_EVENTS.load(Ordering::SeqCst),
248+
))
249+
}
250+
251+
fn init(ctx: &Context, _args: &[ValkeyString]) -> Status {
239252
// https://valkey.io/topics/modules-api-ref/#ValkeyModule_SetModuleOptions
240253
// otherwise you get: Skipping diskless-load because there are modules that are not aware of async replication.
241254
// needed for repl_async_load_event_handler
@@ -262,6 +275,7 @@ valkey_module! {
262275
["is_master_link_up", is_master_link_up, "readonly", 0, 0, 0],
263276
["num_fork_child_events", num_fork_child_events, "readonly", 0, 0, 0],
264277
["num_replica_change_events", num_replica_change_events, "readonly", 0, 0, 0],
265-
["num_repl_async_load_events", num_repl_async_load_events, "readonly", 0, 0, 0]
278+
["num_repl_async_load_events", num_repl_async_load_events, "readonly", 0, 0, 0],
279+
["num_swapdb_events", num_swapdb_events, "readonly", 0, 0, 0],
266280
]
267281
}

src/context/server_events.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ pub static REPLICA_CHANGE_SERVER_EVENTS_LIST: [fn(&Context, ReplicaChangeSubeven
140140
#[distributed_slice()]
141141
pub static REPL_ASYNC_LOAD_SERVER_EVENTS_LIST: [fn(&Context, ReplAsyncLoadSubevent)] = [..];
142142

143+
#[distributed_slice()]
144+
pub static SWAPDB_SERVER_EVENTS_LIST: [fn(&Context, u64)] = [..];
145+
143146
extern "C" fn cron_callback(
144147
ctx: *mut raw::RedisModuleCtx,
145148
_eid: raw::RedisModuleEvent,
@@ -401,6 +404,18 @@ extern "C" fn repl_async_load_event_callback(
401404
});
402405
}
403406

407+
extern "C" fn swapdb_callback(
408+
ctx: *mut raw::RedisModuleCtx,
409+
_eid: raw::RedisModuleEvent,
410+
subevent: u64,
411+
_data: *mut ::std::os::raw::c_void,
412+
) {
413+
let ctx = Context::new(ctx);
414+
SWAPDB_SERVER_EVENTS_LIST.iter().for_each(|callback| {
415+
callback(&ctx, subevent);
416+
});
417+
}
418+
404419
fn register_single_server_event_type<T>(
405420
ctx: &Context,
406421
callbacks: &[fn(&Context, T)],
@@ -511,5 +526,11 @@ pub fn register_server_events(ctx: &Context) -> Result<(), ValkeyError> {
511526
raw::REDISMODULE_EVENT_REPL_ASYNC_LOAD,
512527
Some(repl_async_load_event_callback),
513528
)?;
529+
register_single_server_event_type(
530+
ctx,
531+
&SWAPDB_SERVER_EVENTS_LIST,
532+
raw::REDISMODULE_EVENT_SWAPDB,
533+
Some(swapdb_callback),
534+
)?;
514535
Ok(())
515536
}

tests/integration.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1943,8 +1943,25 @@ fn test_repl_asnc_load_event() -> Result<()> {
19431943

19441944
// check num_repl_async_load_events on the replica
19451945
let event_count1: i64 = redis::cmd("num_repl_async_load_events").query(&mut replica_con)?;
1946-
// started and completed fire so result is 2, aborted even tdoes not fire
1946+
// started and completed fire so result is 2, aborted event does not fire
19471947
assert_eq!(event_count1, 2);
19481948

19491949
Ok(())
19501950
}
1951+
1952+
#[test]
1953+
fn test_swapdb_event() -> Result<()> {
1954+
let port: u16 = 6528;
1955+
let _guards = vec![start_valkey_server_with_module("server_events", port)
1956+
.with_context(|| FAILED_TO_START_SERVER)?];
1957+
let mut con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
1958+
1959+
// run swapdb between db 0 and db 1
1960+
let _: () = redis::cmd("swapdb").arg(&["0", "1"]).query(&mut con)?;
1961+
1962+
// check swapdb event count
1963+
let event_count1: i64 = redis::cmd("num_swapdb_events").query(&mut con)?;
1964+
assert_eq!(event_count1, 1);
1965+
1966+
Ok(())
1967+
}

valkeymodule-rs-macros/src/lib.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,4 +614,26 @@ pub fn repl_async_load_event_handler(_attr: TokenStream, item: TokenStream) -> T
614614
#ast
615615
};
616616
gen.into()
617-
}
617+
}
618+
619+
/// Proc macro which is set on a function that need to be called whenever a swapdb event happens.
620+
/// The function must accept a [Context] and [u64].
621+
///
622+
/// Example:
623+
///
624+
/// ```rust,no_run,ignore
625+
/// #[swapdb_event_handler]
626+
/// fn swapdb_event_handler(ctx: &Context, _: u64) { ... }
627+
/// ```
628+
#[proc_macro_attribute]
629+
pub fn swapdb_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
630+
let ast: ItemFn = match syn::parse(item) {
631+
Ok(res) => res,
632+
Err(e) => return e.to_compile_error().into(),
633+
};
634+
let gen = quote! {
635+
#[linkme::distributed_slice(valkey_module::server_events::SWAPDB_SERVER_EVENTS_LIST)]
636+
#ast
637+
};
638+
gen.into()
639+
}

0 commit comments

Comments
 (0)