From bd7eed3778d3ba43723fc8811156ae2966069d07 Mon Sep 17 00:00:00 2001 From: DerThorsten Date: Wed, 20 Mar 2024 09:43:45 +0100 Subject: [PATCH] async execute request --- include/xeus/xinterpreter.hpp | 4 +- include/xeus/xrequest_context.hpp | 13 +++++ src/xinterpreter.cpp | 2 +- src/xkernel_core.cpp | 90 +++++++++++++++++-------------- src/xkernel_core.hpp | 7 ++- src/xmock_interpreter.hpp | 4 +- src/xrequest_context.cpp | 18 +++++++ test/xmock_interpreter.cpp | 6 +-- test/xmock_interpreter.hpp | 2 +- 9 files changed, 96 insertions(+), 50 deletions(-) diff --git a/include/xeus/xinterpreter.hpp b/include/xeus/xinterpreter.hpp index 41847ce8..50da36d9 100644 --- a/include/xeus/xinterpreter.hpp +++ b/include/xeus/xinterpreter.hpp @@ -42,7 +42,7 @@ namespace xeus void configure(); - nl::json execute_request(xrequest_context context, + nl::json execute_request( xexecute_request_context context, const std::string& code, bool silent, bool store_history, @@ -101,7 +101,7 @@ namespace xeus virtual void configure_impl() = 0; - virtual nl::json execute_request_impl(xrequest_context request_context, + virtual nl::json execute_request_impl(xexecute_request_context request_context, int execution_counter, const std::string& code, bool silent, diff --git a/include/xeus/xrequest_context.hpp b/include/xeus/xrequest_context.hpp index 9de54c9a..bf4d6485 100644 --- a/include/xeus/xrequest_context.hpp +++ b/include/xeus/xrequest_context.hpp @@ -16,6 +16,8 @@ #include "xeus/xmessage.hpp" // for xmessage::guid_list #include "xeus/xserver.hpp" // for channel +#include // for std::function + namespace nl = nlohmann; namespace xeus @@ -38,6 +40,17 @@ namespace xeus channel m_origin; guid_list m_id; }; + + + class XEUS_API xexecute_request_context : public xrequest_context + { + public: + xexecute_request_context(nl::json header, channel origin, guid_list id, std::function on_send_reply); + + void send_reply(nl::json reply); + private: + std::function m_on_send_reply; + }; } #endif diff --git a/src/xinterpreter.cpp b/src/xinterpreter.cpp index 536dce12..d58057b9 100644 --- a/src/xinterpreter.cpp +++ b/src/xinterpreter.cpp @@ -29,7 +29,7 @@ namespace xeus configure_impl(); } - nl::json xinterpreter::execute_request(xrequest_context context, + nl::json xinterpreter::execute_request(xexecute_request_context context, const std::string& code, bool silent, bool store_history, diff --git a/src/xkernel_core.cpp b/src/xkernel_core.cpp index 98ac5a20..a617cbfe 100644 --- a/src/xkernel_core.cpp +++ b/src/xkernel_core.cpp @@ -42,20 +42,20 @@ namespace xeus , p_history_manager(history_manager) , p_debugger(debugger) { - // Request handlers - m_handler["execute_request"] = &xkernel_core::execute_request; - m_handler["complete_request"] = &xkernel_core::complete_request; - m_handler["inspect_request"] = &xkernel_core::inspect_request; - m_handler["history_request"] = &xkernel_core::history_request; - m_handler["is_complete_request"] = &xkernel_core::is_complete_request; - m_handler["comm_info_request"] = &xkernel_core::comm_info_request; - m_handler["comm_open"] = &xkernel_core::comm_open; - m_handler["comm_close"] = &xkernel_core::comm_close; - m_handler["comm_msg"] = &xkernel_core::comm_msg; - m_handler["kernel_info_request"] = &xkernel_core::kernel_info_request; - m_handler["shutdown_request"] = &xkernel_core::shutdown_request; - m_handler["interrupt_request"] = &xkernel_core::interrupt_request; - m_handler["debug_request"] = &xkernel_core::debug_request; + // Request handlers (all but execute_request are blocking) + m_handler["execute_request"] = handler_type{&xkernel_core::execute_request, /*blocking*/ false}; + m_handler["complete_request"] = handler_type{&xkernel_core::complete_request, true}; + m_handler["inspect_request"] = handler_type{&xkernel_core::inspect_request, true}; + m_handler["history_request"] = handler_type{&xkernel_core::history_request, true}; + m_handler["is_complete_request"] = handler_type{&xkernel_core::is_complete_request, true}; + m_handler["comm_info_request"] = handler_type{&xkernel_core::comm_info_request, true}; + m_handler["comm_open"] = handler_type{&xkernel_core::comm_open, true}; + m_handler["comm_close"] = handler_type{&xkernel_core::comm_close, true}; + m_handler["comm_msg"] = handler_type{&xkernel_core::comm_msg, true}; + m_handler["kernel_info_request"] = handler_type{&xkernel_core::kernel_info_request, true}; + m_handler["shutdown_request"] = handler_type{&xkernel_core::shutdown_request, true}; + m_handler["interrupt_request"] = handler_type{&xkernel_core::interrupt_request, true}; + m_handler["debug_request"] = handler_type{&xkernel_core::debug_request, true}; // Server bindings p_server->register_shell_listener(std::bind(&xkernel_core::dispatch_shell, this, _1)); @@ -195,7 +195,7 @@ namespace xeus std::string msg_type = header.value("msg_type", ""); handler_type handler = get_handler(msg_type); - if (handler == nullptr) + if (handler.fptr == nullptr) { std::cerr << "ERROR: received unknown message" << std::endl; std::cerr << "Message type: " << msg_type << std::endl; @@ -204,7 +204,7 @@ namespace xeus { try { - (this->*handler)(std::move(msg), c); + (this->*(handler.fptr))(std::move(msg), c); } catch (std::exception& e) { @@ -212,14 +212,17 @@ namespace xeus std::cerr << "Message type: " << msg_type << std::endl; } } - - publish_status(header, "idle", c); + // async handlers need to set the idle status themselves + if(handler.blocking) + { + publish_status(header, "idle", c); + } } auto xkernel_core::get_handler(const std::string& msg_type) -> handler_type { auto iter = m_handler.find(msg_type); - handler_type res = (iter == m_handler.end()) ? nullptr : iter->second; + handler_type res = (iter == m_handler.end()) ? handler_type{nullptr} : iter->second; return res; } @@ -237,29 +240,36 @@ namespace xeus bool stop_on_error = content.value("stop_on_error", false); nl::json metadata = get_metadata(); - xrequest_context request_context(request.header(), c, request.identities()); - nl::json reply = p_interpreter->execute_request(std::move(request_context), + xexecute_request_context request_context(request.header(), c, request.identities(), + [this, silent, store_history, code, stop_on_error](const xexecute_request_context & ctx , nl::json reply) + { + this->send_reply(ctx.id(), "execute_reply", ctx.header(), nl::json::object(), std::move(reply), ctx.origin()); + + int execution_count = reply.value("execution_count", 1); + std::string status = reply.value("status", "error"); + + + if (!silent && store_history) + { + this->p_history_manager->store_inputs(0, execution_count, code); + } + + if (!silent && status == "error" && stop_on_error) + { + constexpr long polling_interval = 50; + p_server->abort_queue(std::bind(&xkernel_core::abort_request, this, _1), polling_interval); + } + + + // idle + publish_status("idle", ctx.header(), ctx.origin()); + + } + ); + + p_interpreter->execute_request(std::move(request_context), code, silent, store_history, std::move(user_expression), allow_stdin); - int execution_count = reply.value("execution_count", 1); - std::string status = reply.value("status", "error"); - send_reply( - request.identities(), - "execute_reply", - request.header(), - std::move(metadata), - std::move(reply), - c); - - if (!silent && store_history) - { - p_history_manager->store_inputs(0, execution_count, code); - } - if (!silent && status == "error" && stop_on_error) - { - constexpr long polling_interval = 50; - p_server->abort_queue(std::bind(&xkernel_core::abort_request, this, _1), polling_interval); - } } catch (std::exception& e) { diff --git a/src/xkernel_core.hpp b/src/xkernel_core.hpp index a901e354..d7904fe5 100644 --- a/src/xkernel_core.hpp +++ b/src/xkernel_core.hpp @@ -72,7 +72,12 @@ namespace xeus private: - using handler_type = void (xkernel_core::*)(xmessage, channel); + using handler_fptr_type = void (xkernel_core::*)(xmessage, channel); + + struct handler_type{ + handler_fptr_type fptr = nullptr; + bool blocking = true; + }; void dispatch(xmessage msg, channel c); diff --git a/src/xmock_interpreter.hpp b/src/xmock_interpreter.hpp index 5ef465c3..6536205e 100644 --- a/src/xmock_interpreter.hpp +++ b/src/xmock_interpreter.hpp @@ -38,7 +38,7 @@ namespace xeus { } - nl::json execute_request_impl(xrequest_context /*request_context*/, + nl::json execute_request_impl(xexecute_request_context request_context, int /*execution_counter*/, const std::string& /*code*/, bool /*silent*/, @@ -46,7 +46,7 @@ namespace xeus nl::json /*user_expressions*/, bool /*allow_stdin*/) override { - return nl::json(); + request_context.send_reply(nl::json()); } nl::json complete_request_impl(const std::string& /*code*/, int /*cursor_pos*/) override diff --git a/src/xrequest_context.cpp b/src/xrequest_context.cpp index bb11c783..6fd38295 100644 --- a/src/xrequest_context.cpp +++ b/src/xrequest_context.cpp @@ -21,4 +21,22 @@ namespace xeus { return m_id; } + + + void xexecute_request_context::send_reply(nl::json reply) + { + m_on_send_reply(*this, std::move(reply)); + } + + + xexecute_request_context::xexecute_request_context(nl::json header, + channel origin, + guid_list id, + std::function on_send_reply) + : xrequest_context(std::move(header), + origin, + std::move(id)), + m_on_send_reply(std::move(on_send_reply)) + { + } } \ No newline at end of file diff --git a/test/xmock_interpreter.cpp b/test/xmock_interpreter.cpp index f498db49..d8a149aa 100644 --- a/test/xmock_interpreter.cpp +++ b/test/xmock_interpreter.cpp @@ -29,7 +29,7 @@ namespace xeus using function_type = std::function; } - nl::json xmock_interpreter::execute_request_impl(xrequest_context request_context, + nl::json xmock_interpreter::execute_request_impl(xexecute_request_context ctx, int execution_counter, const std::string& code, bool /* silent */, @@ -62,14 +62,14 @@ namespace xeus {"start", 0} }); - return xeus::create_successful_reply(payload); + ctx.send_reply(xeus::create_successful_reply(payload)); } nl::json pub_data; pub_data["text/plain"] = code; publish_execution_result(request_context, execution_counter, std::move(pub_data), nl::json::object()); - return xeus::create_successful_reply(); + ctx.send_reply( xeus::create_successful_reply()); } nl::json xmock_interpreter::complete_request_impl(const std::string& /* code */, diff --git a/test/xmock_interpreter.hpp b/test/xmock_interpreter.hpp index 2ec010c7..b0b1b99d 100644 --- a/test/xmock_interpreter.hpp +++ b/test/xmock_interpreter.hpp @@ -25,7 +25,7 @@ namespace xeus void configure_impl() override; - nl::json execute_request_impl(xrequest_context request_context, + nl::json execute_request_impl(xexecute_request_context request_context, int execution_counter, const std::string& code, bool silent,