Skip to content

Commit d1a015e

Browse files
committed
refactor: rename CurlData to CurlHandle
Also add comments to main structs
1 parent 82fe34c commit d1a015e

File tree

3 files changed

+80
-79
lines changed

3 files changed

+80
-79
lines changed

src/core.c

Lines changed: 54 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ static SPIPlanPtr ins_response_plan = NULL;
1616
static size_t
1717
body_cb(void *contents, size_t size, size_t nmemb, void *userp)
1818
{
19-
CurlData *cdata = (CurlData*) userp;
19+
CurlHandle *handle = (CurlHandle*) userp;
2020
size_t realsize = size * nmemb;
21-
appendBinaryStringInfo(cdata->body, (const char*)contents, (int)realsize);
21+
appendBinaryStringInfo(handle->body, (const char*)contents, (int)realsize);
2222
return realsize;
2323
}
2424

@@ -45,12 +45,12 @@ static struct curl_slist *pg_text_array_to_slist(ArrayType *array,
4545
return headers;
4646
}
4747

48-
void init_curl_handle(CurlData *cdata, RequestQueueRow row){
49-
cdata->id = row.id;
50-
cdata->body = makeStringInfo();
51-
cdata->ez_handle = curl_easy_init();
48+
void init_curl_handle(CurlHandle *handle, RequestQueueRow row){
49+
handle->id = row.id;
50+
handle->body = makeStringInfo();
51+
handle->ez_handle = curl_easy_init();
5252

53-
cdata->timeout_milliseconds = row.timeout_milliseconds;
53+
handle->timeout_milliseconds = row.timeout_milliseconds;
5454

5555
if (!row.headersBin.isnull) {
5656
ArrayType *pgHeaders = DatumGetArrayTypeP(row.headersBin.value);
@@ -60,57 +60,57 @@ void init_curl_handle(CurlData *cdata, RequestQueueRow row){
6060

6161
EREPORT_CURL_SLIST_APPEND(request_headers, "User-Agent: pg_net/" EXTVERSION);
6262

63-
cdata->request_headers = request_headers;
63+
handle->request_headers = request_headers;
6464
}
6565

66-
cdata->url = TextDatumGetCString(row.url);
66+
handle->url = TextDatumGetCString(row.url);
6767

68-
cdata->req_body = !row.bodyBin.isnull ? TextDatumGetCString(row.bodyBin.value) : NULL;
68+
handle->req_body = !row.bodyBin.isnull ? TextDatumGetCString(row.bodyBin.value) : NULL;
6969

70-
cdata->method = TextDatumGetCString(row.method);
70+
handle->method = TextDatumGetCString(row.method);
7171

72-
if (strcasecmp(cdata->method, "GET") != 0 && strcasecmp(cdata->method, "POST") != 0 && strcasecmp(cdata->method, "DELETE") != 0) {
73-
ereport(ERROR, errmsg("Unsupported request method %s", cdata->method));
72+
if (strcasecmp(handle->method, "GET") != 0 && strcasecmp(handle->method, "POST") != 0 && strcasecmp(handle->method, "DELETE") != 0) {
73+
ereport(ERROR, errmsg("Unsupported request method %s", handle->method));
7474
}
7575

76-
if (strcasecmp(cdata->method, "GET") == 0) {
77-
if (cdata->req_body) {
78-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_POSTFIELDS, cdata->req_body);
79-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_CUSTOMREQUEST, "GET");
76+
if (strcasecmp(handle->method, "GET") == 0) {
77+
if (handle->req_body) {
78+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POSTFIELDS, handle->req_body);
79+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_CUSTOMREQUEST, "GET");
8080
}
8181
}
8282

83-
if (strcasecmp(cdata->method, "POST") == 0) {
84-
if (cdata->req_body) {
85-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_POSTFIELDS, cdata->req_body);
83+
if (strcasecmp(handle->method, "POST") == 0) {
84+
if (handle->req_body) {
85+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POSTFIELDS, handle->req_body);
8686
}
8787
else {
88-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_POST, 1L);
89-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_POSTFIELDSIZE, 0L);
88+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POST, 1L);
89+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POSTFIELDSIZE, 0L);
9090
}
9191
}
9292

93-
if (strcasecmp(cdata->method, "DELETE") == 0) {
94-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_CUSTOMREQUEST, "DELETE");
95-
if (cdata->req_body) {
96-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_POSTFIELDS, cdata->req_body);
93+
if (strcasecmp(handle->method, "DELETE") == 0) {
94+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_CUSTOMREQUEST, "DELETE");
95+
if (handle->req_body) {
96+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POSTFIELDS, handle->req_body);
9797
}
9898
}
9999

