@@ -94,8 +94,36 @@ struct wg_callback
94
94
pthread_mutex_t send_lock ;
95
95
c_complain_t init_complaint ;
96
96
cdtime_t last_connect_time ;
97
+
98
+ /* Force reconnect useful for load balanced environments */
99
+ cdtime_t last_reconnect_time ;
100
+ cdtime_t reconnect_interval ;
101
+ _Bool reconnect_interval_reached ;
97
102
};
98
103
104
+ /* wg_force_reconnect_check closes cb->sock_fd when it was open for longer
105
+ * than cb->reconnect_interval. Must hold cb->send_lock when calling. */
106
+ static void wg_force_reconnect_check (struct wg_callback * cb )
107
+ {
108
+ cdtime_t now ;
109
+
110
+ if (cb -> reconnect_interval == 0 )
111
+ return ;
112
+
113
+ /* check if address changes if addr_timeout */
114
+ now = cdtime ();
115
+ if ((now - cb -> last_reconnect_time ) < cb -> reconnect_interval )
116
+ return ;
117
+
118
+ /* here we should close connection on next */
119
+ close (cb -> sock_fd );
120
+ cb -> sock_fd = -1 ;
121
+ cb -> last_reconnect_time = now ;
122
+ cb -> reconnect_interval_reached = 1 ;
123
+
124
+ INFO ("write_graphite plugin: Connection closed after %.3f seconds." ,
125
+ CDTIME_T_TO_DOUBLE (now - cb -> last_reconnect_time ));
126
+ }
99
127
100
128
/*
101
129
* Functions
@@ -250,7 +278,13 @@ static int wg_callback_init (struct wg_callback *cb)
250
278
cb -> node , cb -> service , cb -> protocol );
251
279
}
252
280
253
- wg_reset_buffer (cb );
281
+ /* wg_force_reconnect_check does not flush the buffer before closing a
282
+ * sending socket, so only call wg_reset_buffer() if the socket was closed
283
+ * for a different reason (tracked in cb->reconnect_interval_reached). */
284
+ if (!cb -> reconnect_interval_reached || (cb -> send_buf_free == 0 ))
285
+ wg_reset_buffer (cb );
286
+ else
287
+ cb -> reconnect_interval_reached = 0 ;
254
288
255
289
return (0 );
256
290
}
@@ -326,6 +360,8 @@ static int wg_send_message (char const *message, struct wg_callback *cb)
326
360
327
361
pthread_mutex_lock (& cb -> send_lock );
328
362
363
+ wg_force_reconnect_check (cb );
364
+
329
365
if (cb -> sock_fd < 0 )
330
366
{
331
367
status = wg_callback_init (cb );
@@ -462,6 +498,9 @@ static int wg_config_node (oconfig_item_t *ci)
462
498
cb -> node = strdup (WG_DEFAULT_NODE );
463
499
cb -> service = strdup (WG_DEFAULT_SERVICE );
464
500
cb -> protocol = strdup (WG_DEFAULT_PROTOCOL );
501
+ cb -> last_reconnect_time = cdtime ();
502
+ cb -> reconnect_interval = 0 ;
503
+ cb -> reconnect_interval_reached = 0 ;
465
504
cb -> log_send_errors = WG_DEFAULT_LOG_SEND_ERRORS ;
466
505
cb -> prefix = NULL ;
467
506
cb -> postfix = NULL ;
@@ -502,6 +541,8 @@ static int wg_config_node (oconfig_item_t *ci)
502
541
status = -1 ;
503
542
}
504
543
}
544
+ else if (strcasecmp ("ReconnectInterval" , child -> key ) == 0 )
545
+ cf_util_get_cdtime (child , & cb -> reconnect_interval );
505
546
else if (strcasecmp ("LogSendErrors" , child -> key ) == 0 )
506
547
cf_util_get_boolean (child , & cb -> log_send_errors );
507
548
else if (strcasecmp ("Prefix" , child -> key ) == 0 )
0 commit comments