Server IP : 172.67.216.182 / Your IP : 162.158.88.2 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/firewallModelV2/ |
Upload File : |
# coding: utf-8 # ------------------------------------------------------------------- # 宝塔Linux面板 # ------------------------------------------------------------------- # Copyright (c) 2014-2099 宝塔软件(http://bt.cn) All rights reserved. # ------------------------------------------------------------------- # Author: wzz <[email protected]> # ------------------------------------------------------------------- import json import re import time import os # ------------------------------ # 系统防火墙模型 - 业务接口类 # ------------------------------ import public from firewallModelV2.firewallBase import Base from firewallModelV2.iptablesServices import IptablesServices # noinspection PyUnusedLocal class main(Base): def __init__(self): super().__init__() self.check_table_column() self.check_table_firewall_forward() self.iptables = IptablesServices() self._add_sid = None # 2024/3/14 下午 12:01 获取防火墙状态信息 def get_firewall_info(self, get): """ @name 获取防火墙统计 """ cache_key = "firewall_info" from BTPanel import cache data = cache.get(cache_key) if data: return public.success_v2(data) data = { 'port': len(self.firewall.list_port()), 'ip': len(self.iptables.list_address()) + len(self.firewall.list_address()), 'trans': len(self.iptables.list_port_forward()), 'country': public.M('firewall_country').count(), 'banned': public.M('firewall_malicious_ip').count(), 'type': "firewalld" if self._isFirewalld else "ufw" if self._isUfw else "iptables", } if not "m_time_file" in self.__dict__: self.m_time_file = "/www/server/panel/data/firewall/geoip_mtime.pl" m_time = public.readFile(self.m_time_file) data['update_time'] = public.format_date(times=int(m_time) if m_time else int(time.time())) isPing = True try: file = '/etc/sysctl.conf' conf = public.readFile(file) rep = r"#*net\.ipv4\.icmp_echo_ignore_all\s*=\s*([0-9]+)" tmp = re.search(rep, conf).groups(0)[0] if tmp == '1': isPing = False except: isPing = True data['ping'] = isPing cache.set(cache_key, data, 3600) return public.success_v2(data) # 2024/3/26 下午 3:40 获取防火墙状态 def get_status(self, get): """ @name 获取防火墙状态 @author wzz <2024/3/26 下午 3:40> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ firewall_status = self.get_firewall_status() firewall_init_status = self.check_iptables_env(firewall_status) data = {"status": firewall_status, "init_status": firewall_init_status} return public.success_v2(data) def clean_cache(self,get): """ @name 强制清除防火墙缓存 @author csj """ from BTPanel import cache cache_keys = ["firewall_info","port_rules_list","ip_rules_list","port_forward_list"] for cache_key in cache_keys: cache.delete(cache_key) return public.success_v2('Cache cleared successfully') def check_iptables_env(self, firewall_status): if firewall_status: if not os.path.exists('/etc/systemd/system/BT-FirewallServices.service'): return {'status': False, 'msg': public.lang('Firewall not initialised.')} elif public.ExecShell("iptables -C INPUT -j IN_BT")[1] != '': #丢失iptable链 需要重新创建 exec_shell = 'sh {}/script/init_firewall.sh'.format(public.get_panel_path()) public.ExecShell(exec_shell) return {'status': True, 'msg': public.lang('installed.')} else: return {'status': True, 'msg': public.lang('installed.')} else: return {'status': True, 'msg': public.lang('Firewall not turned on.')} def update_bt_firewall(self, get): if not os.path.exists('/etc/systemd/system/BT-FirewallServices.service'): panel_path = public.get_panel_path() exec_shell = '(' if not os.path.exists('/usr/sbin/ipset'): exec_shell = exec_shell + '{} install ipset -y;'.format(public.get_sys_install_bin()) exec_shell = exec_shell + 'sh {panel_path}/script/init_firewall.sh;btpython -u {panel_path}/script/upgrade_firewall.py )'.format(panel_path=panel_path) import panelTask task_obj = panelTask.bt_task() task_id = task_obj.create_task('Firewall Initialisation Tasks', 0, exec_shell) return public.success_v2({'task_id': task_id, 'msg': public.lang('Firewall initialisation task created.')}) else: return public.fail_v2(public.lang('Installed')) # 2024/3/26 下午 3:42 设置防火墙状态 def set_status(self, get): """ @name 设置防火墙状态 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ get.status = get.get('status/s', '1') if get.status not in ['0', '1']: return public.fail_v2(public.lang("The parameter is incorrect")) if get.status == '1': start_status = self.firewall.start() if not start_status["status"]: public.WriteLog("system firewall", "Failed to start firewall") return public.fail_v2(start_status['msg']) get.reload = 0 default_ports = ["SSH", "80", "443", "39000-40000", "21", "PanelPort"] close_ports = get.get('ports', '').split(",") for port in default_ports: if port not in close_ports: if port == "PanelPort": _panel_port_file = '/www/server/panel/data/port.pl' port = public.readFile(_panel_port_file).strip(" ").strip("\n") elif port == "SSH": cmd = "cat /etc/ssh/sshd_config |grep -E '^Port'|awk '{print $2}'|awk 'NR == 1'" ssh_port = public.ExecShell(cmd)[0].strip(" ").strip("\n") if ssh_port != "": port = ssh_port get.port = port self.set_port_rule(get) self.firewall.reload() public.WriteLog("system firewall", "Firewall activation") return public.return_message(0 if start_status['status'] else -1, 0, start_status['msg']) else: data = self.firewall.stop() return public.return_message(0 if data['status'] else -1, 0, data['msg']) # 2024/5/13 下午3:50 检查指定端口是否已经存在,如果存在则返回False,否则返回True def check_port_exist(self, get): """ @name 检查指定端口是否已经存在,如果存在则返回False,否则返回True @author wzz <2024/5/13 下午3:51> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ port_rules_list = self.port_rules_list(get) for item in port_rules_list['message']['data']: if item["Port"] == get.port and item["Address"] == get.address and item["Protocol"] == get.protocol and \ item["Strategy"] == get.strategy and item["Chain"] == get.chain: return False return True # 2024/5/13 下午4:13 检查指定ip规则是否已经存在,如果存在则返回False,否则返回True def check_ip_exist(self, get): """ @name 检查指定ip规则是否已经存在,如果存在则返回False,否则返回True @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ ip_rules_list = self.ip_rules_list(get) for item in ip_rules_list['message']["data"]: if item["Address"] == get.address and item["Strategy"] == get.strategy and item["Chain"] == get.chain: return False return True # 2024/5/13 下午4:17 检查指定端口转发规则是否已经存在,如果存在则返回False,否则返回True def check_forward_exist(self, get): """ @name 检查指定端口转发规则是否已经存在,如果存在则返回False,否则返回True @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ forward_rules_list = self.port_forward_list(get) for item in forward_rules_list["message"]["data"]: if item["S_Address"] == get.S_Address and item["S_Port"] == get.S_Port and item["T_Address"] == get.T_Address and \ item["T_Port"] == get.T_Port: return False return True # 2024/3/26 下午 6:09 从数据库中获取端口规则列表 def get_port_db(self, get): """ @name 从数据库中获取端口规则列表 @author wzz <2024/3/26 下午 6:13> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ try: where = '1=1' sql = public.M('firewall_new') data = sql.where(where, ()).select() domain_sql = public.M('firewall_domain') domain_data = domain_sql.where(where, ()).select() for i in range(len(data)): if not "ports" in data[i]: data[i]['status'] = -1 continue if "brief" in data[i]: data[i]['brief'] = public.xssdecode(data[i]['brief']) if not "chain" in data[i]: data[i]['chain'] = "INPUT" if "chain" in data[i] and data[i]['chain'] == "": data[i]['chain'] = "INPUT" for j in range(len(domain_data)): if "domain" in domain_data[j] and data[i]['address'] in domain_data[j]['domain']: data[i]['domain'] = domain_data[j]['domain'] break return data except Exception as e: return [] # 2024/3/26 下午 11:53 从数据库中获取ip规则列表 def get_ip_db(self, get): """ @name 从数据库中获取ip规则列表 @author wzz <2024/3/26 下午 11:53> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ try: where = '1=1' sql = public.M('firewall_ip') ip_data = sql.where(where, ()).select() for i_data in ip_data: if "brief" in i_data: i_data['brief'] = public.xssdecode(i_data['brief']) if not "chain" in i_data: i_data['chain'] = "INPUT" if "chain" in i_data and i_data['chain'] == "": i_data['chain'] = "INPUT" return ip_data except Exception as e: return [] # 2024/3/26 下午 11:58 从数据库中获取端口转发规则列表 def get_forward_db(self, get): """ @name 从数据库中获取端口转发规则列表 @author wzz <2024/3/26 下午 11:58> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ try: where = '1=1' sql = public.M('firewall_forward') if hasattr(get, 'query'): where = " S_Address like '%{search}%' or S_Port like '%{search}%' or T_Address like '%{search}%' or T_Port like '%{search}%'".format( search=get.query ) res = sql.where(where, ()).select() if type(res) != list: return [] return res except Exception as e: return [] # 2024/3/26 下午 10:46 构造端口规则返回数据 def structure_port_return_data(self, list_port, rule_db, get): """ @name 构造返回数据 @author wzz <2024/3/26 下午 10:47> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ cache_key = "port_rules_list" from BTPanel import cache data = cache.get(cache_key) if data: new_list = [] sort_data = data for j in range(len(sort_data)): if get.query != "": if get.query in sort_data[j]['Port'] or get.query in sort_data[j]['brief'] or get.query in \ sort_data[j]['Address']: new_list.append(sort_data[j]) elif "Chain" not in sort_data[j]: new_list.append(sort_data[j]) elif get.chain != "ALL" and sort_data[j]['Chain'] == get.chain: new_list.append(sort_data[j]) if len(new_list) > 0 or get.query != "" or get.chain != "ALL": sort_data = sorted(new_list, key=lambda x: x['addtime'], reverse=True) else: new_list = [] for j in range(len(list_port)): list_port[j]['id'] = 0 list_port[j]['sid'] = 0 list_port[j]['brief'] = "" list_port[j]['domain'] = "" if (list_port[j]['Port'].find(":") != -1 or list_port[j]['Port'].find("-") != -1 or list_port[j]['Port'].find("/") != -1 or list_port[j]['Port'].find(".") != -1): list_port[j]['status'] = 1 else: try: if not ":" in list_port[j]['Port'] or not "-" in list_port[j]['Port']: list_port[j]['status'] = self.CheckPort(int(list_port[j]['Port']), list_port[j]['Protocol']) else: list_port[j]['status'] = -1 except: list_port[j]['status'] = -1 if "Chain" in list_port[j] and list_port[j]['Chain'] == "OUTPUT": list_port[j]['status'] = -1 list_port[j]['addtime'] = "--" list_port[j]['Port'] = list_port[j]['Port'].replace(":", "-") for i in range(len(rule_db)): # 备注不受条件影响 # if rule_db[i]['ports'] == list_port[j]['Port']: # list_port[j]['brief'] = rule_db[i]['brief'] if (rule_db[i]['ports'] == list_port[j]['Port'] and rule_db[i]['protocol'] == list_port[j]['Protocol'] and rule_db[i]['address'].lower() == list_port[j]['Address'].lower() and rule_db[i]['types'] == list_port[j]['Strategy'] and rule_db[i]['chain'] == list_port[j]['Chain']): list_port[j]['id'] = rule_db[i]['id'] list_port[j]['sid'] = rule_db[i]['sid'] list_port[j]['brief'] = rule_db[i]['brief'] list_port[j]['addtime'] = rule_db[i]['addtime'] if "domain" in rule_db[i]: list_port[j]['domain'] = rule_db[i]['domain'] break if get.query != "": if get.query in list_port[j]['Port'] or get.query in list_port[j]['brief'] or get.query in \ list_port[j]['Address']: new_list.append(list_port[j]) if len(new_list) > 0 or get.query != "": sort_data = sorted(new_list, key=lambda x: x['addtime'], reverse=True) else: sort_data = sorted(list_port, key=lambda x: x['addtime'], reverse=True) if get.query == "": cache.set(cache_key, sort_data, 86400) return self.return_page(sort_data, get) # 2024/3/27 上午 12:01 构造ip规则返回数据 def structure_ip_return_data(self, list_ip, rule_db, get): """ @name 构造ip规则返回数据 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ cache_key = "ip_rules_list" from BTPanel import cache data = cache.get(cache_key) if data: new_list = [] sort_data = data for j in range(len(sort_data)): if get.query != "": if get.query in sort_data[j]['Address'] or get.query in sort_data[j]['brief']: new_list.append(sort_data[j]) elif get.chain != "ALL" and sort_data[j]['Chain'] == get.chain: new_list.append(sort_data[j]) if len(new_list) > 0 or get.query != "" or get.chain != "ALL": sort_data = sorted(new_list, key=lambda x: x['addtime'], reverse=True) else: new_list = [] # btTodo1: 要删除已经失效的ip 暂时不加 可能会删掉其他模块从其他地方添加的IP # matched_ids = set() # btTodo1: 存储匹配到的id 用于删除数据库中已经失效的ip for j in range(len(list_ip)): import random list_ip[j]['id'] = random.randint(-100, -1) list_ip[j]['sid'] = 0 list_ip[j]['brief'] = "" list_ip[j]['domain'] = "" list_ip[j]['addtime'] = "--" list_ip[j]["expiry_date"] = '' # BT-Services托管的IP规则 if "Timeout" in list_ip[j]: # BT-Services托管的IP规则 if int(list_ip[j]["Timeout"]) > 0: from datetime import datetime timeout = int(time.time()) + int(list_ip[j]['Timeout']) timeout_str = datetime.fromtimestamp(timeout).strftime('%Y-%m-%d %H:%M:%S') list_ip[j]["expiry_date"] = timeout_str list_ip[j]["stype"] = '1' else: # 系统防火墙的IP规则 list_ip[j]["stype"] = '0' for i in range(len(rule_db)): if (rule_db[i]['address'] == list_ip[j]['Address'] and rule_db[i]['types'] == list_ip[j]['Strategy'] and rule_db[i]['chain'] == list_ip[j]['Chain']): list_ip[j]['id'] = rule_db[i]['id'] list_ip[j]['sid'] = rule_db[i]['sid'] list_ip[j]['brief'] = rule_db[i]['brief'] list_ip[j]['addtime'] = rule_db[i]['addtime'] if "domain" in rule_db[i]: list_ip[j]['domain'] = rule_db[i]['domain'] # matched_ids.add(rule_db[i]['id']) # btTodo1:相匹配的ip列表 用于过滤 break if get.query != "": if get.query in list_ip[j]['brief'] or get.query in list_ip[j]['Address']: new_list.append(list_ip[j]) # all_ids = {entry['id'] for entry in rule_db} # btTodo1:获取 rule_db 中所有 ID # expiry_ip_ids = all_ids - matched_ids # btTodo1:失效IP的ID # for expiry_id in expiry_ip_ids: # btTodo1: # public.M('firewall_ip').where("id=?", (expiry_id,)).delete() # btTodo1: if len(new_list) > 0 or get.query != "": sort_data = sorted(new_list, key=lambda x: x['addtime'], reverse=True) else: sort_data = sorted(list_ip, key=lambda x: x['addtime'], reverse=True) if get.query == "": cache.set(cache_key, sort_data, 86400) from mod.project.ssh.base import SSHbase page_data = self.return_page(sort_data, get) page_data["data"] = SSHbase.return_area(page_data["data"], "Address") return page_data # 2024/3/27 上午 12:11 构造端口转发规则返回数据 def structure_forward_return_data(self, list_forward, rule_db, get): """ @name 构造端口转发规则返回数据 @author wzz <2024/3/27 上午 12:11> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ cache_key = "port_forward_list" from BTPanel import cache data = cache.get(cache_key) if data: new_list = [] sort_data = data for j in range(len(sort_data)): if get.query != "": if get.query in sort_data[j]['S_Address'] or get.query in sort_data[j]['S_Port'] or get.query in \ sort_data[j]['T_Address'] or get.query in sort_data[j]['T_Port'] or get.query in \ sort_data[j]['brief']: new_list.append(sort_data[j]) if len(new_list) > 0 or get.query != "": sort_data = sorted(new_list, key=lambda x: x['addtime'], reverse=True) else: new_list = [] for j in range(len(list_forward)): list_forward[j]['id'] = 0 list_forward[j]['brief'] = "" list_forward[j]['addtime'] = "--" for i in range(len(rule_db)): if (rule_db[i]['T_Address'] == list_forward[j]['T_Address'] and rule_db[i]['S_Port'] == list_forward[j]['S_Port'] and rule_db[i]['T_Port'] == list_forward[j]['T_Port']): list_forward[j]['id'] = rule_db[i]['id'] list_forward[j]['brief'] = rule_db[i]['brief'] list_forward[j]['addtime'] = rule_db[i]['addtime'] break if get.query != "": if (get.query in list_forward[j]['brief'] or get.query in list_forward[j][ 'S_Address'] or get.query in list_forward[j]['S_Port'] or get.query in list_forward[j]['T_Address'] or get.query in list_forward[j]['T_Port']): new_list.append(list_forward[j]) if len(new_list) > 0 or get.query != "": sort_data = sorted(new_list, key=lambda x: x['addtime'], reverse=True) else: sort_data = sorted(list_forward, key=lambda x: x['addtime'], reverse=True) if get.query == "": cache.set(cache_key, sort_data, 86400) return self.return_page(sort_data, get) # 2024/3/25 上午 11:05 获取所有端口规则列表 def port_rules_list(self, get): """ @name 获取所有端口规则列表 @author wzz <2024/3/25 上午 11:06> @param "data":{"参数名":""} <数据类型> 参数描述 @return list[dict{}...] """ get.chain = get.get('chain/s', 'ALL') get.query = get.get('query/s', '') get.p = get.get('p', 1) get.row = get.get('row', 20) rule_db = self.get_port_db(get) if get.chain == "INPUT": list_port = self.firewall.list_input_port() elif get.chain == "OUTPUT": list_port = self.firewall.list_output_port() else: list_port = self.firewall.list_port() return public.success_v2(self.structure_port_return_data(list_port, rule_db, get)) # 2024/3/26 下午 3:17 导出规则 def export_rules(self, get): """ @name 导出规则 @author wzz <2024/3/26 下午 3:17> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ get.rule = get.get('rule/s', 'port') if get.rule == "port": return self.export_port_rules(get) elif get.rule == "ip": return self.export_ip_rules(get) else: return self.export_port_forward(get) # 2024/3/26 下午 3:18 导入规则 def import_rules(self, get): """ @name 导入规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ # 2024/4/17 下午4:47 检查防火墙状态,如果未启动则不允许设置导入规则 if not self.get_firewall_status(): return public.fail_v2("Start the firewall before importing the rules!") get.rule = get.get('rule/s', 'port') if get.rule == "port": return self.import_port_rules(get) elif get.rule == "ip": return self.import_ip_rules(get) else: return self.import_port_forward(get) # 2024/3/26 下午 2:38 导出所有端口规则 def export_port_rules(self, get): """ @name 导出所有端口规则 @author wzz <2024/3/26 下午 2:39> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ get.chain = get.get('chain/s', 'all') if get.chain == "INPUT": file_name = "input_port_rules_{}".format(int(time.time())) elif get.chain == "OUTPUT": file_name = "output_port_rules_{}".format(int(time.time())) else: file_name = "port_rules_{}".format(int(time.time())) data = self.port_rules_list(get)['message']['data'] if not data: return public.fail_v2("No rules can be exported") if not os.path.exists(self.config_path): os.makedirs(self.config_path, exist_ok=True) file_path = "{}/{}.json".format(self.config_path, file_name) public.writeFile(file_path, public.GetJson(data)) public.WriteLog("system firewall", "Export port rules") return public.success_v2(file_path) # 2024/3/26 下午 2:41 导入端口规则 def import_port_rules(self, get): """ @name 导入端口规则 @author wzz <2024/3/26 下午 2:58> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ get.file = get.get('file/s', '') if not get.file: return public.fail_v2("The file cannot be empty") if not os.path.exists(get.file): return public.fail_v2("The file does not exist") try: data = public.readFile(get.file) if "|" in data and not "{" in data: get.rule_name = "port_rule" return self.import_rules_old(get) data = json.loads(data) # 2024/4/10 下午2:51 反转数据 data.reverse() except: return public.fail_v2("The file content is abnormal or malformed") args = public.dict_obj() for item in data: args.operation = 'add' args.protocol = item['Protocol'] args.port = item['Port'] args.strategy = item['Strategy'] args.chain = item['Chain'] args.address = item.get('Address', 'all') args.brief = item.get('brief', '') args.reload = "0" self.set_port_rule(args) if self._isFirewalld: self.firewall.reload() return public.success_v2("The import was successful") # 2024/5/14 上午10:27 调用旧的导入规则方法 def import_rules_old(self, get): """ @name 调用旧的导入规则方法 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ try: from safeModel.firewallModel import main as firewall firewall_obj = firewall() get.file_name = get.file.split("/")[-1] firewall_obj.import_rules(get) return public.success_v2("The import was successful") except Exception as e: return public.fail_v2(str(e)) # 2024/3/26 上午 9:30 处理多个ip以换行的方式添加/删除 def set_nline_port_ip(self, get): """ @name 处理多个ip以换行的方式添加/删除 @author wzz <2024/3/26 上午 9:32> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ address = get.address.split("\n") failed_list = [] for addr in address: if not public.checkIp(addr): return public.fail_v2('The destination address is in the wrong format') get.address = addr if get.chain == "INPUT": result = self.input_port(get) else: result = self.output_port(get) if not result['status']: failed_list.append({ "address": addr, "msg": result['msg'] }) if len(failed_list) > 0: return public.success_v2(f'The setting succeeds, but the following rule settings fail:{failed_list}') # if self._isFirewalld: # self.firewall.reload() return public.success_v2('The setup was successful') # 2024/3/26 上午 9:30 处理多个ip以逗号的方式添加/删除 def set_tline_port_ip(self, get): """ @name 处理多个ip以逗号的方式添加 @author wzz <2024/3/26 上午 9:32> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ address = get.address.split(",") failed_list = [] for addr in address: if not public.checkIp(addr): return public.fail_v2('The destination address is in the wrong format') get.address = addr if get.chain == "INPUT": result = self.input_port(get) else: result = self.output_port(get) if not result['status']: failed_list.append({ "address": addr, "msg": result['msg'] }) if len(failed_list) > 0: return public.success_v2('The setting succeeds, but the following rule settings fail::{}'.format(failed_list)) # if self._isFirewalld: # self.firewall.reload() return public.success_v2('The setup was successful') # 2024/3/26 上午 9:34 处理192.168.1.10-192.168.1.20这种范围ip的添加/删除 def set_range_port_ip(self, get): """ @name 处理192.168.1.10-192.168.1.20这种范围ip的添加/删除 @author wzz <2024/3/26 上午 9:35> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ address = self.firewall.handle_ip_range(get.address) failed_list = [] for addr in address: get.address = addr if get.chain == "INPUT": result = self.input_port(get) else: result = self.output_port(get) if not result['status']: failed_list.append({ "address": addr, "msg": result['msg'] }) if len(failed_list) > 0: return public.success_v2('The setting succeeds, but the following rule settings fail::{}'.format(failed_list)) # if self._isFirewalld: # self.firewall.reload() return public.success_v2('The setup was successful') # 2024/3/25 下午 6:27 设置端口规则 def set_port_rule(self, get): """ @name 设置端口规则 @author wzz <2024/3/25 下午 6:28> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ # 2024/4/17 下午4:47 检查防火墙状态,如果未启动则不允许设置端口规则 if not self.get_firewall_status(): return public.fail_v2("Please activate the firewall before setting up the rules!") get.operation = get.get('operation/s', 'add') get.protocol = get.get('protocol/s', 'tcp') get.address = get.get('address/s', 'all') get.port = get.get('port/s', '') get.strategy = get.get('strategy/s', 'accept') get.chain = get.get('chain/s', 'INPUT') get.reload = get.get('reload/s', "1") get.brief = get.get('brief/s', '') if get.address == "Anywhere" or get.address == "": get.address = "all" if get.protocol == "all": get.protocol = "tcp/udp" if get.port == "": return public.fail_v2("The destination port cannot be empty") from copy import deepcopy if get.address != "all" and "," in get.address: args1 = public.to_dict_obj(deepcopy(get.get_items())) address_list = get.address.split(",") for address in address_list: args1.address = address result = self.more_prot_rule(args1) if not result['status']: return public.fail_v2(result['msg']) elif get.address != "all" and "\n" in get.address: args2 = public.to_dict_obj(deepcopy(get.get_items())) address_list = get.address.split("\n") for address in address_list: args2.address = address result = self.more_prot_rule(args2) if not result['status']: return public.fail_v2(result['msg']) else: result = self.more_prot_rule(get) if not result['status']: return public.fail_v2(result['msg']) if self._isFirewalld and get.reload == "1": self.firewall.reload() cache_key = "firewall_info" from BTPanel import cache data = cache.get(cache_key) if data: cache.delete(cache_key) cache_key = "port_rules_list" data = cache.get(cache_key) if data: cache.delete(cache_key) public.WriteLog("system firewall", "Set up port rules:{}".format(get.port)) return public.success_v2("The setup was successful") # 2024/3/29 下午 4:04 处理多个ip的端口规则情况 def more_prot_rule(self, get): """ @name 处理多个ip的端口规则情况 @author wzz <2024/3/29 下午 4:02> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ if get.port.find(",") != -1: from copy import deepcopy args = public.to_dict_obj(deepcopy(get.get_items())) port_list = get.port.split(",") for port in port_list: args.port = port result = self.exec_port_rule(args) if not result['status']: return result return public.success_v2('The setup was successful') else: return self.exec_port_rule(get) # 2024/3/28 下午 6:29 执行端口设置 def exec_port_rule(self, get): """ @name 执行端口设置 @author wzz <2024/3/28 下午 6:29> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ if get.operation == "add" and not self.check_port_exist(get): return public.fail_v2('Port {} already exists, do not add it repeatedly'.format(get.port)) self.set_port_db(get) # 2024/3/25 下午 8:23 处理多个ip的情况,例如出现每行一个ip if get.address != "all" and "\n" in get.address: return self.set_nline_port_ip(get) elif get.address != "all" and "-" in get.address: return self.set_range_port_ip(get) elif get.address != "all" and "," in get.address: return self.set_tline_port_ip(get) elif get.address != "all" and "/" in get.address: if get.chain == "INPUT": result = self.input_port(get) else: result = self.output_port(get) elif get.address != "all" and not public.checkIp(get.address) and not public.is_ipv6(get.address): return public.fail_v2('The specified IP address is in the wrong format') else: if get.chain == "INPUT": result = self.input_port(get) else: result = self.output_port(get) return result # 2024/5/14 上午10:40 前置检测ip是否合法 def check_ips(self, get): """ @name 前置检测ip是否合法 @author wzz <2024/5/14 上午10:40> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ if get.address != "all" and "\n" in get.address: address = get.address.split("\n") for addr in address: if addr != "all" and not public.checkIp(addr) and not public.is_ipv6(addr): return public.fail_v2("The specified IP address is in the wrong format") elif get.address != "all" and "," in get.address: address = get.address.split(",") for addr in address: if addr != "all" and not public.checkIp(addr) and not public.is_ipv6(addr): return public.fail_v2("The specified IP address is in the wrong format") else: if get.address != "all" and not public.checkIp(get.address) and not public.is_ipv6(get.address): return public.fail_v2("The specified IP address is in the wrong format") return public.success_v2("ok") # 2024/3/27 上午 9:37 修改端口规则 def modify_port_rule(self, get): """ @name 修改端口规则 @author wzz <2024/3/27 上午 9:38> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ # 2024/4/17 下午4:47 检查防火墙状态,如果未启动则不允许设置规则 if not self.get_firewall_status(): return public.fail_v2("Please activate the firewall before setting up the rules!") get.old_data = get.get('old_data/s', '') get.new_data = get.get('new_data/s', '') if get.old_data == "": return public.fail_v2("Please pass in old_data") if get.new_data == "": return public.fail_v2("Please pass in new_data") get.old_data = json.loads(get.old_data) get.new_data = json.loads(get.new_data) args1 = public.dict_obj() args1.operation = 'remove' args1.port = get.old_data['Port'] args1.protocol = get.old_data['Protocol'] args1.address = get.old_data['Address'] args1.strategy = get.old_data['Strategy'] args1.chain = get.old_data['Chain'] args1.id = get.old_data['id'] args1.sid = get.old_data['sid'] args1.reload = "0" self.set_port_rule(args1) args2 = public.dict_obj() args2.operation = 'add' args2.port = get.new_data['port'] args2.protocol = get.new_data['protocol'] args2.address = get.new_data['address'] if "address" in get.new_data else "all" args2.strategy = get.new_data['strategy'] args2.chain = get.new_data['chain'] args2.brief = get.new_data['brief'] if 'brief' in get.new_data else "" args2.reload = "1" self.set_port_rule(args2) return public.success_v2("The modification was successful") # 2024/3/27 下午 4:03 修改域名端口规则 def modify_domain_port_rule(self, get): """ @name 修改域名端口规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ # 2024/4/17 下午4:47 检查防火墙状态,如果未启动则不允许设置设置规则 if not self.get_firewall_status(): return public.fail_v2("Please activate the firewall before setting up the rules!") get.old_data = get.get('old_data/s', '') get.new_data = get.get('new_data/s', '') if get.old_data == "": return public.fail_v2("Please pass in old_data") if get.new_data == "": return public.fail_v2("Please pass in new_data") get.old_data = json.loads(get.old_data) get.new_data = json.loads(get.new_data) address = self.get_a_ip(get.new_data['domain']) if address == "": return public.fail_v2(f"Domain name: 【{get.domain}】Resolution failed") args1 = public.dict_obj() args1.operation = 'remove' args1.port = get.old_data['Port'] args1.protocol = get.old_data['Protocol'] args1.address = get.old_data['Address'] args1.strategy = get.old_data['Strategy'] args1.chain = get.old_data['Chain'] args1.id = get.old_data['id'] args1.sid = get.old_data['sid'] self.set_port_rule(args1) args2 = public.dict_obj() args2.operation = 'add' args2.port = get.new_data['port'] args2.protocol = get.new_data['protocol'] args2.address = address args2.strategy = get.new_data['strategy'] args2.chain = get.new_data['chain'] args2.brief = get.new_data['brief'] if "brief" in get.new_data else "" args2.domain = get.new_data['domain'] self.set_port_rule(args2) if self._isFirewalld: self.firewall.reload() return public.success_v2("The modification was successful") # 2024/3/26 下午 5:10 设置域名端口规则 def set_domain_port_rule(self, get): """ @name 设置域名端口规则 @author wzz <2024/3/26 下午 5:11> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ # 2024/4/17 下午4:47 检查防火墙状态,如果未启动则不允许设置规则 if not self.get_firewall_status(): return public.fail_v2("Please activate the firewall before setting up the rules!") # public.set_module_logs('Setting up domain port rules', 'set_domain_port_rule', 1) get.operation = get.get('operation/s', 'add') get.protocol = get.get('protocol/s', 'tcp') get.domain = get.get('domain/s', '') get.port = get.get('port/s', '') get.strategy = get.get('strategy/s', 'accept') get.chain = get.get('chain/s', 'INPUT') if get.domain == "": return public.fail_v2(public.lang("The destination domain name cannot be empty")) if get.port == "": return public.fail_v2(public.lang("The destination port cannot be empty")) if not public.is_domain(get.domain): return public.fail_v2(public.lang("The destination domain name is in the wrong format")) if "|" in get.domain: get.domain = get.domain.split("|")[0] address = self.get_a_ip(get.domain) if address == "": return public.fail_v2(public.lang("Domain name: 【{}】Resolution failed", get.domain)) get.address = address self.set_port_db(get) if get.chain == "INPUT": result = self.input_port(get) else: result = self.output_port(get) if result['status'] and self._isFirewalld: self.firewall.reload() return public.return_message(0 if result.get("status") else -1, 0, result['msg']) # 2024/5/13 下午4:46 添加端口规则到指定数据库 def add_port_db(self, get, protocol, addtime, domain): """ @name 添加端口规则到指定数据库 @author wzz <2024/5/13 下午4:46> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ add_sid = public.M('firewall_new').add( 'ports,brief,protocol,address,types,addtime,domain,sid,chain', ( get.port, public.xsssec(get.brief), protocol, get.address, get.strategy, addtime, domain, 0, get.chain) ) if get.domain != "": domain_sid = public.M('firewall_domain').add( 'types,domain,port,address,brief,addtime,sid,protocol,domain_total', (get.strategy, domain, get.port, get.address, public.xsssec(get.brief), addtime, add_sid, get.protocol, get.domain) ) public.M('firewall_new').where("id=?", (add_sid,)).save('sid', domain_sid) self.check_resolve_crontab() # 2024/5/13 下午4:49 从指定数据库删除端口规则 def remove_port_db(self, get, protocol, addtime, domain): """ @name 从指定数据库删除端口规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ public.M('firewall_new').where("ports=? and protocol=? and address=? and types=? and chain=?", ( get.port, protocol, get.address, get.strategy, get.chain )).delete() get.domain = get.get('domain/s', '') if get.domain != "": public.M('firewall_domain').where("domain=?", (get.domain,)).delete() self.remove_resolve_crontab() # 2024/3/26 下午 5:52 添加/删除数据库的端口规则 def set_port_db(self, get): """ @name 添加/删除数据库的端口规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ if get.operation == "add": # 检测端口是否已经添加过 query_result = public.M('firewall_new').where( 'ports=? and address=? and protocol=? and types=? and chain=?', (get.port, get.address, get.protocol, get.strategy, get.chain) ).find() if not query_result: get.domain = get.get('domain/s', '') domain = "{}|{}".format(get.domain, get.address) if get.domain != "" else "" addtime = time.strftime('%Y-%m-%d %X', time.localtime()) if get.protocol == "tcp/udp" and self._isFirewalld: self.add_port_db(get, "tcp", addtime, domain) self.add_port_db(get, "udp", addtime, domain) else: self.add_port_db(get, get.protocol, addtime, domain) else: query_result = public.M('firewall_new').where( 'ports=? and address=? and protocol=? and types=? and chain=?', (get.port, get.address, get.protocol, get.strategy, get.chain) ).find() if query_result: if get.protocol == "tcp/udp" and self._isFirewalld: self.remove_port_db(get, "tcp", query_result['addtime'], query_result['domain']) self.remove_port_db(get, "udp", query_result['addtime'], query_result['domain']) else: self.remove_port_db(get, get.protocol, query_result['addtime'], query_result['domain']) # 2024/3/26 下午 11:23 添加/删除数据库的ip规则 def set_ip_db(self, get): """ @name 添加/删除数据库的ip规则 @author wzz <2024/3/26 下午 11:24> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ if get.operation == "add": get.domain = get.get('domain/s', '') domain = "{}|{}".format(get.domain, get.address) if get.domain != "" else "" query_result = public.M('firewall_ip').where("address=? and types=? and domain=? and chain=?", (get.address, get.strategy, domain, get.chain)).find() if not query_result: addtime = time.strftime('%Y-%m-%d %X', time.localtime()) self._add_sid = public.M('firewall_ip').add( 'address,types,brief,addtime,domain,sid,chain', (get.address, get.strategy, public.xsssec(get.brief), addtime, domain, 0, get.chain) ) if get.domain != "": domain_sid = public.M('firewall_domain').add( 'types,domain,port,address,brief,addtime,sid,protocol,domain_total', (get.strategy, domain, '', get.address, public.xsssec(get.brief), addtime, self._add_sid, '', get.domain) ) public.M('firewall_ip').where("id=?", (self._add_sid,)).save('sid', domain_sid) self.check_resolve_crontab() else: get.address = get.get("address/s", '') get.strategy = get.get("strategy/s", '') if get.address == "": return public.fail_v2(public.lang("Please pass in id")) public.M('firewall_ip').where("address=? and types=? and chain=?", (get.address, get.strategy, get.chain)).delete() get.domain = get.get('domain/s', '') if get.domain != "": public.M('firewall_domain').where("domain=?", (get.domain, )).delete() self.remove_resolve_crontab() # 2024/3/26 下午 11:40 添加/删除数据库的端口转发规则 def set_forward_db(self, get): """ @name 添加/删除数据库的端口转发规则 @author wzz <2024/3/26 下午 11:40> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ if get.operation == "add": query_result = public.M('firewall_forward').where("S_Port=? and T_Address=? and T_Port=? and Protocol=?", ( get.S_Port, get.T_Address, get.T_Port, get.protocol )).find() if not query_result: get.brief = get.get('brief/s', '') addtime = time.strftime('%Y-%m-%d %X', time.localtime()) self._add_sid = public.M('firewall_forward').add( 'S_Port,T_Address,T_Port,Protocol,Family,addtime,brief', (get.S_Port, get.T_Address, get.T_Port, get.protocol, "ipv4", addtime, get.brief) ) else: public.M('firewall_forward').where( "S_Port=? and T_Address=? and T_Port=? and Protocol=?", (get.S_Port, get.T_Address, get.T_Port, get.protocol) ).delete() # 2024/3/25 下午 6:55 入站端口规则 def input_port(self, get): """ @name 入站端口规则 @param "data":{"参数名":""} <数据类型> 参数描述 dabao @return list[dict{}...] """ info = { "Port": get.port, "Protocol": get.protocol.lower(), "Strategy": get.strategy.lower(), "Family": "ipv4", } if ":" in get.address: info["Family"] = "ipv6" if get.address != "all": info["Address"] = get.address result = self.firewall.rich_rules(info=info, operation=get.operation) elif get.address == "all" and get.strategy == "drop": result = self.firewall.rich_rules(info=info, operation=get.operation) else: result = self.firewall.input_port(info=info, operation=get.operation) return result # 2024/3/25 下午 6:56 出站端口规则 def output_port(self, get): """ @name 出站端口规则 @author wzz <2024/3/25 下午 6:56> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ info = { "Port": get.port, "Protocol": get.protocol.lower(), "Strategy": get.strategy.lower(), "Priority": "0", "Family": "ipv4", } if ":" in get.address: info["Family"] = "ipv6" if get.address != "all": info["Address"] = get.address result = self.firewall.output_rich_rules(info=info, operation=get.operation) else: result = self.firewall.output_port(info=info, operation=get.operation) return result # 2024/3/26 上午 9:40 处理多个ip的情况,例如出现每行一个ip def set_nline_ip_rule(self, get): """ @name 处理多个ip的情况,例如出现每行一个ip @author wzz <2024/3/26 上午 9:40> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ address = get.address.split("\n") failed_list = [] from copy import deepcopy for addr in address: verify_addr = addr.split("/")[0] if not public.is_ipv4(verify_addr) and not public.is_ipv6(verify_addr): continue args = public.to_dict_obj(deepcopy(get.get_items())) args.address = addr args.family = "ipv4" if ":" not in addr else "ipv6" if self.check_is_user_ip(args): continue if get.operation == "add" and not self.check_ip_exist(args): continue self.set_ip_db(args) info = { "Address": args.address, "Family": args.family, "Strategy": args.strategy, "Priority": args.priority, "Zone": get.zone.lower(), } result = self.iptables.rich_rules(info=info, operation=get.operation,chain=get.chain) if not result['status']: failed_list.append({ "address": addr, "msg": result['msg'] }) if len(failed_list) > 0: return public.success_v2(public.lang("The setting succeeds, but the following rule settings fail::{}", failed_list)) if self._isFirewalld: self.firewall.reload() return public.success_v2(public.lang("The setup was successful")) # 2024/3/26 上午 9:40 处理多个ip的情况,例如出现逗号隔开ip def set_tline_ip_rule(self, get): """ @name 处理多个ip的情况,例如出现逗号隔开ip @author wzz <2024/3/26 上午 9:40> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ address = get.address.split(",") failed_list = [] from copy import deepcopy for addr in address: if "/" in addr: a_rgs = public.to_dict_obj(deepcopy(get.get_items())) a_rgs.address = addr self.set_mask_ip_rule(get) del a_rgs continue if "-" in addr: a_rgs = public.to_dict_obj(deepcopy(get.get_items())) a_rgs.address = addr self.set_range_ip_rule(a_rgs) del a_rgs continue if not public.checkIp(addr): return public.fail_v2(public.lang("The destination address is in the wrong format")) args = public.to_dict_obj(deepcopy(get.get_items())) args.address = addr if self.check_is_user_ip(args): continue if get.operation == "add" and not self.check_ip_exist(args): continue self.set_ip_db(args) info = { "Address": args.address, "Family": args.family, "Strategy": args.strategy, "Priority": args.priority, "Zone": get.zone.lower(), } result = self.iptables.rich_rules(info=info, operation=get.operation, chain=get.chain) if not result['status']: failed_list.append({ "address": addr, "msg": result['msg'] }) if len(failed_list) > 0: return public.success_v2(public.lang("The setting succeeds, but the following rule settings fail::{}", failed_list)) if self._isFirewalld: self.firewall.reload() return public.success_v2(public.lang("The setup was successful")) # 2024/3/26 上午 9:43 处理192.168.1.10-192.168.1.20这种范围ip的添加/删除 def set_range_ip_rule(self, get): """ @name 处理192.168.1.10-192.168.1.20这种范围ip的添加/删除 @author wzz <2024/3/26 上午 9:43> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ address = self.firewall.handle_ip_range(get.address) failed_list = [] from copy import deepcopy for addr in address: args = public.to_dict_obj(deepcopy(get.get_items())) args.address = addr if self.check_is_user_ip(args): continue if get.operation == "add" and not self.check_ip_exist(args): continue self.set_ip_db(args) info = { "Address": args.address, "Family": args.family, "Strategy": args.strategy, "Priority": args.priority, "Zone": get.zone.lower(), } result = self.iptables.rich_rules(info=info, operation=get.operation, chain=get.chain) if not result['status']: failed_list.append({ "address": addr, "msg": result['msg'] }) if len(failed_list) > 0: return public.success_v2(public.lang("The setting succeeds, but the following rule settings fail::{}", failed_list)) if self._isFirewalld: self.firewall.reload() return public.success_v2(public.lang("The setup was successful")) # 2024/3/26 上午 9:46 设置带掩码的ip段 def set_mask_ip_rule(self, get): """ @name 设置带掩码的ip段 @author wzz <2024/3/26 上午 9:47> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ if get.operation == "add" and not self.check_ip_exist(get): return public.fail_v2(public.lang("The destination address {} already exists, please do not add it repeatedly", get.address)) self.set_ip_db(get) info = { "Address": get.address, "Family": get.family, "Strategy": get.strategy, "Priority": get.priority, "Zone": get.zone.lower(), } if get.chain == "INPUT": result = self.firewall.rich_rules(info=info, operation=get.operation) else: result = self.firewall.output_rich_rules(info=info, operation=get.operation) if result['status'] and self._isFirewalld: self.firewall.reload() return public.return_message(0 if result.get("status") else -1, 0, result['msg']) # 2024/4/9 下午11:31 检查是否会自己的ip,如果是则返回 def check_is_user_ip(self, get): """ @name @author wzz <2024/4/9 下午11:31> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ # 2024/3/26 下午 11:27 处理用户当前的远程ip,如果添加的ip与此ip一直则返回 try: from flask import request user_ip = request.remote_addr if user_ip in get.address: return True return False except: return False # 2024/3/25 下午 7:16 设置ip规则 def set_ip_rule(self, get): """ @name 设置ip规则 @author wzz <2024/3/25 下午 7:16> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ # 2024/4/17 下午4:47 检查防火墙状态,如果未启动则不允许设置规则 if not self.get_firewall_status(): return public.fail_v2(public.lang("Please activate the firewall before setting up the rules!")) get.operation = get.get('operation/s', 'add') get.address = get.get('address/s', '') get.strategy = get.get('strategy/s', 'accept') get.chain = get.get('chain/s', 'INPUT') get.family = get.get('family/s', 'ipv4') get.priority = get.get('priority/s', '0') get.reload = get.get('reload/s', "1") get.zone = get.get('zone', "public") get.timeout = get.get('timeout/d', 0) get.stype = get.get('stype/d', 1) #1托管的IPTABLES 0原生系统防火墙 if get.address == "": return public.fail_v2(public.lang("The destination IP cannot be empty")) if not get.strategy.lower() in ["accept", "drop"]: return public.fail_v2("Failed to set rule: Unsupported policy type") if get.chain not in ["INPUT", "OUTPUT"]: return public.fail_v2('Unsupported chain types') option_result = self.ip_rule_option(get) cache_key = "firewall_info" from BTPanel import cache data = cache.get(cache_key) if data: cache.delete(cache_key) cache_key = "ip_rules_list" data = cache.get(cache_key) if data: cache.delete(cache_key) return option_result # 2024/12/21 15:53 开始设置ip规则 def ip_rule_option(self, get): """ @name 开始设置ip规则 """ # 2024/3/25 下午 8:23 处理多个ip的情况,例如出现每行一个ip if get.address != "all" and "\n" in get.address: return self.set_nline_ip_rule(get) elif get.address != "all" and "," in get.address: return self.set_tline_ip_rule(get) elif get.address != "all" and "-" in get.address: return self.set_range_ip_rule(get) elif get.address != "all" and "/" not in get.address and not public.checkIp(get.address) and not public.is_ipv6(get.address): return public.fail_v2('Destination address format error') else: if get.operation == "add" and get.strategy == "drop": if self.check_is_user_ip(get): return public.fail_v2("Can't add your own ip") if get.operation == "add" and not self.check_ip_exist(get): return public.fail_v2("Target address {} already exists, please don't add it repeatedly".format(get.address)) self.set_ip_db(get) info = { "Address": get.address, "Family": get.family, "Strategy": get.strategy.lower(), "Priority": get.priority.lower(), "Zone": get.zone.lower(), "Timeout": get.timeout #过期时间 可以不传 默认为0永久 } if get.stype == 1: #接管的iptables规则 result = self.iptables.rich_rules(info=info, operation=get.operation, chain=get.chain) if get.reload == "1": public.ExecShell("systemctl reload BT-FirewallServices") else: #原生系统防火墙的规则 result = self.firewall.rich_rules(info=info, operation=get.operation) if get.reload == "1": self.firewall.reload() public.WriteLog("system firewall", "Setting up IP rules:{}".format(get.address)) return public.return_message(0 if result.get("status") else -1, 0, result["msg"]) # 2024/3/27 上午 9:44 修改ip规则 def modify_ip_rule(self, get): """ @name 修改ip规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ # 2024/4/17 下午4:47 检查防火墙状态,如果未启动则不允许设置设置规则 if not self.get_firewall_status(): return public.fail_v2(public.lang("Please activate the firewall before setting up the rules!")) get.old_data = get.get('old_data/s', '') get.new_data = get.get('new_data/s', '') if get.old_data == "": return public.fail_v2(public.lang("Please pass in old_data")) if get.new_data == "": return public.fail_v2(public.lang("Please pass in new_data")) get.old_data = json.loads(get.old_data) get.new_data = json.loads(get.new_data) args1 = public.dict_obj() args1.operation = 'remove' args1.address = get.old_data['Address'] args1.strategy = get.old_data['Strategy'] args1.family = get.old_data['Family'] args1.chain = get.old_data['Chain'] args1.id = get.old_data['id'] args1.sid = get.old_data['sid'] args1.zone = get.old_data['Zone'] if "Zone" in get.old_data else "public" args1.reload = "0" args1.stype = get.old_data['stype'] self.set_ip_rule(args1) args2 = public.dict_obj() args2.operation = 'add' args2.address = get.new_data['address'] args2.strategy = get.new_data['strategy'] args2.family = get.new_data['family'] args2.chain = get.new_data['chain'] args2.brief = get.new_data['brief'] args2.zone = get.new_data['zone'] if "zone" in get.new_data else "public" args2.reload = "0" self.set_ip_rule(args2) if self._isFirewalld: self.firewall.reload() return public.success_v2(public.lang("The modification was successful")) # 2024/3/26 下午 5:18 设置域名ip规则 def set_domain_ip_rule(self, get): """ @name 设置域名ip规则 @author wzz <2024/3/26 下午 5:19> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ get.operation = get.get('operation/s', 'add') get.protocol = get.get('protocol/s', 'tcp') get.domain = get.get('domain/s', '') get.port = get.get('port/s', '') get.strategy = get.get('strategy/s', 'accept') get.chain = get.get('chain/s', 'INPUT') if get.domain == "": return public.fail_v2(public.lang("The destination domain name cannot be empty")) if get.port == "": return public.fail_v2(public.lang("The destination port cannot be empty")) if not public.is_domain(get.domain): return public.fail_v2(public.lang("The destination domain name is in the wrong format")) address = self.get_a_ip(get.domain) if address == "": return public.fail_v2(public.lang("Domain name: 【{}】Resolution failed", get.domain)) if not public.checkIp(get.address): return public.fail_v2(public.lang("The destination address is in the wrong format")) info = { "Address": address, "Family": get.family, "Strategy": get.strategy.lower(), "Priority": get.priority.lower(), } if get.chain == "INPUT": result = self.firewall.rich_rules(info=info, operation=get.operation) else: result = self.firewall.output_rich_rules(info=info, operation=get.operation) if result['status'] and self._isFirewalld: self.firewall.reload() return public.return_message(0 if result.get("status") else -1, 0, result['msg']) # 2024/3/25 上午 11:18 获取所有ip规则列表 def ip_rules_list(self, get): """ @name 获取所有ip规则列表 @param "data":{"参数名":""} <数据类型> 参数描述 @return list[dict{}...] """ get.chain = get.get('chain/s', 'all') get.query = get.get('query/s', '') get.p = get.get('p', 1) get.row = get.get('row', 20) ip_db = self.get_ip_db(get) if get.chain == "INPUT": list_address = self.iptables.list_address(["INPUT"]) #托管的IP规则 system_list_address = self.firewall.list_input_address() #系统防火墙的IP规则 ufw firewall 为了考虑用户手动添加的规则 list_address += system_list_address #拼接 elif get.chain == "OUTPUT": list_address = self.iptables.list_address(["OUTPUT"]) system_list_address = self.firewall.list_output_address() list_address += system_list_address else: list_address = self.iptables.list_address(["INPUT", "OUTPUT"]) system_list_address = self.firewall.list_address() list_address += system_list_address data = self.structure_ip_return_data(list_address, ip_db, get) return public.success_v2(data) # 2024/3/26 下午 3:03 导出所有ip规则 def export_ip_rules(self, get): """ @name 导出所有ip规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ get.chain = get.get('chain/s', 'all') if get.chain == "INPUT": file_name = "input_ip_rules_{}".format(int(time.time())) elif get.chain == "OUTPUT": file_name = "output_ip_rules_{}".format(int(time.time())) else: file_name = "ip_rules_{}".format(int(time.time())) data = self.ip_rules_list(get)["message"]["data"] if not data: return public.fail_v2(public.lang("No rules can be exported")) if not os.path.exists(self.config_path): os.makedirs(self.config_path, exist_ok=True) file_path = "{}/{}.json".format(self.config_path, file_name) public.writeFile(file_path, public.GetJson(data)) public.WriteLog("system firewall", "Export IP rules") return public.success_v2(file_path) # 2024/3/26 下午 3:05 导入ip规则 def import_ip_rules(self, get): """ @name 导入ip规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ get.file = get.get('file/s', '') if not get.file: return public.fail_v2(public.lang("The file cannot be empty")) if not os.path.exists(get.file): return public.fail_v2(public.lang("The file does not exist")) try: data = public.readFile(get.file) if "|" in data and not "{" in data: get.rule_name = "ip_rule" return self.import_rules_old(get) data = json.loads(data) # 2024/4/10 下午2:51 反转数据 data.reverse() except: return public.fail_v2(public.lang("The file content is abnormal or malformed")) args = public.dict_obj() for item in data: args.operation = 'add' args.address = item['Address'] args.strategy = item['Strategy'] args.chain = item['Chain'] args.family = item['Family'] args.brief = item['brief'] args.reload = "0" self.set_ip_rule(args) if self._isFirewalld: self.firewall.reload() return public.success_v2(public.lang("The import was successful")) # 2024/3/25 下午 2:34 获取端口转发列表 def port_forward_list(self, get): """ @name 获取端口转发列表 @param "data":{"参数名":""} <数据类型> 参数描述 @return list[dict{}...] """ get.query = get.get('query/s', '') get.p = get.get('p', 1) get.row = get.get('row', 20) list_port_forward = self.iptables.list_port_forward() forward_db = self.get_forward_db(get) return public.success_v2(self.structure_forward_return_data(list_port_forward, forward_db, get)) # 2024/3/26 下午 3:08 导出所有端口转发规则 def export_port_forward(self, get): """ @name 导出所有端口转发规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ data = self.iptables.list_port_forward() if not data: return public.fail_v2(public.lang("No rules can be exported")) file_name = "port_forward_{}".format(int(time.time())) if not os.path.exists(self.config_path): os.makedirs(self.config_path, exist_ok=True) file_path = "{}/{}.json".format(self.config_path, file_name) public.writeFile(file_path, public.GetJson(data)) public.WriteLog("system firewall", "Export port forwarding rules") return public.success_v2(file_path) # 2024/3/26 下午 3:10 导入端口转发规则 def import_port_forward(self, get): """ @name 导入端口转发规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ get.file = get.get('file/s', '') if not get.file: return public.fail_v2(public.lang("The file cannot be empty")) if not os.path.exists(get.file): return public.fail_v2(public.lang("The file does not exist")) try: data = public.readFile(get.file) if "|" in data and not "{" in data: get.rule_name = "trans_rule" return self.import_rules_old(get) data = json.loads(data) # 2024/4/10 下午2:51 反转数据 data.reverse() except: return public.fail_v2(public.lang("The file content is abnormal or malformed")) args = public.dict_obj() for item in data: args.operation = 'add' args.protocol = item['Protocol'] args.S_Address = item['S_Address'] args.S_Port = item['S_Port'] args.T_Address = item['T_Address'] args.T_Port = item['T_Port'] args.reload = "0" self.set_port_forward(args) if self._isFirewalld: self.firewall.reload() return public.success_v2(public.lang("The import was successful")) # 2024/3/25 下午 3:43 设置端口转发 def set_port_forward(self, get): """ @name 设置端口转发 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ # 2024/4/17 下午4:47 检查防火墙状态,如果未启动则不允许设置设置规则 if not self.get_firewall_status(): return public.fail_v2(public.lang("Please activate the firewall before setting up the rules!")) get.operation = get.get('operation/s', 'add') get.protocol = get.get('protocol/s', 'tcp') get.S_Address = get.get('S_Address/s', '') get.S_Port = get.get('S_Port/s', '') get.T_Address = get.get("T_Address/s", '') get.T_Port = get.get("T_Port/s", '') get.reload = get.get('reload/s', "1") if get.S_Port == "": return public.fail_v2(public.lang("The source port cannot be empty")) # if get.T_Address == "": # return public.return_message(-1, 0, public.lang("目标地址不能为空")) if get.T_Port == "": return public.fail_v2(public.lang("The destination port cannot be empty")) # 2024/3/25 下午 5:49 前置检测 if get.operation == "add": check_ip_forward = self.firewall.check_ip_forward() if not check_ip_forward["status"]: return public.fail_v2(check_ip_forward['msg']) if not self.check_forward_exist(get): return public.fail_v2(public.lang("If the rule already exists, do not add it repeatedly")) self.set_forward_db(get) # 2024/3/25 下午 5:50 构造传参,调用底层方法设置端口转发 if get.protocol == "tcp/udp" or get.protocol == "all": info = {"Family": "ipv4", "Protocol": "tcp", "S_Address": get.S_Address, "S_Port": get.S_Port, "T_Address": get.T_Address, "T_Port": get.T_Port} result = self.iptables.port_forward(info=info, operation=get.operation) if not result['status']: return public.fail_v2(result['msg']) info["Protocol"] = "udp" result = self.iptables.port_forward(info=info, operation=get.operation) else: info = { "Family": "ipv4", "Protocol": get.protocol, "S_Address": get.S_Address, "S_Port": get.S_Port, "T_Address": get.T_Address, "T_Port": get.T_Port, } result = self.iptables.port_forward(info=info, operation=get.operation) cache_key = "firewall_info" from BTPanel import cache data = cache.get(cache_key) if data: cache.delete(cache_key) cache_key = "port_forward_list" data = cache.get(cache_key) if data: cache.delete(cache_key) return public.return_message(0 if result.get("status") else -1, 0, result['msg']) # 2024/3/27 上午 9:44 修改端口转发规则 def modify_forward_rule(self, get): """ @name 修改端口转发规则 @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} """ # 2024/4/17 下午4:47 检查防火墙状态,如果未启动则不允许设置规则 if not self.get_firewall_status(): return public.fail_v2(public.lang("Please activate the firewall before setting up the rules!")) get.old_data = get.get('old_data/s', '') get.new_data = get.get('new_data/s', '') if get.old_data == "": return public.fail_v2(public.lang("Please pass in old_data")) if get.new_data == "": return public.fail_v2(public.lang("Please pass in new_data")) get.old_data = json.loads(get.old_data) get.new_data = json.loads(get.new_data) args1 = public.dict_obj() args1.operation = 'remove' args1.S_Address = get.old_data['S_Address'] args1.S_Port = get.old_data['S_Port'] args1.T_Address = get.old_data['T_Address'] args1.T_Port = get.old_data['T_Port'] args1.Protocol = get.old_data['Protocol'] args1.id = get.old_data['id'] args1.reload = "0" self.set_port_forward(args1) args2 = public.dict_obj() args2.operation = 'add' # args2.S_Address = get.new_data['S_Address'] args2.S_Port = get.new_data['S_Port'] args2.T_Address = get.new_data['T_Address'] args2.T_Port = get.new_data['T_Port'] args2.Protocol = get.new_data['protocol'] args2.brief = get.new_data['brief'] args2.reload = "0" self.set_port_forward(args2) return public.success_v2("The modification was successful") def check_table_firewall_forward(self): """ @name 检查并创建表 @return dict{"status":True/False,"msg":"提示信息"} """ if not public.M('sqlite_master').where('type=? AND name=?', ('table', 'firewall_forward')).count(): public.M('').execute( """CREATE TABLE "firewall_forward" (`id` INTEGER PRIMARY KEY AUTOINCREMENT, `S_Address` TEXT, `S_Domain` TEXT, `S_Port` TEXT, `T_Address` TEXT, `T_Domain` TEXT, `T_Port` TEXT DEFAULT '', `Protocol` TEXT DEFAULT '', `Family` TEXT DEFAULT '', `addtime` TEXT DEFAULT '', `brief` TEXT DEFAULT '');""" ) # 检查字段是否存在 不存在创建 def check_table_column(self, ): create_table_str = public.M('firewall_new').table('sqlite_master').where( 'type=? AND name=?', ('table', 'firewall_new')).getField('sql') if 'sid' not in create_table_str: public.M('firewall_new').execute('ALTER TABLE "firewall_new" ADD "sid" int DEFAULT 0') if 'domain' not in create_table_str: public.M('firewall_new').execute('ALTER TABLE "firewall_new" ADD "domain" TEXT DEFAULT ""') if 'chain' not in create_table_str: public.M('firewall_new').execute('ALTER TABLE "firewall_new" ADD "chain" TEXT DEFAULT ""') create_table_str = public.M('firewall_ip').table('sqlite_master').where( 'type=? AND name=?', ('table', 'firewall_ip')).getField('sql') if 'sid' not in create_table_str: public.M('firewall_ip').execute('ALTER TABLE "firewall_ip" ADD "sid" int DEFAULT 0') if 'domain' not in create_table_str: public.M('firewall_ip').execute('ALTER TABLE "firewall_ip" ADD "domain" TEXT DEFAULT ""') if 'chain' not in create_table_str: public.M('firewall_ip').execute('ALTER TABLE "firewall_ip" ADD "chain" TEXT DEFAULT ""')