Server IP : 172.67.216.182 / Your IP : 172.70.208.5 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/mysql/src/rapid/plugin/x/ngs/src/ |
Upload File : |
/*
 * Copyright (c) 2015, 2023, Oracle and/or its affiliates.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2.0,
 * as published by the Free Software Foundation.
 *
 * This program is also distributed with certain software (including
 * but not limited to OpenSSL) that is licensed under separate terms,
 * as designated in a particular file or component or in included license
 * documentation. The authors of MySQL hereby grant you an additional
 * permission to link the program and your derivative works with the
 * separately licensed software that they have included with MySQL.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License, version 2.0, for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA
 */

#include "ngs/interface/connection_acceptor_interface.h"
#include "ngs_common/connection_vio.h"
#include "ngs_common/operations_factory.h"

#include <algorithm>
#include "ngs/socket_events.h"

// Suppressing numerous warnings generated by libevent on Windows.
#ifdef WIN32 #pragma warning(push) #pragma warning(disable: 4005) #endif // WIN32 #include <event.h> // libevent #ifdef WIN32 #pragma warning(pop) #endif // WIN32 namespace ngs { class Connection_acceptor_socket : public Connection_acceptor_interface { public: typedef Socket_interface::Shared_ptr Socket_ptr; Connection_acceptor_socket(Socket_ptr listener, System_interface &system_interface) : m_socket_listener(listener), m_system_interface(system_interface) { } Vio *accept() { Vio *vio; sockaddr_storage accept_address; MYSQL_SOCKET sock = MYSQL_INVALID_SOCKET; for (int i = 0; i < MAX_ACCEPT_REATTEMPT; ++i) { socklen_t accept_len = sizeof(accept_address); sock = m_socket_listener->accept(KEY_socket_x_client_connection, (struct sockaddr*)&accept_address, &accept_len); if (INVALID_SOCKET != mysql_socket_getfd(sock)) break; const int error_code = m_system_interface.get_socket_errno(); if (error_code != SOCKET_EINTR && error_code != SOCKET_EAGAIN) return NULL; } const bool is_tcpip = (accept_address.ss_family == AF_INET || accept_address.ss_family == AF_INET6); vio = mysql_socket_vio_new(sock, is_tcpip ? 
VIO_TYPE_TCPIP : VIO_TYPE_SOCKET, 0); if (!vio) throw std::bad_alloc(); // enable TCP_NODELAY vio_fastsend(vio); vio_keepalive(vio, TRUE); return vio; } private: Socket_ptr m_socket_listener; System_interface &m_system_interface; static const int MAX_ACCEPT_REATTEMPT = 10; }; struct Socket_events::Timer_data { ngs::function<bool ()> callback; event ev; timeval tv; Socket_events *self; static void free(Timer_data *data) { evtimer_del(&data->ev); ngs::free_object(data); } }; struct Socket_events::Socket_data { ngs::function<void (Connection_acceptor_interface &)> callback; event ev; Socket_interface::Shared_ptr socket; static void free(Socket_data *data) { event_del(&data->ev); ngs::free_object(data); } }; Socket_events::Socket_events() { m_evbase = event_base_new(); if (!m_evbase) throw std::bad_alloc(); } Socket_events::~Socket_events() { std::for_each(m_timer_events.begin(), m_timer_events.end(), &Timer_data::free); std::for_each(m_socket_events.begin(), m_socket_events.end(), &Socket_data::free); event_base_free(m_evbase); } bool Socket_events::listen(Socket_interface::Shared_ptr sock, ngs::function<void (Connection_acceptor_interface &)> callback) { m_socket_events.push_back(ngs::allocate_object<Socket_data>()); Socket_data *socket_event = m_socket_events.back(); socket_event->callback = callback; socket_event->socket = sock; event_set(&socket_event->ev, static_cast<int>(sock->get_socket_fd()), EV_READ|EV_PERSIST, &Socket_events::socket_data_avaiable, socket_event); event_base_set(m_evbase, &socket_event->ev); return 0 == event_add(&socket_event->ev, NULL); } /** Register a callback to be executed in a fixed time interval. The callback is called from the server's event loop thread, until either the server is stopped or the callback returns false. NOTE: This method may only be called from the same thread as the event loop. 
*/ void Socket_events::add_timer(const std::size_t delay_ms, ngs::function<bool ()> callback) { Timer_data *data = ngs::allocate_object<Timer_data>(); data->tv.tv_sec = static_cast<long>(delay_ms / 1000); data->tv.tv_usec = (delay_ms % 1000) * 1000; data->callback = callback; data->self = this; //XXX use persistent timer events after switch to libevent2 evtimer_set(&data->ev, timeout_call, data); event_base_set(m_evbase, &data->ev); evtimer_add(&data->ev, &data->tv); Mutex_lock lock(m_timers_mutex); m_timer_events.push_back(data); } void Socket_events::loop() { event_base_loop(m_evbase, 0); } void Socket_events::break_loop() { event_base_loopbreak(m_evbase); } void Socket_events::timeout_call(socket_type sock, short which, void *arg) { Timer_data *data = (Timer_data*)arg; if (!data->callback()) { evtimer_del(&data->ev); { Mutex_lock timer_lock(data->self->m_timers_mutex); data->self->m_timer_events.erase(std::remove(data->self->m_timer_events.begin(), data->self->m_timer_events.end(), data), data->self->m_timer_events.end()); } ngs::free_object(data); } else { // schedule for another round evtimer_add(&data->ev, &data->tv); } } void Socket_events::socket_data_avaiable(socket_type sock, short which, void *arg) { Socket_data *data = (Socket_data*)arg; Operations_factory operations_factory; System_interface::Shared_ptr system_interface(operations_factory.create_system_interface()); Connection_acceptor_socket acceptor(data->socket, *system_interface); data->callback(acceptor); } } // namespace ngs