Server IP : 104.21.38.3 / Your IP : 172.69.176.32 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/databaseModelV2/ |
Upload File : |
# coding: utf-8 # ------------------------------------------------------------------- # aaPanel # ------------------------------------------------------------------- # Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved. # ------------------------------------------------------------------- # Author: hwliang <[email protected]> # ------------------------------------------------------------------- # 角色说明: # read:允许用户读取指定数据库 # readWrite:允许用户读写指定数据库 # dbAdmin:允许用户在指定数据库中执行管理函数,如索引创建、删除,查看统计或访问system.profile # userAdmin:允许用户向system.users集合写入,可以找指定数据库里创建、删除和管理用户 # clusterAdmin:只在admin数据库中可用,赋予用户所有分片和复制集相关函数的管理权限。 # readAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的读权限 # readWriteAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的读写权限 # userAdminAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的userAdmin权限 # dbAdminAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的dbAdmin权限。 # root:只在admin数据库中可用。超级账号,超级权限 # mg模型 import json import os import re import time import warnings from typing import Tuple, Any, Union import yaml import public from databaseModelV2.base import databaseBase from public.validate import Param try: import pymongo except: public.ExecShell("btpip install pymongo") import pymongo try: from BTPanel import session except: pass warnings.filterwarnings("ignore", category=SyntaxWarning) """ 2025/6/6 同步国内 """ class panelMongoDB: DEFUALT_DB = ["admin", "config", "local"] CONFIG_PATH = os.path.join(public.get_setup_path(), "mongodb/config.conf") def __init__(self): self.check_package() self.__CONN_KWARGS = { "host": "localhost", "port": 27017, "username": None, "password": None, "socketTimeoutMS": 3000, # 套接字超时时间 "connectTimeoutMS": 3000, # 连接超时时间 "serverSelectionTimeoutMS": 3000, # 服务器选择超时时间 } self.__DB_CONN = None # 检查python包是否存在 @classmethod def check_package(cls): """ @name检测依赖是否正常 """ try: import pymongo except: public.ExecShell("btpip install pymongo") try: import pymongo except: return False return True # 连接MongoDB数据库 def connect(self) -> Tuple[bool, str]: auth = self.get_config_options("security", "authorization", "disabled") == "enabled" is_localhost = self.__CONN_KWARGS["host"] in ["localhost", "127.0.0.1"] # 本地连接自动补充 port username password if is_localhost: self.__CONN_KWARGS["port"] = self.get_config_options("net", "port", 27017) if auth: if self.__CONN_KWARGS.get("username") is None and auth: # 自动补充 username # noinspection PyTypedDict self.__CONN_KWARGS["username"] = "root" if self.__CONN_KWARGS.get("password") is None: # 自动补充 password mongodb_root_path = os.path.join(public.get_panel_path(), "data/mongo.root") if not os.path.exists(mongodb_root_path): return False, public.lang("Local login password is empty") self.__CONN_KWARGS["password"] = public.readFile(mongodb_root_path) if not isinstance(self.__CONN_KWARGS["port"], int): self.__CONN_KWARGS["port"] = int(self.__CONN_KWARGS["port"]) try: self.__DB_CONN = pymongo.MongoClient(**self.__CONN_KWARGS) self.__DB_CONN.admin.command({"listDatabases": 1}) return True, public.lang("normal") except Exception as err: err_msg = str(err) return False, public.lang(err_msg) # 设置连接参数 def set_host(self, *args, **kwargs): """ 设置连接参数 """ # args 兼容老版本,后续新增禁止使用 args if len(args) >= 5: kwargs["host"] = args[0] kwargs["port"] = args[1] kwargs["username"] = args[2] kwargs["password"] = args[3] if kwargs.get("db_host") is not None: kwargs["host"] = kwargs.get("db_host") if kwargs.get("db_port") is not None: kwargs["port"] = kwargs.get("db_port") if kwargs.get("db_user") is not None: kwargs["username"] = kwargs.get("db_user") if kwargs.get("db_password") is not None: kwargs["password"] = kwargs.get("db_password") self.__CONN_KWARGS.update(kwargs) if not isinstance(self.__CONN_KWARGS["port"], int): self.__CONN_KWARGS["port"] = int(self.__CONN_KWARGS["port"]) return self # 已弃用 def get_db_obj(self, db_name="admin"): if self.__DB_CONN is None: status, err_msg = self.connect() if status is False: return err_msg return self.__DB_CONN[db_name] # 新方法 def get_db_obj_new(self, db_name="admin") -> Tuple[bool, Any]: if self.__DB_CONN is None: status, err_msg = self.connect() if status is False: return status, "Failed to connect to database [{}:{}]! {}".format(self.__CONN_KWARGS["db_host"], self.__CONN_KWARGS["db_port"], err_msg) return True, self.__DB_CONN[db_name] # 获取配置文件 @classmethod def get_config(cls, name: str = None, default=None) -> dict: config_data = public.readFile(cls.CONFIG_PATH) try: config = yaml.safe_load(config_data) except: config = { "systemLog": { "destination": "file", "logAppend": True, "path": "/www/server/mongodb/log/config.log" }, "storage": { "dbPath": "/www/server/mongodb/data", "directoryPerDB": True, "journal": { "enabled": True } }, "processManagement": { "fork": True, "pidFilePath": "/www/server/mongodb/log/configsvr.pid" }, "net": { "port": 27017, "bindIp": "0.0.0.0" }, "security": { "authorization": "enabled", "javascriptEnabled": False } } if name is not None: config.get(name, default) return config # 获取未注释的配置文件参数 @classmethod def get_config_options(cls, key: str, name: str, default=None): config = cls.get_config() config_info = config.get(key) if config_info is None: return default return config_info.get(name, default) # 获取配置项 @classmethod def get_options(cls, *args, **kwargs): config_info = { "port": 27017, "bind_ip": "127.0.0.1", "logpath": "", "dbpath": "", "authorization": "disabled" } if not os.path.exists(cls.CONFIG_PATH): return config_info conf = public.readFile(cls.CONFIG_PATH) for opt in config_info.keys(): tmp = re.findall(opt + ":\s+(.+)", conf) if not tmp: continue config_info[opt] = tmp[0] # public.writeFile("/www/server/1.txt",json.dumps(data)) return config_info # 重启 mongodb 服务 @classmethod def restart_localhost_services(cls): """ @重启服务 """ public.ExecShell('/etc/init.d/mongodb restart') @classmethod def set_auth_open(cls, status): """ @设置数据库密码访问开关 @状态 status:1 开启,2:关闭 """ conf = public.readFile(cls.CONFIG_PATH) if conf: if status: conf = re.sub('authorization\s*:\s*disabled', 'authorization: enabled', conf) else: conf = re.sub('authorization\s*:\s*enabled', 'authorization: disabled', conf) public.writeFile(cls.CONFIG_PATH, conf) cls.restart_localhost_services() return True class main(databaseBase): _DB_BACKUP_DIR = os.path.join(public.M("config").where("id=?", (1,)).getField("backup_path"), "database") _MONGODB_BACKUP_DIR = os.path.join(_DB_BACKUP_DIR, "mongodb") _MONGODBDUMP_BIN = os.path.join(public.get_setup_path(), "mongodb/bin/mongodump") _MONGOEXPORT_BIN = os.path.join(public.get_setup_path(), "mongodb/bin/mongoexport") _MONGORESTORE_BIN = os.path.join(public.get_setup_path(), "mongodb/bin/mongorestore") _MONGOIMPORT_BIN = os.path.join(public.get_setup_path(), "mongodb/bin/mongoimport") _MONGO_ROLE_DICT = { # 数据库用户角色 "read": "read", "readWrite": "readWrite", # 数据库管理角色 # "dbAdmin": "数据库管理员", "dbOwner": "dbOwner", "userAdmin": "userAdmin", # 集群管理角色 # "clusterAdmin": "集群管理员", # "clusterManager": "集群管理器", # "clusterMonitor": "集群监视器", # "hostManager": "主机管理员", # 备份和恢复角色 # "backup": "备份数据", # "restore": "还原数据", # 所有数据库角色 # "readAnyDatabase": "任意数据库读取", # "readWriteAnyDatabase": "任意数据库读取和写入", # "userAdminAnyDatabase": "任意数据库用户管理员", # "dbAdminAnyDatabase": "任意数据库管理员", # 超级用户角色 # "root": "超级管理员", # 内部角色 # "__queryableBackup": "可查询备份", # "__system": "系统角色", # "enableSharding": "启用分片", } def __init__(self): if not os.path.exists(self._MONGODB_BACKUP_DIR): os.makedirs(self._MONGODB_BACKUP_DIR, exist_ok=True) def get_list(self, get): """ @获取数据库列表 @sql_type = sqlserver """ # 校验参数 try: get.validate([ Param('table').Require().String(), Param('search').String(), Param('order').String(), Param('limit').Integer(), Param('p').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) search = '' if 'search' in get: search = get['search'] conditions = '' if '_' in search: cs = '' for i in search: if i == '_': cs += '/_' else: cs += i search = cs conditions = " escape '/'" SQL = public.M('databases') where = "lower(type) = lower('mongodb')" if search: where += "AND (name like '%{search}%' or ps like '%{search}%'{conditions})".format(search=search, conditions=conditions) if 'db_type' in get: where += " AND db_type='{}'".format(get['db_type']) if 'sid' in get: where += " AND sid='{}'".format(get['sid']) order = "id desc" if hasattr(get, 'order'): order = get.order info = {} rdata = {} info['p'] = 1 info['row'] = 20 result = '1,2,3,4,5,8' info['count'] = SQL.where(where, ()).count() if hasattr(get, 'limit'): info['row'] = int(get.limit) if hasattr(get, 'result'): result = get.result; if hasattr(get, 'p'): info['p'] = int(get['p']) import page # 实例化分页类 page = page.Page() info['uri'] = get info['return_js'] = '' if hasattr(get, 'tojs'): info['return_js'] = get.tojs rdata['where'] = where # 获取分页数据 rdata['page'] = page.GetPage(info, result) # 取出数据 rdata['data'] = SQL.where(where, ()).order(order).field( 'id,sid,pid,name,username,password,accept,ps,addtime,type,db_type,conn_config').limit( str(page.SHIFT) + ',' + str(page.ROW)).select() for sdata in rdata['data']: # 清除不存在的 backup_count = 0 backup_list = public.M('backup').where("pid=? AND type=1", (sdata['id'])).select() for backup in backup_list: if not os.path.exists(backup["filename"]): public.M('backup').where("id=? AND type=1", (backup['id'])).delete() continue backup_count += 1 sdata['backup_count'] = backup_count sdata['conn_config'] = json.loads(sdata['conn_config']) return public.success_v2(rdata) def GetCloudServer(self, get): """ @name 获取远程服务器列表 @author hwliang<2021-01-10> @return list """ where = '1=1' if 'type' in get: where = "db_type = '{}'".format(get['type']) data = public.M('database_servers').where(where, ()).select() if not isinstance(data, list): data = [] if get['type'] == 'mysql': bt_mysql_bin = public.get_mysql_info()['path'] + '/bin/mysql.exe' if os.path.exists(bt_mysql_bin): data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 3306, 'db_user': 'root', 'db_password': '', 'ps': 'local server', 'addtime': 0, 'db_type': 'mysql'}) elif get['type'] == 'sqlserver': pass elif get['type'] == 'mongodb': if os.path.exists('/www/server/mongodb/bin'): data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 27017, 'db_user': 'root', 'db_password': '', 'ps': 'local server', 'addtime': 0, 'db_type': 'mongodb'}) elif get['type'] == 'redis': if os.path.exists('/www/server/redis'): data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 6379, 'db_user': 'root', 'db_password': '', 'ps': 'local server', 'addtime': 0, 'db_type': 'redis'}) elif get['type'] == 'pgsql': if os.path.exists('/www/server/pgsql'): data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 5432, 'db_user': 'postgres', 'db_password': '', 'ps': 'local server', 'addtime': 0, 'db_type': 'pgsql'}) return public.success_v2(data) def AddCloudServer(self, get): """ @name 添加远程服务器 @author hwliang<2021-01-10> @param db_host<string> 服务器地址 @param db_port<port> 数据库端口 @param db_user<string> 用户名 @param db_password<string> 数据库密码 @param db_ps<string> 数据库备注 @param type<string> 数据库类型,mysql/sqlserver/sqlite @return dict """ arrs = ['db_host', 'db_port', 'db_user', 'db_password', 'db_ps', 'type'] if get.type == 'redis': arrs = ['db_host', 'db_port', 'db_password', 'db_ps', 'type'] for key in arrs: if key not in get: return public.fail_v2(public.lang('Parameter passing error, missing parameter {}!'.format(key))) get['db_name'] = None mongodb_obj = panelMongoDB().set_host( host=get.get("db_host"), port=get.get("db_port"), username=get.get("db_user"), password=get.get("db_password") ) status, err_msg = mongodb_obj.connect() if status is False: return public.fail_v2(public.lang("Failed to connect to the database!")) if public.M('database_servers').where('db_host=? AND db_port=?', (get['db_host'], get['db_port'])).count(): return public.fail_v2( public.lang('Specifies that the server already exists: [{}:{}]'.format(get['db_host'], get['db_port']))) get['db_port'] = int(get['db_port']) pdata = { 'db_host': get['db_host'], 'db_port': int(get['db_port']), 'db_user': get['db_user'], 'db_password': get['db_password'], 'db_type': get['type'], 'ps': public.xssencode2(get['db_ps'].strip()), 'addtime': int(time.time()) } result = public.M("database_servers").insert(pdata) if isinstance(result, int): public.WriteLog('Database management', 'Adding a Remote MongoDB Server[{}:{}]'.format(get['db_host'], get['db_port'])) return public.success_v2(public.lang('Add successfully!')) return public.fail_v2(public.lang('Add Failure: {}'.format(result))) def RemoveCloudServer(self, get): """ @删除远程数据库 """ id = int(get.id) if not id: return public.fail_v2(public.lang('Parameter passing error, please try again!')) db_find = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('mongodb')", (id,)).find() if not db_find: return public.fail_v2(public.lang('The specified remote server does not exist!')) public.M('databases').where('sid=?', id).delete() result = public.M('database_servers').where("id=? AND LOWER(db_type)=LOWER('mongodb')", id).delete() if isinstance(result, int): public.WriteLog( 'Database management', 'Removing a Remote MonogoDB Server[{}:{}]'.format(db_find['db_host'], int(db_find['db_port'])) ) return public.success_v2(public.lang('Deleted successfully!')) return public.fail_v2(public.lang('Failed to delete: {}'.format(result))) def ModifyCloudServer(self, get): """ @name 修改远程服务器 @author hwliang<2021-01-10> @param id<int> 远程服务器ID @param db_host<string> 服务器地址 @param db_port<port> 数据库端口 @param db_user<string> 用户名 @param db_password<string> 数据库密码 @param db_ps<string> 数据库备注 @return dict """ arrs = ['db_host', 'db_port', 'db_user', 'db_password', 'db_ps', 'type'] if get.type == 'redis': arrs = ['db_host', 'db_port', 'db_password', 'db_ps', 'type'] for key in arrs: if key not in get: return public.returnMsg(False, 'Parameter passing error, missing parameter{}!'.format(key)) id = int(get.id) get['db_port'] = int(get['db_port']) db_find = public.M('database_servers').where('id=?', (id,)).find() if not db_find: return public.returnMsg(False, 'Specifies that the remote server does not exist!') _modify = False if db_find['db_host'] != get['db_host'] or db_find['db_port'] != get['db_port']: _modify = True if public.M('database_servers').where('db_host=? AND db_port=?', (get['db_host'], get['db_port'])).count(): return public.returnMsg(False, 'Specifies that the server already exists: [{}:{}]'.format(get['db_host'], get['db_port'])) if db_find['db_user'] != get['db_user'] or db_find['db_password'] != get['db_password']: _modify = True _modify = True pdata = { 'db_host': get['db_host'], 'db_port': int(get['db_port']), 'db_user': get['db_user'], 'db_password': get['db_password'], 'db_type': get['type'], 'ps': public.xssencode2(get['db_ps'].strip()) } result = public.M("database_servers").where('id=?', (id,)).update(pdata) if isinstance(result, int): public.WriteLog('Database management', 'Modifying a Remote MySQL Server[{}:{}]'.format(get['db_host'], get['db_port'])) return public.returnMsg(True, 'Modified successfully!') return public.returnMsg(False, 'Modification Failure: {}'.format(result)) def set_auth_status(self, get): """ @设置密码认证状态 @status int 0:关闭,1:开启 """ if not public.process_exists("mongod"): return public.return_message(-1, 0, public.lang("Mongodb service has not been started yet!")) status = int(get.status) path = '{}/data/mongo.root'.format(public.get_panel_path()) if status: if hasattr(get, 'password'): password = get['password'].strip() if not password or not re.search(r"^[\w@.]+$", password): return public.return_message(-1, 0, public.lang( "Database password cannot be empty or have special characters!")) if re.search('[\u4e00-\u9fa5]', password): return public.return_message(-1, 0, public.lang( "Database password cannot be Chinese, please change the name!")) else: password = public.GetRandomString(16) panelMongoDB.set_auth_open(False) status, mongodb_obj = self.get_obj_by_sid(0) if status is False: return public.fail_v2(mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new("admin") if status is False: return public.fail_v2(db_obj) try: db_obj.command("dropUser", "root") except: pass db_obj.command("createUser", "root", pwd=password, roles=[ {'role': 'root', 'db': 'admin'}, {'role': 'clusterAdmin', 'db': 'admin'}, {'role': 'readAnyDatabase', 'db': 'admin'}, {'role': 'readWriteAnyDatabase', 'db': 'admin'}, {'role': 'userAdminAnyDatabase', 'db': 'admin'}, {'role': 'dbAdminAnyDatabase', 'db': 'admin'}, {'role': 'userAdmin', 'db': 'admin'}, {'role': 'dbAdmin', 'db': 'admin'} ]) panelMongoDB.set_auth_open(True) public.writeFile(path, password) else: if os.path.exists(path): os.remove(path) panelMongoDB.set_auth_open(False) return public.return_message(0, 0, public.lang("Setup successfully!")) def get_obj_by_sid(self, sid=0, conn_config=None) -> Tuple[bool, Union[str, panelMongoDB]]: """ @取mssql数据库对像 By sid @sid 数据库分类,0:本地 """ if type(sid) == str: try: sid = int(sid) except: sid = 0 if sid: if not conn_config: conn_config = public.M('database_servers').where( "id=? AND LOWER(db_type)=LOWER('mongodb')", sid ).find() mongodb_obj = panelMongoDB().set_host( host=conn_config["db_host"], port=conn_config["db_port"], username=conn_config["db_user"], password=conn_config["db_password"] ) status, err_msg = mongodb_obj.connect() if status is False: return status, public.lang("Failed to connect to database [{}:{}].".format(conn_config["db_host"], int(conn_config["db_port"]))) return status, mongodb_obj else: mongodb_obj = panelMongoDB() status, err_msg = mongodb_obj.connect() if status is False: return status, public.lang("Connecting to database [localhost:27017] failed!{}".format(err_msg)) return status, mongodb_obj def AddDatabase(self, get): """ @添加数据库 """ # 校验参数 try: get.validate([ Param('sid').Require().Integer(), Param('name').Require().String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) sid = int(get.sid) if not int(get.sid) and not public.process_exists("mongod"): return public.fail_v2(public.lang("Mongodb service is not turned on yet!")) dtype = 'MongoDB' # username = '' password = '' auth_status = panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled" # auth为true时如果__DB_USER为空则将它赋值为 root,用于开启本地认证后数据库用户为空的情况 data_name = get.name.strip() if not data_name: return public.fail_v2(False, "The database name cannot be empty!") if auth_status: res = self.add_base_database(get, dtype) if not res['status']: return public.fail_v2(res.get("msg")) data_name = res['data_name'] username = res['username'] password = res['data_pwd'] else: username = data_name # 检查数据库名称是否含有非法字符 if any(char in data_name for char in '/\\. "$*<>:|?'): return public.fail_v2( public.lang("The database name cannot contain the following characters. /\\. \"$*<>:|?")) if ' ' in data_name: return public.fail_v2(public.lang('The database name contains spaces and cannot be added properly!')) sql = public.M('databases') if sql.where( "(username=?) AND LOWER(type)=LOWER('MongoDB')", (username,) ).count(): return public.fail_v2(public.lang('Database user already exists, please use another database name!')) status, mongodb_obj = self.get_obj_by_sid(get.sid) if status is False: return public.fail_v2(mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(data_name) if status is False: return public.fail_v2(db_obj) if not hasattr(get, 'ps'): get['ps'] = public.getMsg('INPUT_PS') addTime = time.strftime('%Y-%m-%d %X', time.localtime()) pid = 0 if hasattr(get, 'pid'): pid = get.pid if hasattr(get, 'contact'): site = public.M('sites').where("id=?", (get.contact,)).field('id,name').find() if site: pid = int(get.contact) get['ps'] = site['name'] db_type = 0 if sid: db_type = 2 db_obj.chat.insert_one({}) if auth_status: db_obj.command( "createUser", username, pwd=password, roles=[{'role': 'dbOwner', 'db': data_name}, {'role': 'userAdmin', 'db': data_name}] ) public.set_module_logs('linux_mongodb', 'AddDatabase', 1) # 添加入SQLITE public.M('databases').add( 'pid,sid,db_type,name,username,password,accept,ps,addtime,type', (pid, sid, db_type, data_name, username, password, '127.0.0.1', get['ps'], addTime, dtype) ) public.WriteLog("TYPE_DATABASE", 'DATABASE_ADD_SUCCESS', (data_name,)) return public.success_v2(public.lang('ADD_SUCCESS')) def DeleteDatabase(self, get): """ @删除数据库 """ try: get.validate([ Param('id').Require().Integer(), Param('name').Require().String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) id = get['id'] find = public.M('databases').where( "id=? AND LOWER(type)=LOWER('MongoDB')", (id,) ).field('id,pid,name,username,password,type,accept,ps,addtime,sid,db_type').find() if not find: return public.fail_v2(public.lang('database does not exist.')) if not public.process_exists("mongod") and not int(find['sid']): return public.fail_v2(public.lang("Mongodb service is not yet turned on!")) name = get['name'] username = find['username'] if name == "admin": return public.fail_v2( public.lang('Deletion of the admin name database is prohibited due to Mongodb restrictions!')) status, mongodb_obj = self.get_obj_by_sid(find['sid']) if status is False: return public.fail_v2(mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(name) if status is False: return public.fail_v2(db_obj) try: db_obj.command("dropUser", username) except: pass db_obj.command('dropDatabase') # 删除SQLITE public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", (id,)).delete() public.WriteLog("TYPE_DATABASE", 'DATABASE_DEL_SUCCESS', (name,)) return public.return_message(0, 0, public.lang("Successfully deleted!")) def get_info_by_db_id(self, db_id): """ @获取数据库连接详情 @db_id 数据库id """ find = public.M('databases').where("id=? AND LOWER(type)=LOWER('mongodb')", db_id).find() if not find: return False if find["db_type"] == 1: # 远程数据库 conn_config = json.loads(find["conn_config"]) db_host = conn_config["db_host"] db_port = conn_config["db_port"] elif find["db_type"] == 2: conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('mongodb')", find["sid"]).find() db_host = conn_config["db_host"] db_port = conn_config["db_port"] else: # 本地数据库 db_host = '127.0.0.1' db_port = panelMongoDB.get_config_options("net", "port", 27017) data = { 'db_name': find["name"], 'db_host': db_host, 'db_port': int(db_port), 'db_user': find['username'], 'db_password': find['password'], } return data # 备份数据库 def ToBackup(self, get): """ 备份数据库 """ if not os.path.exists(self._MONGODBDUMP_BIN): return public.fail_v2( public.lang("Lack of backup tools, please install MongoDB via Software Manager first!")) if not os.path.exists(self._MONGOEXPORT_BIN): return public.fail_v2( public.lang("Lack of backup tools, please install MongoDB via Software Manager first!")) if not hasattr(get, "id"): return public.fail_v2(public.lang("Missing parameter! id")) db_id = get.id file_type = getattr(get, "file_type", "bson") collection_list = getattr(get, "collection_list", []) field_list = getattr(get, "field_list", []) db_find = public.M("databases").where("id=? AND LOWER(type)=LOWER('mongodb')", (db_id,)).find() if not db_find: return public.fail_v2(public.lang("The database does not exist! {db_id}".format(db_id=db_id))) if not public.process_exists("mongod") and not int(db_find["sid"]): return public.fail_v2(public.lang("Mongodb service is not turned on yet!")) if file_type not in ["bson", "json", "csv"]: return public.fail_v2(public.lang("The bson json csv format is currently supported!")) if file_type == "csv" and len(field_list) == 0: return public.fail_v2(public.lang("You need to specify the export fields when exporting to csv format!")) db_name = db_find["name"] db_host = "127.0.0.1" db_user = db_find["username"] db_password = db_find["password"] conn_data = {} if db_find["db_type"] == 0: if panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled": if not db_password: return public.fail_v2( public.lang("The database password is empty!Please set the database password first!")) else: db_password = None db_port = panelMongoDB.get_config_options("net", "port", 27017) elif db_find["db_type"] == 1: if not db_password: return public.fail_v2( public.lang("The database password is empty!Please set the database password first!")) # 远程数据库 conn_config = json.loads(db_find["conn_config"]) db_host = conn_config["db_host"] db_port = conn_config["db_port"] conn_data["host"] = conn_config["db_host"] conn_data["port"] = conn_config["db_port"] conn_data["username"] = conn_config["db_user"] conn_data["password"] = conn_config["db_password"] elif db_find["db_type"] == 2: if not db_password: return public.fail_v2(public.lang("MongoDB has enabled security authentication, " "the database password cannot be empty, " "please set the password and try again!")) conn_config = public.M("database_servers").where( "id=? AND LOWER(db_type)=LOWER('mongodb')", db_find["sid"] ).find() db_host = conn_config["db_host"] db_port = conn_config["db_port"] conn_data["host"] = conn_config["db_host"] conn_data["port"] = conn_config["db_port"] conn_data["username"] = conn_config["db_user"] conn_data["password"] = conn_config["db_password"] else: return public.fail_v2(public.lang("Unknown database type")) mongodb_obj = panelMongoDB().set_host(**conn_data) status, err_msg = mongodb_obj.connect() if status is False: return public.fail_v2(public.lang("Failed to connect to database [{}:{}].".format(db_host, int(db_port)))) db_backup_dir = os.path.join(self._MONGODB_BACKUP_DIR, db_name) if not os.path.exists(db_backup_dir): os.makedirs(db_backup_dir) file_name = "{db_name}_{file_type}_{backup_time}_mongodb_data".format(db_name=db_name, file_type=file_type, backup_time=time.strftime( "%Y-%m-%d_%H-%M-%S", time.localtime())) export_dir = os.path.join(db_backup_dir, file_name) mongodump_shell = "'{mongodump_bin}' --host='{db_host}' --port={db_port} --db='{db_name}' --out='{out}'".format( mongodump_bin=self._MONGODBDUMP_BIN, db_host=db_host, db_port=int(db_port), db_name=db_name, out=export_dir, ) mongoexport_shell = "'{mongoexport_bin}' --host='{db_host}' --port={db_port} --db='{db_name}'".format( mongoexport_bin=self._MONGOEXPORT_BIN, db_host=db_host, db_port=int(db_port), db_name=db_name, ) if db_password is not None: # 本地未开启安全认证 mongodump_shell += " --username='{db_user}' --password={db_password}".format(db_user=db_user, db_password=public.shell_quote( str(db_password))) mongoexport_shell += " --username='{db_user}' --password={db_password}".format(db_user=db_user, db_password=public.shell_quote( str(db_password))) backup_ps = "Manual Backup" if file_type == "bson": if len(collection_list) == 0: public.ExecShell(mongodump_shell) else: backup_ps += "-bson" for collection_name in collection_list: shell = "{mongodump_shell} --collection='{collection}'".format( mongodump_shell=mongodump_shell, collection=collection_name ) public.ExecShell(shell) else: # 导出 json csv 格式 backup_ps += "-json" fields = None if file_type == "csv": # csv fields = "--fields='{}'".format(",".join(field_list)) for collection_name in collection_list: file_path = os.path.join(export_dir, "{collection_name}.{file_type}".format(collection_name=collection_name, file_type=file_type)) shell = "{mongoexport_shell} --collection='{collection}' --type='{type}' --out='{out}'".format( mongoexport_shell=mongoexport_shell, collection=collection_name, type=file_type, out=file_path, ) if fields is not None: shell += " --fields='{fields}'".format(fields=fields) public.ExecShell(shell) if not os.path.exists(export_dir): return public.fail_v2(public.lang("Database backup failed, export file does not exist!")) backup_path = "{export_dir}.zip".format(export_dir=export_dir) public.ExecShell("cd {backup_dir} && zip -m {backup_path} -r {file_name}".format( backup_dir=db_backup_dir, backup_path=backup_path, file_name=file_name) ) if not os.path.exists(backup_path): public.ExecShell("rm -rf {}".format(export_dir)) return public.fail_v2(public.lang("Backup failed!")) backup_size = os.path.getsize(backup_path) public.M("backup").add("type,name,pid,filename,size,addtime,ps", ( 1, os.path.basename(backup_path), db_id, backup_path, backup_size, time.strftime("%Y-%m-%d %X", time.localtime()), backup_ps)) public.WriteLog("TYPE_DATABASE", "DATABASE_BACKUP_SUCCESS", (db_name,)) if backup_size < 1: return public.success_v2(public.lang( "Backup executed successfully, backup file is smaller than 1b, please check backup integrity." )) else: return public.success_v2(public.lang("BACKUP_SUCCESS")) # 导入 def InputSql(self, get): # 校验参数 try: get.validate([ Param('file').SafePath(), # 文件路径 Param('name').String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) if not os.path.exists(self._MONGORESTORE_BIN): return public.fail_v2( public.lang("Lack of backup tools, please install MongoDB via Software Manager first!")) if not os.path.exists(self._MONGOIMPORT_BIN): return public.fail_v2( public.lang("Lack of backup tools, please install MongoDB via Software Manager first!")) db_name = get.name file = get.file if not os.path.exists(file): return public.fail_v2(public.lang("Import path does not exist!")) if not os.path.isfile(file): return public.fail_v2(public.lang("Importing zip files is only supported!")) db_find = public.M("databases").where("name=? AND LOWER(type)=LOWER('MongoDB')", (db_name,)).find() if not db_find: return public.fail_v2(public.lang("Database does not exist!")) if not public.process_exists("mongod") and not int(db_find["sid"]): return public.fail_v2(public.lang("Mongodb service is not yet turned on!")) file_name = os.path.basename(file) ext_list = ['json', 'csv', 'tar.gz', 'zip'] ext_tmp = file_name.split(".") file_ext = ".".join(ext_tmp[1:]) ext_temp = [ext.lower() for ext in ext_list if ext.lower() in file_ext] if len(ext_temp) == 0: return public.fail_v2("Please choose json, csv, tar.gz, zip file formats!") input_dir = os.path.join(self._MONGODB_BACKUP_DIR, db_name, "input_tmp_{}".format(int(time.time() * 1000_000))) is_zip = False if "zip" in file_ext: if not os.path.isdir(input_dir): os.makedirs(input_dir) public.ExecShell("unzip -o '{file}' -d {input_dir}".format(file=file, input_dir=input_dir)) is_zip = True elif "tar.gz" in file_ext: if not os.path.isdir(input_dir): os.makedirs(input_dir) public.ExecShell("tar zxf '{file}' -C {input_dir}".format(file=file, input_dir=input_dir)) is_zip = True elif "gz" in file_ext: if not os.path.isdir(input_dir): os.makedirs(input_dir) temp_file = os.path.join(input_dir, file_name) public.ExecShell( "cp '{file}' '{temp_file}' && gunzip -q '{temp_file}'".format(file=file, temp_file=temp_file)) is_zip = True input_path_list = [] if is_zip is True: def get_input_path(input_dir: str, input_path_list: list): for name in os.listdir(input_dir): path = os.path.join(input_dir, name) if os.path.isfile(path) and (path.endswith(".json") or path.endswith(".csv")): input_path_list.append(path) elif os.path.isdir(path): is_bson = False for t_name in os.listdir(path): t_path = os.path.join(path, t_name) if os.path.isfile(t_path) and t_path.endswith(".bson"): input_path_list.append(path) is_bson = True break if is_bson is False: get_input_path(path, input_path_list) get_input_path(input_dir, input_path_list) else: input_path_list.append(file) # json,csv db_name = db_find["name"] db_host = "127.0.0.1" db_user = db_find["username"] db_password = db_find["password"] if db_find["db_type"] == 0: if panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled": if not db_password: return public.fail_v2(public.lang("MongoDB has enabled security authentication, " "the database password cannot be empty, please set the password and try again!")) else: db_password = None db_port = panelMongoDB.get_config_options("net", "port", 27017) elif db_find["db_type"] == 1: # 远程数据库 conn_config = json.loads(db_find["conn_config"]) db_host = conn_config["db_host"] db_port = conn_config["db_port"] elif db_find["db_type"] == 2: conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('mongodb')", db_find["sid"]).find() db_host = conn_config["db_host"] db_port = conn_config["db_port"] else: return public.fail_v2(public.lang("Unknown database type")) status, err_msg = panelMongoDB().connect() if status is False: return public.fail_v2(public.lang("Failed to connect to database [{}:{}].".format(db_host, int(db_port)))) mongorestore_shell = "'{mongorestore}' --host='{host}' --port={port} --db='{db_name}' --drop ".format( mongorestore=self._MONGORESTORE_BIN, host=db_host, port=int(db_port), db_name=db_name, ) mongoimport_shell = "'{mongoimport}' --host='{host}' --port={port} --db='{db_name}' --drop ".format( mongoimport=self._MONGOIMPORT_BIN, host=db_host, port=int(db_port), db_name=db_name, ) if db_password is not None: # 本地未开启安全认证 mongorestore_shell += " --username='{db_user}' --password='{db_password}'".format(db_user=db_user, db_password=db_password) mongoimport_shell += " --username='{db_user}' --password='{db_password}'".format(db_user=db_user, db_password=db_password) for path in input_path_list: if os.path.isdir(path): # bson public.ExecShell( "{mongorestore_shell} '{path}'".format(mongorestore_shell=mongorestore_shell, path=path)) elif os.path.isfile(path) and (path.endswith(".json") or path.endswith(".csv")): # json/csv fields = None if path.endswith(".csv"): # csv fp = open(path, "r") fields = fp.readline() fp.close() file_name = os.path.basename(path) collection = file_name.split(".")[0] file_type = file_name.split(".")[-1] shell = "{mongoimport_shell} --collection='{collection}' --file='{file}' --type='{type}'".format( mongoimport_shell=mongoimport_shell, collection=collection, file=path, type=file_type, ) if fields is not None: shell += " --fields='{fields}'".format(fields=fields) public.ExecShell(shell) # 清理导入临时目录 if is_zip is True: public.ExecShell("rm -rf {input_dir}".format(input_dir=input_dir)) public.WriteLog("TYPE_DATABASE", 'Importing the database[{}]successes'.format(db_name)) return public.success_v2(public.lang('DATABASE_INPUT_SUCCESS')) # 获取备份文件 def GetBackup(self, get): p = getattr(get, "p", 1) limit = getattr(get, "limit", 10) return_js = getattr(get, "return_js", "") search = getattr(get, "search", None) if not str(p).isdigit(): return public.fail_v2("Parameter error! p") if not str(limit).isdigit(): return public.fail_v2("Parameter error! limit") p = int(p) limit = int(limit) ext_list = ['json', 'csv', 'tar.gz', 'zip'] backup_list = [] # 递归获取备份文件 def get_dir_backup(backup_dir: str, backup_list: list, is_recursion: bool): for name in os.listdir(backup_dir): path = os.path.join(backup_dir, name) if os.path.isfile(path): ext = name.split(".")[-1] if ext.lower() not in ext_list: continue if search is not None and search not in name: continue stat_file = os.stat(path) path_data = { "name": name, "path": path, "size": stat_file.st_size, "mtime": int(stat_file.st_mtime), "ctime": int(stat_file.st_ctime), } backup_list.append(path_data) elif os.path.isdir(path) and is_recursion is True: get_dir_backup(path, backup_list, is_recursion) get_dir_backup(self._MONGODB_BACKUP_DIR, backup_list, True) get_dir_backup(self._DB_BACKUP_DIR, backup_list, False) try: from flask import request uri = public.url_encode(request.full_path) except: uri = '' # 包含分页类 import page # 实例化分页类 page = page.Page() info = { "p": p, "count": len(backup_list), "row": limit, "return_js": return_js, "uri": uri, } page_info = page.GetPage(info) start_idx = (int(p) - 1) * limit end_idx = p * limit if p * limit < len(backup_list) else len(backup_list) backup_list.sort(key=lambda data: data["mtime"], reverse=True) backup_list = backup_list[start_idx:end_idx] return {"status": True, "msg": "OK", "data": backup_list, "page": page_info} def DelBackup(self, get): """ @删除备份文件 """ try: name = '' id = get.id where = "id=?" filename = public.M('backup').where(where, (id,)).getField('filename') if os.path.exists(filename): os.remove(filename) # if filename == 'qiniu': # name = public.M('backup').where(where, (id,)).getField('name'); # # public.ExecShell(public.get_run_python("[PYTHON] " + public.GetConfigValue( # 'setup_path') + '/panel/script/backup_qiniu.py delete_file ' + name)) public.M('backup').where(where, (id,)).delete() public.WriteLog("TYPE_DATABASE", 'DATABASE_BACKUP_DEL_SUCCESS', (name, filename)) return public.success_v2(public.lang('DEL_SUCCESS')) except Exception as _: return public.fail_v2(public.lang("Failed to delete backup file!")) # 同步数据库到服务器 def SyncToDatabases(self, get): type = int(get['type']) n = 0 sql = public.M('databases') if type == 0: data = sql.field('id,name,username,password,accept,type,sid,db_type').where("LOWER(type)=LOWER('MongoDB')", ()).select() for value in data: if value['db_type'] in ['1', 1]: continue # 跳过远程数据库 result = self.ToDataBase(value) if result == 1: n += 1 else: import json data = json.loads(get.ids) for value in data: find = sql.where("id=?", (value,)).field('id,name,username,password,sid,db_type,accept,type').find() result = self.ToDataBase(find) if result == 1: n += 1 if n == 1: return public.return_message(0, 0, public.lang("Synchronization succeeded")) elif n == 0: return public.return_message(-1, 0, public.lang("No synchronized database")) return public.return_message(0, 0, public.lang("Database sync success {}", n)) # 添加到服务器 def ToDataBase(self, find): if find['username'] == 'bt_default': return public.return_message(0, 0, 0) if len(find['password']) < 3: find['username'] = find['name'] find['password'] = public.md5(str(time.time()) + find['name'])[0:10] public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", (find['id'],)).save( 'password,username', (find['password'], find['username'])) try: sid = int(find['sid']) except: return public.return_message(-1, 0, public.lang("Database type sid needs int type!!")) if not public.process_exists("mongod") and not int(find['sid']): return public.return_message(-1, 0, public.lang("Mongodb service has not been started yet!!")) get = public.dict_obj() get.sid = sid auth_status = panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled" if auth_status: status, mongodb_obj = self.get_obj_by_sid(sid) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(find['name']) if status is False: return public.returnMsg(False, db_obj) try: db_obj.chat.insert_one({}) db_obj.command("dropUser", find['username']) except: pass try: db_obj.command( "createUser", find['username'], pwd=find['password'], roles=[{'role': 'dbOwner', 'db': find['name']}, {'role': 'userAdmin', 'db': find['name']}] ) except: pass return public.return_message(0, 0, 1) def SyncGetDatabases(self, get): """ @从服务器获取数据库 """ n = 0 # s = 0 db_type = 0 if public.process_exists("mongod") and get.sid is None: sid = 0 else: sid = get.get('sid/d', 0) if sid: db_type = 2 try: int(get.sid) except: return public.fail_v2(public.lang('The database type sid requires the int type!')) if not public.process_exists("mongod") and not int(get.sid): return public.fail_v2(public.lang("Mongodb service is not turned on yet!")) status, mongodb_obj = self.get_obj_by_sid(sid) if status is False: return public.fail_v2(mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new('admin') if status is False: return public.fail_v2(db_obj) data = db_obj.command({"listDatabases": 1}) sql = public.M('databases') for item in data['databases']: dbname = item['name'] if sql.where("name=? AND LOWER(type)=LOWER('MongoDB')", (dbname,)).count(): continue if dbname in panelMongoDB.DEFUALT_DB: continue if sql.table('databases').add( 'name,username,password,accept,ps,addtime,type,sid,db_type', (dbname, dbname, "", "", public.getMsg('INPUT_PS'), time.strftime('%Y-%m-%d %X', time.localtime()), 'MongoDB', sid, db_type) ): n += 1 return public.success_v2(public.lang('DATABASE_GET_SUCCESS')) def ResDatabasePassword(self, get): """ @修改用户密码 """ id = get['id'] username = get['name'].strip() newpassword = public.trim(get['password']) try: if not newpassword: return public.fail_v2( public.lang('Modification failed, database [' + username + ']password cannot be empty.')) if len(re.search("^[\w@.]+$", newpassword).groups()) > 0: return public.fail_v2(public.lang('The database password cannot be empty or have special characters.')) if re.search('[\u4e00-\u9fa5]', newpassword): return public.fail_v2(public.lang('Database password can not be Chinese, please change the name!')) except: return public.fail_v2(public.lang('The database password cannot be empty or have special characters.')) find = public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", (id,)).field( 'id,pid,name,username,password,type,accept,ps,addtime,sid').find() if not find: return public.fail_v2(public.lang('The modification failed, the specified database does not exist.')) get = public.dict_obj() get.sid = find['sid'] try: int(find['sid']) except: return public.fail_v2(public.lang('The database type sid requires the int type!')) if not public.process_exists("mongod") and not int(find['sid']): return public.fail_v2("Mongodb service is not turned on yet!") auth_status = panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled" if auth_status: status, mongodb_obj = self.get_obj_by_sid(find['sid']) if status is False: return public.fail_v2(mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(username) if status is False: return public.fail_v2(db_obj) try: db_obj.command("updateUser", username, pwd=newpassword) except: db_obj.command("createUser", username, pwd=newpassword, roles=[{'role': 'dbOwner', 'db': find['name']}, {'role': 'userAdmin', 'db': find['name']}]) else: return public.fail_v2( public.lang('Modification failed, the database is not enabled for security authentication.')) # 修改SQLITE public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", (id,)).setField('password', newpassword) public.WriteLog("TYPE_DATABASE", 'DATABASE_PASS_SUCCESS', (find['name'],)) return public.success_v2(f"Successfully modifyied password for database [{find['name']}]!") def get_root_pwd(self, get): """ @获取root密码 """ config = panelMongoDB.get_config() config_info = { "port": config["net"].get("port", 27017), "bind_ip": config["net"].get("bindIp", "127.0.0.1"), "logpath": config["systemLog"].get("path", ""), "dbpath": config["storage"].get("dbPath", ""), "authorization": config["security"].get("authorization", "disabled") } sa_path = '{}/data/mongo.root'.format(public.get_panel_path()) if os.path.exists(sa_path): config_info['msg'] = public.readFile(sa_path) else: config_info['msg'] = '' config_info['root'] = config_info['msg'] return public.return_message(0, 0, config_info) def get_database_size_by_id(self, get): """ @获取数据库尺寸(批量删除验证) @get json/int 数据库id """ # if not public.process_exists("mongod"): # return public.returnMsg(False,"Mongodb服务还未开启!") total = 0 db_id = get if not isinstance(get, int): db_id = get['db_id'] find = public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", db_id).find() try: int(find['sid']) except: return 0 if not public.process_exists("mongod") and not int(find['sid']): return 0 return public.return_message(0, 0, total) # todo 前端未使用新接口 def check_del_data(self, args): """ @删除数据库前置检测 """ return public.return_message(0, 0, self.check_base_del_data(args)) def __new_password(self): """ 生成随机密码 """ import random import string # 生成随机密码 password = "".join(random.sample(string.ascii_letters + string.digits, 16)) return password # 数据库状态检测 def CheckDatabaseStatus(self, get): """ 数据库状态检测 """ if not hasattr(get, "sid"): return public.fail_v2("Missing parameters! sid") if not str(get.sid).isdigit(): return public.fail_v2("Parameter error! sid") sid = int(get.sid) mongodb_obj = panelMongoDB() if sid == 0: db_status, err_msg = mongodb_obj.connect() else: conn_config = public.M('database_servers').where("id=? AND LOWER(db_type)=LOWER('mongodb')", sid).find() if not conn_config: db_status = False err_msg = public.lang("Remote database information does not exist!") else: mongodb_obj.set_host(host=conn_config.get("db_host"), port=conn_config.get("db_port"), username=conn_config.get("db_user"), password=conn_config.get("db_password")) db_status, err_msg = mongodb_obj.connect() return {"status": True, "msg": "normal" if db_status is True else "exceptions", "db_status": db_status, "err_msg": err_msg} def check_cloud_database_status(self, conn_config): """ @检测远程数据库是否连接 @conn_config 远程数据库配置,包含host port pwd等信息 旧方法,添加数据库时调用 """ try: mongodb_obj = panelMongoDB().set_host(host=conn_config.get("db_host"), port=conn_config.get("db_port"), username=conn_config.get("db_user"), password=conn_config.get("db_password")) status, err_msg = mongodb_obj.connect() return status except: return public.fail_v2("Remote database connection failed!") # 获取数据库集合 def GetInfo(self, get): """ 获取数据库集合 """ db_name = get.db_name db_find = public.M("databases").where("name=? AND LOWER(type)=LOWER('MongoDB')", (db_name,)).find() if not db_find: return public.fail_v2("The database does not exist!") if not public.process_exists("mongod") and not int(db_find["sid"]): return public.fail_v2("Mongodb service is not turned on yet!") status, mongodb_obj = self.get_obj_by_sid(db_find["sid"]) if status is False: return public.fail_v2(mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(db_name) if status is False: return public.fail_v2(db_obj) result = db_obj.command("dbStats") result["collection_list"] = [] for collection_name in db_obj.list_collection_names(): collection = db_obj.command("collStats", collection_name) data = { "collection_name": collection_name, "count": collection.get("count"), # 文档数 "size": collection.get("size"), # 内存中的大小 "avg_obj_size": collection.get("avgObjSize"), # 对象平均大小 "storage_size": collection.get("storageSize"), # 存储大小 "capped": collection.get("capped"), "nindexes": collection.get("nindexes"), # 索引数 "total_index_size": collection.get("totalIndexSize"), # 索引大小 } result["collection_list"].append(data) return {"status": True, "msg": "ok", "data": result} def GetRole(self, get): """ @获取所有角色权限 """ status, mongodb_obj = self.get_obj_by_sid(0) if status is False: return public.fail_v2(mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new("admin") if status is False: return public.fail_v2(db_obj) # 获取所有角色 role_data = db_obj.command('rolesInfo', showBuiltinRoles=True) result = [] for role in role_data["roles"]: if self._MONGO_ROLE_DICT.get(role["role"]) is not None: role["name"] = self._MONGO_ROLE_DICT.get(role["role"]) result.append(role) return {"status": True, "msg": "ok", "data": result} def GetDatabaseAccess(self, get): """ @获取用户权限 @user_name: 用户名 """ user_name = get.get("user_name") if user_name is None: return public.fail_v2('Parameter error!Missing database user name!') db_find = public.M("databases").where("username=? AND LOWER(type)=LOWER('MongoDB')", (user_name,)).find() if not db_find: return public.fail_v2("The database does not exist!") if not public.process_exists("mongod") and not int(db_find["sid"]): return public.fail_v2("Mongodb service is not turned on yet!") status, mongodb_obj = self.get_obj_by_sid(db_find["sid"]) if status is False: return public.fail_v2(mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(user_name) if status is False: return public.fail_v2(db_obj) # 查看用户信息 user_data = db_obj.command('usersInfo', user_name) # 打印用户的权限信息 result = { "user": user_name, "db": user_name, "roles": [], } if user_data: if len(user_data["users"]) != 0: user = user_data["users"][0] result["user"] = user.get("user", user_name) result["db"] = user.get("db", user_name) result["roles"] = [info.get("role") for info in user.get("roles", []) if info.get("role")] return {"status": True, "msg": "ok", "data": result} def SetDatabaseAccess(self, get): """ @设置用户权限 @remote_ip: 远程连接地址 """ user_name = get.get("user_name", None) db_permission = get.get("db_permission", None) if user_name is None: return public.fail_v2('Parameter error!Missing database username!') if db_permission is None or not db_permission: return public.fail_v2('Please set permissions!') # if db_permission not in ["read","readWrite","dbAdmin","clusterAdmin","userAdmin","backup","restore","root"]: # return public.returnMsg(False, '数据库权限错误!') role_permission = [{"role": permission, "db": user_name} for permission in db_permission] db_find = public.M("databases").where("username=? AND LOWER(type)=LOWER('MongoDB')", (user_name,)).find() if not db_find: return public.fail_v2("The database does not exist!") if not public.process_exists("mongod") and not int(db_find["sid"]): return public.fail_v2("The Mongodb service is not turned on yet!") status, mongodb_obj = self.get_obj_by_sid(db_find["sid"]) if status is False: return public.fail_v2(mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(user_name) if status is False: return public.fail_v2(db_obj) try: user_data = db_obj.command('usersInfo', user_name) if user_data: if len(user_data["users"]) != 0: del_role_permission = [{"role": role.get("role"), "db": user_name} for role in user_data["users"][0].get("roles", [])] db_obj.command('revokeRolesFromUser', user_name, roles=del_role_permission) db_obj.command("grantRolesToUser", user_name, roles=role_permission) return public.success_v2(f"{user_name} Authorisation successful!") except Exception as err: return public.fail_v2(f"Authorisation failed!{err}")