403Webshell
Server IP : 104.21.38.3  /  Your IP : 104.23.175.25
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/panel_ssl_v2.py
# coding: utf-8
# -------------------------------------------------------------------
# aaPanel
# -------------------------------------------------------------------
# Copyright (c) 2015-2016 aaPanel(www.aapanel.com) All rights reserved.
# -------------------------------------------------------------------
# Author: hwliang <[email protected]>
# -------------------------------------------------------------------

# ------------------------------
# SSL接口
# ------------------------------
from panel_auth_v2 import panelAuth as panelAuth
import public, os, sys, binascii, urllib, json, time, datetime, re
from ssl_manage import SSLManger  # 新的ssl管理

from Crypto import Random
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
import base64
from public.validate import Param

try:
    from BTPanel import cache, session
except:
    pass


class panelSSL:
    # __APIURL = public.GetConfigValue('home') + '/api/Auth'
    # __APIURL2 = public.GetConfigValue('home') + '/api/Cert'
    # __BINDURL = 'https://wafapi.aapanel.com/Auth/GetAuthToken'   # 获取token 获取官网token




    __BINDURL = '{}/api/user'.format(public.OfficialApiBase())  # 获取token 获取官网token
    # __BINDURL = 'http://dev.aapanel.com/api/user'  # 获取token 获取官网token

    __CODEURL = 'https://wafapi.aapanel.com/Auth/GetBindCode'  # 获取绑定验证码
    __UPATH = 'data/userInfo.json'

    # __APIURL = 'http://dev.aapanel.com/api'
    __APIURL = '{}/api'.format(public.OfficialApiBase())

    __PUBKEY = 'data/public.key'

    # 证书购买
    # __APIURL_CERT = '{}/api/cert'.format(public.OfficialApiBase())

    __userInfo = None  # 用户信息  从文件中读取的
    __PDATA = None
    _check_url = None

    # 构造方法
    def __init__(self):
        pdata = {}
        data = {}  # 存放调用接口的参数
        # 记录了用户信息
        if os.path.exists(self.__UPATH):
            my_tmp = public.readFile(self.__UPATH)
            if my_tmp:
                try:
                    self.__userInfo = json.loads(my_tmp)
                except:
                    self.__userInfo = {}
            else:
                self.__userInfo = {}

            # public.print_log('初始化 !!!!!!!!!!!!!!!!!!!用户信息:  {}'.format(self.__userInfo))
            try:
                if self.__userInfo:
                    # 记录里没有这两个key
                    pdata['access_key'] = self.__userInfo['access_key']
                    data['secret_key'] = self.__userInfo['secret_key']
                    # pdata['access_key'] = 'test'
                    # data['secret_key'] = '123456'
            except:
                # self.__userInfo = {}
                pdata['access_key'] = 'test'
                data['secret_key'] = '123456'
        else:
            pdata['access_key'] = 'test'
            data['secret_key'] = '123456'
        pdata['data'] = data
        self.__PDATA = pdata

        # public.print_log('初始化------->最后 !!!!!!!!!!!!!!!!!!!用户信息:  {}'.format(self.__userInfo))

    def en_code_rsa(self, data):
        pk = public.readFile(self.__PUBKEY)
        if not pk:
            return False
        pub_k = RSA.importKey(pk)
        cipher = PKCS1_cipher.new(pub_k)
        rsa_text = base64.b64encode(cipher.encrypt(bytes(data.encode("utf8"))))
        return str(rsa_text, encoding='utf-8')

    # 获取Token  最新
    # def GetToken(self, get):
    #     rtmp = ""
    #     data = {}
    #     data['username'] = get.username
    #     data['password'] = public.md5(get.password)
    #     data['serverid'] = panelAuth().get_serverid()
    #     pdata = {}
    #     pdata['data'] = self.De_Code(data)
    #     try:
    #         rtmp = public.httpPost(self.__BINDURL, pdata)
    #         result = json.loads(rtmp)
    #         result['data'] = self.En_Code(result['data'])
    #         if result['data']:
    #             result['data']['serverid'] = data['serverid']
    #             public.writeFile(self.__UPATH, json.dumps(result['data']))
    #             public.flush_plugin_list()
    #         del (result['data'])
    #         session['focre_cloud'] = True
    #         return result
    #     except Exception as ex:
    #         # bind = 'data/bind.pl'
    #         # if os.path.exists(bind): os.remove(bind)
    #         # return public.returnMsg(False,'连接服务器失败!<br>' + str(ex))
    #         raise public.error_conn_cloud(str(ex))
    #
    # # 删除Token 最新
    # def DelToken(self, get):
    #     if os.path.exists(self.__UPATH): os.remove(self.__UPATH)
    #     session['focre_cloud'] = True
    #     return public.returnMsg(True, public.lang("SSL_BTUSER_UN"))

    # 获取Token  todo  设置绑定账号在用
    def GetToken(self, get):
        rtmp = ""
        data = {}
        data['identification'] = self.en_code_rsa(get.username)
        # data['username'] = self.en_code_rsa(get.username)
        data['password'] = self.en_code_rsa(get.password)
        data['from_panel'] = self.en_code_rsa('1')  # 1 代表从面板登录
        data['environment_info'] = json.dumps(public.fetch_env_info())

        try:
            # https://www.aapanel.com
            isPro = False
            if hasattr(get, 'isPro') and get.isPro:
                isPro = True
                sUrl = '{}/pro/api/user/login'.format(public.OfficialApiBase())
            else:
                sUrl = '{}/api/user/login'.format(public.OfficialApiBase())

            rtmp = public.httpPost(sUrl, data)

            # public.print_log("写入用户信息  @@@@222 {}".format(self.__APIURL + '/user/login'))
            result = json.loads(rtmp)
            # public.print_log("登录写入用户信息  @@@@ {}".format(result))
            if isPro:
                if result.get('err_no', None) == 2002:
                    # return public.return_message(-1, 0, public.lang("The email has not been validated"))
                    # return public.return_message(-1, 0, {"validated": False})
                    return public.return_message(-1, 0, {"result": "The email has not been validated","validated": False})

            if result['success']:
                bind = 'data/bind.pl'
                if os.path.exists(bind): os.remove(bind)
                userinfo = result['res']['user_data']
                userinfo['token'] = result['res']['access_token']
                # 用户信息写入文件
                public.writeFile(self.__UPATH, json.dumps(userinfo))
                # if bool:
                #     # public.print_log("写入用户信息  成功 {}".format(userinfo))
                # else:
                #     public.print_log("写入用户信息  失败")

                session['focre_cloud'] = True
                return public.return_message(0, 0, public.lang("Bind successfully"))
                # return result
            else:
                return public.return_message(-1, 0, result.get('res', public.lang("Invalid username or email or password! please check and try again!")))
        except Exception as ex:
            # public.print_log(public.get_error_info())
            # public.print_log("获取token 失败 {}".format(ex))
            bind = 'data/bind.pl'
            if os.path.exists(bind): os.remove(bind)
            return public.return_message(-1,0, '%s<br>%s' % (
                public.lang("Failed to connect server!"), str(rtmp)))

