Server IP : 172.67.216.182 / Your IP : 172.69.176.47 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/mysql/src/rapid/plugin/group_replication/src/sql_service/ |
Upload File : |
/* Copyright (c) 2015, 2023, Oracle and/or its affiliates. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is also distributed with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have included with MySQL. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ #include "sql_service_command.h" #include "plugin_log.h" #include "plugin_psi.h" #include <mysql/group_replication_priv.h> #include <sstream> using std::string; Sql_service_command_interface::Sql_service_command_interface() : connection_thread_isolation(PSESSION_USE_THREAD), m_server_interface(NULL), m_plugin_session_thread(NULL) {} Sql_service_command_interface::~Sql_service_command_interface() { if (m_server_interface != NULL) { if (m_plugin_session_thread) { m_plugin_session_thread->terminate_session_thread(); delete m_plugin_session_thread; } else { delete m_server_interface; } } } int Sql_service_command_interface:: establish_session_connection(enum_plugin_con_isolation isolation_param, void *plugin_pointer) { assert(m_server_interface == NULL); int error = 0; connection_thread_isolation= isolation_param; switch (connection_thread_isolation) { case PSESSION_USE_THREAD: m_server_interface = new Sql_service_interface(); error = m_server_interface->open_session(); break; case PSESSION_INIT_THREAD: m_server_interface = new Sql_service_interface(); error = m_server_interface->open_thread_session(plugin_pointer); break; case PSESSION_DEDICATED_THREAD: m_plugin_session_thread = new Session_plugin_thread(&sql_service_commands); error = m_plugin_session_thread->launch_session_thread(plugin_pointer); if (!error) m_server_interface = m_plugin_session_thread->get_service_interface(); break; } if (error) { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Can't establish a internal server connection to execute" " plugin operations"); if (m_plugin_session_thread) { m_plugin_session_thread->terminate_session_thread(); delete m_plugin_session_thread; m_plugin_session_thread = NULL; } else { delete m_server_interface; m_server_interface = NULL; } return error; /* purecov: end */ } return error; } Sql_service_interface* Sql_service_command_interface::get_sql_service_interface() { return m_server_interface; } int Sql_service_command_interface::set_interface_user(const char* user) { return m_server_interface->set_session_user(user); } long Sql_service_command_interface::set_super_read_only() { DBUG_ENTER("Sql_service_command_interface::set_super_read_only"); long error=0; if (connection_thread_isolation != PSESSION_DEDICATED_THREAD) { error= sql_service_commands.internal_set_super_read_only(m_server_interface); } else { m_plugin_session_thread-> queue_new_method_for_application(&Sql_service_commands::internal_set_super_read_only); error= m_plugin_session_thread->wait_for_method_execution(); } DBUG_RETURN(error); } long Sql_service_command_interface::set_read_only() { DBUG_ENTER("Sql_service_command_interface::set_read_only"); long error=0; if (connection_thread_isolation != PSESSION_DEDICATED_THREAD) { error= sql_service_commands.internal_set_read_only(m_server_interface); } else { m_plugin_session_thread-> queue_new_method_for_application(&Sql_service_commands::internal_set_read_only); error= m_plugin_session_thread->wait_for_method_execution(); } DBUG_RETURN(error); } long Sql_service_commands:: internal_set_super_read_only(Sql_service_interface *sql_interface) { DBUG_ENTER("Sql_service_commands::internal_set_super_read_only"); assert(sql_interface != NULL); Sql_resultset rset; long srv_err= sql_interface->execute_query("SET GLOBAL super_read_only= 1;"); if (srv_err == 0) { #ifndef NDEBUG long err; err = sql_interface->execute_query("SELECT @@GLOBAL.super_read_only;", &rset); assert(!err && rset.get_rows() > 0 && rset.getLong(0) == 1); log_message(MY_INFORMATION_LEVEL, "Setting super_read_only=ON."); #endif } DBUG_RETURN(srv_err); } long Sql_service_commands:: internal_set_read_only(Sql_service_interface *sql_interface) { DBUG_ENTER("Sql_service_commands::internal_set_read_only"); assert(sql_interface != NULL); Sql_resultset rset; long srv_err= sql_interface->execute_query("SET GLOBAL read_only= 1;"); if (srv_err == 0) { #ifndef NDEBUG sql_interface->execute_query("SELECT @@GLOBAL.read_only;", &rset); assert(rset.getLong(0) == 1); log_message(MY_INFORMATION_LEVEL, "Setting read_only=ON."); #endif } else { log_message(MY_ERROR_LEVEL, "'SET read_only= 1' query execution" " resulted in failure. errno: %d", srv_err); /* purecov: inspected */ } DBUG_RETURN(srv_err); } long Sql_service_command_interface::reset_super_read_only() { DBUG_ENTER("Sql_service_command_interface::reset_super_read_only"); long error=0; if (connection_thread_isolation != PSESSION_DEDICATED_THREAD) { error= sql_service_commands.internal_reset_super_read_only(m_server_interface); } else { m_plugin_session_thread-> queue_new_method_for_application(&Sql_service_commands::internal_reset_super_read_only); error= m_plugin_session_thread->wait_for_method_execution(); } DBUG_RETURN(error); } long Sql_service_commands:: internal_reset_super_read_only(Sql_service_interface *sql_interface) { DBUG_ENTER("Sql_service_commands::internal_reset_super_read_only"); assert(sql_interface != NULL); Sql_resultset rset; const char * query= "SET GLOBAL super_read_only= 0"; long srv_err= sql_interface->execute_query(query); #ifndef NDEBUG if (srv_err == 0) { long err; query= "SELECT @@GLOBAL.super_read_only;"; err= sql_interface->execute_query(query, &rset); assert(!err && rset.get_rows() > 0 && rset.getLong(0) == 0); log_message(MY_INFORMATION_LEVEL, "Setting super_read_only=OFF."); } #endif DBUG_RETURN(srv_err); } long Sql_service_command_interface::reset_read_only() { DBUG_ENTER("Sql_service_command_interface::reset_read_only"); long error=0; if (connection_thread_isolation != PSESSION_DEDICATED_THREAD) { error= sql_service_commands.internal_reset_read_only(m_server_interface); } else { m_plugin_session_thread-> queue_new_method_for_application(&Sql_service_commands::internal_reset_read_only); error= m_plugin_session_thread->wait_for_method_execution(); } DBUG_RETURN(error); } long Sql_service_commands:: internal_reset_read_only(Sql_service_interface *sql_interface) { DBUG_ENTER("Sql_service_commands::internal_reset_read_only"); assert(sql_interface != NULL); Sql_resultset rset; const char* query= "SET GLOBAL read_only= 0"; long srv_err= sql_interface->execute_query(query); #ifndef NDEBUG if (srv_err == 0) { long err; query= "SELECT @@GLOBAL.read_only"; err= sql_interface->execute_query(query, &rset); assert(!err && rset.get_rows() > 0 && rset.getLong(0) == 0); log_message(MY_INFORMATION_LEVEL, "Setting read_only=OFF."); } #endif DBUG_RETURN(srv_err); } long Sql_service_command_interface::kill_session(uint32_t session_id, MYSQL_SESSION session) { DBUG_ENTER("Sql_service_command_interface::kill_session"); assert(m_server_interface != NULL); Sql_resultset rset; long srv_err= 0; if (!m_server_interface->is_session_killed(session)) { COM_DATA data; data.com_kill.id= session_id; srv_err= m_server_interface->execute(data, COM_PROCESS_KILL, &rset); if (srv_err == 0) { log_message(MY_INFORMATION_LEVEL, "killed session id: %d status: %d", session_id, m_server_interface->is_session_killed(session)); } else { log_message(MY_INFORMATION_LEVEL, "killed failed id: %d failed: %d", session_id, srv_err); /* purecov: inspected */ } } DBUG_RETURN(srv_err); } long Sql_service_command_interface::get_server_super_read_only() { DBUG_ENTER("Sql_service_command_interface::get_server_super_read_only"); long error=0; if (connection_thread_isolation != PSESSION_DEDICATED_THREAD) { error= sql_service_commands.internal_get_server_super_read_only(m_server_interface); } else { m_plugin_session_thread-> queue_new_method_for_application(&Sql_service_commands::internal_get_server_super_read_only); error= m_plugin_session_thread->wait_for_method_execution(); } DBUG_RETURN(error); } long Sql_service_commands:: internal_get_server_super_read_only(Sql_service_interface *sql_interface) { DBUG_ENTER("Sql_service_commands::internal_get_server_super_read_only"); assert(sql_interface != NULL); Sql_resultset rset; long server_super_read_only= -1; long srv_error= sql_interface->execute_query("SELECT @@GLOBAL.super_read_only", &rset); if (srv_error == 0 && rset.get_rows() > 0) { server_super_read_only= rset.getLong(0); } DBUG_RETURN(server_super_read_only); } long Sql_service_command_interface::get_server_read_only() { DBUG_ENTER("Sql_service_command_interface::get_server_read_only"); long error=0; if (connection_thread_isolation != PSESSION_DEDICATED_THREAD) { error= sql_service_commands.internal_get_server_read_only(m_server_interface); } else { m_plugin_session_thread-> queue_new_method_for_application(&Sql_service_commands::internal_get_server_read_only); error= m_plugin_session_thread->wait_for_method_execution(); } DBUG_RETURN(error); } long Sql_service_commands:: internal_get_server_read_only(Sql_service_interface *sql_interface) { DBUG_ENTER("Sql_service_commands::internal_get_server_read_only"); assert(sql_interface != NULL); Sql_resultset rset; longlong server_read_only= -1; long srv_error= sql_interface->execute_query("SELECT @@GLOBAL.read_only", &rset); if (srv_error == 0 && rset.get_rows()) { server_read_only= rset.getLong(0); } DBUG_RETURN(server_read_only); } int Sql_service_command_interface::get_server_gtid_executed(string& gtid_executed) { DBUG_ENTER("Sql_service_command_interface::get_server_gtid_executed"); long error=0; /* No support for this method on thread isolation mode */ assert(connection_thread_isolation != PSESSION_DEDICATED_THREAD); if (connection_thread_isolation != PSESSION_DEDICATED_THREAD) { error= sql_service_commands. internal_get_server_gtid_executed(m_server_interface, gtid_executed); } DBUG_RETURN(error); } int Sql_service_commands:: internal_get_server_gtid_executed(Sql_service_interface *sql_interface, std::string& gtid_executed) { DBUG_ENTER("Sql_service_command_interface::get_server_gtid_executed"); assert(sql_interface != NULL); Sql_resultset rset; long srv_err= sql_interface->execute_query("SELECT @@GLOBAL.gtid_executed", &rset); if (srv_err == 0 && rset.get_rows() > 0) { gtid_executed.assign(rset.getString(0)); DBUG_RETURN(0); } DBUG_RETURN(1); } long Sql_service_command_interface:: wait_for_server_gtid_executed(std::string& gtid_executed, int timeout) { DBUG_ENTER("Sql_service_command_interface::wait_for_server_gtid_executed"); long error=0; /* No support for this method on thread isolation mode */ assert(connection_thread_isolation != PSESSION_DEDICATED_THREAD); if (connection_thread_isolation != PSESSION_DEDICATED_THREAD) { error= sql_service_commands. internal_wait_for_server_gtid_executed(m_server_interface, gtid_executed, timeout); } DBUG_RETURN(error); } long Sql_service_commands:: internal_wait_for_server_gtid_executed(Sql_service_interface *sql_interface, std::string& gtid_executed, int timeout) { DBUG_ENTER("Sql_service_commands::internal_wait_for_server_gtid_executed"); assert(sql_interface != NULL); DBUG_EXECUTE_IF("sql_int_wait_for_gtid_executed_no_timeout", { timeout= 0; }); std::stringstream ss; ss << "SELECT WAIT_FOR_EXECUTED_GTID_SET('" << gtid_executed << "'"; if (timeout > 0) { ss << ", " << timeout << ")"; } else { ss << ")"; } std::string query= ss.str(); Sql_resultset rset; long srv_err= sql_interface->execute_query(query, &rset); if (srv_err) { /* purecov: begin inspected */ std::stringstream errorstream; errorstream << "Internal query: " << query; errorstream << " result in error. Error number: " << srv_err; log_message(MY_ERROR_LEVEL, errorstream.str().c_str()); DBUG_RETURN(1); /* purecov: end */ } else if(rset.get_rows() > 0) { if (rset.getLong(0) == 1) DBUG_RETURN(-1); } DBUG_RETURN(0); } Session_plugin_thread:: Session_plugin_thread(Sql_service_commands* command_interface) : command_interface(command_interface), m_server_interface(NULL), incoming_methods(NULL), m_plugin_pointer(NULL), m_method_execution_completed(false), m_method_execution_return_value(0), m_session_thread_running(false), m_session_thread_starting(false), m_session_thread_terminate(false), m_session_thread_error(0) { mysql_mutex_init(key_GR_LOCK_session_thread_run, &m_run_lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_GR_COND_session_thread_run, &m_run_cond); mysql_mutex_init(key_GR_LOCK_session_thread_method_exec, &m_method_lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_GR_COND_session_thread_method_exec, &m_method_cond); this->incoming_methods= new Synchronized_queue<st_session_method*>(); } Session_plugin_thread::~Session_plugin_thread() { mysql_mutex_destroy(&m_run_lock); mysql_cond_destroy(&m_run_cond); mysql_mutex_destroy(&m_method_lock); mysql_cond_destroy(&m_method_cond); delete incoming_methods; } void Session_plugin_thread:: queue_new_method_for_application(long (Sql_service_commands::*method)(Sql_service_interface*), bool terminate) { st_session_method* method_to_execute; method_to_execute= (st_session_method*)my_malloc(PSI_NOT_INSTRUMENTED, sizeof(st_session_method), MYF(0)); method_to_execute->method= method; method_to_execute->terminated=terminate; m_method_execution_completed= false; incoming_methods->push(method_to_execute); } long Session_plugin_thread::wait_for_method_execution() { mysql_mutex_lock(&m_method_lock); while (!m_method_execution_completed) { DBUG_PRINT("sleep",("Waiting for the plugin session thread to execute a method")); mysql_cond_wait(&m_method_cond, &m_method_lock); } mysql_mutex_unlock(&m_method_lock); return m_method_execution_return_value; } static void *launch_handler_thread(void* arg) { Session_plugin_thread *handler= (Session_plugin_thread*) arg; handler->session_thread_handler(); return 0; } int Session_plugin_thread::launch_session_thread(void* plugin_pointer_var) { DBUG_ENTER("Session_plugin_thread::launch_session_thread(plugin_pointer)"); //avoid concurrency calls against stop invocations mysql_mutex_lock(&m_run_lock); m_session_thread_error= 0; m_session_thread_terminate= false; m_session_thread_starting= true; m_plugin_pointer= plugin_pointer_var; if ((mysql_thread_create(key_GR_THD_plugin_session, &m_plugin_session_pthd, get_connection_attrib(), launch_handler_thread, (void*)this))) { m_session_thread_starting= false; mysql_mutex_unlock(&m_run_lock); /* purecov: inspected */ DBUG_RETURN(1); /* purecov: inspected */ } while (!m_session_thread_running && !m_session_thread_error) { DBUG_PRINT("sleep",("Waiting for the plugin session thread to start")); mysql_cond_wait(&m_run_cond, &m_run_lock); } mysql_mutex_unlock(&m_run_lock); DBUG_RETURN(m_session_thread_error); } int Session_plugin_thread::terminate_session_thread() { DBUG_ENTER("Session_plugin_thread::terminate_session_thread()"); mysql_mutex_lock(&m_run_lock); m_session_thread_terminate= true; m_method_execution_completed=true; queue_new_method_for_application(NULL,true); int stop_wait_timeout= GR_PLUGIN_SESSION_THREAD_TIMEOUT; while (m_session_thread_running || m_session_thread_starting) { DBUG_PRINT("loop", ("killing plugin session thread")); mysql_cond_broadcast(&m_run_cond); struct timespec abstime; set_timespec(&abstime, 1); #ifndef NDEBUG int error= #endif mysql_cond_timedwait(&m_run_cond, &m_run_lock, &abstime); if (stop_wait_timeout >= 1) { stop_wait_timeout= stop_wait_timeout - 1; } else if (m_session_thread_running || m_session_thread_starting) // quit waiting { mysql_mutex_unlock(&m_run_lock); DBUG_RETURN(1); } assert(error == ETIMEDOUT || error == 0); } assert(!m_session_thread_running); while (!this->incoming_methods->empty()) { st_session_method *method= NULL; this->incoming_methods->pop(&method); my_free(method); } mysql_mutex_unlock(&m_run_lock); DBUG_RETURN(0); } int Session_plugin_thread::session_thread_handler() { DBUG_ENTER("Session_plugin_thread::session_thread_handler()"); st_session_method *method= NULL; m_server_interface= new Sql_service_interface(); m_session_thread_error= m_server_interface->open_thread_session(m_plugin_pointer); DBUG_EXECUTE_IF("group_replication_sql_service_force_error", { m_session_thread_error= 1; }); mysql_mutex_lock(&m_run_lock); m_session_thread_starting= false; m_session_thread_running= true; mysql_cond_broadcast(&m_run_cond); mysql_mutex_unlock(&m_run_lock); if (m_session_thread_error) goto end; while (!m_session_thread_terminate) { this->incoming_methods->pop(&method); if (method->terminated) { my_free(method); break; } long (Sql_service_commands::*method_to_execute)(Sql_service_interface*)= method->method; m_method_execution_return_value= (command_interface->*method_to_execute)(m_server_interface); my_free(method); mysql_mutex_lock(&m_method_lock); m_method_execution_completed= true; mysql_cond_broadcast(&m_method_cond); mysql_mutex_unlock(&m_method_lock); } mysql_mutex_lock(&m_run_lock); while (!m_session_thread_terminate) { DBUG_PRINT("sleep",("Waiting for the plugin session thread" " to be signaled termination")); mysql_cond_wait(&m_run_cond, &m_run_lock); } mysql_mutex_unlock(&m_run_lock); end: delete m_server_interface; m_server_interface = NULL; mysql_mutex_lock(&m_run_lock); m_session_thread_running= false; mysql_mutex_unlock(&m_run_lock); DBUG_RETURN(m_session_thread_error); } Sql_service_interface* Session_plugin_thread::get_service_interface() { return m_server_interface; }