Server IP : 104.21.38.3 / Your IP : 162.158.106.122 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/mysql/src/rapid/plugin/group_replication/src/ |
Upload File : |
/* Copyright (c) 2014, 2023, Oracle and/or its affiliates. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is also distributed with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have included with MySQL. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ #include "replication_threads_api.h" using std::string; int Replication_thread_api::initialize_channel(char* hostname, uint port, char* user, char* password, bool use_ssl, char* ssl_ca, char* ssl_capath, char* ssl_cert, char* ssl_cipher, char* ssl_key, char* ssl_crl, char* ssl_crlpath, bool ssl_verify_server_cert, int priority, int retry_count, bool preserve_logs, bool ignore_ws_mem_limit, bool allow_drop_write_set) { DBUG_ENTER("Replication_thread_api::initialize"); int error= 0; Channel_creation_info info; initialize_channel_creation_info(&info); Channel_ssl_info ssl_info; initialize_channel_ssl_info(&ssl_info); info.user= user; info.password= password; info.hostname= hostname; info.port= port; info.auto_position= true; info.replicate_same_server_id= true; if (priority == GROUP_REPLICATION_APPLIER_THREAD_PRIORITY) { info.thd_tx_priority= GROUP_REPLICATION_APPLIER_THREAD_PRIORITY; } info.type= GROUP_REPLICATION_CHANNEL; info.retry_count= retry_count; info.preserve_relay_logs= preserve_logs; info.m_ignore_write_set_memory_limit = ignore_ws_mem_limit; info.m_allow_drop_write_set = allow_drop_write_set; if( use_ssl || ssl_ca != NULL || ssl_capath != NULL || ssl_cert != NULL || ssl_cipher!= NULL || ssl_key != NULL || ssl_crl != NULL || ssl_crlpath != NULL || ssl_verify_server_cert) { ssl_info.use_ssl= use_ssl; ssl_info.ssl_ca_file_name= ssl_ca; ssl_info.ssl_ca_directory= ssl_capath; ssl_info.ssl_cert_file_name= ssl_cert; ssl_info.ssl_cipher= ssl_cipher; ssl_info.ssl_key= ssl_key; ssl_info.ssl_crl_file_name= ssl_crl; ssl_info.ssl_crl_directory= ssl_crlpath; ssl_info.ssl_verify_server_cert= ssl_verify_server_cert; info.ssl_info= &ssl_info; } error= channel_create(interface_channel, &info); /* Flush relay log to indicate a new start. */ if (!error) error= channel_flush(interface_channel); DBUG_RETURN(error); } int Replication_thread_api::start_threads(bool start_receiver, bool start_applier, string* view_id, bool wait_for_connection) { DBUG_ENTER("Replication_thread_api::start_threads"); Channel_connection_info info; initialize_channel_connection_info(&info); char* cview_id= NULL; if (view_id) { cview_id= new char[view_id->size() + 1]; memcpy(cview_id, view_id->c_str(), view_id->size() + 1); info.until_condition= CHANNEL_UNTIL_VIEW_ID; info.view_id= cview_id; } int thread_mask= 0; if (start_applier) { thread_mask |= CHANNEL_APPLIER_THREAD; } if (start_receiver) { thread_mask |= CHANNEL_RECEIVER_THREAD; } int error= channel_start(interface_channel, &info, thread_mask, wait_for_connection); if (view_id) { delete [] cview_id; } DBUG_RETURN(error); } int Replication_thread_api::purge_logs(bool reset_all) { DBUG_ENTER("Replication_thread_api::purge_logs"); //If there is no channel, no point in invoking the method if (!channel_is_active(interface_channel, CHANNEL_NO_THD)) DBUG_RETURN(0); int error= channel_purge_queue(interface_channel, reset_all); DBUG_RETURN(error); } int Replication_thread_api::stop_threads(bool stop_receiver, bool stop_applier) { DBUG_ENTER("Replication_thread_api::stop_threads"); stop_receiver= stop_receiver && is_receiver_thread_running(); stop_applier= stop_applier && is_applier_thread_running(); //If there is nothing to do, return 0 if (!stop_applier && !stop_receiver) DBUG_RETURN(0); int thread_mask= 0; if (stop_applier) { thread_mask |= CHANNEL_APPLIER_THREAD; } if (stop_receiver) { thread_mask |= CHANNEL_RECEIVER_THREAD; } int error= channel_stop(interface_channel, thread_mask, stop_wait_timeout); DBUG_RETURN(error); } bool Replication_thread_api::is_receiver_thread_running() { return(channel_is_active(interface_channel, CHANNEL_RECEIVER_THREAD)); } bool Replication_thread_api::is_receiver_thread_stopping() { return(channel_is_stopping(interface_channel, CHANNEL_RECEIVER_THREAD)); } bool Replication_thread_api::is_applier_thread_running() { return(channel_is_active(interface_channel, CHANNEL_APPLIER_THREAD)); } bool Replication_thread_api::is_applier_thread_stopping() { return(channel_is_stopping(interface_channel, CHANNEL_APPLIER_THREAD)); } int Replication_thread_api::queue_packet(const char* buf, ulong event_len) { return channel_queue_packet(interface_channel, buf, event_len); } bool Replication_thread_api::is_applier_thread_waiting() { return (channel_is_applier_waiting(interface_channel) == 1); } int Replication_thread_api::wait_for_gtid_execution(double timeout) { DBUG_ENTER("Replication_thread_api::wait_for_gtid_execution"); int error= channel_wait_until_apply_queue_applied(interface_channel, timeout); /* Check that applier relay log is indeed consumed. This is different from channel_wait_until_apply_queue_applied() on the following case: if transactions on relay log are already on GTID_EXECUTED, applier thread still needs to read the relay log and update log positions. So despite transactions on relay log are applied, applier thread is still updating log positions on info tables. */ if (!error) { if (channel_is_applier_waiting(interface_channel) != 1) error= REPLICATION_THREAD_WAIT_TIMEOUT_ERROR; } DBUG_RETURN(error); } rpl_gno Replication_thread_api::get_last_delivered_gno(rpl_sidno sidno) { DBUG_ENTER("Replication_thread_api::get_last_delivered_gno"); DBUG_RETURN(channel_get_last_delivered_gno(interface_channel, sidno)); } int Replication_thread_api::get_applier_thread_ids(unsigned long** thread_ids) { DBUG_ENTER("Replication_thread_api::get_applier_thread_ids"); DBUG_RETURN(channel_get_thread_id(interface_channel, CHANNEL_APPLIER_THREAD, thread_ids)); } bool Replication_thread_api::is_own_event_applier(my_thread_id id, const char* channel_name) { DBUG_ENTER("Replication_thread_api::is_own_event_applier"); bool result= false; unsigned long* thread_ids= NULL; const char* name= channel_name ? channel_name : interface_channel; //Fetch all applier thread ids for this channel. int number_appliers= channel_get_thread_id(name, CHANNEL_APPLIER_THREAD, &thread_ids); //If none are found return false if (number_appliers <= 0) { goto end; } if (number_appliers == 1) //One applier, check its id { result= (*thread_ids == id); } else //The channel has more than one applier, check if the id is in the list { for (int i = 0; i < number_appliers; i++) { unsigned long thread_id= thread_ids[i]; if (thread_id == id) { result= true; break; } } } end: my_free(thread_ids); //The given id is not an id of the channel applier threads, return false DBUG_RETURN(result); } bool Replication_thread_api::is_own_event_receiver(my_thread_id id) { DBUG_ENTER("Replication_thread_api::is_own_event_receiver"); bool result= false; unsigned long* thread_id= NULL; //Fetch the receiver thread id for this channel int number_receivers= channel_get_thread_id(interface_channel, CHANNEL_RECEIVER_THREAD, &thread_id); //If one is found if (number_receivers > 0) { result= (*thread_id == id); } my_free(thread_id); //The given id is not the id of the channel receiver thread, return false DBUG_RETURN(result); } bool Replication_thread_api::get_retrieved_gtid_set(std::string& retrieved_set, const char* channel_name) { DBUG_ENTER("Replication_thread_api::get_retrieved_gtid_set"); const char* name= channel_name ? channel_name : interface_channel; char *receiver_retrieved_gtid_set= NULL; int error; error= channel_get_retrieved_gtid_set(name, &receiver_retrieved_gtid_set); if (!error) retrieved_set.assign(receiver_retrieved_gtid_set); my_free(receiver_retrieved_gtid_set); DBUG_RETURN((error != 0)); } bool Replication_thread_api::is_partial_transaction_on_relay_log() { return is_partial_transaction_on_channel_relay_log(interface_channel); }