Skip to content

Commit cc684fc

Browse files
author
Dmitry Polyakovsky
committed
repl_async_load_event
1 parent 107e701 commit cc684fc

File tree

5 files changed

+143
-9
lines changed

5 files changed

+143
-9
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ src/redisraw/bindings.rs
1313
.vscode
1414

1515
# Valkey database
16-
dump.rdb
16+
*.rdb
1717

1818
# Debugger-related files:
1919
.gdb_history

examples/server_events.rs

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,17 @@ use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
33
use valkey_module::alloc::ValkeyAlloc;
44
use valkey_module::server_events::{
55
ClientChangeSubevent, ForkChildSubevent, KeyChangeSubevent, MasterLinkChangeSubevent,
6-
PersistenceSubevent, ReplicaChangeSubevent,
6+
PersistenceSubevent, ReplAsyncLoadSubevent, ReplicaChangeSubevent,
77
};
88
use valkey_module::{
9-
server_events::FlushSubevent, valkey_module, Context, ValkeyResult, ValkeyString, ValkeyValue,
9+
server_events::FlushSubevent, valkey_module, Context, ModuleOptions, Status, ValkeyResult,
10+
ValkeyString, ValkeyValue,
1011
};
1112
use valkey_module_macros::{
1213
client_changed_event_handler, config_changed_event_handler, cron_event_handler,
1314
flush_event_handler, fork_child_event_handler, key_event_handler,
14-
master_link_change_event_handler, persistence_event_handler, replica_change_event_handler,
15-
shutdown_event_handler,
15+
master_link_change_event_handler, persistence_event_handler, repl_async_load_event_handler,
16+
replica_change_event_handler, shutdown_event_handler,
1617
};
1718

1819
static NUM_FLUSHES: AtomicI64 = AtomicI64::new(0);
@@ -25,6 +26,7 @@ static NUM_MASTER_LINK_CHANGE_EVENTS: AtomicI64 = AtomicI64::new(0);
2526
static IS_MASTER_LINK_UP: AtomicBool = AtomicBool::new(false);
2627
static NUM_FORK_CHILD_EVENTS: AtomicI64 = AtomicI64::new(0);
2728
static NUM_REPLICA_CHANGE_EVENTS: AtomicI64 = AtomicI64::new(0);
29+
static NUM_REPL_ASYNC_LOAD_EVENTS: AtomicI64 = AtomicI64::new(0);
2830

2931
#[flush_event_handler]
3032
fn flushed_event_handler(_ctx: &Context, flush_event: FlushSubevent) {
@@ -140,27 +142,41 @@ fn fork_child_event_handler(ctx: &Context, fork_child_subevent: ForkChildSubeven
140142
match fork_child_subevent {
141143
ForkChildSubevent::Born => {
142144
ctx.log_warning("Fork child born");
143-
NUM_FORK_CHILD_EVENTS.fetch_add(1, Ordering::SeqCst);
144145
}
145146
ForkChildSubevent::Died => {
146147
ctx.log_warning("Fork child died");
147-
NUM_FORK_CHILD_EVENTS.fetch_add(1, Ordering::SeqCst);
148148
}
149149
}
150+
NUM_FORK_CHILD_EVENTS.fetch_add(1, Ordering::SeqCst);
150151
}
151152

152153
#[replica_change_event_handler]
153154
fn replica_change_event_handler(ctx: &Context, replica_change_subevent: ReplicaChangeSubevent) {
154155
match replica_change_subevent {
155156
ReplicaChangeSubevent::Online => {
156157
ctx.log_notice("Replica online");
157-
NUM_REPLICA_CHANGE_EVENTS.fetch_add(1, Ordering::SeqCst);
158158
}
159159
ReplicaChangeSubevent::Offline => {
160160
ctx.log_notice("Replica offline");
161-
NUM_REPLICA_CHANGE_EVENTS.fetch_add(1, Ordering::SeqCst);
162161
}
163162
}
163+
NUM_REPLICA_CHANGE_EVENTS.fetch_add(1, Ordering::SeqCst);
164+
}
165+
166+
#[repl_async_load_event_handler]
167+
fn repl_async_load_event_handler(ctx: &Context, repl_async_load_subevent: ReplAsyncLoadSubevent) {
168+
match repl_async_load_subevent {
169+
ReplAsyncLoadSubevent::Started => {
170+
ctx.log_notice("Repl async load started");
171+
}
172+
ReplAsyncLoadSubevent::Aborted => {
173+
ctx.log_notice("Repl async load aborted");
174+
}
175+
ReplAsyncLoadSubevent::Completed => {
176+
ctx.log_notice("Repl async load completed");
177+
}
178+
}
179+
NUM_REPL_ASYNC_LOAD_EVENTS.fetch_add(1, Ordering::SeqCst);
164180
}
165181

166182
fn num_flushed(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
@@ -213,13 +229,28 @@ fn num_replica_change_events(_ctx: &Context, _args: Vec<ValkeyString>) -> Valkey
213229
))
214230
}
215231

