403Webshell
Server IP : 104.21.38.3  /  Your IP : 162.158.106.49
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/mysql/src/rapid/plugin/group_replication/src/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/mysql/src/rapid/plugin/group_replication/src/applier.cc
/* Copyright (c) 2014, 2023, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software Foundation,
   51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */

#include <signal.h>
#include "applier.h"
#include <mysql/group_replication_priv.h>
#include "plugin_log.h"
#include "plugin.h"
#include "single_primary_message.h"

char applier_module_channel_name[] = "group_replication_applier";
bool applier_thread_is_exiting= false;

static void *launch_handler_thread(void* arg)
{
  Applier_module *handler= (Applier_module*) arg;
  handler->applier_thread_handle();
  return 0;
}

Applier_module::Applier_module()
  :applier_running(false), applier_aborted(false), applier_error(0),
   suspended(false), waiting_for_applier_suspension(false),
   shared_stop_write_lock(NULL), incoming(NULL), pipeline(NULL),
   fde_evt(BINLOG_VERSION), stop_wait_timeout(LONG_TIMEOUT),
   applier_channel_observer(NULL)
{
  mysql_mutex_init(key_GR_LOCK_applier_module_run, &run_lock, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_GR_COND_applier_module_run, &run_cond);
  mysql_mutex_init(key_GR_LOCK_applier_module_suspend, &suspend_lock, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_GR_COND_applier_module_suspend, &suspend_cond);
  mysql_cond_init(key_GR_COND_applier_module_wait, &suspension_waiting_condition);
}

Applier_module::~Applier_module(){
  if (this->incoming)
  {
    while (!this->incoming->empty())
    {
      Packet *packet= NULL;
      this->incoming->pop(&packet);
      delete packet;
    }
    delete incoming;
  }
  delete applier_channel_observer;

  mysql_mutex_destroy(&run_lock);
  mysql_cond_destroy(&run_cond);
  mysql_mutex_destroy(&suspend_lock);
  mysql_cond_destroy(&suspend_cond);
  mysql_cond_destroy(&suspension_waiting_condition);
}

int
Applier_module::setup_applier_module(Handler_pipeline_type pipeline_type,
                                     bool reset_logs,
                                     ulong stop_timeout,
                                     rpl_sidno group_sidno,
                                     ulonglong gtid_assignment_block_size,
                                     Shared_writelock *shared_stop_lock)
{
  DBUG_ENTER("Applier_module::setup_applier_module");

  int error= 0;

  //create the receiver queue
  this->incoming= new Synchronized_queue<Packet*>();

  stop_wait_timeout= stop_timeout;

  pipeline= NULL;

  if ( (error= get_pipeline(pipeline_type, &pipeline)) )
  {
    DBUG_RETURN(error);
  }

  reset_applier_logs= reset_logs;
  group_replication_sidno= group_sidno;
  this->gtid_assignment_block_size= gtid_assignment_block_size;

  shared_stop_write_lock= shared_stop_lock;

  DBUG_RETURN(error);
}


int
Applier_module::purge_applier_queue_and_restart_applier_module()
{
  DBUG_ENTER("Applier_module::purge_applier_queue_and_restart_applier_module");
  int error= 0;

  /*
    Here we are stopping applier thread intentionally and we will be starting
    the applier thread after purging the relay logs. So we should ignore any
    errors during the stop (eg: error due to stopping the applier thread in the
    middle of applying the group of events). Hence unregister the applier channel
    observer temporarily till the required work is done.
  */
  channel_observation_manager->unregister_channel_observer(applier_channel_observer);

  /* Stop the applier thread */
  Pipeline_action *stop_action= new Handler_stop_action();
  error= pipeline->handle_action(stop_action);
  delete stop_action;
  if (error)
    DBUG_RETURN(error); /* purecov: inspected */

  /* Purge the relay logs and initialize the channel*/
  Handler_applier_configuration_action *applier_conf_action=
    new Handler_applier_configuration_action(applier_module_channel_name,
                                             true, /* purge relay logs always*/
                                             stop_wait_timeout,
                                             group_replication_sidno);

  error= pipeline->handle_action(applier_conf_action);
  delete applier_conf_action;
  if (error)
    DBUG_RETURN(error); /* purecov: inspected */

  channel_observation_manager->register_channel_observer(applier_channel_observer);

  /* Start the applier thread */
  Pipeline_action *start_action = new Handler_start_action();
  error= pipeline->handle_action(start_action);
  delete start_action;

  DBUG_RETURN(error);
}


