Server IP : 104.21.38.3 / Your IP : 172.68.164.93 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/mysql/src/rapid/plugin/group_replication/src/ |
Upload File : |
/* Copyright (c) 2016, 2023, Oracle and/or its affiliates. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is also distributed with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have included with MySQL. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ #include <mysql/group_replication_priv.h> #include "pipeline_stats.h" #include "plugin_server_include.h" #include "plugin_log.h" #include "plugin.h" /* The QUOTA based flow control tries to calculate how many transactions the slowest members can handle, at the certifier or at the applier level, by checking which members have a queue larger than the user-specified thresholds and, on those, checking which one has the lowest number of transactions certified/applied on the last step - let's call it MMT, which stands for Minimum Member Throughput. We then divide MMT by the number of writing members in the last step to specify how many transactions a member can safely send to the group (if a new member starts to write then the quota will be larger for one period but will be corrected on the next). About these factors: 1. If we used MMT as the assigned quota (and if MMT represented well the capacity of the nodes) then the queue size would stabilize but would not decrease. To allow a delayed node to catch up on the certifier and/or queues we need to reserve some capacity on the slowest node, which this HOLD_FACTOR represents: 10% reserved to catch up. 2. Once the queue is reduced below the user-specified threshold, the nodes would start to issue transactions at full speed even if that full speed meant pilling up many transactions in a single period. To avoid that we introduce the RELEASE_FACTOR (50%), which is enough to let the write capacity to grow quickly but still maintain a relation with the last throttled value so that the oscillation in number of transactions per second is not very steep, letting the throughput oscillate smoothly around the real cluster capacity. */ const int64 Flow_control_module::MAXTPS= INT_MAX32; const double Flow_control_module::HOLD_FACTOR= 0.9; const double Flow_control_module::RELEASE_FACTOR= 1.5; Pipeline_stats_member_message::Pipeline_stats_member_message( int32 transactions_waiting_certification, int32 transactions_waiting_apply, int64 transactions_certified, int64 transactions_applied, int64 transactions_local) : Plugin_gcs_message(CT_PIPELINE_STATS_MEMBER_MESSAGE), m_transactions_waiting_certification(transactions_waiting_certification), m_transactions_waiting_apply(transactions_waiting_apply), m_transactions_certified(transactions_certified), m_transactions_applied(transactions_applied), m_transactions_local(transactions_local) {} Pipeline_stats_member_message::Pipeline_stats_member_message(const unsigned char *buf, uint64 len) : Plugin_gcs_message(CT_PIPELINE_STATS_MEMBER_MESSAGE), m_transactions_waiting_certification(0), m_transactions_waiting_apply(0), m_transactions_certified(0), m_transactions_applied(0), m_transactions_local(0) { decode(buf, len); } Pipeline_stats_member_message::~Pipeline_stats_member_message() {} int32 Pipeline_stats_member_message::get_transactions_waiting_certification() { DBUG_ENTER("Pipeline_stats_member_message::get_transactions_waiting_certification"); DBUG_RETURN(m_transactions_waiting_certification); } int64 Pipeline_stats_member_message::get_transactions_certified() { DBUG_ENTER("Pipeline_stats_member_message::get_transactions_certified"); DBUG_RETURN(m_transactions_certified); } int32 Pipeline_stats_member_message::get_transactions_waiting_apply() { DBUG_ENTER("Pipeline_stats_member_message::get_transactions_waiting_apply"); DBUG_RETURN(m_transactions_waiting_apply); } int64 Pipeline_stats_member_message::get_transactions_applied() { DBUG_ENTER("Pipeline_stats_member_message::get_transactions_applied"); DBUG_RETURN(m_transactions_applied); } int64 Pipeline_stats_member_message::get_transactions_local() { DBUG_ENTER("Pipeline_stats_member_message::get_transactions_local"); DBUG_RETURN(m_transactions_local); } void Pipeline_stats_member_message::encode_payload(std::vector<unsigned char> *buffer) const { DBUG_ENTER("Pipeline_stats_member_message::encode_payload"); uint32 transactions_waiting_certification_aux= (uint32)m_transactions_waiting_certification; encode_payload_item_int4(buffer, PIT_TRANSACTIONS_WAITING_CERTIFICATION, transactions_waiting_certification_aux); uint32 transactions_waiting_apply_aux= (uint32)m_transactions_waiting_apply; encode_payload_item_int4(buffer, PIT_TRANSACTIONS_WAITING_APPLY, transactions_waiting_apply_aux); uint64 transactions_certified_aux= (uint64)m_transactions_certified; encode_payload_item_int8(buffer, PIT_TRANSACTIONS_CERTIFIED, transactions_certified_aux); uint64 transactions_applied_aux= (uint64)m_transactions_applied; encode_payload_item_int8(buffer, PIT_TRANSACTIONS_APPLIED, transactions_applied_aux); uint64 transactions_local_aux= (uint64)m_transactions_local; encode_payload_item_int8(buffer, PIT_TRANSACTIONS_LOCAL, transactions_local_aux); DBUG_VOID_RETURN; } void Pipeline_stats_member_message::decode_payload(const unsigned char *buffer, const unsigned char *end) { DBUG_ENTER("Pipeline_stats_member_message::decode_payload"); const unsigned char *slider= buffer; uint16 payload_item_type= 0; uint32 transactions_waiting_certification_aux= 0; decode_payload_item_int4(&slider, &payload_item_type, &transactions_waiting_certification_aux); m_transactions_waiting_certification= (int32)transactions_waiting_certification_aux; uint32 transactions_waiting_apply_aux= 0; decode_payload_item_int4(&slider, &payload_item_type, &transactions_waiting_apply_aux); m_transactions_waiting_apply= (int32)transactions_waiting_apply_aux; uint64 transactions_certified_aux= 0; decode_payload_item_int8(&slider, &payload_item_type, &transactions_certified_aux); m_transactions_certified= (int64)transactions_certified_aux; uint64 transactions_applied_aux= 0; decode_payload_item_int8(&slider, &payload_item_type, &transactions_applied_aux); m_transactions_applied= (int64)transactions_applied_aux; uint64 transactions_local_aux= 0; decode_payload_item_int8(&slider, &payload_item_type, &transactions_local_aux); m_transactions_local= (int64)transactions_local_aux; DBUG_VOID_RETURN; } Pipeline_stats_member_collector::Pipeline_stats_member_collector() : m_transactions_waiting_apply(0), m_transactions_certified(0), m_transactions_applied(0), m_transactions_local(0) { mysql_mutex_init(key_GR_LOCK_pipeline_stats_transactions_waiting_apply, &m_transactions_waiting_apply_lock, MY_MUTEX_INIT_FAST); } Pipeline_stats_member_collector::~Pipeline_stats_member_collector() { mysql_mutex_destroy(&m_transactions_waiting_apply_lock); } void Pipeline_stats_member_collector::increment_transactions_waiting_apply() { mysql_mutex_lock(&m_transactions_waiting_apply_lock); assert(my_atomic_load32(&m_transactions_waiting_apply) >= 0); my_atomic_add32(&m_transactions_waiting_apply, 1); mysql_mutex_unlock(&m_transactions_waiting_apply_lock); } void Pipeline_stats_member_collector::decrement_transactions_waiting_apply() { mysql_mutex_lock(&m_transactions_waiting_apply_lock); if (m_transactions_waiting_apply > 0) my_atomic_add32(&m_transactions_waiting_apply, -1); assert(my_atomic_load32(&m_transactions_waiting_apply) >= 0); mysql_mutex_unlock(&m_transactions_waiting_apply_lock); } void Pipeline_stats_member_collector::increment_transactions_certified() { my_atomic_add64(&m_transactions_certified, 1); } void Pipeline_stats_member_collector::increment_transactions_applied() { my_atomic_add64(&m_transactions_applied, 1); } void Pipeline_stats_member_collector::increment_transactions_local() { my_atomic_add64(&m_transactions_local, 1); } int32 Pipeline_stats_member_collector::get_transactions_waiting_apply() { return my_atomic_load32(&m_transactions_waiting_apply); } int64 Pipeline_stats_member_collector::get_transactions_certified() { return my_atomic_load64(&m_transactions_certified); } int64 Pipeline_stats_member_collector::get_transactions_applied() { return my_atomic_load64(&m_transactions_applied); } int64 Pipeline_stats_member_collector::get_transactions_local() { return my_atomic_load64(&m_transactions_local); } void Pipeline_stats_member_collector::send_stats_member_message() { if (local_member_info == NULL) return; /* purecov: inspected */ Group_member_info::Group_member_status member_status= local_member_info->get_recovery_status(); if (member_status != Group_member_info::MEMBER_ONLINE && member_status != Group_member_info::MEMBER_IN_RECOVERY) return; Pipeline_stats_member_message message( static_cast<int32>(applier_module->get_message_queue_size()), my_atomic_load32(&m_transactions_waiting_apply), my_atomic_load64(&m_transactions_certified), my_atomic_load64(&m_transactions_applied), my_atomic_load64(&m_transactions_local)); enum_gcs_error msg_error= gcs_module->send_message(message, true); if (msg_error != GCS_OK) { log_message(MY_INFORMATION_LEVEL, "Error while sending stats message"); /* purecov: inspected */ } } Pipeline_member_stats::Pipeline_member_stats() : m_transactions_waiting_certification(0), m_transactions_waiting_apply(0), m_transactions_certified(0), m_delta_transactions_certified(0), m_transactions_applied(0), m_delta_transactions_applied(0), m_transactions_local(0), m_delta_transactions_local(0), m_transactions_negative_certified(0), m_transactions_rows_validating(0), m_transactions_committed_all_members(), m_transaction_last_conflict_free(), m_stamp(0) {} Pipeline_member_stats::Pipeline_member_stats(Pipeline_stats_member_message &msg) : m_transactions_waiting_certification(msg.get_transactions_waiting_certification()), m_transactions_waiting_apply(msg.get_transactions_waiting_apply()), m_transactions_certified(msg.get_transactions_certified()), m_delta_transactions_certified(0), m_transactions_applied(msg.get_transactions_applied()), m_delta_transactions_applied(0), m_transactions_local(msg.get_transactions_local()), m_delta_transactions_local(0), m_transactions_negative_certified(0), m_transactions_rows_validating(0), m_transactions_committed_all_members(), m_transaction_last_conflict_free(), m_stamp(0) {} Pipeline_member_stats::Pipeline_member_stats( Pipeline_stats_member_collector *pipeline_stats, ulonglong applier_queue, ulonglong negative_certified, ulonglong certification_size) : m_transactions_committed_all_members(), m_transaction_last_conflict_free() { m_transactions_waiting_certification= applier_queue; m_transactions_waiting_apply= pipeline_stats->get_transactions_waiting_apply(); m_transactions_certified= pipeline_stats->get_transactions_certified(); m_delta_transactions_certified= 0; m_transactions_applied= pipeline_stats->get_transactions_applied(); m_delta_transactions_applied= 0; m_transactions_local= pipeline_stats->get_transactions_local(); m_delta_transactions_local= 0; m_transactions_negative_certified= negative_certified; m_transactions_rows_validating= certification_size; m_stamp= 0; } Pipeline_member_stats::~Pipeline_member_stats() {} void Pipeline_member_stats::update_member_stats(Pipeline_stats_member_message &msg, uint64 stamp) { m_transactions_waiting_certification= msg.get_transactions_waiting_certification(); m_transactions_waiting_apply= msg.get_transactions_waiting_apply(); int64 previous_transactions_certified= m_transactions_certified; m_transactions_certified= msg.get_transactions_certified(); m_delta_transactions_certified= m_transactions_certified - previous_transactions_certified; int64 previous_transactions_applied= m_transactions_applied; m_transactions_applied= msg.get_transactions_applied(); m_delta_transactions_applied= m_transactions_applied - previous_transactions_applied; int64 previous_transactions_local= m_transactions_local; m_transactions_local= msg.get_transactions_local(); m_delta_transactions_local= m_transactions_local - previous_transactions_local; m_stamp= stamp; } bool Pipeline_member_stats::is_flow_control_needed() { return (m_transactions_waiting_certification > flow_control_certifier_threshold_var || m_transactions_waiting_apply > flow_control_applier_threshold_var); } int32 Pipeline_member_stats::get_transactions_waiting_certification() { return m_transactions_waiting_certification; } int32 Pipeline_member_stats::get_transactions_waiting_apply() { return m_transactions_waiting_apply; } int64 Pipeline_member_stats::get_delta_transactions_certified() { return m_delta_transactions_certified; } int64 Pipeline_member_stats::get_delta_transactions_applied() { return m_delta_transactions_applied; } int64 Pipeline_member_stats::get_delta_transactions_local() { return m_delta_transactions_local; } int64 Pipeline_member_stats::get_transactions_negative_certified() { return m_transactions_negative_certified; } int64 Pipeline_member_stats::get_transactions_rows_validating() { return m_transactions_rows_validating; } void Pipeline_member_stats::get_transaction_committed_all_members(std::string &value) { value.assign(m_transactions_committed_all_members); } void Pipeline_member_stats::set_transaction_committed_all_members(char *str, size_t len) { m_transactions_committed_all_members.assign(str, len); } void Pipeline_member_stats::get_transaction_last_conflict_free( std::string &value) { value.assign(m_transaction_last_conflict_free); } void Pipeline_member_stats::set_transaction_last_conflict_free( std::string &value) { m_transaction_last_conflict_free.assign(value); } int64 Pipeline_member_stats::get_transactions_certified() { return m_transactions_certified; } uint64 Pipeline_member_stats::get_stamp() { return m_stamp; } #ifndef NDEBUG void Pipeline_member_stats::debug(const char *member, int64 quota_size, int64 quota_used) { log_message(MY_INFORMATION_LEVEL, "Flow control - update member stats: " "%s stats: certifier_queue %d, applier_queue %d," " certified %ld (%ld), applied %ld (%ld), local %ld (%ld), quota %ld (%ld)", member, m_transactions_waiting_certification, m_transactions_waiting_apply, m_transactions_certified, m_delta_transactions_certified, m_transactions_applied, m_delta_transactions_applied, m_transactions_local, m_delta_transactions_local, quota_size, quota_used); /* purecov: inspected */ } #endif Flow_control_module::Flow_control_module() : m_holds_in_period(0), m_quota_used(0), m_quota_size(0), m_stamp(0) { mysql_mutex_init(key_GR_LOCK_pipeline_stats_flow_control, &m_flow_control_lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_GR_COND_pipeline_stats_flow_control, &m_flow_control_cond); } Flow_control_module::~Flow_control_module() { mysql_mutex_destroy(&m_flow_control_lock); mysql_cond_destroy(&m_flow_control_cond); } void Flow_control_module::flow_control_step() { m_stamp++; int32 holds= my_atomic_fas32(&m_holds_in_period, 0); switch(static_cast<Flow_control_mode>(flow_control_mode_var)) { case FCM_QUOTA: { /* Postponed transactions */ int64 quota_size= my_atomic_fas64(&m_quota_size, 0); int64 quota_used= my_atomic_fas64(&m_quota_used, 0); int64 extra_quota= (quota_size > 0 && quota_used > quota_size) ? quota_used - quota_size : 0; /* Release waiting transactions on do_wait(). */ if (extra_quota > 0) { mysql_mutex_lock(&m_flow_control_lock); mysql_cond_broadcast(&m_flow_control_cond); mysql_mutex_unlock(&m_flow_control_lock); } if (holds > 0) { uint num_writing_members= 0; int64 min_certifier_capacity= MAXTPS, min_applier_capacity= MAXTPS, safe_capacity= MAXTPS; Flow_control_module_info::iterator it= m_info.begin(); while (it != m_info.end()) { if (it->second.get_stamp() < (m_stamp - 10)) { /* Purge member stats that were not updated on the last 10 flow control steps. */ m_info.erase(it++); } else { if (flow_control_certifier_threshold_var > 0 && it->second.get_delta_transactions_certified() > 0 && it->second.get_transactions_waiting_certification() - flow_control_certifier_threshold_var > 0 && min_certifier_capacity > it->second.get_delta_transactions_certified()) min_certifier_capacity= it->second.get_delta_transactions_certified(); if (it->second.get_delta_transactions_certified() > 0) safe_capacity= std::min(safe_capacity, it->second.get_delta_transactions_certified()); if (flow_control_applier_threshold_var > 0 && it->second.get_delta_transactions_applied() > 0 && it->second.get_transactions_waiting_apply() - flow_control_applier_threshold_var > 0 && min_applier_capacity > it->second.get_delta_transactions_applied()) min_applier_capacity= it->second.get_delta_transactions_applied(); if (it->second.get_delta_transactions_applied() > 0) safe_capacity= std::min(safe_capacity, it->second.get_delta_transactions_applied()); if (it->second.get_delta_transactions_local() > 0) num_writing_members++; ++it; } } // Avoid division by zero. num_writing_members= num_writing_members > 0 ? num_writing_members : 1; int64 min_capacity= (min_certifier_capacity > 0 && min_certifier_capacity < min_applier_capacity) ? min_certifier_capacity : min_applier_capacity; // Minimum capacity will never be less than lim_throttle. int64 lim_throttle= static_cast<int64>(0.05 * std::min(flow_control_certifier_threshold_var, flow_control_applier_threshold_var)); min_capacity= std::max(std::min(min_capacity, safe_capacity), lim_throttle); quota_size= static_cast<int64>((min_capacity * HOLD_FACTOR) / num_writing_members - extra_quota); my_atomic_store64(&m_quota_size, quota_size > 1 ? quota_size : 1); } else { if (quota_size > 0 && (quota_size * RELEASE_FACTOR) < MAXTPS) { int64 quota_size_next= static_cast<int64>(quota_size * RELEASE_FACTOR); quota_size= quota_size_next > quota_size ? quota_size_next : quota_size + 1; } else quota_size= 0; my_atomic_store64(&m_quota_size, quota_size); } my_atomic_store64(&m_quota_used, 0); break; } case FCM_DISABLED: my_atomic_store64(&m_quota_size, 0); my_atomic_store64(&m_quota_used, 0); break; default: assert(0); } } int Flow_control_module::handle_stats_data(const uchar *data, uint64 len, const std::string& member_id) { DBUG_ENTER("Flow_control_module::handle_stats_data"); int error= 0; Pipeline_stats_member_message message(data, len); /* This method is called synchronously by communication layer, so we do not need concurrency control. */ Flow_control_module_info::iterator it= m_info.find(member_id); if (it == m_info.end()) { Pipeline_member_stats stats; std::pair<Flow_control_module_info::iterator, bool> ret= m_info.insert(std::pair<std::string, Pipeline_member_stats> (member_id, stats)); error= !ret.second; it= ret.first; } it->second.update_member_stats(message, m_stamp); /* Verify if flow control is required. */ if (it->second.is_flow_control_needed()) { my_atomic_add32(&m_holds_in_period, 1); #ifndef NDEBUG it->second.debug(it->first.c_str(), my_atomic_load64(&m_quota_size), my_atomic_load64(&m_quota_used)); #endif } DBUG_RETURN(error); } int32 Flow_control_module::do_wait() { DBUG_ENTER("Flow_control_module::do_wait"); int64 quota_size= my_atomic_load64(&m_quota_size); int64 quota_used= my_atomic_add64(&m_quota_used, 1); if (quota_used > quota_size && quota_size != 0) { struct timespec delay; set_timespec(&delay, 1); mysql_mutex_lock(&m_flow_control_lock); mysql_cond_timedwait(&m_flow_control_cond, &m_flow_control_lock, &delay); mysql_mutex_unlock(&m_flow_control_lock); } DBUG_RETURN(0); }