403Webshell
Server IP : 172.67.216.182  /  Your IP : 162.158.190.8
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/mysql/src/rapid/plugin/x/ngs/src/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/mysql/src/rapid/plugin/x/ngs/src/client.cc
/*
 * Copyright (c) 2015, 2023, Oracle and/or its affiliates.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2.0,
 * as published by the Free Software Foundation.
 *
 * This program is also distributed with certain software (including
 * but not limited to OpenSSL) that is licensed under separate terms,
 * as designated in a particular file or component or in included license
 * documentation.  The authors of MySQL hereby grant you an additional
 * permission to link the program and your derivative works with the
 * separately licensed software that they have included with MySQL.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License, version 2.0, for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301  USA
 */

#include "ngs/client.h"
#include "ngs/capabilities/handler_auth_mech.h"
#include "ngs/capabilities/handler_readonly_value.h"
#include "ngs/capabilities/handler_tls.h"
#include "ngs/interface/server_interface.h"
#include "ngs/interface/session_interface.h"
#include "ngs/ngs_error.h"
#include "ngs/protocol/protocol_config.h"
#include "ngs/protocol_monitor.h"
#include "ngs/scheduler.h"
#include "ngs_common/operations_factory.h"

#include <string.h>
#include <algorithm>
#include <functional>
#ifndef WIN32
#include <arpa/inet.h>
#endif

#include "ngs/log.h"

#undef ERROR  // Needed to avoid conflict with ERROR in mysqlx.pb.h
#include "ngs_common/protocol_protobuf.h"

using namespace ngs;

Client::Client(Connection_ptr connection, Server_interface &server,
               Client_id client_id, Protocol_monitor_interface &pmon)
    : m_client_id(client_id),
      m_server(server),
      m_connection(connection),
      m_client_addr("n/c"),
      m_client_port(0),
      m_state(Client_invalid),
      m_removed(false),
      m_protocol_monitor(pmon),
      m_close_reason(Not_closing),
      m_msg_buffer(NULL),
      m_msg_buffer_size(0) {
  my_snprintf(m_id, sizeof(m_id), "%llu", static_cast<ulonglong>(client_id));
}

Client::~Client() {
  log_debug("%s: Delete client", m_id);
  if (m_connection) m_connection->close();

  if (m_msg_buffer) ngs::free_array(m_msg_buffer);
}

ngs::chrono::time_point Client::get_accept_time() const {
  return m_accept_time;
}

void Client::reset_accept_time() {
  m_accept_time = chrono::now();
  m_server.restart_client_supervision_timer();
}

void Client::activate_tls() {
  log_debug("%s: enabling TLS for client", client_id());

  if (m_server.ssl_context()->activate_tls(
          connection(),
          chrono::to_seconds(m_server.get_config()->connect_timeout))) {
    if (connection().options()->active_tls()) session()->mark_as_tls_session();
  } else {
    log_warning("%s: Error during SSL handshake", client_id());
    disconnect_and_trigger_close();
  }
}

void Client::on_auth_timeout() {
  m_close_reason = Close_connect_timeout;

  // XXX send an ERROR notice when it's available
  disconnect_and_trigger_close();
}

Capabilities_configurator *Client::capabilities_configurator() {
  std::vector<Capability_handler_ptr> handlers;

  handlers.push_back(ngs::allocate_shared<Capability_tls>(ngs::ref(*this)));
  handlers.push_back(
      ngs::allocate_shared<Capability_auth_mech>(ngs::ref(*this)));

  handlers.push_back(
      ngs::allocate_shared<Capability_readonly_value>("doc.formats", "text"));

  return ngs::allocate_object<Capabilities_configurator>(handlers);
}

void Client::get_capabilities(const Mysqlx::Connection::CapabilitiesGet &msg) {
  ngs::Memory_instrumented<Capabilities_configurator>::Unique_ptr configurator(
      capabilities_configurator());
  ngs::Memory_instrumented<Mysqlx::Connection::Capabilities>::Unique_ptr caps(
      configurator->get());

  m_encoder->send_message(Mysqlx::ServerMessages::CONN_CAPABILITIES, *caps);
}

