Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Error Handling For Connection Manager #507

Merged
merged 12 commits into from
Mar 27, 2025
51 changes: 25 additions & 26 deletions source/connection_manager.c
Original file line number Diff line number Diff line change
@@ -1164,7 +1164,6 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect

struct aws_http_connection_manager *manager = work->manager;

int representative_error = 0;
size_t new_connection_failures = 0;

/*
@@ -1218,14 +1217,18 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect
for (size_t i = 0; i < work->new_connections; ++i) {
if (s_aws_http_connection_manager_new_connection(manager)) {
++new_connection_failures;
representative_error = aws_last_error();
int error = aws_last_error();
if (push_errors) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's just assert on the array list allocation failure. That code is probably written when the allocation can still fail and we want to handle it properly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

AWS_FATAL_ASSERT(aws_array_list_push_back(&errors, &representative_error) == AWS_OP_SUCCESS);
AWS_FATAL_ASSERT(aws_array_list_push_back(&errors, &error) == AWS_OP_SUCCESS);
}
}
}

if (new_connection_failures > 0) {
/* If there are pending acquisitions, schedule work again to try acquiring them */
struct aws_connection_management_transaction updated_work;
s_aws_connection_management_transaction_init(&updated_work, manager);
bool has_pending_acquisitions = false;
/*
* We failed and aren't going to receive a callback, but the current state assumes we will receive
* a callback. So we need to re-lock and update the state ourselves.
@@ -1235,31 +1238,29 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect
AWS_FATAL_ASSERT(manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] >= new_connection_failures);
s_connection_manager_internal_ref_decrease(manager, AWS_HCMCT_PENDING_CONNECTIONS, new_connection_failures);

/*
* Rather than failing one acquisition for each connection failure, if there's at least one
* connection failure, we instead fail all excess acquisitions, since there's no pending
* connect that will necessarily resolve them.
*
* Try to correspond an error with the acquisition failure, but as a fallback just use the
* representative error.
*/
size_t i = 0;
while (manager->pending_acquisition_count > manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS]) {
int error = representative_error;
if (i < aws_array_list_length(&errors)) {
aws_array_list_get_at(&errors, &error, i);
}

for (size_t i = 0; i < new_connection_failures &&
manager->pending_acquisition_count > manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should manager->pending_acquisition_count > manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] be an assert?

We just decrease the AWS_HCMCT_PENDING_CONNECTIONS by new_connection_failures. If this condition happens, it means at somepoint, we have more AWS_HCMCT_PENDING_CONNECTIONS then pending_acquisition_count, which is probably wrong.

And it is confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't be an assert because there is a gap between when we calculate the new_connection_failures and acquire a lock, so it's possible that by the time we come here, the pending acquisitions that failed were already completed by something else like idle connections returning to the pool. I have simplified the check to just validate that we have pending acquisitions before trying to complete them with an error.

i++) {
int error;
aws_array_list_get_at(&errors, &error, i);
AWS_LOGF_DEBUG(
AWS_LS_HTTP_CONNECTION_MANAGER,
"id=%p: Failing excess connection acquisition with error code %d",
"id=%p: Failing connection acquisition with error code %d",
(void *)manager,
(int)error);
s_aws_http_connection_manager_move_front_acquisition(manager, NULL, error, &work->completions);
++i;
}

has_pending_acquisitions =
manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] < manager->pending_acquisition_count;
if (has_pending_acquisitions) {
s_aws_http_connection_manager_build_transaction(&updated_work);
}
aws_mutex_unlock(&manager->lock);
if (has_pending_acquisitions) {
s_aws_http_connection_manager_execute_transaction(&updated_work);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a fan of this recursive call happens in the mid of the function.
let's move it to the tail of the function.
https://www.geeksforgeeks.org/tail-recursion/

Also, it could lead to all the connection fails in the recursion and then callbacks starts to happen, which seems not be expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the link, I have updated it.

} else {
s_aws_connection_management_transaction_clean_up(&updated_work);
}
}