int
Applier_module::setup_pipeline_handlers()
{
  DBUG_ENTER("Applier_module::setup_pipeline_handlers");

  int error= 0;

  //Configure the applier handler trough a configuration action
  Handler_applier_configuration_action *applier_conf_action=
    new Handler_applier_configuration_action(applier_module_channel_name,
                                             reset_applier_logs,
                                             stop_wait_timeout,
                                             group_replication_sidno);

  error= pipeline->handle_action(applier_conf_action);
  delete applier_conf_action;
  if (error)
    DBUG_RETURN(error); /* purecov: inspected */

  Handler_certifier_configuration_action *cert_conf_action=
    new Handler_certifier_configuration_action(group_replication_sidno,
                                               gtid_assignment_block_size);

  error = pipeline->handle_action(cert_conf_action);

  delete cert_conf_action;

  DBUG_RETURN(error);
}

void
Applier_module::set_applier_thread_context()
{
  my_thread_init();
  THD *thd= new THD;
  thd->set_new_thread_id();
  thd->thread_stack= (char*) &thd;
  thd->store_globals();

  thd->get_protocol_classic()->init_net(0);
  thd->slave_thread= true;
  //TODO: See of the creation of a new type is desirable.
  thd->system_thread= SYSTEM_THREAD_SLAVE_IO;
  thd->security_context()->skip_grants();

  global_thd_manager_add_thd(thd);

  thd->init_for_queries();
  set_slave_thread_options(thd);
#ifndef _WIN32
  THD_STAGE_INFO(thd, stage_executing);
#endif
  applier_thd= thd;
}

void
Applier_module::clean_applier_thread_context()
{
  applier_thd->get_protocol_classic()->end_net();
  applier_thd->release_resources();
  THD_CHECK_SENTRY(applier_thd);
  global_thd_manager_remove_thd(applier_thd);
}

int
Applier_module::inject_event_into_pipeline(Pipeline_event* pevent,
                                           Continuation* cont)
{
  int error= 0;
  pipeline->handle_event(pevent, cont);

  if ((error= cont->wait()))
    log_message(MY_ERROR_LEVEL, "Error at event handling! Got error: %d", error);

  return error;
}



bool Applier_module::apply_action_packet(Action_packet *action_packet)
{
  enum_packet_action action= action_packet->packet_action;

  //packet used to break the queue blocking wait
  if (action == TERMINATION_PACKET)
  {
     return true;
  }
  //packet to signal the applier to suspend
  if (action == SUSPENSION_PACKET)
  {
    suspend_applier_module();
    return false;
  }
  return false; /* purecov: inspected */
}

int
Applier_module::apply_view_change_packet(View_change_packet *view_change_packet,
                                         Format_description_log_event *fde_evt,
                                         IO_CACHE *cache,
                                         Continuation *cont)
{
  int error= 0;

  Gtid_set *group_executed_set= NULL;
  Sid_map *sid_map= NULL;
  if (!view_change_packet->group_executed_set.empty())
  {
    sid_map= new Sid_map(NULL);
    group_executed_set= new Gtid_set(sid_map, NULL);
    if (intersect_group_executed_sets(view_change_packet->group_executed_set,
                                      group_executed_set))
    {
       log_message(MY_WARNING_LEVEL,
                   "Error when extracting group GTID execution information, "
                   "some recovery operations may face future issues"); /* purecov: inspected */
       delete sid_map;            /* purecov: inspected */
       delete group_executed_set; /* purecov: inspected */
       group_executed_set= NULL;  /* purecov: inspected */
    }
  }

  if (group_executed_set != NULL)
  {
    if (get_certification_handler()->get_certifier()->
        set_group_stable_transactions_set(group_executed_set))
    {
      log_message(MY_WARNING_LEVEL,
                  "An error happened when trying to reduce the Certification "
                  " information size for transmission"); /* purecov: inspected */
    }
    delete sid_map;
    delete group_executed_set;
  }

  View_change_log_event* view_change_event
      = new View_change_log_event((char*)view_change_packet->view_id.c_str());

  Pipeline_event* pevent= new Pipeline_event(view_change_event, fde_evt, cache);
  pevent->mark_event(SINGLE_VIEW_EVENT);
  error= inject_event_into_pipeline(pevent, cont);
  //When discarded, the VCLE logging was delayed, so don't delete it
  if (!cont->is_transaction_discarded())
    delete pevent;

  return error;
}

