Server IP : 172.67.216.182 / Your IP : 162.158.108.9 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/ |
Upload File : |
#!/usr/bin/env python # coding:utf-8 # +------------------------------------------------------------------- # | aaPanel # +------------------------------------------------------------------- # | Copyright (c) 2015-2017 aaPanel(www.aapanel.com) All rights reserved. # +------------------------------------------------------------------- # | Author: [email protected], # +------------------------------------------------------------------- # Firewalld管理类 # ------------------------------ from xml.etree.ElementTree import ElementTree, Element import os, public class firewalld: __TREE = None __ROOT = None __CONF_FILE = '/etc/firewalld/zones/public.xml' # 初始化配置文件XML对象 def __init__(self): if self.__TREE: return self.__TREE = ElementTree() self.__TREE.parse(self.__CONF_FILE) self.__ROOT = self.__TREE.getroot() # 获取端口列表 def GetAcceptPortList(self): mlist = self.__ROOT.getchildren() data = [] for p in mlist: if p.tag != 'port': continue tmp = p.attrib port = p.attrib['port'] data.append(tmp) return data # 添加端口放行 def AddAcceptPort(self, port, pool='tcp'): # 检查是否存在 if self.CheckPortAccept(pool, port): return True attr = {"protocol": pool, "port": port} Port = Element("port", attr) self.__ROOT.append(Port) self.Save() return True # 删除端口放行 def DelAcceptPort(self, port, pool='tcp'): # 检查是否存在 if not self.CheckPortAccept(pool, port): return True mlist = self.__ROOT.getchildren() m = False for p in mlist: if p.tag != 'port': continue if p.attrib['port'] == port: self.__ROOT.remove(p) m = True if m: self.Save() return True return False # 添加UDP端口放行 def AddUpdPort(self, port, pool='udp'): # 检查是否存在 if self.CheckPortAccept(pool, port): return True attr = {"protocol": pool, "port": port} Port = Element("port", attr) self.__ROOT.append(Port) self.Save() return True # 删除UDP端口放行 def DelUdpPort(self, port, pool='udp'): # 检查是否存在 if not self.CheckPortAccept(pool, port): return True mlist = self.__ROOT.getchildren() m = False for p in mlist: if p.tag != 'port': continue if p.attrib['port'] == port: self.__ROOT.remove(p) m = True if m: self.Save() return True return False # 检查端口是否已放行 def CheckPortAccept(self, pool, port): for p in self.GetAcceptPortList(): if p['port'] == port and p['protocol']==pool: return True return False # 获取屏蔽IP列表 def GetDropAddressList(self): mlist = self.__ROOT.getchildren() data = [] for ip in mlist: if ip.tag != 'rule': continue tmp = {} ch = ip.getchildren() a=None for c in ch: tmp['type']=None if c.tag == 'drop': tmp['type'] = 'drop' if c.tag == 'source': tmp['address']=c.attrib['address'] if tmp['type']: data.append(tmp) return data # 获取 reject 信息 def GetrejectLIST(self): mlist = self.__ROOT.getchildren() data = [] for ip in mlist: #print(ip) if ip.tag != 'rule': continue tmp = {} ch = ip.getchildren() a=None flag = None for c in ch: tmp['type']=None if c.tag == 'reject': tmp['type'] = 'reject' if c.tag == 'source': tmp['address']=c.attrib['address'] if c.tag =='port': tmp['protocol']=c.attrib['protocol'] tmp['port']=c.attrib['port'] if tmp['type']: data.append(tmp) return data # 获取 accept 信息 def Getacceptlist(self): mlist = self.__ROOT.getchildren() data = [] for ip in mlist: if ip.tag != 'rule': continue tmp = {} ch = ip.getchildren() a=None flag = None for c in ch: tmp['type']=None if c.tag == 'accept': tmp['type'] = 'accept' if c.tag == 'source': tmp['address']=c.attrib['address'] if c.tag =='port': tmp['protocol']=c.attrib['protocol'] tmp['port']=c.attrib['port'] if tmp['type']: data.append(tmp) return data # 获取所有信息 def Get_All_Info(self): data={} data['drop_ip']=self.GetDropAddressList() data['reject']=self.GetrejectLIST() data['accept']=self.Getacceptlist() return data # 判断是否存在 def Chekc_info(self,port,address,pool,type): data=self.Get_All_Info() if type=='accept': for i in data['accept']: #print(i['address'], i['protocol'], i['port']) if i['address']==address and i['protocol']==pool and i['port']==port: return True else: return False elif type=='reject': for i in data['accept']: # print(i['address'], i['protocol'], i['port']) if i['address'] == address and i['protocol'] == pool and i['port'] == port: return True else: return False else: return False def AddDropAddress(self, address): # 检查是否存在 if self.CheckIpDrop(address): return True attr = {"family": 'ipv4'} rule = Element("rule", attr) attr = {"address": address} source = Element("source", attr) drop = Element("drop", {}) rule.append(source) rule.append(drop) self.__ROOT.append(rule) self.Save() return 'OK' # 删除IP屏蔽 def DelDropAddress(self, address): # 检查是否存在 if not self.CheckIpDrop(address): return True mlist = self.__ROOT.getchildren() for ip in mlist: if ip.tag != 'rule': continue ch = ip.getchildren() for c in ch: if c.tag != 'source':continue if c.attrib['address'] == address: self.__ROOT.remove(ip) self.Save() return True return False # 添加端口放行并且指定IP def Add_Port_IP(self, port,address,pool,type): if type=='accept': # 判断是否存在 if self.Chekc_info(port,address,pool,type): return True attr = {"family": 'ipv4'} rule = Element("rule", attr) attr = {"address": address} source = Element("source", attr) attr={'port':str(port),'protocol':pool} port_info=Element("port",attr) accept = Element("accept", {}) rule.append(source) rule.append(port_info) rule.append(accept) self.__ROOT.append(rule) self.Save() return True elif type=='reject': # 判断是否存在 if self.Chekc_info(port,address,pool,type):return True attr = {"family": 'ipv4'} rule = Element("rule", attr) attr = {"address": address} source = Element("source", attr) attr = {'port': str(port), 'protocol': pool} port_info = Element("port", attr) reject = Element("reject", {}) rule.append(source) rule.append(port_info) rule.append(reject) self.__ROOT.append(rule) self.Save() return True else: return False # 删除指定端口的=。= def Del_Port_IP(self, port,address,pool,type): if type=='accept': a = None for i in self.__ROOT: if i.tag == 'rule': tmp = {} for c in i.getchildren(): tmp['type'] = None if c.tag == 'accept': tmp['type'] = 'accept' if c.tag == 'source': tmp['address'] = c.attrib['address'] if c.tag == 'port': tmp['protocol'] = c.attrib['protocol'] tmp['port'] = c.attrib['port'] if tmp['type']: if tmp['port'] == port and tmp['address'] == address and tmp['type'] == type and tmp['protocol'] == pool: self.__ROOT.remove(i) self.Save() return True elif type=='reject': for i in self.__ROOT: if i.tag == 'rule': tmp = {} for c in i.getchildren(): tmp['type'] = None if c.tag == 'reject': tmp['type'] = 'reject' if c.tag == 'source': tmp['address'] = c.attrib['address'] if c.tag == 'port': tmp['protocol'] = c.attrib['protocol'] tmp['port'] = c.attrib['port'] if tmp['type']: if tmp['port'] == port and tmp['address'] == address and tmp['type'] == type and tmp['protocol'] == pool: self.__ROOT.remove(i) self.Save() return True # 检查IP是否已经屏蔽 def CheckIpDrop(self, address): for ip in self.GetDropAddressList(): if ip['address'] == address: return True return False # 取服务状态 def GetServiceStatus(self): import psutil for pid in psutil.pids(): if psutil.Process(pid).name() == 'firewalld': return True return False # 服务控制 def FirewalldService(self, type): public.ExecShell('systemctl ' + type + ' firewalld.service') return public.return_msg_gettext(True, public.lang('Setup successfully!')) # 保存配置 def Save(self): self.format(self.__ROOT) self.__TREE.write(self.__CONF_FILE, 'utf-8') public.ExecShell('firewall-cmd --reload') # 整理配置文件格式 def format(self, em, level=0): i = "\n" + level * " " if len(em): if not em.text or not em.text.strip(): em.text = i + " " for e in em: self.format(e, level + 1) if not e.tail or not e.tail.strip(): e.tail = i if level and (not em.tail or not em.tail.strip()): em.tail = i