403Webshell
Server IP : 104.21.38.3  /  Your IP : 162.158.170.27
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/mysql/src/sql/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/mysql/src/sql/ndb_conflict_trans.cc
/*
   Copyright (c) 2011, 2023, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/

#include "ndb_conflict_trans.h"

#ifdef HAVE_NDB_BINLOG
#include "my_sys.h"
#include "my_base.h"

/* Whether to track all transactions, or just
 * 'interesting' ones
 */
#define TRACK_ALL_TRANSACTIONS 0

/* Whether to check the transaction graph for
 * correctness at runtime
 */
#define CHECK_TRANS_GRAPH 0

/* st_row_event_key_info implementation */

st_row_event_key_info::
st_row_event_key_info(const NdbDictionary::Table* _table,
                      const uchar* _key_buff,
                      Uint32 _key_buff_len,
                      Uint64 _transaction_id):
  tableObj(_table),
  packed_key(_key_buff),
  packed_key_len(_key_buff_len),
  transaction_id(_transaction_id),
  hash_next(NULL)
{
}

Uint64
st_row_event_key_info::getTransactionId() const
{
  return transaction_id;
}

void
st_row_event_key_info::updateRowTransactionId(Uint64 mostRecentTransId)
{
  transaction_id = mostRecentTransId;
}

Uint32
st_row_event_key_info::hashValue() const
{
  /* Include Table Object Id + primary key */
  Uint32 h = (17 * 37) + tableObj->getObjectId();
  for (Uint32 i=0; i < packed_key_len; i++)
    h = (37 * h) + packed_key[i];
  return h;
}

bool
st_row_event_key_info::equal(const st_row_event_key_info* other) const
{
  /* Check same table + same PK */
  return
    ((tableObj == other->tableObj) &&
     (packed_key_len == other->packed_key_len) &&
     (memcmp(packed_key, other->packed_key, packed_key_len) == 0));
};

st_row_event_key_info*
st_row_event_key_info::getNext() const
{
  return hash_next;
};

void
st_row_event_key_info::setNext(st_row_event_key_info* _next)
{
  hash_next = _next;
};


/* st_trans_dependency implementation */

st_trans_dependency::
st_trans_dependency(st_transaction* _target_transaction,
                    st_transaction* _dependent_transaction,
                    const st_trans_dependency* _next)
  : target_transaction(_target_transaction),
    dependent_transaction(_dependent_transaction),
    next_entry(_next),
    hash_next(NULL)
{
};

st_transaction*
st_trans_dependency::getTargetTransaction() const
{
  return target_transaction;
}

st_transaction*
st_trans_dependency::getDependentTransaction() const
{
  return dependent_transaction;
}

const st_trans_dependency*
st_trans_dependency::getNextDependency() const
{
  return next_entry;
}

Uint32
st_trans_dependency::hashValue() const
{
  /* Hash the ptrs in a rather nasty way */
  UintPtr p =
    ((UintPtr) target_transaction) ^
    ((UintPtr) dependent_transaction);;

  if (sizeof(p) == 8)
  {
    /* Xor two words of 64 bit ptr */
    p =
      (p & 0xffffffff) ^
      ((((Uint64) p) >> 32) & 0xffffffff);
  }

  return 17 + (37 * (Uint32)p);
};

bool
st_trans_dependency::equal(const st_trans_dependency* other) const
{
  return ((target_transaction == other->target_transaction) &&
          (dependent_transaction == other->dependent_transaction));
};

st_trans_dependency*
st_trans_dependency::getNext() const
{
  return hash_next;
};

void
st_trans_dependency::setNext(st_trans_dependency* _next)
{
  hash_next = _next;
};


/* st_transaction implementation */

st_transaction::st_transaction(Uint64 _transaction_id)
  : transaction_id(_transaction_id),
    in_conflict(false),
    dependency_list_head(NULL),
    hash_next(NULL)
{
};

Uint64
st_transaction::getTransactionId() const
{
  return transaction_id;
}

bool
st_transaction::getInConflict() const
{
  return in_conflict;
}

void
st_transaction::setInConflict()
{
  in_conflict = true;
}

