Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ else
endif

EXTENSION = pg_net
EXTVERSION = 0.19.5
EXTVERSION = 0.19.6

DATA = $(wildcard sql/*--*.sql)

Expand Down
1 change: 1 addition & 0 deletions sql/pg_net--0.19.5--0.19.6.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- no SQL changes in 0.19.6
144 changes: 73 additions & 71 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,61 +169,6 @@ uint64 delete_expired_responses(char *ttl, int batch_size){
return affected_rows;
}

static void insert_failure_response(CURL *ez_handle, CURLcode return_code, int64 id, int32 timeout_milliseconds){
SPI_connect();

const char* error_msg;
if (return_code == CURLE_OPERATION_TIMEDOUT){
error_msg = detailed_timeout_strerror(ez_handle, timeout_milliseconds).msg;
} else {
error_msg = curl_easy_strerror(return_code);
}

int ret_code = SPI_execute_with_args("\
insert into net._http_response(id, error_msg) values ($1, $2)",
2,
(Oid[]){INT8OID, CSTRINGOID},
(Datum[]){Int64GetDatum(id), CStringGetDatum(error_msg)},
NULL, false, 1);

if (ret_code != SPI_OK_INSERT)
{
ereport(ERROR, errmsg("Error when inserting failed response: %s", SPI_result_code_string(ret_code)));
}

SPI_finish();
}

static void insert_success_response(CurlData *cdata, long http_status_code, char *contentType, Jsonb *jsonb_headers){
SPI_connect();

int ret_code = SPI_execute_with_args("\
insert into net._http_response(id, status_code, content, headers, content_type, timed_out) values ($1, $2, $3, $4, $5, $6)",
6,
(Oid[]){INT8OID, INT4OID, CSTRINGOID, JSONBOID, CSTRINGOID, BOOLOID},
(Datum[]){
Int64GetDatum(cdata->id)
, Int32GetDatum(http_status_code)
, CStringGetDatum(cdata->body->data)
, JsonbPGetDatum(jsonb_headers)
, CStringGetDatum(contentType)
, BoolGetDatum(false) // timed_out is false here as it's a success
},
(char[6]){
' '
, [2] = cdata->body->data[0] == '\0'? 'n' : ' '
, [4] = !contentType? 'n' :' '
},
false, 1);

if (ret_code != SPI_OK_INSERT)
{
ereport(ERROR, errmsg("Error when inserting successful response: %s", SPI_result_code_string(ret_code)));
}

SPI_finish();
}

uint64 consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext curl_memctx){
SPI_connect();

Expand Down Expand Up @@ -282,8 +227,9 @@ uint64 consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext
}

static void pfree_curl_data(CurlData *cdata){
pfree(cdata->body->data);
pfree(cdata->body);
if(cdata->body){
destroyStringInfo(cdata->body);
}
if(cdata->request_headers) //curl_slist_free_all already handles the NULL case, but be explicit about it
curl_slist_free_all(cdata->request_headers);
}
Expand All @@ -307,6 +253,71 @@ static Jsonb *jsonb_headers_from_curl_handle(CURL *ez_handle){
return jsonb_headers;
}

static void insert_response(CURL *ez_handle, CurlData *cdata, CURLcode curl_return_code){
enum { nparams = 7 }; // using an enum because const size_t nparams doesn't compile
Datum vals[nparams];
char nulls[nparams]; MemSet(nulls, 'n', nparams);

vals[0] = Int64GetDatum(cdata->id);
nulls[0] = ' ';

if (curl_return_code == CURLE_OK) {
Jsonb *jsonb_headers = jsonb_headers_from_curl_handle(ez_handle);
long res_http_status_code = 0;

EREPORT_CURL_GETINFO(ez_handle, CURLINFO_RESPONSE_CODE, &res_http_status_code);

vals[1] = Int32GetDatum(res_http_status_code);
nulls[1] = ' ';

if (cdata->body && cdata->body->data[0] != '\0'){
vals[2] = CStringGetTextDatum(cdata->body->data);
nulls[2] = ' ';
}

vals[3] = JsonbPGetDatum(jsonb_headers);
nulls[3] = ' ';

struct curl_header *hdr;
if (curl_easy_header(ez_handle, "content-type", 0, CURLH_HEADER, -1, &hdr) == CURLHE_OK){
vals[4] = CStringGetTextDatum(hdr->value);
nulls[4] = ' ';
}

vals[5] = BoolGetDatum(false);
nulls[5] = ' ';
} else {
bool timed_out = curl_return_code == CURLE_OPERATION_TIMEDOUT;
char *error_msg = NULL;

if (timed_out){
error_msg = detailed_timeout_strerror(ez_handle, cdata->timeout_milliseconds).msg;
} else {
error_msg = (char *) curl_easy_strerror(curl_return_code);
}

vals[5] = BoolGetDatum(timed_out);
nulls[5] = ' ';

if (error_msg){
vals[6] = CStringGetTextDatum(error_msg);
nulls[6] = ' ';
}
}

int ret_code = SPI_execute_with_args("\
insert into net._http_response(id, status_code, content, headers, content_type, timed_out, error_msg) values ($1, $2, $3, $4, $5, $6, $7)",
nparams,
(Oid[nparams]){INT8OID, INT4OID, TEXTOID, JSONBOID, TEXTOID, BOOLOID, TEXTOID},
vals, nulls,
false, 1);

if (ret_code != SPI_OK_INSERT)
{
ereport(ERROR, errmsg("Error when inserting response: %s", SPI_result_code_string(ret_code)));
}
}

// Switch back to the curl memory context, which has the curl handles stored
void insert_curl_responses(WorkerState *wstate, MemoryContext curl_memctx){
MemoryContext old_ctx = MemoryContextSwitchTo(curl_memctx);
Expand All @@ -321,21 +332,11 @@ void insert_curl_responses(WorkerState *wstate, MemoryContext curl_memctx){
CurlData *cdata = NULL;
EREPORT_CURL_GETINFO(ez_handle, CURLINFO_PRIVATE, &cdata);

if (return_code != CURLE_OK) {
insert_failure_response(ez_handle, return_code, cdata->id, cdata->timeout_milliseconds);
} else {
char *contentType;
EREPORT_CURL_GETINFO(ez_handle, CURLINFO_CONTENT_TYPE, &contentType);

long http_status_code;
EREPORT_CURL_GETINFO(ez_handle, CURLINFO_RESPONSE_CODE, &http_status_code);
SPI_connect();
insert_response(ez_handle, cdata, return_code);
SPI_finish();

Jsonb *jsonb_headers = jsonb_headers_from_curl_handle(ez_handle);

insert_success_response(cdata, http_status_code, contentType, jsonb_headers);

pfree_curl_data(cdata);
}
pfree_curl_data(cdata);

int res = curl_multi_remove_handle(curl_mhandle, ez_handle);
if(res != CURLM_OK)
Expand All @@ -349,3 +350,4 @@ void insert_curl_responses(WorkerState *wstate, MemoryContext curl_memctx){

MemoryContextSwitchTo(old_ctx);
}

17 changes: 17 additions & 0 deletions src/pg_prelude.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,14 @@
#pragma GCC diagnostic pop

#define PG15_GTE (PG_VERSION_NUM >= 150000)
#define PG17_LT (PG_VERSION_NUM < 170000)

const char *xact_event_name(XactEvent event);

#if PG17_LT
void destroyStringInfo(StringInfo str);
#endif

#endif /* PG_PRELUDE_H */

#ifdef PG_PRELUDE_IMPL
Expand All @@ -68,4 +74,15 @@ const char *xact_event_name(XactEvent event){
}
}


