Skip to content

Commit

Permalink
Merge pull request #4528 from sysown/v2.x_monitor_slave_lag_when_null…
Browse files Browse the repository at this point in the history
…_4521

Introduced new HG attribute 'monitor_slave_lag_when_null' which takes precedence over 'mysql_thread_monitor_slave_lag_when_null'
  • Loading branch information
renecannao authored May 20, 2024
2 parents 4f419e6 + 88af1f1 commit cdfcfdc
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 49 deletions.
11 changes: 9 additions & 2 deletions include/MySQL_HostGroups_Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ class MyHGC { // MySQL Host Group Container
char * ignore_session_variables_text; // this is the original version (text format) of ignore_session_variables
uint32_t max_num_online_servers;
uint32_t throttle_connections_per_sec;
int32_t monitor_slave_lag_when_null;
int8_t autocommit;
int8_t free_connections_pct;
int8_t handle_warnings;
Expand All @@ -316,6 +317,10 @@ class MyHGC { // MySQL Host Group Container
bool handle_warnings_enabled() const {
return attributes.configured == true && attributes.handle_warnings != -1 ? attributes.handle_warnings : mysql_thread___handle_warnings;
}
inline
int32_t get_monitor_slave_lag_when_null() const {
return attributes.configured == true && attributes.monitor_slave_lag_when_null != -1 ? attributes.monitor_slave_lag_when_null : mysql_thread___monitor_slave_lag_when_null;
}
MyHGC(int);
~MyHGC();
MySrvC *get_random_MySrvC(char * gtid_uuid, uint64_t gtid_trxid, int max_lag_ms, MySQL_Session *sess);
Expand Down Expand Up @@ -549,9 +554,10 @@ using address_t = std::string;
using port_t = unsigned int;
using read_only_t = int;
using current_replication_lag = int;
using override_replication_lag = bool;

using read_only_server_t = std::tuple<hostname_t,port_t,read_only_t>;
using replication_lag_server_t = std::tuple<hostgroupid_t,address_t,port_t,current_replication_lag>;
using replication_lag_server_t = std::tuple<hostgroupid_t,address_t,port_t,current_replication_lag,override_replication_lag>;

enum READ_ONLY_SERVER_T {
ROS_HOSTNAME = 0,
Expand All @@ -565,6 +571,7 @@ enum REPLICATION_LAG_SERVER_T {
RLS_ADDRESS,
RLS_PORT,
RLS_CURRENT_REPLICATION_LAG,
RLS_OVERRIDE_REPLICATION_LAG,
RLS__SIZE
};

Expand Down Expand Up @@ -1101,7 +1108,7 @@ class MySQL_HostGroups_Manager {
void push_MyConn_to_pool_array(MySQL_Connection **, unsigned int);
void destroy_MyConn_from_pool(MySQL_Connection *, bool _lock=true);

void replication_lag_action_inner(MyHGC *, const char*, unsigned int, int);
void replication_lag_action_inner(MyHGC *, const char*, unsigned int, int, bool);
void replication_lag_action(const std::list<replication_lag_server_t>& mysql_servers);
void read_only_action(char *hostname, int port, int read_only);
void read_only_action_v2(const std::list<read_only_server_t>& mysql_servers);
Expand Down
4 changes: 2 additions & 2 deletions include/SQLite3_Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class SQLite3_Server {
std::vector<table_def_t *> *tables_defs_readonly;
#endif // TEST_READONLY
#ifdef TEST_REPLICATIONLAG
std::unordered_map<std::string, int> replicationlag_map;
std::unordered_map<std::string, std::unique_ptr<int>> replicationlag_map;
std::vector<table_def_t*>* tables_defs_replicationlag;
#endif // TEST_REPLICATIONLAG
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY) || defined(TEST_REPLICATIONLAG)
Expand Down Expand Up @@ -105,7 +105,7 @@ class SQLite3_Server {
#ifdef TEST_REPLICATIONLAG
pthread_mutex_t test_replicationlag_mutex;
void load_replicationlag_table(MySQL_Session* sess);
int replicationlag_test_value(const char* p);
int* replicationlag_test_value(const char* p);
int replicationlag_map_size() {
return replicationlag_map.size();
}
Expand Down
1 change: 1 addition & 0 deletions lib/MyHGC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void MyHGC::reset_attributes() {
attributes.autocommit = -1;
attributes.free_connections_pct = 10;
attributes.handle_warnings = -1;
attributes.monitor_slave_lag_when_null = -1;
attributes.multiplex = true;
attributes.connection_warming = false;
free(attributes.init_connect);
Expand Down
34 changes: 24 additions & 10 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2692,9 +2692,16 @@ void MySQL_HostGroups_Manager::add(MySrvC *mysrvc, unsigned int _hid) {
myhgc->mysrvs->add(mysrvc);
}

void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const char *address, unsigned int port, int current_replication_lag) {
int j;
for (j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const char *address, unsigned int port,
int current_replication_lag, bool override_repl_lag) {

if (current_replication_lag == -1 && override_repl_lag == true) {
current_replication_lag = myhgc->get_monitor_slave_lag_when_null();
override_repl_lag = false;
proxy_error("Replication lag on server %s:%d is NULL, using value %d\n", address, port, current_replication_lag);
}

for (int j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j);
if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) {
mysrvc->cur_replication_lag = current_replication_lag;
Expand All @@ -2703,9 +2710,9 @@ void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const
// (current_replication_lag==-1 )
// ||
(
current_replication_lag>=0 &&
current_replication_lag >= 0 &&
mysrvc->max_replication_lag > 0 && // see issue #4018
((unsigned int)current_replication_lag > mysrvc->max_replication_lag)
(current_replication_lag > (int)mysrvc->max_replication_lag)
)
) {
// always increase the counter
Expand All @@ -2730,9 +2737,10 @@ void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const
} else {
if (mysrvc->get_status() == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) {
if (
(current_replication_lag>=0 && ((unsigned int)current_replication_lag <= mysrvc->max_replication_lag))
(/*current_replication_lag >= 0 &&*/override_repl_lag == false &&
(current_replication_lag <= (int)mysrvc->max_replication_lag))
||
(current_replication_lag==-2) // see issue 959
(current_replication_lag==-2 && override_repl_lag == true) // see issue 959
) {
mysrvc->set_status(MYSQL_SERVER_STATUS_ONLINE);
proxy_warning("Re-enabling server %s:%d from HG %u with replication lag of %d second\n", address, port, myhgc->hid, current_replication_lag);
Expand All @@ -2758,18 +2766,19 @@ void MySQL_HostGroups_Manager::replication_lag_action(const std::list<replicatio
const std::string& address = std::get<REPLICATION_LAG_SERVER_T::RLS_ADDRESS>(server);
const unsigned int port = std::get<REPLICATION_LAG_SERVER_T::RLS_PORT>(server);
const int current_replication_lag = std::get<REPLICATION_LAG_SERVER_T::RLS_CURRENT_REPLICATION_LAG>(server);
const bool override_repl_lag = std::get<REPLICATION_LAG_SERVER_T::RLS_OVERRIDE_REPLICATION_LAG>(server);

if (mysql_thread___monitor_replication_lag_group_by_host == false) {
// legacy check. 1 check per server per hostgroup
MyHGC *myhgc = MyHGC_find(hid);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag,override_repl_lag);
}
else {
// only 1 check per server, no matter the hostgroup
// all hostgroups must be searched
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC*myhgc=(MyHGC*)MyHostGroups->index(i);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag,override_repl_lag);
}
}
}
Expand Down Expand Up @@ -6226,8 +6235,13 @@ void init_myhgc_hostgroup_settings(const char* hostgroup_settings, MyHGC* myhgc)
nlohmann::json j = nlohmann::json::parse(hostgroup_settings);

const auto handle_warnings_check = [](int8_t handle_warnings) -> bool { return handle_warnings == 0 || handle_warnings == 1; };
int8_t handle_warnings = j_get_srv_default_int_val<int8_t>(j, hid, "handle_warnings", handle_warnings_check);
const int8_t handle_warnings = j_get_srv_default_int_val<int8_t>(j, hid, "handle_warnings", handle_warnings_check);
myhgc->attributes.handle_warnings = handle_warnings;

const auto monitor_slave_lag_when_null_check = [](int32_t monitor_slave_lag_when_null) -> bool
{ return (monitor_slave_lag_when_null >= 0 && monitor_slave_lag_when_null <= 604800); };
const int32_t monitor_slave_lag_when_null = j_get_srv_default_int_val<int32_t>(j, hid, "monitor_slave_lag_when_null", monitor_slave_lag_when_null_check);
myhgc->attributes.monitor_slave_lag_when_null = monitor_slave_lag_when_null;
}
catch (const json::exception& e) {
proxy_error(
Expand Down
21 changes: 11 additions & 10 deletions lib/MySQL_Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2756,6 +2756,7 @@ void * monitor_replication_lag_thread(void *arg) {
ASSERT_SQLITE_OK(rc, mmsd->mondb);
// 'replication_lag' to be feed to 'replication_lag_action'
int repl_lag=-2;
bool override_repl_lag = true;
rc=(*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now=realtime_time();
Expand Down Expand Up @@ -2792,16 +2793,16 @@ void * monitor_replication_lag_thread(void *arg) {
MYSQL_ROW row=mysql_fetch_row(mmsd->result);
if (row) {
repl_lag=-1; // this is old behavior
repl_lag=mysql_thread___monitor_slave_lag_when_null; // new behavior, see 669
override_repl_lag = true;
if (row[j]) { // if Seconds_Behind_Master is not NULL
repl_lag=atoi(row[j]);
override_repl_lag = false;
} else {
proxy_error("Replication lag on server %s:%d is NULL, using the value %d (mysql-monitor_slave_lag_when_null)\n", mmsd->hostname, mmsd->port, mysql_thread___monitor_slave_lag_when_null);
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG);
}
}
}
if (repl_lag>=0) {
if (/*repl_lag >= 0 ||*/ override_repl_lag == false) {
rc=(*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else {
rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
Expand All @@ -2822,7 +2823,7 @@ void * monitor_replication_lag_thread(void *arg) {
rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
MyHGM->replication_lag_action( std::list<replication_lag_server_t> {
replication_lag_server_t {mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag}
replication_lag_server_t {mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag }
} );
(*proxy_sqlite3_finalize)(statement);
if (mmsd->mysql_error_msg == NULL) {
Expand Down Expand Up @@ -7749,8 +7750,7 @@ void MySQL_Monitor::monitor_gr_async_actions_handler(


bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vector<MySQL_Monitor_State_Data*>& mmsds) {

std::list<std::tuple<int, std::string, unsigned int, int>> mysql_servers;
std::list<replication_lag_server_t> mysql_servers;

for (auto& mmsd : mmsds) {

Expand Down Expand Up @@ -7792,6 +7792,7 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
ASSERT_SQLITE_OK(rc, mmsd->mondb);
// 'replication_lag' to be feed to 'replication_lag_action'
int repl_lag = -2;
bool override_repl_lag = true;
rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now = realtime_time();
Expand Down Expand Up @@ -7828,16 +7829,16 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
if (row) {
repl_lag = -1; // this is old behavior
repl_lag = mysql_thread___monitor_slave_lag_when_null; // new behavior, see 669
override_repl_lag = true;
if (row[j]) { // if Seconds_Behind_Master is not NULL
repl_lag = atoi(row[j]);
override_repl_lag = false;
} else {
proxy_error("Replication lag on server %s:%d is NULL, using the value %d (mysql-monitor_slave_lag_when_null)\n", mmsd->hostname, mmsd->port, mysql_thread___monitor_slave_lag_when_null);
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG);
}
}
}
if (repl_lag >= 0) {
if (/*repl_lag >= 0 ||*/ override_repl_lag == false) {
rc = (*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else {
rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
Expand All @@ -7859,7 +7860,7 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
//MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag);
(*proxy_sqlite3_finalize)(statement);
mysql_servers.push_back( std::tuple<int,std::string,int,int> { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag });
mysql_servers.push_back( replication_lag_server_t { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag });
}

//executing replication lag action
Expand Down
14 changes: 0 additions & 14 deletions lib/ProxySQL_Cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,13 +530,6 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
checksums_values.mysql_servers.checksum, GloVars.checksums_values.mysql_servers.checksum, checksums_values.mysql_servers.diff_check);
}
if (strcmp(checksums_values.mysql_servers.checksum, GloVars.checksums_values.mysql_servers.checksum) == 0) {
// See LOGGING-NOTE at 'admin_variables' above.
if (checksums_values.mysql_servers.last_changed == now) {
proxy_info(
"Cluster: checksum for mysql_servers from peer %s:%d matches with local checksum %s , we won't sync.\n",
hostname, port, GloVars.checksums_values.mysql_servers.checksum
);
}
checksums_values.mysql_servers.diff_check = 0;
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Checksum for mysql_servers from peer %s:%d matches with local checksum %s, reset diff_check to 0.\n", hostname, port, GloVars.checksums_values.mysql_servers.checksum);
}
Expand Down Expand Up @@ -577,13 +570,6 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
checksums_values.mysql_servers_v2.checksum, GloVars.checksums_values.mysql_servers_v2.checksum, checksums_values.mysql_servers_v2.diff_check);
}
if (strcmp(checksums_values.mysql_servers_v2.checksum, GloVars.checksums_values.mysql_servers_v2.checksum) == 0) {
// See LOGGING-NOTE at 'admin_variables' above.
if (checksums_values.mysql_servers_v2.last_changed == now) {
proxy_info(
"Cluster: checksum for mysql_servers_v2 from peer %s:%d matches with local checksum %s , we won't sync.\n",
hostname, port, GloVars.checksums_values.mysql_servers_v2.checksum
);
}
checksums_values.mysql_servers_v2.diff_check = 0;
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Checksum for mysql_servers_v2 from peer %s:%d matches with local checksum %s, reset diff_check to 0.\n", hostname, port, GloVars.checksums_values.mysql_servers.checksum);
}
Expand Down
33 changes: 22 additions & 11 deletions src/SQLite3_Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,11 +879,17 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p
// probably never initialized
GloSQLite3Server->load_replicationlag_table(sess);
}
const int rc = GloSQLite3Server->replicationlag_test_value(query_no_space + strlen("SELECT SLAVE STATUS "));
const int* rc = GloSQLite3Server->replicationlag_test_value(query_no_space + strlen("SELECT SLAVE STATUS "));
free(query);
char* a = (char*)"SELECT %d as Seconds_Behind_Master";
query = (char*)malloc(strlen(a) + 2);
sprintf(query, a, rc);
if (rc == nullptr) {
const char* a = (char*)"SELECT null as Seconds_Behind_Master";
query = (char*)malloc(strlen(a) + 2);
sprintf(query, a);
} else {
const char* a = (char*)"SELECT %d as Seconds_Behind_Master";
query = (char*)malloc(strlen(a) + 2);
sprintf(query, a, *rc);
}
pthread_mutex_unlock(&GloSQLite3Server->test_replicationlag_mutex);
}
}
Expand Down Expand Up @@ -1845,7 +1851,7 @@ bool SQLite3_Server::init() {
insert_into_tables_defs(tables_defs_replicationlag,
(const char*)"REPLICATIONLAG_HOST_STATUS",
(const char*)"CREATE TABLE REPLICATIONLAG_HOST_STATUS ("
"hostname VARCHAR NOT NULL, port INT NOT NULL, seconds_behind_master INT NOT NULL, PRIMARY KEY (hostname, port)"
"hostname VARCHAR NOT NULL, port INT NOT NULL, seconds_behind_master INT DEFAULT NULL, PRIMARY KEY (hostname, port)"
")"
);

Expand Down Expand Up @@ -2016,27 +2022,32 @@ void SQLite3_Server::load_replicationlag_table(MySQL_Session* sess) {
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
const std::string& s = std::string(r->fields[0]) + ":" + std::string(r->fields[1]);
replicationlag_map[s] = atoi(r->fields[2]);

if (r->fields[2] == nullptr) {
replicationlag_map[s] = nullptr;
} else {
replicationlag_map[s] = std::make_unique<int>(atoi(r->fields[2]));
}
}
}
delete resultset;
if (replicationlag_map.size() == 0) {
GloAdmin->admindb->execute_statement((char*)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE hostgroup_id BETWEEN 5202 AND 5700", &error, &cols, &affected_rows, &resultset);
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
const std::string& s = "INSERT INTO REPLICATIONLAG_HOST_STATUS VALUES ('" + std::string(r->fields[0]) + "'," + std::string(r->fields[1]) + ",0)";
const std::string& s = "INSERT INTO REPLICATIONLAG_HOST_STATUS VALUES ('" + std::string(r->fields[0]) + "'," + std::string(r->fields[1]) + ",null)";
sessdb->execute(s.c_str());
}
delete resultset;
}
GloAdmin->mysql_servers_wrunlock();
}

int SQLite3_Server::replicationlag_test_value(const char* p) {
int rc = 0; // default
std::unordered_map<std::string, int>::iterator it = replicationlag_map.find(std::string(p));
int* SQLite3_Server::replicationlag_test_value(const char* p) {
int* rc = 0; // default
std::unordered_map<std::string, std::unique_ptr<int>>::iterator it = replicationlag_map.find(std::string(p));
if (it != replicationlag_map.end()) {
rc = it->second;
rc = it->second.get();
}
return rc;
}
Expand Down

0 comments on commit cdfcfdc

Please sign in to comment.