int Applier_module::apply_data_packet(Data_packet *data_packet,
                                      Format_description_log_event *fde_evt,
                                      IO_CACHE *cache,
                                      Continuation *cont)
{
  int error= 0;
  uchar* payload= data_packet->payload;
  uchar* payload_end= data_packet->payload + data_packet->len;

  DBUG_EXECUTE_IF("group_replication_before_apply_data_packet", {
    const char act[] = "now wait_for continue_apply";
    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
  });

  if (check_single_primary_queue_status())
    return 1; /* purecov: inspected */

  while ((payload != payload_end) && !error)
  {
    uint event_len= uint4korr(((uchar*)payload) + EVENT_LEN_OFFSET);

    Data_packet* new_packet= new Data_packet(payload, event_len);
    payload= payload + event_len;

    Pipeline_event* pevent= new Pipeline_event(new_packet, fde_evt, cache);
    error= inject_event_into_pipeline(pevent, cont);

    delete pevent;
    DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event",
                    {
                      if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT)
                      {
                        error=1;
                      }
                    }
                   );
  }

  return error;
}

int
Applier_module::apply_single_primary_action_packet(Single_primary_action_packet *packet)
{
  int error= 0;
  Certifier_interface *certifier= get_certification_handler()->get_certifier();

  switch (packet->action)
  {
    case Single_primary_action_packet::NEW_PRIMARY:
      certifier->enable_conflict_detection();
      break;
    case Single_primary_action_packet::QUEUE_APPLIED:
      certifier->disable_conflict_detection();
      break;
    default:
      assert(0); /* purecov: inspected */
  }

  return error;
}


int
Applier_module::applier_thread_handle()
{
  DBUG_ENTER("ApplierModule::applier_thread_handle()");

  //set the thread context
  set_applier_thread_context();

  Handler_THD_setup_action *thd_conf_action= NULL;
  Format_description_log_event* fde_evt= NULL;
  Continuation* cont= NULL;
  Packet *packet= NULL;
  bool loop_termination = false;
  int packet_application_error= 0;

  IO_CACHE *cache= (IO_CACHE*) my_malloc(PSI_NOT_INSTRUMENTED,
                                         sizeof(IO_CACHE),
                                         MYF(MY_ZEROFILL));
  if (!cache || (!my_b_inited(cache) &&
                 open_cached_file(cache, mysql_tmpdir,
                                  "group_replication_pipeline_applier_cache",
                                  SHARED_EVENT_IO_CACHE_SIZE,
                                  MYF(MY_WME))))
  {
    my_free(cache);   /* purecov: inspected */
    cache= NULL;      /* purecov: inspected */
    log_message(MY_ERROR_LEVEL,
                "Failed to create group replication pipeline applier cache!"); /* purecov: inspected */
    applier_error= 1; /* purecov: inspected */
    goto end;         /* purecov: inspected */
  }

  applier_error= setup_pipeline_handlers();

  applier_channel_observer= new Applier_channel_state_observer();
  channel_observation_manager
      ->register_channel_observer(applier_channel_observer);

  if (!applier_error)
  {
    Pipeline_action *start_action = new Handler_start_action();
    applier_error= pipeline->handle_action(start_action);
    delete start_action;
  }

  if (applier_error)
  {
    goto end;
  }

  mysql_mutex_lock(&run_lock);
  applier_thread_is_exiting= false;
  applier_running= true;
  mysql_cond_broadcast(&run_cond);
  mysql_mutex_unlock(&run_lock);

  fde_evt= new Format_description_log_event(BINLOG_VERSION);
  cont= new Continuation();

  //Give the handlers access to the applier THD
  thd_conf_action= new Handler_THD_setup_action(applier_thd);
  // To prevent overwrite last error method
  applier_error+= pipeline->handle_action(thd_conf_action);
  delete thd_conf_action;

  //applier main loop
  while (!applier_error && !packet_application_error && !loop_termination)
  {
    if (is_applier_thread_aborted())
      break;

    this->incoming->front(&packet); // blocking

    switch (packet->get_packet_type())
    {
      case ACTION_PACKET_TYPE:
          this->incoming->pop();
          loop_termination= apply_action_packet((Action_packet*)packet);
          break;
      case VIEW_CHANGE_PACKET_TYPE:
          packet_application_error=
            apply_view_change_packet((View_change_packet*)packet,
                                     fde_evt, cache, cont);
          this->incoming->pop();
          break;
      case DATA_PACKET_TYPE:
          packet_application_error= apply_data_packet((Data_packet*)packet,
                                                      fde_evt, cache, cont);
          //Remove from queue here, so the size only decreases after packet handling
         this->incoming->pop();
          break;
      case SINGLE_PRIMARY_PACKET_TYPE:
          packet_application_error=
              apply_single_primary_action_packet((Single_primary_action_packet*)packet);
          this->incoming->pop();
          break;
      default:
        assert(0); /* purecov: inspected */
    }

    delete packet;
  }
  if (packet_application_error)
    applier_error= packet_application_error;
  delete fde_evt;
  delete cont;

end:

  //always remove the observer even the thread is no longer running
  channel_observation_manager
      ->unregister_channel_observer(applier_channel_observer);

  //only try to leave if the applier managed to start
  if (applier_error && applier_running)
    leave_group_on_failure();

  //Even on error cases, send a stop signal to all handlers that could be active
  Pipeline_action *stop_action= new Handler_stop_action();
  int local_applier_error= pipeline->handle_action(stop_action);
  delete stop_action;

  Gcs_interface_factory::cleanup(Gcs_operations::get_gcs_engine());

  log_message(MY_INFORMATION_LEVEL, "The group replication applier thread"
                                    " was killed");

  DBUG_EXECUTE_IF("applier_thd_timeout",
                  {
                    const char act[]= "now wait_for signal.applier_continue";
                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });

  if (cache != NULL)
  {
    close_cached_file(cache);
    my_free(cache);
  }

  clean_applier_thread_context();

  mysql_mutex_lock(&run_lock);
  delete applier_thd;

  /*
    Don't overwrite applier_error when stop_applier_thread() doesn't return
    error. So applier_error which is also referred by main thread
    doesn't return true from initialize_applier_thread() when
    start_applier_thread() fails and stop_applier_thread() succeeds.
    Also use local var - local_applier_error, as the applier can be deleted
    before the thread returns.
  */
  if (local_applier_error)
    applier_error= local_applier_error; /* purecov: inspected */
  else
    local_applier_error= applier_error;

  applier_running= false;
  mysql_cond_broadcast(&run_cond);
  mysql_mutex_unlock(&run_lock);

  my_thread_end();
  applier_thread_is_exiting= true;
  my_thread_exit(0);

  DBUG_RETURN(local_applier_error); /* purecov: inspected */
}

