99#include  "event.h" 
1010#include  "errors.h" 
1111
12- typedef  struct  {
13-   int64  id ;
14-   StringInfo  body ;
15-   struct  curl_slist *  request_headers ;
16-   int32  timeout_milliseconds ;
17- } CurlData ;
18- 
1912static  size_t 
2013body_cb (void  * contents , size_t  size , size_t  nmemb , void  * userp )
2114{
@@ -48,18 +41,15 @@ static struct curl_slist *pg_text_array_to_slist(ArrayType *array,
4841    return  headers ;
4942}
5043
51- // We need a different memory context here, as the parent function will have an SPI memory context, which has a shorter lifetime. 
52- static  void  init_curl_handle (CURLM  * curl_mhandle , MemoryContext  curl_memctx , int64  id , Datum  urlBin , NullableDatum  bodyBin , NullableDatum  headersBin , Datum  methodBin , int32  timeout_milliseconds ){
53-   MemoryContext  old_ctx  =  MemoryContextSwitchTo (curl_memctx );
54- 
55-   CurlData  * cdata  =  palloc (sizeof (CurlData ));
56-   cdata -> id    =  id ;
44+ void  init_curl_handle (CurlData  * cdata , RequestQueueRow  row ){
45+   cdata -> id    =  row .id ;
5746  cdata -> body  =  makeStringInfo ();
47+   cdata -> ez_handle  =  curl_easy_init ();
5848
59-   cdata -> timeout_milliseconds  =  timeout_milliseconds ;
49+   cdata -> timeout_milliseconds  =  row . timeout_milliseconds ;
6050
61-   if  (!headersBin .isnull ) {
62-     ArrayType  * pgHeaders  =  DatumGetArrayTypeP (headersBin .value );
51+   if  (!row . headersBin .isnull ) {
52+     ArrayType  * pgHeaders  =  DatumGetArrayTypeP (row . headersBin .value );
6353    struct  curl_slist  * request_headers  =  NULL ;
6454
6555    request_headers  =  pg_text_array_to_slist (pgHeaders , request_headers );
@@ -69,64 +59,55 @@ static void init_curl_handle(CURLM *curl_mhandle, MemoryContext curl_memctx, int
6959    cdata -> request_headers  =  request_headers ;
7060  }
7161
72-   char   * url  =  TextDatumGetCString (urlBin );
62+   cdata -> url  =  TextDatumGetCString (row . url );
7363
74-   char   * reqBody   =  !bodyBin .isnull  ? TextDatumGetCString (bodyBin .value ) : NULL ;
64+   cdata -> req_body   =  !row . bodyBin .isnull  ? TextDatumGetCString (row . bodyBin .value ) : NULL ;
7565
76-   char  * method  =  TextDatumGetCString (methodBin );
77-   if  (strcasecmp (method , "GET" ) !=  0  &&  strcasecmp (method , "POST" ) !=  0  &&  strcasecmp (method , "DELETE" ) !=  0 ) {
78-     ereport (ERROR , errmsg ("Unsupported request method %s" , method ));
79-   }
66+   cdata -> method  =  TextDatumGetCString (row .method );
8067
81-   CURL   * curl_ez_handle   =   curl_easy_init (); 
82-   if (! curl_ez_handle ) 
83-      ereport ( ERROR ,  errmsg ( "curl_easy_init()" )); 
68+   if  ( strcasecmp ( cdata -> method ,  "GET" )  !=   0   &&   strcasecmp ( cdata -> method ,  "POST" )  !=   0   &&   strcasecmp ( cdata -> method ,  "DELETE" )  !=   0 ) { 
69+      ereport ( ERROR ,  errmsg ( "Unsupported request method %s" ,  cdata -> method )); 
70+   } 
8471
85-   if  (strcasecmp (method , "GET" ) ==  0 ) {
86-     if  (reqBody ) {
87-       EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDS , reqBody );
88-       EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_CUSTOMREQUEST , "GET" );
72+   if  (strcasecmp (cdata -> method , "GET" ) ==  0 ) {
73+     if  (cdata -> req_body ) {
74+       EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDS , cdata -> req_body );
75+       EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_CUSTOMREQUEST , "GET" );
8976    }
9077  }
9178
92-   if  (strcasecmp (method , "POST" ) ==  0 ) {
93-     if  (reqBody ) {
94-       EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDS , reqBody );
79+   if  (strcasecmp (cdata -> method , "POST" ) ==  0 ) {
80+     if  (cdata -> req_body ) {
81+       EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDS , cdata -> req_body );
9582    }
9683    else  {
97-       EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POST , 1L );
98-       EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDSIZE , 0L );
84+       EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POST , 1L );
85+       EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDSIZE , 0L );
9986    }
10087  }
10188
102-   if  (strcasecmp (method , "DELETE" ) ==  0 ) {
103-     EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_CUSTOMREQUEST , "DELETE" );
104-     if  (reqBody ) {
105-       EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDS , reqBody );
89+   if  (strcasecmp (cdata -> method , "DELETE" ) ==  0 ) {
90+     EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_CUSTOMREQUEST , "DELETE" );
91+     if  (cdata -> req_body ) {
92+       EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDS , cdata -> req_body );
10693    }
10794  }
10895
109-   EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_WRITEFUNCTION , body_cb );
110-   EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_WRITEDATA , cdata );
111-   EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_HEADER , 0L );
112-   EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_URL , url );
113-   EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_HTTPHEADER , cdata -> request_headers );
114-   EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_TIMEOUT_MS , (long ) cdata -> timeout_milliseconds );
115-   EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_PRIVATE , cdata );
116-   EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_FOLLOWLOCATION , (long ) true);
96+   EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_WRITEFUNCTION , body_cb );
97+   EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_WRITEDATA , cdata );
98+   EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_HEADER , 0L );
99+   EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_URL , cdata -> url );
100+   EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_HTTPHEADER , cdata -> request_headers );
101+   EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_TIMEOUT_MS , (long ) cdata -> timeout_milliseconds );
102+   EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_PRIVATE , cdata );
103+   EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_FOLLOWLOCATION , (long ) true);
117104  if  (log_min_messages  <= DEBUG2 )
118-     EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_VERBOSE , 1L );
105+     EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_VERBOSE , 1L );
119106#if  LIBCURL_VERSION_NUM  >= 0x075500  /* libcurl 7.85.0 */ 
120-   EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_PROTOCOLS_STR , "http,https" );
107+   EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_PROTOCOLS_STR , "http,https" );
121108#else 
122-   EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_PROTOCOLS , CURLPROTO_HTTP  | CURLPROTO_HTTPS );
109+   EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_PROTOCOLS , CURLPROTO_HTTP  | CURLPROTO_HTTPS );
123110#endif 
124- 
125-   EREPORT_MULTI (
126-     curl_multi_add_handle (curl_mhandle , curl_ez_handle )
127-   );
128- 
129-   MemoryContextSwitchTo (old_ctx );
130111}
131112
132113void  set_curl_mhandle (WorkerState  * wstate ){
@@ -137,8 +118,6 @@ void set_curl_mhandle(WorkerState *wstate){
137118}
138119
139120uint64  delete_expired_responses (char  * ttl , int  batch_size ){
140-   SPI_connect ();
141- 
142121  int  ret_code  =  SPI_execute_with_args ("\ 
143122    WITH\ 
144123    rows AS (\ 
@@ -164,14 +143,10 @@ uint64 delete_expired_responses(char *ttl, int batch_size){
164143    ereport (ERROR , errmsg ("Error expiring response table rows: %s" , SPI_result_code_string (ret_code )));
165144  }
166145
167-   SPI_finish ();
168- 
169146  return  affected_rows ;
170147}
171148
172- uint64  consume_request_queue (CURLM  * curl_mhandle , int  batch_size , MemoryContext  curl_memctx ){
173-   SPI_connect ();
174- 
149+ uint64  consume_request_queue (const  int  batch_size ){
175150  int  ret_code  =  SPI_execute_with_args ("\ 
176151    WITH\ 
177152    rows AS (\ 
@@ -191,47 +166,40 @@ uint64 consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext
191166  if  (ret_code  !=  SPI_OK_DELETE_RETURNING )
192167    ereport (ERROR , errmsg ("Error getting http request queue: %s" , SPI_result_code_string (ret_code )));
193168
194-   uint64  affected_rows  =  SPI_processed ;
195- 
196-   for  (size_t  j  =  0 ; j  <  affected_rows ; j ++ ) {
197-     bool  tupIsNull  =  false;
198- 
199-     int64  id  =  DatumGetInt64 (SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 1 , & tupIsNull ));
200-     EREPORT_NULL_ATTR (tupIsNull , id );
169+   return  SPI_processed ;
170+ }
201171
202-     int32  timeout_milliseconds  =  DatumGetInt32 (SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 4 , & tupIsNull ));
203-     EREPORT_NULL_ATTR (tupIsNull , timeout_milliseconds );
172+ // This has an implicit dependency on the execution of delete_return_request_queue, 
173+ // unfortunately we're not able to make this dependency explicit 
174+ // due to the design of SPI (which uses global variables) 
175+ RequestQueueRow  get_request_queue_row (HeapTuple  spi_tupval , TupleDesc  spi_tupdesc ){
176+   bool  tupIsNull  =  false;
204177
205-      Datum   method   =   SPI_getbinval (SPI_tuptable -> vals [ j ],  SPI_tuptable -> tupdesc ,  2 , & tupIsNull );
206-      EREPORT_NULL_ATTR (tupIsNull , method );
178+   int64   id   =   DatumGetInt64 ( SPI_getbinval (spi_tupval ,  spi_tupdesc ,  1 , & tupIsNull ) );
179+   EREPORT_NULL_ATTR (tupIsNull , id );
207180
208-      Datum  url  =  SPI_getbinval (SPI_tuptable -> vals [ j ],  SPI_tuptable -> tupdesc ,  3 , & tupIsNull );
209-      EREPORT_NULL_ATTR (tupIsNull , url );
181+   Datum  method  =  SPI_getbinval (spi_tupval ,  spi_tupdesc ,  2 , & tupIsNull );
182+   EREPORT_NULL_ATTR (tupIsNull , method );
210183
211-     NullableDatum  headersBin  =  {
212-       .value  =  SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 5 , & tupIsNull ),
213-       .isnull  =  tupIsNull 
214-     };
184+   Datum  url  =  SPI_getbinval (spi_tupval , spi_tupdesc , 3 , & tupIsNull );
185+   EREPORT_NULL_ATTR (tupIsNull , url );
215186
216-     NullableDatum  bodyBin  =  {
217-       .value  =  SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 6 , & tupIsNull ),
218-       .isnull  =  tupIsNull 
219-     };
187+   int32  timeout_milliseconds  =  DatumGetInt32 (SPI_getbinval (spi_tupval , spi_tupdesc , 4 , & tupIsNull ));
188+   EREPORT_NULL_ATTR (tupIsNull , timeout_milliseconds );
220189
221-     init_curl_handle (curl_mhandle , curl_memctx , id , url , bodyBin , headersBin , method , timeout_milliseconds );
222-   }
190+   NullableDatum  headersBin  =  {
191+     .value  =  SPI_getbinval (spi_tupval , spi_tupdesc , 5 , & tupIsNull ),
192+     .isnull  =  tupIsNull 
193+   };
223194
224-   SPI_finish ();
195+   NullableDatum  bodyBin  =  {
196+     .value  =  SPI_getbinval (spi_tupval , spi_tupdesc , 6 , & tupIsNull ),
197+     .isnull  =  tupIsNull 
198+   };
225199
226-   return  affected_rows ;
227- }
228- 
229- static  void  pfree_curl_data (CurlData  * cdata ){
230-   if (cdata -> body ){
231-     destroyStringInfo (cdata -> body );
232-   }
233-   if (cdata -> request_headers ) //curl_slist_free_all already handles the NULL case, but be explicit about it 
234-     curl_slist_free_all (cdata -> request_headers );
200+   return  (RequestQueueRow ){
201+     id , method , url , timeout_milliseconds , headersBin , bodyBin 
202+   };
235203}
236204
237205static  Jsonb  * jsonb_headers_from_curl_handle (CURL  * ez_handle ){
@@ -253,11 +221,14 @@ static Jsonb *jsonb_headers_from_curl_handle(CURL *ez_handle){
253221  return  jsonb_headers ;
254222}
255223
256- static   void  insert_response (CURL  * ez_handle ,  CurlData   * cdata , CURLcode  curl_return_code ){
224+ void  insert_response (CURL  * ez_handle , CURLcode  curl_return_code ){
257225  enum  { nparams  =  7  }; // using an enum because const size_t nparams doesn't compile 
258226  Datum  vals [nparams ];
259227  char   nulls [nparams ]; MemSet (nulls , 'n' , nparams );
260228
229+   CurlData  * cdata  =  NULL ;
230+   EREPORT_CURL_GETINFO (ez_handle , CURLINFO_PRIVATE , & cdata );
231+ 
261232  vals [0 ] =  Int64GetDatum (cdata -> id );
262233  nulls [0 ] =  ' ' ;
263234
@@ -318,36 +289,15 @@ static void insert_response(CURL *ez_handle, CurlData *cdata, CURLcode curl_retu
318289  }
319290}
320291
321- // Switch back to the curl memory context, which has the curl handles stored 
322- void  insert_curl_responses (WorkerState  * wstate , MemoryContext  curl_memctx ){
323-   MemoryContext  old_ctx  =  MemoryContextSwitchTo (curl_memctx );
324-   int  msgs_left = 0 ;
325-   CURLMsg  * msg  =  NULL ;
326-   CURLM  * curl_mhandle  =  wstate -> curl_mhandle ;
327- 
328-   while  ((msg  =  curl_multi_info_read (curl_mhandle , & msgs_left ))) {
329-     if  (msg -> msg  ==  CURLMSG_DONE ) {
330-       CURLcode  return_code  =  msg -> data .result ;
331-       CURL  * ez_handle =  msg -> easy_handle ;
332-       CurlData  * cdata  =  NULL ;
333-       EREPORT_CURL_GETINFO (ez_handle , CURLINFO_PRIVATE , & cdata );
334- 
335-       SPI_connect ();
336-       insert_response (ez_handle , cdata , return_code );
337-       SPI_finish ();
338- 
339-       pfree_curl_data (cdata );
292+ void  pfree_curl_data (CurlData  * cdata ){
293+   pfree (cdata -> url );
294+   pfree (cdata -> method );
295+   if (cdata -> req_body )
296+     pfree (cdata -> req_body );
340297
341-       int  res  =  curl_multi_remove_handle (curl_mhandle , ez_handle );
342-       if (res  !=  CURLM_OK )
343-         ereport (ERROR , errmsg ("curl_multi_remove_handle: %s" , curl_multi_strerror (res )));
344- 
345-       curl_easy_cleanup (ez_handle );
346-     } else  {
347-       ereport (ERROR , errmsg ("curl_multi_info_read(), CURLMsg=%d\n" , msg -> msg ));
348-     }
349-   }
298+   if (cdata -> body )
299+     destroyStringInfo (cdata -> body );
350300
351-   MemoryContextSwitchTo (old_ctx );
301+   if (cdata -> request_headers ) //curl_slist_free_all already handles the NULL case, but be explicit about it 
302+     curl_slist_free_all (cdata -> request_headers );
352303}
353- 
0 commit comments