const st_trans_dependency*
st_transaction::getDependencyListHead() const
{
  return dependency_list_head;
}

void
st_transaction::setDependencyListHead(st_trans_dependency* head)
{
  dependency_list_head = head;
}

/* Hash Api */
Uint32
st_transaction::hashValue() const
{
  return 17 + (37 * ((transaction_id & 0xffffffff) ^
                     (transaction_id >> 32 & 0xffffffff)));
};

bool
st_transaction::equal(const st_transaction* other) const
{
  return transaction_id == other->transaction_id;
};

st_transaction*
st_transaction::getNext() const
{
    return hash_next;
};

void
st_transaction::setNext(st_transaction* _next)
{
  hash_next = _next;
};



/*
   Unique HashMap(Set) of st_row_event_key_info ptrs, with bucket storage
   allocated from st_mem_root_allocator
*/
template class HashMap2<st_row_event_key_info, true, st_mem_root_allocator>;
template class HashMap2<st_transaction, true, st_mem_root_allocator>;
template class HashMap2<st_trans_dependency, true, st_mem_root_allocator>;
template class LinkedStack<Uint64, st_mem_root_allocator>;


/**
 * pack_key_to_buffer
 *
 * Given a table, key_record and record, this method will
 * determine how many significant bytes the key contains,
 * and if a buffer is passed, will copy the bytes into the
 * buffer.
 */
static
int
pack_key_to_buffer(const NdbDictionary::Table* table,
                   const NdbRecord* key_rec,
                   const uchar* record,
                   uchar* buffer,
                   Uint32& buff_len)
{
  /* Loop over attributes in key record, determining their actual
   * size based on column type and contents of record
   * If buffer supplied, copy them contiguously to buffer
   * return total length
   */
  Uint32 attr_id;
  Uint32 buff_offset = 0;
  NdbDictionary::getFirstAttrId(key_rec, attr_id);

  do
  {
    Uint32 from_offset = 0;
    Uint32 byte_len = 0;
    const NdbDictionary::Column* key_col = table->getColumn(attr_id);
    NdbDictionary::getOffset(key_rec, attr_id, from_offset);
    assert( ! NdbDictionary::isNull(key_rec, (const char*) record, attr_id));

    switch(key_col->getArrayType())
    {
    case NDB_ARRAYTYPE_FIXED:
      byte_len = key_col->getSizeInBytes();
      break;
    case NDB_ARRAYTYPE_SHORT_VAR:
      byte_len = record[from_offset];
      from_offset++;
      break;
    case NDB_ARRAYTYPE_MEDIUM_VAR:
      byte_len = uint2korr(&record[from_offset]);
      from_offset+= 2;
      break;
    };
    assert( (buff_offset + byte_len) <= buff_len );

    if (buffer)
      memcpy(&buffer[buff_offset], &record[from_offset], byte_len);

    buff_offset+= byte_len;
  } while (NdbDictionary::getNextAttrId(key_rec, attr_id));

  buff_len = buff_offset;
  return 0;
};

static
Uint32 determine_packed_key_size(const NdbDictionary::Table* table,
                                 const NdbRecord* key_rec,
                                 const uchar* record)
{
  Uint32 key_size = ~Uint32(0);
  /* Use pack_key_to_buffer to calculate length required */
  pack_key_to_buffer(table,
                     key_rec,
                     record,
                     NULL,
                     key_size);
  return key_size;
};

/* st_mem_root_allocator implementation */
void*
st_mem_root_allocator::alloc(void* ctx, size_t bytes)
{
  st_mem_root_allocator* a = (st_mem_root_allocator*) ctx;
  return alloc_root(a->mem_root, bytes);
};

void*
st_mem_root_allocator::mem_calloc(void* ctx, size_t nelem, size_t bytes)
{
  st_mem_root_allocator* a = (st_mem_root_allocator*) ctx;
  return alloc_root(a->mem_root,
                    nelem * bytes);
}

void
st_mem_root_allocator::mem_free(void* ctx, void* mem)
{
  /* Do nothing, will be globally freed when arena (mem_root)
   * released
   */
};

