DRAFT: Add example for async runtime #198

Open · wants to merge 3 commits into main

6 changes: 3 additions & 3 deletions .github/workflows/nginx.yaml
@@ -50,6 +50,7 @@ env:
NGX_TEST_FILES: examples/t
NGX_TEST_GLOBALS_DYNAMIC: >-
load_module ${{ github.workspace }}/nginx/objs/ngx_http_async_module.so;
load_module ${{ github.workspace }}/nginx/objs/ngx_http_tokio_module.so;
load_module ${{ github.workspace }}/nginx/objs/ngx_http_awssigv4_module.so;
load_module ${{ github.workspace }}/nginx/objs/ngx_http_curl_module.so;
load_module ${{ github.workspace }}/nginx/objs/ngx_http_shared_dict_module.so;
@@ -185,9 +186,8 @@ jobs:
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
nginx/objs/**/CACHEDIR.TAG
nginx/objs/**/ngx-debug
nginx/objs/**/ngx-release
# Windows nmake implementation doesn't support .PHONY. Don't cache
# anything make creates because it might not rebuild correctly
key: ${{ runner.os }}-nginx-${{ hashFiles('**/Cargo.lock') }}
restore-keys: ${{ runner.os }}-nginx-

12 changes: 10 additions & 2 deletions examples/Cargo.toml
@@ -25,6 +25,13 @@ idna_adapter = "=1.1.0"
libc = "0.2.140"
tokio = { version = "1.33.0", features = ["full"] }


[[example]]
name = "async"
path = "async.rs"
crate-type = ["cdylib"]
required-features = [ "async" ]

[[example]]
name = "curl"
path = "curl.rs"
@@ -47,8 +54,8 @@ path = "upstream.rs"
crate-type = ["cdylib"]

[[example]]
name = "async"
path = "async.rs"
name = "tokio"
path = "tokio.rs"
crate-type = ["cdylib"]

[[example]]
@@ -65,3 +72,4 @@ default = ["export-modules", "ngx/vendored"]
# See https://github.com/rust-lang/rust/issues/20267
export-modules = []
linux = []
async = [ "ngx/async" ]
245 changes: 125 additions & 120 deletions examples/async.rs
@@ -1,26 +1,24 @@
use std::ffi::{c_char, c_void};
use std::ptr::{addr_of, addr_of_mut};
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Instant;

use ngx::async_::{sleep, spawn, Task};
use ngx::core;
use ngx::ffi::{
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_handler_pt,
ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t,
ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t,
NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_LOG_EMERG,
ngx_array_push, ngx_buf_t, ngx_chain_t, ngx_command_t, ngx_conf_t, ngx_http_finalize_request,
ngx_http_handler_pt, ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE,
ngx_http_read_client_request_body, ngx_http_request_t, ngx_int_t, ngx_module_t, ngx_str_t,
ngx_uint_t, NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE,
NGX_HTTP_SPECIAL_RESPONSE, NGX_LOG_EMERG,
};
use ngx::http::{self, HttpModule, MergeConfigError};
use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule};
use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string};
use tokio::runtime::Runtime;

struct Module;

impl http::HttpModule for Module {
fn module() -> &'static ngx_module_t {
unsafe { &*::core::ptr::addr_of!(ngx_http_async_module) }
unsafe { &*std::ptr::addr_of!(ngx_http_async_module) }
}

unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t {
@@ -96,47 +94,33 @@ impl http::Merge for ModuleConfig {
}
}

unsafe extern "C" fn check_async_work_done(event: *mut ngx_event_t) {
let ctx = ngx::ngx_container_of!(event, RequestCTX, event);
let c: *mut ngx_connection_t = (*event).data.cast();

if (*ctx).done.load(Ordering::Relaxed) {
// Triggering async_access_handler again
ngx_post_event((*c).write, addr_of_mut!(ngx_posted_events));
} else {
// this doesn't have good performance but works as a simple thread-safe example and
// doesn't cause a segfault. The best method that provides both thread-safety and
// performance requires an nginx patch.
ngx_post_event(event, addr_of_mut!(ngx_posted_next_events));
}
}

struct RequestCTX {
done: Arc<AtomicBool>,
event: ngx_event_t,
task: Option<tokio::task::JoinHandle<()>>,
}
extern "C" fn ngx_http_async_commands_set_enable(
cf: *mut ngx_conf_t,
_cmd: *mut ngx_command_t,
conf: *mut c_void,
) -> *mut c_char {
unsafe {
let conf = &mut *(conf as *mut ModuleConfig);
let args: &[ngx_str_t] = (*(*cf).args).as_slice();
let val = match args[1].to_str() {
Ok(s) => s,
Err(_) => {
ngx_conf_log_error!(NGX_LOG_EMERG, cf, "`async` argument is not utf-8 encoded");
return ngx::core::NGX_CONF_ERROR;
}
};

impl Default for RequestCTX {
fn default() -> Self {
Self {
done: AtomicBool::new(false).into(),
event: unsafe { std::mem::zeroed() },
task: Default::default(),
}
}
}
// set default value optionally
conf.enable = false;

impl Drop for RequestCTX {
fn drop(&mut self) {
if let Some(handle) = self.task.take() {
handle.abort();
if val.eq_ignore_ascii_case("on") {
conf.enable = true;
} else if val.eq_ignore_ascii_case("off") {
conf.enable = false;
}
};

if self.event.posted() != 0 {
unsafe { ngx::ffi::ngx_delete_posted_event(&mut self.event) };
}
}
ngx::core::NGX_CONF_OK
}

http_request_handler!(async_access_handler, |request: &mut http::Request| {
@@ -148,96 +132,117 @@ http_request_handler!(async_access_handler, |request: &mut http::Request| {
return core::Status::NGX_DECLINED;
}

if let Some(ctx) =
unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) }
if request
.get_module_ctx::<Task<()>>(unsafe { &*std::ptr::addr_of!(ngx_http_async_module) })
.is_some()
{
if !ctx.done.load(Ordering::Relaxed) {
return core::Status::NGX_AGAIN;
}

return core::Status::NGX_OK;
return core::Status::NGX_DONE;
}

let ctx = request.pool().allocate(RequestCTX::default());
if ctx.is_null() {
return core::Status::NGX_ERROR;
let rc =
unsafe { ngx_http_read_client_request_body(request.into(), Some(content_event_handler)) };
if rc as u32 >= NGX_HTTP_SPECIAL_RESPONSE {
return core::Status(rc);
}
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });

let ctx = unsafe { &mut *ctx };
ctx.event.handler = Some(check_async_work_done);
ctx.event.data = request.connection().cast();
ctx.event.log = unsafe { (*request.connection()).log };
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };

// Request is no longer needed and can be converted to something movable to the async block
let req = AtomicPtr::new(request.into());
let done_flag = ctx.done.clone();
core::Status::NGX_DONE
});

let rt = ngx_http_async_runtime();
ctx.task = Some(rt.spawn(async move {
extern "C" fn content_event_handler(request: *mut ngx_http_request_t) {
let task = spawn(async move {
let start = Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
// not really thread safe; we should apply all these operations in the nginx thread,
// but this is just an example. The proper way would be storing these headers in the request ctx
// and applying them when we get back to the nginx thread.
sleep(std::time::Duration::from_secs(2)).await;

let req = unsafe { http::Request::from_ngx_http_request(request) };
req.add_header_out(
"X-Async-Time",
start.elapsed().as_millis().to_string().as_str(),
);
req.set_status(http::HTTPStatus::OK);
req.send_header();
let buf = req.pool().calloc(std::mem::size_of::<ngx_buf_t>()) as *mut ngx_buf_t;
unsafe {
(*buf).set_last_buf(if req.is_main() { 1 } else { 0 });
(*buf).set_last_in_chain(1);
}
req.output_filter(&mut ngx_chain_t {
buf,
next: std::ptr::null_mut(),
});

unsafe {
ngx::ffi::ngx_post_event(
(*(*request).connection).write,
std::ptr::addr_of_mut!(ngx::ffi::ngx_posted_events),
);
}
});

done_flag.store(true, Ordering::Release);
// there is a small issue here: if traffic is low we may get stuck behind a 300ms timer
// in the nginx event loop. To work around it we can notify the event loop using
// pthread_kill( nginx_thread, SIGIO ) to wake it up (or patch nginx
// and use the same trick as the thread pool).
}));
let req = unsafe { http::Request::from_ngx_http_request(request) };

core::Status::NGX_AGAIN
});
let ctx = req.pool().allocate::<Task<()>>(task);
if ctx.is_null() {
unsafe { ngx_http_finalize_request(request, core::Status::NGX_ERROR.into()) };
return;
}
req.set_module_ctx(ctx.cast(), unsafe {
&*std::ptr::addr_of!(ngx_http_async_module)
});
unsafe { (*request).write_event_handler = Some(write_event_handler) };
}

extern "C" fn ngx_http_async_commands_set_enable(
cf: *mut ngx_conf_t,
_cmd: *mut ngx_command_t,
conf: *mut c_void,
) -> *mut c_char {
unsafe {
let conf = &mut *(conf as *mut ModuleConfig);
let args: &[ngx_str_t] = (*(*cf).args).as_slice();
let val = match args[1].to_str() {
Ok(s) => s,
Err(_) => {
ngx_conf_log_error!(NGX_LOG_EMERG, cf, "`async` argument is not utf-8 encoded");
return ngx::core::NGX_CONF_ERROR;
}
};
extern "C" fn write_event_handler(request: *mut ngx_http_request_t) {
let req = unsafe { http::Request::from_ngx_http_request(request) };
if let Some(task) =
req.get_module_ctx::<Task<()>>(unsafe { &*std::ptr::addr_of!(ngx_http_async_module) })
{
if task.is_finished() {
unsafe { ngx_http_finalize_request(request, core::Status::NGX_OK.into()) };
return;
}
}

// set default value optionally
conf.enable = false;
let write_event =
unsafe { (*(*request).connection).write.as_ref() }.expect("write event is not null");
if write_event.timedout() != 0 {
unsafe {
ngx::ffi::ngx_connection_error(
(*request).connection,
ngx::ffi::NGX_ETIMEDOUT as i32,
c"client timed out".as_ptr() as *mut _,
)
};
return;
}

if val.eq_ignore_ascii_case("on") {
conf.enable = true;
} else if val.eq_ignore_ascii_case("off") {
conf.enable = false;
}
};
if unsafe { ngx::ffi::ngx_http_output_filter(request, std::ptr::null_mut()) }
== ngx::ffi::NGX_ERROR as isize
{
// Client error
return;
}
let clcf =
NgxHttpCoreModule::location_conf(unsafe { request.as_ref().expect("request not null") })
.expect("http core server conf");

ngx::core::NGX_CONF_OK
}
if unsafe {
ngx::ffi::ngx_handle_write_event(std::ptr::from_ref(write_event) as *mut _, clcf.send_lowat)
} != ngx::ffi::NGX_OK as isize
{
// Client error
return;
}

fn ngx_http_async_runtime() -> &'static Runtime {
// Should not be called from the master process
assert_ne!(
unsafe { ngx::ffi::ngx_process },
ngx::ffi::NGX_PROCESS_MASTER as _
);

static RUNTIME: OnceLock<Runtime> = OnceLock::new();
RUNTIME.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("tokio runtime init")
})
if write_event.delayed() == 0 {
if (write_event.active() != 0) && (write_event.ready() == 0) {
unsafe {
ngx::ffi::ngx_add_timer(
std::ptr::from_ref(write_event) as *mut _,
clcf.send_timeout,
)
}
} else if write_event.timer_set() != 0 {
unsafe { ngx::ffi::ngx_del_timer(std::ptr::from_ref(write_event) as *mut _) }
}
}
}
9 changes: 9 additions & 0 deletions examples/config
@@ -19,6 +19,15 @@ if [ $HTTP = YES ]; then
ngx_module_name=ngx_http_async_module
ngx_module_libs="-lm"
ngx_rust_target_name=async
ngx_rust_target_features=async

ngx_rust_module
fi

if :; then
ngx_module_name=ngx_http_tokio_module
ngx_module_libs="-lm"
ngx_rust_target_name=tokio

ngx_rust_module
fi
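
For reference, a minimal nginx configuration exercising the rebuilt async module might look like the sketch below. This is an illustration only: the module filename and the `async` directive are taken from this diff, while the listen port and location name are arbitrary; the directive exposed by the renamed tokio example is not shown because it does not appear in these changes.

    # sketch: load the example module built from examples/async.rs
    load_module modules/ngx_http_async_module.so;

    events {}

    http {
        server {
            listen 8080;

            location / {
                # handled by the access-phase handler in async.rs: waits ~2s on
                # the ngx::async_ runtime, then responds with an X-Async-Time header
                async on;
            }
        }
    }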