void Client::set_capabilities(
    const Mysqlx::Connection::CapabilitiesSet &setcap) {
  ngs::Memory_instrumented<Capabilities_configurator>::Unique_ptr configurator(
      capabilities_configurator());
  Error_code error_code = configurator->prepare_set(setcap.capabilities());
  m_encoder->send_result(error_code);
  if (!error_code) {
    configurator->commit();
  }
}

void Client::handle_message(Request &request) {
  log_message_recv(request);

  Client_state expected_state = Client_accepted;

  // there is no session before authentication, so we handle the messages
  // ourselves
  log_debug("%s: Client got message %i", client_id(), request.get_type());

  switch (request.get_type()) {
    case Mysqlx::ClientMessages::CON_CLOSE:
      m_encoder->send_ok("bye!");
      m_close_reason = Close_normal;
      disconnect_and_trigger_close();
      break;

    case Mysqlx::ClientMessages::SESS_RESET:
      // no-op, since we're only going to get called here before session is
      // authenticated
      break;

    case Mysqlx::ClientMessages::CON_CAPABILITIES_GET:
      get_capabilities(static_cast<const Mysqlx::Connection::CapabilitiesGet &>(
          *request.message()));
      break;

    case Mysqlx::ClientMessages::CON_CAPABILITIES_SET:
      set_capabilities(static_cast<const Mysqlx::Connection::CapabilitiesSet &>(
          *request.message()));
      break;

    case Mysqlx::ClientMessages::SESS_AUTHENTICATE_START:
      if (m_state.compare_exchange_strong(expected_state,
                                          Client_authenticating_first) &&
          server().is_running()) {
        log_debug("%s: Authenticating client...", client_id());

        ngs::shared_ptr<Session_interface> s(session());
        // start redirecting incoming messages directly to the session
        if (s) {
          // forward the message to the pre-allocated session, rest of auth will
          // be handled by the session
          s->handle_message(request);
        }
        break;
      }
      // Fall through.

    default:
      // invalid message at this time
      m_protocol_monitor.on_error_unknown_msg_type();
      log_info("%s: Invalid message %i received during client initialization",
               client_id(), request.get_type());
      m_encoder->send_result(ngs::Fatal(ER_X_BAD_MESSAGE, "Invalid message"));
      m_close_reason = Close_error;
      disconnect_and_trigger_close();
      break;
  }
}

void Client::disconnect_and_trigger_close() {
  if (m_close_reason == Not_closing) m_close_reason = Close_normal;

  shutdown_connection();
}

// this will be called on socket errors, but also when halt_and_wait() is called
// which will shutdown the socket for reading and trigger a eof
// (meaning, closed for reads, but writes would still be ok)
void Client::on_network_error(int error) {
  if (error == 0)
    log_debug("%s: peer disconnected (state %i)", client_id(), m_state.load());
  else
    log_debug("%s: network error %i (state %i)", client_id(), error,
              m_state.load());
  if (m_close_reason == Not_closing && m_state != Client_closing && error != 0)
    m_close_reason = Close_net_error;

  m_state.exchange(Client_closing);

  if (m_session &&
      (Client_authenticating_first == m_state || Client_running == m_state)) {
    // trigger all sessions to close and stop whatever they're doing
    log_debug("%s: killing session", client_id());
    if (Session_interface::Closing != m_session->state())
      server().get_worker_scheduler()->post_and_wait(
          ngs::bind(&Client::on_kill, this, ngs::ref(*m_session)));
  }
}

void Client::on_kill(Session_interface &session) { m_session->on_kill(); }

void Client::remove_client_from_server() {
  if (false == m_removed.exchange(true)) m_server.on_client_closed(*this);
}

