Skip to content

Commit

Permalink
Merge pull request #4490 from sysown/2.x-AWS_RDS_autodiscovery
Browse files Browse the repository at this point in the history
Add support for AWS RDS MySQL Multi-AZ Cluster auto-discovery
  • Loading branch information
JavierJF authored Apr 2, 2024
2 parents 1f78c9e + 8d8f2ea commit b368fc9
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 6 deletions.
2 changes: 2 additions & 0 deletions include/MySQL_HostGroups_Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,8 @@ class MySQL_HostGroups_Manager {
void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us);
unsigned long long Get_Memory_Stats();

void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(const vector<tuple<string, int, int>>& new_servers);

void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup);
Expand Down
10 changes: 8 additions & 2 deletions include/MySQL_Monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ struct cmp_str {

#define N_L_ASE 16

#define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com"
#define QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology"

/*
Implementation of monitoring in AWS Aurora will be different than previous modules
Expand Down Expand Up @@ -197,7 +200,8 @@ enum MySQL_Monitor_State_Data_Task_Type {
MON_GROUP_REPLICATION,
MON_REPLICATION_LAG,
MON_GALERA,
MON_AWS_AURORA
MON_AWS_AURORA,
MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY
};

enum class MySQL_Monitor_State_Data_Task_Result {
Expand Down Expand Up @@ -229,6 +233,7 @@ class MySQL_Monitor_State_Data {
char *hostname;
int port;
int writer_hostgroup; // used only by group replication
int reader_hostgroup;
bool writer_is_also_reader; // used only by group replication
int max_transactions_behind; // used only by group replication
int max_transactions_behind_count; // used only by group replication
Expand Down Expand Up @@ -442,6 +447,7 @@ class MySQL_Monitor {
static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql);
static void trigger_dns_cache_update();

void process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, int reader_hostgroup);

private:
std::vector<table_def_t *> *tables_defs_monitor;
Expand Down Expand Up @@ -553,7 +559,7 @@ class MySQL_Monitor {
* Note: Calling init_async is mandatory before executing tasks asynchronously.
*/
void monitor_ping_async(SQLite3_result* resultset);
void monitor_read_only_async(SQLite3_result* resultset);
void monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check);
void monitor_replication_lag_async(SQLite3_result* resultset);
void monitor_group_replication_async();
void monitor_galera_async();
Expand Down
3 changes: 3 additions & 0 deletions include/MySQL_Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ struct p_th_gauge {
mysql_monitor_ping_interval,
mysql_monitor_ping_timeout,
mysql_monitor_ping_max_failures,
mysql_monitor_aws_rds_topology_discovery_interval,
mysql_monitor_read_only_interval,
mysql_monitor_read_only_timeout,
mysql_monitor_writer_is_also_reader,
Expand Down Expand Up @@ -385,6 +386,8 @@ class MySQL_Threads_Handler
int monitor_ping_max_failures;
//! Monitor ping timeout. Unit: 'ms'.
int monitor_ping_timeout;
//! Monitor aws rds topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'.
int monitor_aws_rds_topology_discovery_interval;
//! Monitor read only timeout. Unit: 'ms'.
int monitor_read_only_interval;
//! Monitor read only timeout. Unit: 'ms'.
Expand Down
2 changes: 2 additions & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,7 @@ __thread int mysql_thread___monitor_connect_timeout;
__thread int mysql_thread___monitor_ping_interval;
__thread int mysql_thread___monitor_ping_max_failures;
__thread int mysql_thread___monitor_ping_timeout;
__thread int mysql_thread___monitor_aws_rds_topology_discovery_interval;
__thread int mysql_thread___monitor_read_only_interval;
__thread int mysql_thread___monitor_read_only_timeout;
__thread int mysql_thread___monitor_read_only_max_timeout_count;
Expand Down Expand Up @@ -1073,6 +1074,7 @@ extern __thread int mysql_thread___monitor_connect_timeout;
extern __thread int mysql_thread___monitor_ping_interval;
extern __thread int mysql_thread___monitor_ping_max_failures;
extern __thread int mysql_thread___monitor_ping_timeout;
extern __thread int mysql_thread___monitor_aws_rds_topology_discovery_interval;
extern __thread int mysql_thread___monitor_read_only_interval;
extern __thread int mysql_thread___monitor_read_only_timeout;
extern __thread int mysql_thread___monitor_read_only_max_timeout_count;
Expand Down
78 changes: 78 additions & 0 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2261,6 +2261,7 @@ bool MySQL_HostGroups_Manager::commit(
// fill Hostgroup_Manager_Mapping with latest records
update_hostgroup_manager_mappings();


ev_async_send(gtid_ev_loop, gtid_ev_async);

__sync_fetch_and_add(&status.servers_table_version,1);
Expand Down Expand Up @@ -8583,3 +8584,80 @@ MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *h
}
return NULL;
}