st_mem_root_allocator::st_mem_root_allocator(MEM_ROOT* _mem_root)
  : mem_root(_mem_root)
{
};


/* DependencyTracker implementation */

DependencyTracker*
DependencyTracker::newDependencyTracker(MEM_ROOT* mem_root)
{
  DependencyTracker* dt = NULL;
  void* mem = alloc_root(mem_root,
                         sizeof(DependencyTracker));
  if (mem)
  {
    dt = new (mem) DependencyTracker(mem_root);
  }

  return dt;
}


DependencyTracker::
DependencyTracker(MEM_ROOT* mem_root)
  : mra(mem_root), key_hash(&mra), trans_hash(&mra),
    dependency_hash(&mra),
    iteratorTodo(ITERATOR_STACK_BLOCKSIZE, &mra),
    conflicting_trans_count(0),
    error_text(NULL)
{
  /* TODO Get sizes from somewhere */
  key_hash.setSize(1024);
  trans_hash.setSize(100);
  dependency_hash.setSize(100);
};


int
DependencyTracker::
track_operation(const NdbDictionary::Table* table,
                const NdbRecord* key_rec,
                const uchar* row,
                Uint64 transaction_id)
{
  DBUG_ENTER("track_operation");

  Uint32 required_buff_size = determine_packed_key_size(table,
                                                        key_rec,
                                                        row);
  DBUG_PRINT("info", ("Required length for key : %u", required_buff_size));

  /* Alloc space for packed key and struct*/
  uchar* packed_key_buff = (uchar*) alloc_root(mra.mem_root,
                                               required_buff_size);
  void* element_mem = alloc_root(mra.mem_root,
                                 sizeof(st_row_event_key_info));

  if (pack_key_to_buffer(table,
                         key_rec,
                         row,
                         packed_key_buff,
                         required_buff_size))
  {
    if (!error_text)
      error_text="track_operation : Failed packing key";
    DBUG_RETURN(-1);
  }

  if (TRACK_ALL_TRANSACTIONS)
  {
    st_transaction* transEntry = get_or_create_transaction(transaction_id);
    if (!transEntry)
    {
      error_text = "track_operation : Failed to get or create transaction";
      DBUG_RETURN(HA_ERR_OUT_OF_MEM);
    }
  }

  st_row_event_key_info* key_info = new (element_mem)
    st_row_event_key_info(table,
                          packed_key_buff,
                          required_buff_size,
                          transaction_id);

  /* Now try to add element to hash */
  if (! key_hash.add(key_info))
  {
    /*
      Already an element in the keyhash with this primary key
      If it's for the same transaction then ignore, otherwise
      it's an inter-transaction dependency
    */
    st_row_event_key_info* existing = key_hash.get(key_info);

    Uint64 existingTransIdOnRow = existing->getTransactionId();
    Uint64 newTransIdOnRow = key_info->getTransactionId();

    if (existingTransIdOnRow != newTransIdOnRow)
    {
      int res = add_dependency(existingTransIdOnRow,
                               newTransIdOnRow);
      /*
        Update stored transaction_id to be latest for key.
        Further key operations on this row will depend on this
        transaction, and transitively on the previous
        transaction.
      */
      existing->updateRowTransactionId(newTransIdOnRow);

      assert(res == 0 || error_text != NULL);

      DBUG_RETURN(res);
    }
    else
    {
      /*
         How can we have two updates to the same row with the
         same transaction id?  Only if the transaction id
         is invalid (e.g. not set)
         In normal cases with only one upstream master, each
         distinct master user transaction will have a unique
         id, and all operations on a row in that transaction
         will be merged in TUP prior to emitting a SUMA
         event.
         This could be relaxed for more complex upstream
         topologies, but acts as a sanity guard currently.
      */
      if (existingTransIdOnRow != InvalidTransactionId)
      {
        assert(false);
        error_text= "Two row operations to same key sharing user transaction id";
        DBUG_RETURN(-1);
      }
    }
  }

  DBUG_RETURN(0);
};

