Server IP : 104.21.38.3 / Your IP : 172.71.81.230 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/mysql/src/rapid/plugin/group_replication/include/ |
Upload File : |
/* Copyright (c) 2014, 2023, Oracle and/or its affiliates. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is also distributed with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have included with MySQL. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ #ifndef PLUGIN_UTILS_INCLUDED #define PLUGIN_UTILS_INCLUDED #include <map> #include <queue> #include <string> #include <vector> #include "plugin_psi.h" #include <mysql/group_replication_priv.h> void log_primary_member_details(); void abort_plugin_process(const char *message); class Blocked_transaction_handler { public: Blocked_transaction_handler(); virtual ~Blocked_transaction_handler(); /** This method instructs all local transactions to rollback when certification is no longer possible. */ void unblock_waiting_transactions(); private: /* The lock that disallows concurrent method executions */ mysql_mutex_t unblocking_process_lock; }; template <typename T> class Synchronized_queue { public: Synchronized_queue() { mysql_mutex_init(key_GR_LOCK_synchronized_queue, &lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_GR_COND_synchronized_queue, &cond); } /** Checks if the queue is empty @return if is empty @retval true empty @retval false not empty */ bool empty() { bool res= true; mysql_mutex_lock(&lock); res= queue.empty(); mysql_mutex_unlock(&lock); return res; } /** Inserts an element in the queue. Alerts any other thread lock on pop() or front() @param value The value to insert */ void push(const T& value) { mysql_mutex_lock(&lock); queue.push(value); mysql_mutex_unlock(&lock); mysql_cond_broadcast(&cond); } /** Fetches the front of the queue and removes it. @note The method will block if the queue is empty until a element is pushed @param out The fetched reference. */ void pop(T* out) { *out= NULL; mysql_mutex_lock(&lock); while (queue.empty()) mysql_cond_wait(&cond, &lock); /* purecov: inspected */ *out= queue.front(); queue.pop(); mysql_mutex_unlock(&lock); } /** Pops the front of the queue removing it. @note The method will block if the queue is empty until a element is pushed */ void pop() { mysql_mutex_lock(&lock); while (queue.empty()) mysql_cond_wait(&cond, &lock); /* purecov: inspected */ queue.pop(); mysql_mutex_unlock(&lock); } /** Fetches the front of the queue but does not remove it. @note The method will block if the queue is empty until a element is pushed @param out The fetched reference. */ void front(T* out) { *out= NULL; mysql_mutex_lock(&lock); while (queue.empty()) mysql_cond_wait(&cond, &lock); *out= queue.front(); mysql_mutex_unlock(&lock); } /** Checks the queue size @return the size of the queue */ size_t size() { size_t qsize= 0; mysql_mutex_lock(&lock); qsize= queue.size(); mysql_mutex_unlock(&lock); return qsize; } private: mysql_mutex_t lock; mysql_cond_t cond; std::queue<T> queue; }; /** Synchronization auxiliary class that allows one or more threads to wait on a given number of requirements. Usage: CountDownLatch(count): Create the latch with the number of requirements to wait. wait(): Block until the number of requirements reaches zero. countDown(): Decrease the number of requirements by one. */ class CountDownLatch { public: /** Create the latch with the number of requirements to wait. @param count The number of requirements to wait */ CountDownLatch(uint count) : count(count) { mysql_mutex_init(key_GR_LOCK_count_down_latch, &lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_GR_COND_count_down_latch, &cond); } virtual ~CountDownLatch() { mysql_cond_destroy(&cond); mysql_mutex_destroy(&lock); } /** Block until the number of requirements reaches zero. */ void wait() { mysql_mutex_lock(&lock); while (count > 0) mysql_cond_wait(&cond, &lock); mysql_mutex_unlock(&lock); } /** Decrease the number of requirements by one. */ void countDown() { mysql_mutex_lock(&lock); --count; if (count == 0) mysql_cond_broadcast(&cond); mysql_mutex_unlock(&lock); } /** Get current number requirements. @return the number of requirements */ uint getCount() { uint res= 0; mysql_mutex_lock(&lock); res= count; mysql_mutex_unlock(&lock); return res; } private: mysql_mutex_t lock; mysql_cond_t cond; int count; }; /** Ticket register/wait auxiliary class. Usage: registerTicket(k): create a ticket with key k with status ongoing. releaseTicket(k): set ticket with key k status to done. waitTicket(k): wait until ticket with key k status is changed to done. */ template <typename K> class Wait_ticket { public: Wait_ticket() :blocked(false), waiting(false) { mysql_mutex_init(key_GR_LOCK_wait_ticket, &lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_GR_COND_wait_ticket, &cond); } virtual ~Wait_ticket() { for (typename std::map<K,CountDownLatch*>::iterator it= map.begin(); it != map.end(); ++it) delete it->second; /* purecov: inspected */ map.clear(); mysql_cond_destroy(&cond); mysql_mutex_destroy(&lock); } /** Register ticker with status ongoing. @param key The key that identifies the ticket @return @retval 0 success @retval !=0 key already exists, error on insert or it is blocked */ int registerTicket(const K& key) { int error= 0; mysql_mutex_lock(&lock); if (blocked) { mysql_mutex_unlock(&lock); /* purecov: inspected */ return 1; /* purecov: inspected */ } typename std::map<K,CountDownLatch*>::iterator it= map.find(key); if (it != map.end()) { mysql_mutex_unlock(&lock); /* purecov: inspected */ return 1; /* purecov: inspected */ } CountDownLatch *cdl = new CountDownLatch(1); std::pair<typename std::map<K,CountDownLatch*>::iterator,bool> ret; ret= map.insert(std::pair<K,CountDownLatch*>(key,cdl)); if (ret.second == false) { error= 1; /* purecov: inspected */ delete cdl; /* purecov: inspected */ } mysql_mutex_unlock(&lock); return error; } /** Wait until ticket status is done. @note The ticket is removed after the wait. @param key The key that identifies the ticket @return @retval 0 success @retval !=0 key doesn't exist, or the Ticket is blocked */ int waitTicket(const K& key) { int error= 0; CountDownLatch *cdl= NULL; mysql_mutex_lock(&lock); if (blocked) { mysql_mutex_unlock(&lock); /* purecov: inspected */ return 1; /* purecov: inspected */ } typename std::map<K,CountDownLatch*>::iterator it= map.find(key); if (it == map.end()) error= 1; else cdl= it->second; mysql_mutex_unlock(&lock); if (cdl != NULL) { cdl->wait(); mysql_mutex_lock(&lock); delete cdl; map.erase(it); if (waiting) { if (map.empty()) { mysql_cond_broadcast(&cond); } } mysql_mutex_unlock(&lock); } return error; } /** Set ticket status to done. @param key The key that identifies the ticket @return @retval 0 success @retval !=0 (key doesn't exist) */ int releaseTicket(const K& key) { int error= 0; mysql_mutex_lock(&lock); typename std::map<K,CountDownLatch*>::iterator it= map.find(key); if (it == map.end()) error= 1; else it->second->countDown(); mysql_mutex_unlock(&lock); return error; } /** Gets all the waiting keys. @param[out] key_list all the keys to return */ void get_all_waiting_keys(std::vector<K>& key_list) { mysql_mutex_lock(&lock); for(typename std::map<K,CountDownLatch*>::iterator iter = map.begin(); iter != map.end(); ++iter) { K key= iter->first; key_list.push_back(key); } mysql_mutex_unlock(&lock); } /** Blocks or unblocks the class from receiving waiting requests. @param[in] blocked_flag if the class should block or not */ void set_blocked_status(bool blocked_flag) { mysql_mutex_lock(&lock); blocked= blocked_flag; mysql_mutex_unlock(&lock); } int block_until_empty(int timeout) { mysql_mutex_lock(&lock); waiting= true; while (!map.empty()) { struct timespec abstime; set_timespec(&abstime, 1); #ifndef NDEBUG int error= #endif mysql_cond_timedwait(&cond, &lock, &abstime); assert(error == ETIMEDOUT || error == 0); if (timeout >= 1) { timeout= timeout - 1; } else if (!map.empty()) { //time out waiting= false; mysql_mutex_unlock(&lock); return 1; } } waiting= false; mysql_mutex_unlock(&lock); return 0; } private: mysql_mutex_t lock; mysql_cond_t cond; std::map<K,CountDownLatch*> map; bool blocked; bool waiting; }; class Mutex_autolock { public: Mutex_autolock(mysql_mutex_t *arg) : ptr_mutex(arg) { DBUG_ENTER("Mutex_autolock::Mutex_autolock"); assert(arg != NULL); mysql_mutex_lock(ptr_mutex); DBUG_VOID_RETURN; } ~Mutex_autolock() { mysql_mutex_unlock(ptr_mutex); } private: mysql_mutex_t *ptr_mutex; Mutex_autolock(Mutex_autolock const&); // no copies permitted void operator=(Mutex_autolock const&); }; class Shared_writelock { public: Shared_writelock(Checkable_rwlock *arg) : shared_write_lock(arg), write_lock_in_use(false) { DBUG_ENTER("Shared_writelock::Shared_writelock"); assert(arg != NULL); mysql_mutex_init(key_GR_LOCK_write_lock_protection, &write_lock, MY_MUTEX_INIT_FAST); DBUG_VOID_RETURN; } virtual ~Shared_writelock() { mysql_mutex_destroy(&write_lock); } int try_grab_write_lock() { int res= 0; mysql_mutex_lock(&write_lock); if (write_lock_in_use) res= 1; /* purecov: inspected */ else { shared_write_lock->wrlock(); write_lock_in_use= true; } mysql_mutex_unlock(&write_lock); return res; } void grab_write_lock() { mysql_mutex_lock(&write_lock); shared_write_lock->wrlock(); write_lock_in_use= true; mysql_mutex_unlock(&write_lock); } void release_write_lock() { mysql_mutex_lock(&write_lock); shared_write_lock->unlock(); write_lock_in_use= false; mysql_mutex_unlock(&write_lock); } /** Grab a read lock only if there is no write lock acquired. @return @retval 0 read lock acquired @retval !=0 there is a write lock acquired */ int try_grab_read_lock() { int res= 0; mysql_mutex_lock(&write_lock); if (write_lock_in_use) res= 1; else shared_write_lock->rdlock(); mysql_mutex_unlock(&write_lock); return res; } void grab_read_lock() { shared_write_lock->rdlock(); } void release_read_lock() { shared_write_lock->unlock(); } private: Checkable_rwlock *shared_write_lock; mysql_mutex_t write_lock; bool write_lock_in_use; }; #endif /* PLUGIN_UTILS_INCLUDED */