void Client::on_client_addr(const bool skip_resolve) {
  m_client_addr.resize(INET6_ADDRSTRLEN);

  switch (m_connection->connection_type()) {
    case Connection_tcpip: {
      m_connection->peer_address(m_client_addr, m_client_port);
    } break;

    case Connection_namedpipe:
    case Connection_unixsocket:  // fall through
      m_client_host = "localhost";
      return;

    default:
      return;
  }

  // turn IP to hostname for auth uses
  if (skip_resolve) return;

  m_client_host = "";

  try {
    m_client_host = resolve_hostname();
  } catch (...) {
    m_close_reason = Close_reject;
    disconnect_and_trigger_close();

    throw;
  }
}

void Client::on_accept() {
  log_debug("%s: Accepted client connection from %s", client_id(),
            client_address());

  m_connection->set_socket_thread_owner();

  // it can be accessed directly (no other thread access thus object)
  m_state = Client_accepted;

  m_encoder.reset(ngs::allocate_object<Protocol_encoder>(
      m_connection,
      ngs::bind(&Client::on_network_error, this, ngs::placeholders::_1),
      ngs::ref(m_protocol_monitor)));

  // pre-allocate the initial session
  // this is also needed for the srv_session to correctly report us to the
  // audit.log as in the Pre-authenticate state
  ngs::shared_ptr<Session_interface> session(
      m_server.create_session(*this, *m_encoder, 1));
  if (!session) {
    log_warning("%s: Error creating session for connection from %s",
                client_id(), m_client_addr.c_str());
    m_encoder->send_init_error(
        ngs::Fatal(ER_OUT_OF_RESOURCES, "Could not allocate session"));
  } else {
    ngs::Error_code error(session->init());
    if (error) {
      log_warning("%s: Error initializing session for connection: %s",
                  client_id(), error.message.c_str());
      m_encoder->send_result(error);
      session.reset();
    } else
      m_session = session;
  }
  if (!session) {
    m_close_reason = Close_error;
    disconnect_and_trigger_close();
  }
}

void Client::on_session_auth_success(Session_interface &s) {
  // this is called from worker thread
  Client_state expected = Client_authenticating_first;
  m_state.compare_exchange_strong(expected, Client_running);
}

void Client::on_session_close(Session_interface &s) {
  log_debug("%s: Session %i removed", client_id(), s.session_id());

  // no more open sessions, disconnect
  if (m_close_reason == Not_closing) m_close_reason = Close_normal;

  m_state = Client_closing;

  shutdown_connection();

  remove_client_from_server();
}

void Client::on_session_reset(Session_interface &s) {
  log_debug("%s: Resetting session %i", client_id(), s.session_id());

  m_state = Client_accepted_with_session;
  ngs::shared_ptr<Session_interface> session(
      m_server.create_session(*this, *m_encoder, 1));
  if (!session) {
    log_warning("%s: Error creating session for connection from %s",
                client_id(), m_client_addr.c_str());
    m_encoder->send_result(
        ngs::Fatal(ER_OUT_OF_RESOURCES, "Could not allocate new session"));
    m_state = Client_closing;
  } else {
    ngs::Error_code error(session->init());
    if (error) {
      log_warning("%s: Error initializing session for connection: %s",
                  client_id(), error.message.c_str());
      m_encoder->send_result(error);
      session.reset();
      m_state = Client_closing;
    } else {
      m_session = session;
      m_encoder->send_ok();
    }
  }
}

void Client::on_server_shutdown() {
  log_info("%s: closing client because of shutdown (state: %i)", client_id(),
           m_state.load());
  // XXX send a server shutdown notice
  disconnect_and_trigger_close();
}

Protocol_monitor_interface &Client::get_protocol_monitor() {
  return m_protocol_monitor;
}

void Client::get_last_error(int &error_code, std::string &message) {
  ngs::Operations_factory operations_factory;
  System_interface::Shared_ptr system_interface(
      operations_factory.create_system_interface());

  system_interface->get_socket_error_and_message(error_code, message);
}

void Client::shutdown_connection() {
  m_state = Client_closing;

  if (m_connection->shutdown(Connection_vio::Shutdown_recv) < 0) {
    int err;
    std::string strerr;

    get_last_error(err, strerr);
    log_debug("%s: connection shutdown error %s (%i)", client_id(),
              strerr.c_str(), err);
  }
}

