Server IP : 104.21.38.3 / Your IP : 162.158.88.140 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/firewallModelV2/ |
Upload File : |
# coding: utf-8 # ------------------------------------------------------------------- # aapanel # ------------------------------------------------------------------- # Copyright (c) 2014-2099 aapanel(http://www.aapanel.com) All rights reserved. # ------------------------------------------------------------------- import subprocess import sys if "/www/server/panel/class" not in sys.path: sys.path.insert(0, "/www/server/panel/class") import public from firewallModelV2.app.appBase import Base class IptablesServices(Base): def __init__(self): super().__init__() def set_chain_rich_ip(self, info, operation, chain): """ @name 添加/删除指定链的复杂ip规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ try: if "Address" in info and info["Address"] == "": return self._result(False, public.lang("Failed to set up rule: IP address cannot be empty")) if "Address" in info and public.is_ipv6(info['Address']): return self._result(False, public.lang("Failed to set up rule Unsupported IPV6 addresses")) if "Timeout" not in info or info["Timeout"] == "": info["Timeout"] = 0 if chain == "INPUT": chain = 'in' else: chain = 'out' if operation == "add": exec_cmd = "ipset add {}_bt_user_{}_ipset {} timeout {}".format(chain, info["Strategy"], info["Address"], info["Timeout"]) else: exec_cmd = "ipset del {}_bt_user_{}_ipset {}".format(chain, info["Strategy"], info["Address"]) stdout, stderr = public.ExecShell(exec_cmd) if stderr: return self._result(False, public.lang(f"Failed to set up rule:{stderr}")) return self._result(True, public.lang("Failed to set up rule")) except Exception as e: return self._result(False, public.lang(f"Failed to set up rule:{str(e)}")) def rich_rules(self, info, operation, chain): """ @name @author csj <2025/3/12 上午9:28> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ if "Priority" in info and not "Port" in info: return self.set_chain_rich_ip(info, operation, chain) else: pass # 端口类型暂时不用新版 # return self.set_chain_rich_port(info, operation, "INPUT") # 2024/4/29 下午4:01 获取指定链的数据 def get_chain_data(self, chain): """ @name 获取指定链的数据 @author wzz <2024/4/29 下午4:01> @param "data":{"参数名":""} <数据类型> 参数描述 @return [ { "Family": "ipv4", "Address": "192.168.1.190", "Strategy": "accept", "Chain": "INPUT", "Timeout": timeout } ] """ if chain == "INPUT": ipset_chain = "in" elif chain == "OUTPUT": ipset_chain = "out" else: return [] rules = [] try: for strategy in ["accept", "drop"]: ipset_cmd = "ipset list {}_bt_user_{}_ipset | awk '/[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+/ && /timeout/ {{print $1, $3}}'".format( ipset_chain, strategy ) stdout, stderr = public.ExecShell(ipset_cmd) iplist = stdout.split("\n") for line in iplist: if line == "": continue address = line.strip().split(" ")[0] timeout = line.strip().split(" ")[1] rule = { "Family": "ipv4", "Address": address, "Strategy": strategy, "Chain": chain, "Timeout": timeout } rules.append(rule) return rules except Exception as _: return "" def list_address(self, chains: list = None): """ 获取指定链的ipset列表 :param chains: 链列表 ["INPUT","OUTPUT"] """ if chains is None: chains = ["INPUT", "OUTPUT"] result = [] for chain in chains: result = result + self.get_chain_data(chain) return result def parse_rules(self, stdout): """ @name 解析规则列表输出,返回规则列表字典 @author wzz <2024/3/19 下午 3:53> 字段含义: "number": 规则编号,对应规则在链中的顺序。 "chain": 规则所属的链的名称。 "pkts": 规则匹配的数据包数量。 "bytes": 规则匹配的数据包字节数。 "target": 规则的目标动作,表示数据包匹配到该规则后应该执行的操作。 "prot": 规则适用的协议类型。 "opt": 规则的选项,包括规则中使用的匹配条件或特定选项。 "in": 规则匹配的数据包的输入接口。 "out": 规则匹配的数据包的输出接口。 "source": 规则匹配的数据包的源地址。 "destination": 规则匹配的数据包的目标地址。 "options": 规则的其他选项或说明,通常是规则中的注释或附加信息。 protocol(port协议头中数字对应的协议类型): 0: 表示所有协议 1: ICMP(Internet 控制消息协议) 6: TCP(传输控制协议) 17: UDP(用户数据报协议) @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ lines = stdout.strip().split('\n') rules = [] current_chain = None for line in lines: if line.startswith("Chain"): current_chain = line.split()[1] elif (line.startswith("target") or line.strip() == "" or "source" in line or "Warning: iptables-legacy tables present" in line): # 过滤表头,空行,警告 continue else: rule_info = line.split() rule = { "number": rule_info[0], "chain": current_chain, "pkts": rule_info[1], "bytes": rule_info[2], "target": rule_info[3], "prot": rule_info[4], "opt": rule_info[5], "in": rule_info[6], "out": rule_info[7], "source": rule_info[8], "destination": rule_info[9], "options": " ".join(rule_info[10:]).strip() } rules.append(rule) return rules def list_rules(self, parm): """ @name 列出指定表的指定链的规则 @author wzz <2024/3/19 下午 3:02> @param @return """ try: stdout = subprocess.check_output( ['iptables', '-t', parm['table'], '-L', parm['chain_name'], '-nv', '--line-numbers'], stderr=subprocess.STDOUT, universal_newlines=True ) return self.parse_rules(stdout) except Exception as _: return [] def list_port_forward(self): """ @name 调用list_rules获取所有nat表中的PREROUTING链的规则(端口转发规则),并分析成字典返回 @return dict{"status":True/False,"msg":"提示信息"} """ try: port_forward_rules = self.list_rules({"table": "nat", "chain_name": "FORWARD_BT"}) rules = [] for rule in port_forward_rules: rule_item = { "type": "port_forward", "number": rule["number"], "S_Address": rule["source"], "S_Port": "", "T_Address": "", "T_Port": "", "Protocol": "TCP" } options = rule["options"].split(' ') if rule["target"] == "DNAT": # 外部转发 有IP rule_item["S_Port"] = options[1].split("dpt:")[1] rule_item["T_Address"] = options[2].split("to:")[1].split(":")[0] rule_item["T_Port"] = options[2].split("to:")[1].split(":")[1] elif rule["target"] == "REDIRECT": # 内部转发无IP rule_item["S_Port"] = options[1].split("dpt:")[1] rule_item["T_Address"] = "127.0.0.1" rule_item["T_Port"] = options[-1] else: continue if rule["prot"] == "6" or rule["prot"] == "tcp": rule_item["Protocol"] = "TCP" elif rule["prot"] == "17" or rule["prot"] == "udp": rule_item["Protocol"] = "UDP" rules.append(rule_item) return rules except Exception as _: return [] def port_forward(self, info, operation): """ 设置端口转发规则 :param info: 规则信息 :param operation: 操作类型 add/del """ to_addr = info["T_Address"] if operation == "add": operation = "A" else: operation = "D" is_lo = False if to_addr == "" or to_addr == "0.0.0.0" or to_addr == "127.0.0.1" or to_addr == "0.0.0.0/0": is_lo = True if is_lo: exec_cmd = "iptables -t nat -{operation} FORWARD_BT -p {proto} --dport {sport} -j REDIRECT --to-port {tport}".format( operation=operation, proto=info["Protocol"], sport=info["S_Port"], tport=info["T_Port"]) else: exec_cmd = "iptables -t nat -{operation} FORWARD_BT -p {proto} --dport {sport} -j DNAT --to-destination {taddr}:{tport}".format( operation=operation, proto=info["Protocol"], addr=info["S_Address"], sport=info["S_Port"], taddr=to_addr, tport=info["T_Port"]) stdout, stderr = public.ExecShell(exec_cmd) if stderr: return self._result(False, public.lang(f"Failed to set up rule:{stderr}")) public.ExecShell("systemctl reload BT-FirewallServices") return self._result(True, public.lang("Setting up the rule was successful"))