int
Applier_module::initialize_applier_thread()
{
  DBUG_ENTER("Applier_module::initialize_applier_thd");

  //avoid concurrency calls against stop invocations
  mysql_mutex_lock(&run_lock);

  applier_error= 0;

  if ((mysql_thread_create(key_GR_THD_applier_module_receiver,
                           &applier_pthd,
                           get_connection_attrib(),
                           launch_handler_thread,
                           (void*)this)))
  {
    mysql_mutex_unlock(&run_lock); /* purecov: inspected */
    DBUG_RETURN(1);                /* purecov: inspected */
  }

  while (!applier_running && !applier_error)
  {
    DBUG_PRINT("sleep",("Waiting for applier thread to start"));
    mysql_cond_wait(&run_cond, &run_lock);
  }

  mysql_mutex_unlock(&run_lock);
  DBUG_RETURN(applier_error);
}

int
Applier_module::terminate_applier_pipeline()
{
  int error= 0;
  if (pipeline != NULL)
  {
    if ((error= pipeline->terminate_pipeline()))
    {
      log_message(MY_WARNING_LEVEL,
                  "The group replication applier pipeline was not properly"
                  " disposed. Check the error log for further info."); /* purecov: inspected */
    }
    //delete anyway, as we can't do much on error cases
    delete pipeline;
    pipeline= NULL;
  }
  return error;
}

