diff --git a/src/libipc/circ/elem_def.h b/src/libipc/circ/elem_def.h index 24e4940..1b6c6ec 100755 --- a/src/libipc/circ/elem_def.h +++ b/src/libipc/circ/elem_def.h @@ -74,6 +74,10 @@ class conn_head : public conn_head_base { return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id; } + bool connected(cc_t cc_id) const noexcept { + return (this->connections() & cc_id) != 0; + } + std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { cc_t cur = this->cc_.load(order); cc_t cnt; // accumulates the total bits set in cc @@ -100,6 +104,11 @@ class conn_head : public conn_head_base { } } + bool connected(cc_t cc_id) const noexcept { + // In non-broadcast mode, connection tags are only used for counting. + return (this->connections() != 0) && (cc_id != 0); + } + std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { return this->connections(order); } diff --git a/src/libipc/ipc.cpp b/src/libipc/ipc.cpp index 7d919dd..e71dca9 100755 --- a/src/libipc/ipc.cpp +++ b/src/libipc/ipc.cpp @@ -627,7 +627,10 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) { for (;;) { // pop a new message typename queue_t::value_t msg {}; - if (!wait_for(inf->rd_waiter_, [que, &msg] { + if (!wait_for(inf->rd_waiter_, [que, &msg, &h] { + if (!que->connected()) { + reconnect(&h, true); + } return !que->pop(msg); }, tm)) { // pop failed, just return. diff --git a/src/libipc/queue.h b/src/libipc/queue.h index 71dc170..86b4dfa 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -63,8 +63,9 @@ class queue_conn { shm::handle::clear_storage(name); } - bool connected() const noexcept { - return connected_ != 0; + template + bool connected(Elems* elems) const noexcept { + return elems->connected(connected_); } circ::cc_t connected_id() const noexcept { @@ -77,16 +78,16 @@ class queue_conn { -> std::tuple().cursor())> { if (elems == nullptr) return {}; // if it's already connected, just return - if (connected()) return {connected(), false, 0}; + if (connected(elems)) return {connected(elems), false, 0}; connected_ = elems->connect_receiver(); - return {connected(), true, elems->cursor()}; + return {connected(elems), true, elems->cursor()}; } template bool disconnect(Elems* elems) noexcept { if (elems == nullptr) return false; // if it's already disconnected, just return false - if (!connected()) return false; + if (!connected(elems)) return false; elems->disconnect_receiver(std::exchange(connected_, 0)); return true; } @@ -150,6 +151,10 @@ class queue_base : public queue_conn { elems_->disconnect_sender(); } + bool connected() const noexcept { + return base_t::connected(elems_); + } + bool connect() noexcept { auto tp = base_t::connect(elems_); if (std::get<0>(tp) && std::get<1>(tp)) {