Server IP : 104.21.38.3 / Your IP : 172.70.188.81 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/mysql/src/rapid/plugin/group_replication/src/ |
Upload File : |
/* Copyright (c) 2015, 2023, Oracle and/or its affiliates. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is also distributed with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have included with MySQL. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ #include "plugin_server_include.h" #include "recovery_state_transfer.h" #include "plugin_log.h" #include "recovery_channel_state_observer.h" #include "plugin_psi.h" #include "plugin.h" #include <mysql/group_replication_priv.h> using std::string; Recovery_state_transfer:: Recovery_state_transfer(char* recovery_channel_name, const string& member_uuid, Channel_observation_manager *channel_obsr_mngr) : selected_donor(NULL), group_members(NULL), donor_connection_retry_count(0), recovery_aborted(false), donor_transfer_finished(false), connected_to_donor(false), on_failover(false), donor_connection_interface(recovery_channel_name), channel_observation_manager(channel_obsr_mngr), recovery_channel_observer(NULL), recovery_use_ssl(false), recovery_ssl_verify_server_cert(false), max_connection_attempts_to_donors(0), donor_reconnect_interval(0) { //set the recovery SSL options to 0 (void) strncpy(recovery_ssl_ca, "", 1); (void) strncpy(recovery_ssl_capath, "", 1); (void) strncpy(recovery_ssl_cert, "", 1); (void) strncpy(recovery_ssl_cipher, "", 1); (void) strncpy(recovery_ssl_key, "", 1); (void) strncpy(recovery_ssl_crl, "", 1); (void) strncpy(recovery_ssl_crlpath, "", 1); this->member_uuid= member_uuid; mysql_mutex_init(key_GR_LOCK_recovery, &recovery_lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_GR_COND_recovery, &recovery_condition); mysql_mutex_init(key_GR_LOCK_recovery_donor_selection, &donor_selection_lock, MY_MUTEX_INIT_FAST); recovery_channel_observer= new Recovery_channel_state_observer(this); } Recovery_state_transfer::~Recovery_state_transfer() { if (group_members != NULL) { std::vector<Group_member_info*>::iterator member_it= group_members->begin(); while (member_it != group_members->end()) { delete (*member_it); ++member_it; } } delete group_members; delete recovery_channel_observer; mysql_mutex_destroy(&recovery_lock); mysql_cond_destroy(&recovery_condition); mysql_mutex_destroy(&donor_selection_lock); } void Recovery_state_transfer::initialize(const string& rec_view_id) { DBUG_ENTER("Recovery_state_transfer::initialize"); //reset the recovery aborted flag recovery_aborted= false; //reset the donor transfer ending flag donor_transfer_finished= false; //reset the failover flag on_failover= false; //reset the donor channel thread error flag donor_channel_thread_error= false; //reset the retry count donor_connection_retry_count= 0; this->view_id.clear(); this->view_id.append(rec_view_id); DBUG_VOID_RETURN; } void Recovery_state_transfer::inform_of_applier_stop(my_thread_id thread_id, bool aborted) { DBUG_ENTER("Recovery_state_transfer::inform_of_applier_stop"); /* This method doesn't take any locks as it could lead to dead locks between the connection process and this method that can be invoked in that context. Since this only affects the recovery loop and the flag is reset at each connection, no major concurrency issues should exist. */ //Act if: if ( // we don't have all the data yet !donor_transfer_finished && // recovery was not aborted !recovery_aborted && // the signal belongs to the recovery donor channel thread donor_connection_interface.is_own_event_applier(thread_id)) { mysql_mutex_lock(&recovery_lock); donor_channel_thread_error = true; mysql_cond_broadcast(&recovery_condition); mysql_mutex_unlock(&recovery_lock); } DBUG_VOID_RETURN; } void Recovery_state_transfer::inform_of_receiver_stop(my_thread_id thread_id) { DBUG_ENTER("Recovery_state_transfer::inform_of_receiver_stop"); /* This method doesn't take any locks as it could lead to dead locks between the connection process and this method that can be invoked in that context. Since this only affects the recovery loop and the flag is reset at each connection, no major concurrency issues should exist. */ //Act if: if (!donor_transfer_finished && // we don't have all the data yet !recovery_aborted &&// recovery was not aborted // the signal belongs to the recovery donor channel thread donor_connection_interface.is_own_event_receiver(thread_id)) { mysql_mutex_lock(&recovery_lock); donor_channel_thread_error = true; mysql_cond_broadcast(&recovery_condition); mysql_mutex_unlock(&recovery_lock); } DBUG_VOID_RETURN; } void Recovery_state_transfer::initialize_group_info() { DBUG_ENTER("Recovery_state_transfer::initialize_group_info"); selected_donor= NULL; //Update the group member info mysql_mutex_lock(&donor_selection_lock); update_group_membership(false); mysql_mutex_unlock(&donor_selection_lock); DBUG_VOID_RETURN; } void Recovery_state_transfer::update_group_membership(bool update_donor) { DBUG_ENTER("Recovery_state_transfer::update_group_membership"); #ifndef NDEBUG mysql_mutex_assert_owner(&donor_selection_lock); #endif // if needed update the reference to the donor member string donor_uuid; if (selected_donor != NULL && update_donor) { donor_uuid.assign(selected_donor->get_uuid()); } if (group_members != NULL) { std::vector<Group_member_info*>::iterator member_it= group_members->begin(); while (member_it != group_members->end()) { delete (*member_it); ++member_it; } } delete group_members; group_members= group_member_mgr->get_all_members(); //When updating the member list, also rebuild the suitable donor list build_donor_list(&donor_uuid); DBUG_VOID_RETURN; } void Recovery_state_transfer::abort_state_transfer() { DBUG_ENTER("Recovery_state_transfer::abort_state_transfer"); //Break the wait for view change event mysql_mutex_lock(&recovery_lock); recovery_aborted= true; mysql_cond_broadcast(&recovery_condition); mysql_mutex_unlock(&recovery_lock); DBUG_VOID_RETURN; } int Recovery_state_transfer::update_recovery_process(bool did_members_left) { DBUG_ENTER("Recovery_state_transfer::update_recovery_process"); int error= 0; /* Lock to avoid concurrency between this code that handles failover and the establish_donor_connection method. We either: 1) lock first and see that the method did not run yet, updating the list of group members that will be used there. 2) lock after the method executed, and if the selected donor is leaving we stop the connection thread and select a new one. */ mysql_mutex_lock(&donor_selection_lock); bool donor_left= false; string current_donor_uuid; string current_donor_hostname; uint current_donor_port= 0; /* The selected donor can be NULL if: * The donor was not yet chosen or * Was deleted in a previous group updated, but there was no need to select a new one since as the data transfer is finished */ if (selected_donor != NULL && did_members_left) { current_donor_uuid.assign(selected_donor->get_uuid()); current_donor_hostname.assign(selected_donor->get_hostname()); current_donor_port = selected_donor->get_port(); Group_member_info* current_donor= group_member_mgr->get_group_member_info(current_donor_uuid); donor_left= (current_donor == NULL); delete current_donor; } /* Get updated information about the new group members. */ update_group_membership(!donor_left); /* It makes sense to cut our connection to the donor if: 1) The donor has left the building and 2) We are already connected to him. */ if (donor_left) { //The selected donor no longer holds a meaning after deleting the group selected_donor= NULL; if (connected_to_donor) { /* The donor_transfer_finished flag is not lock protected on the recovery thread so we have the scenarios. 1) The flag is true and we do nothing 2) The flag is false and remains false so we restart the connection, and that new connection will deliver the rest of the data 3) The flag turns true while we are restarting the connection. In this case we will probably create a new connection that won't be needed and will be terminated the instant the lock is freed. */ if (!donor_transfer_finished) { log_message(MY_INFORMATION_LEVEL, "The member with address %s:%u has unexpectedly disappeared," " killing the current group replication recovery connection", current_donor_hostname.c_str(), current_donor_port); //Awake the recovery loop to connect to another donor donor_failover(); }//else do nothing } } mysql_mutex_unlock(&donor_selection_lock); DBUG_RETURN(error); } void Recovery_state_transfer::end_state_transfer() { DBUG_ENTER("Recovery_state_transfer::end_state_transfer"); mysql_mutex_lock(&recovery_lock); donor_transfer_finished= true; mysql_cond_broadcast(&recovery_condition); mysql_mutex_unlock(&recovery_lock); DBUG_VOID_RETURN; } void Recovery_state_transfer::donor_failover() { DBUG_ENTER("Recovery_state_transfer::donor_failover"); //Awake the recovery process so it can loop again to connect to another donor mysql_mutex_lock(&recovery_lock); on_failover= true; mysql_cond_broadcast(&recovery_condition); mysql_mutex_unlock(&recovery_lock); DBUG_VOID_RETURN; } int Recovery_state_transfer::check_recovery_thread_status() { DBUG_ENTER("Recovery_state_transfer::check_recovery_thread_status"); //if some of the threads are running if (donor_connection_interface.is_receiver_thread_running() || donor_connection_interface.is_applier_thread_running()) { return terminate_recovery_slave_threads(); /* purecov: inspected */ } DBUG_RETURN(0); } bool Recovery_state_transfer::is_own_event_channel(my_thread_id id) { DBUG_ENTER("Recovery_state_transfer::is_own_event_channel"); DBUG_RETURN(donor_connection_interface.is_own_event_applier(id)); } void Recovery_state_transfer::build_donor_list(string* selected_donor_uuid) { DBUG_ENTER("Recovery_state_transfer::build_donor_list"); suitable_donors.clear(); std::vector<Group_member_info*>::iterator member_it= group_members->begin(); while (member_it != group_members->end()) { Group_member_info* member= *member_it; //is online and it's not me string m_uuid= member->get_uuid(); bool is_online= member->get_recovery_status() == Group_member_info::MEMBER_ONLINE; bool not_self= m_uuid.compare(member_uuid); if (is_online && not_self) { suitable_donors.push_back(member); } //if requested, and if the donor is still in the group, update its reference if (selected_donor_uuid != NULL && !m_uuid.compare(*selected_donor_uuid)) { selected_donor= member; } ++member_it; } if (suitable_donors.size() > 1) { std::random_shuffle(suitable_donors.begin(), suitable_donors.end()); } //no need for errors if no donors exist, we thrown it in the connection method. DBUG_VOID_RETURN; } int Recovery_state_transfer::establish_donor_connection() { DBUG_ENTER("Recovery_state_transfer::establish_donor_connection"); int error= -1; connected_to_donor= false; while (error != 0 && !recovery_aborted) { mysql_mutex_lock(&donor_selection_lock); DBUG_EXECUTE_IF("gr_reset_max_connection_attempts_to_donors", { if (donor_connection_retry_count == 3) { const char act[] = "now signal signal.connection_attempt_3 wait_for " "signal.reset_recovery_retry_count_done"; assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); } };); // max number of retries reached, abort if (donor_connection_retry_count >= max_connection_attempts_to_donors) { log_message(MY_ERROR_LEVEL, "Maximum number of retries when trying to " "connect to a donor reached. " "Aborting group replication recovery."); mysql_mutex_unlock(&donor_selection_lock); DBUG_RETURN(error); } if (group_member_mgr->get_number_of_members() == 1) { log_message(MY_ERROR_LEVEL, "All donors left. Aborting group replication recovery."); mysql_mutex_unlock(&donor_selection_lock); DBUG_RETURN(error); } if(donor_connection_retry_count == 0) { log_message(MY_INFORMATION_LEVEL, "Establishing group recovery connection with a possible donor." " Attempt %d/%d", donor_connection_retry_count + 1, max_connection_attempts_to_donors); } else { log_message(MY_INFORMATION_LEVEL, "Retrying group recovery connection with another donor. " "Attempt %d/%d", donor_connection_retry_count + 1, max_connection_attempts_to_donors); } //Rebuild the list, if empty if (suitable_donors.empty()) { mysql_mutex_unlock(&donor_selection_lock); struct timespec abstime; set_timespec(&abstime, donor_reconnect_interval); mysql_mutex_lock(&recovery_lock); mysql_cond_timedwait(&recovery_condition, &recovery_lock, &abstime); mysql_mutex_unlock(&recovery_lock); mysql_mutex_lock(&donor_selection_lock); build_donor_list(NULL); if (suitable_donors.empty()) { log_message(MY_INFORMATION_LEVEL, "No valid donors exist in the group, retrying"); donor_connection_retry_count++; mysql_mutex_unlock(&donor_selection_lock); continue; } } donor_channel_thread_error= false; //Get the last element and delete it selected_donor= suitable_donors.back(); suitable_donors.pop_back(); //increment the number of tries donor_connection_retry_count++; if ((error= initialize_donor_connection())) { log_message(MY_ERROR_LEVEL, "Error when configuring the group recovery" " connection to the donor."); /* purecov: inspected */ } if (!error && !recovery_aborted) { error= start_recovery_donor_threads(); } if (!error) { connected_to_donor = true; //if were on failover, now we are again connected to a valid server. on_failover= false; } mysql_mutex_unlock(&donor_selection_lock); /* sleep so other method (recovery) can get some time to grab the lock and update the group. */ my_sleep(100); } DBUG_RETURN(error); } int Recovery_state_transfer::initialize_donor_connection() { DBUG_ENTER("Recovery_state_transfer::initialize_donor_connection"); int error= 0; donor_connection_interface.purge_logs(false); char* hostname= const_cast<char*>(selected_donor->get_hostname().c_str()); uint port= selected_donor->get_port(); error= donor_connection_interface.initialize_channel(hostname, port, NULL, NULL, recovery_use_ssl, recovery_ssl_ca, recovery_ssl_capath, recovery_ssl_cert, recovery_ssl_cipher, recovery_ssl_key, recovery_ssl_crl, recovery_ssl_crlpath, recovery_ssl_verify_server_cert, DEFAULT_THREAD_PRIORITY, 1, false, true, true); if (!error) { log_message(MY_INFORMATION_LEVEL, "Establishing connection to a group replication recovery donor" " %s at %s port: %d.", selected_donor->get_uuid().c_str(), hostname, port); } else { log_message(MY_ERROR_LEVEL, "Error while creating the group replication recovery channel " "with donor %s at %s port: %d.", selected_donor->get_uuid().c_str(), hostname, port); /* purecov: inspected */ } DBUG_RETURN(error); } int Recovery_state_transfer::start_recovery_donor_threads() { DBUG_ENTER("Recovery_state_transfer::start_recovery_donor_threads"); int error= donor_connection_interface.start_threads(true, true, &view_id, true); if(!error) { DBUG_EXECUTE_IF("pause_after_io_thread_stop_hook", { const char act[]= "now " "WAIT_FOR reached_stopping_io_thread"; assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); };); DBUG_EXECUTE_IF("pause_after_sql_thread_stop_hook", { const char act[]= "now " "WAIT_FOR reached_stopping_sql_thread"; assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); };); /* Register a channel observer to detect SQL/IO thread stops This is not done before the start as the hooks in place verify the stopping thread id and that can lead to deadlocks with start itself. */ channel_observation_manager ->register_channel_observer(recovery_channel_observer); } /* We should unregister the observer and error out if the threads are stopping or have stopped while the observer was being registered and the state transfer is not yet completed. */ bool is_receiver_stopping= donor_connection_interface.is_receiver_thread_stopping(); bool is_receiver_stopped= !donor_connection_interface.is_receiver_thread_running(); bool is_applier_stopping= donor_connection_interface.is_applier_thread_stopping(); bool is_applier_stopped= !donor_connection_interface.is_applier_thread_running(); if (!error && !donor_transfer_finished && (is_receiver_stopping || is_receiver_stopped || is_applier_stopping || is_applier_stopped)) { error= 1; channel_observation_manager ->unregister_channel_observer(recovery_channel_observer); /* At this point, at least one of the threads are about to stop (if it didn't stopped yet). During retry attempts, we will: a) reconfigure the receiver thread to point to a new donor; b) start all thread channels; In order to not fail while doing (a) we must forcefully stop the receiver thread if it didn't stopped yet, or else the reconfiguration process will fail. */ if ((is_applier_stopping || is_applier_stopped) && !(is_receiver_stopping || is_receiver_stopped)) donor_connection_interface.stop_threads(true /* receiver */, false /* applier */); } DBUG_EXECUTE_IF("pause_after_io_thread_stop_hook", { const char act[]= "now SIGNAL continue_to_stop_io_thread"; assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); };); DBUG_EXECUTE_IF("pause_after_sql_thread_stop_hook", { const char act[]= "now SIGNAL continue_to_stop_sql_thread"; assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); };); if (error) { if (error == RPL_CHANNEL_SERVICE_RECEIVER_CONNECTION_ERROR) { log_message(MY_ERROR_LEVEL, "There was an error when connecting to the donor server. " "Please check that group_replication_recovery channel " "credentials and all MEMBER_HOST column values of " "performance_schema.replication_group_members table are " "correct and DNS resolvable."); log_message(MY_ERROR_LEVEL, "For details please check " "performance_schema.replication_connection_status table " "and error log messages of Slave I/O for channel " "group_replication_recovery."); } else { log_message(MY_ERROR_LEVEL, "Error while starting the group replication recovery " "receiver/applier threads"); } } DBUG_RETURN(error); } int Recovery_state_transfer::terminate_recovery_slave_threads(bool purge_logs) { DBUG_ENTER("Recovery_state_transfer::terminate_recovery_slave_threads"); log_message(MY_INFORMATION_LEVEL, "Terminating existing group replication donor connection " "and purging the corresponding logs."); int error= 0; //If the threads never started, the method just returns if ((error= donor_connection_interface.stop_threads(true, true))) { log_message(MY_ERROR_LEVEL, "Error when stopping the group replication recovery's donor" " connection"); /* purecov: inspected */ } else { if (purge_logs) { //If there is no repository in place nothing happens error= purge_recovery_slave_threads_repos(); } } DBUG_RETURN(error); } int Recovery_state_transfer::purge_recovery_slave_threads_repos() { DBUG_ENTER("Recovery_state_transfer::purge_recovery_slave_threads_repos"); int error= 0; if ((error = donor_connection_interface.purge_logs(false))) { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Error when purging the group replication recovery's relay logs"); DBUG_RETURN(error); /* purecov: end */ } error= donor_connection_interface.initialize_channel(const_cast<char*>("<NULL>"), 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, DEFAULT_THREAD_PRIORITY, 1, false, true, true); DBUG_RETURN(error); } int Recovery_state_transfer::state_transfer(THD *recovery_thd) { DBUG_ENTER("Recovery_state_transfer::state_transfer"); int error= 0; while (!donor_transfer_finished && !recovery_aborted) { /* If an applier error happened: stop the slave threads. We do not purge logs or reset channel configuration to preserve the error information on performance schema tables until the next recovery attempt. Recovery_state_transfer::initialize_donor_connection() will take care of that. */ if (donor_channel_thread_error) { //Unsubscribe the listener until it connects again. channel_observation_manager ->unregister_channel_observer(recovery_channel_observer); if ((error= terminate_recovery_slave_threads(false))) { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Can't kill the current group replication recovery donor" " connection after an applier error." " Recovery will shutdown."); //if we can't stop, abort recovery DBUG_RETURN(error); /* purecov: end */ } } //If the donor left, just terminate the threads with no log purging if (on_failover) { //Unsubscribe the listener until it connects again. channel_observation_manager ->unregister_channel_observer(recovery_channel_observer); //Stop the threads before reconfiguring the connection if ((error= donor_connection_interface.stop_threads(true, true))) { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Can't kill the current group replication recovery donor" " connection during failover. Recovery will shutdown."); //if we can't stop, abort recovery DBUG_RETURN(error); /* purecov: end */ } } #ifndef _WIN32 THD_STAGE_INFO(recovery_thd, stage_connecting_to_master); #endif if (!recovery_aborted) { //if the connection to the donor failed, abort recovery if ((error = establish_donor_connection())) { break; } } #ifndef _WIN32 THD_STAGE_INFO(recovery_thd, stage_executing); #endif /* donor_transfer_finished -> set by the set_retrieved_cert_info method. lock: recovery_lock recovery_aborted -> set when stopping recovery lock: run_lock on_failover -> set to true on update_recovery_process. set to false when connected to a valid donor lock: donor_selection_lock donor_channel_thread_error -> set to true on inform_of_applier_stop or inform_of_receiver_stop. set to false before connecting to any donor lock: donor_selection_lock */ mysql_mutex_lock(&recovery_lock); while (!donor_transfer_finished && !recovery_aborted && !on_failover && !donor_channel_thread_error) { mysql_cond_wait(&recovery_condition, &recovery_lock); } mysql_mutex_unlock(&recovery_lock); }//if the current connection was terminated, connect again channel_observation_manager ->unregister_channel_observer(recovery_channel_observer); // do not purge logs if an error occur, keep the diagnose on SLAVE STATUS terminate_recovery_slave_threads(!error); connected_to_donor= false; DBUG_RETURN(error); }