/**
* @brief Updates replication hostgroups by adding autodiscovered mysql servers.
* @details Adds each server from 'new_servers' to the 'runtime_mysql_servers' table.
* We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'.
* @param new_servers A vector of tuples where each tuple contains the values needed to add each new server.
*/
void MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(
const vector<tuple<string, int, int>>& new_servers
) {
int added_new_server = -1;

GloAdmin->mysql_servers_wrlock();
wrlock();

// Add the discovered server with default values
for (const tuple<string, int, int>& s : new_servers) {
string host = std::get<0>(s);
uint16_t port = std::get<1>(s);
long int hostgroup_id = std::get<2>(s);

srv_info_t srv_info { host.c_str(), port, "AWS RDS" };
srv_opts_t srv_opts { -1, -1, -1 };

added_new_server = create_new_server_in_hg(hostgroup_id, srv_info, srv_opts);
}

// If servers were added, perform necessary updates to internal structures
if (added_new_server > -1) {
purge_mysql_servers_table();
mydb->execute("DELETE FROM mysql_servers");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
generate_mysql_servers_table();

// Update the global checksums after 'mysql_servers' regeneration
{
unique_ptr<SQLite3_result> resultset { get_admin_runtime_mysql_servers(mydb) };
string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) };
save_runtime_mysql_servers(resultset.release());

// Update the runtime_mysql_servers checksum with the new checksum
uint64_t raw_checksum = this->runtime_mysql_servers ? this->runtime_mysql_servers->raw_checksum() : 0;
table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = raw_checksum;

// This is required for preserving coherence in the checksums, otherwise they would be inconsistent with `commit` generated checksums
SpookyHash rep_hgs_hash {};
bool init = false;
uint64_t servers_v2_hash = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS_V2];

if (servers_v2_hash) {
if (init == false) {
init = true;
rep_hgs_hash.Init(19, 3);
}

rep_hgs_hash.Update(&servers_v2_hash, sizeof(servers_v2_hash));
}

CUCFT1(
rep_hgs_hash, init, "mysql_replication_hostgroups", "writer_hostgroup",
table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]
);

proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str());

pthread_mutex_lock(&GloVars.checksum_mutex);
update_glovars_mysql_servers_checksum(mysrvs_checksum);
pthread_mutex_unlock(&GloVars.checksum_mutex);
}

update_table_mysql_servers_for_monitor(false);
update_hostgroup_manager_mappings();
}

wrunlock();
GloAdmin->mysql_servers_wrunlock();
}
127 changes: 123 additions & 4 deletions lib/MySQL_Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,12 @@ void MySQL_Monitor_State_Data::init_async() {
task_timeout_ = mysql_thread___monitor_read_only_timeout;
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
break;
case MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY:
query_ = QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY;
async_state_machine_ = ASYNC_QUERY_START;
task_timeout_ = mysql_thread___monitor_read_only_timeout;
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
break;
#else // TEST_READONLY
case MON_READ_ONLY:
case MON_INNODB_READ_ONLY:
Expand Down Expand Up @@ -1623,6 +1629,8 @@ void * monitor_read_only_thread(void *arg) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only&@@global.innodb_read_only read_only");
} else if (mmsd->get_task_type() == MON_READ_ONLY__OR__INNODB_READ_ONLY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only|@@global.innodb_read_only read_only");
} else if (mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY);
} else { // default
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only read_only");
}
Expand Down Expand Up @@ -3282,6 +3290,68 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return ret;
}

/**
* @brief Processes the discovered servers to eventually add them to 'runtime_mysql_servers'.
* @details This method takes a vector of discovered servers, compares them against the existing servers, and adds the new servers to 'runtime_mysql_servers'.
* @param originating_server_hostname A string which denotes the hostname of the originating server, from which the discovered servers were queried and found.
* @param discovered_servers A vector of servers discovered when querying the cluster's topology.
* @param reader_hostgroup Reader hostgroup to which we will add the discovered servers.
*/
void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, int reader_hostgroup) {
char *error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result *runtime_mysql_servers = NULL;

char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers);

if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
vector<tuple<string, int, int>> new_servers;
vector<string> saved_hostnames;
saved_hostnames.push_back(originating_server_hostname);

// Do an initial loop through the query results to save existing runtime server hostnames
for (std::vector<SQLite3_row *>::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) {
SQLite3_row *r1 = *it;
string current_runtime_hostname = r1->fields[0];

saved_hostnames.push_back(current_runtime_hostname);
}