#if PG17_LT
// Polyfill for pg < 17
// see https://github.com/postgres/postgres/blob/3c4e26a62c31ebe296e3aedb13ac51a7a35103bd/src/common/stringinfo.c#L402-L416
void destroyStringInfo(StringInfo str) {
Assert(str->maxlen != 0);
pfree(str->data);
pfree(str);
}
#endif

#endif /* PG_PRELUDE_IMPL */
14 changes: 10 additions & 4 deletions test/test_http_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ def test_http_get_timeout_reached(sess):
# wait for timeout
time.sleep(7)

(response,) = sess.execute(
(content_type, content, response,timed_out) = sess.execute(
text(
"""
select error_msg from net._http_response where id = :request_id;
select content_type, content, error_msg, timed_out from net._http_response where id = :request_id;
"""
),
{"request_id": request_id},
).fetchone()

assert content_type == None
assert content == None
assert timed_out == True
assert response.startswith("Timeout of 5000 ms reached")


Expand Down Expand Up @@ -66,10 +69,10 @@ def test_http_detailed_timeout(sess):
# wait for timeout
time.sleep(2.1)

(response,) = sess.execute(
(content_type, content, response,timed_out) = sess.execute(
text(
"""
select error_msg from net._http_response where id = :request_id;
select content_type, content, error_msg, timed_out from net._http_response where id = :request_id;
"""
),
{"request_id": request_id},
Expand All @@ -82,6 +85,9 @@ def test_http_detailed_timeout(sess):
tcp_ssl_time = float(match.group('C'))
http_time = float(match.group('D'))

assert content_type == None
assert content == None
assert timed_out == True
assert total_time > 0
assert dns_time > 0
assert tcp_ssl_time > 0
Expand Down