Request *Client::read_one_message(Error_code &ret_error) {
  union {
    char buffer[4];  // Must be properly aligned
    longlong dummy;
  };
  uint32_t msg_size;

  /*
    Use dummy, otherwise g++ 4.4 reports: unused variable 'dummy'
    MY_ATTRIBUTE((unused)) did not work, so we must use it.
  */
  dummy = 0;

  // untill we get another message to process we mark the connection as idle
  // (for PSF)
  m_connection->mark_idle();
  // read the frame
  ssize_t nread = m_connection->read(buffer, 4);
  m_connection->mark_active();

  if (nread == 0)  // EOF
  {
    on_network_error(0);
    return NULL;
  }
  if (nread < 0) {
    int err;
    std::string strerr;
    get_last_error(err, strerr);
    if (!(err == EBADF && m_close_reason == Close_connect_timeout)) {
      log_debug("%s: %s (%i) %i", client_id(), strerr.c_str(), err,
                m_close_reason);
      on_network_error(err);
    }
    return NULL;
  }

  m_protocol_monitor.on_receive(static_cast<long>(nread));

#ifdef WORDS_BIGENDIAN
  std::swap(buffer[0], buffer[3]);
  std::swap(buffer[1], buffer[2]);
#endif
  const uint32_t *pdata = (uint32_t *)(buffer);
  msg_size = *pdata;

  if (msg_size > m_server.get_config()->max_message_size) {
    log_warning("%s: Message of size %u received, exceeding the limit of %i",
                client_id(), msg_size, m_server.get_config()->max_message_size);
    // invalid message size
    // Don't send error, just abort connection
    // ret_error = Fatal(ER_X_BAD_MESSAGE, "Message too large");
    return NULL;
  }

  if (0 == msg_size) {
    ret_error =
        Error(ER_X_BAD_MESSAGE, "Messages without payload are not supported");
    return NULL;
  }

  if (m_msg_buffer_size < msg_size) {
    m_msg_buffer_size = msg_size;
    ngs::reallocate_array(m_msg_buffer, m_msg_buffer_size,
                          KEY_memory_x_recv_buffer);
  }

  nread = m_connection->read(&m_msg_buffer[0], msg_size);
  if (nread == 0)  // EOF
  {
    log_info("%s: peer disconnected while reading message body", client_id());
    on_network_error(0);
    return NULL;
  }

  if (nread < 0) {
    int err;
    std::string strerr;

    get_last_error(err, strerr);
    log_debug("%s: %s (%i)", client_id(), strerr.c_str(), err);
    on_network_error(err);
    return NULL;
  }

  m_protocol_monitor.on_receive(static_cast<long>(nread));

  int8_t type = (int8_t)m_msg_buffer[0];
  Request_unique_ptr request(ngs::allocate_object<Request>(type));

  if (msg_size > 1) request->buffer(&m_msg_buffer[1], msg_size - 1);

  ret_error = m_decoder.parse(*request);

  return request.release();
}

void Client::run(const bool skip_name_resolve) {
  try {
    on_client_addr(skip_name_resolve);
    on_accept();

    while (m_state != Client_closing && m_session) {
      Error_code error;
      Request_unique_ptr message(read_one_message(error));

      // read could took some time, thus lets recheck the state
      if (m_state == Client_closing) break;

      if (error || !message) {
        // !message and !error = EOF
        if (error) m_encoder->send_result(ngs::Fatal(error));
        disconnect_and_trigger_close();
        break;
      }
      ngs::shared_ptr<Session_interface> s(session());
      if (m_state != Client_accepted && s) {
        // pass the message to the session
        s->handle_message(*message);
      } else
        handle_message(*message);
    }
  } catch (std::exception &e) {
    log_error("%s: Force stopping client because exception occurred: %s",
              client_id(), e.what());
  }

  {
    Mutex_lock lock(server().get_client_exit_mutex());
    m_state = Client_closed;

    remove_client_from_server();
  }
}

Youez - 2016 - github.com/yon3zu
LinuXploit