// Loop through discovered servers and process the ones we haven't saved yet
for (MYSQL_ROW s : discovered_servers) {
string current_discovered_hostname = s[2];
string current_discovered_port_string = s[3];
int current_discovered_port_int;

try {
current_discovered_port_int = stoi(s[3]);
} catch (...) {
proxy_error(
"Unable to parse port value coming from '%s' during topology discovery ('%s':%s). Terminating discovery early.\n",
originating_server_hostname.c_str(), current_discovered_hostname.c_str(), current_discovered_port_string.c_str()
);
return;
}

if (find(saved_hostnames.begin(), saved_hostnames.end(), current_discovered_hostname) == saved_hostnames.end()) {
tuple<string, int, int> new_server(current_discovered_hostname, current_discovered_port_int, reader_hostgroup);
new_servers.push_back(new_server);
saved_hostnames.push_back(current_discovered_hostname);
}
}

// Add the new servers if any
if (!new_servers.empty()) {
MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers);
}
}
}

void * MySQL_Monitor::monitor_read_only() {
mysql_close(mysql_init(NULL));
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
Expand All @@ -3295,14 +3365,17 @@ void * MySQL_Monitor::monitor_read_only() {
unsigned long long t1;
unsigned long long t2;
unsigned long long next_loop_at=0;
int topology_loop = 0;
int topology_loop_max = mysql_thread___monitor_aws_rds_topology_discovery_interval;

while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) {
bool do_discovery_check = false;

unsigned int glover;
char *error=NULL;
SQLite3_result *resultset=NULL;
// add support for SSL
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()";
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type, reader_hostgroup FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()";
t1=monotonic_time();

if (!GloMTH) return NULL; // quick exit during shutdown/restart
Expand All @@ -3313,6 +3386,7 @@ void * MySQL_Monitor::monitor_read_only() {
next_loop_at=0;
}


if (t1 < next_loop_at) {
goto __sleep_monitor_read_only;
}
Expand All @@ -3329,8 +3403,14 @@ void * MySQL_Monitor::monitor_read_only() {
goto __end_monitor_read_only_loop;
}

if (topology_loop >= topology_loop_max) {
do_discovery_check = true;
topology_loop = 0;
}
topology_loop += 1;

// resultset must be initialized before calling monitor_read_only_async
monitor_read_only_async(resultset);
monitor_read_only_async(resultset, do_discovery_check);
if (shutdown) return NULL;

__end_monitor_read_only_loop:
Expand Down Expand Up @@ -7199,7 +7279,7 @@ bool MySQL_Monitor::monitor_read_only_process_ready_tasks(const std::vector<MySQ
std::list<read_only_server_t> mysql_servers;

for (auto& mmsd : mmsds) {

string originating_server_hostname = mmsd->hostname;
const auto task_result = mmsd->get_task_result();

assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING);
Expand Down Expand Up @@ -7269,6 +7349,38 @@ VALGRIND_ENABLE_ERROR_REPORTING;
}

rc = (*proxy_sqlite3_bind_int64)(statement, 5, read_only); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else if (fields && mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) {
// Process the read_only field as above and store the first server
vector<MYSQL_ROW> discovered_servers;
for (k = 0; k < num_fields; k++) {
if (strcmp((char*)"read_only", (char*)fields[k].name) == 0) {
j = k;
}
}
if (j > -1) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
if (row) {
discovered_servers.push_back(row);
VALGRIND_DISABLE_ERROR_REPORTING;
if (row[j]) {
if (!strcmp(row[j], "0") || !strcasecmp(row[j], "OFF"))
read_only = 0;
}
VALGRIND_ENABLE_ERROR_REPORTING;
}
}

// Store the remaining servers
int num_rows = mysql_num_rows(mmsd->result);
for (int i = 1; i < num_rows; i++) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
discovered_servers.push_back(row);
}

// Process the discovered servers and add them to 'runtime_mysql_servers'
if (!discovered_servers.empty()) {
process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup);
}
} else {
proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port);
rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
Expand Down Expand Up @@ -7329,7 +7441,7 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return true;
}

void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) {
void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check) {
assert(resultset);

std::vector<std::unique_ptr<MySQL_Monitor_State_Data>> mmsds;
Expand All @@ -7352,11 +7464,18 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) {
} else if (strcasecmp(r->fields[3], (char*)"read_only|innodb_read_only") == 0) {
task_type = MON_READ_ONLY__OR__INNODB_READ_ONLY;
}

// Change task type if it's time to do discovery check. Only for aws rds endpoints
string hostname = r->fields[0];
if (do_discovery_check && hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) {
task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY;
}
}

std::unique_ptr<MySQL_Monitor_State_Data> mmsd(
new MySQL_Monitor_State_Data(task_type, r->fields[0], atoi(r->fields[1]), atoi(r->fields[2])));

mmsd->reader_hostgroup = atoi(r->fields[4]); // set reader_hostgroup
mmsd->mondb = monitordb;
mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get());

Expand Down
Loading

0 comments on commit b368fc9

Please sign in to comment.