Skip to content

Commit 7cce2c9

Browse files
committed
feature: ping from server to clients added
Signed-off-by: Leonid Kaganov <[email protected]>
1 parent 28dd653 commit 7cce2c9

File tree

8 files changed

+62
-8
lines changed

8 files changed

+62
-8
lines changed

000rename.sh

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
mv "Cargo.lock" "Cargo.lock"
2+
mv "Cargo.toml" "Cargo.toml"
3+
mv "DOCKER.sh" "DOCKER.sh"
4+
mv "Dockerfile" "Dockerfile"
5+
mv "GO.sh" "GO.sh"
6+
mv "LICENSE" "LICENSE"
7+
mv "README.md" "README.md"
8+
mv "TEST.sh" "TEST.sh"
9+
mv "TEST_WS.sh" "TEST_WS.sh"
10+
mv "TODO.txt" "TODO.txt"
11+
mv "client" "client"
12+
mv "commit.sh" "commit.sh"
13+
mv "lleo" "lleo"
14+
mv "policy.repo" "policy.repo"
15+
mv "scripts" "scripts"
16+
mv "src" "src"
17+
mv "target" "target"
18+
mv "tests" "tests"

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hulypulse"
3-
version = "0.2.0"
3+
version = "0.2.1"
44
edition = "2024"
55

66
[dependencies]

scripts/TEST.html

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,11 @@ <h1>WebSocket JSON Tester</h1>
110110

111111
ws.onopen = () => {
112112
pr("clear");
113-
pr("✅ WebSocket connected.");
113+
pr("✅ WebSocket connected");
114+
};
115+
116+
ws.onclose = () => {
117+
pr("❌ WebSocket closed");
114118
};
115119

116120
ws.onmessage = (e) => {

src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub struct Config {
5858
pub no_authorization: bool,
5959

6060
pub heartbeat_timeout: u64,
61+
pub ping_timeout: u64,
6162

6263
pub policy_file: Option<String>,
6364
}

src/config/default.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ backend = "redis"
1313
no_authorization = false
1414

1515
heartbeat_timeout = 90
16+
ping_timeout = 30
1617

1718
# optional settings
1819
# max_size = 100

src/handlers_ws.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,14 +317,15 @@ pub async fn handler(
317317
hub_state.write().await.renew_heartbeat(session_id);
318318

319319
match msg {
320-
// actix_ws::Message::Text(text) => {
321-
// session.text(format!("echo: {}", text)).await.ok();
322-
// }
323320
actix_ws::Message::Ping(bytes) => {
324321
session.pong(&bytes).await.ok();
325322
continue;
326323
}
327324

325+
actix_ws::Message::Pong(_) => {
326+
continue;
327+
}
328+
328329
actix_ws::Message::Text(text) if text == "ping" => {
329330
let _ = session.text("pong").await;
330331
continue;

src/hub_service.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,25 +67,30 @@ pub struct HubState {
6767
sessions: HashMap<SessionId, actix_ws::Session>,
6868
subs: HashMap<String, HashSet<SessionId>>,
6969
heartbeats: HashMap<SessionId, std::time::Instant>,
70+
serverping: HashMap<SessionId, std::time::Instant>,
7071
}
7172

7273
impl HubState {
7374
pub fn renew_heartbeat(&mut self, session_id: SessionId) {
7475
if self.sessions.contains_key(&session_id) {
7576
let now = std::time::Instant::now();
7677
self.heartbeats.insert(session_id, now);
78+
self.serverping.insert(session_id, now);
7779
}
7880
}
7981

8082
pub fn connect(&mut self, session_id: SessionId, session: actix_ws::Session) {
8183
self.sessions.insert(session_id, session);
8284
self.heartbeats
8385
.insert(session_id, std::time::Instant::now());
86+
self.serverping
87+
.insert(session_id, std::time::Instant::now());
8488
}
8589

8690
pub fn disconnect(&mut self, session_id: SessionId) {
8791
self.sessions.remove(&session_id);
8892
self.heartbeats.remove(&session_id);
93+
self.serverping.remove(&session_id);
8994
self.subs.retain(|_, ids| {
9095
ids.remove(&session_id);
9196
!ids.is_empty()
@@ -192,10 +197,11 @@ pub fn check_heartbeat(hub_state: Arc<RwLock<HubState>>) {
192197
ticker.tick().await;
193198

194199
let now = std::time::Instant::now();
195-
let timeout = std::time::Duration::from_secs(CONFIG.heartbeat_timeout);
196-
let timelimit = now - timeout;
200+
let timelimit = now - std::time::Duration::from_secs(CONFIG.heartbeat_timeout);
201+
let pinglimit = now - std::time::Duration::from_secs(CONFIG.ping_timeout);
197202

198203
let hub = hub_state.read().await;
204+
199205
let expired: Vec<actix_ws::Session> = hub
200206
.heartbeats
201207
.iter()
@@ -208,6 +214,18 @@ pub fn check_heartbeat(hub_state: Arc<RwLock<HubState>>) {
208214
})
209215
.collect();
210216

217+
let to_ping: Vec<SessionId> = hub
218+
.serverping
219+
.iter()
220+
.filter_map(|(&sid, &last_ping)| {
221+
if last_ping < pinglimit {
222+
Some(sid)
223+
} else {
224+
None
225+
}
226+
})
227+
.collect();
228+
211229
drop(hub);
212230

213231
if !expired.is_empty() {
@@ -216,6 +234,17 @@ pub fn check_heartbeat(hub_state: Arc<RwLock<HubState>>) {
216234
let _ = addr.clone().close(None).await;
217235
}
218236
}
237+
238+
if !to_ping.is_empty() {
239+
let mut hub = hub_state.write().await;
240+
for sid in &to_ping {
241+
if let Some(session) = hub.sessions.get_mut(sid) {
242+
let _ = session.ping(&[]).await;
243+
}
244+
hub.serverping.insert(*sid, now);
245+
}
246+
drop(hub);
247+
}
219248
}
220249
});
221250
}

0 commit comments

Comments
 (0)