int
DependencyTracker::
mark_conflict(Uint64 trans_id)
{
  DBUG_ENTER("mark_conflict");
  DBUG_PRINT("info", ("trans_id : %llu", trans_id));

  st_transaction* entry = get_or_create_transaction(trans_id);
  if (!entry)
  {
    error_text = "mark_conflict : get_or_create_transaction() failure";
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
  }

  if (entry->getInConflict())
  {
    /* Nothing to do here */
    DBUG_RETURN(0);
  }

  /* Have entry, mark it, and any dependents */
  bool fetch_node_dependents;
  st_transaction* dependent = entry;
  reset_dependency_iterator();
  do
  {
    DBUG_PRINT("info", ("Visiting transaction %llu, conflict : %u",
                        dependent->getTransactionId(),
                        dependent->getInConflict()));
    /*
      If marked already, don't fetch dependents, as
      they will also be marked already
    */
    fetch_node_dependents = false;

    if (!dependent->getInConflict())
    {
      dependent->setInConflict();
      conflicting_trans_count++;
      fetch_node_dependents = true;
    }
  } while ((dependent = get_next_dependency(dependent, fetch_node_dependents)));

  assert( verify_graph() );

  DBUG_RETURN(0);
}

bool
DependencyTracker::in_conflict(Uint64 trans_id)
{
  DBUG_ENTER("in_conflict");
  DBUG_PRINT("info", ("trans_id %llu", trans_id));
  st_transaction key(trans_id);
  const st_transaction* entry = NULL;

  /*
    If transaction hash entry exists, check it for
    conflicts.  If it doesn't exist, no conflict
  */
  if ((entry = trans_hash.get(&key)))
  {
    DBUG_PRINT("info", ("in_conflict : %u", entry->getInConflict()));
    DBUG_RETURN(entry->getInConflict());
  }
  else
  {
    assert(! TRACK_ALL_TRANSACTIONS);
  }
  DBUG_RETURN(false);
};

st_transaction*
DependencyTracker::
get_or_create_transaction(Uint64 trans_id)
{
  DBUG_ENTER("get_or_create_transaction");
  st_transaction transKey(trans_id);
  st_transaction* transEntry = NULL;

  if (! (transEntry = trans_hash.get(&transKey)))
  {
    /*
       Transaction does not exist.  Allocate it
       and add to the hash
    */
    DBUG_PRINT("info", ("Creating new hash entry for transaction (%llu)",
                        trans_id));

    transEntry = (st_transaction*)
      st_mem_root_allocator::alloc(&mra, sizeof(st_transaction));

    if (transEntry)
    {
      new (transEntry) st_transaction(trans_id);

      if (!trans_hash.add(transEntry))
      {
        st_mem_root_allocator::mem_free(&mra, transEntry); /* For show */
        transEntry = NULL;
      }
    }
  }

  DBUG_RETURN(transEntry);
}

int
DependencyTracker::
add_dependency(Uint64 trans_id, Uint64 dependent_trans_id)
{
  DBUG_ENTER("add_dependency");
  DBUG_PRINT("info", ("Recording dependency of %llu on %llu",
                      dependent_trans_id, trans_id));
  st_transaction* targetEntry = get_or_create_transaction(trans_id);
  if (!targetEntry)
  {
    error_text = "add_dependency : Failed get_or_create_transaction";
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
  }

  st_transaction* dependentEntry = get_or_create_transaction(dependent_trans_id);
  if (!dependentEntry)
  {
    error_text = "add_dependency : Failed get_or_create_transaction";
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
  }

  /* Now lookup dependency.  Add it if not already present */
  st_trans_dependency depKey(targetEntry, dependentEntry, NULL);
  st_trans_dependency* dep = NULL;
  if (! (dep = dependency_hash.get(&depKey)))
  {
    DBUG_PRINT("info", ("Creating new dependency hash entry for "
                        "dependency of %llu on %llu.",
                        dependentEntry->getTransactionId(),
                        targetEntry->getTransactionId()));

    dep = (st_trans_dependency*)
      st_mem_root_allocator::alloc(&mra, sizeof(st_trans_dependency));

    new (dep) st_trans_dependency(targetEntry, dependentEntry,
                                  targetEntry->getDependencyListHead());

    targetEntry->setDependencyListHead(dep);

    /* New dependency, propagate in_conflict if necessary */
    if (targetEntry->getInConflict())
    {
      DBUG_PRINT("info", ("Marking new dependent as in-conflict"));
      DBUG_RETURN(mark_conflict(dependentEntry->getTransactionId()));
    }
  }

  assert(verify_graph());

  DBUG_RETURN(0);
};

