403Webshell
Server IP : 104.21.38.3  /  Your IP : 162.158.88.140
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/firewallModelV2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/firewallModelV2/iptablesServices.py
# coding: utf-8
# -------------------------------------------------------------------
# aapanel
# -------------------------------------------------------------------
# Copyright (c) 2014-2099 aapanel(http://www.aapanel.com) All rights reserved.
# -------------------------------------------------------------------

import subprocess
import sys

if "/www/server/panel/class" not in sys.path:
    sys.path.insert(0, "/www/server/panel/class")
import public
from firewallModelV2.app.appBase import Base


class IptablesServices(Base):
    def __init__(self):
        """Initialise the service by delegating to the ``Base`` constructor."""
        super().__init__()

    def set_chain_rich_ip(self, info, operation, chain):
        """
            @name Add or delete an IP entry in the ipset that backs a chain's IP rules
            @param info <dict> rule description; reads "Address" and "Strategy".
                   "Timeout" is set to 0 on the passed dict when missing or empty.
            @param operation <str> "add" inserts the address, any other value deletes it
            @param chain <str> "INPUT" selects the "in" sets, any other value the "out" sets
            @return the value built by self._result(status, msg)
        """
        # Local import: only needed here to quote values placed on a shell command line.
        import shlex

        try:
            if "Address" in info and info["Address"] == "":
                return self._result(False, public.lang("Failed to set up rule: IP address cannot be empty"))
            if "Address" in info and public.is_ipv6(info['Address']):
                return self._result(False, public.lang("Failed to set up rule Unsupported IPV6 addresses"))

            if "Timeout" not in info or info["Timeout"] == "":
                info["Timeout"] = 0

            direction = 'in' if chain == "INPUT" else 'out'

            # The command is executed as a shell string, so every caller-supplied
            # value is quoted; ordinary addresses/names are left unchanged by quote().
            set_name = shlex.quote("{}_bt_user_{}_ipset".format(direction, info["Strategy"]))
            address = shlex.quote(str(info["Address"]))

            if operation == "add":
                exec_cmd = "ipset add {} {} timeout {}".format(
                    set_name, address, shlex.quote(str(info["Timeout"])))
            else:
                exec_cmd = "ipset del {} {}".format(set_name, address)

            _, stderr = public.ExecShell(exec_cmd)
            if stderr:
                return self._result(False, public.lang(f"Failed to set up rule:{stderr}"))

            # Success path: the message must not read as a failure.
            return self._result(True, public.lang("Setting up the rule was successful"))
        except Exception as e:
            return self._result(False, public.lang(f"Failed to set up rule:{str(e)}"))

    def rich_rules(self, info, operation, chain):
        """
            @name Dispatch a rich rule to the matching handler
            @param info <dict> rule description; only the presence of the
                   "Priority" and "Port" keys is inspected here
            @param operation <str> forwarded unchanged to set_chain_rich_ip
            @param chain <str> forwarded unchanged to set_chain_rich_ip
            @return the result of set_chain_rich_ip for IP rules, otherwise None
        """
        is_ip_rule = "Priority" in info and "Port" not in info
        if not is_ip_rule:
            # Port-type rules are not handled by the new implementation yet,
            # so nothing is done and None is returned.
            return None
        return self.set_chain_rich_ip(info, operation, chain)

    # 2024/4/29 下午4:01 获取指定链的数据
    def get_chain_data(self, chain):
        """
            @name List the IPv4 entries stored in the accept/drop ipsets of a chain
            @param chain <str> "INPUT" or "OUTPUT"; any other value yields []
            @return list of rule dicts; [] when the chain is unknown or the
                    listing fails
                    [
                       {
                          "Family": "ipv4",
                          "Address": "192.168.1.190",
                          "Strategy": "accept",
                          "Chain": "INPUT",
                          "Timeout": timeout
                       }
                    ]
        """
        if chain == "INPUT":
            ipset_chain = "in"
        elif chain == "OUTPUT":
            ipset_chain = "out"
        else:
            return []

        rules = []
        try:
            for strategy in ("accept", "drop"):
                # Raw strings keep the awk regex backslashes without relying on
                # Python passing unknown escape sequences through.
                ipset_cmd = (
                    r"ipset list {}_bt_user_{}_ipset | "
                    r"awk '/[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+/ && /timeout/ {{print $1, $3}}'"
                ).format(ipset_chain, strategy)
                stdout, _ = public.ExecShell(ipset_cmd)

                for line in stdout.split("\n"):
                    fields = line.split()
                    # Each useful line is "<address> <timeout>"; skip blank or
                    # truncated lines instead of aborting the whole listing.
                    if len(fields) < 2:
                        continue
                    rules.append({
                        "Family": "ipv4",
                        "Address": fields[0],
                        "Strategy": strategy,
                        "Chain": chain,
                        "Timeout": fields[1]
                    })
            return rules
        except Exception:
            # Keep the return type a list on failure so callers can always
            # iterate or concatenate the result.
            return []

    def list_address(self, chains: list = None):
        """
        获取指定链的ipset列表
        :param chains: 链列表 ["INPUT","OUTPUT"]
        """
        if chains is None:
            chains = ["INPUT", "OUTPUT"]
        result = []
        for chain in chains:
            result = result + self.get_chain_data(chain)
        return result

    def parse_rules(self, stdout):
        """
            @name Parse an iptables rule listing into a list of rule dicts
                Keys of each rule:
                    "number": rule number, i.e. the rule's position in its chain.
                    "chain": name of the chain the rule belongs to.
                    "pkts": number of packets matched by the rule.
                    "bytes": number of bytes matched by the rule.
                    "target": action applied to packets that match the rule.
                    "prot": protocol the rule applies to.
                    "opt": the rule's option column.
                    "in": input interface matched by the rule.
                    "out": output interface matched by the rule.
                    "source": source address matched by the rule.
                    "destination": destination address matched by the rule.
                    "options": remaining columns joined by single spaces
                               (extra matches, comments and similar).

                protocol (numeric values that may appear in "prot"):
                    0: all protocols
                    1: ICMP (Internet Control Message Protocol)
                    6: TCP (Transmission Control Protocol)
                    17: UDP (User Datagram Protocol)
            @param stdout <str> text of a listing with line numbers and counters
            @return list of rule dicts; lines that are not rules are skipped
        """
        rules = []
        current_chain = None
        for line in stdout.strip().split('\n'):
            fields = line.split()
            if not fields:
                continue
            if line.startswith("Chain"):
                current_chain = fields[1] if len(fields) > 1 else None
                continue
            # Column header: recognised by its first column title, so that real
            # rules whose options mention "source" are not discarded.
            if fields[0] in ("num", "pkts", "target"):
                continue
            if "Warning: iptables-legacy tables present" in line:
                continue
            # A rule needs the ten fixed columns; skip anything shorter
            # rather than raising and losing the whole listing.
            if len(fields) < 10:
                continue
            rules.append({
                "number": fields[0],
                "chain": current_chain,
                "pkts": fields[1],
                "bytes": fields[2],
                "target": fields[3],
                "prot": fields[4],
                "opt": fields[5],
                "in": fields[6],
                "out": fields[7],
                "source": fields[8],
                "destination": fields[9],
                "options": " ".join(fields[10:])
            })
        return rules

    def list_rules(self, parm):
        """
            @name List the rules of one chain in one iptables table
            @param parm <dict> reads "table" (table name) and "chain_name"
            @return list of rule dicts as produced by parse_rules; [] when the
                    parameters are incomplete, the command fails or parsing fails
        """
        try:
            command = [
                'iptables',
                '-t', parm['table'],
                '-L', parm['chain_name'],
                '-nv', '--line-numbers',
            ]
            # stderr is merged into the captured text, so warnings printed by
            # iptables end up in the listing handed to the parser.
            listing = subprocess.check_output(
                command, stderr=subprocess.STDOUT, universal_newlines=True
            )
            parsed = self.parse_rules(listing)
        except Exception:
            parsed = []
        return parsed

    def list_port_forward(self):
        """
            @name 调用list_rules获取所有nat表中的PREROUTING链的规则(端口转发规则),并分析成字典返回
            @return dict{"status":True/False,"msg":"提示信息"}
        """
        try:
            port_forward_rules = self.list_rules({"table": "nat", "chain_name": "FORWARD_BT"})
            rules = []
            for rule in port_forward_rules:
                rule_item = {
                    "type": "port_forward",
                    "number": rule["number"],
                    "S_Address": rule["source"],
                    "S_Port": "",
                    "T_Address": "",
                    "T_Port": "",
                    "Protocol": "TCP"
                }

                options = rule["options"].split(' ')
                if rule["target"] == "DNAT":  # 外部转发 有IP
                    rule_item["S_Port"] = options[1].split("dpt:")[1]
                    rule_item["T_Address"] = options[2].split("to:")[1].split(":")[0]
                    rule_item["T_Port"] = options[2].split("to:")[1].split(":")[1]
                elif rule["target"] == "REDIRECT":  # 内部转发无IP
                    rule_item["S_Port"] = options[1].split("dpt:")[1]
                    rule_item["T_Address"] = "127.0.0.1"
                    rule_item["T_Port"] = options[-1]
                else:
                    continue

                if rule["prot"] == "6" or rule["prot"] == "tcp":
                    rule_item["Protocol"] = "TCP"
                elif rule["prot"] == "17" or rule["prot"] == "udp":
                    rule_item["Protocol"] = "UDP"

                rules.append(rule_item)
            return rules
        except Exception as _:
            return []

    def port_forward(self, info, operation):
        """
        Add or remove a port-forwarding rule in the FORWARD_BT chain of the nat table.

        :param info: rule description; reads "T_Address", "Protocol", "S_Port"
                     and "T_Port". An empty, 0.0.0.0, 0.0.0.0/0 or 127.0.0.1
                     target address produces a REDIRECT rule, any other address
                     a DNAT rule. "S_Address" is not part of the generated rule.
        :param operation: "add" appends the rule (-A); any other value deletes it (-D)
        :return: the value built by self._result(status, msg)
        """
        # Local import: only needed here to quote values placed on a shell command line.
        import shlex

        to_addr = info["T_Address"]
        action = "A" if operation == "add" else "D"
        is_lo = to_addr in ("", "0.0.0.0", "127.0.0.1", "0.0.0.0/0")

        # The command is executed as a shell string, so every caller-supplied
        # value is quoted; ordinary ports/addresses are left unchanged by quote().
        proto = shlex.quote(str(info["Protocol"]))
        sport = shlex.quote(str(info["S_Port"]))

        if is_lo:
            exec_cmd = "iptables -t nat -{operation} FORWARD_BT -p {proto} --dport {sport} -j REDIRECT --to-port {tport}".format(
                operation=action, proto=proto, sport=sport, tport=shlex.quote(str(info["T_Port"])))
        else:
            destination = shlex.quote("{}:{}".format(to_addr, info["T_Port"]))
            exec_cmd = "iptables -t nat -{operation} FORWARD_BT -p {proto} --dport {sport} -j DNAT --to-destination {dest}".format(
                operation=action, proto=proto, sport=sport, dest=destination)

        _, stderr = public.ExecShell(exec_cmd)
        if stderr:
            return self._result(False, public.lang(f"Failed to set up rule:{stderr}"))
        # The firewall service is reloaded only after the rule change succeeded.
        public.ExecShell("systemctl reload BT-FirewallServices")
        return self._result(True, public.lang("Setting up the rule was successful"))

Youez - 2016 - github.com/yon3zu
LinuXploit