/*
@@ -1268,7 +1269,6 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect
s_aws_http_connection_manager_complete_acquisitions(&work->completions, work->allocator);

aws_array_list_clean_up(&errors);

/*
* Step 5 - Clean up work. Do this here rather than at the end of every caller. Destroy the manager if necessary
*/
@@ -1479,12 +1479,11 @@ static void s_cm_on_connection_ready_or_failed(
work->connection_to_release = connection;
}
} else {
/* fail acquisition as one connection cannot be used any more */
while (manager->pending_acquisition_count >
manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] + manager->pending_settings_count) {
if (manager->pending_acquisition_count > manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert?
I spent a while try to figure out what those numbers are. We only start connecting when there is some acquisition from user.

Copy link
Contributor Author

@waahm7 waahm7 Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, it's possible that by the time we got an async error from connection callback, the pending acquisition is already complete by something else. I have simplified the check to make it less confusing.

/* fail acquisition as connection acquire failed */
AWS_LOGF_DEBUG(
AWS_LS_HTTP_CONNECTION_MANAGER,
"id=%p: Failing excess connection acquisition with error code %d",
"id=%p: Failing connection acquisition with error code %d",
(void *)manager,
(int)error_code);
s_aws_http_connection_manager_move_front_acquisition(manager, NULL, error_code, &work->completions);
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -529,6 +529,8 @@ add_net_test_case(connection_manager_setup_shutdown)
add_net_test_case(connection_manager_acquire_release_mix_synchronous)
add_net_test_case(connection_manager_acquisition_timeout)
add_net_test_case(connection_manager_connect_callback_failure)
add_net_test_case(test_connection_manager_connect_callback_async_failure)
add_net_test_case(test_connection_manager_connect_callback_async_with_immediate_failure)
add_net_test_case(connection_manager_connect_immediate_failure)
add_net_test_case(connection_manager_proxy_setup_shutdown)
add_net_test_case(connection_manager_idle_culling_single)
142 changes: 139 additions & 3 deletions tests/test_connection_manager.c
Original file line number Diff line number Diff line change
@@ -819,7 +819,7 @@ AWS_TEST_CASE(
connection_manager_max_pending_acquisitions_with_vended_connections,
s_test_connection_manager_max_pending_acquisitions_with_vended_connections);

static int s_aws_http_connection_manager_create_connection_sync_mock(
static int s_aws_http_connection_manager_create_connection_validate(
const struct aws_http_client_connection_options *options) {
struct cm_tester *tester = &s_tester;

@@ -830,8 +830,6 @@ static int s_aws_http_connection_manager_create_connection_sync_mock(
ASSERT_TRUE(aws_byte_cursor_eq_c_str(&interface_name, options->socket_options->network_interface_name));
}

size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1);

ASSERT_SUCCESS(aws_mutex_lock(&tester->lock));
tester->release_connection_fn = options->on_shutdown;
ASSERT_SUCCESS(aws_mutex_unlock(&tester->lock));
@@ -847,7 +845,15 @@ static int s_aws_http_connection_manager_create_connection_sync_mock(
ASSERT_UINT_EQUALS(options->proxy_options->connection_type, tester->verify_proxy_options->connection_type);
}

return AWS_OP_SUCCESS;
}
static int s_aws_http_connection_manager_create_connection_sync_mock(
const struct aws_http_client_connection_options *options) {
s_aws_http_connection_manager_create_connection_validate(options);
struct cm_tester *tester = &s_tester;

struct mock_connection *connection = NULL;
size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1);

if (next_connection_id < aws_array_list_length(&tester->mock_connections)) {
aws_array_list_get_at(&tester->mock_connections, &connection, next_connection_id);
@@ -868,6 +874,63 @@ static int s_aws_http_connection_manager_create_connection_sync_mock(
return aws_raise_error(AWS_ERROR_HTTP_UNKNOWN);
}

struct connect_task_args {
void *user_data;
struct mock_connection *connection;
aws_http_on_client_connection_setup_fn *on_setup;
};

static void s_aws_http_connection_manager_connect_task(
struct aws_task *task,
void *user_data,
enum aws_task_status status) {
(void)status;
struct cm_tester *tester = &s_tester;

struct connect_task_args *task_args = user_data;
struct mock_connection *connection = task_args->connection;
if (connection) {
if (connection->result == AWS_NCRT_SUCCESS) {
task_args->on_setup((struct aws_http_connection *)connection, AWS_ERROR_SUCCESS, task_args->user_data);
} else if (connection->result == AWS_NCRT_ERROR_VIA_CALLBACK) {
task_args->on_setup(NULL, AWS_ERROR_HTTP_UNKNOWN, task_args->user_data);
} else {
AWS_FATAL_ASSERT(0 && "Unexpected connection->result");
}
}

aws_mem_release(tester->allocator, task);
aws_mem_release(tester->allocator, task_args);
}

static int s_aws_http_connection_manager_connect_async_mock(const struct aws_http_client_connection_options *options) {
s_aws_http_connection_manager_create_connection_validate(options);
struct cm_tester *tester = &s_tester;

struct mock_connection *connection = NULL;
size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1);
if (next_connection_id < aws_array_list_length(&tester->mock_connections)) {
aws_array_list_get_at(&tester->mock_connections, &connection, next_connection_id);
}

if (connection->result == AWS_NCRT_ERROR_FROM_CREATE) {
return aws_raise_error(AWS_ERROR_HTTP_UNKNOWN);
}

struct aws_task *task = aws_mem_calloc(options->allocator, 1, sizeof(struct aws_task));
struct connect_task_args *task_args = aws_mem_calloc(options->allocator, 1, sizeof(struct connect_task_args));
task_args->connection = connection;
task_args->user_data = options->user_data;
task_args->on_setup = options->on_setup;
aws_task_init(task, s_aws_http_connection_manager_connect_task, task_args, "create_connection_task");

struct aws_event_loop *event_loop = aws_event_loop_group_get_next_loop(tester->event_loop_group);
uint64_t now;
ASSERT_SUCCESS(aws_event_loop_current_clock_time(event_loop, &now));
aws_event_loop_schedule_task_future(event_loop, task, now + 1000000000);
return AWS_OP_SUCCESS;
}

static void s_aws_http_connection_manager_release_connection_sync_mock(struct aws_http_connection *connection) {
(void)connection;

@@ -920,6 +983,17 @@ static struct aws_http_connection_manager_system_vtable s_synchronous_mocks = {
.aws_http_connection_get_version = s_aws_http_connection_manager_connection_get_version_sync_mock,
};

static struct aws_http_connection_manager_system_vtable s_async_connect_mock = {
.aws_http_client_connect = s_aws_http_connection_manager_connect_async_mock,
.aws_http_connection_release = s_aws_http_connection_manager_release_connection_sync_mock,
.aws_http_connection_close = s_aws_http_connection_manager_close_connection_sync_mock,
.aws_http_connection_new_requests_allowed = s_aws_http_connection_manager_is_connection_available_sync_mock,
.aws_high_res_clock_get_ticks = aws_high_res_clock_get_ticks,
.aws_http_connection_get_channel = s_aws_http_connection_manager_connection_get_channel_sync_mock,
.aws_channel_thread_is_callers_thread = s_aws_http_connection_manager_is_callers_thread_sync_mock,
.aws_http_connection_get_version = s_aws_http_connection_manager_connection_get_version_sync_mock,
};

static int s_test_connection_manager_with_network_interface_list(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
struct aws_byte_cursor *interface_names_array = aws_mem_calloc(allocator, 3, sizeof(struct aws_byte_cursor));
@@ -1055,6 +1129,68 @@ static int s_test_connection_manager_connect_callback_failure(struct aws_allocat
}
AWS_TEST_CASE(connection_manager_connect_callback_failure, s_test_connection_manager_connect_callback_failure);

static int s_test_connection_manager_connect_callback_async_failure(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
int error_connections = 5;
int success_connections = 5;
struct cm_tester_options options = {
.allocator = allocator,
.max_connections = error_connections,
.mock_table = &s_async_connect_mock,
};

ASSERT_SUCCESS(s_cm_tester_init(&options));

s_add_mock_connections(error_connections, AWS_NCRT_ERROR_VIA_CALLBACK, false);
s_add_mock_connections(success_connections, AWS_NCRT_SUCCESS, true);

s_acquire_connections(error_connections + success_connections);

ASSERT_SUCCESS(s_wait_on_connection_reply_count(error_connections + success_connections));

ASSERT_UINT_EQUALS(s_tester.connection_errors, error_connections);
ASSERT_SUCCESS(s_release_connections(success_connections, false));

ASSERT_SUCCESS(s_cm_tester_clean_up());

return AWS_OP_SUCCESS;
}
AWS_TEST_CASE(
test_connection_manager_connect_callback_async_failure,
s_test_connection_manager_connect_callback_async_failure);

static int s_test_connection_manager_connect_callback_async_with_immediate_failure(
struct aws_allocator *allocator,
void *ctx) {
(void)ctx;
int error_connections = 5;
int success_connections = 5;
struct cm_tester_options options = {
.allocator = allocator,
.max_connections = error_connections + success_connections,
.mock_table = &s_async_connect_mock,
};

ASSERT_SUCCESS(s_cm_tester_init(&options));

s_add_mock_connections(success_connections, AWS_NCRT_SUCCESS, true);
s_add_mock_connections(error_connections, AWS_NCRT_ERROR_VIA_CALLBACK, false);

s_acquire_connections(error_connections + success_connections);

ASSERT_SUCCESS(s_wait_on_connection_reply_count(error_connections + success_connections));

ASSERT_UINT_EQUALS(s_tester.connection_errors, error_connections);
ASSERT_SUCCESS(s_release_connections(success_connections, false));

ASSERT_SUCCESS(s_cm_tester_clean_up());

return AWS_OP_SUCCESS;
}
AWS_TEST_CASE(
test_connection_manager_connect_callback_async_with_immediate_failure,
s_test_connection_manager_connect_callback_async_with_immediate_failure);

static int s_test_connection_manager_connect_immediate_failure(struct aws_allocator *allocator, void *ctx) {
(void)ctx;