diff --git a/.github/workflows/proof-alarm.yml b/.github/workflows/proof-alarm.yml index 39187aa33..e23414f1a 100644 --- a/.github/workflows/proof-alarm.yml +++ b/.github/workflows/proof-alarm.yml @@ -16,7 +16,7 @@ jobs: - name: Check run: | TMPFILE=$(mktemp) - echo "495506dac0244fd96f66dc1d11740db8 source/linux/epoll_event_loop.c" > $TMPFILE + echo "267fca6c3fc1f8cb58dc80e1926b23e0 source/linux/epoll_event_loop.c" > $TMPFILE md5sum --check $TMPFILE # No further steps if successful diff --git a/include/aws/io/event_loop.h b/include/aws/io/event_loop.h index 32d626869..0d9a7cddc 100644 --- a/include/aws/io/event_loop.h +++ b/include/aws/io/event_loop.h @@ -110,6 +110,7 @@ struct aws_event_loop { uint64_t latest_tick_start; size_t current_tick_latency_sum; struct aws_atomic_var next_flush_time; + struct aws_event_loop_group *owner; void *impl_data; }; @@ -125,6 +126,7 @@ struct aws_event_loop_local_object { struct aws_event_loop_options { aws_io_clock_fn *clock; struct aws_thread_options *thread_options; + struct aws_event_loop_group *owner; }; typedef struct aws_event_loop *(aws_new_event_loop_fn)( @@ -190,6 +192,22 @@ struct aws_event_loop *aws_event_loop_new_default_with_options( AWS_IO_API void aws_event_loop_destroy(struct aws_event_loop *event_loop); +/** + * If this event loop belongs to an aws_event_loop_group, acquire a hold on the group + * preventing it from being cleaned up. + * + * If the event loop is not in a group, this function does nothing, + * but only internal tests should ever create a naked event loop without a group. + */ +AWS_IO_API +void aws_event_loop_acquire_hold_on_group(struct aws_event_loop *event_loop); + +/** + * Release a hold on the event loop's group, allowing it to be cleaned up. + */ +AWS_IO_API +void aws_event_loop_release_hold_on_group(struct aws_event_loop *event_loop); + /** * Initializes common event-loop data structures. * This is only called from the *new() function of event loop implementations. 
diff --git a/source/bsd/kqueue_event_loop.c b/source/bsd/kqueue_event_loop.c index 43130fa49..0262d0015 100644 --- a/source/bsd/kqueue_event_loop.c +++ b/source/bsd/kqueue_event_loop.c @@ -161,6 +161,8 @@ struct aws_event_loop *aws_event_loop_new_default_with_options( } clean_up_event_loop_base = true; + event_loop->owner = options->owner; + struct kqueue_loop *impl = aws_mem_calloc(alloc, 1, sizeof(struct kqueue_loop)); if (!impl) { goto clean_up; diff --git a/source/channel.c b/source/channel.c index 1cf6886fa..beb3c64fe 100644 --- a/source/channel.c +++ b/source/channel.c @@ -80,7 +80,7 @@ struct aws_channel { size_t window_update_batch_emit_threshold; struct aws_channel_task window_update_task; bool read_back_pressure_enabled; - bool window_update_in_progress; + bool window_update_scheduled; }; struct channel_setup_args { @@ -214,7 +214,6 @@ struct aws_channel *aws_channel_new(struct aws_allocator *alloc, const struct aw AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: Beginning creation and setup of new channel.", (void *)channel); channel->alloc = alloc; - channel->loop = creation_args->event_loop; channel->on_shutdown_completed = creation_args->on_shutdown_completed; channel->shutdown_user_data = creation_args->shutdown_user_data; @@ -256,6 +255,10 @@ struct aws_channel *aws_channel_new(struct aws_allocator *alloc, const struct aw setup_args->on_setup_completed = creation_args->on_setup_completed; setup_args->user_data = creation_args->setup_user_data; + /* keep loop alive until channel is destroyed */ + channel->loop = creation_args->event_loop; + aws_event_loop_acquire_hold_on_group(channel->loop); + aws_task_init(&setup_args->task, s_on_channel_setup_complete, setup_args, "on_channel_setup_complete"); aws_event_loop_schedule_task_now(creation_args->event_loop, &setup_args->task); @@ -278,8 +281,6 @@ static void s_cleanup_slot(struct aws_channel_slot *slot) { } void aws_channel_destroy(struct aws_channel *channel) { - AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: 
destroying channel.", (void *)channel); - aws_channel_release_hold(channel); } @@ -307,6 +308,8 @@ static void s_final_channel_deletion_task(struct aws_task *task, void *arg, enum aws_channel_set_statistics_handler(channel, NULL); + aws_event_loop_release_hold_on_group(channel->loop); + aws_mem_release(channel->alloc, channel); } @@ -322,6 +325,7 @@ void aws_channel_release_hold(struct aws_channel *channel) { if (prev_refcount == 1) { /* Refcount is now 0, finish cleaning up channel memory. */ + AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: destroying channel.", (void *)channel); if (aws_channel_thread_is_callers_thread(channel)) { s_final_channel_deletion_task(NULL, channel, AWS_TASK_STATUS_RUN_READY); } else { @@ -833,6 +837,8 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar (void)channel_task; struct aws_channel *channel = arg; + channel->window_update_scheduled = false; + if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) { /* get the right-most slot to start the updates. 
*/ struct aws_channel_slot *slot = channel->first; @@ -852,7 +858,6 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar "channel %p: channel update task failed with status %d", (void *)slot->channel, aws_last_error()); - slot->channel->window_update_in_progress = false; aws_channel_shutdown(channel, aws_last_error()); return; } @@ -860,7 +865,6 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar slot = slot->adj_left; } } - channel->window_update_in_progress = false; } int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t window) { @@ -869,9 +873,9 @@ int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t slot->current_window_update_batch_size = aws_add_size_saturating(slot->current_window_update_batch_size, window); - if (!slot->channel->window_update_in_progress && + if (!slot->channel->window_update_scheduled && slot->window_size <= slot->channel->window_update_batch_emit_threshold) { - slot->channel->window_update_in_progress = true; + slot->channel->window_update_scheduled = true; aws_channel_task_init( &slot->channel->window_update_task, s_window_update_task, slot->channel, "window update task"); aws_channel_schedule_task_now(slot->channel, &slot->channel->window_update_task); diff --git a/source/event_loop.c b/source/event_loop.c index 5870b87e8..39364f00e 100644 --- a/source/event_loop.c +++ b/source/event_loop.c @@ -4,6 +4,7 @@ */ #include +#include #include #include @@ -124,6 +125,7 @@ static struct aws_event_loop_group *s_event_loop_group_new( struct aws_event_loop_options options = { .clock = clock, .thread_options = &thread_options, + .owner = el_group, }; if (pin_threads) { @@ -385,6 +387,22 @@ void aws_event_loop_destroy(struct aws_event_loop *event_loop) { event_loop->vtable->destroy(event_loop); } +void aws_event_loop_acquire_hold_on_group(struct aws_event_loop *event_loop) { + AWS_ASSERT(event_loop); + + if (event_loop->owner) { + 
aws_event_loop_group_acquire(event_loop->owner); + } else { + AWS_LOGF_WARN(AWS_LS_IO_EVENT_LOOP, "id=%p: Event loop does not belong to a group.", (void *)event_loop); + } +} + +void aws_event_loop_release_hold_on_group(struct aws_event_loop *event_loop) { + if (event_loop && event_loop->owner) { + aws_event_loop_group_release(event_loop->owner); + } +} + int aws_event_loop_fetch_local_object( struct aws_event_loop *event_loop, void *key, diff --git a/source/linux/epoll_event_loop.c b/source/linux/epoll_event_loop.c index 151ffef9c..db4dc63ad 100644 --- a/source/linux/epoll_event_loop.c +++ b/source/linux/epoll_event_loop.c @@ -127,6 +127,8 @@ struct aws_event_loop *aws_event_loop_new_default_with_options( goto clean_up_loop; } + loop->owner = options->owner; + struct epoll_loop *epoll_loop = aws_mem_calloc(alloc, 1, sizeof(struct epoll_loop)); if (!epoll_loop) { goto cleanup_base_loop; diff --git a/source/windows/iocp/iocp_event_loop.c b/source/windows/iocp/iocp_event_loop.c index 9ccce3007..14509edaa 100644 --- a/source/windows/iocp/iocp_event_loop.c +++ b/source/windows/iocp/iocp_event_loop.c @@ -189,6 +189,8 @@ struct aws_event_loop *aws_event_loop_new_default_with_options( } clean_up_event_loop_base = true; + event_loop->owner = options->owner; + impl = aws_mem_calloc(alloc, 1, sizeof(struct iocp_loop)); if (!impl) { goto clean_up; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 23a51cfc2..28d08c0de 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -88,6 +88,7 @@ add_test_case(channel_tasks_serialized_run) add_test_case(channel_rejects_post_shutdown_tasks) add_test_case(channel_cancels_pending_tasks) add_test_case(channel_duplicate_shutdown) +add_test_case(channel_keeps_event_loop_group_alive) add_net_test_case(channel_connect_some_hosts_timeout) add_net_test_case(test_default_with_ipv6_lookup) diff --git a/tests/channel_test.c b/tests/channel_test.c index 9a730a351..6b8d7f998 100644 --- a/tests/channel_test.c +++ 
b/tests/channel_test.c @@ -20,9 +20,10 @@ struct channel_setup_test_args { struct aws_mutex mutex; struct aws_condition_variable condition_variable; - bool setup_completed; /* protected by mutex */ - bool shutdown_completed; /* protected by mutex */ - int error_code; /* protected by mutex */ + bool setup_completed; /* protected by mutex */ + bool shutdown_completed; /* protected by mutex */ + int error_code; /* protected by mutex */ + bool event_loop_group_shutdown_completed; /* protected by mutex (not used by all tests) */ enum aws_task_status task_status; }; @@ -59,6 +60,19 @@ static int s_channel_setup_create_and_wait( return AWS_OP_SUCCESS; } +static void s_event_loop_group_on_shutdown_complete(void *user_data) { + struct channel_setup_test_args *setup_test_args = user_data; + aws_mutex_lock(&setup_test_args->mutex); + setup_test_args->event_loop_group_shutdown_completed = true; + aws_condition_variable_notify_all(&setup_test_args->condition_variable); + aws_mutex_unlock(&setup_test_args->mutex); +} + +static bool s_event_loop_group_shutdown_completed_predicate(void *arg) { + struct channel_setup_test_args *setup_test_args = (struct channel_setup_test_args *)arg; + return setup_test_args->event_loop_group_shutdown_completed; +} + static int s_test_channel_setup(struct aws_allocator *allocator, void *ctx) { (void)ctx; struct aws_event_loop *event_loop = aws_event_loop_new_default(allocator, aws_high_res_clock_get_ticks); @@ -616,6 +630,77 @@ static int s_test_channel_duplicate_shutdown(struct aws_allocator *allocator, vo AWS_TEST_CASE(channel_duplicate_shutdown, s_test_channel_duplicate_shutdown) +/* This is a regression test. The channel didn't use to do anything to keep the event-loop alive. + * So if the event-loop-group was released before the channel, the loops would get destroyed, + * then the channel would try to schedule its own destruction task on the loop and crash. 
*/ +static int s_test_channel_keeps_event_loop_group_alive(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + aws_io_library_init(allocator); + + struct channel_setup_test_args test_args = { + .mutex = AWS_MUTEX_INIT, + .condition_variable = AWS_CONDITION_VARIABLE_INIT, + }; + + struct aws_shutdown_callback_options event_loop_group_shutdown_options = { + .shutdown_callback_fn = s_event_loop_group_on_shutdown_complete, + .shutdown_callback_user_data = &test_args, + }; + struct aws_event_loop_group *event_loop_group = + aws_event_loop_group_new_default(allocator, 1, &event_loop_group_shutdown_options); + ASSERT_NOT_NULL(event_loop_group); + + struct aws_event_loop *event_loop = aws_event_loop_group_get_next_loop(event_loop_group); + + struct aws_channel_options channel_options = { + .on_setup_completed = s_channel_setup_test_on_setup_completed, + .setup_user_data = &test_args, + .on_shutdown_completed = s_channel_test_shutdown, + .shutdown_user_data = &test_args, + .event_loop = event_loop, + }; + + struct aws_channel *channel = NULL; + ASSERT_SUCCESS(s_channel_setup_create_and_wait(allocator, &channel_options, &test_args, &channel)); + + /* shut down channel, but don't clean it up yet */ + aws_channel_shutdown(channel, 0); + + ASSERT_SUCCESS(aws_mutex_lock(&test_args.mutex)); + ASSERT_SUCCESS(aws_condition_variable_wait_pred( + &test_args.condition_variable, &test_args.mutex, s_channel_test_shutdown_predicate, &test_args)); + ASSERT_SUCCESS(aws_mutex_unlock(&test_args.mutex)); + + /* release event loop group before channel */ + aws_event_loop_group_release(event_loop_group); + + /* wait a bit to ensure the event-loop-group doesn't shut down (because channel has a hold on it) */ + uint64_t wait_time = aws_timestamp_convert(500, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); + ASSERT_SUCCESS(aws_mutex_lock(&test_args.mutex)); + ASSERT_FAILS( + aws_condition_variable_wait_for_pred( + &test_args.condition_variable, + &test_args.mutex, + wait_time, + 
s_event_loop_group_shutdown_completed_predicate, + &test_args), + "Channel failed to keep event loop alive"); + ASSERT_SUCCESS(aws_mutex_unlock(&test_args.mutex)); + + /* release channel for destruction */ + aws_channel_destroy(channel); + + /* event loop group should shut down now */ + ASSERT_SUCCESS(aws_mutex_lock(&test_args.mutex)); + ASSERT_SUCCESS(aws_condition_variable_wait_pred( + &test_args.condition_variable, &test_args.mutex, s_event_loop_group_shutdown_completed_predicate, &test_args)); + ASSERT_SUCCESS(aws_mutex_unlock(&test_args.mutex)); + + aws_io_library_clean_up(); + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(channel_keeps_event_loop_group_alive, s_test_channel_keeps_event_loop_group_alive) + struct channel_connect_test_args { struct aws_mutex *mutex; struct aws_condition_variable cv;