100-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_WRITEFUNCTION, body_cb);
101-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_WRITEDATA, cdata);
102-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_HEADER, 0L);
103-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_URL, cdata->url);
104-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_HTTPHEADER, cdata->request_headers);
105-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_TIMEOUT_MS, (long) cdata->timeout_milliseconds);
106-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_PRIVATE, cdata);
107-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_FOLLOWLOCATION, (long) true);
100+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_WRITEFUNCTION, body_cb);
101+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_WRITEDATA, handle);
102+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_HEADER, 0L);
103+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_URL, handle->url);
104+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_HTTPHEADER, handle->request_headers);
105+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_TIMEOUT_MS, (long) handle->timeout_milliseconds);
106+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_PRIVATE, handle);
107+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_FOLLOWLOCATION, (long) true);
108108
if (log_min_messages <= DEBUG2)
109-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_VERBOSE, 1L);
109+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_VERBOSE, 1L);
110110
#if LIBCURL_VERSION_NUM >= 0x075500 /* libcurl 7.85.0 */
111-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_PROTOCOLS_STR, "http,https");
111+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_PROTOCOLS_STR, "http,https");
112112
#else
113-
EREPORT_CURL_SETOPT(cdata->ez_handle, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS);
113+
EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS);
114114
#endif
115115
}
116116

@@ -244,36 +244,33 @@ static Jsonb *jsonb_headers_from_curl_handle(CURL *ez_handle){
244244
return jsonb_headers;
245245
}
246246

