403Webshell
Server IP : 172.67.216.182  /  Your IP : 162.158.108.124
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/one_key_wp_v2.py
# coding: utf-8
# +-------------------------------------------------------------------
# | aaPanel
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2016 aaPanel(www.aapanel.com) All rights reserved.
# +-------------------------------------------------------------------
# | Author: zhwen <[email protected]>
# +-------------------------------------------------------------------
import os
import time
import json
import sys
import public
import re
import requests
from bs4 import BeautifulSoup
import panel_mysql_v2 as panelMysql
from public.validate import Param


# import wp-toolkit core
# from wp_toolkit import wp_version, wpmgr, wpfastcgi_cache


def get_mem():
    import psutil
    mem = psutil.virtual_memory()
    memInfo = {'memTotal': int(mem.total / 1024 / 1024), 'memFree': int(mem.free / 1024 / 1024),
               'memBuffers': int(mem.buffers / 1024 / 1024), 'memCached': int(mem.cached / 1024 / 1024)}
    return memInfo['memTotal']


class one_key_wp:
    wp_package_url = 'https://wordpress.org/latest.zip'
    # wp_package_url = 'http://download.bt.cn/install/package/wordpress-5.9.1.zip'
    base_path = "/www/server/panel/"
    wp_session_path = '{}/data/wp_session'.format(base_path)
    package_zip = '{}/package/wp.zip'.format(base_path)
    md5_file = '{}/package/md5'.format(base_path)
    log_path = '/tmp/schedule.log'
    __php_tmp_file = '/tmp/wp_tmp'.format(base_path)
    panel_db = "/www/server/panel/data/default.db"
    timeout_count = 0
    old_time = 0
    __wp_session = None
    __session_resp = None
    __ajax_nonce = None
    __domain = None
    __plugin_page_content = None
    wp_user = None
    wp_passwd = None

    def __init__(self):
        self.create_wp_table()
        if not self.__wp_session:
            self.__wp_session = requests.Session()

        # import PluginLoader
        # self.__IS_PRO_MEMBER = PluginLoader.get_auth_state() > 0

    def get_headers(self):
        return {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36',
            'Host': self.__domain
        }

    def get_plugin_page(self):
        _get_ajax_nonce_url = "http://{}/wp-admin/plugins.php".format(self.__domain)
        self.__plugin_page_content = self.__wp_session.get(_get_ajax_nonce_url, headers=self.get_headers(),
                                                           verify=False).text

    def get_wp_nonce(self, content, rep):
        rex = re.search(rep, content)
        if rex:
            return rex.group(1)

    def get_nonce(self):
        resp = self.__session_resp
        if resp != '1':
            resp = resp.text
        else:
            self.get_plugin_page()
            resp = self.__plugin_page_content
        _ajax_nonce_rep = '"ajax_nonce\\\"\\:\\\"(\\w+)'
        rex = re.search(_ajax_nonce_rep, resp)
        if rex:
            self.__ajax_nonce = rex.group(1)

    def __load_cookies(self, s_id):
        self.__domain = self.get_wp_auth(s_id)
        f = '{}/{}'.format(self.wp_session_path, self.__domain)
        c = public.readFile(f)
        if c:
            mtime = os.path.getmtime(f)
            expried = 86400
            if time.time() - mtime > expried:
                print('cookies expired, re-login wordpress')
                self.__login_wp()
            else:
                print('use local session')
                self.__wp_session.cookies.update(json.loads(c))
                self.__session_resp = '1'
        else:
            print('No local cookies, login wordpress')
            self.__login_wp()
        # self.__login_wp()
        # print('login success!')
        try:
            self.get_nonce()
        except:
            pass
        if not self.__ajax_nonce:
            self.__login_wp()
            self.get_nonce()
        self.get_plugin_page()

    def __save_cookies(self):
        if not os.path.exists(self.wp_session_path):
            os.mkdir(self.wp_session_path)
        f = '{}/{}'.format(self.wp_session_path, self.__domain)
        return public.writeFile(f, json.dumps(self.__wp_session.cookies.get_dict()))

    def __login_wp(self):

        wp_login = 'http://{}/wp-login.php'.format(self.__domain)
        headers1 = {'Cookie': 'wordpress_test_cookie=WP Cookie check',
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36',
                    # 'referer':"https://{}/wp-login.php?loggedout=true&wp_lang=en_US".format(self.__domain),
                    'Host': self.__domain
                    }
        datas = {
            'log': self.wp_user,
            'pwd': self.wp_passwd,
            'wp-submit': 'Log In',
            # 'redirect_to': 'https://{}/wp-admin'.format(self.__domain),
            'testcookie': '1'
        }
        # hosts = public.readFile('/etc/hosts')
        # if not hosts:
        #     return False
        # if not re.search(r'127.0.0.1\s+{}'.format(self.__domain),hosts):
        #     public.writeFile('/etc/hosts','\n127.0.0.1 {}'.format(self.__domain),'a+')

        self.__session_resp = retry(
            lambda: self.__wp_session.post(wp_login, headers=headers1, data=datas, verify=False))

        # cookies持久化
        self.__save_cookies()

    # 写输出日志
    def write_logs(self, log_msg, clean=None):
        if clean:
            fp = open(self.log_path, 'w')
            fp.write(log_msg)
            fp.close()
        fp = open(self.log_path, 'a+')
        fp.write(log_msg + '\n')
        fp.close()

    # 下载文件
    def download_file(self):
        try:
            path = os.path.dirname(self.package_zip)
            if not os.path.exists(path): os.makedirs(path)
            import urllib, socket, ssl
            ssl._create_default_https_context = ssl._create_unverified_context
            socket.setdefaulttimeout(10)
            self.pre = 0
            self.old_time = time.time()
            print("Download the installation package: {} --> {}".format(self.wp_package_url, self.package_zip))
            self.write_logs(
                "|-Download the installation package: {} --> {}".format(self.wp_package_url, self.package_zip))
            if sys.version_info[0] == 2:
                urllib.urlretrieve(self.wp_package_url, filename=self.package_zip, reporthook=self.download_hook)
            else:
                urllib.request.urlretrieve(self.wp_package_url, filename=self.package_zip,
                                           reporthook=self.download_hook)
            md5str = public.FileMd5(self.package_zip)
            public.writeFile(self.md5_file, str(md5str))
        except:
            print("Download error: {}".format(public.get_error_info()))
            if self.timeout_count > 5: return;
            self.timeout_count += 1
            time.sleep(5)
            self.download_file()

    # 下载文件进度回调
    def download_hook(self, count, blockSize, totalSize):
        used = count * blockSize
        pre1 = int((100.0 * used / totalSize))
        if self.pre != pre1:
            dspeed = used / (time.time() - self.old_time)
            self.pre = pre1

    def check_package(self):
        # 检查本地包
        download = False
        md5str = None
        if os.path.exists(self.package_zip):
            md5str = public.FileMd5(self.package_zip)
        if os.path.exists(self.md5_file) and md5str:
            if md5str != public.readFile(self.md5_file):
                download = True
        else:
            download = True
        return download

    def download_latest_package(self):
        print("Start downloading the installation package...")
        self.write_logs("|-Start downloading the installation package...")

        if os.path.exists(self.package_zip):
            # 获取文件的最后修改时间
            modified_time = os.path.getmtime(self.package_zip)
            # 计算当前时间与最后修改时间的差值
            time_diff = time.time() - modified_time
            time2 = 30 * 24 * 60 * 60

            if time_diff > time2:  # 如果超过30天,则删除文件
                os.remove(self.package_zip)
                os.remove(self.md5_file)
                self.write_logs("|-Del package...")

        if not self.check_package():
            print("|-MD5 consistent, no need to download...")
            return public.return_msg_gettext(True, public.lang("MD5 consistent!"))
        self.download_file()
        if not os.path.exists(self.package_zip):
            self.write_logs("|-Download failed...")
            print("Download failed...")
            return public.return_msg_gettext(False, public.lang("File download failed!"))
        return public.return_msg_gettext(True, public.lang("Download successfully"))

    def unzip_package(self, site_path):
        print("Start unzipping the installation package...")
        self.write_logs("|-Start unzipping the installation package...")
        public.ExecShell('unzip -o {} -d {}/'.format(self.package_zip, site_path))
        public.ExecShell('mv {}/wordpress/* {}'.format(site_path, site_path))
        os.removedirs("{}/wordpress/".format(site_path))
        print("Start setting up site permissions...")
        self.write_logs("|-Start setting up site permissions...")
        self.set_permission(site_path)

    def set_permission(self, site_path):
        os.system('chmod -R 755 ' + site_path)
        os.system('chown -R www.www ' + site_path)

    def set_urlrewrite(self, site_name, site_path):
        webserver = public.get_webserver()

        if webserver == 'openlitespeed':
            webserver = 'apache'

        swfile = '/www/server/panel/rewrite/{}/wordpress.conf'.format(webserver)
        if os.path.exists(swfile):
            rewriteConf = public.readFile(swfile)

            if webserver == 'nginx':
                dwfile = '{}/vhost/rewrite/{}.conf'.format(self.base_path, site_name)

            else:
                dwfile = '{}/.htaccess'.format(site_path)

            public.writeFile(dwfile, rewriteConf)

    def write_db(self, s_id, d_id, prefix, user_name, admin_password):
        print("Inserting data...")
        pdata = {"s_id": s_id, "d_id": d_id, "prefix": prefix, "user": user_name, "pass": admin_password}
        public.M('wordpress_onekey').where('s_id=?', (s_id,)).field('s_id').find()
        if public.M('wordpress_onekey').where('s_id=?', (s_id,)).field('s_id').find():
            print("Data already exists, update data...")
            public.M('wordpress_onekey').where('s_id=?', (s_id,)).update(pdata)
            return
        print("Insert data...")
        print("Insert data:{}".format(pdata))
        print("Result:{}".format(public.M('wordpress_onekey').insert(pdata)))

    def create_wp_table(self):
        if not public.M('sqlite_master').where('type=? AND name=?', ('table', 'wordpress_onekey')).count():
            public.M('').execute('''CREATE TABLE "wordpress_onekey" (
                 "id" INTEGER PRIMARY KEY AUTOINCREMENT,
                 "s_id" INTEGER DEFAULT '',
                 "d_id" INTEGER DEFAULT '',
                 "prefix" TEXT DEFAULT '',
                 "user" TEXT DEFAULT '',
                 "pass" TEXT DEFAULT '');''')

    def init_wp(self, values):
        hosts = public.readFile('/etc/hosts')
        if not hosts:
            return False
        if not re.search(r'127.0.0.1\s+{}'.format(values['site_name']), hosts):
            public.writeFile('/etc/hosts', '\n127.0.0.1 {}'.format(values['site_name']), 'a+')

        self.write_logs("|-Start initializing Wordpress...")

        from wp_toolkit import wpmgr

        # 初始化WP管理类
        wpmgr_obj = wpmgr(values['s_id'])

        # 初始化WP网站配置
        self.write_logs("|-Start setup configurations...")
        ok, msg = wpmgr_obj.setup_config(values['dbname'], values['db_user'], values['db_pwd'], 'localhost',
                                         values['prefix'])
        self.write_logs('|-Setup config >>> {} {}'.format('OK' if ok else 'FAIL', msg))

        if not ok:
            raise Exception(msg)

        # 初始化WP网站信息
        self.write_logs("|-Start installations...")
        ok, msg = wpmgr_obj.wp_install(values['weblog_title'], values['user_name'], values['admin_email'],
                                       values['admin_password'], values['language'])
        self.write_logs('|-Installation {} {}'.format('OK' if ok else 'FAIL', msg))

        if not ok:
            raise Exception(msg)

        # if self.__IS_PRO_MEMBER:
        #     from wp_toolkit import wpmgr
        #
        #     # 初始化WP管理类
        #     wpmgr_obj = wpmgr(values['s_id'])
        #
        #     # 初始化WP网站配置
        #     self.write_logs("|-Start setup configurations...")
        #     ok, msg = wpmgr_obj.setup_config(values['dbname'], values['db_user'], values['db_pwd'], 'localhost', values['prefix'])
        #     self.write_logs('|-Setup config >>> {} {}'.format('OK' if ok else 'FAIL', msg))
        #
        #     if not ok:
        #         raise Exception(msg)
        #
        #     # 初始化WP网站信息
        #     self.write_logs("|-Start installations...")
        #     ok, msg = wpmgr_obj.wp_install(values['weblog_title'], values['user_name'], values['admin_email'], values['admin_password'], values['language'])
        #     self.write_logs('|-Installation {} {}'.format('OK' if ok else 'FAIL', msg))
        #
        #     if not ok:
        #         raise Exception(msg)
        # else:
        #     self.request_setup_0(values)
        #     time.sleep(1)
        #     if not self.request_setup_2(values):
        #         return public.return_msg_gettext(False,
        #                                          "The database connection is abnormal. Please check whether the root user authority or database configuration parameters are correct.")
        #     time.sleep(1)
        #     self.request_setup_3(values)
        #     time.sleep(1)
        #     self.request_setup_4(values)
        #     time.sleep(1)

        # 配置伪静态规则
        self.set_urlrewrite(values['site_name'], values['site_path'])

    def request_setup_0(self, values):
        self.write_logs("|-Start initializing Wordpress language...")
        url = "http://{}/wp-admin/setup-config.php?step=0".format(values['site_name'])
        param = {
            "url": url,
            "data": {"language": values['language']},
            "headers": {"Host": values['domain']}
        }
        # result = self.request_wp_api(param)
        response = retry(lambda: self.request_wp_api(param))
        # public.print_log("开始检查--request_setup_0_result:{}".format(response))

    def request_setup_2(self, values):
        self.write_logs("|-Start initializing Wordpress config...")
        url = "http://{}/wp-admin/setup-config.php?step=2".format(values['site_name'])
        param = {
            "url": url,
            "headers": {"Host": values['domain']},
            "data": {
                "dbname": values['dbname'],
                "uname": values['db_user'],
                "pwd": values['db_pwd'],
                "dbhost": "localhost",
                "prefix": values['prefix'],
                "language": values['language'],
                "submit": "Submit",
            }
        }
        # result = self.request_wp_api(param)
        response = retry(lambda: self.request_wp_api(param))
        if "install.php?language=ja":
            return True

    def request_setup_3(self, values):
        self.write_logs("|-Start initializing Wordpress install language...")
        url = "http://{}/wp-admin/install.php?language={}".format(values['site_name'], values['language'])
        param = {
            "url": url,
            "headers": {"Host": values['domain']},
            "data": {
                "language": values['language']
            }
        }
        # self.request_wp_api(param)
        response = retry(lambda: self.request_wp_api(param))

    def request_setup_4(self, values):
        self.write_logs("|-Start installing the wordpress program...")
        url = "http://{}/wp-admin/install.php?step=2".format(values['site_name'])
        param = {
            "url": url,
            "headers": {"Host": values['domain']},
            "data": {
                "weblog_title": values['weblog_title'],
                "user_name": values['user_name'],
                "admin_password": values['admin_password'],
                "admin_password2": values['admin_password'],
                # "pw_weak": values['pw_weak'],  # on/off
                "admin_email": values['admin_email'],
                # "Submit": "Install WordPress",
                "language": values['language']
            }
        }
        # self.request_wp_api(param)
        response = retry(lambda: self.request_wp_api(param))

    def request_wp_api(self, param):
        """需要指定域名得host为本机IP"""
        resp = requests.post(param['url'], data=param['data'], headers=param['headers'])
        # public.print_log("开始检查--request_wp_api:{}".format(resp))
        # public.print_log("开始检查--request_wp_api_resp.text:{}".format(resp.text))
        return resp.text

    def get_update_wp_nonce(self):
        url = "http://{}/wp-admin/update-core.php".format(self.__domain)
        # res = self.action_plugin_get(url)
        _wp_nonce_rep = '"_wpnonce\\\"\\svalue=\\\"(\\w+)'
        # rex = re.search(_wp_nonce_rep,res)
        # if rex:
        #     self.__wp_nonce = rex.group(1)

        # return self.get_wp_nonce(self.action_plugin_get(url),_wp_nonce_rep)
        response = retry(lambda: self.get_wp_nonce(self.action_plugin_get(url), _wp_nonce_rep))
        return response

    def action_plugin_post(self, param):
        headers = self.get_headers()
        if 'upgrade' in param['data']:
            param['data']['_wpnonce'] = self.get_update_wp_nonce()
            headers['referer'] = 'http://{}/wp-admin/update-core.php'.format(param['domain'])

        response = retry(
            lambda: self.__wp_session.post(param['url'], data=param['data'], headers=headers, verify=False).text)
        return response
        # return self.__wp_session.post(param['url'],data=param['data'], headers=headers, verify=False).text

    def action_plugin_get(self, url, headers=None):
        if not headers:
            headers = self.get_headers()
        # headers['Referer'] = "http://{}/wp-admin".format(self.__domain)

        response = retry(lambda: self.__wp_session.get(url, headers=headers, verify=False).text)
        return response
        # return self.__wp_session.get(url, headers=headers, verify=False).text

    def install_plugin(self, values):
        self.write_logs("|-Start installing the [nginx-helper] plugin...")
        self.__load_cookies(values['s_id'])
        param = {
            "url": "http://{}/wp-admin/admin-ajax.php".format(self.__domain),
            "domain": values['domain'],
            "data": {
                "slug": values['slug'],  # 插件名称 nginx-helper
                "action": values['wp_action'],  # action已经被面板占用 install-plugin
                "_ajax_nonce": self.__ajax_nonce,
                "_fs_nonce": "",
                "username": "",
                "password": "",
                "connection_type": "",
                "public_key": "",
                "private_key": ""
            }
        }
        res = self.action_plugin_post(param)
        return res

    def get_smart_http_expire_form_nonce(self, url):
        public.writeFile('/tmp/2', str(self.action_plugin_get(url)))
        smart_http_expire_form_nonce = '"smart_http_expire_form_nonce\\\"\\svalue=\\\"(\\w+)'
        return self.get_wp_nonce(self.action_plugin_get(url), smart_http_expire_form_nonce)

    def set_nginx_helper(self, values):
        self.write_logs("|-Setting up [nginx-helper] plugin cache rules...")
        self.__load_cookies(values['s_id'])
        url = "http://{}/wp-admin/options-general.php?page=nginx".format(self.__domain)
        param = {
            "url": url,
            "domain": values['domain'],
            "data": {
                "enable_purge": "1",
                "is_submit": "1",
                "cache_method": "enable_fastcgi",
                "purge_method": "unlink_files",
                "redis_hostname": "127.0.0.1",
                "redis_port": "6379",
                "redis_prefix": "nginx-cache",
                "purge_homepage_on_edit": "1",
                "purge_homepage_on_del": "1",
                "purge_page_on_mod": "1",
                "purge_page_on_new_comment": "1",
                "purge_page_on_deleted_comment": "1",
                "purge_archive_on_edit": "1",
                "purge_archive_on_del": "1",
                "purge_archive_on_new_comment": "1",
                "purge_archive_on_deleted_comment": "1",
                "purge_url": "",
                "log_level": "INFO",
                "log_filesize": "5",
                "smart_http_expire_form_nonce": self.get_smart_http_expire_form_nonce(url),
                "smart_http_expire_save": "Save All Changes"
            }
        }
        return self.action_plugin_post(param)

    def act_nginx_helper_active(self, values):
        self.write_logs("|-activating [nginx-helper] plugin...")
        self.__load_cookies(values['s_id'])
        values['active_id'] = "activate-nginx-helper"
        self.get_plugin_page()
        res = self.get_plugin_url(values['active_id'], self.__plugin_page_content)
        url = 'http://{}/wp-admin/{data}'.format(self.__domain,
                                                 data=str(str(res[0]).split('"')[5]))
        url = url.replace('amp;', '')
        return self.action_plugin_get(url)

    def get_plugin_url(self, active_id, content):
        # soup = BeautifulSoup(content)
        soup = BeautifulSoup(content, features="html.parser")
        res = soup.find_all(id=active_id)
        return res

    def generate_wp_passwd(self, site_path, new_pass):
        hash_password_code = public.readFile("{}/wp-includes/class-phpass.php".format(site_path))
        extra_code = """
         $passwordValue = "%s";
         $wp_hasher = new PasswordHash(8, TRUE);
         $sigPassword = $wp_hasher->HashPassword($passwordValue);
         $data = $wp_hasher->CheckPassword($passwordValue,$sigPassword);
         if($data){
             echo 'True|'.$sigPassword;;
         }else{
         	echo 'False|'.$sigPassword;;
         }
         """ % new_pass
        php_code = hash_password_code + extra_code
        public.writeFile(self.__php_tmp_file, php_code)
        a, e = public.ExecShell("php -f {}".format(self.__php_tmp_file))
        os.remove(self.__php_tmp_file)
        res = a.split('|')
        if res[0] == "True":
            return public.return_message(0, 0, res[1])
        return public.return_message(-1, 0, public.lang("Generated password detection failed!"))

    def get_cache_status(self, s_id):
        """
        s_id 网站id
        """
        import data
        site_info = public.M('sites').where('id=?', (s_id,)).field('name').find()

        if not isinstance(site_info, dict):
            return False

        site_name = site_info['name']

        # 获取WP站点绑定的PHP可执行文件
        from public import websitemgr

        php_v = websitemgr.get_site_php_version(site_name).replace('.', '')

        from wp_toolkit import wpfastcgi_cache
        return wpfastcgi_cache().get_fast_cgi_status(site_name, php_v)

        # return fast_cgi().get_fast_cgi_status(site_name, php_v)

    ##############################对外接口—BEGIN##############################
    def get_wp_username(self, get):
        """
        s_id 网站ID
        """
        # 校验参数
        try:
            get.validate([
                Param('s_id').Integer(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        values = self.check_param(get)
        if values['status'] == -1:
            return values
        values = values['message']
        db_info = public.M('wordpress_onekey').where('s_id=?', (values['s_id'],)).find()
        db_name = public.M('databases').where('id=?', (db_info['d_id'],)).field('name').find()
        if "name" not in db_name:
            return public.return_message(-1, 0, public.lang("The database of this wordpress was not found, this may be caused by the fact that you have manually deleted the database"))
        db_name = db_name['name']
        mysql_obj = panelMysql.panelMysql()
        res = mysql_obj.query('select * from {}.{}users'.format(
            db_name, db_info['prefix']))
        if hasattr(res, '__iter__'):
            return public.return_message(0, 0, [i[1] for i in res])
        else:
            return public.return_message(-1, 0,
                                         "Site database [{}] failed to query users, try setting the database for this site. Error: {}".format(
                                             db_name, res))

    def reset_wp_password(self, get):
        """
        重置wordpress用户密码
        s_id 网站ID
        user 要重置的用户名
        new_pass
        """
        # 校验参数
        try:
            get.validate([
                Param('user').String(),
                Param('new_pass').String(),
                Param('s_id').Integer(),

            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        values = self.check_param(get)
        if values['status'] == -1:
            return values
        values = values['message']
        s_id = values['s_id']
        db_info = public.M('wordpress_onekey').where('s_id=?', (s_id,)).find()
        db_name = public.M('databases').where('id=?', (db_info['d_id'],)).field('name').find()['name']
        path = public.M('sites').where('id=?', (s_id,)).field('path').find()['path']
        new_pass = values['new_pass']
        passwd = self.generate_wp_passwd(path, new_pass)
        if passwd['status'] == -1:
            return passwd

        passwd = passwd['message']

        if isinstance(passwd, dict):
            passwd = passwd.get('result', None)

        if passwd is None:
            return public.fail_v2('Reset password failed')

        mysql_obj = panelMysql.panelMysql()
        sql = 'update {}.{}users set user_pass = "{}" where user_login = "{}"'.format(
            db_name, db_info['prefix'], passwd, values['user'])
        mysql_obj.execute(sql)
        return public.return_message(0, 0, public.lang("Password reset successful"))

    # 获取WP可用版本列表
    def get_wp_available_versions(self, args: public.dict_obj):
        # 校验参数
        try:
            args.validate([
                Param('php_version_short').Integer(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        from wp_toolkit import wp_version

        versions = list(map(lambda x: {
            'locale': x['locale'],
            'version': x['version'],
            'php_version': x['php_version'],
            'mysql_version': x['mysql_version'],
        }, wp_version().latest_versions()))

        if 'php_version_short' in args:
            versions = list(
                filter(lambda x: int(args.php_version_short) >= int(''.join(str(x['php_version']).split('.')[:2])),
                       versions))

        return public.success_v2(versions)

    # 获取WP网站本地版本号
    def get_wp_version(self, s_id):
        """获取wordpress本地版本
        s_id 网站id
        """
        path = public.M('sites').where('id=?', (s_id,)).field('path').find()['path']
        conf_file = "{}/wp-includes/version.php".format(path)
        conf = public.readFile(conf_file)
        try:
            version = re.search('\\$wp_version\\s*=\\s*[\'\"]{1}([\\d\\.]*)[\'\"]{1}', conf).groups(1)[0]
        except:
            version = "00"
        return version

    # 获取WP已发布的最新版本号
    def get_wp_version_online(self):
        """获取wordpress线上版本"""
        url = "http://wordpress.org/download/"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36'
        }
        result = requests.get(url, headers=headers)
        result = re.search(r'Download\s+WordPress\s+([\d\.]+)', result.text)
        if result:
            return result.group(1)
        return "00"

    # 检查WP是否有新版本可以更新
    def is_update(self, get):
        """
        s_id 网站ID
        """
        values = self.check_param(get)
        if values['status'] == -1:
            return values
        values = values['message']
        online_v = self.get_wp_version_online()
        local_v = self.get_wp_version(values['s_id'])
        update = False
        if str(online_v) != str(local_v):
            update = True
        data = {
            "online_v": online_v,
            "local_v": local_v,
            "update": update
        }
        return_message = public.return_msg_gettext(True, data)
        del return_message['status']
        return public.return_message(0, 0, return_message['msg'])

    def purge_all_cache(self, get):
        """
        清理所有缓存
        """
        # 校验参数
        try:
            get.validate([
                Param('s_id').Integer(),

            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        if public.get_webserver() != "nginx":
            return public.return_message(-1, 0, public.lang("This feature currently only supports Nginx"))

        from wp_toolkit import wpmgr
        wpmgr(get.s_id).purge_cache_with_nginx_helper()

        # if self.__IS_PRO_MEMBER:
        #     from wp_toolkit import wpmgr
        #     wpmgr(get.s_id).purge_cache_with_nginx_helper()
        # else:
        #     cache_dir = "/dev/shm/nginx-cache/wp"
        #     public.ExecShell("rm -rf {}/*".format(cache_dir))

        return public.return_message(0, 0, public.lang("Cleaned up successfully!"))

    def set_fastcgi_cache(self, get):
        """
        设置缓存
        version php版本
        sitename 完整名
        act disable/enable 开启关闭缓存
        """
        # 校验参数
        try:
            get.validate([
                Param('version').String(),
                Param('sitename').String(),
                Param('act').String(),

            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        if public.get_webserver() != "nginx":
            return public.return_message(-1, 0, public.lang("This feature currently only supports Nginx"))
        values = self.check_param(get)
        if values['status'] == -1:
            return values
        values = values['message']
        if '.' in values['version']:
            values['version'] = values['version'].replace('.', '')

        from wp_toolkit import wpfastcgi_cache

        ok, msg = wpfastcgi_cache().set_website_conf(values['version'], values['sitename'], values['act'],
                                                     immediate=True)

        if not ok:
            return public.fail_v2(msg)

        return public.success_v2(msg)

        # if self.__IS_PRO_MEMBER:
        #     from wp_toolkit import wpfastcgi_cache
        #
        #     ok, msg = wpfastcgi_cache().set_website_conf(values['version'], values['sitename'], values['act'], immediate=True)
        #
        #     if not ok:
        #         return public.fail_v2(msg)
        #
        #     return public.success_v2(msg)
        # else:
        #     return fast_cgi().set_website_conf(values['version'], values['sitename'], values['act'])

    # 使用网站ID查询WP站点域名
    def get_wp_auth(self, s_id):
        domain = public.M('domain').where("pid=?", (s_id,)).field('name').find()['name']
        info = public.M('wordpress_onekey').where("s_id=?", (s_id,)).find()
        self.wp_user = info['user']
        self.wp_passwd = info['pass']
        return domain

    # 更新WP版本
    def update_wp(self, args):
        """更新wordpress版本
        s_id 网站ID
        version 需要更新的版本
        """
        from wp_toolkit import wpmgr

        # 校验参数
        try:
            args.validate([
                Param('s_id').Require().Integer('>', 0),
                Param('version').Require().Regexp(r'^\d+(?:\.\d+)+$'),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        ok, msg = wpmgr(args.s_id).update_version(args.version)

        if not ok:
            return public.fail_v2(msg)

        return public.success_v2('Upgrade to {}'.format(msg))

        # if self.__IS_PRO_MEMBER:
        #     from wp_toolkit import wpmgr
        #
        #     # 校验参数
        #     try:
        #         args.validate([
        #             Param('s_id').Require().Integer('>', 0),
        #             Param('version').Require().Regexp(r'^\d+(?:\.\d+)+$'),
        #         ], [
        #             public.validate.trim_filter(),
        #         ])
        #     except Exception as ex:
        #         public.print_log("error info: {}".format(ex))
        #         return public.return_message(-1, 0, str(ex))
        #
        #     ok, msg = wpmgr(args.s_id).update_version(args.version)
        #
        #     if not ok:
        #         return public.fail_v2(msg)
        #
        #     return public.success_v2('Upgrade to {}'.format(msg))
        # else:
        #     self.__load_cookies(args.s_id)
        #     param = {
        #         "url": "http://{}/wp-admin/update-core.php?action=do-core-upgrade".format(self.__domain),
        #         "domain": self.__domain,
        #         "data": {
        #             # "_wpnonce":get._wpnonce,
        #             "_wp_http_referer": '/wp-admin/update-core.php',
        #             "version": args.version,
        #             "locale": "en_US",
        #             "upgrade": "Update to version {}".format(args.version)
        #         }
        #     }
        #     self.action_plugin_post(param)
        #     return_message=public.return_msg_gettext(True, 'Updated successfully!')
        #     del return_message['status']
        #     return public.return_message(0,0, return_message['msg'])

    # 获取可用的语言列表
    def get_language(self, get=None):
        language = {
            "en": "English (United States)",
            "af": "Afrikaans",
            "am": "አማርኛ",
            "ar": "العربية",
            "ary": "العربية المغربية",
            "as": "অসমীয়া",
            "az": "Azərbaycan dili",
            "azb": "گؤنئی آذربایجان",
            "bel": "Беларуская мова",
            "bg_BG": "Български",
            "bn_BD": "বাংলা",
            "bo": "བོད་ཡིག",
            "bs_BA": "Bosanski",
            "ca": "Català",
            "ceb": "Cebuano",
            "cs_CZ": "Čeština",
            "cy": "Cymraeg",
            "da_DK": "Dansk",
            "de_CH_informal": "Deutsch (Schweiz, Du)",
            "de_DE": "Deutsch",
            "de_DE_formal": "Deutsch (Sie)",
            "de_CH": "Deutsch (Schweiz)",
            "de_AT": "Deutsch (Österreich)",
            "dsb": "Dolnoserbšćina",
            "dzo": "རྫོང་ཁ",
            "el": "Ελληνικά",
            "en_US": "English (United States)",
            "en_NZ": "English (New Zealand)",
            "en_AU": "English (Australia)",
            "en_CA": "English (Canada)",
            "en_GB": "English (UK)",
            "en_ZA": "English (South Africa)",
            "eo": "Esperanto",
            "es_ES": "Español",
            "es_EC": "Español de Ecuador",
            "es_CO": "Español de Colombia",
            "es_AR": "Español de Argentina",
            "es_DO": "Español de República Dominicana",
            "es_PE": "Español de Perú",
            "es_CR": "Español de Costa Rica",
            "es_UY": "Español de Uruguay",
            "es_CL": "Español de Chile",
            "es_PR": "Español de Puerto Rico",
            "es_VE": "Español de Venezuela",
            "es_GT": "Español de Guatemala",
            "es_MX": "Español de México",
            "et": "Eesti",
            "eu": "Euskara",
            "fa_IR": "فارسی",
            "fa_AF": "(فارسی (افغانستان",
            "fi": "Suomi",
            "fr_FR": "Français",
            "fr_BE": "Français de Belgique",
            "fr_CA": "Français du Canada",
            "fur": "Friulian",
            "gd": "Gàidhlig",
            "gl_ES": "Galego",
            "gu": "ગુજરાતી",
            "haz": "هزاره گی",
            "he_IL": "עִבְרִית",
            "hi_IN": "हिन्दी",
            "hr": "Hrvatski",
            "hsb": "Hornjoserbšćina",
            "hu_HU": "Magyar",
            "hy": "Հայերեն",
            "id_ID": "Bahasa Indonesia",
            "is_IS": "Íslenska",
            "it_IT": "Italiano",
            "ja": "日本語",
            "jv_ID": "Basa Jawa",
            "ka_GE": "ქართული",
            "kab": "Taqbaylit",
            "kk": "Қазақ тілі",
            "km": "ភាសាខ្មែរ",
            "kn": "ಕನ್ನಡ",
            "ko_KR": "한국어",
            "ckb": "كوردی‎",
            "lo": "ພາສາລາວ",
            "lt_LT": "Lietuvių kalba",
            "lv": "Latviešu valoda",
            "mk_MK": "Македонски јазик",
            "ml_IN": "മലയാളം",
            "mn": "Монгол",
            "mr": "मराठी",
            "ms_MY": "Bahasa Melayu",
            "my_MM": "ဗမာစာ",
            "nb_NO": "Norsk bokmål",
            "ne_NP": "नेपाली",
            "nl_NL_formal": "Nederlands (Formeel)",
            "nl_NL": "Nederlands",
            "nl_BE": "Nederlands (België)",
            "nn_NO": "Norsk nynorsk",
            "oci": "Occitan",
            "pa_IN": "ਪੰਜਾਬੀ",
            "pl_PL": "Polski",
            "ps": "پښتو",
            "pt_BR": "Português do Brasil",
            "pt_AO": "Português de Angola",
            "pt_PT_ao90": "Português (AO90)",
            "pt_PT": "Português",
            "rhg": "Ruáinga",
            "ro_RO": "Română",
            "ru_RU": "Русский",
            "sah": "Сахалыы",
            "snd": "سنڌي",
            "si_LK": "සිංහල",
            "sk_SK": "Slovenčina",
            "skr": "سرائیکی",
            "sl_SI": "Slovenščina",
            "sq": "Shqip",
            "sr_RS": "Српски језик",
            "sv_SE": "Svenska",
            "sw": "Kiswahili",
            "szl": "Ślōnskŏ gŏdka",
            "ta_IN": "தமிழ்",
            "ta_LK": "தமிழ்",
            "te": "తెలుగు",
            "th": "ไทย",
            "tl": "Tagalog",
            "tr_TR": "Türkçe",
            "tt_RU": "Татар теле",
            "tah": "Reo Tahiti",
            "ug_CN": "ئۇيغۇرچە",
            "uk": "Українська",
            "ur": "اردو",
            "uz_UZ": "O‘zbekcha",
            "vi": "Tiếng Việt",
            "zh_TW": "繁體中文",
            "zh_HK": "香港中文版	",
            "zh_CN": "简体中文",
        }
        return_message = public.return_msg_gettext(True, language)
        del return_message['status']
        return public.return_message(0, 0, return_message['msg'])

    # 请求参数校验
    def check_param(self, args):
        """
        @name 检测传入参数
        @author zhwen<2022-03-10>
        """
        # 检查email格式
        rep_email = r"[\w!#$%&'*+/=?^_`{|}~-]+(?:\.[\w!#$%&'*+/=?^_`{|}~-]+)*@(?:[\w](?:[\w-]*[\w])?\.)+[\w](?:[\w-]*[\w])?"
        # 检查域名格式
        rep_domain = r"^(?=^.{3,255}$)[a-zA-Z0-9\_\-][a-zA-Z0-9\_\-]{0,62}(\.[a-zA-Z0-9\_\-][a-zA-Z0-9\_\-]{0,62})+$"
        values = {}
        if hasattr(args, 'd_id'):
            if isinstance(args.d_id, int) or re.search(r'\d+', args.d_id):
                values["d_id"] = args.d_id
            else:
                return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "d_id", "99"))
        if hasattr(args, 's_id'):
            if isinstance(args.s_id, int) or re.search(r'\d+', args.s_id):
                values["s_id"] = args.s_id
            else:
                return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "s_id", "99"))
        if hasattr(args, 'language'):
            if args.language in self.get_language()['message']:
                values['language'] = args.language
            else:
                return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "language", "en"))
        if hasattr(args, 'domain'):
            if re.search(rep_domain, args.domain):
                values['domain'] = public.xssencode2(args.domain)
            else:
                return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "domain", "aapanel.com"))
        if hasattr(args, 'weblog_title'):
            values['weblog_title'] = public.xssencode2(args.weblog_title)
        if hasattr(args, 'user_name'):
            values['user_name'] = public.xssencode2(args.user_name)
        if hasattr(args, 'admin_password'):
            values['admin_password'] = public.xssencode2(args.admin_password)
        if hasattr(args, 'pw_weak'):
            if args.pw_weak in ['on', 'off']:
                values['pw_weak'] = args.pw_weak
            else:
                return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "pw_weak", "on/off"))
        if hasattr(args, 'admin_email'):
            if re.search(rep_email, args.admin_email):
                values['admin_email'] = public.xssencode2(args.admin_email)
            else:
                return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "admin_email", "[email protected]"))
        if hasattr(args, 'prefix'):
            values['prefix'] = public.xssencode2(args.prefix)
        if hasattr(args, 'php_version'):
            values['php_version'] = public.xssencode2(args.php_version)
        if hasattr(args, 'enable_cache'):
            values['enable_cache'] = public.xssencode2(args.enable_cache)
        if hasattr(args, 'wp_action'):
            values['slug'] = public.xssencode2(args.slug)
        if hasattr(args, 'slug'):
            values['wp_action'] = public.xssencode2(args.wp_action)
        if hasattr(args, 'active_id'):
            values['active_id'] = public.xssencode2(args.active_id)
        if hasattr(args, 'new_pass'):
            values['new_pass'] = public.xssencode2(args.new_pass)
        if hasattr(args, 'user'):
            values['user'] = public.xssencode2(args.user)
        if hasattr(args, 'sitename'):
            values['sitename'] = public.xssencode2(args.sitename)
        if hasattr(args, 'act'):
            values['act'] = public.xssencode2(args.act)
        if hasattr(args, 'version'):
            values['version'] = public.xssencode2(args.version)
        return public.return_message(0, 0, values)

    # 删除网站
    def del_site(self, get):
        import panelSite
        p = panelSite.panelSite()
        site_info = public.M('sites').where('id=?', (get.s_id,)).find()
        get.id = get.s_id
        get.webname = site_info['name']
        get.ftp = "1"
        get.database = "1"
        get.path = "1"
        p.DeleteSite(get)

    # 安装WP
    def deploy_wp(self, get):
        """
        d_id                    数据据库ID
        s_id                    网站ID
        language                部署wordpress后的语言
        weblog_title            wordpress博客名
        user_name               wordpress后台用户名
        admin_password          管理员密码
        admin_password2
        pw_weak                 允许弱密码
        admin_email             管理员邮箱
        prefix                  wordpress数据表前缀
        php_version             php版本
        enable_cache            开启缓存
        package_version         Wordpress版本
        @name 部署wordpress
        @author zhwen<2022-03-10>
        """
        # 校验参数
        try:
            get.validate([
                Param('domain').String().Host(),
                Param('weblog_title').String(),
                Param('language').String(),
                Param('php_version').String(),
                Param('user_name').String(),
                Param('admin_email').String().Email(),
                Param('prefix').String(),
                Param('pw_weak').String(),
                Param('admin_password').String(),
                Param('enable_cache').Integer(),
                Param('d_id').Integer(),
                Param('s_id').Integer(),
                Param('package_version').String(),

            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        try:
            self.write_logs('', clean=True)
            values = self.check_param(get)
            if values['status'] == -1:
                # 删除自动创建的空白网站
                self.del_site(get)
                return values
            values = values['message']
            s_id = values['s_id']  # 网站ID
            d_id = values['d_id']  # 数据库ID
            prefix = values['prefix']  # 前缀
            site_info = public.M('sites').where('id=?', (s_id,)).find()
            print("Get site info:\n ID: {}\npath: {}\n".format(s_id, site_info['path']))
            self.write_logs("""
 |-Get website information:
     ID: {}
     Path: {}

 """.format(s_id, site_info['path']))
            db_info = public.M('databases').where('id=?', (d_id,)).find()

            print("Get database information: \nID: {}\nDBName: {}\nUser: {}\nPassWD: {}".format(
                d_id, db_info["name"], db_info["username"], db_info["password"]))
            self.write_logs("""
 |-Get database information: 
     ID: {}
     DBName: {}
     User: {}
     PassWD: {}

 """.format(d_id, db_info["name"], db_info["username"], db_info["password"]))
            values['dbname'] = db_info["name"]
            self.wp_user = values['user_name']
            self.wp_passwd = values['admin_password']
            values['db_user'] = db_info["username"]
            values['db_pwd'] = db_info["password"]
            values['site_path'] = site_info['path']
            values['site_name'] = site_info['name']

            # 开始下载安装包
            from wp_toolkit import wp_version

            wp_version_obj = wp_version()

            if 'package_version' not in get or get.package_version is None:
                get.package_version = wp_version_obj.latest_version()['version']

            self.write_logs('|-Package version: {}'.format(get.package_version))

            # 下载特定版本的安装包
            self.package_zip = wp_version_obj.download_package(get.package_version)

            self.unzip_package(site_info['path'])

            # 优化PHP
            res = optimize_php().optimize_php(get)
            if not res['status']:
                return public.return_message(-1, 0, res)

            # 初始化wp
            self.init_wp(values)
            self.write_db(s_id, d_id, prefix, get.user_name, get.admin_password)

            # 优化mysql
            optimize_db().self_db_cache(get)

            # 设置fastcgi缓存
            if int(get.enable_cache) == 1 and public.get_webserver() == 'nginx':
                from wp_toolkit import wpmgr, wpfastcgi_cache

                self.write_logs('|-WP Plugin nginx-helper installing...')
                wpmgr(s_id).init_plugin_nginx_helper()
                self.write_logs('|-WP Plugin nginx-helper installation succeeded')

                # 配置Nginx-fastcgi-cache
                wpfastcgi_cache().set_fastcgi(values['site_path'], values['site_name'], values['php_version'])

                # if self.__IS_PRO_MEMBER:
                #     from wp_toolkit import wpmgr, wpfastcgi_cache
                #
                #     # 配置Nginx-fastcgi-cache
                #     wpfastcgi_cache().set_fastcgi(values['site_path'], values['site_name'], values['php_version'])
                #
                #     self.write_logs('|-WP Plugin nginx-helper installing...')
                #     wpmgr(s_id).init_plugin_nginx_helper()
                #     self.write_logs('|-WP Plugin nginx-helper installation succeeded')
                # else:
                #     # 安装nginxHelper
                #     #  slug nginx-helper
                #     values['slug'] = "nginx-helper"
                #     values['wp_action'] = "install-plugin"
                #     self.install_plugin(values)
                #     self.act_nginx_helper_active(values)
                #
                #     # 安装并启用nginx-helper插件
                #     self.set_nginx_helper(get)

            # 设置登录入口保护
            if int(get.get('enable_whl', 0)) == 1:
                from wp_toolkit import wpmgr

                # 安装并启用wps-hide-login插件
                self.write_logs('|-WP Plugin wps-hide-login installing...')
                wpmgr(s_id).init_plugin_wps_hide_login(get.get('whl_page', 'login'),
                                                       get.get('whl_redirect_admin', '404'))
                self.write_logs('|-WP Plugin wps-hide-login installation succeeded')

            public.ServiceReload()

            self.write_logs("\n\n\n|-Deployment was successful!")

            return public.return_message(0, 0, public.lang("Deployment was successful!"))
        except:
            self.del_site(get)
            from traceback import format_exc
            public.print_log(format_exc())
            return public.return_message(-1, 0, public.lang("Deployment failed!"))

    # 重新关联WP网站数据库
    def reset_wp_db(self, args):
        """
        :param args db_name 数据库名
        :param args site_id 网站ID
        """
        db_name = public.xssencode2(args.db_name)
        try:
            site_id = int(args.site_id)
        except:
            return public.return_message(-1, 0, public.lang("Site ID must be numeric"))
        db_info = public.M("databases").where("name=?", (db_name,)).field('id').find()
        if 'id' not in db_info:
            return public.return_message(-1, 0, public.lang("This database was not found!"))
        pdata = {
            "d_id": db_info['id']
        }
        public.M('wordpress_onekey').where("s_id=?", (site_id,)).update(pdata)
        return public.return_message(0, 0, public.lang("Setup successfully!"))

    # 获取WP Toolkit配置信息
    def get_wp_configurations(self, args: public.dict_obj):
        # 校验参数
        try:
            args.validate([
                Param('s_id').Require().Integer('>', 0),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        from wp_toolkit import wpmgr
        wpmgr_obj = wpmgr(args.s_id)
        wp_local_version = wpmgr_obj.get_local_version()
        wp_latest_version = wpmgr_obj.get_latest_version(available=True).get('version', '')
        can_upgrade = wp_latest_version > wp_local_version

        wp_toolkit_config_data = wpmgr_obj.get_wp_toolkit_config_data()

        return public.success_v2({
            'local_version': wp_local_version,
            'latest_version': wp_latest_version,
            'can_upgrade': can_upgrade,
            'language': wp_toolkit_config_data['locale'],
            'login_url': wp_toolkit_config_data['login_url'],
            'home_url': wp_toolkit_config_data['site_url'],
            'cache_enabled': self.get_cache_status(args.s_id),
            'admin_user': wp_toolkit_config_data['admin_info']['user_login'],
            'admin_email': wp_toolkit_config_data['admin_info']['user_email'],
            'whl_enabled': wp_toolkit_config_data['whl_config'].get('activated', False),
            'whl_page': wp_toolkit_config_data['whl_config'].get('whl_page', 'login'),
            'whl_redirect_admin': wp_toolkit_config_data['whl_config'].get('whl_redirect_admin', '404'),
        })

    # 保存WP Toolkit配置
    def save_wp_configurations(self, args: public.dict_obj):
        # 校验参数
        try:
            args.validate([
                Param('s_id').Require().Integer('>', 0),
                Param('language').String('in', list(self.get_language(args)['message'].keys())),
                Param('admin_password').String('>=', 8),
                Param('admin_email').Email(),
                Param('whl_enabled').Integer('in', [0, 1]),
                Param('whl_page').SafePath(),
                Param('whl_redirect_admin').SafePath(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        from wp_toolkit import wpmgr

        wpmgr_obj = wpmgr(args.s_id)

        # 更新语言
        if 'language' in args:
            locale = wpmgr_obj.get_local_language()
            if args.language != locale:
                wpmgr_obj.update_language(args.language)

                # 写操作日志
                wpmgr.log_opt('Change language from [{}] to [{}] successfully', (locale, args.language))

        # 更新管理员密码
        if 'admin_password' in args:
            wpmgr_obj.set_admin_password(args.admin_password)

            # 写操作日志
            wpmgr.log_opt('Reset admin password successfully')

        # 更新管理员邮箱
        if 'admin_email' in args:
            wpmgr_obj.set_admin_email(args.admin_email)

            # 写操作日志
            wpmgr.log_opt('Reset admin email to [{}] successfully', (args.admin_email,))

        # 更新WPS-Hide-Login插件配置
        if 'whl_enabled' in args:
            # 停用插件
            if int(args.whl_enabled) == 0:
                wpmgr_obj.deactivate_plugins('wps-hide-login/wps-hide-login.php')

                # 写操作日志
                wpmgr.log_opt('Deactivate plugin [{}] successfully', ('WPS Hide Login',))

            # 启用插件
            else:
                wpmgr_obj.config_plugin_wps_hide_login(args.get('whl_page', 'login'),
                                                       args.get('whl_redirect_admin', '404'))

                # 写操作日志
                wpmgr.log_opt('Activate plugin [{}] successfully', ('WPS Hide Login',))

        return public.success_v2('Update successfully')


##############################对外接口-END##############################

class optimize_php:

    def optimize_php(self, get):
        get.version = get.php_version
        # 安装缓存插件
        # self.install_ext(get)
        # 根据主机性能调整参数
        return self.php_fpm(get)

    def install_ext(self, get):
        exts = ['opcache']
        import files
        mfile = files.files()
        for ext in exts:
            # print("开始安装php扩展 [{}]...".format(ext))

            if ext == 'pathinfo':
                import config
                con = config.config()
                get.version = get.php_version
                get.type = 'on'
                con.setPathInfo(get)
            else:
                get.name = ext
                get.version = get.php_version
                get.type = '1'
                mfile.InstallSoft(get)

    # 优化phpfpm
    def php_fpm(self, get):
        one_key_wp().write_logs("|-Start tuning PHP FPM parameters...")
        mem_total = int(get_mem())
        if mem_total <= 1024:
            get.max_children = '30'
            get.start_servers = '5'
            get.min_spare_servers = '5'
            get.max_spare_servers = '20'
        elif 1024 < mem_total <= 2048:
            get.max_children = '50'
            get.start_servers = '5'
            get.min_spare_servers = '5'
            get.max_spare_servers = '30'
        elif 2048 < mem_total <= 4098:
            get.max_children = '80'
            get.start_servers = '10'
            get.min_spare_servers = '10'
            get.max_spare_servers = '30'
        elif 4098 < mem_total <= 8096:
            get.max_children = '120'
            get.start_servers = '10'
            get.min_spare_servers = '10'
            get.max_spare_servers = '30'
        elif 8096 < mem_total <= 16192:
            get.max_children = '200'
            get.start_servers = '15'
            get.min_spare_servers = '15'
            get.max_spare_servers = '50'
        elif 16192 < mem_total <= 32384:
            get.max_children = '300'
            get.start_servers = '20'
            get.min_spare_servers = '20'
            get.max_spare_servers = '50'
        elif 32384 < mem_total:
            get.max_children = '500'
            get.start_servers = '20'
            get.min_spare_servers = '20'
            get.max_spare_servers = '50'
        # get.version = self.version
        import config
        current_conf = config.config().getFpmConfig(get)
        get.pm = current_conf['pm']
        get.listen = current_conf['unix']
        one_key_wp().write_logs("""
 ===================PHP FPM parameters=======================

     max_children: {}
     start_servers: {}
     min_spare_servers: {}
     max_spare_servers: {}
     Running mode: {}
     Connection: {}

 ===================PHP FPM parameters=======================


 """.format(get.max_children, get.start_servers, get.min_spare_servers, get.max_spare_servers, get.pm, get.listen))
        # self.backup_conf('/www/server/php/{}/etc/php-fpm.conf'.format(self.version))
        result = config.config().setFpmConfig(get)
        if not result['status']:
            one_key_wp().write_logs("|-PHP FPM Optimization failed: {}".format(result))
            return public.return_msg_gettext(False, "PHP FPM Optimization failed: {}", (result,))
        one_key_wp().write_logs("|-PHP FPM optimization succeeded")
        return public.return_msg_gettext(True, public.lang("PHP FPM optimization succeeded"))


class optimize_db:

    def self_db_cache(self, get):
        one_key_wp().write_logs("|-Start optimizing Mysql")
        mem_total = int(get_mem())
        if mem_total <= 2048:
            get.key_buffer_size = '128'
            get.tmp_table_size = '64'
            get.innodb_buffer_pool_size = '256'
            get.innodb_log_buffer_size = '16'
            get.sort_buffer_size = '768'
            get.read_buffer_size = '768'
            get.read_rnd_buffer_size = '512'
            get.join_buffer_size = '1024'
            get.thread_stack = '256'
            get.binlog_cache_size = '64'
            get.thread_cache_size = '64'
            get.table_open_cache = '128'
            get.max_connections = '100'
            get.max_heap_table_size = '64'
        elif 2048 < mem_total <= 4096:
            get.key_buffer_size = '256'
            get.tmp_table_size = '384'
            get.innodb_buffer_pool_size = '384'
            get.innodb_log_buffer_size = '16'
            get.sort_buffer_size = '768'
            get.read_buffer_size = '768'
            get.read_rnd_buffer_size = '512'
            get.join_buffer_size = '2048'
            get.thread_stack = '256'
            get.binlog_cache_size = '64'
            get.thread_cache_size = '96'
            get.table_open_cache = '192'
            get.max_connections = '200'
            get.max_heap_table_size = '384'
        elif 4096 < mem_total <= 8192:
            get.key_buffer_size = '384'
            get.tmp_table_size = '512'
            get.innodb_buffer_pool_size = '512'
            get.innodb_log_buffer_size = '16'
            get.sort_buffer_size = '1024'
            get.read_buffer_size = '1024'
            get.read_rnd_buffer_size = '768'
            get.join_buffer_size = '2048'
            get.thread_stack = '256'
            get.binlog_cache_size = '128'
            get.thread_cache_size = '128'
            get.table_open_cache = '384'
            get.max_connections = '300'
            get.max_heap_table_size = '512'
        elif 8192 < mem_total <= 16384:
            get.key_buffer_size = '512'
            get.tmp_table_size = '1024'
            get.innodb_buffer_pool_size = '1024'
            get.innodb_log_buffer_size = '16'
            get.sort_buffer_size = '2048'
            get.read_buffer_size = '2048'
            get.read_rnd_buffer_size = '1024'
            get.join_buffer_size = '4096'
            get.thread_stack = '384'
            get.binlog_cache_size = '192'
            get.thread_cache_size = '192'
            get.table_open_cache = '1024'
            get.max_connections = '400'
            get.max_heap_table_size = '1024'
        elif 16384 < mem_total <= 32768:
            get.key_buffer_size = '1024'
            get.tmp_table_size = '2048'
            get.innodb_buffer_pool_size = '4096'
            get.innodb_log_buffer_size = '16'
            get.sort_buffer_size = '4096'
            get.read_buffer_size = '4096'
            get.read_rnd_buffer_size = '2048'
            get.join_buffer_size = '8192'
            get.thread_stack = '512'
            get.binlog_cache_size = '256'
            get.thread_cache_size = '256'
            get.table_open_cache = '2048'
            get.max_connections = '500'
            get.max_heap_table_size = '2048'
        elif 32768 < mem_total:
            get.key_buffer_size = '2048'
            get.tmp_table_size = '4096'
            get.innodb_buffer_pool_size = '8192'
            get.innodb_log_buffer_size = '16'
            get.sort_buffer_size = '8192'
            get.read_buffer_size = '8192'
            get.read_rnd_buffer_size = '4096'
            get.join_buffer_size = '16384'
            get.thread_stack = '1024'
            get.binlog_cache_size = '512'
            get.thread_cache_size = '512'
            get.table_open_cache = '2048'
            get.max_connections = '1000'
            get.max_heap_table_size = '4096'
        one_key_wp().write_logs("""
 =====================Mysql parameters=======================

     key_buffer_size: {}
     tmp_table_size: {}
     innodb_buffer_pool_size: {}
     innodb_log_buffer_size: {}
     sort_buffer_size: {}
     read_buffer_size: {}
     read_rnd_buffer_size: {}
     join_buffer_size: {}
     thread_stack: {}
     binlog_cache_size: {}
     thread_cache_size: {}
     table_open_cache: {}
     max_connections: {}
     max_heap_table_size: {}

 =====================Mysql parameters=======================

 """.format(get.key_buffer_size, get.tmp_table_size, get.innodb_buffer_pool_size,
            get.innodb_log_buffer_size, get.sort_buffer_size, get.read_buffer_size, get.read_rnd_buffer_size,
            get.join_buffer_size, get.thread_stack, get.binlog_cache_size, get.thread_cache_size, get.table_open_cache,
            get.max_connections, get.max_heap_table_size))
        import database
        result = database.database().SetDbConf(get)
        if not result['status']:
            one_key_wp().write_logs("|-Mysql optimization failed {}".format(result))
            return public.return_msg_gettext(False, "Mysql optimization failed {}", (result,))
        public.ExecShell("/etc/init.d/mysqld restart")
        one_key_wp().write_logs("|-Mysql optimization succeeded")
        return public.return_msg_gettext(True, public.lang("Mysql optimization succeeded"))


# Nginx缓存加速WP站点
class fast_cgi:

    def get_fastcgi_conf(self, version):
        conf = r"""
set $skip_cache 0;

if ($request_method = POST) {
    set $skip_cache 1;
}  

if ($query_string != "") {
    set $skip_cache 1;
} 

if ($request_uri ~* "/wp-admin/|/xmlrpc.php|wp-.*.php|/feed/|index.php|sitemap(_index)?.xml") {
    set $skip_cache 1;
}

if ($http_cookie ~* "comment_author|wordpress_[a-f0-9]+|wp-postpass|wordpress_no_cache|wordpress_logged_in") {
    set $skip_cache 1;
}

location ~ (/[^/]+\.php)(/|$) {
    if ( !-f $document_root$1 ) {
        return 404;
    }

    # try_files $uri =404;
    fastcgi_pass unix:/tmp/php-cgi-%s.sock;
    fastcgi_index index.php;
    include fastcgi.conf;  
    add_header Strict-Transport-Security "max-age=63072000; includeSubdomains; preload";
    fastcgi_cache_bypass $skip_cache;
    fastcgi_no_cache $skip_cache;
    add_header X-Cache "$upstream_cache_status From $host";
    fastcgi_cache WORDPRESS;
    add_header Cache-Control  max-age=0;
    add_header Nginx-Cache "$upstream_cache_status";
    add_header Last-Modified $date_gmt;
    add_header X-Frame-Options SAMEORIGIN;
    add_header X-Content-Type-Options nosniff;
    add_header X-XSS-Protection "1; mode=block";
    etag  on;
    fastcgi_cache_valid 200 301 302 1d;
}

location ~ /purge(/.*) {
    allow 127.0.0.1;
    deny all;
    fastcgi_cache_purge WORDPRESS "$scheme$request_method$host$1";
}
""" % version
        return conf

    def get_fast_cgi_status(self, sitename, php_v):
        if public.get_webserver() != "nginx":
            return False
        conf_path = "/www/server/panel/vhost/nginx/{}.conf".format(sitename)
        if not os.path.exists(conf_path):
            return False
        content = public.readFile(conf_path)
        fastcgi_conf = "include enable-php-{}-wpfastcgi.conf;".format(php_v)
        # return conf_path,fastcgi_conf
        if fastcgi_conf in content:
            return True
        return False

    def set_nginx_conf(self):
        if not os.path.exists("/dev/shm/nginx-cache/wp"):
            os.makedirs("/dev/shm/nginx-cache/wp")
            one_key_wp().set_permission("/dev/shm/nginx-cache")
        conf = """
 #AAPANEL_FASTCGI_CONF_BEGIN
 fastcgi_cache_key "$scheme$request_method$host$request_uri";
 fastcgi_cache_path /dev/shm/nginx-cache/wp levels=1:2 keys_zone=WORDPRESS:100m inactive=60m max_size=1g;
 fastcgi_cache_use_stale error timeout invalid_header http_500;
 fastcgi_ignore_headers Cache-Control Expires Set-Cookie;
 #AAPANEL_FASTCGI_CONF_END
 """
        conf_path = "/www/server/nginx/conf/nginx.conf"
        public.back_file(conf_path)
        content = public.readFile(conf_path)
        if not content:
            return
        if "#AAPANEL_FASTCGI_CONF_BEGIN" in content:
            one_key_wp().write_logs("|-Nginx FastCgi cache configuration already exists")
            print("Nginx FastCgi cache configuration already exists")
            return public.return_msg_gettext(True, public.lang("Nginx FastCgi cache configuration already exists"))
        rep = "http\\s*\n\\s*{"
        content = re.sub(rep, "http\n\t{" + conf, content)
        public.writeFile(conf_path, content)

        # 如果配置出错恢复
        conf_pass = public.checkWebConfig()
        if conf_pass != True:
            public.restore_file(conf_path)
            one_key_wp().write_logs("|-Nginx FastCgi configuration error! {}".format(conf_pass))
            print("Nginx FastCgi configuration error! {}".format(conf_pass))
            return public.return_msg_gettext(False, public.lang("Nginx FastCgi configuration error!"))
        one_key_wp().write_logs("|-Nginx FastCgi cache configuration complete...")
        print("Nginx FastCgi cache configuration complete")
        return public.return_msg_gettext(True, public.lang("Nginx FastCgi cache configuration complete"))

    def set_nginx_init(self):
        # if not os.path.exists("/dev/shm/nginx-cache/wp"):
        #     os.makedirs("/dev/shm/nginx-cache/wp")
        #     one_key_wp().set_permission("/dev/shm/nginx-cache")
        conf2 = """
     #AAPANEL_FASTCGI_CONF_BEGIN
     mkdir -p /dev/shm/nginx-cache/wp
     #AAPANEL_FASTCGI_CONF_END
 """
        init_path = "/etc/init.d/nginx"
        public.back_file(init_path)
        content_init = public.readFile(init_path)
        if not content_init:
            return
        if "#AAPANEL_FASTCGI_CONF_BEGIN" in content_init:
            one_key_wp().write_logs("|-Nginx init FastCgi cache configuration already exists")
            print("Nginx init FastCgi cache configuration already exists")
            return public.return_msg_gettext(True, public.lang("Nginx init FastCgi cache configuration already exists"))
        # content_init = re.sub(r"\$NGINX_BIN -c \$CONFIGFILE", + conf2, content_init)
        rep2 = r"\$NGINX_BIN -c \$CONFIGFILE"
        content_init = re.sub(rep2, conf2 + "        $NGINX_BIN -c $CONFIGFILE", content_init)
        public.writeFile(init_path, content_init)

        # 如果配置出错恢复
        public.ExecShell("/etc/init.d/nginx restart")
        conf_pass = public.is_nginx_process_exists()
        if conf_pass == False:
            public.restore_file(init_path)
            one_key_wp().write_logs("|-Nginx init FastCgi configuration error! {}".format(conf_pass))
            print("Nginx init FastCgi configuration error! {}".format(conf_pass))
            return public.return_msg_gettext(False, public.lang("Nginx init FastCgi configuration error!"))
        one_key_wp().write_logs("|-Nginx init FastCgi cache configuration complete...")
        print("Nginx init FastCgi cache configuration complete")
        return public.return_msg_gettext(True, public.lang("Nginx init FastCgi cache configuration complete"))

    def set_fastcgi_php_conf(self, version):
        conf_path = "/www/server/nginx/conf/enable-php-{}-wpfastcgi.conf".format(version)
        if os.path.exists(conf_path):
            one_key_wp().write_logs("|-Nginx FastCgi PHP configuration already exists...")
            print("Nginx FastCgi PHP configuration already exists")
            return True
        public.writeFile(conf_path, self.get_fastcgi_conf(version))

    def set_website_conf(self, version, sitename, act=None):
        conf_path = "/www/server/panel/vhost/nginx/{}.conf".format(sitename)
        public.back_file(conf_path)
        conf = public.readFile(conf_path)
        if not conf:
            print("Website configuration file does not exist {}".format(conf_path))
            one_key_wp().write_logs("|-Website configuration file does not exist: {}".format(conf_path))
            return public.return_message(-1, 0, False)
        if act == 'disable':
            fastcgi_conf = "include enable-php-{}-wpfastcgi.conf;".format(version)
            if fastcgi_conf not in conf:
                print("FastCgi configuration does not exist in website configuration")
                one_key_wp().write_logs("|-FastCgi configuration does not exist in website configuration, skip")
                return public.return_message(-1, 0, public.lang("FastCgi configuration does not exist in website configuration"))
            rep = r"include\s+enable-php-{}-wpfastcgi.conf;".format(version)
            conf = re.sub(rep, "include enable-php-{}.conf;".format(version), conf)
        else:
            fastcgi_conf = "include enable-php-{}-wpfastcgi.conf;".format(version)
            if fastcgi_conf in conf:
                one_key_wp().write_logs(
                    "|-The FastCgi configuration already exists in the website configuration, skip it")
                return public.return_message(0, 0, public.lang("The FastCgi configuration already exists in the website configuration"))
            rep = r"include\s+enable-php-{}.conf;".format(version)

            one_key_wp().write_logs("|-Current configuration: {}".format(conf))
            one_key_wp().write_logs("|-Regular expression: {}".format(rep))

            conf = re.sub(rep, fastcgi_conf, conf)

            one_key_wp().write_logs("|-Modified configuration: {}".format(conf))
        public.writeFile(conf_path, conf)
        conf_pass = public.checkWebConfig()
        if conf_pass != True:
            public.restore_file(conf_path)
            print("Website FastCgi configuration error {}".format(conf_pass))
            one_key_wp().write_logs("|-Website FastCgi configuration error: {}", (conf_pass,))
            return public.return_message(-1, 0, public.lang("Website FastCgi configuration error!"))
        print("Website FastCgi configuration complete")
        one_key_wp().write_logs("|-Website FastCgi configuration complete...")
        return public.return_message(0, 0, public.lang("Website FastCgi configuration complete"))

    def set_fastcgi(self, values):
        """
        get.version
        get.name
        """
        # 设置nginx启动文件
        self.set_nginx_init()
        # 设置nginx全局配置
        self.set_nginx_conf()
        # 设置fastcgi location
        self.set_fastcgi_php_conf(values['php_version'])
        # 设置网站配置文件
        self.set_website_conf(values['php_version'], values['site_name'])
        # 设置wp的变量用于nginxhelper插件清理缓存
        self.set_wp_nginx_helper(values['site_path'])
        # 设置userini允许访问 /dev/shm/nginx-cache/wp 目录
        self.set_userini(values['site_path'])

    def set_wp_nginx_helper(self, site_path):
        cache_conf = """
 #AAPANEL_FASTCGICACHE_BEGIN
 define('RT_WP_NGINX_HELPER_CACHE_PATH','/dev/shm/nginx-cache/wp');
 #AAPANEL_FASTCGICACHE_END
 """
        conf_file = "{}/wp-config.php".format(site_path)
        conf = public.readFile(conf_file)
        if not conf:
            print("Wordpress configuration file does not exist: {}".format(conf_file))
            one_key_wp().write_logs("|-Wordpress configuration file does not exist: {}".format(conf_file))
            return public.return_msg_gettext(False, "Wordpress configuration file does not exist: {}", (conf_file,))
        if re.search(r'''define\(\s*'RT_WP_NGINX_HELPER_CACHE_PATH'\s*,''', conf):
            # if "RT_WP_NGINX_HELPER_CACHE_PATH" in conf:
            one_key_wp().write_logs("|-Cache cleaning configuration already exists, skip")
            print("Cache cleaning configuration already exists")
            return
        conf = conf.replace("<?php", "<?php" + cache_conf)
        public.writeFile(conf_file, conf)

    def set_userini(self, site_path):
        conf_file = "{}/.user.ini".format(site_path)
        conf = public.readFile(conf_file)
        if not conf:
            print("Anti-cross-site configuration file does not exist: {}".format(conf_file))
            one_key_wp().write_logs("|-Anti-cross-site configuration file does not exist: {}".format(conf_file))
            return public.return_msg_gettext(False, "Anti-cross-site configuration file does not exist: {}",
                                             (conf_file,))
        if "/dev/shm/nginx-cache/wp" in conf:
            print("Anti-cross-site configuration is successful")
            one_key_wp().write_logs("|-Anti-cross-site configuration is successful...")
            return
        public.ExecShell('chattr -i {}'.format(conf_file))
        conf += ":/dev/shm/nginx-cache/wp"
        public.writeFile(conf_file, conf)
        public.ExecShell('chattr +i {}'.format(conf_file))
        one_key_wp().write_logs("|-Anti-cross-site configuration is successful...")


# one_key_wp请求帮助函数
def retry(func, max_retries=10):
    retry_count = 0

    while retry_count < max_retries:
        try:

            return func()  # 调用传入的函数并返回结果
        except Exception as e:
            retry_count += 1
            if retry_count < max_retries:
                public.print_log("Access failed {} times:{}".format(retry_count, e))
                time.sleep(2)  # 添加适当的延迟
            else:
                public.print_log("The maximum number of retries has been reached.")
                return False

    return None

Youez - 2016 - github.com/yon3zu
LinuXploit