int
Applier_module::terminate_applier_thread()
{
  DBUG_ENTER("Applier_module::terminate_applier_thread");

  /* This lock code needs to be re-written from scratch*/
  mysql_mutex_lock(&run_lock);

  applier_aborted= true;

  if (!applier_running)
  {
    goto delete_pipeline;
  }

  while (applier_running)
  {
    DBUG_PRINT("loop", ("killing group replication applier thread"));

    mysql_mutex_lock(&applier_thd->LOCK_thd_data);

    applier_thd->awake(THD::NOT_KILLED);
    mysql_mutex_unlock(&applier_thd->LOCK_thd_data);

    //before waiting for termination, signal the queue to unlock.
    add_termination_packet();

    //also awake the applier in case it is suspended
    awake_applier_module();

    /*
      There is a small chance that thread might miss the first
      alarm. To protect against it, resend the signal until it reacts
    */
    struct timespec abstime;
    set_timespec(&abstime, 2);
#ifndef NDEBUG
    int error=
#endif
      mysql_cond_timedwait(&run_cond, &run_lock, &abstime);
    if (stop_wait_timeout >= 2)
    {
      stop_wait_timeout= stop_wait_timeout - 2;
    }
    else if (applier_running) // quit waiting
    {
      mysql_mutex_unlock(&run_lock);
      DBUG_RETURN(1);
    }
    assert(error == ETIMEDOUT || error == 0);
  }

  assert(!applier_running);

delete_pipeline:

  //The thread ended properly so we can terminate the pipeline
  terminate_applier_pipeline();

  while (!applier_thread_is_exiting)
  {
    /* Check if applier thread is exiting per microsecond. */
    my_sleep(1);
  }

  /*
    Give applier thread one microsecond to exit completely after
    it set applier_thread_is_exiting to true.
  */
  my_sleep(1);

  mysql_mutex_unlock(&run_lock);

  DBUG_RETURN(0);
}

void Applier_module::inform_of_applier_stop(char* channel_name,
                                            bool aborted)
{
  DBUG_ENTER("Applier_module::inform_of_applier_stop");

  if (!strcmp(channel_name, applier_module_channel_name) &&
      aborted && applier_running )
  {
    log_message(MY_ERROR_LEVEL,
                "The applier thread execution was aborted."
                " Unable to process more transactions,"
                " this member will now leave the group.");

    applier_error= 1;

    //before waiting for termination, signal the queue to unlock.
    add_termination_packet();

    //also awake the applier in case it is suspended
    awake_applier_module();
  }

  DBUG_VOID_RETURN;
}

void Applier_module::leave_group_on_failure()
{
  DBUG_ENTER("Applier_module::leave_group_on_failure");

  log_message(MY_ERROR_LEVEL,
              "Fatal error during execution on the Applier process of "
              "Group Replication. The server will now leave the group.");

  group_member_mgr->update_member_status(local_member_info->get_uuid(),
                                         Group_member_info::MEMBER_ERROR);

  bool set_read_mode= false;
  if (view_change_notifier != NULL &&
      !view_change_notifier->is_view_modification_ongoing())
  {
    view_change_notifier->start_view_modification();
  }
  Gcs_operations::enum_leave_state state= gcs_module->leave();

  int error= channel_stop_all(CHANNEL_APPLIER_THREAD|CHANNEL_RECEIVER_THREAD,
                              stop_wait_timeout);
  if (error)
  {
    log_message(MY_ERROR_LEVEL,
                "Error stopping all replication channels while server was"
                " leaving the group. Please check the error log for additional"
                " details. Got error: %d", error);
  }

  std::stringstream ss;
  plugin_log_level log_severity= MY_WARNING_LEVEL;
  switch (state)
  {
    case Gcs_operations::ERROR_WHEN_LEAVING:
      ss << "Unable to confirm whether the server has left the group or not. "
            "Check performance_schema.replication_group_members to check group membership information.";
      log_severity= MY_ERROR_LEVEL;
      break;
    case Gcs_operations::ALREADY_LEAVING:
      ss << "Skipping leave operation: concurrent attempt to leave the group is on-going."; /* purecov: inspected */
      break; /* purecov: inspected */
    case Gcs_operations::ALREADY_LEFT:
      ss << "Skipping leave operation: member already left the group."; /* purecov: inspected */
      break; /* purecov: inspected */
    case Gcs_operations::NOW_LEAVING:
      set_read_mode= true;
      ss << "The server was automatically set into read only mode after an error was detected.";
      log_severity= MY_ERROR_LEVEL;
      break;
  }
  log_message(log_severity, ss.str().c_str());

  kill_pending_transactions(set_read_mode, false);

  DBUG_VOID_RETURN;
}