void
DependencyTracker::
reset_dependency_iterator()
{
  iteratorTodo.reset();
};

st_transaction*
DependencyTracker::
get_next_dependency(const st_transaction* current,
                    bool include_dependents_of_current)
{
  DBUG_ENTER("get_next_dependency");
  DBUG_PRINT("info", ("node : %llu", current->getTransactionId()));
  /*
    Depth first traverse, with option to ignore sub graphs.
  */
  if (include_dependents_of_current)
  {
    /* Add dependents to stack */
    const st_trans_dependency* dependency = current->getDependencyListHead();

    while (dependency)
    {
      assert(dependency->getTargetTransaction() == current);
      DBUG_PRINT("info", ("Adding dependency %llu->%llu",
                          dependency->getDependentTransaction()->getTransactionId(),
                          dependency->getTargetTransaction()->getTransactionId()));

      Uint64 dependentTransactionId =
        dependency->getDependentTransaction()->getTransactionId();
      iteratorTodo.push(dependentTransactionId);
      dependency= dependency->getNextDependency();
    }
  }

  Uint64 nextId;
  if (iteratorTodo.pop(nextId))
  {
    DBUG_PRINT("info", ("Returning transaction id %llu",
                        nextId));
    st_transaction key(nextId);
    st_transaction* dependent = trans_hash.get(&key);
    assert(dependent);
    DBUG_RETURN(dependent);
  }

  assert(iteratorTodo.size() == 0);
  DBUG_PRINT("info", ("No more dependencies to visit"));
  DBUG_RETURN(NULL);
};

void
DependencyTracker::
dump_dependents(Uint64 trans_id)
{
  fprintf(stderr, "Dumping dependents of transid %llu : ", trans_id);

  st_transaction key(trans_id);
  const st_transaction* dependent = NULL;

  if ((dependent = trans_hash.get(&key)))
  {
    reset_dependency_iterator();
    const char* comma = ", ";
    const char* sep = "";
    do
    {
      {
        fprintf(stderr, "%s%llu%s", sep, dependent->getTransactionId(),
                (dependent->getInConflict()?"-C":""));
        sep = comma;
      }
    } while ((dependent = get_next_dependency(dependent)));
    fprintf(stderr, "\n");
  }
  else
  {
    fprintf(stderr, "None\n");
  }
};

bool
DependencyTracker::
verify_graph()
{
  if (! CHECK_TRANS_GRAPH)
    return true;

  /*
     Check the graph structure obeys its invariants

     1) There are no cycles in the graph such that
        a transaction is a dependent of itself

     2) If a transaction is marked in_conflict, all
        of its dependents (transitively), are also
        marked in conflict

     This is expensive to verify, so not always on
  */
  HashMap2<st_transaction, true, st_mem_root_allocator>::Iterator it(trans_hash);

  st_transaction* root = NULL;

  while ((root = it.next()))
  {
    bool in_conflict = root->getInConflict();

    /* Now visit all dependents */
    st_transaction* dependent = root;
    reset_dependency_iterator();

    while((dependent = get_next_dependency(dependent, true)))
    {
      if (dependent == root)
      {
        /* Must exit, or we'll be here forever */
        fprintf(stderr, "Error : Cycle discovered in graph\n");
        abort();
        return false;
      }

      if (in_conflict &&
          ! dependent->getInConflict())
      {
        fprintf(stderr, "Error : Dependent transaction not marked in-conflict\n");
        abort();
        return false;
      }
    }
  }

  return true;
}


const char*
DependencyTracker::get_error_text() const
{
  return error_text;
};

Uint32
DependencyTracker::get_conflict_count() const
{
  return conflicting_trans_count;
}

/* #ifdef HAVE_NDB_BINLOG */

#endif

Youez - 2016 - github.com/yon3zu
LinuXploit