diff --git a/source/bsd/kqueue_event_loop.c b/source/bsd/kqueue_event_loop.c
index 33a517e7b..937d5a527 100644
--- a/source/bsd/kqueue_event_loop.c
+++ b/source/bsd/kqueue_event_loop.c
@@ -919,6 +919,10 @@ static void aws_event_loop_thread(void *user_data) {
             handle_data->events_this_loop |= event_flags;
         }
 
+        /* enter the deferment boundary before handling IO events so that tasks scheduled while handling the IO
+         * do not get executed in this tick's scheduler run. */
+        aws_task_scheduler_enter_deferment_boundary(&impl->thread_data.scheduler);
+
         /* Invoke each handle's event callback (unless the handle has been unsubscribed) */
         for (int i = 0; i < num_io_handle_events; ++i) {
             struct handle_data *handle_data = io_handle_events[i];
@@ -947,6 +951,8 @@ static void aws_event_loop_thread(void *user_data) {
            will not be run. That's ok, we'll handle them next time around. */
         AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
         aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);
+        /* exit the deferment boundary so that all deferred tasks will be executed in the next scheduler run. */
+        aws_task_scheduler_exit_deferment_boundary(&impl->thread_data.scheduler);
 
         /* Set timeout for next kevent() call.
          * If clock fails, or scheduler has no tasks, use default timeout */
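Note: the scheduler-side implementation of aws_task_scheduler_enter_deferment_boundary() / aws_task_scheduler_exit_deferment_boundary() is not part of this diff. As a mental model only, the pair can be read as gating a second "deferred" queue: tasks scheduled inside the boundary are parked, and only become runnable after the exit call, i.e. on the next loop tick. The toy scheduler below sketches that idea; every name and the queue layout are hypothetical, not the real aws_task_scheduler.

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    struct toy_task {
        void (*fn)(void *arg);
        void *arg;
        struct toy_task *next;
    };

    struct toy_queue {
        struct toy_task *head;
        struct toy_task *tail;
    };

    /* "ready" runs in the current run_all() pass; "deferred" collects
     * everything scheduled while inside the boundary. */
    struct toy_scheduler {
        struct toy_queue ready;
        struct toy_queue deferred;
        bool in_deferment;
    };

    static void toy_queue_push(struct toy_queue *q, struct toy_task *t) {
        t->next = NULL;
        if (q->tail) {
            q->tail->next = t;
        } else {
            q->head = t;
        }
        q->tail = t;
    }

    static struct toy_task *toy_queue_pop(struct toy_queue *q) {
        struct toy_task *t = q->head;
        if (t) {
            q->head = t->next;
            if (!q->head) {
                q->tail = NULL;
            }
        }
        return t;
    }

    static void toy_enter_deferment_boundary(struct toy_scheduler *s) {
        s->in_deferment = true;
    }

    /* Inside the boundary, schedule-now lands on the deferred queue, so a
     * task scheduled from an IO callback cannot run in this tick's pass. */
    static void toy_schedule_now(struct toy_scheduler *s, struct toy_task *t) {
        toy_queue_push(s->in_deferment ? &s->deferred : &s->ready, t);
    }

    static void toy_run_all(struct toy_scheduler *s) {
        struct toy_task *t;
        while ((t = toy_queue_pop(&s->ready)) != NULL) {
            t->fn(t->arg);
        }
    }

    /* Exiting promotes deferred tasks; they run in the next tick's pass. */
    static void toy_exit_deferment_boundary(struct toy_scheduler *s) {
        struct toy_task *t;
        while ((t = toy_queue_pop(&s->deferred)) != NULL) {
            toy_queue_push(&s->ready, t);
        }
        s->in_deferment = false;
    }

    static void say(void *arg) {
        puts((const char *)arg);
    }

    int main(void) {
        struct toy_scheduler s = {0};
        struct toy_task a = {.fn = say, .arg = "ran in tick 1"};
        struct toy_task b = {.fn = say, .arg = "deferred to tick 2"};

        toy_schedule_now(&s, &a);      /* scheduled outside the boundary */

        /* tick 1: mirrors enter -> dispatch IO -> run_all -> exit */
        toy_enter_deferment_boundary(&s);
        toy_schedule_now(&s, &b);      /* e.g. scheduled by an on_event callback */
        toy_run_all(&s);               /* runs a, not b */
        toy_exit_deferment_boundary(&s);

        /* tick 2 */
        toy_run_all(&s);               /* now b runs */
        return 0;
    }

Under this model, a task scheduled from an IO callback waits exactly one tick before it runs, which is the behavior the comments in the hunks above describe.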
diff --git a/source/linux/epoll_event_loop.c b/source/linux/epoll_event_loop.c
index 5eacac915..463c001d7 100644
--- a/source/linux/epoll_event_loop.c
+++ b/source/linux/epoll_event_loop.c
@@ -100,6 +100,8 @@ struct epoll_event_data {
     aws_event_loop_on_event_fn *on_event;
     void *user_data;
     struct aws_task cleanup_task;
+    int event_type_mask;
+    struct aws_linked_list_node node;
     bool is_subscribed; /* false when handle is unsubscribed, but this struct hasn't been cleaned up yet */
 };
 
@@ -407,6 +409,7 @@ static int s_subscribe_to_io_events(
     epoll_event_data->handle = handle;
     epoll_event_data->on_event = on_event;
     epoll_event_data->is_subscribed = true;
+    aws_linked_list_node_reset(&epoll_event_data->node);
 
     /*everyone is always registered for edge-triggered, hang up, remote hang up, errors. */
     uint32_t event_mask = EPOLLET | EPOLLHUP | EPOLLRDHUP | EPOLLERR;
@@ -615,29 +618,49 @@ static void aws_event_loop_thread(void *args) {
 
         AWS_LOGF_TRACE(
             AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, event_count);
+
+        struct aws_linked_list deduped_events;
+        aws_linked_list_init(&deduped_events);
+
         for (int i = 0; i < event_count; ++i) {
             struct epoll_event_data *event_data = (struct epoll_event_data *)events[i].data.ptr;
 
-            int event_mask = 0;
+            /* only queue each handle once; this handles the case where the same fd has multiple events on it. */
+            if (event_data->node.next == NULL) {
+                event_data->event_type_mask = 0;
+                aws_linked_list_push_back(&deduped_events, &event_data->node);
+            }
+
             if (events[i].events & EPOLLIN) {
-                event_mask |= AWS_IO_EVENT_TYPE_READABLE;
+                event_data->event_type_mask |= AWS_IO_EVENT_TYPE_READABLE;
             }
 
             if (events[i].events & EPOLLOUT) {
-                event_mask |= AWS_IO_EVENT_TYPE_WRITABLE;
+                event_data->event_type_mask |= AWS_IO_EVENT_TYPE_WRITABLE;
             }
 
             if (events[i].events & EPOLLRDHUP) {
-                event_mask |= AWS_IO_EVENT_TYPE_REMOTE_HANG_UP;
+                event_data->event_type_mask |= AWS_IO_EVENT_TYPE_REMOTE_HANG_UP;
             }
 
             if (events[i].events & EPOLLHUP) {
-                event_mask |= AWS_IO_EVENT_TYPE_CLOSED;
+                event_data->event_type_mask |= AWS_IO_EVENT_TYPE_CLOSED;
             }
 
             if (events[i].events & EPOLLERR) {
-                event_mask |= AWS_IO_EVENT_TYPE_ERROR;
+                event_data->event_type_mask |= AWS_IO_EVENT_TYPE_ERROR;
             }
+        }
+
+        /* enter the deferment boundary before handling IO events so that tasks scheduled while handling the IO
+         * do not get executed in this tick's scheduler run. */
+        aws_task_scheduler_enter_deferment_boundary(&epoll_loop->scheduler);
+
+        /* this list is now unique per fd */
+        while (!aws_linked_list_empty(&deduped_events)) {
+            struct aws_linked_list_node *deduped_node = aws_linked_list_pop_front(&deduped_events);
+            struct epoll_event_data *event_data = AWS_CONTAINER_OF(deduped_node, struct epoll_event_data, node);
+            aws_linked_list_node_reset(&event_data->node);
 
             if (event_data->is_subscribed) {
                 AWS_LOGF_TRACE(
@@ -645,7 +668,9 @@ static void aws_event_loop_thread(void *args) {
                     "id=%p: activity on fd %d, invoking handler.",
                     (void *)event_loop,
                     event_data->handle->data.fd);
-                event_data->on_event(event_loop, event_data->handle, event_mask, event_data->user_data);
+                event_data->on_event(
+                    event_loop, event_data->handle, event_data->event_type_mask, event_data->user_data);
+                event_data->event_type_mask = 0;
             }
         }
 
@@ -658,6 +683,9 @@ static void aws_event_loop_thread(void *args) {
 
         AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
         aws_task_scheduler_run_all(&epoll_loop->scheduler, now_ns);
+        /* exit the deferment boundary so that all deferred tasks will be executed in the next scheduler run. */
+        aws_task_scheduler_exit_deferment_boundary(&epoll_loop->scheduler);
+
         /* set timeout for next epoll_wait() call.
          * if clock fails, or scheduler has no tasks, use default timeout */
         bool use_default_timeout = false;
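Note: the dedup bookkeeping leans on aws_linked_list_node_reset() zeroing the node, so node.next == NULL doubles as a "not currently queued" flag; that is why s_subscribe_to_io_events() resets the node up front and the drain loop resets it again after popping. Below is a minimal standalone illustration of the pattern; the handle and fake_event types are invented for the demo, and only the aws_linked_list / AWS_CONTAINER_OF calls are the real aws-c-common API.

    #include <aws/common/linked_list.h>
    #include <stdio.h>

    /* invented stand-ins for epoll_event / epoll_event_data */
    struct handle {
        int fd;
        int event_type_mask;
        struct aws_linked_list_node node; /* next == NULL means "not queued" */
    };

    struct fake_event {
        struct handle *data;
        int mask;
    };

    int main(void) {
        struct handle h = {.fd = 7, .event_type_mask = 0};
        aws_linked_list_node_reset(&h.node); /* mirrors s_subscribe_to_io_events() */

        /* the same handle shows up twice in one wake-up, like two entries for one fd */
        struct fake_event events[] = {{&h, 0x1 /* readable */}, {&h, 0x2 /* writable */}};

        struct aws_linked_list deduped;
        aws_linked_list_init(&deduped);

        for (size_t i = 0; i < sizeof(events) / sizeof(events[0]); ++i) {
            struct handle *hd = events[i].data;
            /* queue each handle at most once; a reset node marks "not queued" */
            if (hd->node.next == NULL) {
                hd->event_type_mask = 0;
                aws_linked_list_push_back(&deduped, &hd->node);
            }
            hd->event_type_mask |= events[i].mask;
        }

        /* drain: exactly one entry per handle, carrying the merged mask */
        while (!aws_linked_list_empty(&deduped)) {
            struct aws_linked_list_node *n = aws_linked_list_pop_front(&deduped);
            struct handle *hd = AWS_CONTAINER_OF(n, struct handle, node);
            aws_linked_list_node_reset(&hd->node); /* mark dequeued for the next batch */
            printf("fd %d merged mask 0x%x\n", hd->fd, hd->event_type_mask);
        }
        return 0;
    }

A single epoll_wait() batch normally reports an fd once, but with this bookkeeping the loop stays correct even if a handle surfaces in multiple entries: on_event fires once per fd with the merged mask.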
diff --git a/source/socket_channel_handler.c b/source/socket_channel_handler.c
index 666acbdd3..d01d162fc 100644
--- a/source/socket_channel_handler.c
+++ b/source/socket_channel_handler.c
@@ -120,7 +120,7 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code
  * or schedule a task to enforce fairness for other sockets in the event loop if we read up to the max
  * read per event loop tick. */
-static void s_do_read(struct socket_handler *socket_handler) {
+static void s_do_read(struct socket_handler *socket_handler, bool from_task) {
 
     size_t downstream_window = aws_channel_slot_downstream_read_window(socket_handler->slot);
     size_t max_to_read =
@@ -134,6 +134,16 @@
         (unsigned long long)max_to_read);
 
     if (max_to_read == 0) {
+        if (from_task) {
+            AWS_LOGF_TRACE(
+                AWS_LS_IO_SOCKET_HANDLER,
+                "id=%p: do_read was called from a task, but we've exceeded the available channel window. "
+                "Scheduling another read task to avoid missing edge-triggers.",
+                (void *)socket_handler->slot->handler);
+            aws_channel_task_init(
+                &socket_handler->read_task_storage, s_read_task, socket_handler, "socket_handler_re_read");
+            aws_channel_schedule_task_now(socket_handler->slot->channel, &socket_handler->read_task_storage);
+        }
         return;
     }
 
@@ -218,7 +228,7 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code
      * then immediately closes the socket. On some platforms, we'll never see the readable flag. So we want to make
      * sure we read the ALERT, otherwise, we'll end up telling the user that the channel shutdown because of a socket
      * closure, when in reality it was a TLS error */
-    s_do_read(socket_handler);
+    s_do_read(socket_handler, false);
 
     if (error_code && !socket_handler->shutdown_in_progress) {
         aws_channel_shutdown(socket_handler->slot->channel, error_code);
@@ -232,7 +242,7 @@ static void s_read_task(struct aws_channel_task *task, void *arg, aws_task_statu
 
     if (status == AWS_TASK_STATUS_RUN_READY) {
         struct socket_handler *socket_handler = arg;
-        s_do_read(socket_handler);
+        s_do_read(socket_handler, true);
     }
 }
 
@@ -358,7 +368,7 @@ static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_
 
 static void s_trigger_read(struct aws_channel_handler *handler) {
     struct socket_handler *socket_handler = (struct socket_handler *)handler->impl;
-    s_do_read(socket_handler);
+    s_do_read(socket_handler, false);
 }
 
 static struct aws_channel_handler_vtable s_vtable = {
diff --git a/source/windows/iocp/iocp_event_loop.c b/source/windows/iocp/iocp_event_loop.c
index 9ccce3007..04766b6f9 100644
--- a/source/windows/iocp/iocp_event_loop.c
+++ b/source/windows/iocp/iocp_event_loop.c
@@ -699,6 +699,10 @@ static void aws_event_loop_thread(void *user_data) {
 
         aws_event_loop_register_tick_start(event_loop);
 
+        /* enter the deferment boundary before handling IO events so that tasks scheduled while handling the IO
+         * do not get executed in this tick's scheduler run. */
+        aws_task_scheduler_enter_deferment_boundary(&impl->thread_data.scheduler);
+
        if (has_completion_entries) {
             AWS_LOGF_TRACE(
                 AWS_LS_IO_EVENT_LOOP,
@@ -746,6 +750,9 @@ static void aws_event_loop_thread(void *user_data) {
 
         AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
         aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);
+        /* exit the deferment boundary so that all deferred tasks will be executed in the next scheduler run. */
+        aws_task_scheduler_exit_deferment_boundary(&impl->thread_data.scheduler);
+
         /* Set timeout for next GetQueuedCompletionStatus() call.
          * If clock fails, or scheduler has no tasks, use default timeout */
         bool use_default_timeout = false;
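Note: the new from_task flag exists because these sockets are registered edge-triggered (EPOLLET above): "readable" fires once when data arrives, so a read task that bails out on a closed channel window must re-queue itself, or bytes already buffered in the kernel would never be read; no new edge will announce them. The toy simulation below shows the hazard and the re-queue fix; every name in it is invented for the demo.

    #include <stdbool.h>
    #include <stdio.h>

    struct toy_channel {
        int pending_bytes;     /* bytes buffered in the kernel for this socket */
        int window;            /* downstream read window */
        bool read_task_queued; /* stands in for aws_channel_schedule_task_now() */
    };

    static void toy_do_read(struct toy_channel *c, bool from_task) {
        if (c->window == 0) {
            /* mirrors the patched s_do_read(): when invoked from a task with a
             * closed window, re-queue; the edge will not fire again for data
             * that already arrived */
            if (from_task) {
                c->read_task_queued = true;
            }
            return;
        }
        int n = c->pending_bytes < c->window ? c->pending_bytes : c->window;
        c->pending_bytes -= n;
        c->window -= n;
        printf("read %d bytes, %d still pending\n", n, c->pending_bytes);
    }

    int main(void) {
        struct toy_channel c = {.pending_bytes = 100, .window = 0, .read_task_queued = false};

        /* the read task fires while the window is closed; without the
         * re-queue, the 100 pending bytes would be stranded forever */
        toy_do_read(&c, true);

        /* the window reopens; the re-queued task, not a new readable edge,
         * is what finally drains the data */
        c.window = 200;
        if (c.read_task_queued) {
            c.read_task_queued = false;
            toy_do_read(&c, true);
        }
        return 0;
    }

In the real handler, the notification paths (s_on_readable_notification, s_trigger_read) pass from_task = false and only s_read_task passes true, so the re-arm happens exactly where a fresh edge can no longer be expected.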