247-
void insert_response(CURL *ez_handle, CURLcode curl_return_code){
247+
void insert_response(CurlHandle *handle, CURLcode curl_return_code){
248248
enum { nparams = 7 }; // using an enum because const size_t nparams doesn't compile
249249
Datum vals[nparams];
250250
char nulls[nparams]; MemSet(nulls, 'n', nparams);
251251

252-
CurlData *cdata = NULL;
253-
EREPORT_CURL_GETINFO(ez_handle, CURLINFO_PRIVATE, &cdata);
254-
255-
vals[0] = Int64GetDatum(cdata->id);
252+
vals[0] = Int64GetDatum(handle->id);
256253
nulls[0] = ' ';
257254

258255
if (curl_return_code == CURLE_OK) {
259-
Jsonb *jsonb_headers = jsonb_headers_from_curl_handle(ez_handle);
256+
Jsonb *jsonb_headers = jsonb_headers_from_curl_handle(handle->ez_handle);
260257
long res_http_status_code = 0;
261258

262-
EREPORT_CURL_GETINFO(ez_handle, CURLINFO_RESPONSE_CODE, &res_http_status_code);
259+
EREPORT_CURL_GETINFO(handle->ez_handle, CURLINFO_RESPONSE_CODE, &res_http_status_code);
263260

264261
vals[1] = Int32GetDatum(res_http_status_code);
265262
nulls[1] = ' ';
266263

267-
if (cdata->body && cdata->body->data[0] != '\0'){
268-
vals[2] = CStringGetTextDatum(cdata->body->data);
264+
if (handle->body && handle->body->data[0] != '\0'){
265+
vals[2] = CStringGetTextDatum(handle->body->data);
269266
nulls[2] = ' ';
270267
}
271268

272269
vals[3] = JsonbPGetDatum(jsonb_headers);
273270
nulls[3] = ' ';
274271

275272
struct curl_header *hdr;
276-
if (curl_easy_header(ez_handle, "content-type", 0, CURLH_HEADER, -1, &hdr) == CURLHE_OK){
273+
if (curl_easy_header(handle->ez_handle, "content-type", 0, CURLH_HEADER, -1, &hdr) == CURLHE_OK){
277274
vals[4] = CStringGetTextDatum(hdr->value);
278275
nulls[4] = ' ';
279276
}
@@ -285,7 +282,7 @@ void insert_response(CURL *ez_handle, CURLcode curl_return_code){
285282
char *error_msg = NULL;
286283

287284
if (timed_out){
288-
error_msg = detailed_timeout_strerror(ez_handle, cdata->timeout_milliseconds).msg;
285+
error_msg = detailed_timeout_strerror(handle->ez_handle, handle->timeout_milliseconds).msg;
289286
} else {
290287
error_msg = (char *) curl_easy_strerror(curl_return_code);
291288
}
@@ -323,15 +320,15 @@ void insert_response(CURL *ez_handle, CURLcode curl_return_code){
323320
}
324321
}
325322

326-
void pfree_curl_data(CurlData *cdata){
327-
pfree(cdata->url);
328-
pfree(cdata->method);
329-
if(cdata->req_body)
330-
pfree(cdata->req_body);
323+
void pfree_handle(CurlHandle *handle){
324+
pfree(handle->url);
325+
pfree(handle->method);
326+
if(handle->req_body)
327+
pfree(handle->req_body);
331328

332-
if(cdata->body)
333-
destroyStringInfo(cdata->body);
329+
if(handle->body)
330+
destroyStringInfo(handle->body);
334331

335-
if(cdata->request_headers) //curl_slist_free_all already handles the NULL case, but be explicit about it
336-
curl_slist_free_all(cdata->request_headers);
332+
if(handle->request_headers) //curl_slist_free_all already handles the NULL case, but be explicit about it
333+
curl_slist_free_all(handle->request_headers);
337334
}

src/core.h

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,28 @@ typedef enum {
77
WS_EXITED,
88
} WorkerStatus;
99

10+
// the state of the background worker
1011
typedef struct {
1112
pg_atomic_uint32 got_restart;
1213
pg_atomic_uint32 should_wake;
1314
pg_atomic_uint32 status;
1415
Latch* shared_latch;
15-
ConditionVariable cv;
16+
ConditionVariable cv; // required to publish the state of the worker to other backends
1617
int epfd;
1718
CURLM *curl_mhandle;
1819
} WorkerState;
1920

21+
// A row coming from the http_request_queue
22+
typedef struct {
23+
int64 id;
24+
Datum method;
25+
Datum url;
26+
int32 timeout_milliseconds;
27+
NullableDatum headersBin;
28+
NullableDatum bodyBin;
29+
} RequestQueueRow;
30+
31+
// The curl easy handle plus additional data, this acts for both the request and response cycle
2032
typedef struct {
2133
int64 id;
2234
StringInfo body;
@@ -26,16 +38,7 @@ typedef struct {
2638
char *req_body;
2739
char *method;
2840
CURL *ez_handle;
29-
} CurlData;
30-
31-
typedef struct {
32-
int64 id;
33-
Datum method;
34-
Datum url;
35-
int32 timeout_milliseconds;
36-
NullableDatum headersBin;
37-
NullableDatum bodyBin;
38-
} RequestQueueRow;
41+
} CurlHandle;
3942

4043
uint64 delete_expired_responses(char *ttl, int batch_size);
4144

@@ -45,10 +48,10 @@ RequestQueueRow get_request_queue_row(HeapTuple spi_tupval, TupleDesc spi_tupdes
4548

4649
void set_curl_mhandle(WorkerState *wstate);
4750

48-
void insert_response(CURL *ez_handle, CURLcode curl_return_code);
51+
void insert_response(CurlHandle *handle, CURLcode curl_return_code);
4952

50-
void init_curl_handle(CurlData *cdata, RequestQueueRow row);
53+
void init_curl_handle(CurlHandle *handle, RequestQueueRow row);
5154

52-
void pfree_curl_data(CurlData *cdata);
55+
void pfree_handle(CurlHandle *handle);
5356

5457
#endif

src/worker.c

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -300,14 +300,14 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
300300
elog(DEBUG1, "Consumed "UINT64_FORMAT" request rows", requests_consumed);
301301

302302
if(requests_consumed > 0){
303-
CurlData *cdatas = palloc(mul_size(sizeof(CurlData), requests_consumed));
303+
CurlHandle *handles = palloc(mul_size(sizeof(CurlHandle), requests_consumed));
304304

305305
// initialize curl handles
306306
for (size_t j = 0; j < requests_consumed; j++) {
307-
init_curl_handle(&cdatas[j], get_request_queue_row(SPI_tuptable->vals[j], SPI_tuptable->tupdesc));
307+
init_curl_handle(&handles[j], get_request_queue_row(SPI_tuptable->vals[j], SPI_tuptable->tupdesc));
308308

309309
EREPORT_MULTI(
310-
curl_multi_add_handle(worker_state->curl_mhandle, cdatas[j].ez_handle)
310+
curl_multi_add_handle(worker_state->curl_mhandle, handles[j].ez_handle)
311311
);
312312
}
313313

@@ -353,7 +353,8 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
353353
CURLMsg *msg = NULL; int msgs_left=0;
354354
while ((msg = curl_multi_info_read(worker_state->curl_mhandle, &msgs_left))) {
355355
if (msg->msg == CURLMSG_DONE) {
356-
insert_response(msg->easy_handle, msg->data.result);
356+
CurlHandle *handle = NULL; EREPORT_CURL_GETINFO(msg->easy_handle, CURLINFO_PRIVATE, &handle);
357+
insert_response(handle, msg->data.result);
357358
} else {
358359
ereport(ERROR, errmsg("curl_multi_info_read(), CURLMsg=%d\n", msg->msg));
359360
}
@@ -365,15 +366,15 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
365366
// cleanup
366367
for(uint64 i = 0; i < requests_consumed; i++){
367368
EREPORT_MULTI(
368-
curl_multi_remove_handle(worker_state->curl_mhandle, cdatas[i].ez_handle)
369+
curl_multi_remove_handle(worker_state->curl_mhandle, handles[i].ez_handle)
369370
);
370371

371-
curl_easy_cleanup(cdatas[i].ez_handle);
372+
curl_easy_cleanup(handles[i].ez_handle);
372373

373-
pfree_curl_data(&cdatas[i]);
374+
pfree_handle(&handles[i]);
374375
}
375376

376-
pfree(cdatas);
377+
pfree(handles);
377378
}
378379

379380
SPI_finish();

0 commit comments

Comments
 (0)