Server IP : 172.67.216.182 / Your IP : 172.70.188.79 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/mysql/src/plugin/innodb_memcached/innodb_memcache/src/ |
Upload File : |
/*********************************************************************** Copyright (c) 2011, 2023, Oracle and/or its affiliates. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is also distributed with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have included with MySQL. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA ***********************************************************************/ /**************************************************//** @file innodb_engine.c InnoDB Memcached Engine code Extracted and modified from NDB memcached project 04/12/2011 Jimmy Yang *******************************************************/ #include <stdlib.h> #include <string.h> #include <assert.h> #include <pthread.h> #include <arpa/inet.h> #include "default_engine.h" #include <memcached/util.h> #include <memcached/config_parser.h> #include <unistd.h> #include "innodb_engine.h" #include "innodb_engine_private.h" #include "innodb_api.h" #include "hash_item_util.h" #include "innodb_cb_api.h" /** Define also present in daemon/memcached.h */ #define KEY_MAX_LENGTH 250 /** Time (in seconds) that background thread sleeps before it wakes up and commit idle connection transactions */ #define BK_COMMIT_THREAD_SLEEP_INTERVAL 5 /** Maximum number of connections that background thread processes each time */ #define BK_MAX_PROCESS_COMMIT 5 /** Minimum time (in seconds) that a connection has been idle, that makes it candidate for background thread to commit it */ #define CONN_IDLE_TIME_TO_BK_COMMIT 5 /** Tells whether memcached plugin is being shutdown */ static bool memcached_shutdown = false; /** Tells whether the background thread is exited */ static bool bk_thd_exited = true; /** Tells whether all connections need to release MDL locks */ bool release_mdl_lock = false; /** InnoDB Memcached engine configuration info */ typedef struct eng_config_info { char* option_string; /*!< memcached config option string */ void* cb_ptr; /*!< call back function ptr */ unsigned int eng_read_batch_size; /*!< read batch size */ unsigned int eng_write_batch_size; /*!< write batch size */ bool eng_enable_binlog; /*!< whether binlog is enabled specifically for this memcached engine */ } eng_config_info_t; extern option_t config_option_names[]; /** Check if global read lock is active */ extern bool handler_check_global_read_lock_active(); /** Check the input key name implies a table mapping switch. The name would start with "@@", and in the format of "@@new_table_mapping.key" or simply "@@new_table_mapping" */ /**********************************************************************//** Unlock a table and commit the transaction return 0 if fail to commit the transaction */ extern int handler_unlock_table( /*=================*/ void* my_thd, /*!< in: thread */ void* my_table, /*!< in: Table metadata */ int my_lock_mode); /*!< in: lock mode */ /*******************************************************************//** Get InnoDB Memcached engine handle @return InnoDB Memcached engine handle */ static inline struct innodb_engine* innodb_handle( /*==========*/ ENGINE_HANDLE* handle) /*!< in: Generic engine handle */ { return((struct innodb_engine*) handle); } /*******************************************************************//** Cleanup idle connections if "clear_all" is false, and clean up all connections if "clear_all" is true. @return number of connection cleaned */ static void innodb_conn_clean_data( /*===================*/ innodb_conn_data_t* conn_data, bool has_lock, bool free_all); /*******************************************************************//** Get default Memcached engine handle @return default Memcached engine handle */ static inline struct default_engine* default_handle( /*===========*/ struct innodb_engine* eng) { return((struct default_engine*) eng->default_engine); } /****** Gateway to the default_engine's create_instance() function */ ENGINE_ERROR_CODE create_my_default_instance( /*=======================*/ uint64_t, GET_SERVER_API, ENGINE_HANDLE **); /*********** FUNCTIONS IMPLEMENTING THE PUBLISHED API BEGIN HERE ********/ /*******************************************************************//** Create InnoDB Memcached Engine. @return ENGINE_SUCCESS if successful, otherwise, error code */ ENGINE_ERROR_CODE create_instance( /*============*/ uint64_t interface, /*!< in: protocol version, currently always 1 */ GET_SERVER_API get_server_api, /*!< in: Callback the engines may call to get the public server interface */ ENGINE_HANDLE** handle ) /*!< out: Engine handle */ { ENGINE_ERROR_CODE err_ret; struct innodb_engine* innodb_eng; SERVER_HANDLE_V1 *api = get_server_api(); if (interface != 1 || api == NULL) { return(ENGINE_ENOTSUP); } innodb_eng = malloc(sizeof(struct innodb_engine)); if (innodb_eng == NULL) { return(ENGINE_ENOMEM); } memset(innodb_eng, 0, sizeof(*innodb_eng)); innodb_eng->engine.interface.interface = 1; innodb_eng->engine.get_info = innodb_get_info; innodb_eng->engine.initialize = innodb_initialize; innodb_eng->engine.destroy = innodb_destroy; innodb_eng->engine.allocate = innodb_allocate; innodb_eng->engine.remove = innodb_remove; innodb_eng->engine.release = innodb_release; innodb_eng->engine.clean_engine= innodb_clean_engine; innodb_eng->engine.get = innodb_get; innodb_eng->engine.get_stats = innodb_get_stats; innodb_eng->engine.reset_stats = innodb_reset_stats; innodb_eng->engine.store = innodb_store; innodb_eng->engine.arithmetic = innodb_arithmetic; innodb_eng->engine.flush = innodb_flush; innodb_eng->engine.unknown_command = innodb_unknown_command; innodb_eng->engine.item_set_cas = item_set_cas; innodb_eng->engine.get_item_info = innodb_get_item_info; innodb_eng->engine.get_stats_struct = NULL; innodb_eng->engine.errinfo = NULL; innodb_eng->engine.bind = innodb_bind; innodb_eng->server = *api; innodb_eng->get_server_api = get_server_api; /* configuration, with default values*/ innodb_eng->info.info.description = "InnoDB Memcache " VERSION; innodb_eng->info.info.num_features = 3; innodb_eng->info.info.features[0].feature = ENGINE_FEATURE_CAS; innodb_eng->info.info.features[1].feature = ENGINE_FEATURE_PERSISTENT_STORAGE; innodb_eng->info.info.features[0].feature = ENGINE_FEATURE_LRU; /* Now call create_instace() for the default engine */ err_ret = create_my_default_instance(interface, get_server_api, &(innodb_eng->default_engine)); if (err_ret != ENGINE_SUCCESS) { free(innodb_eng); return(err_ret); } innodb_eng->clean_stale_conn = false; innodb_eng->initialized = true; *handle = (ENGINE_HANDLE*) &innodb_eng->engine; return(ENGINE_SUCCESS); } /*******************************************************************//** background thread to commit trx. @return dummy parameter */ static void* innodb_bk_thread( /*=============*/ void* arg) { ENGINE_HANDLE* handle; struct innodb_engine* innodb_eng; innodb_conn_data_t* conn_data; void* thd = NULL; bk_thd_exited = false; handle = (ENGINE_HANDLE*) (arg); innodb_eng = innodb_handle(handle); if (innodb_eng->enable_binlog) { /* This thread will commit the transactions on behalf of the other threads. It will "pretend" to be each connection thread while doing it. */ thd = handler_create_thd(true); } conn_data = UT_LIST_GET_FIRST(innodb_eng->conn_data); while(!memcached_shutdown) { innodb_conn_data_t* next_conn_data; uint64_t time; uint64_t trx_start = 0; uint64_t processed_count = 0; if (handler_check_global_read_lock_active()) { release_mdl_lock = true; } else { release_mdl_lock = false; } /* Do the cleanup every innodb_eng->bk_commit_interval seconds. We also check if the plugin is being shutdown every second */ for (uint i = 0; i < innodb_eng->bk_commit_interval; i++) { sleep(1); /* If memcached is being shutdown, break */ if (memcached_shutdown) { break; } } time = mci_get_time(); if (UT_LIST_GET_LEN(innodb_eng->conn_data) == 0) { continue; } if (!conn_data) { conn_data = UT_LIST_GET_FIRST(innodb_eng->conn_data); } if (conn_data) { next_conn_data = UT_LIST_GET_NEXT(conn_list, conn_data); } else { next_conn_data = NULL; } /* Set the clean_stale_conn to prevent force clean in innodb_conn_clean. */ LOCK_CONN_IF_NOT_LOCKED(false, innodb_eng); innodb_eng->clean_stale_conn = true; UNLOCK_CONN_IF_NOT_LOCKED(false, innodb_eng); while (conn_data) { if (release_mdl_lock && !conn_data->is_stale) { int err; if(conn_data->is_waiting_for_mdl) { goto next_item; } err = LOCK_CURRENT_CONN_TRYLOCK(conn_data); if (err != 0) { goto next_item; } /* We have got the lock here */ } else { LOCK_CURRENT_CONN_IF_NOT_LOCKED(false, conn_data); } if (conn_data->is_stale) { UNLOCK_CURRENT_CONN_IF_NOT_LOCKED( false, conn_data); LOCK_CONN_IF_NOT_LOCKED(false, innodb_eng); UT_LIST_REMOVE(conn_list, innodb_eng->conn_data, conn_data); UNLOCK_CONN_IF_NOT_LOCKED(false, innodb_eng); innodb_conn_clean_data(conn_data, false, true); goto next_item; } if (release_mdl_lock) { if (conn_data->thd) { handler_thd_attach(conn_data->thd, NULL); } if (conn_data->in_use) { UNLOCK_CURRENT_CONN_IF_NOT_LOCKED(false, conn_data); goto next_item; } innodb_reset_conn(conn_data, true, true, innodb_eng->enable_binlog); if(conn_data->mysql_tbl) { handler_unlock_table(conn_data->thd, conn_data->mysql_tbl, HDL_READ); conn_data->mysql_tbl = NULL; } /*Close the data cursor */ if (conn_data->crsr) { innodb_cb_cursor_close(conn_data->crsr); conn_data->crsr = NULL; } if(conn_data->crsr_trx != NULL) { ib_cb_trx_release(conn_data->crsr_trx); conn_data->crsr_trx = NULL; } UNLOCK_CURRENT_CONN_IF_NOT_LOCKED(false, conn_data); goto next_item; } if (conn_data->crsr_trx) { trx_start = ib_cb_trx_get_start_time( conn_data->crsr_trx); } /* Check the trx, if it is qualified for reset and commit */ if ((conn_data->n_writes_since_commit > 0 || conn_data->n_reads_since_commit > 0) && trx_start && (time - trx_start > CONN_IDLE_TIME_TO_BK_COMMIT) && !conn_data->in_use) { /* binlog is running, make the thread attach to conn_data->thd for binlog committing */ if (thd && conn_data->thd) { handler_thd_attach( conn_data->thd, NULL); } innodb_reset_conn(conn_data, true, true, innodb_eng->enable_binlog); processed_count++; } UNLOCK_CURRENT_CONN_IF_NOT_LOCKED(false, conn_data); next_item: conn_data = next_conn_data; /* Process BK_MAX_PROCESS_COMMIT (5) trx at a time */ if (!release_mdl_lock && processed_count > BK_MAX_PROCESS_COMMIT) { break; } if (conn_data) { next_conn_data = UT_LIST_GET_NEXT( conn_list, conn_data); } } /* Set the clean_stale_conn back. */ LOCK_CONN_IF_NOT_LOCKED(false, innodb_eng); innodb_eng->clean_stale_conn = false; UNLOCK_CONN_IF_NOT_LOCKED(false, innodb_eng); } bk_thd_exited = true; /* Change to its original state before close the MySQL THD */ if (thd) { handler_thd_attach(thd, NULL); handler_close_thd(thd); } pthread_detach(pthread_self()); pthread_exit(NULL); return((void*) 0); } /*******************************************************************//** Get engine info. @return engine info */ static const engine_info* innodb_get_info( /*============*/ ENGINE_HANDLE* handle) /*!< in: Engine handle */ { return(&innodb_handle(handle)->info.info); } /*******************************************************************//** Initialize InnoDB Memcached Engine. @return ENGINE_SUCCESS if successful */ static ENGINE_ERROR_CODE innodb_initialize( /*==============*/ ENGINE_HANDLE* handle, /*!< in/out: InnoDB memcached engine */ const char* config_str) /*!< in: configure string */ { ENGINE_ERROR_CODE return_status = ENGINE_SUCCESS; struct innodb_engine* innodb_eng = innodb_handle(handle); struct default_engine* def_eng = default_handle(innodb_eng); eng_config_info_t* my_eng_config; pthread_attr_t attr; my_eng_config = (eng_config_info_t*) config_str; /* If no call back function registered (InnoDB engine failed to load), load InnoDB Memcached engine should fail too */ if (!my_eng_config->cb_ptr) { return(ENGINE_TMPFAIL); } /* Register the call back function */ register_innodb_cb((void*) my_eng_config->cb_ptr); innodb_eng->read_batch_size = (my_eng_config->eng_read_batch_size ? my_eng_config->eng_read_batch_size : CONN_NUM_READ_COMMIT); innodb_eng->write_batch_size = (my_eng_config->eng_write_batch_size ? my_eng_config->eng_write_batch_size : CONN_NUM_WRITE_COMMIT); innodb_eng->enable_binlog = my_eng_config->eng_enable_binlog; innodb_eng->cfg_status = innodb_cb_get_cfg(); /* If binlog is not enabled by InnoDB memcached plugin, let's check whether innodb_direct_access_enable_binlog is turned on */ if (!innodb_eng->enable_binlog) { innodb_eng->enable_binlog = innodb_eng->cfg_status & IB_CFG_BINLOG_ENABLED; } innodb_eng->enable_mdl = innodb_eng->cfg_status & IB_CFG_MDL_ENABLED; innodb_eng->trx_level = ib_cb_cfg_trx_level(); innodb_eng->bk_commit_interval = ib_cb_cfg_bk_commit_interval(); UT_LIST_INIT(innodb_eng->conn_data); pthread_mutex_init(&innodb_eng->conn_mutex, NULL); pthread_mutex_init(&innodb_eng->cas_mutex, NULL); pthread_mutex_init(&innodb_eng->flush_mutex, NULL); /* Fetch InnoDB specific settings */ innodb_eng->meta_info = innodb_config( NULL, 0, &innodb_eng->meta_hash); if (!innodb_eng->meta_info) { return(ENGINE_TMPFAIL); } if (innodb_eng->default_engine) { return_status = def_eng->engine.initialize( innodb_eng->default_engine, my_eng_config->option_string); } memcached_shutdown = false; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_create(&innodb_eng->bk_thd_for_commit, &attr, innodb_bk_thread, handle); return(return_status); } extern void handler_close_thd(void*); /*******************************************************************//** Close table using handler functions. @param conn_data cursor information of connection */ void innodb_close_mysql_table( /*=====================*/ innodb_conn_data_t* conn_data) /*!< in: connection cursor*/ { if (conn_data->mysql_tbl) { assert(conn_data->thd); handler_unlock_table(conn_data->thd, conn_data->mysql_tbl, HDL_READ); conn_data->mysql_tbl = NULL; } if (conn_data->thd) { handler_close_thd(conn_data->thd); conn_data->thd = NULL; } } /*******************************************************************//** Cleanup idle connections if "clear_all" is false, and clean up all connections if "clear_all" is true. @return number of connection cleaned */ static void innodb_conn_clean_data( /*===================*/ innodb_conn_data_t* conn_data, bool has_lock, bool free_all) { if (!conn_data) { return; } LOCK_CURRENT_CONN_IF_NOT_LOCKED(has_lock, conn_data); if (conn_data->idx_crsr) { innodb_cb_cursor_close(conn_data->idx_crsr); conn_data->idx_crsr = NULL; } if (conn_data->idx_read_crsr) { innodb_cb_cursor_close(conn_data->idx_read_crsr); conn_data->idx_read_crsr = NULL; } if (conn_data->crsr) { innodb_cb_cursor_close(conn_data->crsr); conn_data->crsr = NULL; } if (conn_data->read_crsr) { innodb_cb_cursor_close(conn_data->read_crsr); conn_data->read_crsr = NULL; } if (conn_data->crsr_trx) { ib_err_t err; innodb_cb_trx_commit(conn_data->crsr_trx); err = ib_cb_trx_release(conn_data->crsr_trx); assert(err == DB_SUCCESS); conn_data->crsr_trx = NULL; } innodb_close_mysql_table(conn_data); if (conn_data->tpl) { ib_cb_tuple_delete(conn_data->tpl); conn_data->tpl = NULL; } if (conn_data->idx_tpl) { ib_cb_tuple_delete(conn_data->idx_tpl); conn_data->idx_tpl = NULL; } if (conn_data->read_tpl) { ib_cb_tuple_delete(conn_data->read_tpl); conn_data->read_tpl = NULL; } if (conn_data->sel_tpl) { ib_cb_tuple_delete(conn_data->sel_tpl); conn_data->sel_tpl = NULL; } UNLOCK_CURRENT_CONN_IF_NOT_LOCKED(has_lock, conn_data); if (free_all) { if (conn_data->result) { free(conn_data->result); conn_data->result = NULL; } if (conn_data->row_buf) { free(conn_data->row_buf); conn_data->row_buf = NULL; conn_data->row_buf_len = 0; } if (conn_data->cmd_buf) { free(conn_data->cmd_buf); conn_data->cmd_buf = NULL; conn_data->cmd_buf_len = 0; } if (conn_data->mul_col_buf) { free(conn_data->mul_col_buf); conn_data->mul_col_buf = NULL; conn_data->mul_col_buf_len = 0; } pthread_mutex_destroy(&conn_data->curr_conn_mutex); free(conn_data); } } /*******************************************************************//** Cleanup idle connections if "clear_all" is false, and clean up all connections if "clear_all" is true. @return number of connection cleaned */ static int innodb_conn_clean( /*==============*/ innodb_engine_t* engine, /*!< in/out: InnoDB memcached engine */ bool clear_all, /*!< in: Clear all connection */ bool has_lock) /*!< in: Has engine mutext */ { innodb_conn_data_t* conn_data; innodb_conn_data_t* next_conn_data; int num_freed = 0; void* thd = NULL; if (engine->enable_binlog && clear_all) { thd = handler_create_thd(true); } LOCK_CONN_IF_NOT_LOCKED(has_lock, engine); conn_data = UT_LIST_GET_FIRST(engine->conn_data); while (conn_data) { void* cookie = conn_data->conn_cookie; next_conn_data = UT_LIST_GET_NEXT(conn_list, conn_data); if (!clear_all && !conn_data->in_use) { innodb_conn_data_t* check_data; check_data = engine->server.cookie->get_engine_specific( cookie); /* The check data is the original conn_data stored in connection "cookie", it can be set to NULL if connection closed, or to a new conn_data if it is closed and reopened. So verify and see if our current conn_data is stale */ if (!check_data || check_data != conn_data) { assert(conn_data->is_stale); } } /* If current conn is stale or clear_all is true, clean up it.*/ if (conn_data->is_stale) { /* If bk thread is doing the same thing, stop the loop to avoid confliction.*/ if (engine->clean_stale_conn) break; UT_LIST_REMOVE(conn_list, engine->conn_data, conn_data); innodb_conn_clean_data(conn_data, false, true); num_freed++; } else { if (clear_all) { UT_LIST_REMOVE(conn_list, engine->conn_data, conn_data); if (thd && conn_data->thd ) { handler_thd_attach(conn_data->thd, NULL); } innodb_reset_conn(conn_data, false, true, engine->enable_binlog); if (conn_data->thd) { handler_thd_attach( conn_data->thd, NULL); } innodb_conn_clean_data(conn_data, false, true); engine->server.cookie->store_engine_specific( cookie, NULL); num_freed++; } } conn_data = next_conn_data; } assert(!clear_all || engine->conn_data.count == 0); UNLOCK_CONN_IF_NOT_LOCKED(has_lock, engine); if (thd) { handler_thd_attach(thd, NULL); handler_close_thd(thd); } return(num_freed); } /*******************************************************************//** Destroy and Free InnoDB Memcached engine */ static void innodb_destroy( /*===========*/ ENGINE_HANDLE* handle, /*!< in: Destroy the engine instance */ bool force) /*!< in: Force to destroy */ { struct innodb_engine* innodb_eng = innodb_handle(handle); struct default_engine *def_eng = default_handle(innodb_eng); memcached_shutdown = true; /* Wait for the background thread to exit */ while (!bk_thd_exited) { sleep(1); } innodb_conn_clean(innodb_eng, true, false); if (innodb_eng->meta_hash) { HASH_CLEANUP(innodb_eng->meta_hash, meta_cfg_info_t*); } pthread_mutex_destroy(&innodb_eng->conn_mutex); pthread_mutex_destroy(&innodb_eng->cas_mutex); pthread_mutex_destroy(&innodb_eng->flush_mutex); if (innodb_eng->default_engine) { def_eng->engine.destroy(innodb_eng->default_engine, force); } free(innodb_eng); } /** Defines for connection initialization to indicate if we will do a read or write operation, or in the case of CONN_MODE_NONE, just get the connection's conn_data structure */ enum conn_mode { CONN_MODE_READ, CONN_MODE_WRITE, CONN_MODE_NONE }; /*******************************************************************//** Opens mysql table if enable_binlog or enable_mdl is set @param conn_data connection cursor data @param conn_optioin read or write operation @param engine Innodb memcached engine @returns DB_SUCCESS on success and DB_ERROR on failure */ ib_err_t innodb_open_mysql_table( /*====================*/ innodb_conn_data_t* conn_data, /*!< in/out:Connection cursor data */ int conn_option, /*!< in: Read or write operation */ innodb_engine_t* engine) /*!< in: InnoDB memcached engine */ { meta_cfg_info_t* meta_info; meta_info = conn_data->conn_meta; conn_data->is_waiting_for_mdl = true; /* Close the table before opening it again */ innodb_close_mysql_table(conn_data); if (conn_option == CONN_MODE_READ) { conn_data->is_waiting_for_mdl = false; return (DB_SUCCESS); } if (!conn_data->thd) { conn_data->thd = handler_create_thd(engine->enable_binlog); if (!conn_data->thd) { return(DB_ERROR); } } if (!conn_data->mysql_tbl) { conn_data->mysql_tbl = handler_open_table( conn_data->thd, meta_info->col_info[CONTAINER_DB].col_name, meta_info->col_info[CONTAINER_TABLE].col_name, HDL_WRITE); } conn_data->is_waiting_for_mdl = false; if(!conn_data->mysql_tbl) { return(DB_LOCK_WAIT); } return (DB_SUCCESS); } /*******************************************************************//** Cleanup connections @return number of connection cleaned */ /* Initialize a connection's cursor and transactions @return the connection's conn_data structure */ static innodb_conn_data_t* innodb_conn_init( /*=============*/ innodb_engine_t* engine, /*!< in/out: InnoDB memcached engine */ const void* cookie, /*!< in: This connection's cookie */ int conn_option, /*!< in: whether it is for read or write operation*/ ib_lck_mode_t lock_mode, /*!< in: Table lock mode */ bool has_lock, /*!< in: Has engine mutex */ meta_cfg_info_t* new_meta_info) /*!< in: meta info for table to open or NULL */ { innodb_conn_data_t* conn_data; meta_cfg_info_t* meta_info; meta_index_t* meta_index; ib_err_t err = DB_SUCCESS; ib_crsr_t crsr; ib_crsr_t read_crsr; ib_crsr_t idx_crsr; bool trx_updated = false; /* Get this connection's conn_data */ conn_data = engine->server.cookie->get_engine_specific(cookie); assert(!conn_data || !conn_data->in_use); if (!conn_data) { LOCK_CONN_IF_NOT_LOCKED(has_lock, engine); conn_data = engine->server.cookie->get_engine_specific(cookie); if (conn_data) { UNLOCK_CONN_IF_NOT_LOCKED(has_lock, engine); goto have_conn; } if (UT_LIST_GET_LEN(engine->conn_data) > 2048) { /* Some of conn_data can be stale, recycle them */ innodb_conn_clean(engine, false, true); } conn_data = malloc(sizeof(*conn_data)); if (!conn_data) { UNLOCK_CONN_IF_NOT_LOCKED(has_lock, engine); return(NULL); } memset(conn_data, 0, sizeof(*conn_data)); conn_data->result = malloc(sizeof(mci_item_t)); if (!conn_data->result) { UNLOCK_CONN_IF_NOT_LOCKED(has_lock, engine); free(conn_data); conn_data = NULL; return(NULL); } conn_data->conn_meta = new_meta_info ? new_meta_info : engine->meta_info; conn_data->row_buf = malloc(1024); if (!conn_data->row_buf) { UNLOCK_CONN_IF_NOT_LOCKED(has_lock, engine); free(conn_data->result); free(conn_data); conn_data = NULL; return(NULL); } conn_data->row_buf_len = 1024; conn_data->cmd_buf = malloc(1024); if (!conn_data->cmd_buf) { UNLOCK_CONN_IF_NOT_LOCKED(has_lock, engine); free(conn_data->row_buf); free(conn_data->result); free(conn_data); conn_data = NULL; return(NULL); } conn_data->cmd_buf_len = 1024; conn_data->is_flushing = false; conn_data->conn_cookie = (void*) cookie; /* Add connection to the list after all memory allocations */ UT_LIST_ADD_LAST(conn_list, engine->conn_data, conn_data); engine->server.cookie->store_engine_specific( cookie, conn_data); pthread_mutex_init(&conn_data->curr_conn_mutex, NULL); UNLOCK_CONN_IF_NOT_LOCKED(has_lock, engine); } have_conn: meta_info = conn_data->conn_meta; meta_index = &meta_info->index_info; assert(engine->conn_data.count > 0); if (conn_option == CONN_MODE_NONE) { return(conn_data); } LOCK_CURRENT_CONN_IF_NOT_LOCKED(has_lock, conn_data); /* If flush is running, then wait for it complete. */ if (conn_data->is_flushing) { /* Request flush_mutex for waiting for flush completed. */ pthread_mutex_lock(&engine->flush_mutex); pthread_mutex_unlock(&engine->flush_mutex); } /* This special case added to facilitate unlocking of MDL lock during FLUSH TABLE WITH READ LOCK */ if (engine && release_mdl_lock && (engine->enable_binlog || engine->enable_mdl)) { if ( DB_SUCCESS != innodb_open_mysql_table(conn_data, conn_option, engine)){ UNLOCK_CURRENT_CONN_IF_NOT_LOCKED( has_lock, conn_data); return NULL; } } conn_data->in_use = true; crsr = conn_data->crsr; read_crsr = conn_data->read_crsr; if (lock_mode == IB_LOCK_TABLE_X) { if(!conn_data->crsr_trx) { conn_data->crsr_trx = ib_cb_trx_begin( engine->trx_level, true, false); } else { /* Write cursor transaction exists. Reuse this transaction.*/ if (ib_cb_trx_read_only(conn_data->crsr_trx)) { innodb_cb_trx_commit( conn_data->crsr_trx); } err = ib_cb_trx_start(conn_data->crsr_trx, engine->trx_level, true, false, NULL); assert(err == DB_SUCCESS); } err = innodb_api_begin( engine, meta_info->col_info[CONTAINER_DB].col_name, meta_info->col_info[CONTAINER_TABLE].col_name, conn_data, conn_data->crsr_trx, &conn_data->crsr, &conn_data->idx_crsr, lock_mode); if (err != DB_SUCCESS) { innodb_cb_cursor_close( conn_data->crsr); conn_data->crsr = NULL; innodb_cb_trx_commit( conn_data->crsr_trx); err = ib_cb_trx_release(conn_data->crsr_trx); assert(err == DB_SUCCESS); conn_data->crsr_trx = NULL; conn_data->in_use = false; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED( has_lock, conn_data); return(NULL); } UNLOCK_CURRENT_CONN_IF_NOT_LOCKED(has_lock, conn_data); return(conn_data); } /* Write operation */ if (conn_option == CONN_MODE_WRITE) { if (!crsr) { if (!conn_data->crsr_trx) { conn_data->crsr_trx = ib_cb_trx_begin( engine->trx_level, true, false); trx_updated = true; } else { if (ib_cb_trx_read_only(conn_data->crsr_trx)) { innodb_cb_trx_commit( conn_data->crsr_trx); } ib_cb_trx_start(conn_data->crsr_trx, engine->trx_level, true, false, NULL); } err = innodb_api_begin( engine, meta_info->col_info[CONTAINER_DB].col_name, meta_info->col_info[CONTAINER_TABLE].col_name, conn_data, conn_data->crsr_trx, &conn_data->crsr, &conn_data->idx_crsr, lock_mode); if (err != DB_SUCCESS) { innodb_cb_cursor_close( conn_data->crsr); conn_data->crsr = NULL; innodb_cb_trx_commit( conn_data->crsr_trx); err = ib_cb_trx_release(conn_data->crsr_trx); assert(err == DB_SUCCESS); conn_data->crsr_trx = NULL; conn_data->in_use = false; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED( has_lock, conn_data); return(NULL); } } else if (!conn_data->crsr_trx) { /* There exists a cursor, just need update with a new transaction */ conn_data->crsr_trx = ib_cb_trx_begin( engine->trx_level, true, false); innodb_cb_cursor_new_trx(crsr, conn_data->crsr_trx); trx_updated = true; err = innodb_cb_cursor_lock(engine, crsr, lock_mode); if (err != DB_SUCCESS) { innodb_cb_cursor_close( conn_data->crsr); conn_data->crsr = NULL; innodb_cb_trx_commit( conn_data->crsr_trx); err = ib_cb_trx_release(conn_data->crsr_trx); assert(err == DB_SUCCESS); conn_data->crsr_trx = NULL; conn_data->in_use = false; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED( has_lock, conn_data); return(NULL); } if (meta_index->srch_use_idx == META_USE_SECONDARY) { idx_crsr = conn_data->idx_crsr; innodb_cb_cursor_new_trx( idx_crsr, conn_data->crsr_trx); innodb_cb_cursor_lock( engine, idx_crsr, lock_mode); } } else { if (ib_cb_trx_read_only(conn_data->crsr_trx)) { innodb_cb_trx_commit( conn_data->crsr_trx); } ib_cb_trx_start(conn_data->crsr_trx, engine->trx_level, true, false, NULL); ib_cb_cursor_stmt_begin(crsr); err = innodb_cb_cursor_lock(engine, crsr, lock_mode); if (err != DB_SUCCESS) { innodb_cb_cursor_close( conn_data->crsr); conn_data->crsr = NULL; innodb_cb_trx_commit( conn_data->crsr_trx); err = ib_cb_trx_release(conn_data->crsr_trx); assert(err == DB_SUCCESS); conn_data->crsr_trx = NULL; conn_data->in_use = false; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED( has_lock, conn_data); return(NULL); } } if (trx_updated) { if (conn_data->read_crsr) { innodb_cb_cursor_new_trx( conn_data->read_crsr, conn_data->crsr_trx); } if (conn_data->idx_read_crsr) { innodb_cb_cursor_new_trx( conn_data->idx_read_crsr, conn_data->crsr_trx); } } } else { bool auto_commit = (engine->read_batch_size == 1 && !(engine->cfg_status & IB_CFG_DISABLE_ROWLOCK)) ? true : false; assert(conn_option == CONN_MODE_READ); if (!read_crsr) { if (!conn_data->crsr_trx) { /* This is read operation, start a trx with "read_write" parameter set to false */ conn_data->crsr_trx = ib_cb_trx_begin( engine->trx_level, false, auto_commit); trx_updated = true; } else { ib_cb_trx_start(conn_data->crsr_trx, engine->trx_level, false, auto_commit, NULL); } err = innodb_api_begin( engine, meta_info->col_info[CONTAINER_DB].col_name, meta_info->col_info[CONTAINER_TABLE].col_name, conn_data, conn_data->crsr_trx, &conn_data->read_crsr, &conn_data->idx_read_crsr, lock_mode); if (err != DB_SUCCESS) { innodb_cb_cursor_close( conn_data->read_crsr); innodb_cb_trx_commit( conn_data->crsr_trx); err = ib_cb_trx_release(conn_data->crsr_trx); assert(err == DB_SUCCESS); conn_data->crsr_trx = NULL; conn_data->read_crsr = NULL; conn_data->in_use = false; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED( has_lock, conn_data); return(NULL); } } else if (!conn_data->crsr_trx) { /* This is read operation, start a trx with "read_write" parameter set to false */ conn_data->crsr_trx = ib_cb_trx_begin( engine->trx_level, false, auto_commit); trx_updated = true; innodb_cb_cursor_new_trx( conn_data->read_crsr, conn_data->crsr_trx); if (conn_data->crsr) { innodb_cb_cursor_new_trx( conn_data->crsr, conn_data->crsr_trx); } err = innodb_cb_cursor_lock( engine, conn_data->read_crsr, lock_mode); if (err != DB_SUCCESS) { innodb_cb_cursor_close( conn_data->read_crsr); innodb_cb_trx_commit( conn_data->crsr_trx); err = ib_cb_trx_release(conn_data->crsr_trx); assert(err == DB_SUCCESS); conn_data->crsr_trx = NULL; conn_data->read_crsr = NULL; conn_data->in_use = false; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED( has_lock, conn_data); return(NULL); } if (meta_index->srch_use_idx == META_USE_SECONDARY) { ib_crsr_t idx_crsr = conn_data->idx_read_crsr; innodb_cb_cursor_new_trx( idx_crsr, conn_data->crsr_trx); innodb_cb_cursor_lock( engine, idx_crsr, lock_mode); } } else { /* This is read operation, start a trx with "read_write" parameter set to false */ ib_cb_trx_start(conn_data->crsr_trx, engine->trx_level, false, auto_commit, NULL); ib_cb_cursor_stmt_begin(conn_data->read_crsr); err = innodb_cb_cursor_lock( engine, conn_data->read_crsr, lock_mode); if (err != DB_SUCCESS) { innodb_cb_cursor_close( conn_data->read_crsr); innodb_cb_trx_commit( conn_data->crsr_trx); err = ib_cb_trx_release(conn_data->crsr_trx); assert(err == DB_SUCCESS); conn_data->crsr_trx = NULL; conn_data->read_crsr = NULL; conn_data->in_use = false; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED( has_lock, conn_data); return(NULL); } if (meta_index->srch_use_idx == META_USE_SECONDARY) { ib_crsr_t idx_crsr = conn_data->idx_read_crsr; ib_cb_cursor_stmt_begin(idx_crsr); innodb_cb_cursor_lock( engine, idx_crsr, lock_mode); } } if (trx_updated) { if (conn_data->crsr) { innodb_cb_cursor_new_trx( conn_data->crsr, conn_data->crsr_trx); } if (conn_data->idx_crsr) { innodb_cb_cursor_new_trx( conn_data->idx_crsr, conn_data->crsr_trx); } } } UNLOCK_CURRENT_CONN_IF_NOT_LOCKED( has_lock, conn_data); return(conn_data); } /*** allocate ***/ /*******************************************************************//** Allocate gets a struct item from the slab allocator, and fills in everything but the value. It seems like we can just pass this on to the default engine; we'll intercept it later in store(). */ static ENGINE_ERROR_CODE innodb_allocate( /*============*/ ENGINE_HANDLE* handle, /*!< in: Engine handle */ const void* cookie, /*!< in: connection cookie */ item ** item, /*!< out: item to allocate */ const void* key, /*!< in: key */ const size_t nkey, /*!< in: key length */ const size_t nbytes, /*!< in: estimated value length */ const int flags, /*!< in: flag */ const rel_time_t exptime) /*!< in: expiration time */ { size_t len; struct innodb_engine* innodb_eng = innodb_handle(handle); struct default_engine* def_eng = default_handle(innodb_eng); innodb_conn_data_t* conn_data; hash_item* it = NULL; meta_cfg_info_t* meta_info = innodb_eng->meta_info; conn_data = innodb_eng->server.cookie->get_engine_specific(cookie); if (!conn_data) { conn_data = innodb_conn_init(innodb_eng, cookie, CONN_MODE_WRITE, IB_LOCK_X, false, NULL); if (!conn_data) { return(ENGINE_TMPFAIL); } } /* If system configured to use Memcached default engine (instead of InnoDB engine), continue to use Memcached's default memory allocation */ if (meta_info->set_option == META_CACHE_OPT_DEFAULT || meta_info->set_option == META_CACHE_OPT_MIX) { conn_data->use_default_mem = true; conn_data->in_use = false; return(def_eng->engine.allocate( innodb_eng->default_engine, cookie, item, key, nkey, nbytes, flags, exptime)); } conn_data->use_default_mem = false; len = sizeof(*it) + nkey + nbytes + sizeof(uint64_t); if (len > conn_data->cmd_buf_len) { free(conn_data->cmd_buf); conn_data->cmd_buf = malloc(len); conn_data->cmd_buf_len = len; } it = (hash_item*) conn_data->cmd_buf; it->next = it->prev = it->h_next = 0; it->refcount = 1; it->iflag = def_eng->config.use_cas ? ITEM_WITH_CAS : 0; it->nkey = nkey; it->nbytes = nbytes; it->flags = flags; it->slabs_clsid = 1; /* item_get_key() is a memcached code, here we cast away const return */ memcpy((void*) item_get_key(it), key, nkey); it->exptime = exptime; *item = it; conn_data->in_use = false; return(ENGINE_SUCCESS); } /*******************************************************************//** Cleanup connections @return number of connection cleaned */ static ENGINE_ERROR_CODE innodb_remove( /*==========*/ ENGINE_HANDLE* handle, /*!< in: Engine handle */ const void* cookie, /*!< in: connection cookie */ const void* key, /*!< in: key */ const size_t nkey, /*!< in: key length */ uint64_t cas __attribute__((unused)), /*!< in: cas */ uint16_t vbucket __attribute__((unused))) /*!< in: bucket, used by default engine only */ { struct innodb_engine* innodb_eng = innodb_handle(handle); struct default_engine* def_eng = default_handle(innodb_eng); ENGINE_ERROR_CODE err_ret = ENGINE_SUCCESS; innodb_conn_data_t* conn_data; meta_cfg_info_t* meta_info = innodb_eng->meta_info; ENGINE_ERROR_CODE cacher_err = ENGINE_KEY_ENOENT; if (meta_info->del_option == META_CACHE_OPT_DISABLE) { return(ENGINE_SUCCESS); } if (meta_info->del_option == META_CACHE_OPT_DEFAULT || meta_info->del_option == META_CACHE_OPT_MIX) { hash_item* item = item_get(def_eng, key, nkey); if (item != NULL) { item_unlink(def_eng, item); item_release(def_eng, item); cacher_err = ENGINE_SUCCESS; } if (meta_info->del_option == META_CACHE_OPT_DEFAULT) { return(cacher_err); } } conn_data = innodb_conn_init(innodb_eng, cookie, CONN_MODE_WRITE, IB_LOCK_X, false, NULL); if (!conn_data) { return(ENGINE_TMPFAIL); } /* In the binary protocol there is such a thing as a CAS delete. This is the CAS check. If we will also be deleting from the database, there are two possibilities: 1: The CAS matches; perform the delete. 2: The CAS doesn't match; delete the item because it's stale. Therefore we skip the check altogether if(do_db_delete) */ err_ret = innodb_api_delete(innodb_eng, conn_data, key, nkey); innodb_api_cursor_reset(innodb_eng, conn_data, CONN_OP_DELETE, err_ret == ENGINE_SUCCESS); return((cacher_err == ENGINE_SUCCESS) ? ENGINE_SUCCESS : err_ret); } /*******************************************************************//** Switch the table mapping. Open the new table specified in "@@new_table_map.key" string. @return ENGINE_SUCCESS if successful, otherwise error code */ static ENGINE_ERROR_CODE innodb_switch_mapping( /*==================*/ ENGINE_HANDLE* handle, /*!< in: Engine handle */ const void* cookie, /*!< in: connection cookie */ const char* name, /*!< in: full name contains table map name, and possible key value */ size_t* name_len, /*!< in/out: name length, out with length excludes the table map name */ bool has_prefix) /*!< in: whether the name has "@@" prefix */ { struct innodb_engine* innodb_eng = innodb_handle(handle); innodb_conn_data_t* conn_data; char new_name[KEY_MAX_LENGTH]; meta_cfg_info_t* meta_info = innodb_eng->meta_info; char* new_map_name; unsigned int new_map_name_len = 0; char* last; meta_cfg_info_t* new_meta_info; int sep_len = 0; if (has_prefix) { char* sep = NULL; assert(*name_len > 2 && name[0] == '@' && name[1] == '@'); assert(*name_len < KEY_MAX_LENGTH); memcpy(new_name, &name[2], (*name_len) - 2); new_name[*name_len - 2] = 0; GET_OPTION(meta_info, OPTION_ID_TBL_MAP_SEP, sep, sep_len); assert(sep_len > 0); new_map_name = strtok_r(new_name, sep, &last); if (new_map_name == NULL) { return(ENGINE_KEY_ENOENT); } new_map_name_len = strlen(new_map_name); } else { /* This is used in the "bind" command, and without the "@@" prefix. */ if (name == NULL) { return(ENGINE_KEY_ENOENT); } new_map_name = (char*) name; new_map_name_len = *name_len; } conn_data = innodb_eng->server.cookie->get_engine_specific(cookie); /* Check if we are getting the same configure setting as existing one */ if (conn_data && conn_data->conn_meta && (new_map_name_len == conn_data->conn_meta->col_info[CONTAINER_NAME].col_name_len) && (strcmp( new_map_name, conn_data->conn_meta->col_info[CONTAINER_NAME].col_name) == 0)) { goto get_key_name; } new_meta_info = innodb_config( new_map_name, new_map_name_len, &innodb_eng->meta_hash); if (!new_meta_info) { return(ENGINE_KEY_ENOENT); } /* Clean up the existing connection metadata if exists */ if (conn_data) { innodb_conn_clean_data(conn_data, false, false); /* Point to the new metadata */ conn_data->conn_meta = new_meta_info; } conn_data = innodb_conn_init(innodb_eng, cookie, CONN_MODE_NONE, 0, false, new_meta_info); assert(conn_data->conn_meta == new_meta_info); get_key_name: /* Now calculate name length exclude the table mapping name, this is the length for the remaining key portion */ if (has_prefix) { assert(*name_len >= strlen(new_map_name) + 2); if (*name_len >= strlen(new_map_name) + 2 + sep_len) { *name_len -= strlen(new_map_name) + 2 + sep_len; } else { /* the name does not even contain a delimiter, so there will be no keys either */ *name_len = 0; } } return(ENGINE_SUCCESS); } /*******************************************************************//** check whether a table mapping switch is needed, if so, switch the table mapping @return ENGINE_SUCCESS if successful otherwise error code */ static inline ENGINE_ERROR_CODE check_key_name_for_map_switch( /*==========================*/ ENGINE_HANDLE* handle, /*!< in: Engine Handle */ const void* cookie, /*!< in: connection cookie */ const void* key, /*!< in: search key */ size_t* nkey) /*!< in/out: key length */ { ENGINE_ERROR_CODE err_ret = ENGINE_SUCCESS; if ((*nkey) > 3 && ((char*)key)[0] == '@' && ((char*)key)[1] == '@') { err_ret = innodb_switch_mapping(handle, cookie, key, nkey, true); } return(err_ret); } /*******************************************************************//** Function to support the "bind" command, bind the connection to a new table mapping. @return ENGINE_SUCCESS if successful, otherwise error code */ static ENGINE_ERROR_CODE innodb_bind( /*========*/ ENGINE_HANDLE* handle, /*!< in: Engine handle */ const void* cookie, /*!< in: connection cookie */ const void* name, /*!< in: table ID name */ size_t name_len) /*!< in: name length */ { ENGINE_ERROR_CODE err_ret = ENGINE_SUCCESS; err_ret = innodb_switch_mapping(handle, cookie, name, &name_len, false); return(err_ret); } /*******************************************************************//** Release the connection, free resource allocated in innodb_allocate */ static void innodb_clean_engine( /*================*/ ENGINE_HANDLE* handle, /*!< in: Engine handle */ const void* cookie __attribute__((unused)), /*!< in: connection cookie */ void* conn) /*!< in: item to free */ { innodb_conn_data_t* conn_data = (innodb_conn_data_t*)conn; struct innodb_engine* engine = innodb_handle(handle); void* orignal_thd; LOCK_CURRENT_CONN_IF_NOT_LOCKED(false, conn_data); if (conn_data->thd) { handler_thd_attach(conn_data->thd, &orignal_thd); } innodb_reset_conn(conn_data, true, true, engine->enable_binlog); innodb_conn_clean_data(conn_data, true, false); conn_data->is_stale = true; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED(false, conn_data); } /*******************************************************************//** Release the connection, free resource allocated in innodb_allocate */ static void innodb_release( /*===========*/ ENGINE_HANDLE* handle, /*!< in: Engine handle */ const void* cookie __attribute__((unused)), /*!< in: connection cookie */ item* item __attribute__((unused))) /*!< in: item to free */ { struct innodb_engine* innodb_eng = innodb_handle(handle); innodb_conn_data_t* conn_data; conn_data = innodb_eng->server.cookie->get_engine_specific(cookie); if (!conn_data) { return; } conn_data->result_in_use = false; /* If item's memory comes from Memcached default engine, release it through Memcached APIs */ if (conn_data->use_default_mem) { struct default_engine* def_eng = default_handle(innodb_eng); item_release(def_eng, (hash_item *) item); conn_data->use_default_mem = false; } return; } /* maximum number of characters that an 8 bytes integer can convert to */ #define MAX_INT_CHAR_LEN 21 /*******************************************************************//** Convert an bit int to string @return length of string */ static int convert_to_char( /*============*/ char* buf, /*!< out: converted integer value */ int buf_len, /*!< in: buffer len */ void* value, /*!< in: int value */ int value_len, /*!< in: int len */ bool is_unsigned) /*!< in: whether it is unsigned */ { assert(buf && buf_len); if (value_len == 8) { if (is_unsigned) { uint64_t int_val = *(uint64_t*)value; snprintf(buf, buf_len, "%" PRIu64, int_val); } else { int64_t int_val = *(int64_t*)value; snprintf(buf, buf_len, "%" PRIi64, int_val); } } else if (value_len == 4) { if (is_unsigned) { uint32_t int_val = *(uint32_t*)value; snprintf(buf, buf_len, "%" PRIu32, int_val); } else { int32_t int_val = *(int32_t*)value; snprintf(buf, buf_len, "%" PRIi32, int_val); } } else if (value_len == 2) { if (is_unsigned) { uint16_t int_val = *(uint16_t*)value; snprintf(buf, buf_len, "%" PRIu16, int_val); } else { int16_t int_val = *(int16_t*)value; snprintf(buf, buf_len, "%" PRIi16, int_val); } } else if (value_len == 1) { if (is_unsigned) { uint8_t int_val = *(uint8_t*)value; snprintf(buf, buf_len, "%" PRIu8, int_val); } else { int8_t int_val = *(int8_t*)value; snprintf(buf, buf_len, "%" PRIi8, int_val); } } else { return 0; } return(strlen(buf)); } /*******************************************************************//** Free value assocaited with key */ static void innodb_free_item( /*=====================*/ void* item) /*!< in: Item to be freed */ { mci_item_t* result = (mci_item_t*) item; if (result->extra_col_value) { for (int i = 0; i < result->n_extra_col; i++) { if(result->extra_col_value[i].allocated) free(result->extra_col_value[i].value_str); } free(result->extra_col_value); result->extra_col_value=NULL; } if (result->col_value[MCI_COL_VALUE].allocated) { free(result->col_value[MCI_COL_VALUE].value_str); result->col_value[MCI_COL_VALUE].allocated = false; } } /*******************************************************************//** Support memcached "GET" command, fetch the value according to key @return ENGINE_SUCCESS if successfully, otherwise error code */ static ENGINE_ERROR_CODE innodb_get( /*=======*/ ENGINE_HANDLE* handle, /*!< in: Engine Handle */ const void* cookie, /*!< in: connection cookie */ item** item, /*!< out: item to fill */ const void* key, /*!< in: search key */ const int nkey, /*!< in: key length */ uint16_t vbucket __attribute__((unused))) /*!< in: bucket, used by default engine only */ { struct innodb_engine* innodb_eng = innodb_handle(handle); ib_crsr_t crsr; ib_err_t err = DB_SUCCESS; mci_item_t* result = NULL; ENGINE_ERROR_CODE err_ret = ENGINE_SUCCESS; innodb_conn_data_t* conn_data = NULL; meta_cfg_info_t* meta_info = innodb_eng->meta_info; int option_length; const char* option_delimiter; size_t key_len = nkey; int lock_mode; bool report_table_switch = false; if (meta_info->get_option == META_CACHE_OPT_DISABLE) { return(ENGINE_KEY_ENOENT); } if (meta_info->get_option == META_CACHE_OPT_DEFAULT || meta_info->get_option == META_CACHE_OPT_MIX) { *item = item_get(default_handle(innodb_eng), key, nkey); if (*item != NULL) { return(ENGINE_SUCCESS); } if (meta_info->get_option == META_CACHE_OPT_DEFAULT) { return(ENGINE_KEY_ENOENT); } } /* Check if we need to switch table mapping */ err_ret = check_key_name_for_map_switch(handle, cookie, key, &key_len); /* If specified new table map does not exist, or table does not qualify for InnoDB memcached, return error */ if (err_ret != ENGINE_SUCCESS) { goto err_exit; } /* If only the new mapping name is provided, and no key value, return here */ if (key_len <= 0) { /* If this is a command in the form of "get @@new_table_map", for the purpose of switching to the specified table with the table map name, if the switch is successful, we will return the table name as result */ if (nkey > 0) { report_table_switch = true; goto search_done; } err_ret = ENGINE_KEY_ENOENT; goto err_exit; } lock_mode = (innodb_eng->trx_level == IB_TRX_SERIALIZABLE && innodb_eng->read_batch_size == 1) ? IB_LOCK_S : IB_LOCK_NONE; conn_data = innodb_conn_init(innodb_eng, cookie, CONN_MODE_READ, lock_mode, false, NULL); if (!conn_data) { return(ENGINE_TMPFAIL); } result = (mci_item_t*)(conn_data->result); err = innodb_api_search(conn_data, &crsr, key + nkey - key_len, key_len, result, NULL, true); if (err != DB_SUCCESS) { err_ret = ENGINE_KEY_ENOENT; goto func_exit; } search_done: if (report_table_switch) { char table_name[MAX_TABLE_NAME_LEN + MAX_DATABASE_NAME_LEN]; char* name; char* dbname; conn_data = innodb_eng->server.cookie->get_engine_specific(cookie); assert(nkey > 0); name = conn_data->conn_meta->col_info[CONTAINER_TABLE].col_name; dbname = conn_data->conn_meta->col_info[CONTAINER_DB].col_name; #ifdef __WIN__ sprintf(table_name, "%s\%s", dbname, name); #else snprintf(table_name, sizeof(table_name), "%s/%s", dbname, name); #endif assert(!conn_data->result_in_use); conn_data->result_in_use = true; result = (mci_item_t*)(conn_data->result); memset(result, 0, sizeof(*result)); memcpy(conn_data->row_buf, table_name, strlen(table_name)); result->col_value[MCI_COL_VALUE].value_str = conn_data->row_buf; result->col_value[MCI_COL_VALUE].value_len = strlen(table_name); result->col_value[MCI_COL_VALUE].is_str = true; result->col_value[MCI_COL_VALUE].is_valid = true; } result->col_value[MCI_COL_KEY].value_str = (char*)key; result->col_value[MCI_COL_KEY].value_len = nkey; /* Only if expiration field is enabled, and the value is not zero, we will check whether the item is expired */ if (result->col_value[MCI_COL_EXP].is_valid && result->col_value[MCI_COL_EXP].value_int) { uint64_t time; time = mci_get_time(); if (time > result->col_value[MCI_COL_EXP].value_int) { innodb_free_item(result); err_ret = ENGINE_KEY_ENOENT; goto func_exit; } } if (result->extra_col_value) { int i; char* c_value; char* value_end; unsigned int total_len = 0; char int_buf[MAX_INT_CHAR_LEN]; GET_OPTION(meta_info, OPTION_ID_COL_SEP, option_delimiter, option_length); assert(option_length > 0 && option_delimiter); for (i = 0; i < result->n_extra_col; i++) { mci_column_t* mci_item = &result->extra_col_value[i]; if (mci_item->value_len == 0) { total_len += option_length; continue; } if (!mci_item->is_str) { memset(int_buf, 0, sizeof int_buf); assert(!mci_item->value_str); total_len += convert_to_char( int_buf, sizeof int_buf, &mci_item->value_int, mci_item->value_len, mci_item->is_unsigned); } else { total_len += result->extra_col_value[i].value_len; } total_len += option_length; } /* No need to add the last separator */ total_len -= option_length; if (total_len > conn_data->mul_col_buf_len) { if (conn_data->mul_col_buf) { free(conn_data->mul_col_buf); } conn_data->mul_col_buf = malloc(total_len + 1); conn_data->mul_col_buf_len = total_len; } c_value = conn_data->mul_col_buf; value_end = conn_data->mul_col_buf + total_len; for (i = 0; i < result->n_extra_col; i++) { mci_column_t* col_value; col_value = &result->extra_col_value[i]; if (col_value->value_len != 0) { if (!col_value->is_str) { int int_len; memset(int_buf, 0, sizeof int_buf); int_len = convert_to_char( int_buf, sizeof int_buf, &col_value->value_int, col_value->value_len, col_value->is_unsigned); assert(int_len <= conn_data->mul_col_buf_len); memcpy(c_value, int_buf, int_len); c_value += int_len; } else { memcpy(c_value, col_value->value_str, col_value->value_len); c_value += col_value->value_len; } } if (i < result->n_extra_col - 1 ) { memcpy(c_value, option_delimiter, option_length); c_value += option_length; } assert(c_value <= value_end); if (col_value->allocated) { free(col_value->value_str); } } result->col_value[MCI_COL_VALUE].value_str = conn_data->mul_col_buf; result->col_value[MCI_COL_VALUE].value_len = total_len; result->col_value[MCI_COL_VALUE].is_str = true; result->col_value[MCI_COL_VALUE].is_valid = true; ((char*)result->col_value[MCI_COL_VALUE].value_str)[total_len] = 0; free(result->extra_col_value); } else if (!result->col_value[MCI_COL_VALUE].is_str && result->col_value[MCI_COL_VALUE].value_len != 0) { unsigned int int_len; char int_buf[MAX_INT_CHAR_LEN]; memset(int_buf, 0, sizeof int_buf); int_len = convert_to_char( int_buf, sizeof int_buf, &result->col_value[MCI_COL_VALUE].value_int, result->col_value[MCI_COL_VALUE].value_len, result->col_value[MCI_COL_VALUE].is_unsigned); if (int_len > conn_data->mul_col_buf_len) { if (conn_data->mul_col_buf) { free(conn_data->mul_col_buf); } conn_data->mul_col_buf = malloc(int_len + 1); conn_data->mul_col_buf_len = int_len; } memcpy(conn_data->mul_col_buf, int_buf, int_len); result->col_value[MCI_COL_VALUE].value_str = conn_data->mul_col_buf; result->col_value[MCI_COL_VALUE].value_len = int_len; result->col_value[MCI_COL_VALUE].is_str = true; result->col_value[MCI_COL_VALUE].is_valid = true; } *item = result; func_exit: if (!report_table_switch) { innodb_api_cursor_reset(innodb_eng, conn_data, CONN_OP_READ, true); } err_exit: /* If error return, memcached will not call InnoDB Memcached's callback function "innodb_release" to reset the result_in_use value. So we reset it here */ if (err_ret != ENGINE_SUCCESS && conn_data) { conn_data->result_in_use = false; } return(err_ret); } /*******************************************************************//** Get statistics info @return ENGINE_SUCCESS if successfully, otherwise error code */ static ENGINE_ERROR_CODE innodb_get_stats( /*=============*/ ENGINE_HANDLE* handle, /*!< in: Engine Handle */ const void* cookie, /*!< in: connection cookie */ const char* stat_key, /*!< in: statistics key */ int nkey, /*!< in: key length */ ADD_STAT add_stat) /*!< out: stats to fill */ { struct innodb_engine* innodb_eng = innodb_handle(handle); struct default_engine *def_eng = default_handle(innodb_eng); return(def_eng->engine.get_stats(innodb_eng->default_engine, cookie, stat_key, nkey, add_stat)); } /*******************************************************************//** reset statistics @return ENGINE_SUCCESS if successfully, otherwise error code */ static void innodb_reset_stats( /*===============*/ ENGINE_HANDLE* handle, /*!< in: Engine Handle */ const void* cookie) /*!< in: connection cookie */ { struct innodb_engine* innodb_eng = innodb_handle(handle); struct default_engine *def_eng = default_handle(innodb_eng); def_eng->engine.reset_stats(innodb_eng->default_engine, cookie); } /*******************************************************************//** API interface for memcached's "SET", "ADD", "REPLACE", "APPEND" "PREPENT" and "CAS" commands @return ENGINE_SUCCESS if successfully, otherwise error code */ static ENGINE_ERROR_CODE innodb_store( /*=========*/ ENGINE_HANDLE* handle, /*!< in: Engine Handle */ const void* cookie, /*!< in: connection cookie */ item* item, /*!< out: result to fill */ uint64_t* cas, /*!< in: cas value */ ENGINE_STORE_OPERATION op, /*!< in: type of operation */ uint16_t vbucket __attribute__((unused))) /*!< in: bucket, used by default engine only */ { struct innodb_engine* innodb_eng = innodb_handle(handle); uint16_t len = hash_item_get_key_len(item); char* value = hash_item_get_key(item); uint64_t exptime = hash_item_get_exp(item); uint64_t flags = hash_item_get_flag(item); ENGINE_ERROR_CODE result; uint64_t input_cas; innodb_conn_data_t* conn_data; meta_cfg_info_t* meta_info = innodb_eng->meta_info; uint32_t val_len = ((hash_item*)item)->nbytes; size_t key_len = len; ENGINE_ERROR_CODE err_ret = ENGINE_SUCCESS; if (meta_info->set_option == META_CACHE_OPT_DISABLE) { return(ENGINE_SUCCESS); } if (meta_info->set_option == META_CACHE_OPT_DEFAULT || meta_info->set_option == META_CACHE_OPT_MIX) { result = store_item(default_handle(innodb_eng), item, cas, op, cookie); if (meta_info->set_option == META_CACHE_OPT_DEFAULT) { return(result); } } err_ret = check_key_name_for_map_switch(handle, cookie, value, &key_len); if (err_ret != ENGINE_SUCCESS) { return(err_ret); } /* If no key is provided, return here */ if (key_len <= 0) { return(ENGINE_NOT_STORED); } conn_data = innodb_conn_init(innodb_eng, cookie, CONN_MODE_WRITE, IB_LOCK_X, false, NULL); if (!conn_data) { return(ENGINE_NOT_STORED); } input_cas = hash_item_get_cas(item); result = innodb_api_store(innodb_eng, conn_data, value + len - key_len, key_len, val_len, exptime, cas, input_cas, flags, op); innodb_api_cursor_reset(innodb_eng, conn_data, CONN_OP_WRITE, result == ENGINE_SUCCESS); return(result); } /*******************************************************************//** Support memcached "INCR" and "DECR" command, add or subtract a "delta" value from an integer key value @return ENGINE_SUCCESS if successfully, otherwise error code */ static ENGINE_ERROR_CODE innodb_arithmetic( /*==============*/ ENGINE_HANDLE* handle, /*!< in: Engine Handle */ const void* cookie, /*!< in: connection cookie */ const void* key, /*!< in: key for the value to add */ const int nkey, /*!< in: key length */ const bool increment, /*!< in: whether to increment or decrement */ const bool create, /*!< in: whether to create the key value pair if can't find */ const uint64_t delta, /*!< in: value to add/substract */ const uint64_t initial, /*!< in: initial */ const rel_time_t exptime, /*!< in: expiration time */ uint64_t* cas, /*!< out: new cas value */ uint64_t* result, /*!< out: result value */ uint16_t vbucket) /*!< in: bucket, used by default engine only */ { struct innodb_engine* innodb_eng = innodb_handle(handle); struct default_engine* def_eng = default_handle(innodb_eng); innodb_conn_data_t* conn_data; meta_cfg_info_t* meta_info = innodb_eng->meta_info; ENGINE_ERROR_CODE err_ret; if (meta_info->set_option == META_CACHE_OPT_DISABLE) { return(ENGINE_SUCCESS); } if (meta_info->set_option == META_CACHE_OPT_DEFAULT || meta_info->set_option == META_CACHE_OPT_MIX) { /* For cache-only, forward this to the default engine */ err_ret = def_eng->engine.arithmetic( innodb_eng->default_engine, cookie, key, nkey, increment, create, delta, initial, exptime, cas, result, vbucket); if (meta_info->set_option == META_CACHE_OPT_DEFAULT) { return(err_ret); } } conn_data = innodb_conn_init(innodb_eng, cookie, CONN_MODE_WRITE, IB_LOCK_X, false, NULL); if (!conn_data) { return(ENGINE_NOT_STORED); } err_ret = innodb_api_arithmetic(innodb_eng, conn_data, key, nkey, delta, increment, cas, exptime, create, initial, result); innodb_api_cursor_reset(innodb_eng, conn_data, CONN_OP_WRITE, true); return(err_ret); } /*******************************************************************//** Cleanup idle connections if "clear_all" is false, and clean up all connections if "clear_all" is true. @return number of connection cleaned */ static bool innodb_flush_sync_conn( /*===================*/ innodb_engine_t* engine, /*!< in/out: InnoDB memcached engine */ const void* cookie, /*!< in: connection cookie */ bool flush_flag) /*!< in: flush is running or not */ { innodb_conn_data_t* conn_data = NULL; innodb_conn_data_t* curr_conn_data; bool ret = true; curr_conn_data = engine->server.cookie->get_engine_specific(cookie); assert(curr_conn_data); conn_data = UT_LIST_GET_FIRST(engine->conn_data); while (conn_data) { if (conn_data != curr_conn_data && (!conn_data->is_stale)) { if (conn_data->thd) { handler_thd_attach(conn_data->thd, NULL); } LOCK_CURRENT_CONN_IF_NOT_LOCKED(false, conn_data); if (flush_flag == false) { conn_data->is_flushing = flush_flag; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED(false, conn_data); conn_data = UT_LIST_GET_NEXT(conn_list, conn_data); continue; } if (!conn_data->in_use) { /* Set flushing flag to conn_data for preventing it is get by other request. */ conn_data->is_flushing = flush_flag; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED(false, conn_data); } else { ret = false; UNLOCK_CURRENT_CONN_IF_NOT_LOCKED(false, conn_data); break; } } conn_data = UT_LIST_GET_NEXT(conn_list, conn_data); } if (curr_conn_data->thd) { handler_thd_attach(curr_conn_data->thd, NULL); } return(ret); } /*******************************************************************//** Support memcached "FLUSH_ALL" command, clean up storage (trunate InnoDB Table) @return ENGINE_SUCCESS if successfully, otherwise error code */ static ENGINE_ERROR_CODE innodb_flush( /*=========*/ ENGINE_HANDLE* handle, /*!< in: Engine Handle */ const void* cookie, /*!< in: connection cookie */ time_t when) /*!< in: when to flush, not used by InnoDB */ { struct innodb_engine* innodb_eng = innodb_handle(handle); struct default_engine* def_eng = default_handle(innodb_eng); ENGINE_ERROR_CODE err = ENGINE_SUCCESS; meta_cfg_info_t* meta_info = innodb_eng->meta_info; ib_err_t ib_err = DB_SUCCESS; innodb_conn_data_t* conn_data; if (meta_info->flush_option == META_CACHE_OPT_DISABLE) { return(ENGINE_SUCCESS); } if (meta_info->flush_option == META_CACHE_OPT_DEFAULT || meta_info->flush_option == META_CACHE_OPT_MIX) { /* default engine flush */ err = def_eng->engine.flush(innodb_eng->default_engine, cookie, when); if (meta_info->flush_option == META_CACHE_OPT_DEFAULT) { return(err); } } /* Lock the whole engine, so no other connection can start new opeartion */ pthread_mutex_lock(&innodb_eng->conn_mutex); /* Lock the flush_mutex for blocking other DMLs. */ pthread_mutex_lock(&innodb_eng->flush_mutex); conn_data = innodb_eng->server.cookie->get_engine_specific(cookie); if (conn_data) { /* Commit any work on this connection */ innodb_api_cursor_reset(innodb_eng, conn_data, CONN_OP_FLUSH, true); } conn_data = innodb_conn_init(innodb_eng, cookie, CONN_MODE_WRITE, IB_LOCK_TABLE_X, true, NULL); if (!conn_data) { pthread_mutex_unlock(&innodb_eng->flush_mutex); pthread_mutex_unlock(&innodb_eng->conn_mutex); return(ENGINE_SUCCESS); } /* Commit any previous work on this connection */ innodb_api_cursor_reset(innodb_eng, conn_data, CONN_OP_FLUSH, true); if (!innodb_flush_sync_conn(innodb_eng, cookie, true)) { pthread_mutex_unlock(&innodb_eng->flush_mutex); pthread_mutex_unlock(&innodb_eng->conn_mutex); innodb_flush_sync_conn(innodb_eng, cookie, false); return(ENGINE_TMPFAIL); } ib_err = innodb_api_flush(innodb_eng, conn_data, meta_info->col_info[CONTAINER_DB].col_name, meta_info->col_info[CONTAINER_TABLE].col_name); /* Commit work and release the MDL table. */ innodb_api_cursor_reset(innodb_eng, conn_data, CONN_OP_FLUSH, true); innodb_conn_clean_data(conn_data, false, false); pthread_mutex_unlock(&innodb_eng->flush_mutex); pthread_mutex_unlock(&innodb_eng->conn_mutex); innodb_flush_sync_conn(innodb_eng, cookie, false); return((ib_err == DB_SUCCESS) ? ENGINE_SUCCESS : ENGINE_TMPFAIL); } /*******************************************************************//** Deal with unknown command. Currently not used @return ENGINE_SUCCESS if successfully processed, otherwise error code */ static ENGINE_ERROR_CODE innodb_unknown_command( /*===================*/ ENGINE_HANDLE* handle, /*!< in: Engine Handle */ const void* cookie, /*!< in: connection cookie */ protocol_binary_request_header *request, /*!< in: request */ ADD_RESPONSE response) /*!< out: respondse */ { struct innodb_engine* innodb_eng = innodb_handle(handle); struct default_engine *def_eng = default_handle(innodb_eng); return(def_eng->engine.unknown_command(innodb_eng->default_engine, cookie, request, response)); } /*******************************************************************//** Callback functions used by Memcached's process_command() function to get the result key/value information @return true if info fetched */ static bool innodb_get_item_info( /*=================*/ ENGINE_HANDLE* handle __attribute__((unused)), /*!< in: Engine Handle */ const void* cookie __attribute__((unused)), /*!< in: connection cookie */ const item* item, /*!< in: item in question */ item_info* item_info) /*!< out: item info got */ { struct innodb_engine* innodb_eng = innodb_handle(handle); innodb_conn_data_t* conn_data; conn_data = innodb_eng->server.cookie->get_engine_specific(cookie); if (!conn_data || !conn_data->result_in_use) { hash_item* it; if (item_info->nvalue < 1) { return(false); } /* Use a hash item */ it = (hash_item*) item; item_info->cas = hash_item_get_cas(it); item_info->exptime = it->exptime; item_info->nbytes = it->nbytes; item_info->flags = it->flags; item_info->clsid = it->slabs_clsid; item_info->nkey = it->nkey; item_info->nvalue = 1; item_info->key = hash_item_get_key(it); item_info->value[0].iov_base = hash_item_get_data(it); item_info->value[0].iov_len = it->nbytes; } else { mci_item_t* it; if (item_info->nvalue < 1) { return(false); } /* Use a hash item */ it = (mci_item_t*) item; if (it->col_value[MCI_COL_CAS].is_valid) { item_info->cas = it->col_value[MCI_COL_CAS].value_int; } else { item_info->cas = 0; } if (it->col_value[MCI_COL_EXP].is_valid) { item_info->exptime = it->col_value[MCI_COL_EXP].value_int; } else { item_info->exptime = 0; } item_info->nbytes = it->col_value[MCI_COL_VALUE].value_len; if (it->col_value[MCI_COL_FLAG].is_valid) { item_info->flags = ntohl( it->col_value[MCI_COL_FLAG].value_int); } else { item_info->flags = 0; } item_info->clsid = 1; item_info->nkey = it->col_value[MCI_COL_KEY].value_len; item_info->nvalue = 1; item_info->key = it->col_value[MCI_COL_KEY].value_str; item_info->value[0].iov_base = it->col_value[ MCI_COL_VALUE].value_str;; item_info->value[0].iov_len = it->col_value[ MCI_COL_VALUE].value_len; } return(true); }