void Applier_module::kill_pending_transactions(bool set_read_mode,
                                               bool threaded_sql_session)
{
  DBUG_ENTER("Applier_module::kill_pending_transactions");

  //Stop any more transactions from waiting
  bool already_locked= shared_stop_write_lock->try_grab_write_lock();

  //kill pending transactions
  blocked_transaction_handler->unblock_waiting_transactions();

  if (!already_locked)
    shared_stop_write_lock->release_write_lock();

  if (set_read_mode)
  {
    if (threaded_sql_session)
      enable_server_read_mode(PSESSION_INIT_THREAD);
    else
      enable_server_read_mode(PSESSION_USE_THREAD);
  }

  if (view_change_notifier != NULL)
  {
    log_message(MY_INFORMATION_LEVEL, "Going to wait for view modification");
    if (view_change_notifier->wait_for_view_modification())
    {
      log_message(MY_ERROR_LEVEL, "On shutdown there was a timeout receiving a "
                                  "view change. This can lead to a possible "
                                  "inconsistent state. Check the log for more "
                                  "details");
    }
  }

  /*
    Only abort() if we successfully asked to leave() the group (and we have
    group_replication_exit_state_action set to ABORT_SERVER).
    We don't want to abort() during the execution of START GROUP_REPLICATION or
    STOP GROUP_REPLICATION.
  */
  if (set_read_mode &&
      exit_state_action_var == EXIT_STATE_ACTION_ABORT_SERVER)
  {
    abort_plugin_process("Fatal error during execution of Group Replication");
  }

  DBUG_VOID_RETURN;
}

int
Applier_module::wait_for_applier_complete_suspension(bool *abort_flag,
                                                     bool wait_for_execution)
{
  int error= 0;

  mysql_mutex_lock(&suspend_lock);

  /*
   We use an external flag to avoid race conditions.
   A local flag could always lead to the scenario of
     wait_for_applier_complete_suspension()

   >> thread switch

     break_applier_suspension_wait()
       we_are_waiting = false;
       awake

   thread switch <<

      we_are_waiting = true;
      wait();
  */
  while (!suspended && !(*abort_flag) && !applier_aborted && !applier_error)
  {
    mysql_cond_wait(&suspension_waiting_condition, &suspend_lock);
  }

  mysql_mutex_unlock(&suspend_lock);

  if (applier_aborted || applier_error)
      return APPLIER_THREAD_ABORTED; /* purecov: inspected */

  /**
    Wait for the applier execution of pre suspension events (blocking method)
    while(the wait method times out)
      wait()
  */
  if (wait_for_execution)
  {
    error= APPLIER_GTID_CHECK_TIMEOUT_ERROR; //timeout error
    while (error == APPLIER_GTID_CHECK_TIMEOUT_ERROR && !(*abort_flag))
      error= wait_for_applier_event_execution(1, true); //blocking
  }

  return (error == APPLIER_RELAY_LOG_NOT_INITED);
}

void
Applier_module::interrupt_applier_suspension_wait()
{
  mysql_mutex_lock(&suspend_lock);
  mysql_cond_broadcast(&suspension_waiting_condition);
  mysql_mutex_unlock(&suspend_lock);
}

bool
Applier_module::is_applier_thread_waiting()
{
  DBUG_ENTER("Applier_module::is_applier_thread_waiting");
  Event_handler* event_applier= NULL;
  Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);

  if (event_applier == NULL)
    return false; /* purecov: inspected */

  bool result= ((Applier_handler*)event_applier)->is_applier_thread_waiting();

  DBUG_RETURN(result);
}

int
Applier_module::wait_for_applier_event_execution(double timeout,
                                                 bool check_and_purge_partial_transactions)
{
  DBUG_ENTER("Applier_module::wait_for_applier_event_execution");
  int error= 0;
  Event_handler* event_applier= NULL;
  Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);

  if (event_applier &&
      !(error= ((Applier_handler*)event_applier)->wait_for_gtid_execution(timeout)))
  {
    /*
      After applier thread is done, check if there is partial transaction
      in the relay log. If so, applier thread must be holding the lock on it
      and will never release it because there will not be any more events
      coming into this channel. In this case, purge the relaylogs and restart
      the applier thread will release the lock and update the applier thread
      execution position correctly and safely.
    */
    if (check_and_purge_partial_transactions &&
        ((Applier_handler*)event_applier)->is_partial_transaction_on_relay_log())
    {
        error= purge_applier_queue_and_restart_applier_module();
    }
  }
  DBUG_RETURN(error);
}


