From 87b1fa4abc461703511d005416566b1cf3fd6f02 Mon Sep 17 00:00:00 2001 From: yongke liu Date: Fri, 9 May 2025 17:10:07 +0800 Subject: [PATCH 1/2] Fixed issue 107 and 123, receiver check connection when pop msg failed, and call reconnect function when the connection check result is false --- src/libipc/ipc.cpp | 5 ++++- src/libipc/queue.h | 11 ++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/libipc/ipc.cpp b/src/libipc/ipc.cpp index 7d919dd..9a5b025 100755 --- a/src/libipc/ipc.cpp +++ b/src/libipc/ipc.cpp @@ -627,7 +627,10 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) { for (;;) { // pop a new message typename queue_t::value_t msg {}; - if (!wait_for(inf->rd_waiter_, [que, &msg] { + if (!wait_for(inf->rd_waiter_, [que, &msg, &h] { + if (!que->connected(que->elems())) { + reconnect(&h, true); + } return !que->pop(msg); }, tm)) { // pop failed, just return. diff --git a/src/libipc/queue.h b/src/libipc/queue.h index 71dc170..8177d0c 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -67,6 +67,11 @@ class queue_conn { return connected_ != 0; } + template + bool connected(Elems* elems) noexcept { + return connected_ & elems->connections(); + } + circ::cc_t connected_id() const noexcept { return connected_; } @@ -77,16 +82,16 @@ class queue_conn { -> std::tuple().cursor())> { if (elems == nullptr) return {}; // if it's already connected, just return - if (connected()) return {connected(), false, 0}; + if (connected(elems)) return {connected(elems), false, 0}; connected_ = elems->connect_receiver(); - return {connected(), true, elems->cursor()}; + return {connected(elems), true, elems->cursor()}; } template bool disconnect(Elems* elems) noexcept { if (elems == nullptr) return false; // if it's already disconnected, just return false - if (!connected()) return false; + if (!connected(elems)) return false; elems->disconnect_receiver(std::exchange(connected_, 0)); return true; } From a1cdc9a711a1bdd0071283309a1d4fcdbdf337c7 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 10 May 2025 15:14:39 +0800 Subject: [PATCH 2/2] In non-broadcast mode, connection tags are only used for counting. --- src/libipc/circ/elem_def.h | 9 +++++++++ src/libipc/ipc.cpp | 2 +- src/libipc/queue.h | 12 ++++++------ 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/libipc/circ/elem_def.h b/src/libipc/circ/elem_def.h index 24e4940..1b6c6ec 100755 --- a/src/libipc/circ/elem_def.h +++ b/src/libipc/circ/elem_def.h @@ -74,6 +74,10 @@ class conn_head : public conn_head_base { return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id; } + bool connected(cc_t cc_id) const noexcept { + return (this->connections() & cc_id) != 0; + } + std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { cc_t cur = this->cc_.load(order); cc_t cnt; // accumulates the total bits set in cc @@ -100,6 +104,11 @@ class conn_head : public conn_head_base { } } + bool connected(cc_t cc_id) const noexcept { + // In non-broadcast mode, connection tags are only used for counting. + return (this->connections() != 0) && (cc_id != 0); + } + std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { return this->connections(order); } diff --git a/src/libipc/ipc.cpp b/src/libipc/ipc.cpp index 9a5b025..e71dca9 100755 --- a/src/libipc/ipc.cpp +++ b/src/libipc/ipc.cpp @@ -628,7 +628,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) { // pop a new message typename queue_t::value_t msg {}; if (!wait_for(inf->rd_waiter_, [que, &msg, &h] { - if (!que->connected(que->elems())) { + if (!que->connected()) { reconnect(&h, true); } return !que->pop(msg); diff --git a/src/libipc/queue.h b/src/libipc/queue.h index 8177d0c..86b4dfa 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -63,13 +63,9 @@ class queue_conn { shm::handle::clear_storage(name); } - bool connected() const noexcept { - return connected_ != 0; - } - template - bool connected(Elems* elems) noexcept { - return connected_ & elems->connections(); + bool connected(Elems* elems) const noexcept { + return elems->connected(connected_); } circ::cc_t connected_id() const noexcept { @@ -155,6 +151,10 @@ class queue_base : public queue_conn { elems_->disconnect_sender(); } + bool connected() const noexcept { + return base_t::connected(elems_); + } + bool connect() noexcept { auto tp = base_t::connect(elems_); if (std::get<0>(tp) && std::get<1>(tp)) {