Server IP : 104.21.38.3 / Your IP : 172.70.93.17 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/safeModelV2/ |
Upload File : |
#coding: utf-8 #------------------------------------------------------------------- # aaPanel #------------------------------------------------------------------- # Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved. #------------------------------------------------------------------- # Author: hwliang <[email protected]> #------------------------------------------------------------------- # ssh信息 #------------------------------ import json import os import re import time import public from safeModelV2.base import safeBase from datetime import datetime class main(safeBase): def __init__(self): pass # 获取当天登陆失败/登陆成功计数 def __get_today_stats(self): today_err_num1 = int(public.ExecShell( "journalctl -u ssh --no-pager -S today |grep -a 'Failed password for' |grep -v 'invalid' |wc -l")[0]) today_err_num2 = int(public.ExecShell( "journalctl -u ssh --no-pager -S today |grep -a 'Connection closed by authenticating user' |grep -a 'preauth' |wc -l")[0]) today_success = int(public.ExecShell("journalctl -u ssh --no-pager -S today |grep -a 'Accepted' |wc -l")[0]) return today_err_num1 + today_err_num2, today_success # 更新ssh统计记录 def __update_record_with_today_stats(self, record): today_err_num, today_success = self.__get_today_stats() if record["today_success"] < today_success: record["success"] += today_success if record["today_error"] < today_err_num: record["error"] += today_err_num record['today_error'] = today_err_num record['today_success'] = today_success # 获取终端执行命令记录 def ssh_cmd_history(self, get): try: result = [] file_path = "/root/.bash_history" data = public.readFile(file_path) if os.path.exists(file_path) else None danger_cmd = ['rm', 'rmi', 'kill', 'stop', 'pause', 'unpause', 'restart', 'update', 'exec', 'init', 'shutdown', 'reboot', 'chmod', 'chown', 'dd', 'fdisk', 'killall', 'mkfs', 'mkswap', 'mount', 'swapoff', 'swapon', 'umount', 'userdel', 'usermod', 'passwd', 'groupadd', 'groupdel', 'groupmod', 'chpasswd', 'chage', 'usermod', 'useradd', 'userdel', 'pkill'] if data: data_list = data.split("\n") for i in data_list: if len(result) >= 200: break if not i or i.startswith("#"): continue is_dangerous = any(cmd in i for cmd in danger_cmd) result.append({ "command": i, "is_dangerous": is_dangerous }) else: result = [] return public.return_message(0, 0, { "data": result, "total": len(result) }) except: return public.returnMsg(False, { "data": [], "total": 0 }) def get_ssh_intrusion(self,get): """ @获取SSH爆破次数 @param get: """ result = {'error': 0, 'success': 0, 'today_error': 0, 'today_success': 0} # debian系统处理 if os.path.exists("/etc/debian_version"): version = public.readFile('/etc/debian_version').strip() if 'bookworm' in version or 'jammy' in version or 'impish' in version: version = 12 else: try: version = float(version) except: version = 11 if version >= 12: try: # # 优先取缓存 pkey = "version_12_ssh_login_counts" if public.cache_get(pkey): return public.cache_get(pkey) # 读取记录文件 filepath = "/www/server/panel/data/ssh_login_counts.json" filedata = public.readFile(filepath) if os.path.exists(filepath) else public.writeFile(filepath, "[]") today = datetime.now().strftime('%Y-%m-%d') # 解析记录文件的内容 try: data_list = json.loads(filedata) except: data_list = [] if data_list: for index, record in enumerate(data_list): # 如果记录中有当天的数据,则直接返回 if record['date'] == today: self.__update_record_with_today_stats(record) if index == 0: # 确保只在首次找到匹配项时返回 data_list[0] = record # 设置缓存 public.cache_set(pkey, record, 30) return record else: record = data_list[0] self.__update_record_with_today_stats(record) # 设置缓存 public.cache_set(pkey, record, 30) return record # 没有记录文件 按原先的方式获取 err_num1 = int(public.ExecShell( "journalctl -u ssh --no-pager |grep -a 'Failed password for' |grep -v 'invalid' |wc -l")[0]) err_num2 = int(public.ExecShell( "journalctl -u ssh --no-pager --grep='Connection closed by authenticating user|preauth' |wc -l")[0]) result['error'] = err_num1 + err_num2 result['success'] = int(public.ExecShell("journalctl -u ssh --no-pager|grep -a 'Accepted' |wc -l")[0]) today_err_num, today_success = self.__get_today_stats() result['today_error'] = today_err_num result['today_success'] = today_success # 设置缓存 public.cache_set(pkey, result, 30) except: pass return result # 记录文件 ssh_intrusion_file = '/www/server/panel/config/ssh_intrusion.json' today = datetime.now().strftime('%Y-%m-%d') wf = True # 读取文件 try: ssh_intrusion_data = json.loads(public.readFile(ssh_intrusion_file)) if "time" in ssh_intrusion_data and ssh_intrusion_data['time'] == today: wf = False result['error'] = ssh_intrusion_data["data"]["error"] result['success'] = ssh_intrusion_data["data"]["success"] except: ssh_intrusion_data = {'time': '', 'data': result} logs_path_info = self.get_ssh_log_files_list(None) time_formatted = time.strftime('%b %d', time.localtime()) month, day = time_formatted.split() day = day.lstrip('0') formatted_time = "{} {}".format(month, day) formatted_time1 = "{} {} ".format(month, day) for sfile in logs_path_info: if not os.path.exists(sfile): continue for stype in result.keys(): # count = 0 # if sfile in data[stype] and not sfile in ['/var/log/auth.log','/var/log/secure']: # count += data[stype][sfile] # else: try: if stype in ["error", "success"] and ssh_intrusion_data and ssh_intrusion_data["time"] == today\ and ssh_intrusion_data["data"][stype] != 0: continue if stype == 'error': cmds = [ "cat {} | grep -a 'Failed password for' | grep -v 'invalid' | awk '{{print $5}}'".format(sfile), "cat {} | grep -a 'Connection closed by authenticating user' | grep -a 'preauth' | awk '{{print $5}}'".format(sfile), "cat {} | grep -a 'PAM service(sshd) ignoring max retries' | awk '{{print $5}}'".format(sfile) ] elif stype == 'success': cmds = [ "cat {} | grep -a 'Accepted' | awk '{{print $5}}'".format(sfile), "cat {} | grep -a 'sshd\\[.*session opened for user' | awk '{{print $5}}'".format(sfile) ] elif stype == 'today_error' and sfile in ["/var/log/secure", "/var/log/auth.log"]: cmds = [ "cat {} | grep -a 'Failed password for' | grep -v 'invalid' | grep -aE '{}|{}' | awk '{{print $5}}'".format(sfile, formatted_time, formatted_time1), "cat {} | grep -a 'Connection closed by authenticating user' | grep -a 'preauth' | grep -aE '{}|{}' | awk '{{print $5}}'".format(sfile, formatted_time, formatted_time1), "cat {} | grep -a 'PAM service(sshd) ignoring max retries' | grep -aE '{}|{}' | awk '{{print $5}}'".format(sfile, formatted_time, formatted_time1) ] elif stype == 'today_success' and sfile in ["/var/log/secure", "/var/log/auth.log"]: cmds = [ "cat {} | grep -a 'Accepted' | grep -aE '{}|{}' | awk '{{print $5}}'".format(sfile, formatted_time, formatted_time1), "cat {} | grep -a 'sshd\\[.*session opened for user' | grep -aE '{}|{}' | awk '{{print $5}}'".format(sfile, formatted_time, formatted_time1) ] else: continue log_entries = [] for cmd in cmds: output = public.ExecShell(cmd)[0].strip() if output: log_entries.extend(output.split('\n')) # 去重处理 if stype in ["success", "today_success"]: count = len(set(log_entries)) else: count = len(log_entries) result[stype] += count except Exception as e: continue result['success'] = result['today_success'] if result['today_success'] >= result['success'] else result['success'] + result['today_success'] result['error'] = result['today_error'] if result['today_error'] >= result['error'] else result['error'] + result['today_error'] # 写入到文件中 if wf: ssh_intrusion_data = {'time': today, 'data': result} public.writeFile(ssh_intrusion_file, json.dumps(ssh_intrusion_data)) return result # return public.return_message(0, 0, result) def get_ssh_cache(self): """ @获取缓存ssh记录 """ file = '{}/data/ssh_cache.json'.format(public.get_panel_path()) cache_data = {'success': {}, 'error': {}, 'today_success': {}, 'today_error': {}} if not os.path.exists(file): public.writeFile(file, json.dumps(cache_data)) return cache_data try: data = json.loads(public.readFile(file)) except: public.writeFile(file, json.dumps(cache_data)) data = cache_data return data def set_ssh_cache(self,data): """ @设置ssh缓存 """ file = '{}/data/ssh_cache.json'.format(public.get_panel_path()) public.writeFile(file,json.dumps(data)) return True def GetSshInfo(self,get): """ @获取SSH登录信息 """ port = public.get_sshd_port() status = public.get_sshd_status() isPing = True try: file = '/etc/sysctl.conf' conf = public.readFile(file) rep = r"#*net\.ipv4\.icmp_echo_ignore_all\s*=\s*([0-9]+)" tmp = re.search(rep,conf).groups(0)[0] if tmp == '1': isPing = False except: isPing = True data = {} data['port'] = port data['status'] = status data['ping'] = isPing data['firewall_status'] = self.CheckFirewallStatus() # data['error'] = self.get_ssh_intrusion(get) data['fail2ban'] = self._get_ssh_fail2ban() return public.return_message(0, 0, data) def get_ssh_login_info(self, get): """ @获取SSH登录信息 """ # return self.get_ssh_intrusion(get) return public.return_message(0, 0, self.get_ssh_intrusion(get)) @staticmethod def _get_ssh_fail2ban(): """ @name 获取fail2ban的服务和SSH防爆破状态 @return: """ plugin_path = "/www/server/panel/plugin/fail2ban" result_data = {"status": 0, "installed": 1} if not os.path.exists("{}".format(plugin_path)): result_data['installed'] = 0 return result_data sock = "{}/fail2ban.sock".format(plugin_path) if not os.path.exists(sock): return result_data s_file = '{}/plugin/fail2ban/config.json'.format(public.get_panel_path()) if os.path.exists(s_file): try: data = json.loads(public.readFile(s_file)) if 'sshd' in data: if data['sshd']['act'] == 'true': result_data['status'] = 1 return result_data except: pass return result_data #改远程端口 def SetSshPort(self,get): port = get.port if int(port) < 22 or int(port) > 65535: return public.returnMsg(False,'Port range must be between 22 and 65535!') ports = ['21','25','80','443','8080','888','8888'] if port in ports: return public.returnMsg(False,'Please dont use default ports for common programs!') file = '/etc/ssh/sshd_config' conf = public.readFile(file) rep = r"#*Port\s+([0-9]+)\s*\n" conf = re.sub(rep, "Port "+port+"\n", conf) public.writeFile(file,conf) if self.__isFirewalld: public.ExecShell('firewall-cmd --permanent --zone=public --add-port='+port+'/tcp') public.ExecShell('setenforce 0') public.ExecShell('sed -i "s#SELINUX=enforcing#SELINUX=disabled#" /etc/selinux/config') public.ExecShell("systemctl restart sshd.service") elif self.__isUfw: public.ExecShell('ufw allow ' + port + '/tcp') public.ExecShell("service ssh restart") else: public.ExecShell('iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport '+port+' -j ACCEPT') public.ExecShell("/etc/init.d/sshd restart") self.FirewallReload() public.M('firewall').where("ps=? or ps=? or port=?",('SSH remote management service','SSH remote service',port)).delete() public.M('firewall').add('port,ps,addtime',(port,'SSH remote service',time.strftime('%Y-%m-%d %X',time.localtime()))) public.WriteLog("TYPE_FIREWALL", "FIREWALL_SSH_PORT",(port,)) return public.return_message(0, 0,'Successfully modified') def SetSshStatus(self,get): """ @设置SSH状态 """ get.exists(["status"]) if int(get['status'])==1: msg = public.getMsg('FIREWALL_SSH_STOP') act = 'stop' else: msg = public.getMsg('FIREWALL_SSH_START') act = 'start' public.ExecShell("/etc/init.d/sshd "+act) public.ExecShell('service ssh ' + act) public.ExecShell("systemctl "+act+" sshd") public.ExecShell("systemctl "+act+" ssh") public.WriteLog("TYPE_FIREWALL", msg) return public.return_message(0, 0,'SUCCESS')