403Webshell
Server IP : 172.67.216.182  /  Your IP : 172.71.81.11
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/databaseModelV2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/databaseModelV2/base.py
# coding: utf-8
import json
import os
import sys
import time

panelPath = '/www/server/panel'
os.chdir(panelPath)
if not panelPath + "/class/" in sys.path:
    sys.path.insert(0, panelPath + "/class/")
import public, re
from public.exceptions import HintException


class databaseBase:
    def get_base_list(self, args, sql_type='mysql'):
        """
        @获取数据库列表
        @type:数据库类型,MySQL,SQLServer
        """
        search = ''
        if 'search' in args: search = args['search']

        conditions = ''
        if '_' in search:
            cs = ''
            for i in search:
                if i == '_':
                    cs += '/_'
                else:
                    cs += i
            search = cs
            conditions = " escape '/'"

        SQL = public.M('databases')

        where = "lower(type) = lower('{}')".format(sql_type)
        if search:
            where += "AND (name like '%{search}%' or ps like '%{search}%'{conditions})".format(search=search,
                                                                                               conditions=conditions)
        if 'db_type' in args:
            where += " AND db_type='{}'".format(args['db_type'])

        if 'sid' in args:
            where += " AND sid='{}'".format(args['sid'])

        order = "id desc"
        if hasattr(args, 'order'): order = args.order

        info = {}
        rdata = {}

        info['p'] = 1
        info['row'] = 20
        result = '1,2,3,4,5,8'
        info['count'] = SQL.where(where, ()).count()

        if hasattr(args, 'limit'): info['row'] = int(args.limit)
        if hasattr(args, 'result'): result = args.result
        if hasattr(args, 'p'): info['p'] = int(args['p'])

        import page
        # 实例化分页类
        page = page.Page()

        info['uri'] = args
        info['return_js'] = ''
        if hasattr(args, 'tojs'): info['return_js'] = args.tojs

        rdata['where'] = where

        # 获取分页数据
        rdata['page'] = page.GetPage(info, result)
        # 取出数据
        rdata['data'] = SQL.where(where, ()).order(order).field(
            'id,sid,pid,name,username,password,accept,ps,addtime,type,db_type,conn_config').limit(
            str(page.SHIFT) + ',' + str(page.ROW)).select()

        if type(rdata['data']) == str:
            raise HintException("Database query error: " + rdata['data'])

        for sdata in rdata['data']:
            # 清除不存在的
            backup_count = 0
            backup_list = public.M('backup').where("pid=? AND type=1", (sdata['id'])).select()
            for backup in backup_list:
                if not os.path.exists(backup["filename"]):
                    public.M('backup').where("id=? AND type=1", (backup['id'])).delete()
                    continue
                backup_count += 1
            sdata['backup_count'] = backup_count
            sdata['conn_config'] = json.loads(sdata['conn_config'])
        return rdata

    def get_databaseModel(self):
        '''
        获取数据库模型对象
        @db_type 数据库类型
        '''
        from panelDatabaseControllerV2 import DatabaseController

        project_obj = DatabaseController()

        return project_obj

    def get_average_num(self, slist):
        """
        @批量删除获取平均值
        """
        count = len(slist)
        limit_size = 1 * 1024 * 1024
        if count <= 0: return limit_size

        if len(slist) > 1:
            slist = sorted(slist)
            limit_size = int((slist[0] + slist[-1]) / 2 * 0.85)
        return limit_size

    def get_database_size(self, ids, is_pid=False):
        """
        获取数据库大小
        """
        result = {}
        p = self.get_databaseModel()
        for id in ids:
            if not is_pid:
                x = public.M('databases').where('id=?', id).field('id,sid,pid,name,type,ps,addtime').find()
            else:
                x = public.M('databases').where('pid=?', id).field('id,sid,pid,name,type,ps,addtime').find()
            if not x: continue
            x['backup_count'] = public.M('backup').where("pid=? AND type=?", (x['id'], '1')).count()
            if x['type'] == 'MySQL':
                x['total'] = int(public.get_database_size_by_id(id))
            else:
                try:

                    get = public.dict_obj()
                    get['data'] = {'db_id': x['id']}
                    get['mod_name'] = x['type'].lower()
                    get['def_name'] = 'get_database_size_by_id'
                    try:
                        x['total'] = p.model(get)["message"]["result"]
                    except:
                        x['total'] = 0
                except:
                    x['total'] = 0
            result[x['name']] = x

        return result

    def check_base_del_data(self, get):
        """
        @删除数据库前置检测
        """
        if not hasattr(get, 'ids'):
            raise HintException("Parameter 'ids' is required for deletion.")
        ids = json.loads(get.ids)
        slist = {}
        result = []
        db_list_size = []
        db_data = self.get_database_size(ids)

        for key in db_data:
            data = db_data[key]
            if not data['id'] in ids: continue

            db_addtime = public.to_date(times=data['addtime'])
            data['score'] = int(time.time() - db_addtime) + data['total']
            data['st_time'] = db_addtime

            if data['total'] > 0: db_list_size.append(data['total'])
            result.append(data)

        slist['data'] = sorted(result, key=lambda x: x['score'], reverse=True)
        slist['db_size'] = self.get_average_num(db_list_size)
        return slist

    def get_test(self, args):

        p = self.get_databaseModel()
        get = public.dict_obj()
        get['data'] = {'db_id': 18}
        get['mod_name'] = args['type'].lower()
        get['def_name'] = 'get_database_size_by_id'

        return p.model(get)

    def add_base_database(self, get, dtype):
        """
        @添加数据库前置检测
        @return username 用户名
                data_name 数据库名
                data_pwd:数据库密码
        """
        data_name = get['name'].strip().lower()
        if self.check_recyclebin(data_name):
            return public.returnMsg(False, public.lang(
                "Database [' + data_name + '] is already in recycle bin, please restore from recycle bin!"))

        if len(data_name) > 16:
            return public.returnMsg(False, public.lang("Database name cannot be more than 16 characters!"))

        if not hasattr(get, 'db_user'): get.db_user = data_name
        username = get.db_user.strip()
        checks = ['root', 'mysql', 'test', 'sys', 'panel_logs']
        if username in checks or len(username) < 1:
            return public.returnMsg(False, public.lang("Database username is invalid!"))
        if data_name in checks or len(data_name) < 1:
            return public.returnMsg(False, public.lang("Database name is invalid!"))

        reg = r"^\w+$"
        if not re.match(reg, data_name):
            return public.returnMsg(False, public.lang("Database name cannot contain special characters!"))

        data_pwd = get['password']
        if len(data_pwd) < 1:
            data_pwd = public.md5(str(time.time()))[0:8]

        if public.M('databases').where("(name=? or username=?) AND LOWER(type)=LOWER(?)",
                                       (data_name, username, dtype)).count():
            return public.returnMsg(False, public.lang("Database exists!"))

        res = {
            'data_name': data_name,
            'username': username,
            'data_pwd': data_pwd,
            'status': True
        }
        return res

    def delete_base_backup(self, get):
        """
        @删除备份文件
        """

        name = ''
        id = get.id
        where = "id=?"
        filename = public.M('backup').where(where, (id,)).getField('filename')
        if os.path.exists(filename): os.remove(filename)

        # if filename == 'qiniu':
        #     name = public.M('backup').where(where, (id,)).getField('name')
        #
        #     public.ExecShell(public.get_run_python("[PYTHON] " + public.GetConfigValue('setup_path') + '/panel/script/backup_qiniu.py delete_file ' + name))
        public.M('backup').where(where, (id,)).delete()
        public.WriteLog("TYPE_DATABASE", 'DATABASE_BACKUP_DEL_SUCCESS', (name, filename))
        return public.return_message(0, 0, 'DEL_SUCCESS')

    # 检查是否在回收站
    def check_recyclebin(self, name):
        try:
            for n in os.listdir('{}/Recycle_bin'.format(public.get_setup_path())):
                if n.find('BTDB_' + name + '_t_') != -1: return True
            return False
        except:
            return False

    # map to list
    def map_to_list(self, map_obj):
        try:
            if type(map_obj) != list and type(map_obj) != str: map_obj = list(map_obj)
            return map_obj
        except:
            return []

    # ******************************************** 远程数据库 ******************************************/

    def check_cloud_args(self, get, nlist=[]):
        """
        验证参数是否合法
        @get param
        @args 参数列表
        """
        for key in nlist:
            if not key in get:
                return public.return_message(-1, 0, public.lang("Parameter passing error, missing parameter {}!", key))
        return public.return_message(0, 0, "success")

    def check_cloud_database(self, args):
        '''
        @检查远程数据库是否存在
        @conn_config param
        '''
        p = self.get_databaseModel()

        get = public.dict_obj()
        get['data'] = args
        get['mod_name'] = args['type']
        get['def_name'] = 'check_cloud_database_status'
        return p.model(get)

    def AddBaseCloudServer(self, get):
        """
        @name 添加远程服务器
        @author hwliang<2021-01-10>
        @param db_host<string> 服务器地址
        @param db_port<port> 数据库端口
        @param db_user<string> 用户名
        @param db_password<string> 数据库密码
        @param db_ps<string> 数据库备注
        @param type<string> 数据库类型,mysql/sqlserver/sqlite
        @return dict
        """
        arrs = ['db_host', 'db_port', 'db_user', 'db_password', 'db_ps', 'type']
        if get.type == 'redis':
            arrs = ['db_host', 'db_port', 'db_password', 'db_ps', 'type']
        cRet = self.check_cloud_args(get, arrs)
        if isinstance(cRet, dict):
            return cRet

        get['db_name'] = None
        res = self.check_cloud_database(get)
        if isinstance(res, dict):
            return res

        if public.M('database_servers').where('db_host=? AND db_port=?', (get['db_host'], get['db_port'])).count():
            return public.return_message(-1, 0, 'The specified server already exists: [{}:{}]'.format(get['db_host'],
                                                                                                      get['db_port']))
        get['db_port'] = int(get['db_port'])
        pdata = {
            'db_host': get['db_host'],
            'db_port': int(get['db_port']),
            'db_user': get['db_user'],
            'db_password': get['db_password'],
            'db_type': get['type'],
            'ps': public.xssencode2(get['db_ps'].strip()),
            'addtime': int(time.time())
        }
        result = public.M("database_servers").insert(pdata)

        if isinstance(result, int):
            public.WriteLog('Database manager', 'Add remote MySQL server[{}:{}]'.format(get['db_host'], get['db_port']))
            return public.return_message(0, 0, public.lang("Added successfully!"))
        return public.return_message(0, 0, public.lang("Add failed: {}", result))

    def GetBaseCloudServer(self, get):
        '''
            @name 获取远程服务器列表
            @author hwliang<2021-01-10>
            @return list
        '''

        # 解决get外多一层data
        if not get.get('type', None) and get.get('data', None):
            get['type'] = get['data'].get('type', '')
        where = '1=1'
        if 'type' in get: where = "db_type = '{}'".format(get['type'])

        data = public.M('database_servers').where(where, ()).select()

        if not isinstance(data, list): data = []

        if get['type'] == 'mysql':
            bt_mysql_bin = public.get_mysql_info()['path'] + '/bin/mysql.exe'
            if os.path.exists(bt_mysql_bin):
                data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 3306, 'db_user': 'root', 'db_password': '',
                                'ps': 'local server', 'addtime': 0, 'db_type': 'mysql'})
        elif get['type'] == 'sqlserver':
            pass
        elif get['type'] == 'mongodb':
            if os.path.exists('/www/server/mongodb/bin'):
                data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 27017, 'db_user': 'root', 'db_password': '',
                                'ps': 'local server', 'addtime': 0, 'db_type': 'mongodb'})
        elif get['type'] == 'redis':
            if os.path.exists('/www/server/redis'):
                data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 6379, 'db_user': 'root', 'db_password': '',
                                'ps': 'local server', 'addtime': 0, 'db_type': 'redis'})
        elif get['type'] == 'pgsql':
            if os.path.exists('/www/server/pgsql'):
                data.insert(0,
                            {'id': 0, 'db_host': '127.0.0.1', 'db_port': 5432, 'db_user': 'postgres', 'db_password': '',
                             'ps': 'local server', 'addtime': 0, 'db_type': 'pgsql'})
        return data

    def RemoveBaseCloudServer(self, get):
        '''
            @name 删除远程服务器
            @author hwliang<2021-01-10>
            @param id<int> 远程服务器ID
            @return dict
        '''

        id = int(get.id)
        if not id:
            return public.return_message(-1, 0, public.lang("Parameter passed error, please try again!"))
        db_find = public.M('database_servers').where('id=?', (id,)).find()
        if not db_find:
            return public.return_message(-1, 0, public.lang("The specified remote server does not exist!"))
        public.M('databases').where('sid=?', id).delete()
        result = public.M('database_servers').where('id=?', id).delete()
        if isinstance(result, int):
            public.WriteLog('Database manager',
                            'Delete remote MySQL server [{}:{}]'.format(db_find['db_host'], int(db_find['db_port'])))
            return public.return_message(0, 0, public.lang("Successfully deleted!"))
        return public.return_message(0, 0, public.lang("Successfully deleted: {}", result))

    def ModifyBaseCloudServer(self, get):
        '''
            @name 修改远程服务器
            @author hwliang<2021-01-10>
            @param id<int> 远程服务器ID
            @param db_host<string> 服务器地址
            @param db_port<port> 数据库端口
            @param db_user<string> 用户名
            @param db_password<string> 数据库密码
            @param db_ps<string> 数据库备注
            @return dict
        '''

        arrs = ['db_host', 'db_port', 'db_user', 'db_password', 'db_ps', 'type']
        if get.type == 'redis':
            arrs = ['db_host', 'db_port', 'db_password', 'db_ps', 'type']

        cRet = self.check_cloud_args(get, arrs)
        if isinstance(cRet, dict):
            return cRet

        get['db_name'] = None
        id = int(get.id)
        get['db_port'] = int(get['db_port'])
        db_find = public.M('database_servers').where('id=?', (id,)).find()
        if not db_find:
            return public.return_message(-1, 0, public.lang("The specified remote server does not exist!"))
        _modify = False
        if db_find['db_host'] != get['db_host'] or db_find['db_port'] != get['db_port']:
            _modify = True
            if public.M('database_servers').where('db_host=? AND db_port=?', (get['db_host'], get['db_port'])).count():
                return public.return_message(-1, 0,
                                             'The specified server already exists: [{}:{}]'.format(get['db_host'],
                                                                                                   get['db_port']))
        if db_find['db_user'] != get['db_user'] or db_find['db_password'] != get['db_password']:
            _modify = True
        _modify = True

        if _modify:
            res = self.check_cloud_database(get)
            if isinstance(res, dict): return res
        pdata = {
            'db_host': get['db_host'],
            'db_port': int(get['db_port']),
            'db_user': get['db_user'],
            'db_password': get['db_password'],
            'db_type': get['type'],
            'ps': public.xssencode2(get['db_ps'].strip())
        }

        result = public.M("database_servers").where('id=?', (id,)).update(pdata)
        if isinstance(result, int):
            public.WriteLog('Database manager',
                            'Modify the remote MySQL server[{}:{}]'.format(get['db_host'], get['db_port']))

            return public.return_message(0, 0, public.lang("Successfully modified!"))
        return public.return_message(-1, 0, public.lang("Fail to edit: {}", result))

    # 检测数据库执行错误
    def IsSqlError(self, mysqlMsg):

        if mysqlMsg:
            mysqlMsg = str(mysqlMsg)
            if "MySQLdb" in mysqlMsg:
                return public.return_message(-1, 0, public.lang("MySQLdb component is missing!"))
            if "2002," in mysqlMsg:
                return public.return_message(-1, 0, public.lang("ERROR to connect database"))
            if "2003," in mysqlMsg:
                return public.return_message(-1, 0, public.lang(
                    "Database connection timed out, please check if the configuration is correct."))
            if "1045," in mysqlMsg:
                return public.return_message(-1, 0, public.lang("MySQL password error."))
            if "1040," in mysqlMsg:
                return public.return_message(-1, 0, public.lang(
                    "Exceeded maximum number of connections, please try again later."))
            if "1130," in mysqlMsg:
                return public.return_message(-1, 0, public.lang(
                    "Database connection failed, please check whether the root user is authorized to access 127.0.0.1."))
            if "using password:" in mysqlMsg:
                return public.return_message(-1, 0, public.lang("Database password is incorrect!"))
            if "Connection refused" in mysqlMsg:
                return public.return_message(-1, 0, public.lang("ERROR to connect database"))
            if "1133" in mysqlMsg:
                return public.return_message(-1, 0, public.lang("Database user does NOT exist!"))
            if "2005_login_error" == mysqlMsg:
                return public.return_message(-1, 0, public.lang(
                    "The connection times out, please manually enable the TCP/IP function (Start Menu->SQL 2005->Configuration Tools->2005 Network Configuration->TCP/IP->Enable)"))
            if 'already exists' in mysqlMsg:
                return public.return_message(-1, 0, public.lang(
                    "The specified database already exists, please do not add it repeatedly."))
            if 'Cannot open backup device' in mysqlMsg:
                return public.return_message(-1, 0, public.lang(
                    "The operation failed, the remote database does not support the operation."))

            if '1142' in mysqlMsg:
                return public.return_message(-1, 0, public.lang("Insufficient permissions, please use root user."))

            if "DB-Lib error message 20018" in mysqlMsg:
                return public.return_message(-1, 0, public.lang("Create failed, SQL Server requires GUI support"))

        return None

    # ******************************************** 数据库公用方法 ******************************************/


if __name__ == "__main__":
    # get = {}
    # get['db_host'] = '192.168.1.37'
    # get['db_port'] = '3306'
    # get['db_user'] = 'root'

    # get['db_password'] = 'HLANEMJFRbPE7Ny2'
    # get['db_ps'] = '2'
    # get['type'] = 'mysql'
    # bt = databaseBase()
    # ret = bt.AddCloudServer(get)
    # print(ret)

    get = {}
    get['db_host'] = '192.168.66.73'
    get['db_port'] = '1433'
    get['db_user'] = 'sa'

    get['db_password'] = 'dPYi6Gt8GC7SL58C'
    get['db_ps'] = '2'
    get['type'] = 'sqlserver'
    bt = databaseBase()
    ret = bt.get_test(get)
    print(ret)

Youez - 2016 - github.com/yon3zu
LinuXploit