From b9c60103d8017d1cce760b259d329246266431e6 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 31 Oct 2025 16:21:45 -0400 Subject: [PATCH 1/8] cmake: add BEMAN_NET_WITH_URING for liburing dependency Signed-off-by: Casey Bodley --- CMakeLists.txt | 2 ++ src/beman/net/CMakeLists.txt | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 3e22a18..07a1efe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,6 +11,8 @@ set(TARGET_LIBRARY beman_${TARGET_NAME}) set(TARGET_ALIAS beman::${TARGET_NAME}) set(TARGETS_EXPORT_NAME ${CMAKE_PROJECT_NAME}) +option(BEMAN_NET_WITH_URING "Enable liburing io context" OFF) + include(FetchContent) FetchContent_Declare( execution diff --git a/src/beman/net/CMakeLists.txt b/src/beman/net/CMakeLists.txt index 7ebde7f..df5e388 100644 --- a/src/beman/net/CMakeLists.txt +++ b/src/beman/net/CMakeLists.txt @@ -66,6 +66,12 @@ target_include_directories( ) target_link_libraries(${TARGET_LIBRARY} PUBLIC beman::task) +if(BEMAN_NET_WITH_URING) + find_package(PkgConfig REQUIRED) + pkg_check_modules(uring REQUIRED IMPORTED_TARGET liburing) + target_link_libraries(${TARGET_LIBRARY} PUBLIC PkgConfig::uring) +endif() + if(FALSE) install( EXPORT ${TARGETS_EXPORT_NAME} From 643270c3a873c8f8b2dcb7aa78f1b514b98e38a6 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 31 Oct 2025 23:39:41 -0400 Subject: [PATCH 2/8] add uring_context uring_context manages a single instance of io_uring which has an associated submission queue and completion queue each async operation prepares a submission queue entry (sqe) but does not call io_uring_submit() to submit it (except for resume_at(), see comment). this allows sqes to be submitted in batches by run_one(), where a single system call in io_uring_submit_and_wait() can submit pending sqes and await the next completion queue entry (cqe) io_uring_sqe_set_data() associates each sqe with its io_base pointer. run_one() calls io_uring_cqe_get_data() to retreive that pointer and call its work() function to invoke complete()/cancel()/error() depending on the return code in cqe->res unlike poll_context, cancel() and resume_at() are treated as their own async operations initiated with io_uring_prep_cancel() and io_uring_prep_timeout() respectively Signed-off-by: Casey Bodley --- include/beman/net/detail/uring_context.hpp | 321 +++++++++++++++++++++ 1 file changed, 321 insertions(+) create mode 100644 include/beman/net/detail/uring_context.hpp diff --git a/include/beman/net/detail/uring_context.hpp b/include/beman/net/detail/uring_context.hpp new file mode 100644 index 0000000..e767561 --- /dev/null +++ b/include/beman/net/detail/uring_context.hpp @@ -0,0 +1,321 @@ +// include/beman/net/detail/uring_context.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_NET_DETAIL_URING_CONTEXT +#define INCLUDED_BEMAN_NET_DETAIL_URING_CONTEXT + +#include +#include + +#include +#include +#include +#include +#include + +namespace beman::net::detail { + +// io_context implementation based on liburing +struct uring_context final : context_base { + static constexpr unsigned QUEUE_DEPTH = 128; + ::io_uring ring; + container sockets; + task* tasks = nullptr; + ::std::size_t submitting = 0; // sqes not yet submitted + ::std::size_t outstanding = 0; // cqes expected + + uring_context() { + int flags = 0; + int r = ::io_uring_queue_init(QUEUE_DEPTH, &ring, flags); + if (r < 0) { + throw ::std::system_error(-r, ::std::system_category(), "io_uring_queue_init failed"); + } + } + ~uring_context() override { ::io_uring_queue_exit(&ring); } + + auto make_socket(int fd) -> socket_id override { return sockets.insert(fd); } + + auto make_socket(int d, int t, int p, ::std::error_code& error) -> socket_id override { + int fd(::socket(d, t, p)); + if (fd < 0) { + error = ::std::error_code(errno, ::std::system_category()); + return socket_id::invalid; + } + return make_socket(fd); + } + + auto release(socket_id id, ::std::error_code& error) -> void override { + const native_handle_type handle = sockets[id]; + sockets.erase(id); + if (::close(handle) < 0) { + error = ::std::error_code(errno, ::std::system_category()); + } + } + + auto native_handle(socket_id id) -> native_handle_type override { return sockets[id]; } + + auto set_option(socket_id id, int level, int name, const void* data, ::socklen_t size, ::std::error_code& error) + -> void override { + if (::setsockopt(native_handle(id), level, name, data, size) < 0) { + error = ::std::error_code(errno, ::std::system_category()); + } + } + + auto bind(socket_id id, const endpoint& ep, ::std::error_code& error) -> void override { + if (::bind(native_handle(id), ep.data(), ep.size()) < 0) { + error = ::std::error_code(errno, ::std::system_category()); + } + } + + auto listen(socket_id id, int no, ::std::error_code& error) -> void override { + if (::listen(native_handle(id), no) < 0) { + error = ::std::error_code(errno, ::std::system_category()); + } + } + + auto submit() -> void { + int r = ::io_uring_submit(&ring); + if (r < 0) { + throw ::std::system_error(-r, ::std::system_category(), "io_uring_submit failed"); + } + assert(submitting >= r); + submitting -= r; + outstanding += r; + } + + auto get_sqe(io_base* completion) -> ::io_uring_sqe* { + auto sqe = ::io_uring_get_sqe(&ring); + while (sqe == nullptr) { + // if the submission queue is full, flush and try again + submit(); + sqe = ::io_uring_get_sqe(&ring); + } + ::io_uring_sqe_set_data(sqe, completion); + ++submitting; + return sqe; + } + + auto wait() -> ::std::tuple { + ::io_uring_cqe* cqe = nullptr; + int r = ::io_uring_wait_cqe(&ring, &cqe); + if (r < 0) { + throw ::std::system_error(-r, ::std::system_category(), "io_uring_wait_cqe failed"); + } + + assert(outstanding > 0); + --outstanding; + + const int res = cqe->res; + const auto completion = ::io_uring_cqe_get_data(cqe); + ::io_uring_cqe_seen(&ring, cqe); + + return {res, static_cast(completion)}; + } + + auto run_one() -> ::std::size_t override { + if (auto count = process_task(); count) { + return count; + } + + if (submitting) { + // if we have anything to submit, batch the submit and wait in a + // single system call. this allows io_uring_wait_cqe() below to be + // served directly from memory + unsigned wait_nr = 1; + int r = ::io_uring_submit_and_wait(&ring, wait_nr); + if (r < 0) { + throw ::std::system_error(-r, ::std::system_category(), "io_uring_submit_and_wait failed"); + } + assert(submitting >= r); + submitting -= r; + outstanding += r; + } + + if (!outstanding) { + // nothing to submit and nothing to wait on, we're done + return 0; + } + + // read the next completion, waiting if necessary + auto [res, completion] = wait(); + + if (completion) { + // work() functions depend on res, so pass it in via 'extra' + completion->extra.reset(&res); + completion->work(*this, completion); + } + + return 1; + } + + auto cancel(io_base* cancel_op, io_base* op) -> void override { + auto sqe = get_sqe(nullptr); + int flags = 0; + ::io_uring_prep_cancel(sqe, op, flags); + + // use io_uring_prep_cancel() for asynchronous cancellation of op. + // cancel_op, aka sender_state::cancel_callback, lives inside of op's + // operation state. op's completion may race with this cancellation, + // causing that sender_state and its cancel_callback to be destroyed. + // so we can't pass cancel_op to io_uring_sqe_set_data() and attach a + // cancel_op->work() function to handle its completion in run_one(). + // instead, we just complete it here without waiting for the result + cancel_op->complete(); + } + + auto schedule(task* t) -> void override { + t->next = tasks; + tasks = t; + } + + auto process_task() -> ::std::size_t { + if (tasks) { + auto* t = tasks; + tasks = t->next; + t->complete(); + return 1u; + } + return 0u; + } + + auto accept(accept_operation* op) -> submit_result override { + op->work = [](context_base& ctx, io_base* io) { + auto res = *static_cast(io->extra.get()); + if (res == -ECANCELED) { + io->cancel(); + return submit_result::ready; + } else if (res < 0) { + io->error(::std::error_code(-res, ::std::system_category())); + return submit_result::error; + } + auto op = static_cast(io); + // set socket + ::std::get<2>(*op) = ctx.make_socket(res); + io->complete(); + return submit_result::ready; + }; + + auto sqe = get_sqe(op); + auto fd = native_handle(op->id); + auto addr = ::std::get<0>(*op).data(); + auto addrlen = &::std::get<1>(*op); + int flags = 0; + ::io_uring_prep_accept(sqe, fd, addr, addrlen, flags); + return submit_result::submit; + } + + auto connect(connect_operation* op) -> submit_result override { + op->work = [](context_base&, io_base* io) { + auto res = *static_cast(io->extra.get()); + if (res == -ECANCELED) { + io->cancel(); + return submit_result::ready; + } else if (res < 0) { + io->error(::std::error_code(-res, ::std::system_category())); + return submit_result::error; + } + io->complete(); + return submit_result::ready; + }; + + auto sqe = get_sqe(op); + auto fd = native_handle(op->id); + auto& addr = ::std::get<0>(*op); + ::io_uring_prep_connect(sqe, fd, addr.data(), addr.size()); + return submit_result::submit; + } + + auto receive(receive_operation* op) -> submit_result override { + op->work = [](context_base&, io_base* io) { + auto res = *static_cast(io->extra.get()); + if (res == -ECANCELED) { + io->cancel(); + return submit_result::ready; + } else if (res < 0) { + io->error(::std::error_code(-res, ::std::system_category())); + return submit_result::error; + } + auto op = static_cast(io); + // set bytes received + ::std::get<2>(*op) = res; + io->complete(); + return submit_result::ready; + }; + + auto sqe = get_sqe(op); + auto fd = native_handle(op->id); + auto msg = &::std::get<0>(*op); + auto flags = ::std::get<1>(*op); + ::io_uring_prep_recvmsg(sqe, fd, msg, flags); + return submit_result::submit; + } + + auto send(send_operation* op) -> submit_result override { + op->work = [](context_base&, io_base* io) { + auto res = *static_cast(io->extra.get()); + if (res == -ECANCELED) { + io->cancel(); + return submit_result::ready; + } else if (res < 0) { + io->error(::std::error_code(-res, ::std::system_category())); + return submit_result::error; + } + auto op = static_cast(io); + // set bytes sent + ::std::get<2>(*op) = res; + io->complete(); + return submit_result::ready; + }; + + auto sqe = get_sqe(op); + auto fd = native_handle(op->id); + auto msg = &::std::get<0>(*op); + auto flags = ::std::get<1>(*op); + ::io_uring_prep_sendmsg(sqe, fd, msg, flags); + return submit_result::submit; + } + + static auto make_timespec(auto dur) -> __kernel_timespec { + auto sec = ::std::chrono::duration_cast<::std::chrono::seconds>(dur); + dur -= sec; + auto nsec = ::std::chrono::duration_cast<::std::chrono::nanoseconds>(dur); + __kernel_timespec ts; + ts.tv_sec = sec.count(); + ts.tv_nsec = nsec.count(); + return ts; + } + + auto resume_at(resume_at_operation* op) -> submit_result override { + auto at = ::std::get<0>(*op); + op->work = [](context_base&, io_base* io) { + auto res = *static_cast(io->extra.get()); + auto op = static_cast(io); + if (res == -ECANCELED) { + io->cancel(); + return submit_result::ready; + } else if (res == -ETIME) { + io->complete(); + return submit_result::ready; + } + io->error(::std::error_code(-res, ::std::system_category())); + return submit_result::error; + }; + + auto sqe = get_sqe(op); + auto ts = make_timespec(at.time_since_epoch()); + unsigned count = 0; + unsigned flags = IORING_TIMEOUT_ABS | IORING_TIMEOUT_REALTIME; + ::io_uring_prep_timeout(sqe, &ts, count, flags); + + // unlike other operations whose submissions can be batched in run_one(), + // the timeout argument to io_uring_prep_timeout() is a pointer to memory + // on the stack. this memory must remain valid until submit, so we either + // have to call submit here or allocate heap memory to store it + submit(); + return submit_result::submit; + } +}; + +} // namespace beman::net::detail + +#endif From ec4c06802c35eb95ff3b4d858e53b19898f6296a Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 31 Oct 2025 23:41:03 -0400 Subject: [PATCH 3/8] io_context: BEMAN_NET_USE_URING to select uring_context for testing, this BEMAN_NET_USE_URING define is added to example targets when cmake option BEMAN_NET_WITH_URING is enabled Signed-off-by: Casey Bodley --- examples/CMakeLists.txt | 6 ++++++ include/beman/net/detail/io_context.hpp | 11 +++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 79c863b..ac726ae 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -20,6 +20,12 @@ set(xEXAMPLES taps) foreach(EXAMPLE ${EXAMPLES}) set(EXAMPLE_TARGET ${TARGET_PREFIX}.examples.${EXAMPLE}) add_executable(${EXAMPLE_TARGET}) + if(BEMAN_NET_WITH_URING) + target_compile_definitions( + ${EXAMPLE_TARGET} + PRIVATE BEMAN_NET_USE_URING + ) + endif() target_sources(${EXAMPLE_TARGET} PRIVATE ${EXAMPLE}.cpp) target_link_libraries(${EXAMPLE_TARGET} PRIVATE ${TARGET_LIBRARY}) target_link_libraries(${EXAMPLE_TARGET} PRIVATE beman::task) diff --git a/include/beman/net/detail/io_context.hpp b/include/beman/net/detail/io_context.hpp index e8609f3..876079d 100644 --- a/include/beman/net/detail/io_context.hpp +++ b/include/beman/net/detail/io_context.hpp @@ -10,14 +10,17 @@ #include #include #include +#ifdef BEMAN_NET_USE_URING +#include +#else #include +#endif #include #include #include #include #include -#include #include #include #include @@ -33,8 +36,12 @@ class io_context; class beman::net::io_context { private: +#ifdef BEMAN_NET_USE_URING + ::std::unique_ptr<::beman::net::detail::context_base> d_owned{new ::beman::net::detail::uring_context()}; +#else ::std::unique_ptr<::beman::net::detail::context_base> d_owned{new ::beman::net::detail::poll_context()}; - ::beman::net::detail::context_base& d_context{*this->d_owned}; +#endif + ::beman::net::detail::context_base& d_context{*this->d_owned}; public: using scheduler_type = ::beman::net::detail::io_context_scheduler; From 2ec9cc5d8eca677d36d516a8b24204d5ca7a0c5a Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 4 Nov 2025 14:56:44 -0500 Subject: [PATCH 4/8] demo::task supports async cancellation with poll_context: when stop_source.request_stop() calls poll_context::cancel(), the target operation's io_base::cancel() is called inline. this calls sender_awaiter::stop() which transitions from stop_state::stopping to stopped. because request_stop() returns back to callback_t in stop_state::stopped, complete_stopped() is called there with uring_context: uring_context::cancel() is asynchronous, so callback_t is still in stop_state::stopping when request_stop() returns. run_one() eventually sees the target operation complete with ECANCELED and calls io_base::cancel(). when that calls sender_awaiter::stop(), the state is still stop_state::stopping, so complete_stopped() never gets called this causes beman.net.examples.client to exit before demo::scope gets the stop signal from the 2 coroutines: > ERROR: scope destroyed with live jobs: 2 to address this, callback_t now handles this async case by transitioning back to stop_state::running after stop_source.request_stop() returns Signed-off-by: Casey Bodley --- examples/demo_task.hpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/examples/demo_task.hpp b/examples/demo_task.hpp index fd538e8..eb59d34 100644 --- a/examples/demo_task.hpp +++ b/examples/demo_task.hpp @@ -184,8 +184,12 @@ struct task { state->callback.reset(); state->handle->stop_state = task::stop_state::stopping; state->handle->stop_source.request_stop(); - if (state->handle->stop_state == task::stop_state::stopped) + if (state->handle->stop_state == task::stop_state::stopped) { this->object->handle->state->complete_stopped(); + } else { + // transition back to running so sender_awaiter::stop() can safely complete later + state->handle->stop_state = task::stop_state::running; + } } }; using stop_token = decltype(ex::get_stop_token(ex::get_env(::std::declval()))); From 017c56b3b9819774789037c750a17efa5cc373c8 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 9 Nov 2025 12:02:30 -0500 Subject: [PATCH 5/8] squash! add uring_context --- src/beman/net/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/beman/net/CMakeLists.txt b/src/beman/net/CMakeLists.txt index df5e388..439f08a 100644 --- a/src/beman/net/CMakeLists.txt +++ b/src/beman/net/CMakeLists.txt @@ -40,6 +40,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/sorted_list.hpp ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/stop_token.hpp ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/timer.hpp + ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/uring_context.hpp ) get_property( DETAIL_HEADER_FILES From 017838ff127af23a74482de38458e588d8aec9e3 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 4 Jan 2026 16:34:56 -0500 Subject: [PATCH 6/8] squash! add uring_context 'outstanding' tracks all pending completions, not just the submitted ones --- include/beman/net/detail/uring_context.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/beman/net/detail/uring_context.hpp b/include/beman/net/detail/uring_context.hpp index e767561..13b02be 100644 --- a/include/beman/net/detail/uring_context.hpp +++ b/include/beman/net/detail/uring_context.hpp @@ -80,7 +80,6 @@ struct uring_context final : context_base { } assert(submitting >= r); submitting -= r; - outstanding += r; } auto get_sqe(io_base* completion) -> ::io_uring_sqe* { @@ -92,6 +91,7 @@ struct uring_context final : context_base { } ::io_uring_sqe_set_data(sqe, completion); ++submitting; + ++outstanding; return sqe; } @@ -128,7 +128,6 @@ struct uring_context final : context_base { } assert(submitting >= r); submitting -= r; - outstanding += r; } if (!outstanding) { From 945dca7fed3ef1e17bf3abc93443a2abb5d5cdfc Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 4 Jan 2026 17:02:30 -0500 Subject: [PATCH 7/8] squash! add uring_context treat cancel() as a normal async operation whose completion is triggered by its corresponding cqe. remove the special case for this null completion in run_one() --- include/beman/net/detail/uring_context.hpp | 32 ++++++++++++---------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/include/beman/net/detail/uring_context.hpp b/include/beman/net/detail/uring_context.hpp index 13b02be..6bf581d 100644 --- a/include/beman/net/detail/uring_context.hpp +++ b/include/beman/net/detail/uring_context.hpp @@ -138,28 +138,30 @@ struct uring_context final : context_base { // read the next completion, waiting if necessary auto [res, completion] = wait(); - if (completion) { - // work() functions depend on res, so pass it in via 'extra' - completion->extra.reset(&res); - completion->work(*this, completion); - } + // work() functions depend on res, so pass it in via 'extra' + completion->extra.reset(&res); + completion->work(*this, completion); return 1; } auto cancel(io_base* cancel_op, io_base* op) -> void override { - auto sqe = get_sqe(nullptr); + cancel_op->work = [](context_base& ctx, io_base* io) { + auto res = *static_cast(io->extra.get()); + if (res == -ENOENT || res == -EALREADY) { // op already completed + io->cancel(); + return submit_result::ready; + } else if (res < 0) { + io->error(::std::error_code(-res, ::std::system_category())); + return submit_result::error; + } + io->complete(); + return submit_result::ready; + }; + + auto sqe = get_sqe(cancel_op); int flags = 0; ::io_uring_prep_cancel(sqe, op, flags); - - // use io_uring_prep_cancel() for asynchronous cancellation of op. - // cancel_op, aka sender_state::cancel_callback, lives inside of op's - // operation state. op's completion may race with this cancellation, - // causing that sender_state and its cancel_callback to be destroyed. - // so we can't pass cancel_op to io_uring_sqe_set_data() and attach a - // cancel_op->work() function to handle its completion in run_one(). - // instead, we just complete it here without waiting for the result - cancel_op->complete(); } auto schedule(task* t) -> void override { From 904380228e918e2f8e996cb0709b888d43a19506 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 4 Jan 2026 18:06:36 -0500 Subject: [PATCH 8/8] sender: complete/error() preserve cancel_callback for cancellation if the cancel_callback() has incremented d_outstanding such that sender_state::complete() or error() cannot complete immediately, avoid destroying the cancel_callback with d_callback.reset(). where cancellation is asynchronous, that memory needs to remain intact until the cancellation completes Signed-off-by: Casey Bodley --- include/beman/net/detail/sender.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/beman/net/detail/sender.hpp b/include/beman/net/detail/sender.hpp index 8c5f880..648e052 100644 --- a/include/beman/net/detail/sender.hpp +++ b/include/beman/net/detail/sender.hpp @@ -113,14 +113,14 @@ struct beman::net::detail::sender_state : Desc::operation, ::beman::net::detail: } } auto complete() -> void override final { - d_callback.reset(); if (0 == --this->d_outstanding) { + d_callback.reset(); this->d_data.set_value(*this, ::std::move(this->d_receiver)); } } auto error(::std::error_code err) -> void override final { - d_callback.reset(); if (0 == --this->d_outstanding) { + d_callback.reset(); ::beman::net::detail::ex::set_error(::std::move(this->d_receiver), std::move(err)); } }