232+
fn num_repl_async_load_events(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
233+
Ok(ValkeyValue::Integer(
234+
NUM_REPL_ASYNC_LOAD_EVENTS.load(Ordering::SeqCst),
235+
))
236+
}
237+
238+
fn init(ctx: &Context, args: &[ValkeyString]) -> Status {
239+
// https://valkey.io/topics/modules-api-ref/#ValkeyModule_SetModuleOptions
240+
// otherwise you get: Skipping diskless-load because there are modules that are not aware of async replication.
241+
// needed for repl_async_load_event_handler
242+
ctx.set_module_options(ModuleOptions::HANDLE_REPL_ASYNC_LOAD);
243+
Status::Ok
244+
}
245+
216246
//////////////////////////////////////////////////////
217247

218248
valkey_module! {
219249
name: "srv_events",
220250
version: 1,
221251
allocator: (ValkeyAlloc, ValkeyAlloc),
222252
data_types: [],
253+
init: init,
223254
commands: [
224255
["num_flushed", num_flushed, "readonly", 0, 0, 0],
225256
["num_max_memory_changes", num_maxmemory_changes, "readonly", 0, 0, 0],
@@ -231,5 +262,6 @@ valkey_module! {
231262
["is_master_link_up", is_master_link_up, "readonly", 0, 0, 0],
232263
["num_fork_child_events", num_fork_child_events, "readonly", 0, 0, 0],
233264
["num_replica_change_events", num_replica_change_events, "readonly", 0, 0, 0],
265+
["num_repl_async_load_events", num_repl_async_load_events, "readonly", 0, 0, 0]
234266
]
235267
}

src/context/server_events.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ pub enum ReplicaChangeSubevent {
7373
Offline,
7474
}
7575

76+
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
77+
pub enum ReplAsyncLoadSubevent {
78+
Started,
79+
Aborted,
80+
Completed,
81+
}
82+
7683
#[derive(Clone)]
7784
pub enum ServerEventHandler {
7885
RoleChanged(fn(&Context, ServerRole)),
@@ -85,6 +92,7 @@ pub enum ServerEventHandler {
8592
MasterLinkChangeSubevent(fn(&Context, MasterLinkChangeSubevent)),
8693
ForkChildSubevent(fn(&Context, ForkChildSubevent)),
8794
ReplicaChangeSubevent(fn(&Context, ReplicaChangeSubevent)),
95+
ReplAsyncLoadSubevent(fn(&Context, ReplAsyncLoadSubevent)),
8896
}
8997

9098
#[distributed_slice()]
@@ -129,6 +137,9 @@ pub static FORK_CHILD_SERVER_EVENTS_LIST: [fn(&Context, ForkChildSubevent)] = [.
129137
#[distributed_slice()]
130138
pub static REPLICA_CHANGE_SERVER_EVENTS_LIST: [fn(&Context, ReplicaChangeSubevent)] = [..];
131139

140+
#[distributed_slice()]
141+
pub static REPL_ASYNC_LOAD_SERVER_EVENTS_LIST: [fn(&Context, ReplAsyncLoadSubevent)] = [..];
142+
132143
extern "C" fn cron_callback(
133144
ctx: *mut raw::RedisModuleCtx,
134145
_eid: raw::RedisModuleEvent,
@@ -370,6 +381,26 @@ extern "C" fn replica_change_event_callback(
370381
});
371382
}
372383

384+
extern "C" fn repl_async_load_event_callback(
385+
ctx: *mut raw::RedisModuleCtx,
386+
_eid: raw::RedisModuleEvent,
387+
subevent: u64,
388+
_data: *mut ::std::os::raw::c_void,
389+
) {
390+
let repl_async_load_sub_event = match subevent {
391+
raw::REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED => ReplAsyncLoadSubevent::Started,
392+
raw::REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED => ReplAsyncLoadSubevent::Aborted,
393+
raw::REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED => ReplAsyncLoadSubevent::Completed,
394+
_ => return,
395+
};
396+
let ctx = Context::new(ctx);
397+
REPL_ASYNC_LOAD_SERVER_EVENTS_LIST
398+
.iter()
399+
.for_each(|callback| {
400+
callback(&ctx, repl_async_load_sub_event);
401+
});
402+
}
403+
373404
fn register_single_server_event_type<T>(
374405
ctx: &Context,
375406
callbacks: &[fn(&Context, T)],
@@ -474,5 +505,11 @@ pub fn register_server_events(ctx: &Context) -> Result<(), ValkeyError> {
474505
raw::REDISMODULE_EVENT_REPLICA_CHANGE,
475506
Some(replica_change_event_callback),
476507
)?;
508+
register_single_server_event_type(
509+
ctx,
510+
&REPL_ASYNC_LOAD_SERVER_EVENTS_LIST,
511+
raw::REDISMODULE_EVENT_REPL_ASYNC_LOAD,
512+
Some(repl_async_load_event_callback),
513+
)?;
477514
Ok(())
478515
}

tests/integration.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1904,3 +1904,47 @@ fn test_replica_change_event() -> Result<()> {
19041904

19051905
Ok(())
19061906
}
1907+
1908+
#[test]
1909+
fn test_repl_asnc_load_event() -> Result<()> {
1910+
let primary_port: u16 = 6526;
1911+
let _guards = vec![
1912+
start_valkey_server_with_module("server_events", primary_port)
1913+
.with_context(|| FAILED_TO_START_SERVER)?,
1914+
];
1915+
let mut primary_con =
1916+
get_valkey_connection(primary_port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
1917+
// repl-diskless-load swapdb
1918+
let _: () = redis::cmd("config")
1919+
.arg(&["set", "repl-diskless-load", "swapdb"])
1920+
.exec(&mut primary_con)
1921+
.with_context(|| "failed to run config set repl-diskless-load")?;
1922+
1923+
let replica_port: u16 = 6527;
1924+
let _guards = vec![
1925+
start_valkey_server_with_module("server_events", replica_port)
1926+
.with_context(|| FAILED_TO_START_SERVER)?,
1927+
];
1928+
let mut replica_con =
1929+
get_valkey_connection(replica_port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
1930+
// repl-diskless-load swapdb
1931+
let _: () = redis::cmd("config")
1932+
.arg(&["set", "repl-diskless-load", "swapdb"])
1933+
.exec(&mut replica_con)
1934+
.with_context(|| "failed to run config set repl-diskless-load")?;
1935+
1936+
// setup replication
1937+
let _: () = redis::cmd("replicaof")
1938+
.arg("127.0.0.1")
1939+
.arg(primary_port)
1940+
.query(&mut replica_con)?;
1941+
// need to wait for replication to establish and event to fire
1942+
thread::sleep(Duration::from_millis(5000));
1943+
1944+
// check num_repl_async_load_events on the replica
1945+
let event_count1: i64 = redis::cmd("num_repl_async_load_events").query(&mut replica_con)?;
1946+
// started and completed fire so result is 2, aborted even tdoes not fire
1947+
assert_eq!(event_count1, 2);
1948+
1949+
Ok(())
1950+
}

valkeymodule-rs-macros/src/lib.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,4 +593,25 @@ pub fn replica_change_event_handler(_attr: TokenStream, item: TokenStream) -> To
593593
#ast
594594
};
595595
gen.into()
596+
}
597+
598+
/// Proc macro which is set on a function that need to be called whenever a
599+
/// repl-diskless-load config is set to swapdb and a replication with a primary of same data set history occurs.
600+
/// The function must accept a [Context] and [ReplAsyncLoadSubevent].
601+
/// Example:
602+
/// ```rust,no_run,ignore
603+
/// #[repl_async_load_event_handler]
604+
/// fn repl_async_load_event_handler(ctx: &Context, values: ReplAsyncLoadSubevent) { ... }
605+
/// ```
606+
#[proc_macro_attribute]
607+
pub fn repl_async_load_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
608+
let ast: ItemFn = match syn::parse(item) {
609+
Ok(res) => res,
610+
Err(e) => return e.to_compile_error().into(),
611+
};
612+
let gen = quote! {
613+
#[linkme::distributed_slice(valkey_module::server_events::REPL_ASYNC_LOAD_SERVER_EVENTS_LIST)]
614+
#ast
615+
};
616+
gen.into()
596617
}

0 commit comments

Comments
 (0)