403Webshell
Server IP : 104.21.38.3  /  Your IP : 172.69.176.125
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/firewallModelV2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/firewallModelV2/firewallBase.py
# coding: utf-8
# -------------------------------------------------------------------
# aapanel
# -------------------------------------------------------------------
# Copyright (c) 2014-2099 aapanel(http://bt.cn) All rights reserved.
# -------------------------------------------------------------------
# Author: wzz <[email protected]>
# -------------------------------------------------------------------

# ------------------------------
# 系统防火墙模型 - 基类
# ------------------------------

import os
import re
from typing import Dict, Union, Any

import public


class Base(object):
    def __init__(self):
        self.config_path = "{}/class_v2/firewallModelV2/config".format(public.get_panel_path())
        self.m_time_file = "/www/server/panel/data/firewall/geoip_mtime.pl"
        self._isUfw = False
        self._isFirewalld = False
        self._isIptables = False
        if os.path.exists('/usr/sbin/firewalld') or os.path.exists('/etc/redhat-release'):
            self._isFirewalld = True
            from firewallModelV2.app.firewalld import Firewalld
            self.firewall = Firewalld()
        elif os.path.exists('/usr/bin/apt-get'):
            self._isUfw = True
            from firewallModelV2.app.ufw import Ufw
            self.firewall = Ufw()
        elif not self._isUfw and not self._isFirewalld:
            self._isIptables = True
            from firewallModelV2.app.iptables import Iptables
            self.firewall = Iptables()
        _months = {'Jan': '01', 'Feb': '02', 'Mar': '03', 'Apr': '04', 'May': '05', 'Jun': '06', 'Jul': '07',
                   'Aug': '08', 'Sep': '09', 'Sept': '09', 'Oct': '10', 'Nov': '11', 'Dec': '12'}

    # 2024/3/14 上午 11:27 获取防火墙运行状态
    def get_firewall_status(self) -> bool:
        """
            @name 获取防火墙运行状态
            @author wzz <2024/3/14 上午 11:27>
            @param
            @return bool True/False
        """
        if self._isUfw:
            res = public.ExecShell("systemctl is-active ufw")[0]
            if res == "active": return True
            res = public.ExecShell("systemctl list-units | grep ufw")[0]
            if res.find('active running') != -1: return True
            res = public.ExecShell('/lib/ufw/ufw-init status')[0]
            if res.find("Firewall is not running") != -1: return False
            res = public.ExecShell('ufw status verbose')[0]
            if res.find('inactive') != -1: return False
            return True
        if self._isFirewalld:
            res = public.ExecShell("ps -ef|grep firewalld|grep -v grep")[0]
            if res: return True
            res = public.ExecShell("systemctl is-active firewalld")[0]
            if res == "active": return True
            res = public.ExecShell("systemctl list-units | grep firewalld")[0]
            if res.find('active running') != -1: return True
            return False
        else:
            res = public.ExecShell("/etc/init.d/iptables status")[0]
            if res.find('not running') != -1: return False
            res = public.ExecShell("systemctl is-active iptables")[0]
            if res == "active": return True
            return True

    # 2024/3/14 上午 11:30 设置禁ping
    def set_ping(self, get) -> dict:
        """
            @name 设置禁ping
            @author wzz <2024/3/14 上午 11:31>
            @param "data":{"参数名":""} <数据类型> 参数描述
            @return dict{"status":True/False,"msg":"提示信息"}
        """
        get.status = get.get("status", "1")
        get.status = str(get.status) if str(get.status) in ['0', '1'] else '1'

        filename = '/etc/sysctl.conf'
        conf = public.readFile(filename)
        if conf.find('net.ipv4.icmp_echo') != -1:
            rep = r"net\.ipv4\.icmp_echo.*"
            conf = re.sub(rep, 'net.ipv4.icmp_echo_ignore_all=' + get.status + "\n", conf)
        else:
            conf += "\nnet.ipv4.icmp_echo_ignore_all=" + get.status + "\n"

        if public.writeFile(filename, conf):
            public.ExecShell('sysctl -p')
            return public.return_message(0, 0, public.lang("SUCCESS"))
        else:
            return public.return_message(-1, 0,
                                         '<a style="color:red;">Error: Setting failed, sysctl.conf is not writable! </a><br>'
                                         '1. If aApanel [System Hardening] is installed, please close it first<br>'
                                         '2. If Cloud Lock is installed, please turn off the [System Collagen] function<br>'
                                         '3. If a security dog is installed, please turn off the [System Protection] function<br>'
                                         '4. If you use other security software, please uninstall it first<br>'
                                         )

    # 2024/3/14 上午 11:37 获取网站日志目录的大小
    def get_www_logs_size(self, get) -> Dict[str, Union[str, Any]]:
        """
            @name 获取网站日志目录的大小
            @author wzz <2024/3/14 上午 11:37>
            @param
            @return dict{"status":True/False,"msg":"提示信息"}
        """
        aa = public.to_size(public.get_path_size("/www/wwwlogs"))
        size = aa if aa else "0B"
        return public.return_message(0, 0, {"log_path": "/www/wwwlogs", "size": size})

    # 2024/3/25 上午 10:50 获取防火墙类型,firewall或ufw
    def _get_firewall_type(self) -> str:
        """
            @name 获取防火墙类型,firewall或ufw
            @return str firewall/ufw
        """
        import os
        if os.path.exists('/usr/sbin/ufw'):
            return 'ufw'
        if os.path.exists('/usr/sbin/firewalld'):
            return 'firewall'
        return 'iptables'

    # 2024/3/26 下午 5:01 获取指定域名的A记录
    def get_a_ip(self, domain: str) -> str:
        """
            @name 获取指定域名的A记录
            @param domain: 域名
            @return str
        """
        try:
            import socket
            return socket.gethostbyname(domain)
        except Exception as e:
            return ""

    # 2024/3/26 下午 5:40 检查是否已添加计划任务,如果没有则添加
    def check_resolve_crontab(self):
        """
            @name 检查是否已添加计划任务
            @author wzz <2024/3/26 下午 5:41>
            @param "data":{"参数名":""} <数据类型> 参数描述
            @return dict{"status":True/False,"msg":"提示信息"}
        """
        python_path = "{}/pyenv/bin/python".format(public.get_panel_path())

        if not public.M('crontab').where(
                'name=?',
                ('[Do not delete] system firewall domain name resolution detection tasks',)
        ).count():
            cmd = '{} {}'.format(python_path, '/www/server/panel/script/firewall_domain.py')
            args = {
                "name": "[Do not delete] system firewall domain name resolution detection tasks",
                "type": 'minute-n',
                "where1": '5',
                "hour": '',
                "minute": '',
                "sName": "",
                "sType": 'toShell',
                "notice": '',
                "notice_channel": '',
                "save": '',
                "save_local": '1',
                "backupTo": '',
                "sBody": cmd,
                "urladdress": ''
            }
            import crontab
            res = crontab.crontab().AddCrontab(args)
            if res and "id" in res.keys():
                return True
            return False
        return True

    # 2024/3/26 下午 11:37 当没有域名解析时,删除域名解析的计划任务
    def remove_resolve_crontab(self):
        """
            @name 当没有域名解析时,删除域名解析的计划任务
            @author wzz <2024/3/26 下午 11:37>
            @param "data":{"参数名":""} <数据类型> 参数描述
            @return dict{"status":True/False,"msg":"提示信息"}
        """

        if not public.M('firewall_domain').count():
            pdata = public.M('crontab').where(
                'name=?',
                '[Do not delete] system firewall domain name resolution detection tasks'
            ).select()
            if pdata:
                import crontab
                for i in pdata:
                    args = {"id": i['id']}
                    crontab.crontab().DelCrontab(args)

    # 2024/3/26 下午 6:22 端口扫描
    def CheckPort(self, port, protocol):
        """
            @name 端口扫描
            @author wzz <2024/3/26 下午 6:22>
            @param "data":{"参数名":""} <数据类型> 参数描述
            @return dict{"status":True/False,"msg":"提示信息"}
        """
        import socket
        localIP = '127.0.0.1'
        temp = {}
        temp['port'] = port
        temp['local'] = True

        try:
            if 'tcp' in protocol.lower():
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.settimeout(0.01)
                s.connect((localIP, port))
                s.close()
            if 'udp' in protocol.lower():
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                s.settimeout(0.01)
                s.sendto(b'', (localIP, port))
                s.close()
        except:
            temp['local'] = False

        result = 0
        if temp['local']: result += 2
        return result

    # 2024/12/18 11:06 构造返回的分页数据
    def return_page(self, data, get):
        """
            @name 构造返回的分页数据
        """
        count = len(data)

        result = public.get_page(count, int(get.p), int(get.row))
        result['data'] = data[(int(get.p) - 1) * int(get.row):(int(get.p)) * int(get.row)]

        return result

Youez - 2016 - github.com/yon3zu
LinuXploit