403Webshell
Server IP : 172.67.216.182  /  Your IP : 172.70.208.8
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/ssh_security_v2.py
#coding: utf-8
#-------------------------------------------------------------------
# aaPanel
#-------------------------------------------------------------------
# Copyright (c) 2015-2017 aaPanel(www.aapanel.com) All rights reserved.
#-------------------------------------------------------------------
# Author: lkqiang <[email protected]>
#-------------------------------------------------------------------
# SSH security class
#------------------------------
import public,os,re,send_mail,json
from datetime import datetime
from public.validate import Param


class ssh_security:
    # Key algorithms accepted by set_sshkey() and get_ssh_key_type().
    __type_list = ['ed25519','ecdsa','rsa', 'dsa']
    # File in which set_sshkey() records the algorithm of the generated key.
    __key_type_file = '{}/data/ssh_key_type.pl'.format(public.get_panel_path())
    # Candidate private-key paths (not referenced in the visible part of the file).
    __key_files = ['/root/.ssh/id_ed25519','/root/.ssh/id_ecdsa','/root/.ssh/id_rsa','/root/.ssh/id_rsa_bt']
    # Private-key path per algorithm (not referenced in the visible part of the file).
    __type_files = {
        "ed25519": "/root/.ssh/id_ed25519",
        "ecdsa": "/root/.ssh/id_ecdsa",
        "rsa": "/root/.ssh/id_rsa",
        "dsa": "/root/.ssh/id_dsa"
    }
    # Marker-file path under the panel data directory; presumably a feature
    # flag - TODO confirm against the code that reads it.
    open_ssh_login = public.get_panel_path() + '/data/open_ssh_login.pl'

    # Path of the sshd configuration file that the methods read and rewrite.
    __SSH_CONFIG='/etc/ssh/sshd_config'
    # In-memory copy of the IP list stored in __ClIENT_IP (loaded in __init__).
    __ip_data = None
    # JSON file holding the list of known login IPs.
    __ClIENT_IP='/www/server/panel/data/host_login_ip.json'
    # Interpreter name; _check_pyenv() switches it to 'btpython'.
    __pyenv = 'python'
    # Baseline checks evaluated by check_san_baseline() / san_ssh_security().
    # Each entry names the file to inspect and a list of rules; a rule is a
    # regex whose first capture group is compared against "check".  For
    # "number" checks the bounds are exclusive (min < value < max); for
    # "string" checks the value must be one of the listed strings.
    # The "repair" and "repair_loophole" keys are not read by the visible
    # part of the file.
    # NOTE(review): entry "6" advertises "Port 60151" in "repair" but
    # "Port 65531" in "repair_loophole", and its rule regex has no leading
    # "\n", so it can also match a commented-out "#Port" line - confirm intent.
    __REPAIR={"1":{"id":1,
                   "type":"file",
                   "harm":"High",
                   "repaired":"1",
                   "level":"3",
                   "name":"Make sure SSH MaxAuthTries is set between 3-6",
                   "file":"/etc/ssh/sshd_config",
                   "Suggestions":"Remove the MaxAuthTries comment symbol # in /etc/ssh/sshd_config, set the maximum number of failed password attempts 3-6 recommended 4",
                   "repair":"MaxAuthTries 4",
                   "rule":[{"re":"\nMaxAuthTries\\s*(\\d+)","check":{"type":"number","max":7,"min":3}}],
                   "repair_loophole":[{"re":"\n?#?MaxAuthTries\\s*(\\d+)","check":"\nMaxAuthTries 4"}]},
              "2":{"id":2,
                   "repaired":"1",
                   "type":"file",
                   "harm":"High",
                   "level":"3",
                   "name":"SSHD Mandatory use of V2 security protocol",
                   "file":"/etc/ssh/sshd_config",
                   "Suggestions":"Set parameters in the /etc/ssh/sshd_config file as follows",
                   "repair":"Protocol 2",
                   "rule":[{"re":"\nProtocol\\s*(\\d+)",
                            "check":{"type":"number","max":3,"min":1}}],
                   "repair_loophole":[{"re":"\n?#?Protocol\\s*(\\d+)","check":"\nProtocol 2"}]},
              "3":{"id":3,
                   "repaired":"1",
                   "type":"file",
                   "harm":"High",
                   "level":"3",
                   "name":"Set SSH idle exit time",
                   "file":"/etc/ssh/sshd_config",
                   "Suggestions":"Set ClientAliveInterval to 300 to 900 in /etc/ssh/sshd_config, which is 5-15 minutes, and set ClientAliveCountMax to 0-3",
                   "repair":"ClientAliveInterval 600  ClientAliveCountMax 2",
                   "rule":[{"re":"\nClientAliveInterval\\s*(\\d+)","check":{"type":"number","max":900,"min":300}}],
                   "repair_loophole":[{"re":"\n?#?ClientAliveInterval\\s*(\\d+)","check":"\nClientAliveInterval 600"}]},
              "4":{"id":4,
                   "repaired":"1",
                   "type":"file",
                   "harm":"High",
                   "level":"3",
                   "name":"Make sure SSH LogLevel is set to INFO",
                   "file":"/etc/ssh/sshd_config",
                   "Suggestions":"Set parameters in the /etc/ssh/sshd_config file as follows (uncomment)",
                   "repair":"LogLevel INFO",
                   "rule":[{"re":"\nLogLevel\\s*(\\w+)","check":{"type":"string","value":["INFO"]}}],
                   "repair_loophole":[{"re":"\n?#?LogLevel\\s*(\\w+)","check":"\nLogLevel INFO"}]},
              "5":{"id":5,
                   "repaired":"1",
                   "type":"file",
                   "harm":"High",
                   "level":"3",
                   "name":"Disable SSH users with empty passwords from logging in",
                   "file":"/etc/ssh/sshd_config",
                   "Suggestions":"Configure PermitEmptyPasswords to no in /etc/ssh/sshd_config",
                   "repair":"PermitEmptyPasswords no",
                   "rule":[{"re":"\nPermitEmptyPasswords\\s*(\\w+)","check":{"type":"string","value":["no"]}}],
                   "repair_loophole":[{"re":"\n?#?PermitEmptyPasswords\\s*(\\w+)","check":"\nPermitEmptyPasswords no"}]},
              "6":{"id":6,
                   "repaired":"1",
                   "type":"file",
                   "name":"SSH uses the default port 22",
                   "harm":"High",
                   "level":"3",
                   "file":"/etc/ssh/sshd_config",
                   "Suggestions":"Set Port to 6000 to 65535 in / etc / ssh / sshd_config",
                   "repair":"Port 60151",
                   "rule":[{"re":"Port\\s*(\\d+)","check":{"type":"number","max":65535,"min":22}}],
                   "repair_loophole":[{"re":"\n?#?Port\\s*(\\d+)","check":"\nPort 65531"}]}}
    # Display text for each root-login mode; presumably the values of sshd's
    # PermitRootLogin option - TODO confirm (not referenced in the visible code).
    __root_login_types = {'yes':'yes - keys and passwords','no':'no - no login','without-password':'without-password - only key login','forced-commands-only':'forced-commands-only - can only execute commands'}


    def __init__(self):
        """Prepare storage and in-memory state for the SSH security helpers.

        Creates the ``ssh_login_record`` table and an index on its ``addr``
        column when the table is missing, makes sure the login-IP list file
        exists, initialises the mail sender and loads the IP list into memory
        (falling back to an empty list when the file cannot be parsed).
        """
        if not public.M('sqlite_master').where('type=? AND name=?', ('table', 'ssh_login_record')).count():
            public.M('').execute('''CREATE TABLE ssh_login_record (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                addr TEXT,
                server_ip TEXT,
                user_agent TEXT,
                ssh_user TEXT,
                login_time INTEGER DEFAULT 0,
                close_time INTEGER DEFAULT 0,
                video_addr TEXT);''')
            # SQLite keeps tables and indexes in one namespace, so an index
            # named exactly like its table is rejected; give it its own name.
            public.M('').execute('CREATE INDEX idx_ssh_login_record_addr ON ssh_login_record (addr);')

        if not os.path.exists(self.__ClIENT_IP):
            public.WriteFile(self.__ClIENT_IP,json.dumps([]))
        self.__mail=send_mail.send_mail()
        self.__mail_config=self.__mail.get_settings()
        self._check_pyenv()
        try:
            self.__ip_data = json.loads(public.ReadFile(self.__ClIENT_IP))
        except Exception:
            # Unreadable or corrupt list file: start from an empty list.
            self.__ip_data=[]

    def _check_pyenv(self):
        """Use ``btpython`` as interpreter name when the panel's pyenv directory exists."""
        panel_pyenv = '/www/server/panel/pyenv'
        if not os.path.exists(panel_pyenv):
            return
        self.__pyenv = 'btpython'

    def get_ssh_key_type(self):
        '''
        Return the SSH key type recorded in the key-type file.

        Falls back to 'rsa' when the file is missing or its content is not
        one of the supported key types.
        @author hwliang
        :return: key type string
        '''
        fallback = 'rsa'
        if os.path.exists(self.__key_type_file):
            stored = public.ReadFile(self.__key_type_file)
            if stored in self.__type_list:
                return stored
        return fallback


    def return_python(self):
        """Return the first existing Python interpreter path, or 'python' if none exists."""
        candidates = (
            '/www/server/panel/pyenv/bin/python',
            '/usr/bin/python',
            '/usr/bin/python3',
        )
        for interpreter in candidates:
            if os.path.exists(interpreter):
                return interpreter
        return 'python'


    def return_profile(self):
        """Return the shell profile file used to hook the login monitor.

        Prefers ``/root/.bash_profile``, then ``/etc/profile``.  When neither
        exists an empty ``/root/.bash_profile`` is created and returned.

        The previous fallback created ``/root/.bash_profil`` (missing "e"),
        a name the existence checks never look for, so every call truncated
        that file again and discarded whatever had been written to it.
        """
        for candidate in ('/root/.bash_profile', '/etc/profile'):
            if os.path.exists(candidate):
                return candidate
        fallback = '/root/.bash_profile'
        # Only reached when the file does not exist, so "w" cannot clobber data.
        with open(fallback, mode="w", encoding="utf-8"):
            pass
        return fallback

    def return_bashrc(self):
        """Return the first existing bashrc file, creating an empty /root/.bashrc if none exists."""
        for candidate in ('/root/.bashrc', '/etc/bashrc', '/etc/bash.bashrc'):
            if os.path.exists(candidate):
                return candidate
        created = '/root/.bashrc'
        with open(created, mode="w", encoding="utf-8"):
            pass
        return created


    def check_files(self):
        """Reset the login-IP list file to an empty JSON list when it cannot be parsed."""
        ip_file = self.__ClIENT_IP
        try:
            raw = public.ReadFile(ip_file)
            json.loads(raw)
        except:
            public.WriteFile(ip_file, json.dumps([]))

    def get_ssh_port(self):
        """Return the SSH port configured in sshd_config as a string.

        Only active ``Port`` directives are considered; commented-out lines
        such as the stock ``#Port 22`` are ignored, and a directive on the
        last line of the file is found even without a trailing newline.
        Returns ``'22'`` when no active directive is present or the file
        cannot be read.
        """
        conf = public.readFile(self.__SSH_CONFIG) or ''
        # sshd_config keywords are case-insensitive; anchor to line start so
        # that neither comments nor other keywords ending in "Port" match.
        found = re.search(r"^[ \t]*Port[ \t]+([0-9]+)\s*$", conf, re.M | re.I)
        if found:
            return found.group(1)
        return '22'

    # 主判断函数
    def check_san_baseline(self, base_json):
        """Evaluate one baseline entry against the file it describes.

        Returns None for entries whose type is not 'file', and for entries
        carrying 'check_file' when that file exists (False when it is
        missing).  Otherwise returns True when the target file is absent or
        no rule decides, and the verdict of the first 'number' / 'string'
        rule when one is present.
        """
        if base_json['type'] != 'file':
            return None
        if 'check_file' in base_json:
            if not os.path.exists(base_json['check_file']):
                return False
            return None
        if not os.path.exists(base_json['file']):
            return True

        content = public.ReadFile(base_json['file'])
        for rule in base_json['rule']:
            found = re.findall(rule['re'], content)
            kind = rule['check']['type']
            if kind == 'number':
                if not found or not found[0]:
                    return False
                number = int(found[0])
                # Bounds are exclusive on both ends.
                return number > rule['check']['min'] and number < rule['check']['max']
            if kind == 'string':
                if not found or not found[0]:
                    return False
                return found[0] in rule['check']['value']
        return True

    def san_ssh_security(self,get):
        """Run the six baseline checks and return a score plus the failed entries.

        The score starts at 100 and loses 10 points per failed check.
        """
        failed = [
            self.__REPAIR[key]
            for key in ('1', '2', '3', '4', '5', '6')
            if not self.check_san_baseline(self.__REPAIR[key])
        ]
        score = 100
        if failed:
            score = score - (len(failed) * 10)
        return {"num": score, "result": failed}

    ################## SSH 登陆报警设置 ####################################
    def send_mail_data(self, title, body, login_ip="", type=None):
        """Send an SSH alarm through the configured notification channel.

        First tries the task-based push (``push_by_task_keyword``); when that
        reports success nothing else is done.  Otherwise falls back to the
        channel named in ``data/ssh_send_type.pl``.

        :param title: subject used for mail notifications
        :param body: alarm text; bodies containing "backdoor user" are sent
                     without a login IP
        :param login_ip: IP of the login that triggered the alarm.  Defaults
                         to "" because callers in this class invoke the
                         method with only ``title`` and ``body``.
        :param type: unused, kept for interface compatibility
        :return: None, or False when the fallback channel cannot be initialised
        """
        from panel_msg.collector import SitePushMsgCollect
        # NOTE(review): this value is never consumed before being replaced
        # below; the call is kept in case it has side effects.
        msg = SitePushMsgCollect.ssh_login(body)

        is_backdoor_alert = body.find("backdoor user") != -1
        shown_ip = login_ip if login_ip else "unknown ip"
        push_data = {
            "login_ip": "" if is_backdoor_alert else shown_ip,
            "msg_list": ['>Send content:' + body]
        }
        try:
            import sys
            if "/www/server/panel" not in sys.path:
                sys.path.insert(0, "/www/server/panel")

            from mod.base.push_mod import push_by_task_keyword
            res = push_by_task_keyword("ssh_login", "ssh_login", push_data=push_data)
            if res:
                return
        except Exception:
            # Best effort: fall through to the legacy channel below.
            pass

        try:
            login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl"
            if not os.path.exists(login_send_type_conf):
                return
            login_type = public.readFile(login_send_type_conf).strip()
            if not login_type:
                login_type = "mail"
            channel = public.init_msg(login_type.strip())
            if not channel:
                return False
            if login_type=="mail":
                data={}
                data['title'] = title
                data['msg'] = body
                channel.push_data(data)
            elif login_type=="wx_account":
                from push.site_push import ToWechatAccountMsg
                if is_backdoor_alert:
                    msg = ToWechatAccountMsg.ssh_login("")
                else:
                    msg = ToWechatAccountMsg.ssh_login(shown_ip)
                channel.send_msg(msg)
            else:
                msg = public.get_push_info("SSH logon alarm", ['>Send content:' + body])
                msg['push_type'] = "SSH logon alarm"
                channel.push_data(msg)
        except Exception:
            # Notification failures must not break the login flow.
            pass

    # Detect accounts other than root whose UID and GID are both 0
    def check_user(self):
        """Report accounts other than root that have UID 0 and GID 0.

        Scans ``/etc/passwd``; when such accounts exist an alarm is sent in a
        background thread and True is returned, otherwise False.
        """
        suspects = []
        cfile = '/etc/passwd'
        if os.path.exists(cfile):
            # "with" guarantees the handle is closed even if parsing fails.
            with open(cfile, 'r') as passwd:
                for line in passwd:
                    fields = line.strip().split(":")
                    # Blank or malformed lines lack the uid/gid columns.
                    if len(fields) < 4:
                        continue
                    if fields[2] == '0' and fields[3] == '0' and fields[0] != 'root':
                        suspects.append(fields[0])
        if not suspects:
            return False

        local_ip = public.GetLocalIp()
        # Separate the names so several accounts do not run together.
        names = ', '.join(suspects)
        title = local_ip + ' There is a backdoor user in the server'
        body = local_ip + ' There is a backdoor user in the server ' + names + ' please check/etc/passwd'
        # send_mail_data expects a login_ip argument; backdoor alarms have none.
        public.run_thread(self.send_mail_data, args=(title, body, ''))
        return True

    #记录root 的登陆日志

    #返回登陆IP
    def return_ip(self,get):
        """Return the in-memory login-IP list wrapped in a success message."""
        self.check_files()
        known_ips = self.__ip_data
        return public.return_message(0, 0, known_ips)

    #添加IP白名单
    def add_return_ip(self, get):
        """Add ``get.ip`` to the login-IP list and persist the list.

        Returns an error message when validation fails or the IP is already
        present, otherwise a success message.
        """
        # Validate request parameters
        try:
            get.validate(
                [Param('ip').Require().String().Ip()],
                [public.validate.trim_filter()],
            )
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        self.check_files()
        new_ip = get.ip.strip()
        if new_ip in self.__ip_data:
            return public.return_message(-1, 0, public.lang("Already exists"))

        self.__ip_data.append(new_ip)
        public.writeFile(self.__ClIENT_IP, json.dumps(self.__ip_data))
        return public.return_message(0, 0, public.lang("Added successfully"))

    def del_return_ip(self, get):
        """Remove ``get.ip`` from the login-IP list and persist the list.

        Returns an error message when validation fails or the IP is not in
        the list, otherwise a success message.
        """
        # Validate request parameters
        try:
            get.validate(
                [Param('ip').Require().Ip()],
                [public.validate.trim_filter()],
            )
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        self.check_files()
        target_ip = get.ip.strip()
        if target_ip not in self.__ip_data:
            return public.return_message(-1, 0, public.lang("IP does not exist"))

        self.__ip_data.remove(target_ip)
        public.writeFile(self.__ClIENT_IP, json.dumps(self.__ip_data))
        return public.return_message(0, 0, public.lang("Successfully deleted"))

    #取登陆的前50个条记录
    def login_last(self):
        """Merge the IPv4 addresses of the last 50 logins into the login-IP list.

        Runs ``last -n 50``, extracts every IPv4 address from its output,
        appends the ones not yet known and persists the list.

        :return: the updated in-memory IP list
        """
        self.check_files()
        output = public.ExecShell('last -n 50')
        found = re.findall(r"(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)",output[0])
        # The original compared the list itself with 1 (``data>=1``), which
        # raises TypeError on Python 3; test for a non-empty result instead.
        if found:
            # dict.fromkeys de-duplicates while keeping first-seen order.
            for ip in dict.fromkeys(found):
                if ip not in self.__ip_data:
                    self.__ip_data.append(ip)
            public.writeFile(self.__ClIENT_IP, json.dumps(self.__ip_data))
        return self.__ip_data

    #获取ROOT当前登陆的IP
    def get_ip(self):
        """Return the IPv4 addresses found in the fifth column of ``who am i``."""
        ipv4_pattern = r"(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
        who_output = public.ExecShell(''' who am i |awk ' {print $5 }' ''')
        return re.findall(ipv4_pattern, who_output[0])

    def get_logs(self, get):
        """Return one page of 'SSH security' log entries plus pagination markup.

        ``get.p`` selects the page (default 1); ``get.tojs`` is forwarded to
        the pager as ``return_js``.  Ten rows are shown per page.
        """
        # Validate pagination parameters
        try:
            get.validate(
                [Param('p_size').Integer(), Param('p').Integer()],
                [public.validate.trim_filter()],
            )
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        import page as page_module
        pager = page_module.Page()
        total = public.M('logs').where('type=?', ('SSH security',)).count()

        info = {
            'count': total,
            'row': 10,
            'p': int(get['p']) if hasattr(get, 'p') else 1,
            'uri': get,
            'return_js': get.tojs if hasattr(get, 'tojs') else '',
        }

        result = {}
        # GetPage runs first: the pager's SHIFT/ROW are read for the query below.
        result['page'] = pager.GetPage(info, '1,2,3,4,5,8')
        window = str(pager.SHIFT) + ',' + str(pager.ROW)
        query = public.M('logs').where('type=?', (u'SSH security',)).order('id desc')
        result['data'] = query.limit(window).field('log,addtime').select()
        return public.return_message(0, 0, result)

    def get_server_ip(self):
        """Return the stripped content of the panel's iplist.txt, or '127.0.0.1' if it is missing."""
        ip_file = '/www/server/panel/data/iplist.txt'
        if not os.path.exists(ip_file):
            return '127.0.0.1'
        content = public.ReadFile(ip_file)
        return content.strip()


    #登陆的情况下
    def login(self):
        """Record the current root login and raise an alarm for unknown IPs.

        Fixes over the previous version:
        * no IndexError when ``who am i`` yields no IP address;
        * the duplicate check uses a well-formed WHERE clause
          (``type=? AND addtime=?``) instead of ``type=? addtime``;
        * the login IP is passed to ``send_mail_data``, which requires it
          (the missing argument used to raise a TypeError that was swallowed,
          so no alarm was ever sent).

        :return: True when an alarm was sent, False otherwise
        """
        self.check_files()
        self.check_user()
        self.__ip_data = json.loads(public.ReadFile(self.__ClIENT_IP))
        ip = self.get_ip()
        if not ip or not ip[0]:
            return False
        login_ip = ip[0]
        try:
            import time
            mDate = time.strftime('%Y-%m-%d %X', time.localtime())
            # Skip when an entry with the same timestamp has already been logged.
            if public.M('logs').where('type=? AND addtime=?', ('SSH security', mDate,)).count():
                return False
            local_ip = public.GetLocalIp()
            if login_ip in self.__ip_data:
                public.WriteLog('SSH security', 'The server {} login IP is {}, login user is root'.format(local_ip, login_ip))
                return False
            self.send_mail_data(
                'Server {} login alarm'.format(local_ip),
                'There is a login alarm on the server {}, the login IP is {}, the login user is root'.format(local_ip, login_ip),
                login_ip,
            )
            public.WriteLog('SSH security','There is a login alarm on the server {}, the login IP is {}, login user is root'.format(local_ip, login_ip))
            return True
        except Exception:
            # Best effort: a logging or notification failure must not abort
            # the login that triggered this hook.
            return False



    #修复bashrc文件
    def repair_bashrc(self):
        """Strip the legacy login-monitor hook from the bashrc file.

        When the hook is present it is removed; afterwards any remaining
        occurrence of the interpreter path is removed as well.
        """
        bashrc = self.return_bashrc()
        content = public.ReadFile(bashrc)
        python_bin = self.return_python()
        hook = python_bin + ' /www/server/panel/class/ssh_security.py'
        if not re.search(hook, content):
            return
        public.WriteFile(bashrc, content.replace(hook + ' login', ''))
        # Left-over fragments from earlier versions
        leftover = public.ReadFile(bashrc)
        if re.search(python_bin, leftover):
            public.WriteFile(bashrc, leftover.replace(python_bin, ''))


    #开启监控
    def start_jian(self,get):
        """Append the login-monitor hook to the shell profile.

        Returns a failure message when the hook is already present,
        otherwise writes it and returns a success message.
        """
        self.repair_bashrc()
        profile = self.return_profile()
        content = public.ReadFile(profile)
        python_bin = self.return_python()
        if re.search(python_bin + ' /www/server/panel/class/ssh_security.py', content):
            return public.returnMsg(False, public.lang("Open failed"))
        cmd = '''shell="%s /www/server/panel/class/ssh_security.py login"
        nohup  `${shell}` &>/dev/null &
        disown $!''' % (python_bin)
        public.WriteFile(profile, content.strip() + '\n' + cmd)
        return public.returnMsg(True, public.lang("Open successfully"))

    #关闭监控
    def stop_jian(self,get):
        """Remove the login-monitor hook from the shell profile.

        Always returns a success message, whether or not the hook was present.
        """
        profile = self.return_profile()
        content = public.ReadFile(profile)
        python_bin = self.return_python()
        hook = python_bin + ' /www/server/panel/class/ssh_security.py'
        if re.search(hook, content):
            fragments = (
                '''shell="%s /www/server/panel/class/ssh_security.py login"''' % (python_bin),
                '''nohup  `${shell}` &>/dev/null &''',
                '''disown $!''',
            )
            for fragment in fragments:
                content = content.replace(fragment, '')
            public.WriteFile(profile, content)
            # Check whether a bare hook command is still present
            if re.search(hook, content):
                public.WriteFile(profile, content.replace(hook + ' login', ''))
            # Left-over fragments from earlier versions
            leftover = public.ReadFile(profile)
            if re.search(python_bin, leftover):
                public.WriteFile(profile, leftover.replace(python_bin, ''))
        return public.returnMsg(True, public.lang("Closed successfully"))

    #监控状态
    def get_jian(self,get):
        """Report whether the login-monitor hook is present in the shell profile."""
        content = public.ReadFile(self.return_profile())
        hooked = re.search('/www/server/panel/class/ssh_security.py login', content)
        if not hooked:
            return public.returnMsg(False, public.lang("1"))
        return public.returnMsg(True, public.lang("1"))

    def set_password(self, get):
        '''
        Enable SSH password login.

        Rewrites every (possibly commented) PasswordAuthentication line in
        sshd_config to "PasswordAuthentication yes", or appends that line
        when none exists, then restarts sshd.
        get: no parameters required
        '''
        directive = r'\n#?PasswordAuthentication\s\w+'
        enabled_line = '\nPasswordAuthentication yes'
        sshd_conf = public.readFile(self.__SSH_CONFIG)
        if not sshd_conf:
            return public.return_message(-1, 0, public.lang("ERROR: sshd config configuration file does not exist, cannot continue!"))
        if re.findall(directive, sshd_conf):
            updated = re.sub(directive, enabled_line, sshd_conf)
        else:
            updated = sshd_conf + enabled_line
        self.wirte(self.__SSH_CONFIG, updated)
        self.restart_ssh()
        public.WriteLog('SSH management', 'Enable password login')
        return public.return_message(0, 0, public.lang("Open successfully"))

    def set_sshkey(self, get):
        '''
        Generate a new root SSH key pair and enable key authentication.

        Parameters (validated below):
            ssh  - 'yes' or 'no'; with 'no' PasswordAuthentication is also
                   switched to "no" in sshd_config
            type - key algorithm, must be one of __type_list
        Returns a success message once the key exists and sshd_config has
        been rewritten, otherwise an error message.
        '''
        # Validate request parameters
        try:
            get.validate([
                Param('ssh').Require().String('in', ['yes', 'no']).Xss(),
                Param('type').Require().Xss(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        ssh = get.ssh
        s_type = get.type
        # s_type is interpolated into shell commands below, so it must be
        # restricted to the known algorithm names.
        if not s_type in self.__type_list:
            return public.return_message(-1, 0, public.lang("Wrong encryption method"))

        authorized_keys = '/root/.ssh/authorized_keys'
        # file[0] is the public key, file[1] the private key.
        file = ['/root/.ssh/id_{}.pub'.format(s_type), '/root/.ssh/id_{}'.format(s_type)]
        for i in file:
            if os.path.exists(i):
                # Delete from authorized_keys the lines matching the old public key.
                # NOTE(review): on the second iteration file[0] has already
                # been removed, so the $(cat ...) substitution is empty - confirm
                # this is harmless with the sed in use.
                public.ExecShell(r'sed -i "\~$(cat %s)~d" %s' % (file[0], authorized_keys))
                os.remove(i)
        # Generate the new key pair with an empty passphrase.
        os.system("ssh-keygen -t {s_type} -P '' -f /root/.ssh/id_{s_type} |echo y".format(s_type = s_type))
        if os.path.exists(file[0]):
            # Authorise the new public key.
            public.ExecShell('cat %s >> %s && chmod 600 %s' % (file[0], authorized_keys, authorized_keys))
            rec = r'\n#?RSAAuthentication\s\w+'
            rec2 = r'\n#?PubkeyAuthentication\s\w+'
            # From here on "file" holds the sshd_config text, not the key paths.
            file = public.readFile(self.__SSH_CONFIG)
            if not file:
                return public.return_message(-1, 0, public.lang("ERROR: sshd config configuration file does not exist"))
            # Append the directives when absent, then force both to "yes".
            if len(re.findall(rec, file)) == 0: file = file + '\nRSAAuthentication yes'
            if len(re.findall(rec2, file)) == 0: file = file + '\nPubkeyAuthentication yes'
            file_ssh = re.sub(rec, '\nRSAAuthentication yes', file)
            file_result = re.sub(rec2, '\nPubkeyAuthentication yes', file_ssh)
            if ssh == 'no':
                # Key-only login requested: disable password authentication.
                ssh_password = r'\n#?PasswordAuthentication\s\w+'
                if len(re.findall(ssh_password, file_result)) == 0:
                    file_result = file_result + '\nPasswordAuthentication no'
                else:
                    file_result = re.sub(ssh_password, '\nPasswordAuthentication no', file_result)
            self.wirte(self.__SSH_CONFIG, file_result)
            # Remember the algorithm so get_ssh_key_type() can report it.
            public.writeFile(self.__key_type_file, s_type)
            self.restart_ssh()
            public.WriteLog('SSH management', 'Set up SSH key authentication and successfully generate the key')
            return public.return_message(0, 0, public.lang("Open successfully"))
        else:
            public.WriteLog('SSH management', 'Failed to set SSH key authentication')
            return public.return_message(-1, 0, public.lang("Open failed"))

        # 取SSH信息

    def get_msg_push_list(self,get):
        """
        @name Get the message-channel configuration list
        @auther: cjxin
        @date: 2022-08-16

        Returns the mapping built by _get_msg_push_list() wrapped in a
        success message.  The body used to be a line-for-line copy of that
        helper; delegating keeps the two from drifting apart.
        """
        return public.return_message(0, 0, self._get_msg_push_list(get))
    def _get_msg_push_list(self,get):
        """
        @name Get the message-channel configuration list (raw dict)
        @auther: cjxin
        @date: 2022-08-16

        Downloads data/msg.json when it is missing or 'force' is requested,
        then returns its entries keyed by channel name.  Each entry gets
        'setup' (True when the channel module initialises) and 'info' (the
        module's version info, or False).
        """
        cpath = 'data/msg.json'
        try:
            if 'force' in get or not os.path.exists(cpath):
                public.downloadFile('{}/linux/panel/msg/msg.json'.format("https://node.aapanel.com"),cpath)
        except : pass

        if not os.path.exists(cpath):
            return {}

        channels = {}
        for entry in json.loads(public.readFile(cpath)):
            entry['setup'] = False
            entry['info'] = False
            name = entry['name']
            try:
                module = public.init_msg(entry['name'])
                if module:
                    entry['setup'] = True
                    entry['info'] = module.get_version_info(None)
            except :
                print(public.get_error_info())
            channels[name] = entry
        return channels



    #取消告警
    def clear_login_send(self,get):
        """Cancel the SSH login alarm.

        Deletes the send-type marker file and removes the login-monitor hook.
        Cancelling when no alarm is configured is treated as success instead
        of raising FileNotFoundError.
        """
        # Validate request parameters
        try:
            get.validate([
                Param('type').Require().String().Xss(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl"
        try:
            os.remove(login_send_type_conf)
        except FileNotFoundError:
            # Nothing configured - already in the desired state.
            pass
        self.stop_jian(get)
        return public.return_message(0, 0, public.lang("Successfully cancel the login alarm"))

    #设置告警
    def set_login_send(self,get):
        """Select the notification channel for SSH login alarms and enable monitoring.

        ``get.type`` must name a known message channel whose module can be
        initialised; the name is then stored in the send-type marker file and
        the login-monitor hook is installed.
        """
        # Validate request parameters
        try:
            get.validate(
                [Param('type').Require().String().Xss()],
                [public.validate.trim_filter()],
            )
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl"
        channel_name = get.type.strip()

        known_channels = self._get_msg_push_list(get)
        if channel_name not in known_channels:
            return public.return_message(-1, 0, public.lang("This send type is not supported"))

        from panelMessage import panelMessage
        channel_module = panelMessage().init_msg_module(channel_name)
        if not channel_module:
            return public.return_message(-1, 0, public.lang("The message channel is not installed"))

        public.writeFile(login_send_type_conf, channel_name)
        self.start_jian(get)
        return public.return_message(0, 0, public.lang("Successfully set"))

    #查看告警
    def get_login_send(self, get):
        """Return the configured alarm channel name, or "error" when none is configured."""
        login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl"
        if not os.path.exists(login_send_type_conf):
            return public.return_message(0, 0, "error")
        configured = public.readFile(login_send_type_conf).strip()
        return public.return_message(0, 0, configured)

    def GetSshInfo(self, get):
        """Return whether the SSH daemon appears to be running.

        Uses /run/sshd.pid when present; otherwise queries the service
        manager and treats any "dead"/"stopped" output as not running.
        """
        pid_file = '/run/sshd.pid'
        if os.path.exists(pid_file):
            sshd_pid = int(public.readFile(pid_file))
            return public.pid_exists(sshd_pid)

        import system
        version = system.system().GetSystemVersion()
        if os.path.exists('/usr/bin/apt-get'):
            if os.path.exists('/etc/init.d/sshd'):
                probe = "service sshd status | grep -P '(dead|stop)'|grep -v grep"
            else:
                probe = "service ssh status | grep -P '(dead|stop)'|grep -v grep"
        elif version.find(' 7.') != -1 or version.find(' 8.') != -1 or version.find('Fedora') != -1:
            probe = "systemctl status sshd.service | grep 'dead'|grep -v grep"
        else:
            probe = "/etc/init.d/sshd status | grep -e 'stopped' -e '已停'|grep -v grep"

        output = public.ExecShell(probe)
        # More than three characters of grep output means a stop marker matched.
        return not (len(output[0]) > 3)


    def stop_key(self, get):
        """Disable SSH key login by rewriting sshd_config.

        Sets every RSAAuthentication and PubkeyAuthentication directive
        (active or commented out) to "no". If sshd was running beforehand,
        self.set_password(get) is called and sshd is restarted.

        @param get: request object, forwarded to GetSshInfo and set_password
        @return: public.return_message(-1, 0, <error>) when sshd_config cannot
                 be read, else public.return_message(0, 0, <success text>)
        """
        # Sample the daemon state before touching the configuration.
        sshd_running = self.GetSshInfo(get)
        config_text = public.readFile(self.__SSH_CONFIG)
        if not config_text:
            return public.return_message(-1, 0, public.lang("Error: sshd config configuration file does not exist"))

        directive_rewrites = (
            (r'\n\s*#?\s*RSAAuthentication\s+\w+', '\nRSAAuthentication no'),
            (r'\n\s*#?\s*PubkeyAuthentication\s+\w+', '\nPubkeyAuthentication no'),
        )
        for pattern, replacement in directive_rewrites:
            config_text = re.sub(pattern, replacement, config_text)
        self.wirte(self.__SSH_CONFIG, config_text)

        if sshd_running:
            self.set_password(get)
            self.restart_ssh()
        public.WriteLog('SSH management','Disable SSH key login')
        return public.return_message(0, 0, public.lang("Disable successfully"))



    def get_config(self, get):
        """Collect the current SSH settings from sshd_config.

        @param get: request object, forwarded to self._get_key
        @return: public.return_message(-1, 0, <error>) when sshd_config cannot
                 be read; otherwise public.return_message(0, 0, result) where
                 result holds rsa_auth, pubkey, password, root_is_login,
                 root_login_type, root_login_types, port and key_type
        """
        conf = public.readFile(self.__SSH_CONFIG)
        if not conf:
            return public.return_message(-1, 0, public.lang("Error: sshd config does not exist"))

        def first_value(pattern):
            # Lower-cased value of the first uncommented occurrence, or None when absent.
            hit = re.search(pattern, conf, re.M | re.I)
            return hit.group(1).lower() if hit else None

        result = {}

        # RSA authentication: reported as enabled unless explicitly set to "no".
        rsa_value = first_value(r'^\s*RSAAuthentication\s*(yes|no)')
        result['rsa_auth'] = 'no' if rsa_value == 'no' else 'yes'

        # Public-key login: reported as disabled unless a key is available
        # AND the directive is explicitly "yes".
        result['pubkey'] = 'no'
        if self._get_key(get)['msg']:
            if first_value(r'^\s*PubkeyAuthentication\s*(yes|no)') == 'yes':
                result['pubkey'] = 'yes'

        # Password login: reported as enabled unless explicitly set to "no".
        password_value = first_value(r'^\s*PasswordAuthentication\s*([\w\-]+)')
        result['password'] = 'no' if password_value == 'no' else 'yes'

        # Root login: any value other than "yes" (no, without-password,
        # forced-commands-only, ...) is reported as root_is_login = "no",
        # with the raw value exposed through root_login_type.
        root_value = first_value(r'^\s*PermitRootLogin\s*([\w\-]+)')
        if root_value is not None and root_value != 'yes':
            result['root_is_login'] = 'no'
            result['root_login_type'] = root_value
        else:
            result['root_is_login'] = 'yes'
            result['root_login_type'] = 'yes'

        result['root_login_types'] = self.__root_login_types
        result['port'] = public.get_sshd_port()
        result['key_type'] = public.ReadFile(self.__key_type_file)
        return public.return_message(0, 0, result)


    def set_root(self, get):
        """Set the PermitRootLogin policy in sshd_config and restart sshd.

        @param get: request object; optional ``p_type`` is one of yes, no,
                    without-password, forced-commands-only (defaults to yes)
        @return: public.return_message(-1, 0, <error>) on invalid input or
                 when sshd_config cannot be read; otherwise
                 public.return_message(0, 0, <description of the new policy>)
        """
        try:
            get.validate([
                Param('p_type').String('in', ['yes', 'no', 'without-password', 'forced-commands-only']).Xss(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        p_type = 'yes'
        if 'p_type' in get: p_type = get.p_type
        if p_type not in self.__root_login_types.keys():
            return public.return_message(-1, 0, public.lang("Parameter passing error"))

        file = public.readFile(self.__SSH_CONFIG)
        # Same guard as stop_key/get_config: without it re.search() below
        # would raise TypeError on a non-string read result.
        if not file:
            return public.return_message(-1, 0, public.lang("Error: sshd config does not exist"))

        # Matches the directive whether it is active or commented out.
        ssh_password = r'^\s*#?\s*PermitRootLogin\s*([\w\-]+)'
        src_line = re.search(ssh_password, file, re.M)
        new_line = 'PermitRootLogin {}'.format(p_type)
        if not src_line:
            # Directive absent: append it at the end of the file.
            file_result = file + '\n{}'.format(new_line)
        else:
            file_result = file.replace(src_line.group(), new_line)
        self.wirte(self.__SSH_CONFIG, file_result)
        self.restart_ssh()
        msg = public.lang('Set the root login method as: {}',self.__root_login_types[p_type])
        public.WriteLog('SSH management',msg)
        return public.return_message(0, 0, msg)


    def set_root_password(self, get):
        """Set the login password of an existing system user (root or any other).

        @param get: request object with ``username`` and ``password``
        @return: public.return_message(-1, 0, <error>) when validation or
                 chpasswd fails, else public.return_message(0, 0, <success>)
        """
        import shlex  # local import: only needed to quote the chpasswd input

        password = get.password if "password" in get else ""
        username = get.username if "username" in get else ""
        if not password:
            return public.return_message(-1, 0, public.lang("The password cannot be empty"))
        if len(password) < 8:
            return public.return_message(-1, 0, public.lang("The password cannot be less than 8 bits long"))
        # chpasswd reads one "user:password" record per line, so a line break
        # inside the password would smuggle in a second record.
        if "\n" in password or "\r" in password:
            return public.return_message(-1, 0, public.lang("The password cannot contain line breaks"))
        # Use the defaulted local (not get.username) so a missing field is
        # rejected here instead of failing on attribute access.
        if username not in self.get_sys_user(get)['msg']:
            return public.return_message(-1, 0, public.lang("The username does not exist"))

        # The hyphen must be escaped: an unescaped ")-_" is a character RANGE
        # that also covers digits and capitals, which let all-digit passwords
        # pass the check.
        has_letter = bool(re.search(r'[a-zA-Z!@#$%^&*()\-_+=]', password))
        has_digit_or_symbol = bool(re.search(r'[0-9!@#$%^&*()\-_+=]', password))
        if not has_letter or not has_digit_or_symbol:
            return public.return_message(-1, 0, public.lang("The password must contain letters and numbers or symbols"))

        # Quote the record so shell metacharacters in the password are passed
        # literally instead of being interpreted (command injection).
        # printf is used rather than echo because some shells' echo expands
        # backslash escapes.
        record = shlex.quote("{}:{}".format(username, password))
        cmd_result, cmd_err = public.ExecShell("printf '%s\\n' {}|chpasswd".format(record))

        if cmd_err:
            return public.return_message(-1, 0, public.lang("Setup failure"))
        public.WriteLog("SSH", "[Security] - [SSH] - [Set %s password]" % username)
        return public.return_message(0, 0, public.lang("successfully set"))

    def get_sys_user(self, get):
        """List the account names found in /etc/passwd.

        @param get: request object (not read by this method)
        @return: public.returnMsg(True, [name, ...]) with names in file order
        """
        with open('/etc/passwd') as passwd:
            # The account name is everything before the first ':' of each line.
            names = [entry.split(':', 1)[0] for entry in passwd]
        return public.returnMsg(True, names)

    def stop_root(self, get):
        """Forbid root SSH login: set ``PermitRootLogin no`` and restart sshd.

        @param get: request object (not read by this method)
        @return: public.returnMsg(False, <error>) when sshd_config cannot be
                 read, else public.returnMsg(True, <success text>)
        """
        # [\w\-]+ so hyphenated values such as "without-password" are replaced
        # whole; a bare \w+ would leave the invalid "PermitRootLogin no-password".
        root_login_re = r'\n\s*PermitRootLogin\s+[\w\-]+'
        file = public.readFile(self.__SSH_CONFIG)
        # Guard: re functions raise TypeError on a non-string read result.
        if not file:
            return public.returnMsg(False, public.lang("Error: sshd config does not exist"))
        if re.search(root_login_re, file):
            file_result = re.sub(root_login_re, '\nPermitRootLogin no', file)
        else:
            # No active directive: append one.
            file_result = file + '\nPermitRootLogin no'
        self.wirte(self.__SSH_CONFIG, file_result)
        self.restart_ssh()
        public.WriteLog('SSH management','Set the root login method to: no')
        return public.returnMsg(True, public.lang("Disable successfully"))

    def stop_password(self, get):
        """Disable SSH password login: set ``PasswordAuthentication no`` and restart sshd.

        @param get: request object (not read by this method)
        @return: public.returnMsg(False, <error>) when sshd_config cannot be
                 read, else public.returnMsg(True, <success text>)
        """
        file = public.readFile(self.__SSH_CONFIG)
        # Guard: re functions raise TypeError on a non-string read result.
        if not file:
            return public.returnMsg(False, public.lang("Error: sshd config does not exist"))
        # Tolerates indentation and repeated whitespace (same shape as the
        # patterns in stop_key) so differently formatted lines are not missed.
        ssh_password = r'\n\s*#?\s*PasswordAuthentication\s+\w+'
        if re.search(ssh_password, file):
            file_result = re.sub(ssh_password, '\nPasswordAuthentication no', file)
        else:
            # Directive absent: append it, otherwise nothing would change
            # while success is still reported.
            file_result = file + '\nPasswordAuthentication no'
        self.wirte(self.__SSH_CONFIG, file_result)
        self.restart_ssh()
        public.WriteLog('SSH management','Disable password access')
        return public.returnMsg(True, public.lang("Closed successfully"))

    def _get_key(self, get):
        """Read the key file for the currently selected key type (internal variant).

        @param get: request object (not read by this method)
        @return: public.returnMsg(True, <result of public.readFile on the key
                 file>) when the selected type has a known key file,
                 otherwise public.returnMsg(True, public.lang(""))
        """
        selected_type = self.get_ssh_key_type()
        if selected_type not in self.__type_files:
            return public.returnMsg(True, public.lang(""))
        return public.returnMsg(True, public.readFile(self.__type_files[selected_type]))

    def get_key(self, get):
        """Read the key file for the currently selected key type.

        @param get: request object (not read by this method)
        @return: public.return_message(0, 0, <result of public.readFile on the
                 key file>) when the selected type has a known key file,
                 otherwise public.return_message(0, 0, public.lang(""))
        """
        selected_type = self.get_ssh_key_type()
        if selected_type not in self.__type_files:
            return public.return_message(0, 0, public.lang(""))
        return public.return_message(0, 0, public.readFile(self.__type_files[selected_type]))
    def download_key(self, get):
        """Send the SSH key file to the client as a download.

        If the selected key type has a known file, only that file is
        considered. Otherwise the first existing path in the class-level
        key file list is used.

        @param get: request object (not read by this method)
        @return: public.returnMsg(False, <error>) when no key file is found,
                 else the response produced by flask.send_file
        """
        selected_type = self.get_ssh_key_type()
        target = ''
        if selected_type in self.__type_files:
            candidate = self.__type_files[selected_type]
            if os.path.exists(candidate):
                target = candidate
        else:
            target = next((path for path in self.__key_files if os.path.exists(path)), '')

        if not target:
            return public.returnMsg(False, public.lang("Key file not found!"))

        from flask import send_file
        # The download name is prefixed with the host so keys from different servers stay distinguishable.
        filename = "{}_{}".format(public.GetHost(), os.path.basename(target))
        return send_file(target, download_name=filename)

    def wirte(self, file, ret):
        """Write ``ret`` to the path ``file`` via public.writeFile and return its result."""
        return public.writeFile(file, ret)

    def restart_ssh(self):
        """Restart the SSH daemon with the service manager available on this host.

        Takes no parameters and returns None; the command output is not inspected.
        """
        act = 'restart'
        if not os.path.exists('/etc/redhat-release'):
            public.ExecShell('service ssh ' + act)
            return

        # Fall back to '' so an unreadable release file cannot crash the checks below.
        version = public.readFile('/etc/redhat-release') or ''
        # /run/systemd/system exists when systemd is the running init
        # (the check documented in sd_booted(3)). Without it, releases other
        # than 7.x/8.x (e.g. 9.x) would fall through to the SysV init script.
        systemd_running = os.path.exists('/run/systemd/system')
        if systemd_running or version.find(' 7.') != -1 or version.find(' 8.') != -1:
            public.ExecShell("systemctl " + act + " sshd.service")
        else:
            public.ExecShell("/etc/init.d/sshd " + act)
    #检查是否设置了钉钉
    def check_dingding(self, get):
        """Check whether a DingTalk webhook URL is configured.

        @param get: request object (not read by this method)
        @return: True when dingding.json exists, parses as JSON and holds a
                 non-empty ``dingding_url``; False in every other case
        """
        conf_path = '/www/server/panel/data/dingding.json'
        if not os.path.exists(conf_path):
            return False
        raw_config = public.ReadFile(conf_path)
        if not raw_config:
            return False
        try:
            # bool() so an empty URL yields False rather than an implicit None.
            return bool(json.loads(raw_config)['dingding_url'])
        except (ValueError, KeyError, TypeError):
            # Invalid JSON, missing key, or a top-level value that is not a mapping.
            return False

    #开启SSH双因子认证
    def start_auth_method(self, get):
        """Enable SSH two-factor authentication.

        Delegates to ssh_authentication.start_ssh_authentication_two_factors()
        and returns its result unchanged.

        @param get: request object (not read by this method)
        """
        from ssh_authentication import ssh_authentication as two_factor_cls
        return two_factor_cls().start_ssh_authentication_two_factors()

    #关闭SSH双因子认证
    def stop_auth_method(self, get):
        """Disable SSH two-factor authentication.

        Delegates to ssh_authentication.close_ssh_authentication_two_factors()
        and returns its result unchanged.

        @param get: request object (not read by this method)
        """
        from ssh_authentication import ssh_authentication as two_factor_cls
        return two_factor_cls().close_ssh_authentication_two_factors()

    #获取SSH双因子认证状态
    def get_auth_method(self, get):
        """Query the SSH two-factor authentication state.

        Delegates to ssh_authentication.check_ssh_authentication_two_factors()
        and returns its result unchanged.

        @param get: request object (not read by this method)
        """
        from ssh_authentication import ssh_authentication as two_factor_cls
        return two_factor_cls().check_ssh_authentication_two_factors()

    #判断so文件是否存在
    def check_so_file(self, get):
        """Check whether the .so file used for two-factor authentication is present.

        Delegates to ssh_authentication.is_check_so() and returns its result unchanged.

        @param get: request object (not read by this method)
        """
        from ssh_authentication import ssh_authentication as two_factor_cls
        return two_factor_cls().is_check_so()

    #下载so文件
    def get_so_file(self, get):
        """Download the .so file used for two-factor authentication.

        Delegates to ssh_authentication.download_so() and returns its result unchanged.

        @param get: request object (not read by this method)
        """
        from ssh_authentication import ssh_authentication as two_factor_cls
        return two_factor_cls().download_so()

    #获取pin
    def get_pin(self, get):
        """Fetch the two-factor PIN.

        @param get: request object (not read by this method)
        @return: public.returnMsg(True, <value of ssh_authentication.get_pin()>)
        """
        from ssh_authentication import ssh_authentication as two_factor_cls
        pin = two_factor_cls().get_pin()
        return public.returnMsg(True, pin)

    def get_login_record(self,get):
        """Report whether login recording is switched on.

        The switch is the existence of the marker file at self.open_ssh_login.

        @param get: request object (not read by this method)
        @return: public.returnMsg(True, "") when the marker exists, else
                 public.returnMsg(False, "")
        """
        marker_present = os.path.exists(self.open_ssh_login)
        return public.returnMsg(marker_present, public.lang(""))
    def start_login_record(self,get):
        """Switch login recording on by creating the marker file if it is missing.

        @param get: request object (not read by this method)
        @return: public.returnMsg(True, "") in every case
        """
        if not os.path.exists(self.open_ssh_login):
            public.writeFile(self.open_ssh_login,"True")
        return public.returnMsg(True, public.lang(""))
    def stop_login_record(self,get):
        """Switch login recording off by deleting the marker file if it exists.

        @param get: request object (not read by this method)
        @return: public.returnMsg(True, "") in every case
        """
        if os.path.exists(self.open_ssh_login):
            os.remove(self.open_ssh_login)
        return public.returnMsg(True, public.lang(""))
    # 获取登录记录列表
    def get_record_list(self, get):
        """Return one page of SSH login records, newest first.

        @param get: request object; optional ``limit`` (rows per page,
                    default 12), ``p`` (page number, default 1) and ``tojs``
        @return: dict with ``page`` (output of page.Page.GetPage) and
                 ``data`` (rows selected from ssh_login_record)
        """
        rows_per_page = int(get.limit.strip()) if 'limit' in get else 12

        import page
        pager = page.Page()
        total = public.M('ssh_login_record').order("id desc").count()
        info = {
            'count': total,
            'row': rows_per_page,
            'p': int(get['p']) if hasattr(get, 'p') else 1,
            'uri': get,
            'return_js': get.tojs if hasattr(get, 'tojs') else '',
        }

        # GetPage is called before SHIFT/ROW are read, as in the query below
        # (presumably GetPage populates them -- confirm against page.Page).
        pager_output = pager.GetPage(info, '1,2,3,4,5,8')
        window = '{},{}'.format(pager.SHIFT, pager.ROW)
        rows = public.M('ssh_login_record').order('id desc').limit(window).select()
        return {'page': pager_output, 'data': rows}

    def get_file_json(self,get):
        """Load a JSON file and return its decoded content.

        @param get: request object; ``path`` is the file to read
        @return: the decoded JSON value, or '' when the file is missing,
                 cannot be read, or does not contain valid JSON
        """
        # NOTE(review): ``get.path`` is used as-is, so any JSON file readable
        # by the panel process can be returned -- consider restricting it to
        # known directories.
        if not os.path.exists(get.path):
            return ''
        content = public.ReadFile(get.path)
        # A falsy read result would make json.loads raise; report it the same
        # way as a missing file.
        if not content:
            return ''
        try:
            return json.loads(content)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            return ''

if __name__ == '__main__':
    # Command-line entry point: when the first argument is "login",
    # ssh_security().login() is invoked; any other argument does nothing.
    import sys
    # Default to '' so running without arguments is a no-op instead of an
    # IndexError; the name also no longer shadows the builtin ``type``.
    action = sys.argv[1] if len(sys.argv) > 1 else ''
    if action == 'login':
        try:
            ssh_security().login()
        except Exception:
            # Deliberately best-effort: failures are ignored. ``Exception``
            # rather than a bare except lets SystemExit and
            # KeyboardInterrupt propagate.
            pass

Youez - 2016 - github.com/yon3zu
LinuXploit