Server IP : 172.67.216.182 / Your IP : 172.68.164.90 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/databaseModelV2/ |
Upload File : |
# coding: utf-8 # ------------------------------------------------------------------- # aaPanel # ------------------------------------------------------------------- # Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved. # ------------------------------------------------------------------- # Author: hezhihong <[email protected]> # ------------------------------------------------------------------- # postgresql模型 import json import os import re import time import warnings from typing import Tuple, Union import public from databaseModelV2.base import databaseBase from public import Param, validate try: from BTPanel import session except: pass try: import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT except: pass warnings.filterwarnings("ignore", category=SyntaxWarning) """ 2025/6/7 同步国内 """ class panelPgsql: _CONFIG_PATH = os.path.join(public.get_setup_path(), "pgsql/data/postgresql.conf") def __init__(self): self.check_package() self.__CONN_KWARGS = { "host": "localhost", "port": 5432, "user": "postgres", "password": None, "database": None, "connect_timeout": 3, } self.__DB_CONN = None self.__DB_CUR = None # 检查python包是否存在 @classmethod def check_package(cls): """ @name检测依赖是否正常 """ try: import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT except: os.system('btpip install psycopg2-binary') try: import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT except: return False return True def _socket(self) -> str: # 临时 s_path = os.path.join(public.get_setup_path(), "pgsql/socket") if not os.path.exists(s_path): public.ExecShell(f"mkdir -p {s_path}") public.ExecShell(f"chown postgres:postgres {s_path}") public.ExecShell(f"chmod 700 {s_path}") public.ExecShell("/etc/init.d/pgsql restart") conf = public.readFile(self._CONFIG_PATH) if conf and "#unix_socket_directories" in conf: pattern = r'^\s*#\s*unix_socket_directories\b.*$' new_content = re.sub( pattern, f"unix_socket_directories = '{s_path}'", conf, flags=re.MULTILINE ) public.writeFile(self._CONFIG_PATH, new_content) public.ExecShell("/etc/init.d/pgsql restart") return "/www/server/pgsql/socket" # 连接PGSQL数据库 def connect(self) -> Tuple[bool, str]: if self.__CONN_KWARGS.get("password") is None: if self.__CONN_KWARGS["host"] in ["localhost", "127.0.0.1"] and os.path.exists("/www/server/pgsql/data"): self.__CONN_KWARGS["port"] = self.get_config_options("port", int, 5432) tmp_args = public.dict_obj() tmp_args.is_True = True self.__CONN_KWARGS["password"] = main().get_root_pwd(tmp_args)["message"].get("result") try: self.__DB_CONN = psycopg2.connect(**self.__CONN_KWARGS) self.__DB_CONN.autocommit = True self.__DB_CONN.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # <-- ADD THIS LINE self.__DB_CUR = self.__DB_CONN.cursor() return True, public.lang("normal") except Exception as e: public.print_log(f"connect pgsql error: {e}") err_msg = public.get_error_info() return False, public.lang(err_msg) # 设置连接参数 def set_host(self, *args, **kwargs): """ 设置连接参数 """ # args 兼容老版本,后续新增禁止使用 args if len(args) >= 5: kwargs["host"] = args[0] kwargs["port"] = args[1] kwargs["database"] = args[2] kwargs["user"] = args[3] kwargs["password"] = args[4] if kwargs.get("db_host") is not None: kwargs["host"] = kwargs.get("db_host") if kwargs.get("db_port") is not None: kwargs["port"] = kwargs.get("db_port") if kwargs.get("db_name") is not None: kwargs["database"] = kwargs.get("db_name") if kwargs.get("db_user") is not None: kwargs["user"] = kwargs.get("db_user") if kwargs.get("db_password") is not None: kwargs["password"] = kwargs.get("db_password") self.__CONN_KWARGS.update(kwargs) if not isinstance(self.__CONN_KWARGS["port"], int): self.__CONN_KWARGS["port"] = int(self.__CONN_KWARGS["port"]) return self def execute(self, sql): # 执行SQL语句返回受影响行 if self.__DB_CONN is None: status, err_msg = self.connect() if status is False: return err_msg if self.__DB_CONN.closed or self.__DB_CUR.closed: # 判断是否关闭,关闭重新连接 status, err_msg = self.connect() if status is False: return err_msg try: result = self.__DB_CUR.execute(sql) self.__DB_CONN.commit() self.__Close() return result except Exception as ex: return ex def query(self, sql): # 执行SQL语句返回数据集 if self.__DB_CONN is None: status, err_msg = self.connect() if status is False: return err_msg if self.__DB_CONN.closed or self.__DB_CUR.closed: # 判断是否关闭,关闭重新连接 status, err_msg = self.connect() if status is False: return err_msg try: self.__DB_CUR.execute(sql) result = self.__DB_CUR.fetchall() data = list(map(list, result)) self.__Close() return data except Exception as ex: return ex # 关闭连接 def __Close(self): self.__DB_CUR.close() self.__DB_CONN.close() # 获取未注释的配置文件参数 @classmethod def get_config_options(cls, name: str, value_type: type, default=None): """ 获取未注释的配置文件参数 name: 参数名称 value_type: 参数类型 """ conf_data = public.readFile(cls._CONFIG_PATH) if not conf_data: public.ExecShell("/www/server/pgsql/data/postgresql.conf /www/server/pgsql/data/postgresql.bak") public.ExecShell( "wget -O /www/server/pgsql/data/postgresql.conf https://node.aapanel.com/conf/postgresql.conf;chmod 600 /www/server/pgsql/data/postgresql.conf;chown postgres:postgres /www/server/pgsql/data/postgresql.conf" ) time.sleep(2) conf_data = public.readFile(cls._CONFIG_PATH) if conf_data is None or not isinstance(conf_data, str): # 文件不存在或读取出错时,设置conf_data为默认空字符串 conf_data = "" elif not conf_data.endswith("\n"): conf_data += "\n" re_type_dict = { str: "([^\n]*)", int: "(\d+)", bool: "(on|off)", } re_str = re_type_dict.get(value_type, "([^\n]*)") conf_obj = re.search(r"\n{}\s*=\s*{}".format(name, re_str), conf_data) if conf_obj: value = conf_obj.group(1) value = value.strip() if value_type == bool: value = value == "on" else: value = value_type(value) return value if default is not None: if isinstance(default, value_type): return default elif value_type == bool: default = default == "on" else: default = value_type(default) return default return None class main(databaseBase, panelPgsql): _DB_BACKUP_DIR = os.path.join(public.M("config").where("id=?", (1,)).getField("backup_path"), "database") _PGSQL_BACKUP_DIR = os.path.join(_DB_BACKUP_DIR, "pgsql") _PGDUMP_BIN = os.path.join(public.get_setup_path(), "pgsql/bin/pg_dump") _PSQL_BIN = os.path.join(public.get_setup_path(), "pgsql/bin/psql") __ser_name = None __soft_path = '/www/server/pgsql' __setup_path = '/www/server/panel/' __dbuser_info_path = "{}plugin/pgsql_manager_dbuser_info.json".format(__setup_path) __plugin_path = "{}plugin/pgsql_manager/".format(__setup_path) def __init__(self): super().__init__() if not os.path.exists(self._PGSQL_BACKUP_DIR): os.makedirs(self._PGSQL_BACKUP_DIR, exist_ok=True) s_path = public.get_setup_path() v_info = public.readFile("{}/pgsql/version.pl".format(s_path)) if v_info: ver = v_info.split('.')[0] self.__ser_name = 'postgresql-x64-{}'.format(ver) self.__soft_path = '{}/pgsql/{}'.format(s_path) # 获取配置项 def get_options(self, get): data = {} options = ['port', 'listen_addresses'] if not self.__soft_path: self.__soft_path = '{}/pgsql'.format(public.get_setup_path()) conf = public.readFile('{}/data/postgresql.conf'.format(self.__soft_path)) for opt in options: tmp = re.findall(r"\s+" + opt + r"\s*=\s*(.+)#", conf) if not tmp: continue data[opt] = tmp[0].strip() if opt == 'listen_addresses': data[opt] = data[opt].replace('\'', '') data['password'] = self.get_root_pwd(None)['msg'] return public.return_message(0, 0, data) def get_list(self, args): """ @获取数据库列表 @sql_type = pgsql """ # 获取基础数据库列表 base_list = self.get_base_list(args, sql_type='pgsql') # 检查文件是否存在 if os.path.exists('/www/server/pgsql/data/pg_hba.conf'): # 读取 pg_hba.conf 文件 with open('/www/server/pgsql/data/pg_hba.conf', 'r') as file: lines = file.readlines() # 遍历数据库列表 for db in base_list['data']: # 遍历 pg_hba.conf 文件的每一行 for line in lines: # 如果这一行精确匹配数据库名 if re.search(r'\b' + db['name'] + r'\b', line): # 提取访问权限 listen_ip = line.split()[-2] # 更新数据库的访问权限 db.update({'listen_ip': listen_ip}) break return public.return_message(0, 0, base_list) def get_sql_obj_by_sid(self, sid: Union[int, str] = 0, conn_config=None): """ @取pgsql数据库对像 By sid @sid 数据库分类,0:本地 """ if isinstance(sid, str): sid = int(sid) if sid: if not conn_config: conn_config = public.M('database_servers').where("id=? AND LOWER(db_type)=LOWER('pgsql')", sid).find() db_obj = panelPgsql() try: db_obj = db_obj.set_host( host=conn_config["db_host"], port=conn_config["db_port"], database=None, user=conn_config["db_user"], password=conn_config["db_password"] ) except Exception as e: raise public.PanelError(e) else: db_obj = panelPgsql() return db_obj def get_sql_obj(self, db_name): """ @取pgsql数据库对象 @db_name 数据库名称 """ is_cloud_db = False if db_name: db_find = public.M('databases').where("name=? AND LOWER(type)=LOWER('PgSql')", db_name).find() if db_find['sid']: return self.get_sql_obj_by_sid(db_find['sid']) is_cloud_db = db_find['db_type'] in ['1', 1] if is_cloud_db: db_obj = panelPgsql() conn_config = json.loads(db_find['conn_config']) try: db_obj = db_obj.set_host(host=conn_config["db_host"], port=conn_config["db_port"], database=conn_config["db_name"], user=conn_config["db_user"], password=conn_config["db_password"]) except Exception as e: raise public.PanelError(e) else: db_obj = panelPgsql() return db_obj def GetCloudServer(self, args): """ @name 获取远程服务器列表 @author hwliang<2021-01-10> @return list """ check_result = os.system('/www/server/pgsql/bin/psql --version') if check_result != 0 and not public.M('database_servers').where("LOWER(db_type)=LOWER('pgsql')", ()).count(): return public.return_message(0, 0, []) return public.return_message(0, 0, self.GetBaseCloudServer(args)) def AddCloudServer(self, args): ''' @添加远程数据库 ''' return self.AddBaseCloudServer(args) def RemoveCloudServer(self, args): ''' @删除远程数据库 ''' return self.RemoveBaseCloudServer(args) def ModifyCloudServer(self, args): ''' @修改远程数据库 ''' return self.ModifyBaseCloudServer(args) def AddDatabase(self, args): """ @添加数据库 """ try: args.validate([ Param('name').Require().String(), Param('sid').Require().Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) db_name = args.name sid = int(args.sid) if re.search(r"\W", db_name): return public.fail_v2(public.lang("The database name cannot contain special characters, please reset it.")) dtype = "pgsql" res = self.add_base_database(args, dtype) if not res['status']: return public.fail_v2(res.get("msg")) data_name = res['data_name'] username = res['username'] password = res['data_pwd'] listen_ip = args.get("listen_ip", "127.0.0.1/32") # 添加对 listen_ip 的格式检查 if not re.match(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d{1,2}\b", listen_ip): return public.fail_v2(public.lang("The ip format is incorrect!")) pgsql_obj = self.get_sql_obj_by_sid(sid) status, err_msg = pgsql_obj.connect() if status is False: return public.fail_v2("Failed to connect to the database!") result = pgsql_obj.execute("""CREATE DATABASE "{}";""".format(data_name)) isError = self.IsSqlError(result) if isError is not None: return isError if str(result).find("permission denied to create database") != -1: return public.fail_v2("Create database permission denied!") # 添加用户 self.__CreateUsers(sid, data_name, username, password, '127.0.0.1') if not hasattr(args, 'ps'): args['ps'] = public.getMsg('INPUT_PS') addTime = time.strftime('%Y-%m-%d %X', time.localtime()) pid = 0 if hasattr(args, 'pid'): pid = args.pid if hasattr(args, 'contact'): site = public.M('sites').where("id=?", (args.contact,)).field('id,name').find() if site: pid = int(args.contact) args['ps'] = site['name'] db_type = 0 if sid: db_type = 2 public.set_module_logs('pgsql', 'AddDatabase', 1) # 添加入SQLITE public.M('databases').add('pid,sid,db_type,name,username,password,accept,ps,addtime,type', ( pid, sid, db_type, data_name, username, password, '127.0.0.1', args['ps'], addTime, dtype)) public.WriteLog("TYPE_DATABASE", 'DATABASE_ADD_SUCCESS', (data_name,)) # 添加访问权限 config_file_path = self.get_data_directory(args)['data'] + "/pg_hba.conf" public.WriteFile( config_file_path.strip(), "\nhost {} {} {} md5".format(data_name, username, listen_ip), mode='a' ) public.ExecShell("/etc/init.d/pgsql reload") return public.success_v2(public.lang('ADD_SUCCESS')) def get_data_directory(self, args): # 获取储存路径 if os.path.isfile("/www/server/pgsql/data_directory"): data_directory = public.ReadFile("/www/server/pgsql/data_directory", mode='r') else: data_directory = "/www/server/pgsql/data" # 返回数据到前端 return {'data': data_directory.strip(), "status": True} def modify_pgsql_listen_ip(self, args): # 修改pgsql用户访问权限 try: args.validate([ Param('username').Require().String(), Param('listen_ip').Require().String(), Param('data_name').Require().String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) try: if 'p' not in args: args.p = 1 if 'rows' not in args: args.rows = 12 if 'callback' not in args: args.callback = '' args.p = int(args.p) args.rows = int(args.rows) username = args.username listen_ip = args.listen_ip database = args.data_name if not re.match(r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}/\d+", listen_ip.strip()): return public.fail_v2(public.lang("you entered is not legal, the modification failed")) pgsql_conf_path = '/www/server/pgsql/data/postgresql.conf' pgsql_conf = public.ReadFile(pgsql_conf_path) pgsql_conf = re.sub(r"(#?\s*listen_addresses\s*=\s*)'.*'", r"listen_addresses = '*'", pgsql_conf) public.WriteFile(pgsql_conf_path, pgsql_conf) config_file_path = os.path.join(self.get_data_directory(args)['data'], "pg_hba.conf").strip() con_str = public.ReadFile(config_file_path).strip() + '\n' head_index = '# PostgreSQL Client Authentication Configuration File' tmp_1 = re.search(head_index + r"(\n|.)+" + head_index, con_str) if tmp_1: con_str = con_str.replace(tmp_1.group(), head_index) tmp = re.search(r"(host\s+" + database + r"\s+" + username + r"\s+[\d./]+\s+\w+)", con_str) host_str = "host {} {} {} md5\n".format(database, username, listen_ip) if tmp: sub_str = tmp.group() con_str = con_str.replace(sub_str, host_str).strip() else: con_str = con_str.strip() con_str += "\n" + host_str public.WriteFile(config_file_path, con_str.replace('\n\nhost', '\nhost').strip() + '\n') public.ExecShell("/etc/init.d/pgsql reload") return public.success_v2(public.lang("Modify pgsql user access rights successfully")) except Exception as e: return public.fail_v2(str(e)) def DeleteDatabase(self, get): """ @删除数据库 """ try: get.validate([ Param('id').Require().Integer(), Param('name').Require().String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) id = get['id'] find = public.M('databases').where("id=? AND LOWER(type)=LOWER('PgSql')", (id,)).field( 'id,pid,name,username,password,accept,ps,addtime,db_type,conn_config,sid,type' ).find() if not find: return public.return_message(-1, 0, public.lang("The specified database does not exist.")) name = get['name'] username = find['username'] pgsql_obj = self.get_sql_obj_by_sid(find['sid']) status, err_msg = pgsql_obj.connect() if status is False: return public.fail_v2("Failed to connect to the database!") # 查询当前数据是否被连接 data_list = pgsql_obj.query( """SELECT * FROM pg_stat_activity WHERE datname = '{}';""".format(name) ) if data_list: return public.fail_v2(public.lang("Deletion failed!The current database is being connected!")) resp = pgsql_obj.execute("""DROP DATABASE "{}";""".format(name)) if resp is not None: return public.fail_v2(public.lang("Failed to delete the database!")) pgsql_obj.execute("""DROP USER "{}";""".format(username)) # 删除SQLITE public.M('databases').where("id=? AND LOWER(type)=LOWER('PgSql')", (id,)).delete() public.WriteLog("TYPE_DATABASE", 'DATABASE_DEL_SUCCESS', (name,)) return public.return_message(0, 0, public.lang("Delete successfully!")) def ToBackup(self, args): """ @备份数据库 id 数据库id """ try: args.validate([ Param('id').Require().Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) if not os.path.exists(self._PGDUMP_BIN): return public.fail_v2(public.lang( 'Lack of backup tools, please go through the software store pgsql manager first!' )) db_id = args.id table_list = getattr(args, "table_list", []) storage_type = getattr(args, "storage_type", "db") # 默认备份db if storage_type not in ["db", "table"]: return public.fail_v2("Parameter error! storage_type") db_find = public.M('databases').where("id=? AND LOWER(type)=LOWER('pgsql')", (db_id,)).find() if not db_find: return public.fail_v2(public.lang('The database does not exist!')) if not db_find["password"].strip(): return public.fail_v2(public.lang('The database password is empty, please set it first.')) db_name = db_find["name"] db_user = "postgres" db_host = "127.0.0.1" if db_find["db_type"] == 0: db_port = panelPgsql.get_config_options("port", int, 5432) t_path = os.path.join(public.get_panel_path(), "data/postgresAS.json") if not os.path.isfile(t_path): return public.fail_v2(public.lang("Please set the administrator password first!")) db_password = json.loads(public.readFile(t_path)).get("password", "") if not db_password: return public.fail_v2( public.lang("The database password is empty!Please set the database password first!")) elif db_find["db_type"] == 1: # 远程数据库 conn_config = json.loads(db_find["conn_config"]) db_host = conn_config["db_host"] db_port = conn_config["db_port"] db_user = conn_config["db_user"] db_password = conn_config["db_password"] elif db_find["db_type"] == 2: conn_config = public.M("database_servers").where( "id=? AND LOWER(db_type)=LOWER('pgsql')", db_find["sid"] ).find() db_host = conn_config["db_host"] db_port = conn_config["db_port"] db_user = conn_config["db_user"] db_password = conn_config["db_password"] else: return public.fail_v2(public.lang("Unknown database type")) pgsql_obj = panelPgsql().set_host(host=db_host, port=db_port, database=None, user=db_user, password=db_password) status, err_msg = pgsql_obj.connect() if status is False: return public.fail_v2("Failed to connect to database [{}:{}].".format(db_host, int(db_port))) db_backup_dir = os.path.join(self._PGSQL_BACKUP_DIR, db_name) if not os.path.exists(db_backup_dir): os.makedirs(db_backup_dir) file_name = "{db_name}_{backup_time}_pgsql_data".format(db_name=db_name, backup_time=time.strftime( "%Y-%m-%d_%H-%M-%S", time.localtime() )) shell = "'{pgdump_bin}' --host='{db_host}' --port={db_port} --username='{db_user}' --dbname='{db_name}' --clean".format( pgdump_bin=self._PGDUMP_BIN, db_host=db_host, db_port=int(db_port), db_user=db_user, db_name=db_name, ) backup_ps = "Manual Backup" if storage_type == "db": # 导出单个文件 file_name = file_name + ".sql.gz" backup_path = os.path.join(db_backup_dir, file_name) table_shell = "" if len(table_list) != 0: backup_ps += "-Merge Export" table_shell = "--table='" + "' --table='".join(table_list) + "'" shell += " {table_shell} | gzip > '{backup_path}'".format(table_shell=table_shell, backup_path=backup_path) public.ExecShell(shell, env={"PGPASSWORD": db_password}) else: # 按表导出 backup_ps += "-Export by Table" export_dir = os.path.join(db_backup_dir, file_name) if not os.path.isdir(export_dir): os.makedirs(export_dir) for table_name in table_list: tb_backup_path = os.path.join(export_dir, "{table_name}.sql".format(table_name=table_name)) tb_shell = shell + " --table='{table_name}' > '{tb_backup_path}'".format(table_name=table_name, tb_backup_path=tb_backup_path) public.ExecShell(tb_shell, env={"PGPASSWORD": db_password}) backup_path = "{export_dir}.zip".format(export_dir=export_dir) public.ExecShell( "cd '{backup_dir}' && zip -m '{backup_path}' -r '{file_name}'".format(backup_dir=db_backup_dir, backup_path=backup_path, file_name=file_name)) if not os.path.exists(backup_path): public.ExecShell("rm -rf {}".format(export_dir)) if not os.path.exists(backup_path): return public.fail_v2(public.lang('Database backup failed, export file does not exist!')) addTime = time.strftime('%Y-%m-%d %X', time.localtime()) backup_size = os.path.getsize(backup_path) public.M('backup').add( 'type,name,pid,filename,size,addtime,ps', (1, os.path.basename(backup_path), db_id, backup_path, backup_size, addTime, backup_ps) ) public.WriteLog("TYPE_DATABASE", "DATABASE_BACKUP_SUCCESS", (db_find['name'],)) if backup_size < 2048: return public.success_v2(public.lang('Backup execution was successful, ' 'the backup file is smaller than 2Kb, ' 'please check the backup integrity.')) return public.return_message(0, 0, public.lang("Backup Succeeded!")) # 导入数据库备份 def InputSql(self, get): """ 导入数据库备份 """ try: get.validate([ Param('name').Require().String(), Param('file').Require().String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) if not os.path.exists(self._PSQL_BIN): return public.fail_v2(public.lang("Lack of recovery tools, " "please install pgsql first via Software Manager!")) db_name = get.name file = get.file if not os.path.exists(file): return public.fail_v2(public.lang("Import path does not exist!")) if not os.path.isfile(file): return public.fail_v2(public.lang("Importing zip files is only supported!")) file_name = os.path.basename(file) ext_list = ["sql", "tar.gz", "gz", "zip"] ext_tmp = file_name.split(".") file_ext = ".".join(ext_tmp[1:]) ext_temp = [ ext.lower() for ext in ext_list if ext.lower() in file_ext ] if len(ext_temp) == 0: return public.fail_v2(public.lang("Please choose sql, tar.gz, gz, zip file format!")) db_find = public.M('databases').where("name=? AND LOWER(type)=LOWER('PgSql')", (db_name,)).find() if not db_find: return public.fail_v2(public.lang('The database does not exist!')) if not db_find["password"].strip(): return public.fail_v2(public.lang('The database password is empty, please set it first.')) # 解压 input_dir = os.path.join( self._PGSQL_BACKUP_DIR, db_name, "input_tmp_{}".format(int(time.time() * 1000_000)) ) is_zip = False if "zip" in file_ext: if not os.path.isdir(input_dir): os.makedirs(input_dir) public.ExecShell("unzip -o '{file}' -d '{input_dir}'".format(file=file, input_dir=input_dir)) is_zip = True elif "tar.gz" in file_ext: if not os.path.isdir(input_dir): os.makedirs(input_dir) public.ExecShell("tar zxf '{file}' -C '{input_dir}'".format(file=file, input_dir=input_dir)) is_zip = True elif "gz" in file_ext: if not os.path.isdir(input_dir): os.makedirs(input_dir) temp_file = os.path.join(input_dir, file_name) public.ExecShell( "cp '{file}' '{temp_file}' && gunzip -q '{temp_file}'".format(file=file, temp_file=temp_file)) is_zip = True input_path_list = [] if is_zip is True: # 遍历临时目录 for name in os.listdir(input_dir): path = os.path.join(input_dir, name) if os.path.isfile(path) and str(path).endswith(".sql"): input_path_list.append(path) elif os.path.isdir(path): for t_name in os.listdir(path): t_path = os.path.join(path, t_name) if os.path.isfile(t_path) and str(t_path).endswith(".sql"): input_path_list.append(t_path) else: input_path_list.append(file) db_name = db_find["name"] db_user = "postgres" db_host = "127.0.0.1" if db_find["db_type"] == 0: args_one = public.dict_obj() db_port = self.get_port(args_one)["data"] t_path = os.path.join(public.get_panel_path(), "data/postgresAS.json") if not os.path.isfile(t_path): return public.fail_v2(public.lang("Please set the administrator password first!")) db_password = json.loads(public.readFile(t_path)).get("password", "") elif db_find["db_type"] == 1: # 远程数据库 conn_config = json.loads(db_find["conn_config"]) db_host = conn_config["db_host"] db_port = conn_config["db_port"] db_user = conn_config["db_user"] db_password = conn_config["db_password"] elif db_find["db_type"] == 2: conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('pgsql')", db_find["sid"]).find() db_host = conn_config["db_host"] db_port = conn_config["db_port"] db_user = conn_config["db_user"] db_password = conn_config["db_password"] else: return public.fail_v2(public.lang("Unknown database type")) shell = "'{psql_bin}' --host='{db_host}' --port={db_port} --username='{db_user}' --dbname='{db_name}'".format( psql_bin=self._PSQL_BIN, db_host=db_host, db_port=int(db_port), db_user=db_user, db_name=db_name, ) for path in input_path_list: public.ExecShell("{shell} < '{path}'".format(shell=shell, path=path), env={"PGPASSWORD": db_password}) # 清理导入临时目录 if is_zip is True: public.ExecShell("rm -rf '{input_dir}'".format(input_dir=input_dir)) public.WriteLog("TYPE_DATABASE", '导入数据库[{}]成功'.format(db_name)) return public.success_v2(public.lang('DATABASE_INPUT_SUCCESS')) # 获取备份文件 def GetBackup(self, get): try: get.validate([ Param('p').Integer(), Param('limit').Integer(), Param('search').String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) p = getattr(get, "p", 1) limit = getattr(get, "limit", 10) return_js = getattr(get, "return_js", "") search = getattr(get, "search", None) p = int(p) limit = int(limit) ext_list = ["sql", "tar.gz", "gz", "zip"] backup_list = [] # 递归获取备份文件 def get_dir_backup(backup_dir: str, backup_list: list, is_recursion: bool): for name in os.listdir(backup_dir): path = os.path.join(backup_dir, name) if os.path.isfile(path): ext = name.split(".")[-1] if ext.lower() not in ext_list: continue if search is not None and search not in name: continue stat_file = os.stat(path) path_data = { "name": name, "path": path, "size": stat_file.st_size, "mtime": int(stat_file.st_mtime), "ctime": int(stat_file.st_ctime), } backup_list.append(path_data) elif os.path.isdir(path) and is_recursion is True: get_dir_backup(path, backup_list, is_recursion) get_dir_backup(self._PGSQL_BACKUP_DIR, backup_list, True) get_dir_backup(self._DB_BACKUP_DIR, backup_list, False) try: from flask import request uri = public.url_encode(request.full_path) except: uri = '' # 包含分页类 import page # 实例化分页类 page = page.Page() info = { "p": p, "count": len(backup_list), "row": limit, "return_js": return_js, "uri": uri, } page_info = page.GetPage(info) start_idx = (int(p) - 1) * limit end_idx = p * limit if p * limit < len(backup_list) else len(backup_list) backup_list.sort(key=lambda data: data["mtime"], reverse=True) backup_list = backup_list[start_idx:end_idx] return public.success_v2({"data": backup_list, "page": page_info}) def DelBackup(self, args): """ @删除备份文件 """ return self.delete_base_backup(args) def get_port(self, args): # 获取端口号 str_shell = '''netstat -luntp|grep postgres|head -1|awk '{print $4}'|awk -F: '{print $NF}' ''' try: port = public.ExecShell(str_shell)[0] if port.strip(): return {'data': port.strip(), "status": True} else: return {'data': 5432, "status": False} except: return {'data': 5432, "status": False} def SyncToDatabases(self, get): """ @name同步数据库到服务器 """ tmp_type = int(get['type']) n = 0 sql = public.M('databases') if tmp_type == 0: data = sql.field( 'id,name,username,password,accept,type,sid,db_type' ).where("LOWER(type)=LOWER('PgSql')", ()).select() for value in data: if value['db_type'] in ['1', 1]: continue # 跳过远程数据库 result = self.ToDataBase(value) if result == 1: n += 1 else: import json data = json.loads(get.ids) for value in data: find = sql.where("id=?", (value,)).field('id,name,username,password,sid,db_type,accept,type').find() result = self.ToDataBase(find) if result == 1: n += 1 if n == 1: return public.return_message(0, 0, public.lang("Synchronization succeeded")) elif n == 0: return public.return_message(-1, 0, public.lang("Sync failed")) return public.return_message(0, 0, public.lang("Database sync success {}", n)) def ToDataBase(self, find): """ @name 添加到服务器 """ if find['username'] == 'bt_default': return 0 if len(find['password']) < 3: find['username'] = find['name'] find['password'] = public.md5(str(time.time()) + find['name'])[0:10] public.M('databases').where( "id=? AND LOWER(type)=LOWER('PgSql')", (find['id'],) ).save('password,username', (find['password'], find['username'])) sid = find['sid'] pgsql_obj = self.get_sql_obj_by_sid(sid) status, err_msg = pgsql_obj.connect() if status is False: return public.fail_v2(public.lang("Failed to connect to the database!")) result = pgsql_obj.execute("""CREATE DATABASE "{}";""".format(find['name'])) isError = self.IsSqlError(result) if (isError is not None and isError['status'] == False and isError['msg'] == 'The specified database already exists,' ' please do not add it repeatedly.'): return 1 self.__CreateUsers(sid, find['name'], find['username'], find['password'], '127.0.0.1') return 1 def SyncGetDatabases(self, get): """ @name 从服务器获取数据库 @param sid 0为本地数据库 1为远程数据库 """ n = 0 s = 0 db_type = 0 sid = get.get('sid/d', 0) if sid: db_type = 2 pgsql_obj = self.get_sql_obj_by_sid(sid) status, err_msg = pgsql_obj.connect() if status is False: return public.returnMsg(False, "连接数据库失败!") data = pgsql_obj.query('SELECT datname FROM pg_database;') # select * from pg_database order by datname isError = self.IsSqlError(data) if isError is not None: return public.fail_v2(isError) if type(data) == str: return public.fail_v2(data) sql = public.M('databases') nameArr = ['information_schema', 'postgres', 'template1', 'template0', 'performance_schema', 'mysql', 'sys', 'master', 'model', 'msdb', 'tempdb', 'ReportServerTempDB', 'YueMiao', 'ReportServer'] for item in data: dbname = item[0] if dbname in nameArr: continue if sql.where("name=? AND LOWER(type)=LOWER('PgSql')", (dbname,)).count(): continue if public.M('databases').add( 'name,username,password,accept,ps,addtime,type,sid,db_type', (dbname, dbname, "", "", public.getMsg('INPUT_PS'), time.strftime('%Y-%m-%d %X', time.localtime()), 'PgSql', sid, db_type) ): n += 1 return public.return_message(0, 0, public.lang("Database success {}", n)) def ResDatabasePassword(self, args): """ @修改用户密码 """ id = args['id'] username = args['name'].strip() newpassword = public.trim(args['password']) if not newpassword: return public.return_message(-1, 0, public.lang("The database password cannot be empty")) find = public.M('databases').where( "id=? AND LOWER(type)=LOWER('PgSql')", (id,) ).field('id,pid,name,username,password,type,accept,ps,addtime,sid').find() if not find: return public.return_message(-1, 0, public.lang("Modify the failure,The specified database does not exist")) pgsql_obj = self.get_sql_obj_by_sid(find['sid']) status, err_msg = pgsql_obj.connect() if status is False: return public.fail_v2(public.lang("Failed to connect to the database!")) data = pgsql_obj.query('SELECT rolname FROM pg_roles;') if username not in data: # 添加用户 result = self.__CreateUsers(find['sid'], username, username, newpassword, "127.0.0.0.1") else: result = pgsql_obj.execute("""ALTER USER "{}" with password '{}';""".format(username, newpassword)) isError = self.IsSqlError(result) if isError is not None: return isError # 修改SQLITE public.M('databases').where("id=? AND LOWER(type)=LOWER('PgSql')", (id,)).setField('password', newpassword) public.WriteLog("TYPE_DATABASE", 'DATABASE_PASS_SUCCESS', (find['name'],)) return public.return_message(0, 0, public.lang("Database password success {}", find['name'], )) def get_root_pwd(self, args): """ @获取sa密码 """ # check_result = os.system('/www/server/pgsql/bin/psql --version') if not os.path.exists("/www/server/pgsql/bin/psql"): return public.fail_v2(public.lang("If PgSQL is not installed or started, install or start it first")) password = '' path = '{}/data/postgresAS.json'.format(public.get_panel_path()) if os.path.isfile(path): try: password = json.loads(public.readFile(path))['password'] except: pass if 'is_True' in args and args.is_True: return public.return_message(-1, 0, password) return public.return_message(0, 0, password) def set_root_pwd(self, args): """ @设置sa密码 """ password = public.trim(args['password']) if len(password) < 8: return public.fail_v2(public.lang("The password must not be less than 8 digits.")) check_result = os.system('/www/server/pgsql/bin/psql --version') if check_result != 0: return public.fail_v2(public.lang("If PgSQL is not installed or started, install or start it first")) pgsql_obj = self.get_sql_obj_by_sid('0') status, err_msg = pgsql_obj.connect() if status is False: return public.fail_v2(public.lang("connect fail")) data = pgsql_obj.query('SELECT datname FROM pg_database;') isError = self.IsSqlError(data) if isError is not None: return isError path = '{}/data/pg_hba.conf'.format(self.__soft_path) p_path = '{}/data/postgresAS.json'.format(public.get_panel_path()) if not os.path.isfile(path): return public.return_message(-1, 0, public.lang( "{}File does not exist, please check the installation is complete!", path)) src_conf = public.readFile(path) add_conf = src_conf.replace('md5', 'trust') public.writeFile(path, add_conf) pg_obj = panelPgsql() pg_obj.execute("""ALTER USER "postgres" WITH PASSWORD '{}';""".format(password)) data = {"username": "postgres", "password": ""} try: data = json.loads(public.readFile(p_path)) except: pass data['password'] = password public.writeFile(p_path, json.dumps(data)) public.writeFile(path, src_conf) return public.return_message(0, 0, public.lang( "The administrator password is successfully changed. Procedure.") ) def get_info_by_db_id(self, db_id): """ @获取数据库连接详情 @db_id 数据库id """ find = public.M('databases').where("id=? AND LOWER(type)=LOWER('pgsql')", db_id).find() if not find: return False if find["db_type"] == 1: # 远程数据库 conn_config = json.loads(find["conn_config"]) db_host = conn_config["db_host"] db_port = conn_config["db_port"] db_user = conn_config["db_user"] db_password = conn_config["db_password"] elif find["db_type"] == 2: conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('pgsql')", find["sid"]).find() db_host = conn_config["db_host"] db_port = conn_config["db_port"] db_user = conn_config["db_user"] db_password = conn_config["db_password"] else: # 本地数据库 db_host = '127.0.0.1' args_one = public.dict_obj() db_port = self.get_port(args_one) db_user = "postgres" t_path = os.path.join(public.get_panel_path(), "data/postgresAS.json") db_password = json.loads(public.readFile(t_path)).get("password", "") data = { 'db_name': find["name"], 'db_host': db_host, 'db_port': int(db_port), 'db_user': db_user, 'db_password': db_password, } return data def get_database_size_by_id(self, args): """ @获取数据库尺寸(批量删除验证) @args json/int 数据库id """ total = 0 db_id = args if not isinstance(args, int): db_id = args['db_id'] try: name = public.M('databases').where("id=? AND LOWER(type)=LOWER('PgSql')", db_id).getField('name') sql_obj = self.get_sql_obj(name) tables = sql_obj.query( "select name,size,type from sys.master_files where type=0 and name = '{}'".format(name) ) total = tables[0][1] if not total: total = 0 except: pass return total # todo 前端未使用新接口 def check_del_data(self, args): """ @删除数据库前置检测 """ return public.return_message(0, 0, self.check_base_del_data(args)) # 本地创建数据库 def __CreateUsers(self, sid, data_name, username, password, address): """ @创建数据库用户 """ sql_obj = self.get_sql_obj_by_sid(sid) sql_obj.execute("""CREATE USER "{}" WITH PASSWORD '{}';""".format(username, password)) sql_obj.execute("""GRANT ALL PRIVILEGES ON DATABASE "{}" TO "{}";""".format(data_name, username)) return True def __get_db_list(self, sql_obj): """ 获取pgsql数据库列表 """ data = [] ret = sql_obj.query('SELECT datname FROM pg_database;') if type(ret) == list: for x in ret: data.append(x[0]) return data def __new_password(self): """ 生成随机密码 """ import random import string # 生成随机密码 password = "".join(random.sample(string.ascii_letters + string.digits, 16)) return password def CheckDatabaseStatus(self, get): """ 数据库状态检测 """ try: get.validate([ Param('sid').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) sid = int(get.sid) pgsql_obj = panelPgsql() if sid == 0: db_status, err_msg = pgsql_obj.connect() else: conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('pgsql')", (sid,)).find() if not conn_config: db_status = False err_msg = public.lang("Remote database information does not exist!") else: pgsql_obj.set_host(host=conn_config["db_host"], port=conn_config["db_port"], database=None, user=conn_config["db_user"], password=conn_config["db_password"]) db_status, err_msg = pgsql_obj.connect() return {"status": True if db_status is True else False, "msg": "normalcy" if db_status is True else "exceptions", "db_status": db_status, "err_msg": err_msg} def check_cloud_database_status(self, conn_config): """ @检测远程数据库是否连接 @conn_config 远程数据库配置,包含host port pwd等信息 旧方法,添加数据库时调用 """ try: if conn_config.get("db_name"): conn_config["db_name"] = None pgsql_obj = panelPgsql().set_host(host=conn_config['db_host'], port=conn_config['db_port'], database=None, user=conn_config['db_user'], password=conn_config['db_password']) status, err_msg = pgsql_obj.connect() if status is False: return public.fail_v2(public.lang("Remote database connection failed!")) return public.success_v2(status) except: return public.fail_v2(public.lang("Remote database connection failed!")) # 获取当前数据库信息 def GetInfo(self, get): db_name = get.db_name pgsql_obj = panelPgsql() db_find = public.M('databases').where("name=? AND LOWER(type)=LOWER('PgSql')", db_name).find() if not db_find: return public.fail_v2(public.lang('The database does not exist!')) if db_find['sid']: db_find = public.M('database_servers').where( "id=? AND LOWER(db_type)=LOWER('pgsql')", db_find['sid'] ).find() else: path = os.path.join(public.get_panel_path(), "data/postgresAS.json") if not os.path.isfile(path): return public.fail_v2(public.lang("Please set the administrator password first!")) db_find = { "db_host": "127.0.0.1", "db_port": panelPgsql.get_config_options("port", int, 5432), "db_user": "postgres", "db_password": json.loads(public.readFile(path))['password'], } pgsql_obj.set_host( host=db_find['db_host'], port=db_find['db_port'], database=db_name, user=db_find['db_user'], password=db_find['db_password'] ) status, err_msg = pgsql_obj.connect() if status is False: return public.fail_v2(public.lang("Failed to connect to the database!")) database_list = pgsql_obj.query(f"select datname from pg_database where datname='{db_name}';") database_list = [database[0] for database in database_list] if not database_list: return public.fail_v2(public.lang('The database does not exist!')) collation = pgsql_obj.query( f"select pg_encoding_to_char(encoding) from pg_database where datname='{db_name}';" )[0][0] result = { "database": db_name, "total_size": 0, "table_list": [], } temp_list = pgsql_obj.query(""" select schemaname, tablename from pg_tables where schemaname NOT IN ('pg_catalog', 'information_schema'); """) for table in temp_list: schema_name = table[0] table_name = table[1] full_table_name = f'"{schema_name}"."{table_name}"' # 注意加引号防止大小写或特殊字符问题 temp_data = pgsql_obj.query(f"select count(*) from {full_table_name};") if not isinstance(temp_data, list): continue if len(temp_data) == 0: continue rows_count = temp_data[0][0] data_size = pgsql_obj.query(f""" select pg_size_pretty(pg_table_size('{full_table_name}')) AS table_size, pg_size_pretty(pg_indexes_size('{full_table_name}')) AS indexes_size, pg_size_pretty(pg_total_relation_size('{full_table_name}')) AS total_size, pg_total_relation_size('{full_table_name}') as total_size_2; """) if not isinstance(data_size, list): continue if len(data_size) == 0: continue table_size = data_size[0][0] indexes_size = data_size[0][1] total_size = data_size[0][2] total_size_2 = data_size[0][3] table_temp = { "schema": schema_name, "table_name": table_name, "rows_count": rows_count, "table_size": table_size, "indexes_size": indexes_size, "total_size": total_size, } result["total_size"] += total_size_2 result["table_list"].append(table_temp) result["total_size"] = public.to_size(result["total_size"]) return public.success_v2(result)