Server IP : 172.67.216.182 / Your IP : 172.70.147.109 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/ |
Upload File : |
# coding: utf-8 # +------------------------------------------------------------------- # | aaPanel # +------------------------------------------------------------------- # | Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved. # +------------------------------------------------------------------- # | Author: hwliang <2020-08-04> # +------------------------------------------------------------------- import os, sys, json, time, datetime os.chdir("/www/server/panel") sys.path.append("class/") import public from public.validate import Param class panelWarning: __path = '/www/server/panel/data/warning' __ignore = __path + '/ignore' __result = __path + '/result' __risk = __path + '/risk' _vuln_ignore = __path + '/ignore.json' _vuln_result = __path + '/result.json' __repair_count = __path + '/repair_count.json' __vul_list = __path + '/high_risk_vul-9.json' # 空i那么;皮的泡沫隔离 __report = '/www/server/panel/data/warning_report' vul_num = 0 discov_count = 0 # 扫描中发现的漏洞数 score = 100 # 扫描中动态分数 yum_time = __path + '/yum_time.pl' new_vul_list = __path + '/vul_centos7.json' product_version = __path + '/product_version.json' def __init__(self): if not os.path.exists(self.__ignore): os.makedirs(self.__ignore, 384) if not os.path.exists(self.__result): os.makedirs(self.__result, 384) if not os.path.exists(self.__risk): os.makedirs(self.__risk, 384) if not os.path.exists(self.__path): os.makedirs(self.__path, 384) if not os.path.exists(self.__report): os.makedirs(self.__report, 384) if not os.path.exists(self._vuln_ignore): result = [] public.WriteFile(self._vuln_ignore, json.dumps(result)) if not os.path.exists(self._vuln_result): result = [] public.WriteFile(self._vuln_result, json.dumps(result)) self.new_system_result = [] # self.sys_version = self.get_sys_version() # self.sys_product = self.new_get_sys_product() def _get_list(self): # 最终输出结果 self.data = { 'security': [], 'risk': [], 'ignore': [], "is_autofix": [], } # 获取支持一键修复的列表 try: is_autofix = public.read_config("safe_autofix") except: is_autofix = [] # 临时扫描结果,中断的时候返回 self.tmp_data = { 'security': [], 'risk': [], 'ignore': [], 'is_autofix': is_autofix, 'check_time': datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") } context = {"status": "Ready to repair", "percentage": 0, "count": 0, "score": 100} public.WriteFile(self.__path + '/bar.txt', json.dumps(context)) # 扫描进度条归零 bar_num = 0 # 进度条初始化 bar_limit = 0 # 进度条限制 self.discov_count = 0 # 扫描中发现漏洞数量初始化 self.score = 100 # 扫描中分数变化 sys_version = self.get_sys_version() # 获取系统版本 # self.compare_md5() # 比较漏洞库版本 # 下载新版漏洞库文件 if not self.download_new_vulns(): sys_version = None # centos_7走新版漏洞扫描 if sys_version == 'centos_7' or sys_version == 'centos_8' or sys_version == 'centos_8_stream': self.new_system_scan() # ubuntu走新版2接口 elif sys_version == 'ubuntu_20.04' or sys_version == 'ubuntu_22.04' or sys_version == 'ubuntu_18.04' or sys_version == 'debian_12' or sys_version == 'debian_11' or sys_version == 'debian_10': self.new_system_scan2() # 旧版本漏洞检测 # else: # self.system_scan() # 旧版本系统漏洞扫描 # 加载安全风险模块 p = public.get_modules('class_v2/safe_warning_v2') for m_name in p.__dict__.keys(): ignore_file = self.__ignore + '/' + m_name + '.pl' # 忽略的检查项 if p[m_name]._level == 0: continue m_info = { 'title': p[m_name]._title, 'm_name': m_name, 'ps': p[m_name]._ps, 'version': p[m_name]._version, 'level': p[m_name]._level, 'ignore': p[m_name]._ignore, 'date': p[m_name]._date, 'tips': p[m_name]._tips, 'help': p[m_name]._help } try: m_info['remind'] = p[m_name]._remind except: pass result_file = self.__result + '/' + m_name + '.pl' try: s_time = time.time() m_info['status'], m_info['msg'] = p[m_name].check_run() m_info['taking'] = round(time.time() - s_time, 6) m_info['check_time'] = int(time.time()) public.writeFile(result_file, json.dumps( [m_info['status'], m_info['msg'], m_info['check_time'], m_info['taking']], )) except: continue m_info['ignore'] = os.path.exists(ignore_file) if m_info['ignore']: self.data['ignore'].append(m_info) self.tmp_data['ignore'].append(m_info) # 临时扫描结果 else: if m_info['status']: self.data['security'].append(m_info) self.tmp_data['security'].append(m_info) # 临时扫描结果 else: risk_file = self.__risk + '/' + m_name + '.pl' public.writeFile(risk_file, json.dumps(m_info)) self.data['risk'].append(m_info) self.tmp_data['risk'].append(m_info) # 临时扫描结果 self.discov_count += 1 # 扫描中发现风险数 self.score -= m_info['level'] if self.score < 0: self.score = 0 bar = ("%.2f" % (float(bar_num) / float(len(p.__dict__.keys())) * 50 + 50)) # 通过进度条限制,防止写文件频繁占用高 if int(float(bar)) >= bar_limit: context = {"status": "{}".format(m_info['title']), "percentage": int(float(bar)), "count": self.discov_count, "score": self.score} public.WriteFile(self.__path + '/bar.txt', json.dumps(context)) self.dump_tmp_result() # 发现漏洞,先保存一份临时的 bar_limit += 10 bar_num += 1 # 新版漏洞检测无需读文件 # is_autofix被包含进tmp_data{}字典里,会动态增加 self.data['is_autofix'] = is_autofix # self.data['is_autofix'] += is_autofix # if sys_version == 'centos_7' or sys_version == 'ubuntu_20.04' or sys_version == 'ubuntu_22.04' or sys_version == 'ubuntu_18.04' or sys_version == 'debian_12' or sys_version == 'debian_11' or sys_version == 'debian_10': # self.data['is_autofix'] += is_autofix # 旧版本漏洞检测 # else: # vuln_result = self.get_vuln_result() # self.data['risk'] = self.data['risk'] + vuln_result['risk'] # self.data['ignore'] = self.data['ignore'] + vuln_result['ignore'] # vuln_is_autofix = [] # for vr in vuln_result['risk']: # if not vr["reboot"]: # vuln_is_autofix.append(vr["cve_id"]) # self.data['is_autofix'] = is_autofix + vuln_is_autofix score = 100 for d in self.data['risk']: score = score - d['level'] if score < 0: self.data['score'] = 0 else: self.data['score'] = score self.data['risk'] = sorted(self.data['risk'], key=lambda x: x['level'], reverse=True) self.data['security'] = sorted(self.data['security'], key=lambda x: x['level'], reverse=True) self.data['ignore'] = sorted(self.data['ignore'], key=lambda x: x['level'], reverse=True) self.data['check_time'] = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") # 将结果输出一份到报告目录下 with open("/www/server/panel/data/warning_report/data.json", "w") as f: json.dump(self.data, f) self.record_times() context = {"status": "Detection completed", "percentage": 100, "count": self.discov_count, "score": self.score} public.WriteFile(self.__path + '/bar.txt', json.dumps(context)) # public.WriteFile(self.__path + '/bar.txt', "100") # 扫描进度条归零 return self.data def download_new_vulns(self): ''' 根据系统版本确定漏洞库名 ''' sys_version = self.get_sys_version() zip_file = "" if sys_version == "centos_7": self.new_vul_list = self.__path + '/vul_centos7.json' zip_file = "vul_centos7.zip" elif sys_version == "centos_8": self.new_vul_list = self.__path + '/vul_centos8.json' zip_file = "vul_centos8.zip" elif sys_version == "centos_8_stream": self.new_vul_list = self.__path + '/vul_centos8stream.json' zip_file = "vul_centos8stream.zip" elif sys_version == "ubuntu_20.04": self.new_vul_list = self.__path + '/vul_ubuntu2004.json' zip_file = "vul_ubuntu2004.zip" elif sys_version == "ubuntu_22.04": self.new_vul_list = self.__path + '/vul_ubuntu2204.json' zip_file = "vul_ubuntu2204.zip" elif sys_version == "ubuntu_18.04": self.new_vul_list = self.__path + '/vul_ubuntu1804.json' zip_file = "vul_ubuntu1804.zip" elif sys_version == "debian_12": self.new_vul_list = self.__path + '/vul_debian12.json' zip_file = "vul_debian12.zip" elif sys_version == "debian_11": self.new_vul_list = self.__path + '/vul_debian11.json' zip_file = "vul_debian11.zip" elif sys_version == "debian_10": self.new_vul_list = self.__path + '/vul_debian10.json' zip_file = "vul_debian10.zip" # 检查漏洞库文件是否存在 if not os.path.exists(self.new_vul_list): if zip_file != '': downfile = self.__path+'/'+zip_file public.downloadFile("/safe_warning/{}".format(public.get_url(),zip_file), downfile) o, e = public.ExecShell("unzip -o {} -d {}".format(downfile, self.__path)) # 解压报错 if e != "": return False else: return False return True # cve_id def get_list(self, args): ''' @name 开始扫描并返回结果 @param args: @return: ''' if 'force' in args: # 校验参数 try: args.validate([ Param('force').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) import subprocess if hasattr(args, 'open') and args.open == "1": public.set_module_logs("panel_warning_v2", "get_list") public.WriteFile(self.__path + '/kill.pl', "False") # 用来判断这次的扫描是否被中断,默认没中断 command = "btpython /www/server/panel/class_v2/panel_warning_v2.py" process = subprocess.Popen(command, shell=True) # 获取进程ID pid = process.pid public.WriteFile(self.__path + "/pid.txt", str(pid)) process.wait() result_file = self.__path + '/resultresult.json' output = { "score": 100, "check_time": public.format_date(), "interrupt": False, "security": [], "risk": [], "ignore": [], "is_autofix": [] } if not os.path.exists(result_file): return output result_body = public.ReadFile(result_file) if not result_body: return output try: output = json.loads(result_body) except: return output # 给前端判断这次的扫描结果是否中断 if public.ReadFile(self.__path + '/kill.pl') == "True": output["interrupt"] = True else: output["interrupt"] = False return public.return_message(0,0,output) def sync_rule(self): ''' @name 从云端同步规则 @author hwliang<2020-08-05> @return void ''' # try: # dep_path = '/www/server/panel/class/safe_warning' # local_version_file = self.__path + '/version.pl' # last_sync_time = local_version_file = self.__path + '/last_sync.pl' # if os.path.exists(dep_path): # if os.path.exists(last_sync_time): # if int(public.readFile(last_sync_time)) > time.time(): # return # else: # if os.path.exists(local_version_file): os.remove(local_version_file) # download_url = public.get_url() # version_url = download_url + '/install/warning/version.txt' # cloud_version = public.httpGet(version_url) # if cloud_version: cloud_version = cloud_version.strip() # local_version = public.readFile(local_version_file) # if local_version: # if cloud_version == local_version: # return # tmp_file = '/tmp/bt_safe_warning.zip' # public.ExecShell('wget -O {} {} -T 5'.format(tmp_file,download_url + '/install/warning/safe_warning.zip')) # if not os.path.exists(tmp_file): # return # if os.path.getsize(tmp_file) < 2129: # os.remove(tmp_file) # return # if not os.path.exists(dep_path): # os.makedirs(dep_path,384) # public.ExecShell("unzip -o {} -d {}/ >/dev/null".format(tmp_file,dep_path)) # public.writeFile(local_version_file,cloud_version) # public.writeFile(last_sync_time,str(int(time.time() + 7200))) # if os.path.exists(tmp_file): os.remove(tmp_file) # public.ExecShell("chmod -R 600 {}".format(dep_path)) # except: # pass def set_ignore(self, args): ''' @name 设置指定项忽略状态 @author hwliang<2020-08-04> @param dict_obj { m_name<string> 模块名称 } @return dict ''' m_name = args.m_name.strip() ignore_file = self.__ignore + '/' + m_name + '.pl' if os.path.exists(ignore_file): os.remove(ignore_file) else: public.writeFile(ignore_file, '1') return public.return_message(0, 0, public.lang("successfully set!")) def check_find(self, args): ''' @name 检测指定项 @author hwliang<2020-08-04> @param dict_obj { m_name<string> 模块名称 } @return dict ''' # 校验参数 try: args.validate([ Param('m_name').String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) try: m_name = args.m_name.strip() p = public.get_modules('class_v2/safe_warning_v2') m_info = { 'title': p[m_name]._title, 'm_name': m_name, 'ps': p[m_name]._ps, 'version': p[m_name]._version, 'level': p[m_name]._level, 'ignore': p[m_name]._ignore, 'date': p[m_name]._date, 'tips': p[m_name]._tips, 'help': p[m_name]._help } # 解决已经在忽略列表中,但是如果仍然需要检查的话可以检查 ignore_file = self.__ignore + '/' + m_name + '.pl' if os.path.exists(ignore_file): from cachelib import SimpleCache cache = SimpleCache(5000) ikey = 'warning_list' cache.delete(ikey) os.remove(ignore_file) result_file = self.__result + '/' + m_name + '.pl' s_time = time.time() m_info['status'], m_info['msg'] = p[m_name].check_run() m_info['taking'] = round(time.time() - s_time, 4) m_info['check_time'] = int(time.time()) public.writeFile(result_file, json.dumps( [m_info['status'], m_info['msg'], m_info['check_time'], m_info['taking']])) return public.return_message(0, 0, public.lang("It has been retested.")) except: return public.return_message(-1, 0, public.lang("Detection failed")) def system_scan(self): ''' 一键扫描系统 :param get: :return: dict ''' self.compare_md5() sys_version = self.get_sys_version() # if sys_version == 'None': # return public.returnMsg(False, public.lang("It is not supported by the current system")) sys_product = self.get_sys_product() # if not os.path.exists(self.__vul_list): # return public.returnMsg(False, public.lang("The scan failed")) vul_list = self.get_vul_list() new_risk_list = [] new_ignore_list = [] # error_list = [] result_dict = {} # cp_list = [] vul_count = 0 ignore_list = self.get_ignore_list() reboot_count = 0 self.vul_num = len(vul_list) bar_num = 0 # 进度条初始化 bar_limit = 0 # 进度条限制 for vul in vul_list: bar = ("%.2f" % (float(bar_num)/float(self.vul_num)*50)) # 限制进度条,限制写文件频率 if int(float(bar)) >= bar_limit: context = {"status": "{}".format(vul['cve_id']), "percentage": int(float(bar)), "count": self.discov_count, "score": self.score} public.WriteFile(self.__path+'/bar.txt', json.dumps(context)) self.dump_tmp_result() # 发现漏洞,先保存一份临时的 bar_limit += 10 bar_num += 1 vul_count += 1 for v in vul["affected_list"]: if v["manufacturer"] == sys_version: tmp = 1 # 默认命中 if not 'affected' in v: continue arr = v['affected'].split("Up to (excluding)\n ") if len(arr) < 2: continue vul_version = arr[1] try: for soft in v["softname"]: compare_result = self.version_compare(sys_product[soft], vul_version) if compare_result >= 0: tmp = 0 # 当有一个软件包版本不在漏洞范围内,则不命中 break if tmp == 1: # softname_list = [soft+'-'+sys_product[soft] for soft in v["softname"]] # softname_list = [{soft: sys_product[soft]} for soft in v["softname"]] self.discov_count += 1 # 扫描中发现漏洞数 softname_dict = {} for soft in v["softname"]: softname_dict[soft] = sys_product[soft] level = self.get_score_risk(vul["score"]) vul_dict = {key: vul[key] for key in ["cve_id", "vuln_name", "vuln_time", "vuln_solution"]} vul_dict["level"] = level self.score -= level # 扫描中的动态分数 if self.score < 0: self.score = 0 vul_dict["soft_name"] = softname_dict vul_dict["vuln_version"] = vul_version vul_dict["check_time"] = int(time.time()) vul_dict["reboot"] = "" if "kernel" in [k for k in v["softname"]]: vul_dict["reboot"] = "This vulnerability is a kernel vulnerability, and you need to upgrade the kernel version yourself. It is recommended to make snapshots and backups before upgrading" reboot_count += 1 if vul["cve_id"] in ignore_list: new_ignore_list.append(vul_dict) self.tmp_data['ignore'].append(vul_dict) # 添加到临时字典 break new_risk_list.append(vul_dict) self.tmp_data['risk'].append(vul_dict) # 添加到临时字典 if vul_dict["reboot"]: self.tmp_data['is_autofix'].append(vul_dict["cve_id"]) break # cp_list.append(vul["cve_id"]+': '+str([soft+'-'+sys_product[soft] for soft in v["softname"]])+' >= '+vul_version) except Exception as e: # error_list.append(vul["cve_id"]+': '+str(e)) break result_dict["vul_count"] = vul_count result_dict["risk"] = new_risk_list result_dict["ignore"] = new_ignore_list # result_dict["reboot"] = self.__need_reboot # result_dict["error"] = error_list # result_dict["compare"] = cp_list public.WriteFile(self.__path + '/system_scan_time', str(int(time.time()))) public.WriteFile(self._vuln_result, json.dumps(result_dict)) # try: # public.WriteFile(self._vuln_result, json.dumps(result_dict)) # return public.returnMsg(True, public.lang("扫描完成")) # except: # return public.returnMsg(False, public.lang("扫描失败")) # 版本比较 def version_compare(self, ver_a, ver_b): ''' 比较版本大小 :param ver_a: 软件版本 :param ver_b: 漏洞版本 :return: int 大于等于返回1或0,小于返回-1 ''' sys_version = self.get_sys_version() if "ubuntu" in sys_version or "debian" in sys_version: if ver_b.startswith("1:"): ver_b = ver_b[2:] # if ver_a.startswith("1:"): # ver_a = ver_a[2:] result = public.ExecShell("dpkg --compare-versions " + ver_a + " ge " + ver_b + " && echo true") if 'warning' in result[1].strip(): return None if 'true' in result[0].strip(): return 1 else: return -1 return self.vercmp(ver_a, ver_b) def vercmp(self, first, second): import re R_NONALNUMTILDE = re.compile(br"^([^a-zA-Z0-9~]*)(.*)$") R_NUM = re.compile(br"^([\d]+)(.*)$") R_ALPHA = re.compile(br"^([a-zA-Z]+)(.*)$") first = first.encode("ascii", "ignore") second = second.encode("ascii", "ignore") while first or second: m1 = R_NONALNUMTILDE.match(first) m2 = R_NONALNUMTILDE.match(second) m1_head, first = m1.group(1), m1.group(2) m2_head, second = m2.group(1), m2.group(2) if m1_head or m2_head: continue if first.startswith(b'~'): if not second.startswith(b'~'): return -1 first, second = first[1:], second[1:] continue if second.startswith(b'~'): return 1 if not first or not second: break m1 = R_NUM.match(first) if m1: m2 = R_NUM.match(second) if not m2: return 1 isnum = True else: m1 = R_ALPHA.match(first) m2 = R_ALPHA.match(second) isnum = False if not m1: return -1 if not m2: return 1 if isnum else -1 m1_head, first = m1.group(1), m1.group(2) m2_head, second = m2.group(1), m2.group(2) if isnum: m1_head = m1_head.lstrip(b'0') m2_head = m2_head.lstrip(b'0') m1hlen = len(m1_head) m2hlen = len(m2_head) if m1hlen < m2hlen: return -1 if m1hlen > m2hlen: return 1 if m1_head < m2_head: return -1 if m1_head > m2_head: return 1 continue m1len = len(first) m2len = len(second) if m1len == m2len == 0: return 0 if m1len != 0: return 1 return -1 # 取系统版本 def get_sys_version(self): ''' 获取当前系统版本 :return: string ''' sys_version = "None" if os.path.exists("/etc/redhat-release"): result = public.ReadFile("/etc/redhat-release") if "CentOS Linux release 7" in result: sys_version = "centos_7" elif "CentOS Linux release 8" in result: sys_version = "centos_8" elif "CentOS Stream release 8" in result: sys_version = "centos_8_stream" elif os.path.exists("/etc/lsb-release"): if "Ubuntu 20.04" in public.ReadFile("/etc/lsb-release"): sys_version = "ubuntu_20.04" elif "Ubuntu 22.04" in public.ReadFile("/etc/lsb-release"): sys_version = "ubuntu_22.04" elif "Ubuntu 18.04" in public.ReadFile("/etc/lsb-release"): sys_version = "ubuntu_18.04" elif os.path.exists("/etc/debian_version"): result = public.ReadFile("/etc/debian_version") if "10." in result: sys_version = "debian_10" elif "11." in result: sys_version = "debian_11" elif "12." in result: sys_version = "debian_12" return sys_version def new_get_sys_product(self, flag=False): ''' 新版获取系统软件包及版本{"name":"version"} @param flag bool 为True时直接扫一次 ''' # 修复完成需要重新获取一次软件包版本 if flag: sys_product = self.get_sys_product() public.WriteFile(self.product_version, json.dumps(sys_product)) sys_version = self.get_sys_version() if sys_version == "centos_7": # 根据yum日志判断是否用软件包更新 yumlog = "/var/log/yum.log" elif sys_version == "centos_8" or sys_version == "centos_8_stream": yumlog = "/var/log/dnf.log" else: yumlog = "/var/log/apt/history.log" try: # 先判断有没有yum.log if os.path.exists(yumlog): # 第一次将yum.log修改时间记录在文件里 if not os.path.exists(self.yum_time): new_modi_time = str(int(os.path.getmtime(yumlog))) public.WriteFile(self.yum_time, new_modi_time) sys_product = self.get_sys_product() public.WriteFile(self.product_version, json.dumps(sys_product)) else: old_modi_time = public.ReadFile(self.yum_time) new_modi_time = str(int(os.path.getmtime(yumlog))) # 比较上一次记录的时间和这次获取的修改时间,相等则根据rpm文件的修改时间 if old_modi_time == new_modi_time: if os.path.exists(self.product_version): # 若rpm文件修改日期大于七天,则还是执行rpm检测 if self.is_file_too_old(self.product_version, 7): sys_product = self.get_sys_product() public.WriteFile(self.product_version, json.dumps(sys_product)) else: sys_product = json.loads(public.ReadFile(self.product_version)) else: sys_product = self.get_sys_product() public.WriteFile(self.product_version, json.dumps(sys_product)) # 不相等证明最近有用yum安装过新软件,更新文件并检测 else: sys_product = self.get_sys_product() public.WriteFile(self.product_version, json.dumps(sys_product)) # 将软件包版本写入文件 public.WriteFile(self.yum_time, new_modi_time) # 没有yum.log,则直接根据rpm文件修改日期 else: if os.path.exists(self.product_version): if self.is_file_too_old(self.product_version, 7): sys_product = self.get_sys_product() public.WriteFile(self.product_version, json.dumps(sys_product)) else: sys_product = json.loads(public.ReadFile(self.product_version)) else: sys_product = self.get_sys_product() public.WriteFile(self.product_version, json.dumps(sys_product)) except Exception as e: sys_product = self.get_sys_product() public.WriteFile(self.product_version, json.dumps(sys_product)) # 其他系统版本 return sys_product # 取软件包版本 def get_sys_product(self): """ 获取系统软件包及版本 {"name":"version"} :return dict 如果系统不支持则返回str None """ product_version = {} sys_version = self.get_sys_version() # if sys_version == 'None':return public.returnMsg(False, public.lang("当前系统暂不支持")) if "centos" in sys_version: result = public.ExecShell('rpm -qa --qf \'%{NAME};%{VERSION}-%{RELEASE}\\n\'')[0].strip().split('\n') elif "ubuntu" in sys_version: # result1 = subprocess.check_output(['dpkg-query', '-W', '-f=${Package};${Version}\n']).decode('utf-8').strip().split('\n') result = public.ExecShell('dpkg-query -W -f=\'${Package};${Version}\n\'')[0].strip().split('\n') elif "debian" in sys_version: result = public.ExecShell('dpkg-query -W -f=\'${Package};${Version}\n\'')[0].strip().split('\n') elif sys_version == "None": return None else: return None for pkg in result: try: product_version[pkg.split(";")[0]] = pkg.split(";")[1] except: return None # product_version["kernel"] = subprocess.check_output(['uname', '-r']).decode('utf-8').strip().replace(".x86_64", "") product_version["kernel"] = public.ExecShell('uname -r')[0].strip() return product_version def get_vuln_result(self): ''' 获取上一次扫描结果 :param get: :return: dict ''' d_risk = 0 h_risk = 0 m_risk = 0 vul_list = [] if not os.path.exists(self.__vul_list): self.vul_num = 0 else: self.vul_num = len(self.get_vul_list()) if not os.path.exists(self._vuln_result): tmp_dict = {"vul_count": self.vul_num, "risk": [], "ignore": [], "count": {"serious": 0, "high_risk": 0, "moderate_risk": 0}, "msg": "", "repair_count": {"all_count": 0, "today_count": 0}, "all_check_time": "", "ignore_count": 0} if os.path.exists("/etc/redhat-release"): result = public.ReadFile("/etc/redhat-release") if "CentOS Linux release 8" in result: # tmp_dict["msg"] = "当前系统【centos_8】官方已停止维护,为了安全起见,建议升级至centos 8 stream\n详情参考教程:https://www.bt.cn/bbs/thread-82931-1-1.html" tmp_dict["msg"] = "The current system [centos_8] has been officially stopped maintenance, for security purposes, it is recommended to upgrade to centos 8 stream" return tmp_dict if public.ReadFile(self._vuln_result) == '[]': tmp_dict = {"vul_count": self.vul_num, "risk": [], "ignore": [], "count": {"serious": 0, "high_risk": 0, "moderate_risk": 0}, "msg": "", "repair_count": {"all_count": 0, "today_count": 0}, "all_check_time": "", "ignore_count": 0} if os.path.exists("/etc/redhat-release"): result = public.ReadFile("/etc/redhat-release") if "CentOS Linux release 8" in result: # tmp_dict["msg"] = "当前系统【centos_8】官方已停止维护,为了安全起见,建议升级至centos 8 stream\n详情参考教程:https://www.bt.cn/bbs/thread-82931-1-1.html" tmp_dict["msg"] = "The current system [centos_8] has been officially stopped maintenance, for security purposes, it is recommended to upgrade to centos 8 stream" return tmp_dict result_dict = json.loads(public.ReadFile(self._vuln_result)) old_risk_list = result_dict["risk"] old_ignore_list = result_dict["ignore"] new_risk_list = old_risk_list.copy() new_ignore_list = old_ignore_list.copy() tmp_ignore_list = self.get_ignore_list() for cve in old_risk_list: if cve["cve_id"] in tmp_ignore_list: new_ignore_list.append(cve) new_risk_list.remove(cve) for cve_ig in old_ignore_list: if cve_ig["cve_id"] not in tmp_ignore_list: new_risk_list.append(cve_ig) new_ignore_list.remove(cve_ig) for vul in new_ignore_list + new_risk_list: vul_list.append(vul["cve_id"]) if vul["cve_id"] in tmp_ignore_list: continue if vul["level"] == 3: d_risk += 1 elif vul["level"] == 2: h_risk += 1 elif vul["level"] == 1: m_risk += 1 list_sort = [3, 2, 1] # 排序列表 # result_dict["risk"] = old_risk_list result_dict["risk"] = sorted(new_risk_list, key=lambda x: list_sort.index(x.get("level"))) # result_dict["ignore"] = old_ignore_list result_dict["ignore"] = sorted(new_ignore_list, key=lambda x: list_sort.index(x.get("level"))) # result_dict["reboot"] = self.__need_reboot result_dict["count"] = {"serious": d_risk, "high_risk": h_risk, "moderate_risk": m_risk} result_dict["msg"] = "" result_dict["repair_count"] = self.count_repair(vul_list) result_dict["all_check_time"] = public.ReadFile(self.__path + '/system_scan_time') result_dict["ignore_count"] = len(tmp_ignore_list) if os.path.exists("/etc/redhat-release"): result = public.ReadFile("/etc/redhat-release") if "CentOS Linux release 8" in result: result_dict[ "msg"] = "The current system [centos_8] has been officially stopped maintenance, for security purposes, it is recommended to upgrade to centos 8 stream" public.WriteFile(self._vuln_result, json.dumps(result_dict)) return result_dict # 按分数评等级 def get_score_risk(self, score): ''' 拿到分数,返回危险等级 :param score: :return: int 若没有符合的分数就报错,需要捕获异常 ''' if float(score) >= 9.0: risk = 3 elif float(score) >= 7.0: risk = 2 elif float(score) >= 6.0: risk = 1 return risk def get_vul_list(self): return json.loads(public.ReadFile(self.__vul_list)) def get_ignore_list(self): return json.loads(public.ReadFile(self._vuln_ignore)) def set_vuln_ignore(self, args): ''' 设置忽略指定cve,若已在列表里,则删除,不在列表里则添加 :param args: :return: dict {status:true,msg:'设置成功/失败'} ''' cve_list = json.loads(args.cve_list.strip()) ignore_list = self.get_ignore_list() for cl in cve_list: if cl in ignore_list: ignore_list.remove(cl) else: ignore_list.append(cl) public.WriteFile(self._vuln_ignore, json.dumps(ignore_list)) # public.WriteFile(self.__result, json.dumps(result_dict)) return public.return_message(0, 0, public.lang("successfully set!")) def count_repair(self, now_list): ''' 获取总共修复漏洞的数量以及今日修复漏洞数量 :param now_list: :return: dict ''' cve_dict = {} if not os.path.exists(self.__repair_count): cve_dict["all_cve"] = now_list cve_dict["today_cve"] = now_list cve_dict["time"] = int(time.time()) public.WriteFile(self.__repair_count, json.dumps(cve_dict)) cve_dict = json.loads(public.ReadFile(self.__repair_count)) cve_dict["all_cve"].extend(set(now_list) - set(cve_dict["all_cve"])) all_count = len(cve_dict["all_cve"]) - len(now_list) cve_dict["today_cve"].extend(set(now_list) - set(cve_dict["today_cve"])) today_count = len(cve_dict["today_cve"]) - len(now_list) # if cve_dict["time"].split(" ")[0] != self.get_time().split(" ")[0]: # cve_dict["today_cve"] = now_list cve_dict["time"] = int(time.time()) public.WriteFile(self.__repair_count, json.dumps(cve_dict)) return {"all_count": all_count, "today_count": today_count} def get_time(self): return public.format_date() def check_cve(self, args): ''' 检测单个漏洞 :param args: :return: dict ''' sys_product = self.get_sys_product() if not sys_product: return public.returnMsg(True, public.lang("Detection failed")) cve_id = args.cve_id.strip() result_dict = json.loads(public.ReadFile(self._vuln_result)) risk_list = result_dict["risk"] ignore_list = result_dict["ignore"] tmptmp = 1 for cve in risk_list: if cve["cve_id"] == cve_id: tmp = 1 # 默认命中漏洞 cve["check_time"] = int(time.time()) for soft in list(cve["soft_name"].keys()): if self.version_compare(sys_product[soft], cve["vuln_version"]) >= 0: tmp = 0 # 当有一个软件包不命中,则为已修复 tmptmp = 0 break if tmp == 0: risk_list.remove(cve) for cve in ignore_list: if cve["cve_id"] == cve_id: tmp = 1 # 默认命中漏洞 cve["check_time"] = int(time.time()) for soft in list(cve["soft_name"].keys()): if self.version_compare(sys_product[soft], cve["vuln_version"]) >= 0: tmp = 0 # 当有一个软件包不命中,则为已修复 tmptmp = 0 break if tmp == 0: ignore_list.remove(cve) result_dict["risk"] = risk_list result_dict["ignore"] = ignore_list public.WriteFile(self._vuln_result, json.dumps(result_dict)) if tmptmp == 0: return public.returnMsg(True, public.lang("It has been retested.")) else: return public.returnMsg(True, public.lang("It has been retested.")) def compare_md5(self): ''' 对比md5,更新漏洞库 :return: ''' import requests # try: # new_md5 = requests.get("https://www.bt.cn/vulscan_d11ad1fe99a5f078548b0ea355db42dc.txt").text # except: # return 0 # old_md5 = public.FileMd5(self.__vul_list) # if old_md5 != new_md5 or not os.path.exists(self.__vul_list): if not os.path.exists(self.__vul_list): try: public.downloadFile("{}/install/src/high_risk_vul.zip".format(public.get_url()), self.__path + "/high_risk_vul.zip") public.ExecShell("unzip -o {}/high_risk_vul.zip -d {}/".format(self.__path, self.__path)) except: return 0 return 1 def get_logs(self, get): ''' 获取升级日志 :param get: :return: dict ''' import files return public.returnMsg(True, files.files().GetLastLine(self.__path + '/log.txt', 20)) def record_times(self): ''' 记录近七日扫描次数 ''' date_obj = datetime.datetime.now() weekday = datetime.datetime.now().weekday() if not os.path.exists("/www/server/panel/data/warning_report/record.json"): tmp = {"scan": [], "repair": []} for i in range(6, -1, -1): last_date = (date_obj - datetime.timedelta(days=i)).strftime("%Y/%m/%d") tmp["scan"].append({"date": last_date, "times": 0}) tmp["repair"].append({"date": last_date, "times": 0}) public.WriteFile("/www/server/panel/data/warning_report/record.json", json.dumps(tmp)) with open("/www/server/panel/data/warning_report/record.json", "r") as f: record = json.load(f) if record["scan"][weekday]["date"] == datetime.datetime.now().strftime("%Y/%m/%d"): record["scan"][weekday]["times"] += 1 else: record["scan"][weekday]["date"] = datetime.datetime.now().strftime("%Y/%m/%d") record["scan"][weekday]["times"] = 1 public.WriteFile("/www/server/panel/data/warning_report/record.json", json.dumps(record)) def get_scan_bar(self, args): ''' 获取扫描进度条 @param args: @return: int ''' if not os.path.exists(self.__path + '/bar.txt'):return 0 data = json.loads(public.ReadFile(self.__path + '/bar.txt')) return public.return_message(0,0,data) def kill_get_list(self, args): ''' 杀掉扫描进程 @param args: @return: ''' if not os.path.exists(self.__path + '/pid.txt'): return public.return_message(-1,0,"Interrupt failure") pid = public.ReadFile(self.__path + '/pid.txt') err = public.ExecShell("kill -9 {}".format(str(pid)))[1].strip() if err: return public.return_message(-1,0,"Interrupt failure") else: public.WriteFile(self.__path + '/kill.pl', "True") return public.return_message(0,0,'Interrupt successfully') def dump_tmp_result(self): ''' 动态保存结果 @param args: @return: ''' public.WriteFile(self.__path + '/tmp_result.json', json.dumps(self.tmp_data)) def get_tmp_result(self, args): ''' 获取中途中断结果 @param args: @return: ''' if not os.path.exists(self.__path + '/tmp_result.json'): return "err" return public.return_message(0,0,json.loads(public.ReadFile(self.__path + '/tmp_result.json'))) def new_system_scan2(self): ''' 新版系统dpkg软件包漏洞检测 ''' if not os.path.exists(self.new_vul_list): return # 加载漏洞库文件 try: vul_json = json.loads(public.ReadFile(self.new_vul_list)) packages_rule = vul_json['Packages'] detail = vul_json['Detail'] except Exception as e: return sys_product = self.new_get_sys_product() if sys_product is None: return dpkg = Dpkg # 获取DPKG对象 # 符合 systemscan_result = {} # 开始检测 # 第一层遍历系统软件包 for pk, ver in sys_product.items(): # 跳过内核漏洞检测 # if pk.startswith("kernel"): # continue # 是否有历史漏洞 if pk in packages_rule: # 第二层遍历比较软件包涉及的漏洞版本 for rule in packages_rule[pk]: # 判断软件包版本是否存在主版本号,有则漏洞版本一起保留主版本号,否则去掉漏洞版本的主版本号 pk_ver, vul_ver = self.adjust_ver(ver, rule[0]) try: cp_result = dpkg.compare_versions(pk_ver, vul_ver) except: continue # 任意一个命中 if cp_result == -1: if rule[1] not in systemscan_result: systemscan_result[rule[1]] = detail[str(rule[1])] systemscan_result[rule[1]]["impact"] = [{"package": pk, "version": pk_ver, "vul_ver": vul_ver}] else: systemscan_result[rule[1]]["impact"].append({"package": pk, "version": pk_ver, "vul_ver": vul_ver}) # 为了兼容旧版本再次做处理 for sr in systemscan_result.values(): one_risk = {} one_risk["title"] = "【{}】Linux系统安全漏洞编号".format(sr["ref_id"]) one_risk["data"] = "2023-12-08" one_risk["help"] = "" one_risk["ignore"] = False level = self.new_severity_to_num(sr["severity"]) one_risk["level"] = level one_risk["m_name"] = sr["ref_id"] pk_list = [] # 判断是否有内核漏洞在里面 is_kernel = False soft_list = [] for impact in sr["impact"]: if impact["package"].startswith("kernel"): is_kernel = True continue soft_list.append("{} Versions below {}".format(impact["package"]+"-"+impact["version"], impact["vul_ver"])) pk_list.append(impact["package"]) if is_kernel: continue one_risk["msg"] = "Security vulnerabilities are found in the following system software:<br>{}<br>Vulnerabilities involved:{}<br>Please refer to the official announcement for details:{}".format('<br>'.join(soft_list), '、'.join(sr["cve"]),sr["ref_url"]) one_risk["ps"] = "【{}】Linux system vulnerability security vulnerability number".format(sr["ref_id"]) one_risk["remind"] = "Fixing vulnerabilities has certain risks, so it is recommended to take a good system snapshot to prevent system operation from being affected." one_risk["status"] = False one_risk["taking"] = 0.000001 one_risk["tips"] = ["Update the software to a safe version according to the risk description", "Or click [One-click Repair] to solve all security issues"] one_risk["version"] = 1 one_risk["type"] = "vulnerability" one_risk["package"] = pk_list # 存储结果 self.data["risk"].append(one_risk) self.data["is_autofix"].append(one_risk["m_name"]) # 扫描中发现的漏洞数 self.discov_count += 1 # 扫描中的动态分数 self.score -= level if self.score < 0: self.score = 0 # 扫描中的动态风险 self.tmp_data['risk'].append(one_risk) # 可修复项 self.tmp_data['is_autofix'].append(one_risk["m_name"]) def new_system_scan(self): ''' 新版系统rpm软件包漏洞检测 提升扫描速度 :param get: :return: dict ''' # 判断新漏洞库存不存在 if not os.path.exists(self.new_vul_list): return context = {"status": "Checking system software", "percentage": 0, "count": 0, "score": 100} public.WriteFile(self.__path + '/bar.txt', json.dumps(context)) sys_product = self.new_get_sys_product() # 加载漏洞库文件 try: vul_json = json.loads(public.ReadFile(self.new_vul_list)) packages_rule = vul_json['Packages'] detail = vul_json['Detail'] except Exception as e: return if sys_product is None: return # 符合 systemscan_result = {} # 开始检测 # 第一层遍历系统软件包 for pk, ver in sys_product.items(): # 跳过内核漏洞检测 # if pk.startswith("kernel"): # continue # 是否有历史漏洞 if pk in packages_rule: # 第二层遍历比较软件包涉及的漏洞版本 for rule in packages_rule[pk]: # 判断软件包版本是否存在主版本号,有则漏洞版本一起保留主版本号,否则去掉漏洞版本的主版本号 pk_ver, vul_ver = self.adjust_ver(ver, rule[0]) cp_result = self.vercmp(pk_ver, vul_ver) if cp_result == -1: if rule[1] not in systemscan_result: systemscan_result[rule[1]] = detail[str(rule[1])] systemscan_result[rule[1]]["impact"] = [{"package": pk, "version": pk_ver, "vul_ver": vul_ver}] else: systemscan_result[rule[1]]["impact"].append({"package": pk, "version": pk_ver, "vul_ver": vul_ver}) # public.WriteFile("/tmp/centos7_result.json", json.dumps(systemscan_result, indent=4)) # 为了兼容旧版本再次做处理 for sr in systemscan_result.values(): one_risk = {} one_risk["title"] = "【{}】Linux system vulnerability security notice".format(sr["ref_id"]) one_risk["data"] = "2023-12-08" one_risk["help"] = "" one_risk["ignore"] = False level = self.new_severity_to_num(sr["severity"]) one_risk["level"] = level one_risk["m_name"] = sr["ref_id"] pk_list = [] # 判断是否有内核漏洞在里面 is_kernel = False soft_list = [] for impact in sr["impact"]: if impact["package"].startswith("kernel"): is_kernel = True continue soft_list.append("{} Versions below {}".format(impact["package"]+"-"+impact["version"], impact["vul_ver"])) pk_list.append(impact["package"]) if is_kernel: continue one_risk["msg"] = "The following system software was found to have security vulnerabilities:<br>{}<br>Vulnerabilities involved:{}<br>Refer to the official announcement for details:{}".format('<br>'.join(soft_list), '、'.join(sr["cve"]),sr["ref_url"]) one_risk["ps"] = "【{}】Linux system vulnerability security notice".format(sr["ref_id"]) one_risk["remind"] = "Fixing vulnerabilities has certain risks, so it is recommended to take a good system snapshot to prevent system operation from being affected." one_risk["status"] = False one_risk["taking"] = 0.000001 one_risk["tips"] = ["Update the software to a safe version according to the risk description", "Or click [One-click Repair] to solve all security issues"] one_risk["version"] = 1 one_risk["type"] = "vulnerability" one_risk["package"] = pk_list # 存储结果 self.data["risk"].append(one_risk) self.data["is_autofix"].append(one_risk["m_name"]) # 扫描中发现的漏洞数 self.discov_count += 1 # 扫描中的动态分数 self.score -= level if self.score < 0: self.score = 0 # 扫描中的动态风险 self.tmp_data['risk'].append(one_risk) # 可修复项 self.tmp_data['is_autofix'].append(one_risk["m_name"]) # result_json = { # "vul_count": len(detail), # "risk": [], # "ignore": [], # "all_check_time": "", # "ignore_count": 0, # "msg": "", # "repair_count": {"all_count": 0, "today_vount": 0}, # } # for sr in systemscan_result.values(): # one_risk = { # "cve_id": "", # "vuln_name": "", # "vuln_time": "2021-12-16", # "vuln_solution": "", # "level": 1, # "soft_name": {}, # "vuln_version": "", # "check_time": 1701910962, # "reboot": "" # } # one_risk["cve_id"] = sr["ref_id"] # one_risk["vuln_name"] = "【{}】Linux软件安全公告".format(sr["ref_id"]) # one_risk["vuln_solution"] = "更新涉及软件补丁,具体信息参考官方公告{}".format(sr["ref_url"]) # level = self.new_severity_to_num(sr["severity"]) # one_risk["level"] = level # # 处理受影响的软件包(为了兼容旧版本暂时这样) # soft_name = {} # vuln_version = "" # for impact in sr["impact"]: # soft_name[impact["package"]] = impact["version"] # vuln_version = impact["vul_ver"] # one_risk["soft_name"] = soft_name # one_risk["vuln_version"] = vuln_version # risk_list.append(one_risk) # # # 扫描中发现的漏洞数 # self.discov_count += 1 # # 扫描中的动态分数 # self.score -= level # if self.score < 0: # self.score = 0 # # 扫描中的动态风险 # self.tmp_data['risk'].append(one_risk) # # 可修复项 # self.tmp_data['is_autofix'].append(one_risk["cve_id"]) # # result_json["risk"] = risk_list # public.WriteFile(self.__path + '/system_scan_time', int(time.time())) # public.WriteFile(self._vuln_result, json.dumps(result_json)) def is_file_too_old(self, file_path, days): """ 判断文件是否过于陈旧 :param file_path: 文件路径 :param days: 超过的天数 :return: bool """ mtime = os.path.getmtime(file_path) # 不存在直接返回True if mtime is None: return True mod_time = datetime.datetime.fromtimestamp(mtime) days_old = datetime.datetime.now() - mod_time return days_old.days > days def adjust_ver(self, ver_a, ver_b): ''' 确保两个版本主版本号统一,一方存在另一方不存在则删除主版本,要么都有,要么都没有 ''' if ":" in ver_a: if ":" in ver_b: ver_1 = ver_a else: ver_1 = ver_a.split(":")[1] ver_2 = ver_b else: if ":" in ver_b: ver_2 = ver_b.split(":")[1] else: ver_2 = ver_b ver_1 = ver_a return ver_1, ver_2 def new_severity_to_num(self, severity): ''' 将漏洞级别转变成数字 ''' if severity == "Critical": return 3 elif severity == "Important": return 2 elif severity == "Moderate": return 1 elif severity == "Low": return 1 elif severity == "High": return 3 elif severity == "Medium": return 2 else: return 2 # def new_rpmvercmp(self, sys_ver, vul_ver): # output, err = public.ExecShell("rpmdev-vercmp {} {}".format(sys_ver, vul_ver)) # if err != '': # return 1 # output = output.strip() # if output == "{} > {}".format(sys_ver, vul_ver): # return 1 # elif output == "{} == {}".format(sys_ver, vul_ver): # return 0 # elif output == "{} < {}".format(sys_ver, vul_ver): # return -1 # else: # return 1 class Dpkg: def __init__(self): self._fileinfo = None self._control_str = None self._headers = None self._message = None self._upstream_version = None self._debian_revision = None self._epoch = None @staticmethod def get_epoch(version_str): try: e_index = version_str.index(":") except ValueError: return 0, version_str try: epoch = int(version_str[0:e_index]) except ValueError as ex: print(f"Corrupt dpkg version '{version_str}': epochs can only be ints, and " "epochless versions cannot use the colon character.") return epoch, version_str[e_index + 1:] @staticmethod def get_upstream(version_str): try: d_index = version_str.rindex("-") except ValueError: return version_str, "0" return version_str[0:d_index], version_str[d_index + 1:] @staticmethod def split_full_version(version_str): epoch, full_ver = Dpkg.get_epoch(version_str) upstream_rev, debian_rev = Dpkg.get_upstream(full_ver) return epoch, upstream_rev, debian_rev @staticmethod def get_alphas(revision_str): for i, char in enumerate(revision_str): if char.isdigit(): if i == 0: return "", revision_str return revision_str[0:i], revision_str[i:] return revision_str, "" @staticmethod def get_digits(revision_str): if not revision_str: return 0, "" for i, char in enumerate(revision_str): if not char.isdigit(): if i == 0: return 0, revision_str return int(revision_str[0:i]), revision_str[i:] return int(revision_str), "" @staticmethod def listify(revision_str): result = [] while revision_str: rev_1, remains = Dpkg.get_alphas(revision_str) rev_2, remains = Dpkg.get_digits(remains) result.extend([rev_1, rev_2]) revision_str = remains return result @staticmethod def dstringcmp(a, b): if a == b: return 0 try: for i, char in enumerate(a): if char == b[i]: continue if char == "~": return -1 if b[i] == "~": return 1 if char.isalpha() and not b[i].isalpha(): return -1 if not char.isalpha() and b[i].isalpha(): return 1 if ord(char) > ord(b[i]): return 1 if ord(char) < ord(b[i]): return -1 except IndexError: if char == "~": return -1 return 1 if b[len(a)] == "~": return 1 return -1 @staticmethod def compare_revision_strings(rev1, rev2): if rev1 == rev2: return 0 list1 = Dpkg.listify(rev1) list2 = Dpkg.listify(rev2) if list1 == list2: return 0 try: for i, item in enumerate(list1): if i >= len(list2): raise IndexError if not isinstance(item, list2[i].__class__): print(f"Cannot compare '{item}' to {list2[i]}, something has gone horribly awry.") if item == list2[i]: continue if isinstance(item, int): if item > list2[i]: return 1 if item < list2[i]: return -1 else: return Dpkg.dstringcmp(item, list2[i]) except IndexError: if list1[len(list2)][0][0] == "~": return -1 return 1 if list2[len(list1)][0][0] == "~": return 1 return -1 @staticmethod def compare_versions(ver1, ver2): if ver1 == ver2: return 0 epoch1, upstream1, debian1 = Dpkg.split_full_version(str(ver1)) epoch2, upstream2, debian2 = Dpkg.split_full_version(str(ver2)) if epoch1 < epoch2: return -1 if epoch1 > epoch2: return 1 upstr_res = Dpkg.compare_revision_strings(upstream1, upstream2) if upstr_res != 0: return upstr_res debian_res = Dpkg.compare_revision_strings(debian1, debian2) if debian_res != 0: return debian_res return 0 if __name__ == "__main__": # st = time.time() panel = panelWarning() # panel.new_system_scan() public.WriteFile('/www/server/panel/data/warning/resultresult.json', json.dumps(panel._get_list())) # et = time.time() # #coding: utf-8 # # +------------------------------------------------------------------- # # | aaPanel # # +------------------------------------------------------------------- # # | Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved. # # +------------------------------------------------------------------- # # | Author: hwliang <2020-08-04> # # +------------------------------------------------------------------- # # import os,sys,json,time,public # # class panelWarning: # __path = '/www/server/panel/data/warning' # __ignore = __path + '/ignore' # __result = __path + '/result' # __risk = __path + '/risk' # def __init__(self): # if not os.path.exists(self.__ignore): # os.makedirs(self.__ignore,384) # if not os.path.exists(self.__result): # os.makedirs(self.__result,384) # if not os.path.exists(self.__risk): # os.makedirs(self.__risk, 384) # # def get_list(self,args): # p = public.get_modules('class/safe_warning') # data = { # 'security':[], # 'risk':[], # 'ignore':[] # } # # for m_name in p.__dict__.keys(): # ignore_file = self.__ignore + '/' + m_name + '.pl' # # 忽略的检查项 # if p[m_name]._level == 0: continue # # m_info = { # 'title': p[m_name]._title, # 'm_name': m_name, # 'ps': p[m_name]._ps, # 'version': p[m_name]._version, # 'level': p[m_name]._level, # 'ignore': p[m_name]._ignore, # 'date': p[m_name]._date, # 'tips': p[m_name]._tips, # 'help': p[m_name]._help # } # result_file = self.__result + '/' + m_name + '.pl' # # try: # s_time = time.time() # m_info['status'],m_info['msg'] = p[m_name].check_run() # m_info['taking'] = round(time.time() - s_time,6) # m_info['check_time'] = int(time.time()) # public.writeFile(result_file,json.dumps([m_info['status'],m_info['msg'],m_info['check_time'],m_info['taking']],)) # except: # continue # # m_info['ignore'] = os.path.exists(ignore_file) # if m_info['ignore']: # data['ignore'].append(m_info) # else: # if m_info['status']: # data['security'].append(m_info) # else: # risk_file = self.__risk + '/' + m_name + '.pl' # public.writeFile(risk_file, json.dumps(m_info)) # data['risk'].append(m_info) # # data['risk'] = sorted(data['risk'],key=lambda x: x['level'],reverse=True) # data['security'] = sorted(data['security'],key=lambda x: x['level'],reverse=True) # data['ignore'] = sorted(data['ignore'],key=lambda x: x['level'],reverse=True) # # 获取支持一键修复的列表 # try: # is_autofix = public.read_config("safe_autofix") # except: # is_autofix = [] # data['is_autofix'] = is_autofix # return data # # # def sync_rule(self): # ''' # @name 从云端同步规则 # @author hwliang<2020-08-05> # @return void # ''' # # try: # # dep_path = '/www/server/panel/class/safe_warning' # # local_version_file = self.__path + '/version.pl' # # last_sync_time = local_version_file = self.__path + '/last_sync.pl' # # if os.path.exists(dep_path): # # if os.path.exists(last_sync_time): # # if int(public.readFile(last_sync_time)) > time.time(): # # return # # else: # # if os.path.exists(local_version_file): os.remove(local_version_file) # # # download_url = public.get_url() # # version_url = download_url + '/install/warning/version.txt' # # cloud_version = public.httpGet(version_url) # # if cloud_version: cloud_version = cloud_version.strip() # # # local_version = public.readFile(local_version_file) # # if local_version: # # if cloud_version == local_version: # # return # # # tmp_file = '/tmp/bt_safe_warning.zip' # # public.ExecShell('wget -O {} {} -T 5'.format(tmp_file,download_url + '/install/warning/safe_warning.zip')) # # if not os.path.exists(tmp_file): # # return # # # if os.path.getsize(tmp_file) < 2129: # # os.remove(tmp_file) # # return # # # if not os.path.exists(dep_path): # # os.makedirs(dep_path,384) # # public.ExecShell("unzip -o {} -d {}/ >/dev/null".format(tmp_file,dep_path)) # # public.writeFile(local_version_file,cloud_version) # # public.writeFile(last_sync_time,str(int(time.time() + 7200))) # # if os.path.exists(tmp_file): os.remove(tmp_file) # # public.ExecShell("chmod -R 600 {}".format(dep_path)) # # except: # # pass # # # # def set_ignore(self,args): # ''' # @name 设置指定项忽略状态 # @author hwliang<2020-08-04> # @param dict_obj { # m_name<string> 模块名称 # } # @return dict # ''' # m_name = args.m_name.strip() # ignore_file = self.__ignore + '/' + m_name + '.pl' # if os.path.exists(ignore_file): # os.remove(ignore_file) # else: # public.writeFile(ignore_file,'1') # return public.returnMsg(True, public.lang("Successfully set!")) # # def check_find(self, args): # ''' # @name 检测指定项 # @author hwliang<2020-08-04> # @param dict_obj { # m_name<string> 模块名称 # } # @return dict # ''' # try: # m_name = args.m_name.strip() # p = public.get_modules('class/safe_warning') # m_info = { # 'title': p[m_name]._title, # 'm_name': m_name, # 'ps': p[m_name]._ps, # 'version': p[m_name]._version, # 'level': p[m_name]._level, # 'ignore': p[m_name]._ignore, # 'date': p[m_name]._date, # 'tips': p[m_name]._tips, # 'help': p[m_name]._help # } # # # 解决已经在忽略列表中,但是如果仍然需要检查的话可以检查 # ignore_file = self.__ignore + '/' + m_name + '.pl' # if os.path.exists(ignore_file): # from cachelib import SimpleCache # cache = SimpleCache(5000) # ikey = 'warning_list' # cache.delete(ikey) # os.remove(ignore_file) # # result_file = self.__result + '/' + m_name + '.pl' # s_time = time.time() # m_info['status'], m_info['msg'] = p[m_name].check_run() # m_info['taking'] = round(time.time() - s_time, 4) # m_info['check_time'] = int(time.time()) # public.writeFile(result_file, json.dumps( # [m_info['status'], m_info['msg'], m_info['check_time'], m_info['taking']])) # return public.returnMsg(True, public.lang("Retested")) # except: # return public.returnMsg(False, public.lang("Detection failed"))