Server IP : 172.67.216.182 / Your IP : 172.71.124.68 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/databaseModelV2/ |
Upload File : |
# coding: utf-8 # ------------------------------------------------------------------- # aaPanel # ------------------------------------------------------------------- # Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved. # ------------------------------------------------------------------- # Author: hwliang <[email protected]> # ------------------------------------------------------------------- import json # sqlite模型 # ------------------------------ import os import re import shutil import time from typing import Union import public from databaseModelV2.base import databaseBase from public.validate import Param try: import redis except: public.ExecShell("btpip install redis") import redis try: from BTPanel import session except: pass # 2025/6/6 同步国内 class panelRedisDB(): __DB_PASS = None __DB_USER = None __DB_PORT = 6379 __DB_HOST = '127.0.0.1' __DB_CONN = None __DB_ERR = None __DB_CLOUD = None def __init__(self): self.__config = self.get_options(None) self.error_message = "" def redis_conn(self, db_idx=0): if self.__DB_HOST in ['127.0.0.1', 'localhost']: if not os.path.exists('/www/server/redis'): return False if self.__config == "Config Error": return False if not self.__DB_CLOUD: self.__DB_PASS = self.__config['requirepass'] self.__DB_PORT = int(self.__config['port']) try: redis_pool = redis.ConnectionPool( host=self.__DB_HOST, port=self.__DB_PORT, password=self.__DB_PASS, db=db_idx ) self.__DB_CONN = redis.Redis(connection_pool=redis_pool) self.__DB_CONN.ping() return self.__DB_CONN except redis.exceptions.ConnectionError: return False except Exception: self.__DB_ERR = public.get_error_info() return False def set_host(self, host, port, name, username, password, prefix=''): self.__DB_HOST = host self.__DB_PORT = int(port) self.__DB_NAME = name if self.__DB_NAME: self.__DB_NAME = str(self.__DB_NAME) self.__DB_USER = str(username) self._USER = str(username) self.__DB_PASS = str(password) self.__DB_PREFIX = prefix self.__DB_CLOUD = 1 return self # 获取配置项 def get_options(self, get=None): import ipaddress result = {} redis_conf = public.readFile("{}/redis/redis.conf".format(public.get_setup_path())) if not redis_conf: if not os.path.exists('/www/server/redis'): return False public.ExecShell("mv /www/server/redis/redis.conf /www/server/redis/redis.conf.bak") public.ExecShell( "wget -O /www/server/redis/redis.conf https://node.aapanel.com/conf/redis.conf;chmod 600 /www/server/redis/redis.conf;chown redis:redis /www/server/redis/redis.conf" ) time.sleep(1) redis_conf = public.readFile("{}/redis/redis.conf".format(public.get_setup_path())) keys = ["bind", "port", "timeout", "maxclients", "databases", "requirepass", "maxmemory"] defaults = ["127.0.0.1", "6379", "300", "10000", "16", "", "0"] errors = [] for n, k in enumerate(keys): rep = r"\n{}\s+(.*)".format(k) # 更准确地捕获整行 group = re.search(rep, redis_conf) if group: value = group.group(1).strip() try: if k == "maxmemory": # 将 maxmemory 从字节转换为兆字节并赋值 value = str(int(value) // 1024 // 1024) except: pass else: value = defaults[n] if k in ["port", "timeout", "maxclients", "databases", "maxmemory"]: if not value.isdigit(): errors.append(f"'{k}' value must be a number, and the currently configured value is '{value}'") continue if k == "bind": try: # 尝试解析IP地址以验证其格式 ipaddress.ip_address(value) except ValueError: errors.append( f"'{k}' value must be a valid IP address, the current configuration value is '{value}'") continue result[k] = value if errors: error_message = "The following error was detected in the configuration file for the Redis database:\n" + "\n".join( errors) error_message += "\nPlease go to the software shop to make the correct changes to the configuration file of the redis plugin!" self.error_message = error_message return "Config Error" return result class main(databaseBase): _DB_BACKUP_DIR = os.path.join(public.M("config").where("id=?", (1,)).getField("backup_path"), "database") _REDIS_BACKUP_DIR = os.path.join(_DB_BACKUP_DIR, "redis") _REDIS_CONF = os.path.join(public.get_setup_path(), "redis/redis.conf") def __init__(self): if not os.path.exists(self._REDIS_BACKUP_DIR): os.makedirs(self._REDIS_BACKUP_DIR) self._db_num = 16 if os.path.exists(self._REDIS_CONF): redis_conf = public.readFile(self._REDIS_CONF) db_obj = re.search("\ndatabases\s+(\d+)", redis_conf) if db_obj: self._db_num = int(db_obj.group(1)) def GetCloudServer(self, args): """ @name 获取远程服务器列表 @author hwliang<2021-01-10> @return list """ return public.return_message(0, 0, self.GetBaseCloudServer(args)) def AddCloudServer(self, args): """ @添加远程数据库 """ # {"db_host":"192.168.66.129","db_port":"6379","db_user":"root","db_password":"password1","db_ps":"192.168.66.129","type":"redis"} # 校验参数 try: args.validate([ Param('db_host').Require(), Param('db_port').Require().Number(">=", 1).Number("<=", 65535), Param('db_user').Require().String(), Param('db_password').Require().String(), Param('db_ps').Require().String(), Param('type').Require().String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) return self.AddBaseCloudServer(args) def RemoveCloudServer(self, args): """ @删除远程数据库 """ return self.RemoveBaseCloudServer(args) def ModifyCloudServer(self, args): """ @修改远程数据库 """ return self.ModifyBaseCloudServer(args) def get_obj_by_sid(self, sid: Union[int, str] = 0, conn_config: dict = None): """ @取mssql数据库对像 By sid @sid 数据库分类,0:本地 """ if isinstance(sid, str): sid = int(sid) if sid != 0: if not conn_config: conn_config = public.M('database_servers').where("id=?", sid).find() db_obj = panelRedisDB() try: db_obj = db_obj.set_host( conn_config['db_host'], conn_config['db_port'], None, conn_config['db_user'], conn_config['db_password'] ) except Exception as e: raise public.PanelError(e) else: db_obj = panelRedisDB() return db_obj def local_xsssec(self, text): """ @name XSS防御,只替换关键字符,不转义字符 @author hwliang @param text 要转义的字符 @return str """ sub_list = { '<': '<', '>': '>' } for s in sub_list.keys(): text = text.replace(s, sub_list[s]) return text def get_list(self, args): """ @获取数据库列表 @sql_type = redis """ # 校验参数 try: args.validate([ Param('sid').Require().Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) result = [] sid = args.get('sid/d', 0) redis_obj = self.get_obj_by_sid(sid).redis_conn(0) if redis_obj is False: if panelRedisDB().error_message: return public.fail_v2(public.lang(str(panelRedisDB().error_message))) return public.success_v2(result) redis_info = redis_obj.info() is_cluster = redis_info.get("cluster_enabled", 0) if is_cluster != 0: return public.fail_v2(public.lang("not support redis cluster!")) db_num = self._db_num if sid != 0: db_num = 1000 for x in range(0, db_num): data = {} data['id'] = x data['name'] = 'DB{}'.format(x) try: redis_obj = self.get_obj_by_sid(sid).redis_conn(x) data['keynum'] = redis_obj.dbsize() result.append(data) except Exception as ex: break return public.return_message(0, 0, result) def set_redis_val(self, args): """ @设置或修改指定值 """ # 校验参数 try: args.validate([ Param('db_idx').Require().Integer(), Param('sid').Require().Integer(), Param('name').Require().String(), # 键 Param('val').Require().String(), # 值 Param('endtime').Integer(), # 过期时间 ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) sid = args.get("sid/d", 0) db_idx = args.get("db_idx") name = args.get("name") val = args.get("val") endtime = args.get("endtime", None) redis_obj = self.get_obj_by_sid(sid).redis_conn(db_idx) if redis_obj is False: return public.fail_v2(public.lang("redis connect fail!")) if endtime is not None: redis_obj.set(name, val, int(endtime)) else: redis_obj.set(name, val) public.set_module_logs('linux_redis', 'set_redis_val', 1) return public.return_message(0, 0, public.lang("Operation is successful")) def del_redis_val(self, args): """ @删除key值 """ # 校验参数 try: args.validate([ Param('db_idx').Require().Integer(), Param('sid').Require().Integer(), Param('key').Require().String(), # 键 ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) sid = args.get('sid/d', 0) redis_obj = self.get_obj_by_sid(sid).redis_conn(args.db_idx) if redis_obj is False: return public.fail_v2(public.lang("redis connect fail!")) redis_obj.delete(args.key) return public.return_message(0, 0, public.lang("Operation is successful")) def clear_flushdb(self, args): """ 清空数据库 @ids 清空数据库列表,不传则清空所有 """ # 校验参数 try: args.validate([ Param('ids').String(), # "ids":"[0,1]" Param('sid').Require().Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) sid = args.get('sid/d', 0) ids = json.loads(args.ids) # ids = [] if len(ids) == 0: for x in range(0, self._db_num): ids.append(x) redis_obj = self.get_obj_by_sid(sid).redis_conn(0) if redis_obj is False: return public.fail_v2(public.lang("redis connect fail!")) for x in ids: redis_obj = self.get_obj_by_sid(sid).redis_conn(x) redis_obj.flushdb() return public.return_message(0, 0, public.lang("Operation is successful")) def get_db_keylist(self, args): """ @获取指定数据库key集合 """ # 校验参数 try: args.validate([ Param('db_type').Integer(), Param('db_idx').Require().Integer(), Param('limit').Integer(), Param('p').Integer(), Param('search').String(), Param('tojs').String(), # 不知道 ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) search = '*' if 'search' in args: search = "*" + args.search + "*" db_idx = args.db_idx sid = args.get('sid/d', 0) redis_obj = self.get_obj_by_sid(sid).redis_conn(db_idx) if redis_obj is False: return public.fail_v2(public.lang("redis connect fail!")) try: total = redis_obj.dbsize() except Exception as err: if str(err).find("Connection refused"): return public.fail_v2(public.lang(f"redis connection failed, please check if the database service is started!")) return public.fail_v2(public.lang(f"redis connection failed, please check if the database service is started!{err}")) info = {'p': 1, 'row': 20, 'count': total} if hasattr(args, 'limit'): info['row'] = int(args.limit) if hasattr(args, 'p'): info['p'] = int(args['p']) try: if search != '*': keylist = redis_obj.keys(search) info['count'] = len(keylist) else: keys = redis_obj.scan(match="{}".format(search), count=info['p'] * info['row']) keylist = keys[1] except: keylist = [] info = {'p': 1, 'row': 10, 'count': len(keylist)} if hasattr(args, 'limit'): info['row'] = int(args.limit) if hasattr(args, 'p'): info['p'] = int(args['p']) import page # 实例化分页类 page = page.Page() info['uri'] = args info['return_js'] = '' if hasattr(args, 'tojs'): info['return_js'] = args.tojs slist = keylist[(info['p'] - 1) * info['row']:info['p'] * info['row']] rdata = {} rdata['page'] = page.GetPage(info, '1,2,3,4,5,8') rdata['where'] = '' rdata['data'] = [] from datetime import timedelta idx = 0 for key in slist: item = {} try: item['name'] = key.decode() except: item['name'] = str(key) item['endtime'] = redis_obj.ttl(key) if item['endtime'] == -1: item['endtime'] = 0 item['showtime'] = "royalty-free" else: key_ttl = redis_obj.ttl(key) INT_MAX = 2147483647 INT_MIN = -2147483648 if key_ttl > INT_MAX or key_ttl < INT_MIN: item['showtime'] = str(key_ttl) + "second" else: delta = timedelta(seconds=key_ttl) days, remainder = divmod(delta.total_seconds(), 86400) hours, remainder = divmod(remainder, 3600) minutes, seconds = divmod(remainder, 60) formatted_ttl = f"{int(days)}days{int(hours)}hours{int(minutes)}min{int(seconds)}second" item['showtime'] = str(formatted_ttl) item['type'] = redis_obj.type(key).decode() if item['type'] == 'string': try: item['val'] = redis_obj.get(key).decode() except: item['val'] = str(redis_obj.get(key)) elif item['type'] == 'hash': if redis_obj.hlen(key) > 500: item['val'] = "The amount of data is too large to display!Total {} entries".format( redis_obj.hlen(key)) else: item['val'] = str(redis_obj.hgetall(key)) elif item['type'] == 'list': if redis_obj.llen(key) > 500: item['val'] = "The amount of data is too large to display!Total {} entries".format( redis_obj.llen(key)) else: item['val'] = str(redis_obj.lrange(key, 0, -1)) elif item['type'] == 'set': if redis_obj.scard(key) > 500: item['val'] = "The amount of data is too large to display!Total {} entries".format( redis_obj.scard(key)) else: item['val'] = str(redis_obj.smembers(key)) elif item['type'] == 'zset': if redis_obj.zcard(key) > 500: item['val'] = "The amount of data is too large to display!Total {} entries".format( redis_obj.zcard(key)) else: item['val'] = str(redis_obj.zrange(key, 0, -1, withscores=True)) else: item['val'] = '' try: item['len'] = redis_obj.strlen(key) except: item['len'] = len(item['val']) item['val'] = self.local_xsssec(item['val']) item['name'] = public.xsssec(item['name']) rdata['data'].append(item) idx += 1 return public.return_message(0, 0, rdata) # 备份数据库 def ToBackup(self, args): """ @备份数据库 """ # 校验参数 try: args.validate([ Param('sid').Require().Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) sid = args.get('sid/d', 0) if sid != 0: return public.fail_v2(public.lang("Backing up remote databases is not supported at this time!")) db_fidx = None if not hasattr(args, "db_idx"): db_fname = "all_db" else: db_fidx = args.db_idx db_fname = "db_{}".format(db_fidx) redis_obj = self.get_obj_by_sid(sid) if redis_obj.redis_conn(0) is False: return public.fail_v2(public.lang("redis connect fail!")) if db_fidx: redis_obj.redis_conn(0).execute_command("SELECT", int(db_fidx)) redis_obj.redis_conn(0).execute_command("SAVE") else: for db_idx in range(0, self._db_num): redis_obj.redis_conn(db_idx).save() redis_obj = redis_obj.redis_conn(0) src_path = os.path.join(redis_obj.config_get().get("dir", ""), "dump.rdb") if not os.path.exists(src_path): return public.fail_v2(public.lang('BACKUP_ERROR')) file_name = "{db_fname}_{backup_time}_redis_data.rdb".format(db_fname=db_fname, backup_time=time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())) file_path = os.path.join(self._REDIS_BACKUP_DIR, file_name) shutil.copyfile(src_path, file_path) if not os.path.exists(file_path): return public.fail_v2(public.lang('BACKUP_ERROR')) return public.success_v2(public.lang('BACKUP_SUCCESS')) def DelBackup(self, args): """ @删除备份文件 """ # 校验参数 try: args.validate([ Param('file').Require().SafePath(), # Param('sid').Require().Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) file = args.file if os.path.exists(file): os.remove(file) return public.return_message(0, 0, public.lang("Delete successfully!")) def InputSql(self, get): """ @导入数据库 """ # 校验参数 try: get.validate([ Param('file').Require().SafePath(), Param('sid').Require().Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) file = get.file sid = get.get('sid/d', 0) if not os.path.isfile(file): return public.fail_v2(public.lang("file not found")) redis_obj = self.get_obj_by_sid(sid).redis_conn(0) if redis_obj is False: return public.fail_v2(public.lang("redis connect fail")) rpath = redis_obj.config_get()['dir'] dst_path = '{}/dump.rdb'.format(rpath) public.ExecShell("/etc/init.d/redis stop") if os.path.exists(dst_path): os.remove(dst_path) shutil.copy2(file, dst_path) public.ExecShell("chown redis.redis {dump} && chmod 644 {dump}".format(dump=dst_path)) public.ExecShell("/etc/init.d/redis start") if os.path.exists(dst_path): return public.return_message(0, 0, public.lang("Restore Successful.")) return public.return_message(-1, 0, public.lang("Restore failure.")) def get_backup_list(self, get): """ @获取备份文件列表 """ # 校验参数 try: get.validate([ Param('search').String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) search = '' if hasattr(get, 'search'): search = get['search'].strip().lower() nlist = [] cloud_list = {} for x in self.GetCloudServer({'type': 'redis'}).get("message", []): cloud_list['id-' + str(x['id'])] = x for name in os.listdir(self._REDIS_BACKUP_DIR): if search: if name.lower().find(search) == -1: continue arrs = name.split('_') file_path = os.path.join(self._REDIS_BACKUP_DIR, name).replace('//', '/') if not os.path.isfile(file_path): continue stat = os.stat(file_path) item = {} item['name'] = name item['filepath'] = file_path item['size'] = stat.st_size item['mtime'] = int(stat.st_mtime) item['sid'] = arrs[0] try: if 0 <= int(arrs[0]) <= 15: item['conn_config'] = cloud_list['id-' + str(arrs[0])] except ValueError: pass nlist.append(item) if hasattr(get, 'sort'): nlist = sorted(nlist, key=lambda data: data['mtime'], reverse=get["sort"] == "desc") return public.return_message(0, 0, nlist) def restart_services(self): """ @重启服务 """ public.ExecShell('net stop redis') public.ExecShell('net start redis') return True def check_cloud_database_status(self, conn_config): """ @检测远程数据库是否连接 @conn_config 远程数据库配置,包含host port pwd等信息 """ try: sql_obj = panelRedisDB().set_host(conn_config['db_host'], conn_config['db_port'], conn_config['db_name'], conn_config['db_user'], conn_config['db_password']) redis_obj = sql_obj.redis_conn(0) if redis_obj is False: return public.fail_v2(public.lang("redis connect fail")) keynum = redis_obj.dbsize() return True except Exception as ex: return public.fail_v2(public.lang("remote database connect fail {}".format(ex))) # 数据库状态检测 def CheckDatabaseStatus(self, get): """ 数据库状态检测 """ if not hasattr(get, "sid"): return public.fail_v2("params not found! sid") if not str(get.sid).isdigit(): return public.fail_v2("params not found! sid") sid = int(get.sid) if sid != 0: conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('redis')", (sid,)).find() if not conn_config: return public.fail_v2(public.lang("Remote database information does not exist!")) conn_config["db_name"] = None redis_obj = panelRedisDB().set_host(conn_config['db_host'], conn_config['db_port'], conn_config.get("db_name"), conn_config['db_user'], conn_config['db_password']) else: redis_obj = panelRedisDB() if redis_obj.redis_conn(0) is False: return {"status": True, "msg": "exceptions", "db_status": False} try: redis_obj.redis_conn(0).dbsize() db_status = True except: db_status = False return {"status": True, "msg": "normalcy" if db_status is True else "exceptions", "db_status": db_status}