403Webshell
Server IP : 172.67.216.182  /  Your IP : 162.158.88.157
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/breaking_through.py
# coding: utf-8
# -------------------------------------------------------------------
# aaPanel
# -------------------------------------------------------------------
# Copyright (c) 2015-2017 aaPanel(www.aapanel.com) All rights reserved.
# -------------------------------------------------------------------
# Author: hezhihong <[email protected]>
# -------------------------------------------------------------------

# ------------------------------
# 防爆破、编译器类
#------------------------------

import os,sys,time,db,json
import shlex
panel_path = '/www/server/panel'
if not os.name in ['nt']:
    os.chdir(panel_path)
if not 'class/' in sys.path:
    sys.path.insert(0, 'class/')
if not 'class_v2/' in sys.path:
    sys.path.insert(0, 'class_v2/')
    
import public
from public.regexplib import match_ipv4,match_ipv6
from safeModelV2.base import safeBase

# from pyroute2 import IPSet, NetlinkError

try:
    from pyroute2 import IPSet, NetlinkError
except:
    public.ExecShell("btpip install pyroute2")
    from pyroute2 import IPSet, NetlinkError


class main(safeBase):
    _config={}

    def __init__(self):
        self._types={'white':'aapanel.ipv4.whitelist','black':'aapanel.ipv4.blacklist'}
        self._types_system={'white':'whitelist','black':'blacklist'}
        self._config_file='/www/server/panel/data/breaking_through.json'
        try:
            self._config_file='{}/data/breaking_through.json'.format(public.get_panel_path())
        except:
            pass
        self._breaking_white_file='{}/data/breaking_white.conf'.format(public.get_panel_path())
        self._limit_file='{}/data/limit_login.pl'.format(public.get_panel_path())
        self.__script_py = public.get_panel_path() + '/script/breaking_through_check.py'
        self.__complier_group='aapanel_complier'
        self.__gcc_path=""
        if os.path.exists("/usr/bin/gcc"):
            self.__gcc_path="/usr/bin/gcc"
        else:self.__gcc_path=public.ExecShell('which gcc')[0].strip()
        self.__log_type='Brute force protection'
        self.__write_log=True

        with db.Sql() as sql:
            sql = sql.dbfile("/www/server/panel/data/default.db")

            black_white_sql = '''CREATE TABLE IF NOT EXISTS `black_white` (
  `id` INTEGER PRIMARY KEY AUTOINCREMENT,
  `ip` VARCHAR(45),
  `ps` VARCHAR(40),
  `add_type` VARCHAR(20),
  `add_time` TEXT,
  `timeout` INTEGER,
  `black_reason` INTEGER
)'''

            #black_reason 0 手动添加 1 ssh爆破ip 2 aapanel爆破ip 3 ftp爆破ip 4 历史记录爆破ip

            sql.execute(black_white_sql, ())
            sql.close()
        
        
        
    def init_ipset(self):
        """
        @name 初始化ipset
        """
        # try:
        #     # 在循环外只查询指定 ipset集合
        #     check_result = public.ExecShell('ipset list aapanel.ipv4.whitelist && ipset list aapanel.ipv4.blacklist')[0]
        #     for i in self._types:
        #         # check_result=public.ExecShell('ipset list')[0]
        #         if self._types[i] not in check_result:
        #             public.ExecShell('ipset create '+self._types[i]+' hash:net timeout 0')
        #         rule_type='DROP'
        #         if i=='white':rule_type='ACCEPT'
        #         if not public.ExecShell('iptables-save | grep "match-set '+self._types[i]+'"')[0]:
        #             public.ExecShell('iptables -I INPUT -m set --match-set '+self._types[i]+' src -j '+rule_type)
        # except:pass


        from pyroute2 import IPSet, NetlinkError
        import socket

        try:
            with IPSet() as ipset:
                # 创建或检查ipset集合
                for i in self._types:
                    try:
                        # 尝试获取集合
                        ipset.get_set_byname(self._types[i])
                    except NetlinkError as e:
                        if e.code == 2:  # ENOENT - No such file or directory
                            # 集合不存在,创建新的ipset集合
                            public.print_log(f"Creating ipset {self._types[i]}")
                            ipset.create(name=self._types[i], stype='hash:net', family=socket.AF_INET, timeout=0)
                        else:
                            raise

            # 设置iptables规则
            for i in self._types:
                rule_type = 'DROP'
                if i == 'white':
                    rule_type = 'ACCEPT'
                if not public.ExecShell('iptables-save | grep "match-set {}"'.format(self._types[i]))[0]:
                    public.ExecShell('iptables -I INPUT -m set --match-set {} src -j {}'.format(self._types[i], rule_type))
        except:
            pass

    def type_conversion(self,data,types):
        if types =='bool':
            try:
                if data=='true':
                    return True
                else:return False
                # data=bool(data)
                # return data
            except:return False
        elif types =='int':
            try:
                if type(data)=='int':return data
                data=int(data)
                return data
            except:return 0
                
        
    def set_config(self,get):
        """
        @name 设置防护配置
        """
        self._config=self.read_config()
        if 'global_status' in get or 'username_status' in get or 'ip_status' in get:
            self._config['global_status']=self.type_conversion(get.global_status,'bool') if 'global_status' in get else self._config['global_status']
            self._config['username_status']=self.type_conversion(get.username_status,'bool') if 'username_status' in get else self._config['username_status']
            self._config['ip_status']=self.type_conversion(get.ip_status,'bool') if 'ip_status' in get else self._config['ip_status']
            public.writeFile(self._config_file,json.dumps(self._config))
            public.write_log_gettext(self.__log_type, 'Configuration modification successful!')
            return public.return_message(0, 0, public.lang("Setting successful"))
        try:
            # if 'ip_command' in get:
            try:
                get.ip_command=get.ip_command.strip()
            except:pass
            if get.ip_command=='' and not self.type_conversion(get.ip_ipset_filter,'bool'): return public.return_message(-1, 0, public.lang("Please enable at least one command and firewall"))
            self._config['based_on_username']={"limit":self.type_conversion(get.username_limit,'int'),"count":self.type_conversion(get.username_count,'int'),"type":self.type_conversion(get.username_type,'int'),"limit_root":self.type_conversion(get.username_limit_root,'bool')}
            self._config['based_on_ip']={"limit":self.type_conversion(get.ip_limit,'int'),"count":self.type_conversion(get.ip_count,'int'),"command":get.ip_command,"ipset_filter":self.type_conversion(get.ip_ipset_filter,'bool')}
            self._config['history_limit']=self.type_conversion(get.history_limit,'int')
            self._config['global_status']=self._config['global_status']
            self._config['username_status']=self._config['username_status']
            self._config['ip_status']=self._config['ip_status']
            public.writeFile(self._config_file,json.dumps(self._config))
        except Exception as ee:
            public.print_log('ee:{}'.format(ee))
        #写日志
        if self.__write_log:
            public.write_log_gettext(self.__log_type, 'Configuration modification successful!')
        return public.return_message(0, 0, public.lang("Setting successful"))
    
    
    def get_config(self,get):
        """
        @name 获取防护配置
        """
        self._config=self.read_config()
        try:
            tmp_config=public.readFile(self._config_file)
            self._config = json.loads(tmp_config)
            return public.return_message(0,0,self._config)
        except:
            pass
        return public.return_message(0,0,self._config)
        
    def read_config(self):
        """
        @name 读取防护配置
        """
        self._config={"based_on_username":{"limit":5,"count":8,"type":0,"limit_root":False},"based_on_ip":{"limit":5,"count":8,"command":"","ipset_filter":True},"history_limit":60,"history_start":0,'global_status':True,'username_status':False,'ip_status':False}
        if not os.path.exists(self._config_file):
            public.writeFile(self._config_file,json.dumps(self._config))
            return self._config
        tmp_config = public.readFile(self._config_file)
        if not tmp_config:
            return self._config
        try:
            self._config = json.loads(tmp_config)
        except:
            public.writeFile(self._config_file,json.dumps(self._config))
            return self._config
        return self._config
        
        
    def format_date_to_timestamp(self,time_string):
        """
        @name将时间转化为字符串
        """
        from datetime import datetime
    
        # 给定的时间字符串
        # time_string = "Jul 16 02:33:09"
        
        # 当前年份
        current_year = datetime.now().year
        
        # 构造完整的日期时间字符串
        full_time_string = f"{current_year} {time_string}"
        
        # 定义日期时间格式
        date_format = "%Y %b %d %H:%M:%S"
        
        # 解析时间字符串
        parsed_time = datetime.strptime(full_time_string, date_format)
        
        # 转换为时间戳
        timestamp = parsed_time.timestamp()
        return timestamp
        
        # # 输出时间戳
        # print("Timestamp:", timestamp)
        
    # def get_ssh_info_test(self,get):
    #     limit_time=int(self._config['based_on_username']['limit'])*60
    #     now_time=old_limit=time.time()
    #     start_time=public.format_date(times=now_time-limit_time)
    #     ssh_info=self.get_ssh_intrusion(start_time)
    #     result,ip_total=self.get_ssh_info(ssh_info)
    #     return result
        
    def get_ssh_info(self,result, data=[], keyword='', get_data: bool = False):
        """
        @获取SSH信息
        @param since_time:'2024-07-01 05:39:30'
        """
        self._config=self.read_config()
        ssh_info_list=data
        keys=['journalctl_fail','journalctl_connection','journalctl_invalid_user',"log_file_fail","log_file_connection","log_file_invalid_user"]
        ip_total={}
        limit_time=self._config['based_on_ip']['limit']*60
        now_time=int(time.time())
        
        for key in keys:
            if key in result and result[key]:
                line_list=result[key].split('\n')
                for line in line_list:
                    if line =='' or len(line)<50 :continue
                    
                    # ssh_info={"user":"","exptime":"","ip":"","authservice":"aapanel safe","country_code":"","logintime":"","service":"","country_name":"","timeleft":"","lock_status":'unlock'}
                    ssh_info={"user":"","exptime":"","ip":"","authservice":"","country_code":"","logintime":"","service":"","country_name":"","timeleft":"","lock_status":'unlock'}
                    user='root'
                    ip='127.0.0.1'

                    #取user
                    try:
                        user=line.split(' for ',1)[1].split(' ')[0]
                    except: pass
                
                    #取ip
                    try:
                        ip=line.split(' from ',1)[1].split(' ')[0]
                    except: pass
                    
                    ssh_info['service']='sshd'
                    if ip =="127.0.0.1" and "Connection closed by authenticating user" in line:            
                        #取ip
                        try:
                            ip=line.split(' user ',1)[1].split(' ')[1]
                        except: pass
                    #从爆破用户日志取用户
                    if key in ['journalctl_invalid_user','log_file_invalid_user']:
                        #取user
                        try:
                            user=line.split('Invalid user ',1)[1].split(' ')[0]
                        except: pass

                    #检测用户是否为空
                    if user =='':
                        user='-'
                    ssh_info['ip']=ip
                    ssh_info['user']=user
                    check_result=public.ExecShell('ipset test aapanel.ipv4.blacklist '+ip)[1]
                    if 'is in set' in check_result:ssh_info['lock_status']='lock'

                    #取时间
                    try:
                        tmp_time=self.format_date_to_timestamp(line[:15])
                        
                    except:
                        tmp_time=public.to_date(format="%Y-%m-%dT%H:%M:%S.%f%z",times=line[:32])

                    logintime=public.format_date(times=tmp_time)
                    tmp_exp_time=tmp_time+limit_time
                    exp_time=public.format_date(times=tmp_exp_time)
                    timeleft=0 if now_time>tmp_exp_time else tmp_exp_time-now_time
                    ssh_info["exptime"]=exp_time
                    ssh_info["timeleft"]=timeleft
                    
                    ssh_info['logintime']=logintime
                    if ip not in ip_total:
                        ip_total[ip]={'count':1,'ssh_infos':[]}
                    else:
                        ip_total[ip]['count']+=1

                    if get_data:
                        if keyword !='' and (keyword in 'sshd' or keyword in ssh_info['authservice'] or keyword in ip or keyword in ssh_info['user'] or keyword in logintime):
                            ip_total[ip]['ssh_infos'].append(ssh_info)
                            ssh_info_list.append(ssh_info)
                        if keyword =='':
                            ip_total[ip]['ssh_infos'].append(ssh_info)
                            ssh_info_list.append(ssh_info)

        return ssh_info_list, ip_total
                        
                    
                    
            
        
    def get_ssh_intrusion(self,since_time):
            """
            @获取SSH爆破次数
            @param since_time:'2024-07-01 05:39:30'
            """
            test_string="""Aug  4 05:22:56 cpanel76262789 sshd[2112635]: Failed password for root from 218.92.0.52 port 20956 ssh2
    Aug  4 05:23:01 cpanel76262789 sshd[2112635]: Failed password for root from 218.92.0.52 port 20956 ssh2
    Aug  4 05:23:04 cpanel76262789 sshd[2112635]: Failed password for root from 218.92.0.52 port 20956 ssh2
    Aug  4 05:23:08 cpanel76262789 sshd[2112635]: Failed password for root from 218.92.0.52 port 20956 ssh2
    Aug  4 05:23:11 cpanel76262789 sshd[2112635]: Failed password for root from 218.92.0.52 port 20956 ssh2
    Aug  4 05:23:18 cpanel76262789 sshd[2112655]: Failed password for root from 218.92.0.52 port 41852 ssh2
    Aug  4 05:23:23 cpanel76262789 sshd[2112655]: Failed password for root from 218.92.0.52 port 41852 ssh2
    Aug  4 05:47:03 cpanel76262789 sshd[2114164]: Failed password for root from 49.235.86.107 port 54144 ssh2
    Aug  4 05:47:13 cpanel76262789 sshd[2114181]: Failed password for root from 81.192.46.48 port 39134 ssh2
    Aug  4 05:49:10 cpanel76262789 sshd[2114252]: Failed password for root from 188.235.158.112 port 41790 ssh2
    """
            
            
            result = {'journalctl_fail':"",'journalctl_connection':"",'journalctl_invalid_user':"","log_file_fail":"","log_file_connection":"","log_file_invalid_user":""}
            if os.path.exists("/etc/debian_version"):
                version = public.readFile('/etc/debian_version').strip()
                if 'bookworm' in version or 'jammy' in version or 'impish' in version:
                    version = 12
                else:
                    try:
                        version = float(version)
                    except:
                        version = 11
                if version >= 12:
                    result['journalctl_fail'] = public.ExecShell("journalctl -u ssh --no-pager --since '"+since_time+"'|grep -a 'Failed password for' |grep -v 'invalid'")[0]
                    result['journalctl_connection']=public.ExecShell("journalctl -u ssh --no-pager --since '"+since_time+"'|grep -a 'Connection closed by authenticating user' |grep -a 'preauth'")[0]
                    result['journalctl_invalid_user']=public.ExecShell("journalctl -u ssh --no-pager --since '"+since_time+"'|grep -a 'sshd' |grep -a 'Invalid user'|grep -v 'Connection closed by'")[0]
                    return result
            for sfile in self.get_ssh_log_files(None):
                start_timestramp=public.to_date(times=since_time)
                try:
                    try:
                        tmp_result = public.ExecShell("cat %s|grep -a 'Failed password for' |grep -v 'invalid'" % (sfile))[0].strip()
                        add_result=[]
                        line_list=tmp_result.split('\n')
                        for line in line_list:
                            try:
                                tmp_time=self.format_date_to_timestamp(line[:15])
                            except:
                                tmp_time=public.to_date(format="%Y-%m-%dT%H:%M:%S.%f%z",times=line[:32])
                            # print('tmp_time:{}'.format(public.format_date(times=tmp_time)))
                            if start_timestramp<=tmp_time:
                                add_result.append(line)
                        add_string='\n'.join(add_result)
                        result['log_file_fail']+=add_string
                    except:pass
                    try:
                        tmp_result= public.ExecShell("cat %s|grep -a 'Connection closed by authenticating user' |grep -a 'preauth'" % (sfile))[0].strip()
                        
                        add_result=[]
                        line_list=tmp_result.split('\n')
                        for line in line_list:
                            try:
                                tmp_time=self.format_date_to_timestamp(line[:15])
                            except:
                                tmp_time=public.to_date(format="%Y-%m-%dT%H:%M:%S.%f%z",times=line[:32])

                            # print('tmp_time:{}'.format(public.format_date(times=tmp_time)))
                            if start_timestramp<=tmp_time:
                                add_result.append(line)
                        add_string='\n'.join(add_result)
                        result['log_file_connection']+=add_string
                    except:pass
                    try:
                        cmd="cat %s|grep -a 'sshd' |grep -a 'Invalid user '|grep -v 'Connection closed by'" % (sfile)
                        tmp_result= public.ExecShell("cat %s|grep -a 'sshd' |grep -a 'Invalid user'|grep -v 'Connection closed by'" % (sfile))[0].strip()
                        
                        add_result=[]
                        line_list=tmp_result.split('\n')
                        for line in line_list:
                            try:
                                tmp_time=self.format_date_to_timestamp(line[:15])
                            except:
                                tmp_time=public.to_date(format="%Y-%m-%dT%H:%M:%S.%f%z",times=line[:32])

                            # print('tmp_time:{}'.format(public.format_date(times=tmp_time)))
                            if start_timestramp<=tmp_time:
                                add_result.append(line)
                        add_string='\n'.join(add_result)
                        result['log_file_invalid_user']+=add_string
                    except:pass
                except: pass
            # self.set_ssh_cache(data)
            return result
            
    def check_black_white_ipset(self):
        """
        @name 检测ipset黑白名单
        """
        for list_type,list_name in self._types.items():
            ip_info=public.M('black_white').where('add_type=?', (list_type,)).select()
            if list_type=='white':#白名单检测
                for ip in ip_info:
                    public.ExecShell('ipset add '+list_name+' '+ip['ip']+' timeout 0')
            else:#黑名单检测
                for ip in ip_info:
                    timeout=timeleft=0
                    if int(ip['timeout'])!=0:
                        add_time=int(public.to_date(times=ip['add_time']))
                        now_time=int(time.time())
                        exptime=add_time+int(ip['timeout'])
                        timeleft=exptime-now_time
                        timeout=timeleft
                        if timeleft>0:
                            public.ExecShell('ipset add '+list_name+' '+ip['ip']+' timeout '+str(timeout))
                        else:
                            public.M('black_white').where('id=?', (ip['id'],)).delete()
                    else:
                        public.ExecShell('ipset add '+list_name+' '+ip['ip']+' timeout 0')
                        
                        
        return
                            
                        
                
            
            
    def cron_method(self):
        """
        @name 防爆破检测方法
        """
        self.init_ipset()
        # public.print_log('Starting through check task execution')
        self.check_black_white_ipset()
        self._config=self.read_config()
        
        if not self._config['global_status']:
            return 
        # public.print_log('防爆破脚本开始运行...')
        aapanel_login_info=[]
        now_time=old_limit=time.time()
        if  self._config['username_status']:
            limit_time=int(self._config['based_on_username']['limit'])*60
            count=int(self._config['based_on_username']['count'])
            
            start_time=public.format_date(times=now_time-limit_time)
            aapanel_login_info=public.M('logs').where('type=? and addtime>=? and log LIKE ?',('Login',start_time,'%is incorrec%')).select()
            aapanel_login_limit=now_time+limit_time
            try:
                old_limit=int(public.readFile(self._limit_file))
            except:old_limit=now_time
            if len(aapanel_login_info)>=count and old_limit<=now_time:
                public.writeFile(self._limit_file,str(aapanel_login_limit))
                # public.print_log('统计到面板登录最大尝试次数')
                # public.print_log('当前时间为:{}'.format(public.format_date(times=now_time)))
                # public.print_log('限制时间为:{}'.format(public.format_date(times=aapanel_login_limit)))
            
            
        if self._config['ip_status']:
            #ssh login
            #取ssh记录
            limit_time=int(self._config['based_on_ip']['limit'])*60
            # limit_time=2592000
            start_time=public.format_date(times=now_time-limit_time)
            ip_count=int(self._config['based_on_ip']['count'])
            ssh_info=self.get_ssh_intrusion(start_time)
            _, ip_total = self.get_ssh_info(ssh_info)

            # ssh最大ip限制处理
            for ip, details in ip_total.items():
                if int(details['count'])<ip_count:continue
                # public.print_log('统计到ssh登录最大尝试次数')

                if self._config['based_on_ip']['ipset_filter']:
                    # public.print_log('防火墙防护状态打开')
                    args = public.dict_obj()
                    args.types = 'black'
                    args.ips = ip
                    args.cron = 'true'
                    args.black_reason = 1
                    self.add_black_white(args)
                command = self._config['based_on_ip']['command']
                if command != '':
                    public.ExecShell('nohup ' + str(command) + ' &')
                    
        # public.print_log('Breaking through check task has been executed')
        # time.sleep(60)
        return


    

    def is_shell_command(self,command_string):
        # 尝试使用shlex.split处理字符串
        try:
            # 分割字符串
            split_command = shlex.split(command_string)
            
            # 检查是否有至少一个非空字符串
            if split_command:
                # 检查第一个元素是否可能是命令名
                command_name = split_command[0]
                
                # 检查命令名是否只包含字母、数字或下划线
                if all(c.isalnum() or c == '_' for c in command_name):
                    return True
        except ValueError:
            # 如果shlex.split抛出ValueError,那么可能不是一个合法的shell命令
            pass
        
        return False
        
    def get_history_record(self,get):
        """
        @name 获取历史记录,能匹配关键词搜索
        
        
        {'error_logins':[
                {
                    "timeleft": "356099", #解封剩余分钟数
                    "user": "anonymous", #用户名
                    "exptime": "2025-04-09 10:21:01", #解封时间
                    "ip": "34.22.135.234",
                    "authservice": "pure-ftpd",#身份验证服务
                    "country_code": "BE",#所在国家简称
                    "logintime": "2024-08-02 10:21:01",#登录时间
                    "service": "system", #服务
                    "country_name": "Belgium"#所在国家名称
                    "lock_status":0 #0(未封锁)/1(封锁中)
                }]
        """
        self._config=self.read_config()
        now_time=int(time.time())
        keyword=get.keyword.strip()
        result=[]
        limit_time=int(self._config['history_limit'])*60  #默认最近1小时
        aapanel_user=public.M('users').where("id=?", (1,)).getField('username')
        start_time=public.format_date(times=now_time-limit_time)
        if self._config['history_start'] !=0 and int(time.time())-int(self._config['history_start'])<limit_time:
            start_time=public.format_date(times=self._config['history_start'])
        if get.types == 'login':
            login_info=public.M('logs').where('type=? and addtime>=? and log LIKE ?',('Login',start_time,'%is incorrec%')).select()
            if len(login_info)>0:
                for i in login_info:
                    ip=i['log'].split('Login IP:')[1].strip()
                    ip=ip.split(':')[0]
                    exptime=int(time.time())-limit_time-public.to_date(i['addtime'])
                    if exptime<0:exptime=0
                    timeleft= 0 if now_time>public.to_date(i['addtime'])+limit_time else now_time-(public.to_date(i['addtime'])+limit_time)
                    tt_time=public.format_date(times=public.to_date(times=i['addtime'])+limit_time)
                    single_info={"timeleft":timeleft,
                    "user":aapanel_user,
                    "ip":ip,
                    "authservice":"aapanel",
                    "exptime":tt_time,#当前时间-超时时间-登录时间
                    "country_code":"",
                    "logintime":i['addtime'],
                    "service":"aapanel",
                    "country_name":""
                    }
                    if keyword !='' and (keyword in aapanel_user or keyword in ip or keyword in "aapanel" or keyword in i['addtime']) :result.append(single_info)
                    if keyword =='':result.append(single_info)
            #取ssh记录
            ssh_info=self.get_ssh_intrusion(start_time)
            result,ip_total=self.get_ssh_info(ssh_info, result, keyword=keyword, get_data=True)
                    
        elif get.types == 'ip':
            ip_info=public.M('black_white').where('add_type=? and timeout !=? and add_time>?', ('black',0,start_time )).select()
            for i in ip_info:
                if keyword !='' and (keyword not in i['ip'] and keyword in "aapanel" and  keyword in i['addtime']) :continue
                add_time=int(public.to_date(times=i['add_time']))
                exptime=add_time+i['timeout']
                timeleft= 0 if now_time>exptime else exptime-now_time
                single_info={"timeleft":timeleft//60,
                "ip":i['ip'],
                "exptime":public.format_date(times=exptime),
                "begin":i['add_time'],
                "country_code":"",
                "note":"",
                "action":"aapanel",
                "country_name":"",
                'lock_status':'blocked',
                'block_reason':'Trigger SSH explosion-proof rule breaking' if i['black_reason']==1 else 'Trigger aapanel explosion-proof rule breaking'
                }
                result.append(single_info)
                
            # if len(result)==0:
            #     #取ssh记录
            #     ssh_info=self.get_ssh_intrusion(start_time)
            #     result,ip_total=self.get_ssh_info(ssh_info,result,keyword=get.keyword)
        # elif get.types == 'ip':
            
        # elif get.types == 'login':
        #排序
        result = sorted(result, key=lambda x: x['logintime'], reverse=True)
        #取分页数据
        data = self.get_page(get,result)
        return public.return_message(0,0,data)
        
    def set_history_record_limit(self,get=None):
        """
        @name 设置历史记录时间
        """
        self._config=self.read_config()
        try:
            if 'history_limit' in get:
                self._config['history_limit']=self.type_conversion(get.history_limit,'int')
                #写日志
                if self.__write_log:
                    public.write_log_gettext(self.__log_type, 'Set the duration of failed login attempts (in minutes) to [{}] minutes'.format(self._config['history_limit']))
            if 'history_start' in get:
                self._config['history_start']=self.type_conversion(get.history_start,'int')
                #写日志
                if self.__write_log:
                    public.write_log_gettext(self.__log_type, 'History has been cleared!')
        except Exception as ee:
            public.print_log('ee:{}'.format(ee))
        public.writeFile(self._config_file,json.dumps(self._config))
            
        return public.return_message(0, 0, public.lang("Setting successful"))
        
        
    def clear_history_record_limit(self,get):
        """
        @name 移除并清空历史记录
        """
        self.init_ipset()
        get.history_start=int(time.time())
        self.set_history_record_limit(get)
        #清除历史记录
        clear_ips=public.M('black_white').where('add_type=? and timeout !=?', ('black',0)).select()
        for clear_info in clear_ips:
            check_result=public.ExecShell('ipset test aapanel.ipv4.blacklist '+clear_info['ip'])[1]
            if 'is in set' in check_result:
                public.ExecShell('ipset del '+self._types['black']+' '+clear_info['ip'])
        public.M('black_white').where('add_type=? and timeout !=?', ('black',0)).delete()
        
        #写日志
        if self.__write_log:
            public.write_log_gettext(self.__log_type, 'The history has been cleared and the blocked IP has been cleared')
        return public.return_message(0, 0, public.lang("Setting successful"))
        
        
        
    def get_black_white(self,get):
        """
        @name 获取黑/白名单
        """
        ip_list=[]
        result=public.M('black_white').where('add_type=? and black_reason=?', (get.types,0)).select()
        return public.return_message(0,0,result)
        
    def add_black_white(self,get):
        """
        @name 添加、编辑、删除黑/白名单
        """
        self.init_ipset()
        self.init_complier()
        if 'black_reason' not in get:get.black_reason=0
        ip_infos=get.ips.strip().split('\\n')
        ip_list=[]
        #检测ip是否正确
        for i in ip_infos:
            if i=='':continue
            ps=''
            i_list=i.split('#',1)
            ip=i_list[0].replace('"', '').strip()
            if len(i_list)>1:
                ps=i_list[1].strip()
            single_info={"ip":ip,"ps":ps}
            if public.is_ipv4(ip):
                ip_list.append(single_info)
            else:
                public.print_log('ip:{}'.format(ip))
                return {'status': -1, "timestamp": int(time.time()), "message": {'result':'[{}] IP address incorrect'.format(ip)}}
        if len(ip_list)==0:
            #清空黑/白名单
            public.ExecShell('ipset flush '+self._types[get.types])
            public.M('black_white').where('add_type=? and black_reason =?', (get.types,0)).delete()
            self.writeListFile()
            if self.__write_log:
                public.write_log_gettext(self.__log_type,  'The black and white list operation settings have been executed')
            return public.return_message(0, 0, public.lang("The operation has been executed"))
        if 'ps' not in get and 'cron' not in get:       
            public.ExecShell('ipset flush '+self._types[get.types])
            public.M('black_white').where('add_type=? and black_reason =?', (get.types,0)).delete()
        timeout=0
        if get.types=='black' and 'hand' not in get:
            timeout=int(self._config['based_on_ip']['limit']) *60
        
        check_result=public.ExecShell('ipset list')[0]
        if self._types[get.types] not in check_result:
            public.ExecShell('ipset create '+self._types[get.types]+' hash:net timeout 0')
        
        # success_list=[]
        # failed_list=[]

        try:
            for ip_info in ip_list:
                ip=ip_info['ip']
                ps=ip_info['ps']
                if ps=='' and len(ip_list)==1 and 'ps' in get:ps=get.ps
                if ip=='':continue
                if 'cron' in get and get.types=='black' and public.M('black_white').where('ip=? and add_type=?', (ip, 'white')).count():
                    # public.print_log('匹配到白名单规则,跳过:{}'.format(ip))
                    continue
                    
                #解封黑名单
                if 'clear_black' in get:
                    public.ExecShell('ipset del '+self._types['black']+' '+ip )
                    public.M('black_white').where('ip=?', (ip,)).delete()
                    
                if get.types=='black' and 'hand' not in get:
                    if ip=='127.0.0.1':
                        # public.print_log('The IP address [{}] is the local IP address and cannot be added to the blacklist'.format(ip))
                        continue

                if not public.M('black_white').where('ip=? and add_type=?', (ip, get.types)).count():
                    public.M('black_white').add('ip,add_type,ps,add_time,timeout,black_reason',(ip, get.types,ps,time.strftime('%Y-%m-%d %X',time.localtime()),timeout,get.black_reason))
                    if public.M('black_white').where('ip=? and add_type=?', (ip, get.types)).count():
                        #写日志
                        if self.__write_log:
                            public.write_log_gettext(self.__log_type,  'Successfully added IP [{}] to the interception system [{}]', (ip,self._types_system[get.types]))
                
                result=public.ExecShell('ipset add '+self._types[get.types]+' '+ip+' timeout '+str(timeout))
                # if public.M('black_white').where('ip=? and add_type=?', (ip, get.types)).count():
                #     success_list.append(ip)
                # else:
                #     failed_list.append(ip)
        except:pass
        # if len(success_list)>0:
        #     message='The following IP addresses have been successfully added:【{}】'.format(",".join(success_list))
        # if len(failed_list)>0:
        #     message+='The following IP addresses have been failed added:【{}】'.format(",".join(failed_list))
        self.writeListFile()
        # if self.__write_log:
        #         public.write_log_gettext(self.__log_type,  'The black and white list operation settings have been executed')
        return {'status': 0, "timestamp": int(time.time()), "message": {'result':'The operation has been executed'}}
        # return public.return_message(0, 0, public.lang("The operation has been executed"))
        
    def writeListFile(self):
        result=public.M('black_white').where('add_type=?', ('white',)).select()
        if len(result)<1:return
        ip_list=[]
        for i in result:
            ip_list.append(i['ip'])
        ip_string=','.join(ip_list)
        public.writeFile(self._breaking_white_file,ip_string)
        return 
        
        
    # def modify_black_white(self,get):
    #     """
    #     @name 编辑黑/白名单
    #     """
    #     if public.M('black_white').where('id=?', (get.id, )).count():
    #         public.M('black_white').where('id=?',(get.id, )).setField('ps',get.ps)
    #     return public.return_message(0, 0, public.lang("Edited successfully"))
            
        
    # def del_balck_white(self,get):
    #     """
    #     @name 删除黑/白名单
    #     """
    #     # public.print_log('---------010')
    #     #数据库内添加指定数据并返回数据库id
    #     #rgs_obj.id = public.M('crontab').add('name,type,where1,where_hour,where_minute,echo,addtime,status,save,backupTo,sType,sName,sBody,urladdress',("续签Let's Encrypt证书",'day','',hour,minute,echo,time.strftime('%Y-%m-%d %X',time.localtime()),0,'','localhost','toShell','',shell,''))
    #     if not public.M('black_white').where('id=?', (get.id,)).count():
    #         public.M('black_white').where('id=?', (get.id,)).delete()
    #     ip=public.M('black_white').where("id=?", (get.id,)).getField('ip')
    #     # public.print_log('---------01')
    #     types=public.M('black_white').where("id=?", (get.id,)).getField('add_type')
    #     public.print_log('---------02')
    #     public.print_log('id:{}'.format(get.id))
    #     public.M('black_white').where('id=?', (get.id,)).delete()
    #     public.ExecShell('ipset del '+self._types[types]+' '+ip)
    #     public.print_log('---------03')
    #     return public.return_message(0, 0, public.lang("Delete successfully"))

        
    def check_local_ip_white(self,get):
        """
        @name 编辑黑/白名单
        """
        if not public.M('black_white').where('ip=? and add_type=?', (get.ip, 'white')).count():
            return public.return_message(-1, 0, public.lang("Your current IP address [{}] is not on the whitelist.", get.ip))
        return public.return_message(0, 0, public.lang("Your current IP address [{}] is on the whitelist.", get.ip))
        
        
    def panel_ip_white(self,get):
        """
        @name 面板设置ip加白
        """
        get.ips=get.ip.strip()
        # from BTPanel import cache
        # limitip=''
        # try:
        #     limitip = public.readFile('data/limitip.conf')
        #     limitip=limitip.strip()
        # except:
        #     limitip=''
        # if limitip=='':limitip=get.ip
        # else:
        #     if get.ip not in limitip:
        #         limitip=limitip+','+get.ip
        # public.writeFile('data/limitip.conf',limitip)
        # cache.set('limit_ip',[])
        get.types='white'
        if 'ps' not in get:
            get.ps='your ip address'
        get.hand=True
        self.__write_log=False
        result=self.add_black_white(get)
        self.__write_log=True
        if result['status']==0:
            if self.__write_log:
                public.write_log_gettext(self.__log_type,  'Access IP [{}] successfully whitelisted'.format(get.ips))
            return public.return_message(0, 0, public.lang("Added successfully"))
        else:
            return public.return_message(-1, 0, public.lang("Added failed"))
        

    def del_cron(self):
        cron_name='[Do not delete] breaking through check task'
        cron_path = public.GetConfigValue('setup_path') + '/www/server/cron/'
        try:
            cron_path = public.GetConfigValue('setup_path') + '/cron/'
        except:
            pass
        # python_path = ''
        # try:
        #     python_path = public.ExecShell('which btpython')[0].strip("\n")
        # except:
        #     try:
        #         python_path = public.ExecShell('which python')[0].strip("\n")
        #     except:
        #         pass
        # if not python_path: return False
        if public.M('crontab').where('name=?',(cron_name,)).count():
            cron_echo = public.M('crontab').where(
                    "name=?", (cron_name, )).getField('echo')
            cron_id = public.M('crontab').where(
                    "echo=?", (cron_echo, )).getField('id')
            args = {"id": cron_id}
            import crontab
            crontab.crontab().DelCrontab(args)
            del_cron_file = cron_path + cron_echo
            public.ExecShell(
                "crontab -u root -l| grep -v '{}'|crontab -u root -".
                format(del_cron_file))
                    
        return True
        
        
    def add_cron(self):
        cron_name='[Do not delete] breaking through check task'
        cron_path = public.GetConfigValue('setup_path') + '/www/server/cron/'
        try:
            cron_path = public.GetConfigValue('setup_path') + '/cron/'
        except:
            pass
        python_path = ''
        try:
            python_path = public.ExecShell('which btpython')[0].strip("\n")
        except:
            try:
                python_path = public.ExecShell('which python')[0].strip("\n")
            except:
                pass
        if not python_path: return False
        count=public.M('crontab').where('name=?',(cron_name,)).count()
        if count>1:
            cron_echo = public.M('crontab').where(
                    "name=?", (cron_name, )).getField('echo')
            cron_id = public.M('crontab').where(
                    "echo=?", (cron_echo, )).getField('id')
            args = {"id": cron_id}
            import crontab
            crontab.crontab().DelCrontab(args)
            del_cron_file = cron_path + cron_echo
            public.ExecShell(
                "crontab -u root -l| grep -v '{}'|crontab -u root -".
                format(del_cron_file))
                    
        if not public.M('crontab').where('name=?',
                                         (cron_name, )).count():
            cmd = '{} {}'.format(python_path, self.__script_py)
            args = {
                "name": cron_name,
                "type": 'minute-n',
                "where1": '1',
                "hour": '',
                "week":'',
                "minute": '',
                "sName": "",
                "sType": 'toShell',
                "notice": '',
                "notice_channel": '',
                'datab_name':'',
                'tables_name':'',
                "save": '',
                "save_local": '1',
                "backupTo": '',
                "sBody": cmd,
                "urladdress": ''
            }
            import crontab
            res = crontab.crontab().AddCrontab(args)
            if res and "id" in res.keys():
                return True
            return False
        return True
        
    def get_protected_services(self,get):
        """
        @name 获取防护配置
        """
        result={'based_on_username':['aapanel'],'based_on_ip':['ssh']}
            
        return public.return_message(0,0,result)
    

    def get_login_limit(self):
        """
        @name 获取登录限制配置
        """
        self._config=self.read_config()
        if not self._config['global_status'] or not self._config['username_status']:return False
         #防爆破检测
        now_time=limit_time=time.time()
        white_ips=''
        _limit_login='{}/data/limit_login.pl'.format(public.get_panel_path())
        breaking_white='{}/data/breaking_white.conf'.format(public.get_panel_path())
        #获取限制时间
        try:
            limit_time=float(public.readFile(_limit_login))
            if os.path.exists(breaking_white):
                limit_time=float(public.readFile(_limit_login))
            if os.path.exists(breaking_white):
                white_ips+=public.readFile(breaking_white)
        except:pass
        intranet_local_ip=public.get_local_ip()
        if intranet_local_ip=='':intranet_local_ip=public.get_local_ip_2()
        from BTPanel import session
        if 'address' not in session:
            session['address']=public.GetClientIp()
        if now_time<limit_time and (white_ips=='' or session['address'] not in white_ips and  intranet_local_ip not in white_ips):
            return True
        return False
        
        
    def get_linux_users(self,get):
        """
        @name 获取linux用户列表信息
        """
        #取用户列表
        user_list=[]
        user_exec=public.ExecShell('cat /etc/passwd')[0].strip()
        try:
            tmp_list=user_exec.split('\n')
            for i in tmp_list:
                i_strip=i.strip()
                if i_strip=='':continue
                user_list.append(i_strip.split(':',1)[0])
        except:pass
        
        #取分页数据
        data = self.get_page(get,user_list)
            
        return public.return_message(0,0,data)
        
        
    def get_page(self,get,result):
        """
        @name 取分页信息
        """
        #取分页数据
        import page
        page = page.Page()
        info = {}
        info['count'] = len(result)
        info['row'] = 10
        info['p'] = 1
        if hasattr(get, 'p'):
            info['p'] = int(get['p'])
        if hasattr(get, 'limit'):
            info['row'] = int(get['limit'])
        info['uri'] = get
        info['return_js'] = ''
        if hasattr(get, 'tojs'):
            info['return_js'] = get.tojs
        data = {}
        # 获取分页数据
        data['data']=[]
        data['page'] = page.GetPage(info, '1,2,3,4,5,8')
        start = (info['p']-1)*info['row']
        end= info['p']*info['row']-1
        for index in range(len(result)):
            if index<start:continue
            if index >end:continue
            data['data'].append(result[index])
            
        return data
        
    def get_compiler_info(self,get):
        """
        @name 获取编译器组内成员信息
        """
        complier_members=[]
        try:
            group_exec=public.ExecShell('grep '+self.__complier_group+' /etc/group')[0].strip()
            group_string=group_exec.split(':')[3].strip()
            if group_string!='':
                complier_members=group_exec.split(':')[3].split(',')
        except:pass
        
        #取分页数据
        data = self.get_page(get,complier_members)
        return public.return_message(0,0,data)
        
    def init_complier(self):
        """
        @name 初始化编译器组
        """
        #读取配置文件/etc/group
        if os.path.exists('/etc/group'):
            groupContent = public.readFile('/etc/group')
            if groupContent.find(self.__complier_group+":x:") == -1:
                public.ExecShell('groupadd '+self.__complier_group)
            try:
                file_stat = os.stat(self.__gcc_path)
                gid=file_stat.st_gid
                import grp
                group_name=grp.getgrgid(gid).gr_name
                if group_name!=self.__complier_group:
                    public.ExecShell('chgrp '+self.__complier_group+' '+self.__gcc_path)
                    
            except:pass
        
            
        
    def add_user_to_compiler(self,get):
        """
        @name 添加指定用户到编译器组
        """
        self.init_complier()
        if 'users' not in get:return public.return_message(-1, 0, public.lang("parameter error"))
        try:
            if type(get.users)==str:
                import ast
                get.users=ast.literal_eval(get.users)
        except:pass
        add_users=get.users
        get.limit=10000
        complier_members=self.get_compiler_info(get)['message']['data']
        for add_user in add_users:
            try:
                if (len(complier_members)>0 and add_user not in complier_members) or len(complier_members)<1:
                    public.ExecShell('usermod -aG '+self.__complier_group+' '+add_user)
                    # complier_members=self.get_compiler_info(get)['message']['data']
                    # public.print_log('complier_members2:{}'.format(complier_members))
                    # if add_user in complier_members:return public.return_message(0, 0, public.lang("Added successfully"))
            except:pass
        #写日志
        if self.__write_log:
            public.write_log_gettext(self.__log_type,  'Successfully added users [{}] to the compiler group', (','.join(add_users),))
        return public.return_message(0, 0, public.lang("Operation executed"))
        
        
    def del_user_to_compiler(self,get):
        """
        @name 删除编译器组内指定用户
        """
        if 'user' not in get or get.user.strip()=='':return public.return_message(-1, 0, public.lang("parameter error"))
        del_user=get.user.strip()
        get.limit=10000
        complier_members=self.get_compiler_info(get)['message']['data']
        try:
            if len(complier_members)>0 and del_user in complier_members:
                public.ExecShell('gpasswd -d '+del_user+' '+self.__complier_group)
                complier_members=self.get_compiler_info(get)['message']['data']
                if len(complier_members)>0 and del_user in complier_members:return public.return_message(-1, 0, public.lang("Delete failed"))
        except:pass
        #写日志
        if self.__write_log:
            public.write_log_gettext(self.__log_type,  'User [{}] successfully removed from compiler group', (del_user,))
        return public.return_message(0, 0, public.lang("Delete successfully"))
        
    def set_compiler_status(self,get):
        """
        @name 为编译器其他用户设置状态
        @param get.status 0关闭 1开启 不传此参数时,仅获取状态,不设置状态
        """
        
        chmod_limt='0750'
        log_string={'0750':'closed','0755':'enabled'}
        if 'status' in get:
            if int(get.status)==1:chmod_limt='0755'
            public.ExecShell('chmod '+chmod_limt+' '+self.__gcc_path)
            #写日志
            if self.__write_log:
                public.write_log_gettext(self.__log_type,  'Non privileged user successfully {} gcc compiler', (log_string[chmod_limt],))
        limit_status=False
        try:
            accept=oct(os.stat(self.__gcc_path).st_mode)[-4:]
            if accept=='0755':limit_status=True
        except:pass
        return public.return_message(0,0,limit_status)
        
            
        
        
        
        
    
        
    
        
        
        
        

Youez - 2016 - github.com/yon3zu
LinuXploit