Certification_handler* Applier_module::get_certification_handler(){

  Event_handler* event_applier= NULL;
  Event_handler::get_handler_by_role(pipeline, CERTIFIER, &event_applier);

  //The only certification handler for now
  return (Certification_handler*) event_applier;
}

int
Applier_module::intersect_group_executed_sets(std::vector<std::string>& gtid_sets,
                                              Gtid_set* output_set)
{
  Sid_map* sid_map= output_set->get_sid_map();

  std::vector<std::string>::iterator set_iterator;
  for (set_iterator= gtid_sets.begin();
       set_iterator!= gtid_sets.end();
       set_iterator++)
  {

    Gtid_set member_set(sid_map, NULL);
    Gtid_set intersection_result(sid_map, NULL);

    std::string exec_set_str= (*set_iterator);

    if (member_set.add_gtid_text(exec_set_str.c_str()) != RETURN_STATUS_OK)
    {
      return 1; /* purecov: inspected */
    }

    if (output_set->is_empty())
    {
      if (output_set->add_gtid_set(&member_set))
      {
      return 1; /* purecov: inspected */
      }
    }
    else
    {
      /*
        We have three sets:
          member_set:          the one sent from a given member;
          output_set:        the one that contains the intersection of
                               the computed sets until now;
          intersection_result: the intersection between set and
                               intersection_result.
        So we compute the intersection between member_set and output_set, and
        set that value to output_set to be used on the next intersection.
      */
      if (member_set.intersection(output_set, &intersection_result) != RETURN_STATUS_OK)
      {
        return 1; /* purecov: inspected */
      }

      output_set->clear();
      if (output_set->add_gtid_set(&intersection_result) != RETURN_STATUS_OK)
      {
        return 1; /* purecov: inspected */
      }
    }
  }

#if !defined(NDEBUG)
  char *executed_set_string;
  output_set->to_string(&executed_set_string);
  DBUG_PRINT("info", ("View change GTID information: output_set: %s",
             executed_set_string));
  my_free(executed_set_string);
#endif

  return 0;
}

int Applier_module::check_single_primary_queue_status()
{
  /*
    If the 1) group is on single primary mode, 2) this member is the
    primary one, and 3) the group replication applier did apply all
    previous primary transactions, we can switch off conflict
    detection since all transactions will originate from the same
    primary.
  */
  if (get_certification_handler()->get_certifier()->is_conflict_detection_enable() &&
      local_member_info->in_primary_mode() &&
      local_member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY &&
      is_applier_thread_waiting())
  {
    Single_primary_message
        single_primary_message(Single_primary_message::SINGLE_PRIMARY_QUEUE_APPLIED_MESSAGE);
    if (gcs_module->send_message(single_primary_message))
    {
      log_message(MY_ERROR_LEVEL,
                  "Error sending single primary message informing "
                  "that primary did apply relay logs"); /* purecov: inspected */
      return 1; /* purecov: inspected */
    }
  }

  return 0;
}

Pipeline_member_stats *Applier_module::get_local_pipeline_stats()
{
  // We need run_lock to get protection against STOP GR command.

  Mutex_autolock auto_lock_mutex(&run_lock);
  Pipeline_member_stats *stats= NULL;
  Certification_handler *cert= this->get_certification_handler();
  Certifier_interface *cert_module= (cert ? cert->get_certifier() : NULL);
  if (cert_module)
  {
    stats= new Pipeline_member_stats(
        get_pipeline_stats_member_collector(), get_message_queue_size(),
        cert_module->get_negative_certified(),
        cert_module->get_certification_info_size());
    {
      char *committed_transactions_buf= NULL;
      size_t committed_transactions_buf_length= 0;
      int outcome= cert_module->get_group_stable_transactions_set_string(
          &committed_transactions_buf, &committed_transactions_buf_length);

      if (!outcome && committed_transactions_buf_length > 0)
        stats->set_transaction_committed_all_members(committed_transactions_buf,
                committed_transactions_buf_length);
      my_free(committed_transactions_buf);
    }
    {
      std::string last_conflict_free_transaction;
      cert_module->get_last_conflict_free_transaction(
          &last_conflict_free_transaction);
      stats->set_transaction_last_conflict_free(last_conflict_free_transaction);
    }
  }
  else
  {
    stats= new Pipeline_member_stats(get_pipeline_stats_member_collector(),
                                     get_message_queue_size(), 0, 0);
  }
  return stats;
}

Youez - 2016 - github.com/yon3zu
LinuXploit