# 删除Token  todo
    def DelToken(self, get):
        uinfo = public.readFile(self.__UPATH)
        try:
            uinfo = json.loads(uinfo)
            public.writeFile(self.__UPATH, json.dumps({'server_id': uinfo['server_id']}))
        except:
            public.ExecShell("rm -f " + self.__UPATH)
        session['focre_cloud'] = True

        return public.return_message(0, 0, public.lang("Unbound!"))

    # 获取用户信息  todo
    # def GetUserInfo(self, get):
    #     result = {}
    #
    #     # public.print_log("@@@@@@@@@@获取用户信息  开始----- {}".format(self.__userInfo))
    #
    #     if self.__userInfo:
    #         userTmp = {}
    #         userTmp['username'] = self.__userInfo['username'][0:3] + '****' + self.__userInfo['username'][-4:]
    #         result['status'] = True
    #         result['msg'] = public.lang("SSL_GET_SUCCESS")
    #         result['data'] = userTmp
    #     else:
    #         userTmp = {}
    #         userTmp['username'] = public.lang("SSL_NOT_BTUSER")
    #         result['status'] = False
    #         result['msg'] = public.lang("SSL_NOT_BTUSER")
    #         result['data'] = userTmp
    #     return result

    def GetUserInfo(self, get):
        # public.print_log("获取用户信息222")
        return_status = -1
        result = {}
        try:
            if self.__userInfo:
                userTmp = {}
                userTmp['username'] = self.__userInfo['email'][0:3] + '****' + self.__userInfo['email'][-4:]
                return_status= 0
                result['msg'] = public.lang("Got successfully!")
                result['data'] = userTmp
            else:
                userTmp = {}
                userTmp['username'] = public.lang("Please bind your account!")
                result['msg'] = public.lang("Please bind your account!")
                result['data'] = userTmp
        except:
            userTmp = {}
            userTmp['username'] = public.lang("Please bind your account!")
            result['msg'] = public.lang("Please bind your account!")
            result['data'] = userTmp
        return public.return_message(return_status,0,result)

    # 获取产品列表  todo
    # def get_product_list(self, get):
    #     p_type = 'dv'
    #     if 'p_type' in get: p_type = get.p_type
    #     result = self.request('get_product_list?p_type={}'.format(p_type))
    #     return result

    # # 获取产品列表2  todo
    # def get_product_list_v2(self, get):
    #     p_type = 'dv'
    #     if 'p_type' in get: p_type = get.p_type
    #
    #     result = self.request('get_product_list_v2?p_type={}'.format(p_type))
    #     return result

    # 获取产品列表2  todo  产品列表
    def get_product_list_v2(self, get):
        return_status = 0
        result = self.request('cert/product/list')
        # if "success" in result:
        #     print(f"键 '{key_to_check}' 存在于字典中。")
        # else:
        #     print(f"键 '{key_to_check}' 不存在于字典中。")
        return public.return_message(0,0,result)

    # 获取商业证书订单列表  todo 用户订单列表
    def get_order_list(self, get):
        result = self.request('cert/user/list')  # 获取当前登录用户的SSL证书列表
        return public.return_message(0,0,result)


    # 下载证书 todo
    def download_cert(self, get):
        self.__PDATA['uc_id'] = get.uc_id
        result = self.request('cert/user/download')
        return result

    # 获指定商业证书订单
    def get_order_find(self, get):
        self.__PDATA['uc_id'] = get.uc_id
        result = self.request('cert/user/info')
        return result

    # 获取证书管理员信息  todo
    def get_cert_admin(self, get):
        result = self.request('cert/user/administrator')
        return result

    # 完善资料CA(先支付接口)   todo  可能是支付后的完善信息接口
    def apply_order_ca(self, args):
        pdata = json.loads(args.pdata)

        result = self.check_ssl_caa(pdata['domains'])
        if result:
            return result

        self.__PDATA['data'] = pdata
        result = self.request('cert/user/update_profile')

        return result


    # 部署指定商业证书 todo 部署证书
    def set_cert(self, get):
        siteName = get.siteName
        certInfoall = self.get_order_find(get)

        if certInfoall["success"] is False:
            return public.return_message(-1, 0, certInfoall["res"])
        certInfo = certInfoall["res"]
        path = '/www/server/panel/vhost/cert/' + siteName
        if not os.path.exists(path):
            public.ExecShell('mkdir -p ' + path)
        csrpath = path + "/fullchain.pem"
        keypath = path + "/privkey.pem"
        pidpath = path + "/certOrderId"

        other_file = path + '/partnerOrderId'
        if os.path.exists(other_file):
            os.remove(other_file)
        other_file = path + '/README'
        if os.path.exists(other_file):
            os.remove(other_file)

        public.writeFile(keypath, certInfo['private_key'])
        public.writeFile(csrpath, certInfo['certificate'] + "\n" + certInfo['ca_certificate'])
        # 改记录 uc_id
        public.writeFile(pidpath, get.uc_id)
        import panel_site_v2 as panelSite
        panelSite.panelSite().SetSSLConf(get)
        public.serviceReload()
        return public.return_message(0, 0, public.lang("Setup successfully!"))

    # # 生成商业证书支付订单   暂无
    # def apply_order_pay(self, args):
    #     self.__PDATA['data'] = json.loads(args.pdata)
    #     result = self.check_ssl_caa(self.__PDATA['data']['domains'])
    #     if result: return result
    #     result = self.request('apply_cert_order')
    #     return result

    # 检查CAA记录是否正确
    def check_ssl_caa(self, domains, clist=['sectigo.com', 'digicert.com', 'comodoca.com']):
        '''
            @name 检查CAA记录是否正确
            @param domains 域名列表
            @param clist 正确的记录值关键词
            @return bool
        '''
        try:
            data = {}
            for domain in domains:
                root, zone = public.get_root_domain(domain)
                for d in [domain, root, '_acme-challenge.{}'.format(root), '_acme-challenge.{}'.format(domain)]:
                    ret = public.query_dns(d, 'CAA')
                    if not ret: continue
                    slist = []
                    for val in ret:
                        if val['value'] in clist:
                            return False
                        slist.append(val)

                    if len(slist) > 0:
                        data[d] = slist
            if data:
                result = {}
                result['status'] = False
                result[
                    'msg'] = 'error: There is a CAA record in the DNS resolution of the domain name. Please delete it and apply again '
                result['data'] = json.dumps(data)
                result['caa_list'] = data
                return result
        except:
            pass
        return False


    # 提交商业证书订单到CA
    # def apply_order(self, args):
    #     self.__PDATA['data']['oid'] = args.oid
    #     result = self.request('apply_cert')
    #     if result['status'] == True:
    #         self.__PDATA['data'] = {}
    #         result['verify_info'] = self.get_verify_info(args)
    #     return result

    # 获取证书域名验证结果 todo  暂未使用
    #  用到: 处理验证信息 set_verify_info    完善资料 apply_order_ca  续签证书 renew_cert_order
    def get_verify_info(self, args):
        self.__PDATA['uc_id'] = args.uc_id
        verify_info = self.request('cert/user/validate_domains')
        if verify_info['success']:
            return "success"
        return "error"

        # is_file_verify = 'fileName' in verify_info
        # verify_info['paths'] = []
        # verify_info['hosts'] = []
        # for domain in verify_info['domains']:
        #     if is_file_verify:
        #         siteRunPath = self.get_domain_run_path(domain)
        #         if not siteRunPath:
        #             # if domain[:4] == 'www.': domain = domain[:4]
        #             verify_info['paths'].append(verify_info['path'].replace('example.com', domain))
        #             continue
        #         verify_path = siteRunPath + '/.well-known/pki-validation'
        #         if not os.path.exists(verify_path):
        #             os.makedirs(verify_path)
        #         verify_file = verify_path + '/' + verify_info['fileName']
        #         if os.path.exists(verify_file): continue
        #         public.writeFile(verify_file, verify_info['content'])
        #     else:
        #         original_domain = domain
        #         # if domain[:4] == 'www.': domain = domain[:4]
        #         verify_info['hosts'].append(verify_info['host'] + '.' + domain)
        #         if 'auth_to' in args:
        #             root, zone = public.get_root_domain(domain)
        #             res = self.create_dns_record(args['auth_to'], verify_info['host'] + '.' + root,
        #                                          verify_info['value'], original_domain)
        #             print(res)
        # return verify_info

    # 处理验证信息  todo  如果传参 要传uc_id
    def set_verify_info(self, args):
        # self.__PDATA['uc_id'] = args.uc_id   # 新增
        verify_info = self.get_verify_info(args)
        is_file_verify = 'fileName' in verify_info
        verify_info['paths'] = []
        verify_info['hosts'] = []
        for domain in verify_info['domains']:
            if domain[:2] == '*.': domain = domain[2:]
            if is_file_verify:
                siteRunPath = self.get_domain_run_path(domain)
                if not siteRunPath:
                    # if domain[:4] == 'www.': domain = domain[4:]
                    verify_info['paths'].append(verify_info['path'].replace('example.com', domain))
                    continue
                verify_path = siteRunPath + '/.well-known/pki-validation'
                if not os.path.exists(verify_path):
                    os.makedirs(verify_path)
                verify_file = verify_path + '/' + verify_info['fileName']
                if os.path.exists(verify_file): continue
                public.writeFile(verify_file, verify_info['content'])
            else:
                original_domain = domain
                # if domain[:4] == 'www.': domain = domain[4:]
                verify_info['hosts'].append(verify_info['host'] + '.' + domain)

                if 'auth_to' in args:
                    root, zone = public.get_root_domain(domain)
                    self.create_dns_record(args['auth_to'], verify_info['host'] + '.' + root,
                                           verify_info['value'], original_domain)
        return verify_info

    # 获取指定域名的PATH
    def get_domain_run_path(self, domain):
        pid = public.M('domain').where('name=?', (domain,)).getField('pid')
        if not pid: return False
        return self.get_site_run_path(pid)

    # 获取网站运行目录
    def get_site_run_path(self, pid):
        '''
            @name 获取网站运行目录
            @author hwliang<2020-08-05>
            @param pid(int) 网站标识
            @return string
        '''
        siteInfo = public.M('sites').where('id=?', (pid,)).find()
        siteName = siteInfo['name']
        sitePath = siteInfo['path']
        webserver_type = public.get_webserver()
        setupPath = '/www/server'
        path = None
        if webserver_type == 'nginx':
            filename = setupPath + '/panel/vhost/nginx/' + siteName + '.conf'
            if os.path.exists(filename):
                conf = public.readFile(filename)
                rep = r'\s*root\s+(.+);'
                tmp1 = re.search(rep, conf)
                if tmp1: path = tmp1.groups()[0]

        elif webserver_type == 'apache':
            filename = setupPath + '/panel/vhost/apache/' + siteName + '.conf'
            if os.path.exists(filename):
                conf = public.readFile(filename)
                rep = r'\s*DocumentRoot\s*"(.+)"\s*\n'
                tmp1 = re.search(rep, conf)
                if tmp1: path = tmp1.groups()[0]
        else:
            filename = setupPath + '/panel/vhost/openlitespeed/' + siteName + '.conf'
            if os.path.exists(filename):
                conf = public.readFile(filename)
                rep = r"vhRoot\s*(.*)"
                path = re.search(rep, conf)
                if not path:
                    path = None
                else:
                    path = path.groups()[0]

        if not path:
            path = sitePath
        return path

    # 验证URL是否匹配
    def check_url_txt(self, args, timeout=5):
        url = args.url
        content = args.content

        import http_requests
        res = http_requests.get(url, s_type='curl', timeout=timeout)
        result = res.text
        if not result: return 0

        if result.find('11001') != -1 or result.find('curl: (6)') != -1: return -1
        if result.find('curl: (7)') != -1 or res.status_code in [403, 401]: return -5
        if result.find('Not Found') != -1 or result.find('not found') != -1 or res.status_code in [404]: return -2
        if result.find('timed out') != -1: return -3
        if result.find('301') != -1 or result.find('302') != -1 or result.find(
                'Redirecting...') != -1 or res.status_code in [301, 302]: return -4
        if result == content: return 1
        return 0

    # 更换验证方式  # todo? ['data']
    def again_verify(self, args):
        self.__PDATA['uc_id'] = args.uc_id
        self.__PDATA['dcv_method'] = args.dcv_method
        result = self.request('cert/user/update_dcv')
        return result

    # 获取商业证书验证结果
    def get_verify_result(self, args):
        self.__PDATA['uc_id'] = args.uc_id
        res = self.request('cert/user/validate')
        if res['success'] is False:
            return res
        verify_info = res['res']

        if verify_info['status'] in ['COMPLETE', False]:
            return verify_info

        is_file_verify = 'CNAME_CSR_HASH' != verify_info['data']['dcvList'][0]['dcvMethod']
        verify_info['paths'] = []
        verify_info['hosts'] = []
        if verify_info['data']['application']['status'] == 'ongoing':
            return public.return_message(-1, 0, public.lang("In verification, please contact aaPanel if the audit still fails after 24 hours"))

        for dinfo in verify_info['data']['dcvList']:
            is_https = dinfo['dcvMethod'] == 'HTTPS_CSR_HASH'
            if is_https:
                is_https = 's'
            else:
                is_https = ''
            domain = dinfo['domainName']
            if domain[:2] == '*.':
                domain = domain[2:]
            dinfo['domainName'] = domain

            if is_file_verify:
                # 判断是否是Springboot 项目
                if public.M('sites').where('id=?', (
                        public.M('domain').where('name=?', (dinfo['domainName'])).getField('pid'),)).getField(
                    'project_type') == 'Java' or public.M('sites').where('id=?', (
                        public.M('domain').where('name=?', (dinfo['domainName'])).getField('pid'),)).getField(
                    'project_type') == 'Go' or public.M('sites').where('id=?', (
                        public.M('domain').where('name=?', (dinfo['domainName'])).getField('pid'),)).getField(
                    'project_type') == 'Other':
                    siteRunPath = '/www/wwwroot/java_node_ssl'
                else:
                    siteRunPath = self.get_domain_run_path(domain)
                # if domain[:4] == 'www.': domain = domain[4:]
                status = 0
                url = 'http' + is_https + '://' + domain + '/.well-known/pki-validation/' + verify_info['data'][
                    'DCVfileName']
                get = public.dict_obj()
                get.url = url
                get.content = verify_info['data']['DCVfileContent']
                status = self.check_url_txt(get)

                verify_info['paths'].append({'url': url, 'status': status})
                if not siteRunPath:
                    continue

                verify_path = siteRunPath + '/.well-known/pki-validation'
                if not os.path.exists(verify_path):
                    os.makedirs(verify_path)
                verify_file = verify_path + '/' + verify_info['data']['DCVfileName']
                if os.path.exists(verify_file):
                    continue
                public.writeFile(verify_file, verify_info['data']['DCVfileContent'])
            else:
                # if domain[:4] == 'www.': domain = domain[4:]
                domain, subb = public.get_root_domain(domain)
                dinfo['domainName'] = domain
                verify_info['hosts'].append(verify_info['data']['DCVdnsHost'] + '.' + domain)

        return verify_info

    # 取消订单  暂无
    def cancel_cert_order(self, args):
        self.__PDATA['data']['oid'] = args.oid
        result = self.request('cancel_cert_order')
        return result

    # 单独购买人工安装服务
    def apply_cert_install_pay(self, args):
        '''
            @name 单独购买人工安装服务
            @param args<dict_obj>{
                'uc_id'<int> 订单ID
            }
        '''
        self.__PDATA['uc_id'] = args.uc_id
        result = self.request('cert/order/deployment_assistance')
        return result

    # 生成商业证书支付订单  todo  生成支付订单 下单支付
    def apply_cert_order_pay(self, args):
        pdata = json.loads(args.pdata)
        self.__PDATA['data'] = pdata
        result = self.request('cert/order/create')
        return public.return_message(0,0,result)

    # 模拟支付
    # def pay_test(self, args):
    #     out_trade_no = args.out_trade_no
    #     # /api/common/stripe/{out_trade_no}
    #     # result = self.request_test('order/pay')
    #     result = public.return_msg_gettext(False, '测试用 模拟支付!')
    #     url = "https://dev.aapanel.com/api/common/stripe/" + out_trade_no
    #     response_data = public.httpGet(url)
    #
    #     # public.print_log("******************** url: {}".format(url))
    #
    #     try:
    #         result = json.loads(response_data)
    #     except:
    #         pass
    #     return result



    # 申请证书  ???
    def ApplyDVSSL(self, get):

        """
        申请证书
        """
        if not 'orgName' in get: return public.return_message(-1, 0, public.lang("missing parameter: orgName"))
        if not 'orgPhone' in get: return public.return_message(-1, 0, public.lang("missing parameter: orgPhone"))
        if not 'orgPostalCode' in get: return public.return_message(-1, 0, public.lang("missing parameter: orgPostalCode"))
        if not 'orgRegion' in get: return public.return_message(-1, 0, public.lang("missing parameter: orgRegion"))
        if not 'orgCity' in get: return public.return_message(-1, 0, public.lang("missing parameter: orgCity"))
        if not 'orgAddress' in get: return public.return_message(-1, 0, public.lang("missing parameter: orgAddress"))
        if not 'orgDivision' in get: return public.return_message(-1, 0, public.lang("missing parameter: orgDivision"))

        get.id = public.M('domain').where('name=?', (get.domain,)).getField('pid')
        if hasattr(get, 'siteName'):
            get.path = public.M('sites').where('id=?', (get.id,)).getField('path')
        else:
            get.siteName = public.M('sites').where('id=?', (get.id,)).getField('name')

        # 当申请二级域名为www时,检测主域名是否绑定到同一网站
        if get.domain[:4] == 'www.':
            if not public.M('domain').where('name=? AND pid=?', (get.domain[4:], get.id)).count():
                return public.return_message(-1, 0,
                                        "Request for [%s] certificate requires verification [%s] Please bind and resolve [%s] to the site!" % (
                                            get.domain, get.domain[4:], get.domain[4:]))
        # 判断是否是Java项目
        if public.M('sites').where('id=?', (get.id,)).getField('project_type') == 'Java' or public.M('sites').where(
                'id=?', (get.id,)).getField('project_type') == 'Go' or public.M('sites').where('id=?',
                                                                                               (get.id,)).getField(
            'project_type') == 'Other':
            get.path = '/www/wwwroot/java_node_ssl/'
            runPath = ''
        # 判断是否是Node项目
        elif public.M('sites').where('id=?', (get.id,)).getField('project_type') == 'Node':
            get.path = public.M('sites').where('id=?', (get.id,)).getField('path')
            runPath = ''
        # 判断是否是python项目
        elif public.M('sites').where(
                'id=?', (get.id,)).getField('project_type') == 'Python':
            get.path = public.M('sites').where('id=?',
                                               (get.id,)).getField('path')
            runPath = ''
        else:
            runPath = self.GetRunPath(get)
        if runPath != False and runPath != '/': get.path += runPath
        authfile = get.path + '/.well-known/pki-validation/fileauth.txt'
        if not self.CheckDomain(get):
            if not os.path.exists(authfile):
                return public.return_message(-1, 0, public.lang("Unable to write validation file: {}", authfile))
            else:
                msg = '''can't correct access validation file <br><a class="btlink" href="{c_url}" target="_blank">{c_url}</a> <br><br>
                <p></b>Possible cause:</b></p>
                1、the resolution is not correct, or the resolution does not work [please resolve the domain correctly, or wait for the resolution to work and try again]<br>
                2、 check whether the 301/302 redirection is set [please temporarily turn off the redirection related configuration]<br>
                3、 Check whether the site has HTTPS deployed and set mandatory HTTPS [Please temporarily turn off mandatory HTTPS feature]<br>'''.format(
                    c_url=self._check_url)
                return public.return_message(-1, 0, msg)

        action = 'ApplyDVSSL'
        if hasattr(get, 'partnerOrderId'):
            self.__PDATA['data']['partnerOrderId'] = get.partnerOrderId
            action = 'ReDVSSL'

        self.__PDATA['data']['domain'] = get.domain
        self.__PDATA['data']['orgPhone'] = get.orgPhone
        self.__PDATA['data']['orgPostalCode'] = get.orgPostalCode
        self.__PDATA['data']['orgRegion'] = get.orgRegion
        self.__PDATA['data']['orgCity'] = get.orgCity
        self.__PDATA['data']['orgAddress'] = get.orgAddress
        self.__PDATA['data']['orgDivision'] = get.orgDivision
        self.__PDATA['data']['orgName'] = get.orgName
        self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
        try:
            result = public.httpPost(self.__APIURL + 'user/' + action, self.__PDATA)
        except Exception as ex:
            raise public.error_conn_cloud(str(ex))
        try:
            result = json.loads(result)
        except:
            return result
        if 'status' in result:
            if not result['status']: return result
        result['data'] = self.En_Code(result['data'])
        try:
            if not 'authPath' in result['data']: result['data']['authPath'] = '/.well-known/pki-validation/'
            authfile = get.path + result['data']['authPath'] + result['data']['authKey']
        except:
            if 'authKey' in result['data']:
                authfile = get.path + '/.well-known/pki-validation/' + result['data']['authKey']
            else:
                return public.return_message(-1, 0, public.lang(" Failed to get the validation file!"))

        if 'authValue' in result['data']:
            public.writeFile(authfile, result['data']['authValue'])
        return result


    # 发送请求  todo
    def request(self, dname):
        self.__PDATA['data'] = json.dumps(self.__PDATA['data'])
        url_headers = {
            "authorization": "bt {}".format(self.__userInfo['token'])
        }

        result = public.return_msg_gettext(False, 'Failed to connect to the official website, please try again later!')
        try:
            # response_data = public.httpPost(self.__APIURL + '/' + dname, self.__PDATA)
            response_data = public.httpPost(self.__APIURL + '/' + dname, data=self.__PDATA, headers=url_headers)
        except Exception as ex:
            raise public.error_conn_cloud(str(ex))
        try:
            result = json.loads(response_data)
        except:
            pass
        return result

    # # 发送请求  todo  测试购买证书
    # def request_test(self, dname):
    #     self.__PDATA['data'] = json.dumps(self.__PDATA['data'])
    #     # "Content-Type": "application/json",
    #     url_headers = {
    #         "authorization": "bt {}".format(self.__userInfo['token'])
    #     }
    #
    #     result = public.return_msg_gettext(False, '测试用 The request failed, please try again later!')
    #     try:
    #         response_data = public.httpPost(self.__APIURLtest + '/' + dname, data=self.__PDATA, headers=url_headers)
    #
    #         # public.print_log("******************** url: {}".format(self.__APIURLtest + '/' + dname))
    #
    #     except Exception as ex:
    #         raise public.error_conn_cloud(str(ex))
    #
    #
    #     try:
    #         result = json.loads(response_data)
    #     except:
    #         pass
    #     return result

    # 获取订单列表  ???
    def GetOrderList(self, get):
        if hasattr(get, 'siteName'):
            path = '/etc/letsencrypt/live/' + get.siteName + '/partnerOrderId'
            if os.path.exists(path):
                self.__PDATA['data']['partnerOrderId'] = public.readFile(path)
            else:
                path = '/www/server/panel/vhost/cert/' + get.siteName + '/partnerOrderId'
                if os.path.exists(path):
                    self.__PDATA['data']['partnerOrderId'] = public.readFile(path)

        self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
        try:
            rs = public.httpPost(self.__APIURL + 'user/GetSSLList', self.__PDATA)
        except Exception as ex:
            raise public.error_conn_cloud(str(ex))
        try:
            result = json.loads(rs)
        except:
            return public.return_message(-1, 0, public.lang("Failed to get, please try again later!"))

        result['data'] = self.En_Code(result['data'])
        for i in range(len(result['data'])):
            result['data'][i]['endtime'] = self.add_months(result['data'][i]['createTime'],
                                                           result['data'][i]['validityPeriod'])
        return result

    # 计算日期增加(月)
    def add_months(self, dt, months):
        import calendar
        dt = datetime.datetime.fromtimestamp(dt / 1000)
        month = dt.month - 1 + months
        year = dt.year + month // 12
        month = month % 12 + 1

        day = min(dt.day, calendar.monthrange(year, month)[1])
        return (time.mktime(dt.replace(year=year, month=month, day=day).timetuple()) + 86400) * 1000

    # 申请证书
    def GetDVSSL(self, get):
        get.id = public.M('domain').where('name=?', (get.domain,)).getField('pid')
        if hasattr(get, 'siteName'):
            get.path = public.M('sites').where('id=?', (get.id,)).getField('path')
        else:
            get.siteName = public.M('sites').where('id=?', (get.id,)).getField('name')

        # 当申请二级域名为www时,检测主域名是否绑定到同一网站
        if get.domain[:4] == 'www.':
            if not public.M('domain').where('name=? AND pid=?', (get.domain[4:], get.id)).count():
                return public.return_message(-1, 0,
                                                 "Apply for [{}] certificate to verify [{}] Please bind [{}] and resolve to the site!".format(
                                                     get.domain, get.domain[4:], get.domain[4:]))

        # 检测是否开启强制HTTPS
        if not self.CheckForceHTTPS(get.siteName):
            return public.return_message(-1, 0, public.lang("[Force HTTPS] is enabled on the current website, please turn off this function before applying for an SSL certificate!"))

        # 获取真实网站运行目录
        runPath = self.GetRunPath(get)
        if runPath != False and runPath != '/': get.path += runPath

        # 提前模拟测试验证文件值是否正确
        authfile = get.path + '/.well-known/pki-validation/fileauth.txt'
        if not self.CheckDomain(get):
            if not os.path.exists(authfile):
                return public.return_message(-1, 0, 'Cannot create [{}]', (authfile,))
            else:
                msg = ''''Unable to access the verification file<br><a class="btlink" href="{c_url}" target="_blank">{c_url}</a> <br><br>
                <p></b>Possible reasons:</b></p>
                1. Incorrect or ineffective DNS resolution [Please ensure correct domain name resolution or wait for the resolution to take effect and try again]<br>
                2. Check if there are any 301/302 redirects set [Temporarily disable redirect-related configurations]<br>
                3. Check if the website has enforced HTTPS [Temporarily disable the enforced HTTPS feature]<br>'''.format(
                    c_url=self._check_url)
                return public.return_message(-1, 0, msg)

        action = 'GetDVSSL'
        if hasattr(get, 'partnerOrderId'):
            self.__PDATA['data']['partnerOrderId'] = get.partnerOrderId
            action = 'ReDVSSL'

        self.__PDATA['data']['domain'] = get.domain
        self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
        result = public.httpPost(self.__APIURL + 'user/' + action, self.__PDATA)
        try:
            result = json.loads(result)
        except:
            return result
        result['data'] = self.En_Code(result['data'])

        try:
            if 'authValue' in result['data'].keys():
                public.writeFile(authfile, result['data']['authValue'])
        except:
            try:
                public.writeFile(authfile, result['data']['authValue'])
            except:
                return result

        return result

    # 检测是否强制HTTPS
    def CheckForceHTTPS(self, siteName):
        conf_file = '/www/server/panel/vhost/nginx/{}.conf'.format(siteName)
        if not os.path.exists(conf_file):
            return True

        conf_body = public.readFile(conf_file)
        if not conf_body: return True
        if conf_body.find('HTTP_TO_HTTPS_START') != -1:
            return False
        return True

    # 获取运行目录
    def GetRunPath(self, get):
        if hasattr(get, 'siteName'):
            get.id = public.M('sites').where('name=?', (get.siteName,)).getField('id')
        else:
            get.id = public.M('sites').where('path=?', (get.path,)).getField('id')
        if not get.id: return False
        import panelSite
        result = panelSite.panelSite().GetSiteRunPath(get)
        return result['runPath']

    # 检查域名是否解析
    def CheckDomain(self, get):
        try:
            # 创建目录
            spath = get.path + '/.well-known/pki-validation'
            if not os.path.exists(spath):
                os.makedirs(spath, 0o755, True)
                # public.ExecShell("mkdir -p '" + spath + "'")

            # 生成并写入检测内容
            epass = public.GetRandomString(32)
            public.writeFile(spath + '/fileauth.txt', epass)

            # 检测目标域名访问结果
            if get.domain[:4] == 'www.':  # 申请二级域名为www时检测主域名
                get.domain = get.domain[4:]

            import http_requests
            self._check_url = 'http://127.0.0.1/.well-known/pki-validation/fileauth.txt'
            result = http_requests.get(self._check_url, s_type='curl', timeout=6, headers={"host": get.domain}).text
            self.__test = result
            if result == epass: return True
            self._check_url = self._check_url.replace('127.0.0.1', get.domain)
            return False
        except:
            self._check_url = self._check_url.replace('127.0.0.1', get.domain)
            return False

    # 确认域名
    def Completed(self, get):
        self.__PDATA['data']['partnerOrderId'] = get.partnerOrderId
        self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
        if hasattr(get, 'siteName'):
            get.path = public.M('sites').where('name=?', (get.siteName,)).getField('path')
            if public.M('sites').where('id=?',
                                       (public.M('domain').where('name=?', (get.siteName)).getField('pid'),)).getField(
                'project_type') == 'Java' or public.M('sites').where('id=?', (
                    public.M('domain').where('name=?', (get.siteName)).getField('pid'),)).getField(
                'project_type') == 'Go' or public.M('sites').where('id=?', (
                    public.M('domain').where('name=?', (get.siteName)).getField('pid'),)).getField(
                'project_type') == 'Other':
                runPath = '/www/wwwroot/java_node_ssl'
            else:
                runPath = self.GetRunPath(get)
            if runPath != False and runPath != '/': get.path += runPath
            tmp = public.httpPost(self.__APIURL + 'user/SyncOrder', self.__PDATA)
            try:
                sslInfo = json.loads(tmp)
            except:
                return public.return_message(-1, 0, tmp)

            sslInfo['data'] = self.En_Code(sslInfo['data'])
            try:

                if public.M('sites').where('id=?', (
                        public.M('domain').where('name=?', (get.siteName)).getField('pid'),)).getField(
                    'project_type') == 'Java' or public.M('sites').where('id=?', (
                        public.M('domain').where('name=?', (get.siteName)).getField('pid'),)).getField(
                    'project_type') == 'Go' or public.M('sites').where('id=?', (
                        public.M('domain').where('name=?', (get.siteName)).getField('pid'),)).getField(
                    'project_type') == 'Other':
                    spath = '/www/wwwroot/java_node_ssl/.well-known/pki-validation'
                else:
                    spath = get.path + '/.well-known/pki-validation'
                if not os.path.exists(spath): public.ExecShell("mkdir -p '" + spath + "'")
                public.writeFile(spath + '/' + sslInfo['data']['authKey'], sslInfo['data']['authValue'])
            except:
                return public.return_message(-1, 0, public.lang("Verification error!"))
        try:
            result = json.loads(public.httpPost(self.__APIURL + 'user/Completed', self.__PDATA))
            if 'data' in result:
                result['data'] = self.En_Code(result['data'])
        except:
            result = public.return_msg_gettext(True, 'Checking...')
        n = 0;
        my_ok = False
        while True:
            if n > 5: break
            time.sleep(5)
            rRet = json.loads(public.httpPost(self.__APIURL + 'user/SyncOrder', self.__PDATA))
            n += 1
            rRet['data'] = self.En_Code(rRet['data'])
            try:
                if rRet['data']['stateCode'] == 'COMPLETED':
                    my_ok = True
                    break
            except:
                return public.get_error_info()
        if not my_ok: return result
        return rRet

    # 同步指定订单
    def SyncOrder(self, get):
        self.__PDATA['data']['partnerOrderId'] = get.partnerOrderId
        self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
        result = json.loads(public.httpPost(self.__APIURL + 'user/SyncOrder', self.__PDATA))
        result['data'] = self.En_Code(result['data'])
        return result

    # 获取证书
    def GetSSLInfo(self, get):
        # 校验参数
        try:
            get.validate([
                Param('siteName').String(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        self.__PDATA['data']['partnerOrderId'] = get.partnerOrderId
        self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
        time.sleep(3)
        result = json.loads(public.httpPost(self.__APIURL + 'user/GetSSLInfo', self.__PDATA))
        result['data'] = self.En_Code(result['data'])
        if not 'privateKey' in result['data']: return result

        # 写配置到站点
        if hasattr(get, 'siteName'):
            try:
                siteName = get.siteName
                path = '/www/server/panel/vhost/cert/' + siteName
                if not os.path.exists(path):
                    public.ExecShell('mkdir -p ' + path)
                csrpath = path + "/fullchain.pem"
                keypath = path + "/privkey.pem"
                pidpath = path + "/partnerOrderId"
                # 清理旧的证书链
                public.ExecShell('rm -f ' + keypath)
                public.ExecShell('rm -f ' + csrpath)
                public.ExecShell('rm -rf ' + path + '-00*')
                public.ExecShell('rm -rf /etc/letsencrypt/archive/' + get.siteName)
                public.ExecShell('rm -rf /etc/letsencrypt/archive/' + get.siteName + '-00*')
                public.ExecShell('rm -f /etc/letsencrypt/renewal/' + get.siteName + '.conf')
                public.ExecShell('rm -f /etc/letsencrypt/renewal/' + get.siteName + '-00*.conf')
                public.ExecShell('rm -f ' + path + '/README')
                public.ExecShell('rm -f ' + path + '/certOrderId')

                public.writeFile(keypath, result['data']['privateKey'])
                public.writeFile(csrpath, result['data']['cert'] + result['data']['certCa'])
                public.writeFile(pidpath, get.partnerOrderId)
                import panel_site_v2 as panelSite
                panelSite.panelSite().SetSSLConf(get)
                public.serviceReload()
                return public.return_message(0, 0, public.lang("Setup successfully!"))
            except:
                return public.return_message(-1, 0, public.lang("Failed to set"))
        result['data'] = self.En_Code(result['data'])
        return result

    def GetSiteDomain(self, get):
        """
        @name 获取网站域名对应的站点名
        @param cert_list 证书域名列表
        @auther hezhihong
        return 证书域名对应的站点名字典,如证书域名未绑定则为空
        """
        all_site = []  # 所有站点名列表
        cert_list = []  # 证书域名列表
        site_list = []  # 证书域名列表对应的站点名列表
        all_domain = []  # 所有域名列表
        try:
            cert_list = json.loads(get.cert_list)
        except:
            pass
        result = {}
        # 取所有站点名和所有站点的绑定域名
        all_sites = public.M('sites').field('name').select()
        for site in all_sites:
            all_site.append(site['name'])
            if not cert_list: continue
            tmp_dict = {}
            tmp_dict['name'] = site['name']
            pid = public.M('sites').where("name=?", (site['name'],)).getField('id')
            domain_list = public.M('domain').where("pid=?", (pid,)).field('name').select()
            for domain in domain_list:
                all_domain.append(domain['name'])
        # 取证书域名所在的所有域名列表
        site_domain = []  # 证书域名对应的站点名列表
        if cert_list and all_domain:
            for cert in cert_list:
                d_cert = ''
                if re.match(r"^\*\..*", cert):
                    d_cert = cert.replace('*.', '')
                for domain in all_domain:
                    if cert == domain:
                        site_domain.append(domain)
                    else:
                        replace_str = domain.split('.')[0] + '.'
                        if d_cert and d_cert == domain.replace(replace_str, ''):
                            site_domain.append(domain)
        # 取证书域名对应的站点名
        for site in site_domain:
            site_id = public.M('domain').where("name=?", (site,)).getField('pid')
            site_name = public.M('sites').where("id=?", (site_id,)).getField('name')
            site_list.append(site_name)
        site_list = sorted(set(site_list), key=site_list.index)
        result['all'] = all_site
        result['site'] = site_list
        return result

    def SetBatchCertToSite(self, get):
        """
        @name 批量部署证书
        @auther hezhihong
        """
        ssl_list = []
        if not hasattr(get, 'BatchInfo') or not get.BatchInfo:
            return public.return_message(-1, 0, public.lang("parameter error"))
        else:
            ssl_list = json.loads(get.BatchInfo)
        if isinstance(ssl_list, list):
            total_num = len(ssl_list)
            resultinfo = {"total": total_num, "success": 0, "faild": 0, "successList": [], "faildList": []}
            successList = []
            faildList = []
            successnum = 0
            failnum = 0
            for Info in ssl_list:
                set_result = {}
                set_result['status'] = True
                get.certName = set_result['certName'] = Info['certName']
                get.siteName = set_result['siteName'] = str(Info['siteName'])  # 站点名称必定为字符串
                get.isBatch = True
                if "ssl_hash" in Info:
                    get.ssl_hash = Info['ssl_hash']
                result = self.SetCertToSite(get)
                if not result or result.get("status") == -1 or result.get("status") is False:
                    set_result['status'] = False
                    failnum += 1
                    faildList.append(set_result)
                else:
                    successnum += 1
                    successList.append(set_result)
                public.writeSpeed('setssl', successnum + failnum, total_num)
            import firewalls
            get.port = '443'
            get.ps = 'HTTPS'
            firewalls.firewalls().AddAcceptPort(get)
            public.serviceReload()
            resultinfo['success'] = successnum
            resultinfo['faild'] = failnum
            resultinfo['successList'] = successList
            resultinfo['faildList'] = faildList

            if hasattr(get, "set_https_mode") and get.set_https_mode.strip() in (True, 1, "1", "true"):
                import panelSite
                sites_obj = panelSite.panelSite()
                if not sites_obj.get_https_mode():
                    sites_obj.set_https_mode()

        else:
            return public.return_message(-1, 0, public.lang("Parameter type error"))
        return resultinfo

    # 部署证书夹证书
    def SetCertToSite(self, get):
        """
        @name 兼容批量部署
        @auther hezhihong
        """
        
        # 校验参数
        try:
            get.validate([
                Param('siteName').String(),
                Param('certName').String(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))


        try:
            result = self.GetCert(get)
            if not 'privkey' in result: return result
            siteName = get.siteName
            path = '/www/server/panel/vhost/cert/' + siteName
            if not os.path.exists(path):
                public.ExecShell('mkdir -p ' + path)
            csrpath = path + "/fullchain.pem"
            keypath = path + "/privkey.pem"

            # 清理旧的证书链
            public.ExecShell('rm -f ' + keypath)
            public.ExecShell('rm -f ' + csrpath)
            public.ExecShell('rm -rf ' + path + '-00*')
            public.ExecShell('rm -rf /etc/letsencrypt/archive/' + get.siteName)
            public.ExecShell('rm -rf /etc/letsencrypt/archive/' + get.siteName + '-00*')
            public.ExecShell('rm -f /etc/letsencrypt/renewal/' + get.siteName + '.conf')
            public.ExecShell('rm -f /etc/letsencrypt/renewal/' + get.siteName + '-00*.conf')
            public.ExecShell('rm -f ' + path + '/README')
            if os.path.exists(path + '/certOrderId'): os.remove(path + '/certOrderId')

            public.writeFile(keypath, result['privkey'])
            public.writeFile(csrpath, result['fullchain'])
            import panel_site_v2 as panelSite
            panelSite.panelSite().SetSSLConf(get)
            public.serviceReload()
            return public.return_message(0, 0, public.lang("Setup successfully!"))
        except Exception as ex:
            import traceback
            public.print_log(traceback.format_exc())
            public.print_log(f"error: {ex}")
            if 'isBatch' in get:
                return public.return_message(-1,0,"")
            return public.return_message(-1,0, 'SET_ERROR,' + public.get_error_info())

    # 获取证书列表
    def GetCertList(self, get):
        try:
            vpath = '/www/server/panel/vhost/ssl'
            if not os.path.exists(vpath): public.ExecShell("mkdir -p " + vpath)
            data = []
            for d in os.listdir(vpath):
                mpath = vpath + '/' + d + '/info.json'
                if not os.path.exists(mpath): continue
                tmp = public.readFile(mpath)
                if not tmp: continue
                tmp1 = json.loads(tmp)
                data.append(tmp1)
            return public.return_message(0,0,  data)
        except:
            return public.return_message(-1,0,  [])

    # 删除证书
    def RemoveCert(self, get):
        # 校验参数
        try:
            get.validate([
                Param('certName').String(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        try:
            vpath = '/www/server/panel/vhost/ssl/' + get.certName.replace("*.", '')
            if not os.path.exists(vpath): return public.return_message(-1, 0, public.lang("Certificate does NOT exist!"))
            public.ExecShell("rm -rf " + vpath)
            return public.return_message(0, 0, public.lang("Certificate deleted!"))
        except:
            return public.return_message(-1, 0, public.lang("Failed to delete!"))

    # 保存证书
    def SaveCert(self, get):
        try:
            certInfo = self.GetCertName(get)
            if not certInfo: 
                return_message=public.return_msg_gettext(False, public.lang('Certificate parsing failed'))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            SSLManger().save_by_file(get.certPath, get.keyPath)
            vpath = '/www/server/panel/vhost/ssl/' + certInfo['subject']
            vpath = vpath.replace("*.", '')
            if not os.path.exists(vpath):
                public.ExecShell("mkdir -p " + vpath)
            public.writeFile(vpath + '/privkey.pem', public.readFile(get.keyPath))
            public.writeFile(vpath + '/fullchain.pem', public.readFile(get.certPath))
            public.writeFile(vpath + '/info.json', json.dumps(certInfo))
            return_message=public.return_msg_gettext(True, public.lang('Successfully saved certificate!'))
            del return_message['status']
            return public.return_message(0,0, return_message['msg'])
        except:
            return_message=public.return_msg_gettext(False, public.lang('Failed to save certificate!'))
            del return_message['status']
            return public.return_message(-1,0, return_message['msg'])

    # 读取证书
    def GetCert(self, get):
        if "ssl_hash" in get:
            path = f"/www/server/panel/vhost/ssl_saved/{get.ssl_hash}"
            return {
                "privkey": public.readFile(f"{path}/privkey.pem"),
                "fullchain": public.readFile(f"{path}/fullchain.pem"),
            }

        vpath = os.path.join('/www/server/panel/vhost/ssl', get.certName.replace("*.", ''))
        if not os.path.exists(vpath):
            return public.return_message(-1, 0, public.lang("Certificate does NOT exist!"))
        data = {}
        data['privkey'] = public.readFile(vpath + '/privkey.pem')
        data['fullchain'] = public.readFile(vpath + '/fullchain.pem')
        return data

    # 获取证书名称
    def GetCertName(self, get):
        return self.get_cert_init(get.certPath)
        # try:
        #     openssl = '/usr/local/openssl/bin/openssl'
        #     if not os.path.exists(openssl): openssl = 'openssl'
        #     result = public.ExecShell(openssl + " x509 -in "+get.certPath+" -noout -subject -enddate -startdate -issuer")
        #     tmp = result[0].split("\n")
        #     data = {}
        #     data['subject'] = tmp[0].split('=')[-1]
        #     data['notAfter'] = self.strfToTime(tmp[1].split('=')[1])
        #     data['notBefore'] = self.strfToTime(tmp[2].split('=')[1])
        #     if tmp[3].find('O=') == -1:
        #         data['issuer'] = tmp[3].split('CN=')[-1]
        #     else:
        #         data['issuer'] = tmp[3].split('O=')[-1].split(',')[0]
        #     if data['issuer'].find('/') != -1: data['issuer'] = data['issuer'].split('/')[0]
        #     result = public.ExecShell(openssl + " x509 -in "+get.certPath+" -noout -text|grep DNS")
        #     data['dns'] = result[0].replace('DNS:','').replace(' ','').strip().split(',')
        #     return data
        # except:
        #     print(public.get_error_info())
        #     return None

    def get_unixtime(self, data, format="%Y-%m-%d %H:%M:%S"):
        import time
        timeArray = time.strptime(data, format)
        timeStamp = int(time.mktime(timeArray))
        return timeStamp

    # 获取指定证书基本信息
    def get_cert_init_o(self, pem_file):
        if not os.path.exists(pem_file):
            return None
        try:
            import OpenSSL
            result = {}
            x509 = OpenSSL.crypto.load_certificate(
                OpenSSL.crypto.FILETYPE_PEM, public.readFile(pem_file))
            # 取产品名称
            issuer = x509.get_issuer()
            result['issuer'] = ''
            if hasattr(issuer, 'CN'):
                result['issuer'] = issuer.CN
            if not result['issuer']:
                is_key = [b'0', '0']
                issue_comp = issuer.get_components()
                if len(issue_comp) == 1:
                    is_key = [b'CN', 'CN']
                for iss in issue_comp:
                    if iss[0] in is_key:
                        result['issuer'] = iss[1].decode()
                        break
            if not result['issuer']:
                if hasattr(issuer, 'O'):
                    result['issuer'] = issuer.O
            # 取到期时间
            result['notAfter'] = self.strf_date(
                bytes.decode(x509.get_notAfter())[:-1])
            # 取申请时间
            result['notBefore'] = self.strf_date(
                bytes.decode(x509.get_notBefore())[:-1])
            # 取可选名称
            result['dns'] = []
            for i in range(x509.get_extension_count()):
                s_name = x509.get_extension(i)
                if s_name.get_short_name() in [b'subjectAltName', 'subjectAltName']:
                    s_dns = str(s_name).split(',')
                    for d in s_dns:
                        result['dns'].append(d.split(':')[1])
            subject = x509.get_subject().get_components()
            # 取主要认证名称
            if len(subject) == 1:
                result['subject'] = subject[0][1].decode()
            else:
                if not result['dns']:
                    for sub in subject:
                        if sub[0] == b'CN':
                            result['subject'] = sub[1].decode()
                            break
                    # result['dns'].append(result['subject'])
                    if 'subject' in result:
                        result['dns'].append(result['subject'])

                else:
                    result['subject'] = result['dns'][0]
            result['endtime'] = int(
                int(time.mktime(time.strptime(result['notAfter'], "%Y-%m-%d")) - time.time()) / 86400)
            return result
        except:
            return None

    # 获取指定证书基本信息
    def get_cert_init(self, pem_file):
        if "/www/server/panel/class" not in sys.path:
            sys.path.insert(0, "/www/server/panel/class")
        import ssl_info
        return ssl_info.ssl_info().load_ssl_info(pem_file)
    
    # 转换时间
    def strf_date(self, sdate):
        return time.strftime('%Y-%m-%d', time.strptime(sdate, '%Y%m%d%H%M%S'))

    # 转换时间
    def strfToTime(self, sdate):
        import time
        return time.strftime('%Y-%m-%d', time.strptime(sdate, '%b %d %H:%M:%S %Y %Z'))

    # 获取产品列表
    def GetSSLProduct(self, get):
        self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
        result = json.loads(public.httpPost(self.__APIURL + 'user/GetSSLProduct', self.__PDATA))
        result['data'] = self.En_Code(result['data'])
        return result

    # 加密数据
    def De_Code(self, data):
        if sys.version_info[0] == 2:
            import urllib
            pdata = urllib.urlencode(data)
            return binascii.hexlify(pdata)
        else:
            import urllib.parse
            pdata = urllib.parse.urlencode(data)
            if type(pdata) == str: pdata = pdata.encode('utf-8')
            return binascii.hexlify(pdata).decode()

    # 解密数据
    def En_Code(self, data):
        if sys.version_info[0] == 2:
            import urllib
            result = urllib.unquote(binascii.unhexlify(data))
        else:
            import urllib.parse
            if type(data) == str: data = data.encode('utf-8')
            tmp = binascii.unhexlify(data)
            if type(tmp) != str: tmp = tmp.decode('utf-8')
            result = urllib.parse.unquote(tmp)

        if type(result) != str: result = result.decode('utf-8')
        return json.loads(result)

    # 手动一键续签
    def renew_lets_ssl(self, get):
        if not os.path.exists('vhost/cert/crontab.json'):
            return public.return_message(-1, 0, public.lang("There are currently no certificates to renew!"))

        old_list = json.loads(public.ReadFile("vhost/cert/crontab.json"))
        cron_list = old_list
        if hasattr(get, 'siteName'):
            if not get.siteName in old_list:
                return public.return_message(-1, 0, public.lang("There is no certificate that can be renewed on the current website.."))
            cron_list = {}
            cron_list[get.siteName] = old_list[get.siteName]

        import panelLets
        lets = panelLets.panelLets()

        result = {}
        result['status'] = True
        result['sucess_list'] = []
        result['err_list'] = []
        for siteName in cron_list:
            data = cron_list[siteName]
            ret = lets.renew_lest_cert(data)
            if ret['status']:
                result['sucess_list'].append(siteName)
            else:
                result['err_list'].append({"siteName": siteName, "msg": ret['msg']})
        return result

    # todo?
    def renew_cert_order(self, args):
        '''
            @name 续签商用证书
            @author cjx
            @version 1.0
        '''
        if not 'pdata' in args:
            return public.return_message(-1, 0, public.lang("The pdata parameter cannot be empty!"))
        pdata = json.loads(args.pdata)
        self.__PDATA['data'] = pdata

        result = self.request('renew_cert_order')
        if result['status'] == True:
            self.__PDATA['data'] = {}
            args['oid'] = result['oid']
            result['verify_info'] = self.get_verify_info(args)
        return result

    def GetAuthToken(self, get):
        """
        登录官网获取Token
        @get.username 官网手机号
        @get.password 官网账号密码
        """
        rtmp = ""
        data = {}
        data['username'] = public.rsa_decrypt(get.username)
        data['password'] = public.md5(public.rsa_decrypt(get.password))
        data['serverid'] = panelAuth().get_serverid()

        if 'code' in get: data['code'] = get.code
        if 'token' in get: data['token'] = get.token

        pdata = {}
        pdata['data'] = self.De_Code(data)
        try:
            rtmp = public.httpPost(self.__BINDURL, pdata)
            result = json.loads(rtmp)
            result['data'] = self.En_Code(result['data'])
            if not result['status']: return result

            if result['data']:
                if result['data']['serverid'] != data['serverid']:  # 保存新的serverid
                    public.writeFile('data/sid.pl', result['data']['serverid'])
                public.writeFile(self.__UPATH, json.dumps(result['data']))
                if os.path.exists('data/bind_path.pl'): os.remove('data/bind_path.pl')
                public.flush_plugin_list()
            del (result['data'])
            session['focre_cloud'] = True
            return result
        except Exception as ex:
            error = str(ex)
            if error.lower().find('json') >= 0:
                error = '<br>错误:连接宝塔官网异常,请按照以下方法排除问题后重试:<br>解决方法:<a target="_blank" class="btlink" href="https://www.bt.cn/bbs/thread-87257-1-1.html">https://www.bt.cn/bbs/thread-87257-1-1.html</a><br>'
                # raise public.PanelError(error)
                return public.return_message(-1, 0, 6)
            else:
                return public.return_message(-1, 0, 6)
                # raise public.error_conn_cloud(error)
            # return public.return_message(-1, 0, public.lang("连接服务器失败!<br>{}", rtmp))

    def GetBindCode(self, get):
        """
        获取验证码
        """
        rtmp = ""
        data = {}
        data['username'] = get.username
        data['token'] = get.token
        pdata = {}
        pdata['data'] = self.De_Code(data)
        try:
            rtmp = public.httpPost(self.__CODEURL, pdata)
            result = json.loads(rtmp)
            return result
        except Exception as ex:
            raise public.error_conn_cloud(str(ex))
            # return public.returnMsg(False,'连接服务器失败!<br>' + rtmp)

    # 解析DNSAPI信息
    def get_dnsapi(self, auth_to):
        tmp = auth_to.split('|')
        dns_name = tmp[0]
        key = "None"
        secret = "None"
        if len(tmp) < 3:
            dnsapi_config = json.loads(public.readFile('{}/config/dns_api.json'.format(public.get_panel_path())))
            for dc in dnsapi_config:
                if dc['name'] != dns_name:
                    continue
                if not dc['data']:
                    continue
                key = dc['data'][0]['value']
                secret = dc['data'][1]['value']
        else:
            key = tmp[1]
            secret = tmp[2]
        return dns_name, key, secret

    # 获取dnsapi对象
    def get_dns_class(self, auth_to):
        try:
            import panelDnsapi
            dns_name, key, secret = self.get_dnsapi(auth_to)
            dns_class = getattr(panelDnsapi, dns_name)(key, secret)
            dns_class._type = 1
            return dns_class
        except:
            return None

    # 解析域名
    def create_dns_record(self, auth_to, domain, dns_value, original_domain=None):
        # 如果为手动解析
        if auth_to == 'dns':
            return None
        from panelDnsapi import DnsMager
        dns_class = DnsMager().get_dns_obj_by_domain(original_domain)
        dns_class._type = 1
        if not dns_class:
            return public.return_message(-1, 0, public.lang("The operation failed. Please check that the key is correct"))

        # 申请前删除caa记录
        root, zone = public.get_root_domain(domain)
        try:
            dns_class.remove_record(public.de_punycode(root), '@', 'CAA')
        except:
            pass
        try:
            dns_class.create_dns_record(public.de_punycode(domain), dns_value)
            return public.return_message(0, 0, public.lang("Added successfully"))
        except:
            return public.return_message(-1, 0, public.get_error_info())

    # 检测ssl验证方式
    def check_ssl_method(self, get):
        """
        @name 检测ssl验证方式
        @domain string 域名
        """

        domain = get.domain
        if public.M('sites').where('id=?', (public.M('domain').where('name=?', (domain)).getField('pid'),)).getField(
                'project_type') == 'Java':
            siteRunPath = '{}/java_node_ssl'.format(public.M("config").getField("sites_path"))
        else:
            siteRunPath = self.get_domain_run_path(domain)

        if not siteRunPath:
            return public.return_message(-1, 0, public.lang("Failed to get the website path. Please check if the website exists"))

        verify_path = siteRunPath + '/.well-known/pki-validation'
        if not os.path.exists(verify_path):  os.makedirs(verify_path)

        # 生成临时文件
        check_val = public.GetRandomString(16)
        verify_file = '{}/{}.txt'.format(verify_path, check_val)
        public.writeFile(verify_file, check_val)
        if not os.path.exists(verify_file):
            return public.return_message(-1, 0, public.lang("Failed to create the validation file. Check if the write was blocked"))

        res = {}
        msg = [' domain name [{}] validation file cannot be accessed correctly'.format(domain),
               'Probable cause',
               '1、the resolution was not correct, or the resolution did not work [Please resolve the domain correctly, or wait for the resolution to work and try again]',
               '2、check whether 301/302 redirects are set [please temporarily turn off redirects related configuration]',
               '3、check whether the site has enabled reverse proxy [please temporarily turn off reverse proxy configuration]'
               ]

        res['HTTP_CSR_HASH'] = msg
        res['HTTPS_CSR_HASH'] = msg

        # 检测HTTP/https访问
        args = public.dict_obj()
        for stype in ['http', 'https']:
            args.url = '{}://{}/.well-known/pki-validation/{}.txt'.format(stype, domain, check_val)
            args.content = check_val
            if self.check_url_txt(args, 2) == 1:
                res['{}_CSR_HASH'.format(stype).upper()] = 1

        # 检测caa记录
        result = self.check_ssl_caa([domain])
        if not result:
            res['CNAME_CSR_HASH'] = 1
        else:
            res['CNAME_CSR_HASH'] = json.loads(result['data'])

        if os.path.exists(verify_file):
            os.remove(verify_file)
        return res

    @staticmethod
    def upload_cert_to_cloud(get):
        ssl_id = None
        ssl_hash = None
        try:
            if "ssl_id" in get:
                ssl_id = int(get.ssl_id)
            if "ssl_hash" in get:
                ssl_hash = get.ssl_hash.strip()
        except (ValueError, AttributeError, KeyError):
            return public.return_message(-1, 0, "parameter error")
        from ssl_manage import SSLManger
        try:
            return SSLManger().upload_cert(ssl_id, ssl_hash)
        except ValueError as e:
            return public.return_message(-1, 0, str(e))
        except Exception as e:
            return public.return_message(-1, 0, "operation mistake:" + str(e))

    @staticmethod
    def remove_cloud_cert(get):
        ssl_id = None
        ssl_hash = None
        local = False
        try:
            if "ssl_id" in get:
                ssl_id = int(get.ssl_id)
            if "ssl_hash" in get:
                ssl_hash = get.ssl_hash.strip()

            if "local" in get and get.local.strip() in ("1", 1, True, "true"):
                local = True

        except (ValueError, AttributeError, KeyError):
            return public.return_message(-1, 0, "parameter error")
        from ssl_manage import SSLManger
        try:
            return SSLManger().remove_cert(ssl_id, ssl_hash, local=local)
        except ValueError as e:
            return public.return_message(-1, 0, str(e))
        except Exception as e:
            return public.return_message(-1, 0, "operation mistake:" + str(e))

    # 未使用
    @staticmethod
    def refresh_cert_list(get=None):
        from ssl_manage import SSLManger
        try:
            return public.return_message(0, 0,  SSLManger().get_cert_list(force_refresh=True))
        except ValueError as e:
            return public.return_message(-1, 0, str(e))
        except Exception as e:
            return public.return_message(-1, 0, "operation mistake:" + str(e))

    @staticmethod
    def get_cert_info(get):
        ssl_id = None
        ssl_hash = None
        try:
            if "ssl_id" in get:
                ssl_id = int(get.ssl_id)
            if "ssl_hash" in get:
                ssl_hash = get.ssl_hash.strip()
        except (ValueError, AttributeError, KeyError):
            return public.return_message(-1, 0, "parameter error")
        from ssl_manage import SSLManger
        try:
            ssl_mager = SSLManger()
            target = ssl_mager.find_ssl_info(ssl_id, ssl_hash)
            if target is None:
                return public.return_message(-1, 0, public.lang("No certificate information was obtained"))
            target.update(ssl_mager.get_cert_for_deploy(target["hash"]))
            return target
        except ValueError as e:
            return public.return_message(-1, 0, str(e))
        except Exception as e:
            return public.return_message(-1, 0, "operation mistake:" + str(e))

    @staticmethod
    def get_cert_list(get):
        """
        search_limit 0 -> 所有证书
        search_limit 1 -> 没有过期的证书
        search_limit 2 -> 有效期小于等于15天的证书 但未过期
        search_limit 3 -> 过期的证书
        search_limit 4 -> 过期时间1年以上的证书
        """
        search_name = None
        search_limit = 0
        force_refresh = False

        try:
            if "search_name" in get:
                search_name = get.search_name.strip()
            if "search_limit" in get:
                search_limit = int(get.search_limit.strip())
            if "force_refresh" in get and get.force_refresh.strip() in ("1", 1, "True", True):
                force_refresh = True

        except (ValueError, AttributeError, KeyError):
            return public.return_message(-1, 0, "parameter error")

        param = None
        if search_name is not None:
            param = ['subject LIKE ?', ["%{}%".format(search_name)]]

        now = datetime.datetime.now()
        filter_func = lambda x: True
        if search_limit == 1:
            date = now.strftime("%Y-%m-%d")
            filter_func = lambda x: x["not_after"] >= date
        elif search_limit == 2:
            date1 = now.strftime("%Y-%m-%d")
            date2 = (now + datetime.timedelta(days=15)).strftime("%Y-%m-%d")
            filter_func = lambda x: date1 <= x["not_after"] <= date2
        elif search_limit == 3:
            date = now.strftime("%Y-%m-%d")
            filter_func = lambda x: x["not_after"] < date
        elif search_limit == 4:
            date = (now + datetime.timedelta(days=366)).strftime("%Y-%m-%d")
            filter_func = lambda x: x["not_after"] > date

        from ssl_manage import SSLManger
        try:
            res_list = SSLManger().get_cert_list(param=param, force_refresh=force_refresh)
            return public.return_message(0, 0, list(filter(filter_func, res_list)))
        except ValueError as e:
            return public.return_message(-1, 0, str(e))
        except Exception as e:
            return public.return_message(-1, 0, "operation mistake:" + str(e))

    def verify_mail_any(self, args):
        if "email" not in args:
            return public.return_message(-1, 0, "Missing email parameter")
        email = args.email
        data = {'email': email}
        try:
            sUrl = '{}/api/user/sendVerifyMailAny'.format(public.OfficialApiBase())

            rtmp = public.httpPost(sUrl, data)

            result = json.loads(rtmp)
            if result['success']:
                return public.return_message(0, 0, result['res'])
            else:
                return public.return_message(-1, 0, result['res'])

        except Exception as e:
            return public.return_message(-1, 0, "err:" + str(e))

Youez - 2016 - github.com/yon3zu
LinuXploit