Server IP : 104.21.38.3 / Your IP : 162.158.170.62 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/wp_toolkit/ |
Upload File : |
# coding: utf-8 # +------------------------------------------------------------------- # | 宝塔Linux面板 # +------------------------------------------------------------------- # | Copyright (c) 2024-2099 宝塔软件(http://bt.cn) All rights reserved. # +------------------------------------------------------------------- # | Author: lkq <[email protected]> # +------------------------------------------------------------------- # | Wordpress 安全扫描 # +-------------------------------------------------------------------- import json, os, time import requests,re,zipfile proxies = {} import public import sys,os if "/www/server/panel/class_v2/wp_toolkit/" not in sys.path: sys.path.insert(1, "/www/server/panel/class_v2/wp_toolkit/") #进入到 import totle_db class wordpress_scan: wordpress_diff_path = "/www/wordpress_diff_path" #默认插件的头部信息 plugin_default_headers = { "Name": "Plugin Name", "PluginURI": "Plugin URI", "Version": "Version", "Description": "Description", "Author": "Author", "AuthorURI": "Author URI", "TextDomain": "Text Domain", "DomainPath": "Domain Path", "Network": "Network", "RequiresWP": "Requires at least", "RequiresPHP": "Requires PHP", "UpdateURI": "Update URI", "RequiresPlugins": "Requires Plugins", "_sitewide": "Site Wide Only" } #默认主题的头部信息 theme_default_headers = { "Name": "Theme Name", "Title": "Theme Name", "Version": "Version", "Author": "Author", "AuthorURI": "Author URI", "UpdateURI": "Update URI", "Template": "Theme Name", "Stylesheet": "Theme Name", } def check_dir(self): ''' @name 检查需要的目录是否存在 @auther lkq @time 2024-10-08 @msg 检查需要的目录是否存在 ''' if not os.path.exists(self.wordpress_diff_path): os.makedirs(self.wordpress_diff_path) if not os.path.exists(self.wordpress_diff_path + "/plugin/"): os.makedirs(self.wordpress_diff_path + "/plugin/") def M(self, table, db="wordpress_plugin"): ''' @name 获取数据库对象 @param table 表名 @param db 数据库名 ''' with totle_db.Sql(db) as sql: return sql.table(table) def get_wordpress_version(self, path): ''' @name 获取WordPress版本 @param path WordPress路径 @return dict ''' wp_version_file = path + "/wp-includes/version.php" version = {"version": "", "locale": ""} if not os.path.exists(wp_version_file): return version with open(wp_version_file, 'r', encoding='utf-8') as file: file_data = file.read() match = re.search(r"\$wp_version\s*=\s*\'(.*)\';", file_data) if match: version["version"] = match.group(1) # $wp_local_package match = re.search(r"\$wp_local_package\s*=\s*\'(.*)\';", file_data) if match: version["locale"] = match.group(1) return version def get_plugin_data(self, plugin_file, default_headers, context=''): ''' @参考:/wp-admin/includes/plugin.php get_plugin_data 代码 @name 获取插件信息 @param plugin_file 插件文件 @return dict @auther lkq @time 2024-10-08 ''' # 读取文件内容 if not os.path.exists(plugin_file): return {} # 定义8KB大小 max_length = 8 * 1024 # 8 KB try: # 读取文件的前8KB with open(plugin_file, 'r', encoding='utf-8') as file: file_data = file.read(max_length) except Exception as e: return {} # 替换CR为LF file_data = file_data.replace('\r', '\n') # 处理额外的headers extra_headers = {} if context: extra_context_headers = [] # 假设有一个函数可以获取额外的headers # extra_context_headers = get_extra_headers(context) extra_headers = dict.fromkeys(extra_context_headers, '') # 假设额外的headers all_headers = {**extra_headers, **default_headers} # 检索所有headers for field, regex in all_headers.items(): if field.startswith('_'): # 跳过以_开头的内部字段 continue match = re.search(f'{regex}:(.*)$', file_data, re.IGNORECASE | re.MULTILINE) if match: all_headers[field] = match.group(1).strip() else: all_headers[field] = '' if all_headers.get("Network") and not all_headers['Network'] and all_headers['_sitewide']: all_headers['Network'] = all_headers['_sitewide'] if all_headers.get("Network"): all_headers['Network'] = 'true' == all_headers['Network'].lower() if all_headers.get("_sitewide"): del all_headers['_sitewide'] if all_headers.get("TextDomain") and not all_headers['TextDomain']: plugin_slug = os.path.dirname(os.path.basename(plugin_file)) if '.' != plugin_slug and '/' not in plugin_slug: all_headers['TextDomain'] = plugin_slug all_headers['Title'] = all_headers['Name'] all_headers['AuthorName'] = all_headers['Author'] # 返回插件的信息 return all_headers def Md5(self,strings): """ @name 生成MD5 @author hwliang<[email protected]> @param strings 要被处理的字符串 @return string(32) """ if type(strings) != bytes: strings = strings.encode() import hashlib m = hashlib.md5() m.update(strings) return m.hexdigest() def FileMd5(self,filename): """ @name 生成文件的MD5 @author hwliang<[email protected]> @param filename 文件名 @return string(32) or False """ if not os.path.isfile(filename): return False import hashlib my_hash = hashlib.md5() f = open(filename, 'rb') while True: b = f.read(8096) if not b: break my_hash.update(b) f.close() return my_hash.hexdigest() def get_plugin(self, path,one=''): ''' @name 获取WordPress插件信息 @param path 插件路径 @return dict @auther lkq @time 2024-10-08 ''' plugin_path = path + "/wp-content/plugins" if not os.path.exists(plugin_path): return {} tmp_list = [] for file in os.listdir(plugin_path): if one: if file!=one:continue plugin_file = os.path.join(plugin_path, file) # if os.path.isfile(plugin_file) and plugin_file.endswith(".php"): # tmp_list.append(file) if os.path.isdir(plugin_file): # 读取文件夹中的第一层文件 for file2 in os.listdir(plugin_file): plugin_file2 = os.path.join(plugin_file, file2) if os.path.isfile(plugin_file2) and plugin_file2.endswith(".php"): tmp_list.append( file + "/" + file2) if len(tmp_list) == 0: return {} result = {} for i in tmp_list: plugin_file = plugin_path + "/" + i # 判断文件是否可读 if not os.access(plugin_file, os.R_OK): continue plugin_data = self.get_plugin_data(plugin_file, self.plugin_default_headers) if not plugin_data: continue if plugin_data["Name"] == "": continue #如果 name 中没/ 的话 if "/" not in i: #则判断一下 if 'wordpress.org/plugins/' in plugin_data["PluginURI"]: plugin_data["PluginURI"] = plugin_data["PluginURI"].replace('http://wordpress.org/plugins/', '').replace("http://wordpress.org/plugins/","") #去掉最后的/ if plugin_data["PluginURI"][-1]=="/": plugin_data["PluginURI"]=plugin_data["PluginURI"][:-1] i=plugin_data["PluginURI"] else: continue result[i] = plugin_data return result def get_themes(self, path): ''' @name 获取WordPress主题信息 @param path 主题路径 @return dict @auther lkq @time 2024-10-08 ''' themes_path = path + "/wp-content/themes" # 循环目录 if not os.path.exists(themes_path): return {} tmp_list = [] for file in os.listdir(themes_path): plugin_file = os.path.join(themes_path, file) if os.path.isdir(plugin_file): if os.path.exists(plugin_file + "/style.css"): tmp_list.append(file) if len(tmp_list) == 0: return {} result = {} for i in tmp_list: plugin_file = themes_path + "/" + i + "/style.css" # 判断文件是否可读 if not os.access(plugin_file, os.R_OK): continue plugin_data = self.get_plugin_data(plugin_file, self.theme_default_headers) if not plugin_data: continue if plugin_data["Name"] == "": continue result[i] = plugin_data return result def get_plugins_update(self, path): ''' @name 获取插件更新 @param path WordPress路径 @auther lkq @time 2024-10-10 @msg 获取插件更新 ''' plugin_info = self.get_plugin(path) active = [] url = 'http://api.wordpress.org/plugins/update-check/1.1/' for i in plugin_info: active.append(i) plugins = {"plugins": json.dumps({"plugins": plugin_info, "active": active}), "locale": "%5B%22zh_CN%22%5D", "all": "true", "translations": ""} headers = { 'User-Agent': "WordPress/6.6.2; http://wp471.com", 'Content-Type': 'application/x-www-form-urlencoded', } response = requests.post(url, headers=headers, data=plugins, proxies=proxies) print(response.text) def get_themes_update(self, path): ''' @name 获取主题更新 @param path WordPress路径 @auther lkq @time 2024-10-10 @msg 获取主题更新 ''' plugin_info = self.get_themes(path) active = [] url = 'http://api.wordpress.org/themes/update-check/1.1/' for i in plugin_info: active.append(i) plugins = {"themes": json.dumps({"themes": plugin_info, "active": active}), "locale": "", "all": "true", "translations": ""} headers = { 'User-Agent': "WordPress/6.6.2; http://wp471.com", 'Content-Type': 'application/x-www-form-urlencoded', } response = requests.post(url, headers=headers, data=plugins, proxies=proxies) print(response.text) def get_wordpress_update(self, path): ''' @name 获取WordPress更新 @param path WordPress路径 @auther lkq @time 2024-10-10 @msg 获取WordPress更新 ''' version = self.get_wordpress_version(path) if version["version"] == "": return url = 'http://api.wordpress.org/core/version-check/1.7/?version=' + version["version"] + '&locale=' + version[ "locale"] headers = { 'User-Agent': "WordPress/6.6.2; http://wp471.com", } response = requests.get(url, headers=headers, proxies=proxies) print(response.text) def compare_versions(self,version1, version2): ''' @name 对比版本号 @param version1 版本1 @param version2 版本2 @return int 0 相等 1 大于 -1 小于 ''' # 分割版本号为整数列表 v1 = [int(num) if num.strip() != '' else 0 for num in version1.split('.')] v2 = [int(num) if num.strip() != '' else 0 for num in version2.split('.')] # 逐个比较版本号的每个部分 for num1, num2 in zip(v1, v2): if num1 > num2: return 1 # version1 > version2 elif num1 < num2: return -1 # version1 < version2 # 如果所有部分都相同,比较长度(处理像'1.0'和'1.0.0'这样的情况) if len(v1) > len(v2): return 1 if any(num > 0 for num in v1[len(v2):]) else 0 elif len(v1) < len(v2): return -1 if any(num > 0 for num in v2[len(v1):]) else 0 # 如果完全相同 return 0 def let_identify(self,version,vlun_infos): ''' @name 对比版本号判断是否存在漏洞 @param version 当前版本 @param vlun_infos 漏洞信息 @return list ''' for i in vlun_infos: i["vlun_status"] = False #如果是小于等于的话 if i["let"]=="<=": if self.compare_versions(version,i["vlun_version"])<=0: i["vlun_status"]=True #小于 if i["let"]=="<": if self.compare_versions(version,i["vlun_version"])<0: i["vlun_status"]=True if i['let']=='-': #从某个版本开始、到某个版本结束 version_list=i["vlun_version"].split("-") if len(version_list)!=2:continue if self.compare_versions(version,version_list[0])>=0 and self.compare_versions(version,version_list[1])<=0: i["vlun_status"]=True return vlun_infos def scan(self,path): ''' @name 扫描WordPress @param path WordPress路径 @return dict @auther lkq @time 2024-10-10 @msg 通过扫描WordPress的版本、插件、主题来判断是否存在漏洞 ''' vlun_list = [] #判断文件是否存在 import os if not os.path.exists(path): return vlun_list result = {} result["version"] = self.get_wordpress_version(path) result["plugins"] = self.get_plugin(path) result["themes"] = self.get_themes(path) #扫描插件是否存在漏洞 for i in result["plugins"]: plguin=i.split("/")[0] Name=result["plugins"][i]["Name"] #检查插件是否下架了 if self.M("plugin_error","plugin_error").where("slug=?",(plguin,)).count()>0: error_status = self.M("plugin_error","plugin_error").where("slug=?",(plguin,)).field("error,name,slug,description,closed_date,reason,status").find() if type(error_status)!=dict:continue if error_status["status"]==0: vlun = {"name": "", "vlun_info": "", "css": "", "type": "plugin_closed", "load_version": "","cve": "","time":"","status":0} #时间格式转为时间戳 if len(error_status["closed_date"])<10: error_status["closed_date"]=int(time.time()) else: try: error_status["closed_date"]=int(time.mktime(time.strptime(error_status["closed_date"], "%Y-%m-%d %H:%M:%S"))) except: error_status["closed_date"]=int(time.time()) vlun["slug"]=plguin vlun["name"]=Name vlun["vlun_info"]=error_status["description"] vlun["css"]="10" vlun["load_version"]=result["plugins"][i]["Version"] vlun["time"]=error_status["closed_date"] vlun_list.append(vlun) # continue #检查插件这个插件是否好久没有更新了 if self.M("wordpress_not_update","wordpress_not_update").where("slug=?",(plguin,)).count()>0: error_status = self.M("wordpress_not_update", "wordpress_not_update").where("slug=?", (plguin,)).field( "last_time,status").find() vlun = {"name": "", "vlun_info": "", "css": "", "type": "plugin_not_update", "load_version": "", "cve": "", "time": "", "status": 0} if type(error_status)!=dict:continue #当前时间减去最后更新时间 计算出年数 if len(str(error_status["last_time"])) > 6: #当前时间减去最后更新时间 计算出年数 year = int((time.time() - error_status["last_time"]) / 60 / 60 / 24 / 365) if year >= 10: vlun["css"] = 10 elif year >= 5: vlun["css"] = 8 elif year >= 3: vlun["css"] = 6 vlun["slug"] = plguin vlun["name"] = Name vlun["vlun_info"] = "The plugin has not been updated for more than {} years".format(year) vlun["load_version"] = result["plugins"][i]["Version"] vlun["time"] = int(time.time()) vlun_list.append(vlun) if result["plugins"][i]["Version"]=="":continue #检查插件是否存在漏洞 if self.M("wordpress_vulnerabilities","wordpress_vulnerabilities").where("plugin=?",(plguin,)).count()>0: vlun_infos=self.M("wordpress_vulnerabilities","wordpress_vulnerabilities").where("plugin=? and types='plugin'",(plguin)).select() vlun_infos=self.let_identify(result["plugins"][i]["Version"],vlun_infos) for j2 in vlun_infos: if j2["vlun_status"]: vlun = {"name": "", "vlun_info": "", "css": "", "type": "plugin", "load_version": "","cve": "","time":""} vlun["load_version"]=result["plugins"][i]["Version"] vlun["cve"]=j2["cve"] vlun["slug"]=plguin vlun["name"] = Name vlun["vlun_info"]=j2["msg"] vlun["css"]=j2["css"] vlun["time"] = j2["data_time"] vlun_list.append(vlun) #扫描主题是否存在漏洞 for i in result["themes"]: plguin = i.split("/")[0] if result["themes"][i]["Version"] == "": continue Name = result["themes"][i]["Name"] # 检查插件是否存在漏洞 if self.M("wordpress_vulnerabilities","wordpress_vulnerabilities").where("plugin=? and types='theme'", (plguin,)).count() > 0: vlun_infos = self.M("wordpress_vulnerabilities","wordpress_vulnerabilities").where("plugin=? and types='theme'", (plguin)).select() vlun_infos = self.let_identify(result["themes"][i]["Version"], vlun_infos) for j2 in vlun_infos: if j2["vlun_status"]: vlun = {"name": "", "vlun_info": "", "css": "", "type": "theme", "load_version": "","cve": "","time":""} vlun["load_version"] = result["themes"][i]["Version"] vlun["cve"] = j2["cve"] vlun["slug"] = plguin vlun["name"] = Name vlun["vlun_info"] = j2["msg"] vlun["css"] = j2["css"] vlun["time"] = j2["data_time"] vlun_list.append(vlun) #检查WordPress是否存在漏洞 #使用date_time 排序取最新的15个版本 if len(result["version"]["version"])>=1 and self.M("wordpress_vulnerabilities", "wordpress_vulnerabilities").where("types=?",("core")).count() > 0: #取10个版本 vlun_infos = self.M("wordpress_vulnerabilities","wordpress_vulnerabilities").where("types=?",("core")).order("data_time desc").limit("15").select() vlun_infos = self.let_identify(result["version"]["version"], vlun_infos) for j2 in vlun_infos: if j2["vlun_status"]: vlun = {"name": "", "vlun_info": "", "css": "", "type": "core", "load_version": "","cve": "","time":""} vlun["load_version"] = result["version"]["version"] vlun["cve"] = j2["cve"] vlun["slug"] = "WordPress" vlun["name"] = "WordPress" vlun["vlun_info"] = j2["msg"] vlun["css"] = j2["css"] vlun["time"]=j2["data_time"] vlun_list.append(vlun) #忽略 ignore_path = "/www/server/panel/data/wordpress_ignore_vuln.json" if os.path.exists(ignore_path): try: ignore_infos = json.loads(public.readFile(ignore_path)) except: ignore_infos = {} else: ignore_infos = {} if path in ignore_infos: for i in ignore_infos[path]: for i2 in vlun_list: if i["type"]=="plugin_closed": if i["slug"]==i2["slug"]: vlun_list.remove(i2) if i["type"]=="plugin_not_update": if i["slug"]==i2["slug"]: vlun_list.remove(i2) if i["slug"]==i2["slug"] and i["name"]==i2["name"] and i["vlun_info"]==i2["vlun_info"] and i["css"]==i2["css"] and i["type"]==i2["type"] and i["cve"]==i2["cve"] and i["time"]==i2["time"]: vlun_list.remove(i2) #更新到文件中 wordpress_scan_path = "/www/server/panel/data/wordpress_wp_scan.json" status= {"last_time": int(time.time()), "vulnerabilities": len(vlun_list), "status": True} import os if os.path.exists(wordpress_scan_path): try: wordpress_scan_info = json.loads(public.readFile(wordpress_scan_path)) except: wordpress_scan_info = {} else: wordpress_scan_info = {} if path not in wordpress_scan_info: wordpress_scan_info[path]=status else: wordpress_scan_info[path]["last_time"]=int(time.time()) wordpress_scan_info[path]["vulnerabilities"]=len(vlun_list) public.WriteFile(wordpress_scan_path,json.dumps(wordpress_scan_info)) return vlun_list def ignore_vuln(self,get): ''' @name 增加忽略漏洞 @param slug 插件slug @param path 插件路径 @auther lkq @time 2024-10-10 @msg 增加忽略漏洞 ''' if 'path' not in get: return public.return_message(-1,0,public.lang("Parameter error")) if 'name' not in get: return public.return_message(-1,0,public.lang("Parameter error")) if 'vlun_info' not in get: return public.return_message(-1,0,public.lang("Parameter error")) if 'css' not in get: return public.return_message(-1,0,public.lang("Parameter error")) if 'type' not in get: return public.return_message(-1,0,public.lang("Parameter error")) if 'cve' not in get: return public.return_message(-1,0,public.lang("Parameter error")) if 'time' not in get: return public.return_message(-1, 0, public.lang("Parameter error")) if 'slug' not in get: return public.return_message(-1, 0, public.lang("Parameter error")) if 'ignore_type' not in get: return public.return_message(-1, 0, public.lang("Parameter error")) path = get['path'] name = get['name'] vlun_info = get['vlun_info'] css = float(get['css']) type = get['type'] cve = get['cve'] time = int(get['time']) slug = get['slug'] ignore_type=get['ignore_type'] #忽略的列表路径path ignore_path ="/www/server/panel/data/wordpress_ignore_vuln.json" if os.path.exists(ignore_path): try: ignore_infos=json.loads(public.readFile(ignore_path)) except: ignore_infos={} else: ignore_infos={} if ignore_type=="add": if path not in ignore_infos: ignore_infos[path]=[] ignore={"name":name,"vlun_info":vlun_info,"css":css,"type":type,"cve":cve,"time":time,"slug":slug} if ignore not in ignore_infos[path]: ignore_infos[path].append(ignore) public.writeFile(ignore_path,json.dumps(ignore_infos)) return public.return_message(0,0,public.lang("Added successfully")) if ignore_type=="del": if path not in ignore_infos: return public.return_message(-1,0,public.lang("No data found")) ignore={"name":name,"vlun_info":vlun_info,"css":css,"type":type,"cve":cve,"time":time,"slug":slug} if ignore in ignore_infos[path]: ignore_infos[path].remove(ignore) public.writeFile(ignore_path,json.dumps(ignore_infos)) return public.return_message(0,0,public.lang("Deleted successfully")) def get_ignore_vuln(self,get): ''' @name 获取忽略的漏洞 @param path 插件路径 @auther lkq @time 2024-10-10 @msg 获取忽略的漏洞 ''' #如果不传递参数就返回所有 ignore_path = "/www/server/panel/data/wordpress_ignore_vuln.json" if os.path.exists(ignore_path): try: ignore_infos = json.loads(public.readFile(ignore_path)) except: ignore_infos = {} else: ignore_infos = {} if 'path' in get: path = get['path'] if path not in ignore_infos: return public.return_message(0,0,[]) else: return public.return_message(0,0,ignore_infos[path]) else: return public.return_message(0, 0, []) def download_file_with_progress(self,url, filename,slug,re=False): ''' @name 下载插件的文件 @param url 下载地址 @param filename 文件名 @param slug 插件slug @param re 是否重试 ''' header = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", } with requests.get(url, stream=True, headers=header, proxies=proxies, timeout=20) as response: #判断状态码 if response.status_code!=200: # print("Download failed, status code:",response.status_code," URL:",url) if not re: self.download_file_with_progress("https://downloads.wordpress.org/plugin/"+slug+".zip", filename,slug,re=True) return total_length = response.headers.get('content-length') if total_length is None: # 无法获取文件大小 response.raise_for_status() with open(filename, 'wb') as file: for chunk in response.iter_content(chunk_size=65536): if chunk: # 过滤掉保活新块 file.write(chunk) file.flush() else: total_length = int(total_length) if total_length > 1024 * 1024 * 20: print("\nSkip files larger than 20M ", slug) return downloaded = 0 with open(filename, 'wb') as file: for chunk in response.iter_content(chunk_size=65536): file.write(chunk) file.flush() downloaded += len(chunk) # 计算下载进度 done = int(50 * downloaded / total_length) if total_length < 1024 * 1024: total_length_mb = total_length / 1024 # 已下载大小转为KB downloaded_mb = downloaded / 1024 infos = "{:.2f}KB/{:.2f}KB slug:{}".format(downloaded_mb, total_length_mb,slug) else: total_length_mb = total_length / (1024 * 1024) downloaded_mb = downloaded / (1024 * 1024) infos = "{:.2f}MB/{:.2f}MB slug:{}".format(downloaded_mb, total_length_mb,slug) print("\r[{}{}] {:.2f}% file_size:{} ".format('█' * done, '.' * (50 - done),100 * downloaded / total_length,infos,), end='') # 判断文件是否下载完整 if total_length != os.path.getsize(filename): os.remove(filename) print("\nThe downloaded file is incomplete and has been deleted. The file is currently being re downloaded", filename) self.download_file_with_progress(url, filename,slug) def zip_file_plugin_data(self,file_data, default_headers, context=''): ''' @参考:/wp-admin/includes/plugin.php get_plugin_data 代码 @name 通过ZIP文件获取插件信息 @param plugin_file 插件文件 @return dict @auther lkq @time 2024-10-08 ''' file_data = file_data.replace('\r', '\n') # 处理额外的headers extra_headers = {} if context: extra_context_headers = [] extra_headers = dict.fromkeys(extra_context_headers, '') # 假设额外的headers all_headers = {**extra_headers, **default_headers} # 检索所有headers for field, regex in all_headers.items(): if field.startswith('_'): # 跳过以_开头的内部字段 continue match = re.search(f'{regex}:(.*)$', file_data, re.IGNORECASE | re.MULTILINE) if match: all_headers[field] = match.group(1).strip() else: all_headers[field] = '' if all_headers.get("Network") and not all_headers['Network'] and all_headers['_sitewide']: all_headers['Network'] = all_headers['_sitewide'] if all_headers.get("Network"): all_headers['Network'] = 'true' == all_headers['Network'].lower() if all_headers.get("_sitewide"): del all_headers['_sitewide'] all_headers['Title'] = all_headers['Name'] all_headers['AuthorName'] = all_headers['Author'] # 返回插件的信息 return all_headers def check_plugin(self,path,plugin_info): ''' @name 检查所有的插件是否被修改过、或者新增了文件 @param path WordPress路径 @param plugin_info 插件信息 @return dict @auther lkq @time 2024-10-10 ''' self.check_dir() for i in plugin_info: slug=i.split("/")[0] version=plugin_info[i]["Version"] if version == "": continue plugin_file = self.wordpress_diff_path + "/plugin/" + slug+"."+version+".zip" if not os.path.exists(plugin_file): self.download_file_with_progress("https://downloads.wordpress.org/plugin/"+slug+"."+version+".zip", plugin_file,slug) else: try: zipfile.ZipFile(plugin_file) print("压缩包文件已经存在、且可正常读取文件、正在跳过", plugin_file) except: print("The zip file cannot be opened, it is being deleted and re downloaded",slug+"."+version+".zip") os.remove(plugin_file) self.download_file_with_progress("https://downloads.wordpress.org/plugin/"+slug+"."+version+".zip", plugin_file,slug) plugin_file_list={} #获取所有插件的文件 for i in plugin_info: slug = i.split("/")[0] version=plugin_info[i]["Version"] if version == "": continue plugin_file = self.wordpress_diff_path + "/plugin/" + slug + "." + version + ".zip" if not os.path.exists(plugin_file): continue plugin_file_list[slug] = {} plugin_path=path+"/wp-content/plugins/"+slug #判断文件是否存在 if not os.path.exists(plugin_path): continue #遍历目录下所有的PHP文件 for root, dirs, files in os.walk(plugin_path): for file in files: if file.endswith('.php'): file_path = os.path.join(root, file) plugin_file_list[slug][slug+"/"+file_path.replace(plugin_path + "/", "")] = self.FileMd5(file_path) #读取压缩包中的文件版本是否和本地的版本一致 with zipfile.ZipFile(plugin_file, 'r') as zip_file: #查找i 的文件 if i in zip_file.namelist(): #读取文件内容 with zip_file.open(i) as file: file_data = file.read() # #获取插件信息 plugin_data = self.zip_file_plugin_data(file_data.decode("utf-8"), self.plugin_default_headers) if not plugin_data: continue if plugin_data["Name"] == "": continue #判断版本是否一致 if plugin_data["Version"]!=version: print("版本不一致、跳过") continue #对比MD5 for file_name in plugin_file_list[slug]: if file_name in zip_file.namelist(): with zip_file.open(file_name) as file: file_data = file.read() if self.Md5(file_data)!=plugin_file_list[slug][file_name]: print("文件已经被修改、MD5于云端文件不一致",file_name) else: print("文件异常、原版压缩包中不存在该文件",file_name) def check_all_plugin(self,path): ''' @name 检查所有的插件是否被修改过、或者新增了文件 :param path: :return: ''' return self.check_plugin(path,self.get_plugin(path)) def one_check_plugin(self,path,slug): ''' @name 检查单个插件是否被修改过、或者新增了文件 @param path WordPress路径 @return dict @auther lkq @time 2024-10-10 ''' plugin_file_path = path + "/wp-content/plugins/" + slug if not os.path.exists(plugin_file_path): return [] #获取所在插件的信息 plugin_info = self.get_plugin(path,slug) if len(plugin_info)==0: return [] self.check_plugin(path, plugin_info) def get_vlu_time(self): ''' @name 获取漏洞库更新时间 @param get: :param get: :return: ''' date_time=self.M("wordpress_vulnerabilities", "wordpress_vulnerabilities").order("data_time desc").limit("1").field("data_time").find() #转为2024-10-10 if date_time: date_time=time.strftime("%Y-%m-%d", time.localtime(date_time["data_time"])) return date_time else: #今天的日期 return time.strftime("%Y-%m-%d", time.localtime(time.time())) def auto_scan(self): ''' @name 自动扫描 每天扫描一次 每个网站延迟1S ''' site_infos=public.M("sites").where("project_type=?",("WP2")).select() #如果没有站点的话 if len(site_infos)==0: return #自动扫描的配置文件 wordpress_scan_path="/www/server/panel/data/wordpress_wp_scan.json" if not os.path.exists(wordpress_scan_path): wordpress_wp_scan={} else: try: wordpress_wp_scan=json.loads(public.ReadFile(wordpress_scan_path)) except: wordpress_wp_scan={} for i in site_infos: if i["path"] not in wordpress_wp_scan: wordpress_wp_scan[i["path"]]={"last_time":0,"vulnerabilities":0,"status":True} if not i["status"]: continue #获取上次扫描的时间 last_time=wordpress_wp_scan[i["path"]]["last_time"] #判断有没有超过一天 if time.time()-last_time<43200: continue #获取站点的路径 path=i["path"] time.sleep(1) #扫描站点 try: vlun_list=self.scan(path) except: continue wordpress_wp_scan[i["path"]]["last_time"]=int(time.time()) wordpress_wp_scan[i["path"]]["vulnerabilities"]=len(vlun_list) public.WriteFile(wordpress_scan_path,json.dumps(wordpress_wp_scan)) def set_auth_scan(self,path): ''' @name 停止扫描 @param path WordPress路径 @auther lkq @time 2024-10-10 @msg 停止扫描 ''' wordpress_scan_path = "/www/server/panel/data/wordpress_wp_scan.json" flag=False if os.path.exists(wordpress_scan_path): try: wordpress_scan_info = json.loads(public.readFile(wordpress_scan_path)) except: wordpress_scan_info = {} else: wordpress_scan_info = {} if path in wordpress_scan_info: if wordpress_scan_info[path]["status"]: wordpress_scan_info[path]["status"]=False else: wordpress_scan_info[path]["status"]=True flag=True else: wordpress_scan_info[path]={"last_time":0,"vulnerabilities":0,"status":False} public.WriteFile(wordpress_scan_path,json.dumps(wordpress_scan_info)) if flag: return public.return_message(0,0,public.lang("Started successfully")) return public.return_message(0,0,public.lang("Stopped successfully")) def get_auth_scan_status(self,path): ''' @name 获取扫描状态 @param path WordPress路径 @auther lkq @time 2024-10-10 @msg 获取扫描状态 ''' wordpress_scan_path = "/www/server/panel/data/wordpress_wp_scan.json" if os.path.exists(wordpress_scan_path): try: wordpress_scan_info = json.loads(public.readFile(wordpress_scan_path)) except: wordpress_scan_info = {} else: wordpress_scan_info = {} if path in wordpress_scan_info: return public.return_message(0,0,wordpress_scan_info[path]["status"]) return public.return_message(0,0,True)