Skip to content

Commit bd4ea28

Browse files
authored
feat(pubsub): implement core PubSub framework infrastructure (#103)
Signed-off-by: Joe Brinkman <[email protected]> Signed-off-by: Joseph Brinkman <[email protected]>
1 parent 410fac4 commit bd4ea28

31 files changed

+5747
-36
lines changed

.github/workflows/codeql.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
submodules: true
3535

3636
- name: Initialize CodeQL
37-
uses: github/codeql-action/init@v3
37+
uses: github/codeql-action/init@v4
3838
with:
3939
languages: csharp
4040
build-mode: manual
@@ -50,6 +50,6 @@ jobs:
5050
run: dotnet build sources/Valkey.Glide/Valkey.Glide.csproj --configuration Lint --framework net8.0
5151

5252
- name: Perform CodeQL Analysis
53-
uses: github/codeql-action/analyze@v3
53+
uses: github/codeql-action/analyze@v4
5454
with:
5555
category: "/language:csharp"

.gitignore

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ x64/
3131
[Tt]est[Rr]esult*/
3232
[Bb]uild[Ll]og.*
3333

34-
# Code Coverage Reports
35-
[Rr]eports/
34+
3635

3736
*_i.c
3837
*_p.c
@@ -156,8 +155,9 @@ _NCrunch*
156155

157156
glide-logs/
158157

159-
# Test results and reports
160-
reports/
158+
# Code Coverage Reports
159+
[Rr]eports/
160+
161161
testresults/
162162

163163
# Temporary submodules (not for commit)

rust/src/ffi.rs

Lines changed: 170 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use std::{
77

88
use glide_core::{
99
client::{
10-
ConnectionRequest, ConnectionRetryStrategy, NodeAddress, ReadFrom as coreReadFrom, TlsMode,
10+
AuthenticationInfo as CoreAuthenticationInfo, ConnectionRequest, ConnectionRetryStrategy,
11+
NodeAddress, ReadFrom as coreReadFrom, TlsMode,
1112
},
1213
request_type::RequestType,
1314
};
@@ -71,16 +72,96 @@ pub struct ConnectionConfig {
7172
/// zero pointer is valid, means no client name is given (`None`)
7273
pub client_name: *const c_char,
7374
pub lazy_connect: bool,
75+
pub refresh_topology_from_initial_nodes: bool,
76+
pub has_pubsub_config: bool,
77+
pub pubsub_config: PubSubConfigInfo,
7478
/*
7579
TODO below
7680
pub periodic_checks: Option<PeriodicCheck>,
77-
pub pubsub_subscriptions: Option<redis::PubSubSubscriptionInfo>,
7881
pub inflight_requests_limit: Option<u32>,
7982
pub otel_endpoint: Option<String>,
8083
pub otel_flush_interval_ms: Option<u64>,
8184
*/
8285
}
8386

87+
#[repr(C)]
88+
#[derive(Debug, Clone, Copy)]
89+
pub struct PubSubConfigInfo {
90+
pub channels_ptr: *const *const c_char,
91+
pub channel_count: u32,
92+
pub patterns_ptr: *const *const c_char,
93+
pub pattern_count: u32,
94+
pub sharded_channels_ptr: *const *const c_char,
95+
pub sharded_channel_count: u32,
96+
}
97+
98+
/// Convert a C string array to a Vec of Vec<u8>
99+
///
100+
/// # Safety
101+
///
102+
/// * `ptr` must point to an array of `count` valid C string pointers
103+
/// * Each C string pointer must be valid and null-terminated
104+
unsafe fn convert_string_array(ptr: *const *const c_char, count: u32) -> Vec<Vec<u8>> {
105+
if ptr.is_null() || count == 0 {
106+
return Vec::new();
107+
}
108+
109+
let slice = unsafe { std::slice::from_raw_parts(ptr, count as usize) };
110+
slice
111+
.iter()
112+
.map(|&str_ptr| {
113+
let c_str = unsafe { CStr::from_ptr(str_ptr) };
114+
c_str.to_bytes().to_vec()
115+
})
116+
.collect()
117+
}
118+
119+
/// Convert PubSubConfigInfo to the format expected by glide-core
120+
///
121+
/// # Safety
122+
///
123+
/// * All pointers in `config` must be valid or null
124+
/// * String arrays must contain valid C strings
125+
unsafe fn convert_pubsub_config(
126+
config: &PubSubConfigInfo,
127+
) -> std::collections::HashMap<redis::PubSubSubscriptionKind, std::collections::HashSet<Vec<u8>>> {
128+
use redis::PubSubSubscriptionKind;
129+
use std::collections::{HashMap, HashSet};
130+
131+
let mut subscriptions = HashMap::new();
132+
133+
// Convert exact channels
134+
if config.channel_count > 0 {
135+
let channels = unsafe { convert_string_array(config.channels_ptr, config.channel_count) };
136+
subscriptions.insert(
137+
PubSubSubscriptionKind::Exact,
138+
channels.into_iter().collect::<HashSet<_>>(),
139+
);
140+
}
141+
142+
// Convert patterns
143+
if config.pattern_count > 0 {
144+
let patterns = unsafe { convert_string_array(config.patterns_ptr, config.pattern_count) };
145+
subscriptions.insert(
146+
PubSubSubscriptionKind::Pattern,
147+
patterns.into_iter().collect::<HashSet<_>>(),
148+
);
149+
}
150+
151+
// Convert sharded channels
152+
if config.sharded_channel_count > 0 {
153+
let sharded = unsafe {
154+
convert_string_array(config.sharded_channels_ptr, config.sharded_channel_count)
155+
};
156+
subscriptions.insert(
157+
PubSubSubscriptionKind::Sharded,
158+
sharded.into_iter().collect::<HashSet<_>>(),
159+
);
160+
}
161+
162+
subscriptions
163+
}
164+
84165
/// Convert connection configuration to a corresponding object.
85166
///
86167
/// # Safety
@@ -135,7 +216,7 @@ pub(crate) unsafe fn create_connection_request(
135216
None
136217
};
137218

138-
Some(glide_core::client::AuthenticationInfo {
219+
Some(CoreAuthenticationInfo {
139220
username: unsafe { ptr_to_opt_str(auth_info.username) },
140221
password: unsafe { ptr_to_opt_str(auth_info.password) },
141222
iam_config,
@@ -172,10 +253,19 @@ pub(crate) unsafe fn create_connection_request(
172253
None
173254
},
174255
lazy_connect: config.lazy_connect,
175-
refresh_topology_from_initial_nodes: false,
256+
refresh_topology_from_initial_nodes: config.refresh_topology_from_initial_nodes,
257+
pubsub_subscriptions: if config.has_pubsub_config {
258+
let subscriptions = unsafe { convert_pubsub_config(&config.pubsub_config) };
259+
if subscriptions.is_empty() {
260+
None
261+
} else {
262+
Some(subscriptions)
263+
}
264+
} else {
265+
None
266+
},
176267
// TODO below
177268
periodic_checks: None,
178-
pubsub_subscriptions: None,
179269
inflight_requests_limit: None,
180270
}
181271
}
@@ -636,3 +726,78 @@ pub(crate) unsafe fn get_pipeline_options(
636726
PipelineRetryStrategy::new(info.retry_server_error, info.retry_connection_error),
637727
)
638728
}
729+
730+
/// FFI-safe version of [`redis::PushKind`] for C# interop.
731+
/// This enum maps to the `PushKind` enum in `sources/Valkey.Glide/Internals/FFI.structs.cs`.
732+
///
733+
/// The `#[repr(u32)]` attribute ensures a stable memory layout compatible with C# marshaling.
734+
/// Each variant corresponds to a specific Redis/Valkey PubSub notification type.
735+
#[repr(u32)]
736+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
737+
pub enum PushKind {
738+
/// Disconnection notification sent from the library when connection is closed.
739+
Disconnection = 0,
740+
/// Other/unknown push notification type.
741+
Other = 1,
742+
/// Cache invalidation notification received when a key is changed/deleted.
743+
Invalidate = 2,
744+
/// Regular channel message received via SUBSCRIBE.
745+
Message = 3,
746+
/// Pattern-based message received via PSUBSCRIBE.
747+
PMessage = 4,
748+
/// Sharded channel message received via SSUBSCRIBE.
749+
SMessage = 5,
750+
/// Unsubscribe confirmation.
751+
Unsubscribe = 6,
752+
/// Pattern unsubscribe confirmation.
753+
PUnsubscribe = 7,
754+
/// Sharded unsubscribe confirmation.
755+
SUnsubscribe = 8,
756+
/// Subscribe confirmation.
757+
Subscribe = 9,
758+
/// Pattern subscribe confirmation.
759+
PSubscribe = 10,
760+
/// Sharded subscribe confirmation.
761+
SSubscribe = 11,
762+
}
763+
764+
impl From<&redis::PushKind> for PushKind {
765+
fn from(kind: &redis::PushKind) -> Self {
766+
match kind {
767+
redis::PushKind::Disconnection => PushKind::Disconnection,
768+
redis::PushKind::Other(_) => PushKind::Other,
769+
redis::PushKind::Invalidate => PushKind::Invalidate,
770+
redis::PushKind::Message => PushKind::Message,
771+
redis::PushKind::PMessage => PushKind::PMessage,
772+
redis::PushKind::SMessage => PushKind::SMessage,
773+
redis::PushKind::Unsubscribe => PushKind::Unsubscribe,
774+
redis::PushKind::PUnsubscribe => PushKind::PUnsubscribe,
775+
redis::PushKind::SUnsubscribe => PushKind::SUnsubscribe,
776+
redis::PushKind::Subscribe => PushKind::Subscribe,
777+
redis::PushKind::PSubscribe => PushKind::PSubscribe,
778+
redis::PushKind::SSubscribe => PushKind::SSubscribe,
779+
}
780+
}
781+
}
782+
783+
/// FFI callback function type for PubSub messages.
784+
/// This callback is invoked by Rust when a PubSub message is received.
785+
/// The callback signature matches the C# expectations for marshaling PubSub data.
786+
///
787+
/// # Parameters
788+
/// * `push_kind` - The type of push notification. See [`PushKind`] for valid values.
789+
/// * `message_ptr` - Pointer to the raw message bytes
790+
/// * `message_len` - Length of the message data in bytes (unsigned, cannot be negative)
791+
/// * `channel_ptr` - Pointer to the raw channel name bytes
792+
/// * `channel_len` - Length of the channel name in bytes (unsigned, cannot be negative)
793+
/// * `pattern_ptr` - Pointer to the raw pattern bytes (null if no pattern)
794+
/// * `pattern_len` - Length of the pattern in bytes (unsigned, 0 if no pattern)
795+
pub type PubSubCallback = unsafe extern "C" fn(
796+
push_kind: PushKind,
797+
message_ptr: *const u8,
798+
message_len: u64,
799+
channel_ptr: *const u8,
800+
channel_len: u64,
801+
pattern_ptr: *const u8,
802+
pattern_len: u64,
803+
);

0 commit comments

Comments
 (0)