Server IP : 172.67.216.182 / Your IP : 162.158.108.97 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/ |
Upload File : |
# coding: utf-8 # ------------------------------------------------------------------- # aaPanel # ------------------------------------------------------------------- # Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved. # ------------------------------------------------------------------- # Author: hwliang <[email protected]> # ------------------------------------------------------------------- # ------------------------------ # 数据备份模块 # ------------------------------ import os import sys import json import re import time from typing import Tuple, Union sys.stdout.reconfigure(encoding="utf-8") os.chdir('/www/server/panel') if not 'class/' in sys.path: sys.path.insert(0, 'class/') if not 'class_v2/' in sys.path: sys.path.insert(0, 'class_v2/') import public # from files import files _VERSION = 1.5 class backup: _BACKUP_DIR = public.M('config').where("id=?", (1,)).getField('backup_path') if not _BACKUP_DIR: _BACKUP_DIR = '/www/backup/' _exclude = "" _err_log = '/tmp/backup_err.log' _inode_min = 10 _db_mysql = None _cloud = None _is_save_local = os.path.exists('data/is_save_local_backup.pl') _error_msg = "" _backup_all = False # 数据库备份 _DB_BACKUP_DIR: str = os.path.join(_BACKUP_DIR, "database") # mysql 备份 _MYSQL_BACKUP_DIR = os.path.join(_DB_BACKUP_DIR, "mysql", "crontab_backup") _MYSQLDUMP_BIN = public.get_mysqldump_bin() # mongodb 备份 _MONGODB_BACKUP_DIR = os.path.join(_DB_BACKUP_DIR, "mongodb", "crontab_backup") _MONGODBDUMP_BIN = os.path.join(public.get_setup_path(), "mongodb/bin/mongodump") _MONGOEXPORT_BIN = os.path.join(public.get_setup_path(), "mongodb/bin/mongoexport") # redis 备份 _REDIS_BACKUP_DIR = os.path.join(_DB_BACKUP_DIR, "redis", "crontab_backup") # pgsql 备份 _PGSQL_BACKUP_DIR = os.path.join(_DB_BACKUP_DIR, "pgsql", "crontab_backup") _PGDUMP_BIN = os.path.join(public.get_setup_path(), "pgsql/bin/pg_dump") _SPLIT_SIZE = 5 * 1024 * 1024 * 1024 # 拆分文件大小 _CLOUD_OBJ = { "qiniu": "Qiniu Cloud Storage", "alioss": "Alibaba Cloud OSS", "ftp": "FTP Storage", "bos": "Baidu Cloud BOS", "obs": "Huawei Cloud OBS", "aws_s3": "AWS S3", "gdrive": "Google Drive", "msonedrive": "Microsoft OneDrive", "gcloud_storage": "Google Cloud Storage", "upyun": "Upyun Storage", "jdcloud": "JD Cloud Storage", "txcos": "Tencent Cloud COS", 'tianyiyun': "Tianyi Cloud ZOS", 'webdav':"WebDav", 'minio':"MinIO Storage", 'dogecloud':"Duoji Cloud COS", 'localhost':'Local Disk' } def __init__(self, cloud_object=None, cron_info={}): ''' @name 数据备份对象 @param cloud_object 远程上传对象,需具备以下几个属性和方法: _title = '中文名称,如:阿里云OSS' _name = '英文名称,如:alioss' upload_file(filename,data_type = None) 文件名 , 数据类型 site/database/path delete_file(filename,data_type = None) 文件名 , 数据类型 site/database/path 给_error_msg赋值,传递错误消息: _error_msg = "错误消息" ''' if not os.path.exists(self._BACKUP_DIR): os.makedirs(self._BACKUP_DIR) if not os.path.exists(self._MYSQL_BACKUP_DIR): os.makedirs(self._MYSQL_BACKUP_DIR) if not os.path.exists(self._MONGODB_BACKUP_DIR): os.makedirs(self._MONGODB_BACKUP_DIR) if not os.path.exists(self._REDIS_BACKUP_DIR): os.makedirs(self._REDIS_BACKUP_DIR) if not os.path.exists(self._PGSQL_BACKUP_DIR): os.makedirs(self._PGSQL_BACKUP_DIR) self._cloud_new = None self._cloud = cloud_object self.cron_info = None self.echo_id = cron_info.get('echo') if cron_info and 'echo' in cron_info.keys(): self.cron_info = self.get_cron_info(cron_info["echo"]) if not os.path.exists(self._BACKUP_DIR): os.makedirs(self._BACKUP_DIR) if not public.M('sqlite_master').db('backup').where('type=? AND name=? AND sql LIKE ?', ('table', 'backup', '%cron_id%')).count(): public.M('backup').execute("ALTER TABLE 'backup' ADD 'cron_id' INTEGER DEFAULT 0", ()) self.check_databases() def echo_start(self): print("=" * 90) print(public.lang("★start backup[{}]",str(public.format_date()))) print("=" * 90) def echo_end(self): print("=" * 90) print(public.lang("☆backup is complete[{}]",str(public.format_date()))) print("=" * 90) print("\n") def echo_info(self, msg): print("|-{}".format(public.lang(msg))) def echo_error(self, msg): print("=" * 90) print(public.lang("|-Error:{}",str(msg))) if self._error_msg: self._error_msg += "\n" self._error_msg += msg # 取排除列表用于计算排除目录大小 def get_exclude_list(self, exclude=[]): if not exclude: tmp_exclude = os.getenv('BT_EXCLUDE') if tmp_exclude: exclude = tmp_exclude.split(',') if not exclude: return [] return exclude # 构造排除 def get_exclude(self, exclude=[]): self._exclude = "" if not exclude: tmp_exclude = os.getenv('BT_EXCLUDE') if tmp_exclude: exclude = tmp_exclude.split(',') if not exclude: return "" for ex in exclude: if ex[-1] == '/': ex = ex[:-1] self._exclude += " --exclude='" + ex + "'" self._exclude += " " return self._exclude def GetDiskInfo2(self): # 取磁盘分区信息 temp = public.ExecShell("df -T -P|grep '/'|grep -v tmpfs|grep -v 'snap/core'|grep -v udev")[0] tempInodes = public.ExecShell("df -i -P|grep '/'|grep -v tmpfs|grep -v 'snap/core'|grep -v udev")[0] temp1 = temp.split('\n') tempInodes1 = tempInodes.split('\n') diskInfo = [] n = 0 cuts = [] for tmp in temp1: n += 1 try: inodes = tempInodes1[n - 1].split() disk = re.findall(r"^(.+)\s+([\w\.]+)\s+([\w\.]+)\s+([\w\.]+)\s+([\w\.]+)\s+([\d%]{2,4})\s+(/.{0,50})$", tmp.strip()) if disk: disk = disk[0] if len(disk) < 6: continue if disk[2].find('M') != -1: continue if disk[2].find('K') != -1: continue if len(disk[6].split('/')) > 10: continue if disk[6] in cuts: continue if disk[6].find('docker') != -1: continue if disk[1].strip() in ['tmpfs']: continue arr = {} arr['filesystem'] = disk[0].strip() arr['type'] = disk[1].strip() arr['path'] = disk[6] tmp1 = [disk[2], disk[3], disk[4], disk[5]] arr['size'] = tmp1 if int(inodes[1]) == 0 and int(inodes[2]) == 0: arr['inodes'] = [inodes[1], 10000, 10000, 0] else: arr['inodes'] = [inodes[1], inodes[2], inodes[3], inodes[4]] diskInfo.append(arr) except: continue return diskInfo # 取磁盘可用空间 def get_disk_free(self, dfile): diskInfo = self.GetDiskInfo2() if not diskInfo: return '', 0, 0 _root = None for d in diskInfo: if d['path'] == '/': _root = d continue if re.match("^{}/.+".format(d['path']), dfile): return d['path'], float(d['size'][2]) * 1024, int(d['inodes'][2]) if _root: return _root['path'], float(_root['size'][2]) * 1024, int(_root['inodes'][2]) return '', 0, 0 # 备份指定目录 def backup_path(self, spath, dfile=None, exclude=[], save=3, echo_id=None): try: if echo_id is None: echo_id = self.echo_id self.cron_info = public.M('crontab').where('echo=?', (echo_id,)).field( 'backupto,name,save_local,notice,notice_channel,id,sType,split_type,split_value').select()[0] error_msg = "" self.echo_start() if not os.path.exists(spath): error_msg = 'The specified directory {} does not exist!'.format(spath) self.echo_error(error_msg) self.send_failture_notification(error_msg, target="{}|path".format(spath)) return False if spath[-1] == '/': spath = spath[:-1] dirname = os.path.basename(spath) if not dfile: fname = 'path_{}_{}_{}.tar.gz'.format(dirname, public.format_date("%Y%m%d_%H%M%S"), public.GetRandomString(6)) dfile = os.path.join(self._BACKUP_DIR, 'path',dirname, fname) if not self.backup_path_to(spath, dfile, exclude): if self._error_msg: error_msg = self._error_msg self.send_failture_notification(error_msg, target="{}|path".format(spath)) return False cloud_name = self.cron_info['backupto'] if self._cloud is None: from cloud_stora_upload_v2 import CloudStoraUpload self._cloud_new = CloudStoraUpload() self._cloud = self._cloud_new.run(cloud_name) if not self._cloud: if cloud_name != 'localhost': error_msg = "Failed to link to cloud storage, please check if the configuration is correct!" self.echo_error(error_msg) self.send_failture_notification(error_msg, target=self.cron_info['name'], remark='Link to cloud storage failed') backup_size = os.path.getsize(dfile) if self._cloud: upload_path = os.path.join("path", dirname) self.echo_info("Uploading to {}, please wait...".format(self._cloud._title)) # 判断是否 大于 5 GB 进行切割上传 if backup_size >= self._SPLIT_SIZE and self.cron_info.get("split_type") and self.cron_info.get("split_value") and ( self.cron_info.get("split_type") != "size" or backup_size // 1024 // 1024 > self.cron_info.get("split_value")): is_status, err = self.split_upload_file(dfile, upload_path) if is_status is False: remark = "Backup to " + self._cloud._title self.send_failture_notification(err, target="{}|path".format(spath), remark=remark) return False, err fname = err # 云存储保存路径 else: # 正常上传 if self._cloud._title in ["多吉云COS"]: upload_file_path=os.path.join(self._cloud.backup_path , 'path',dirname, fname) else: upload_file_path="path" if self._cloud.upload_file(dfile, upload_file_path): self.echo_info("Successfully uploaded to {}".format(self._cloud._title)) else: if hasattr(self._cloud, "error_msg"): if self._cloud.error_msg: error_msg = self._cloud.error_msg if not error_msg: error_msg = "The scheduled task execution failed." self.echo_error(error_msg) if os.path.exists(dfile): os.remove(dfile) remark = "Backup to" + self._cloud._title self.send_failture_notification(error_msg, target="{}|path".format(spath), remark=remark) return False filename = dfile if self._cloud: filename = dfile + '|' + self._cloud._name + '|' + fname cron_id = 0 if echo_id: cron_id = public.M("crontab").where('echo=?', (echo_id,)).getField('id') pdata = { 'cron_id': cron_id, 'type': '2', 'name': spath, 'pid': 0, 'filename': filename, 'addtime': public.format_date(), 'size': backup_size } public.M('backup').insert(pdata) if self._cloud: _not_save_local = True save_local = 0 if self.cron_info: save_local = self.cron_info["save_local"] if save_local: _not_save_local = False else: if self._is_save_local: _not_save_local = False pdata = { 'cron_id': cron_id, 'type': '2', 'name': spath, 'pid': 0, 'filename': dfile, 'addtime': public.format_date(), 'size': backup_size } public.M('backup').insert(pdata) if _not_save_local: if os.path.exists(dfile): os.remove(dfile) self.echo_info("User settings do not retain local backups, deleted {}".format(dfile)) else: self.echo_info("Local backup has been kept") if not self._cloud: backups = public.M('backup').where( "cron_id=? and type=? and pid=? and name=? and filename NOT LIKE '%|%'", (cron_id, '2', 0, spath)).field('id,name,filename').select() else: backups = public.M('backup').where("cron_id=? and type=? and pid=? and name=? and filename LIKE ?", (cron_id, '2', 0, spath, '%{}%'.format(self._cloud._name))).field( 'id,name,filename').select() self.delete_old(backups, save, 'path') self.echo_end() self.save_backup_status(True, target="{}|path".format(spath)) return dfile except: self.send_failture_notification('Backup failed') # 清理过期备份文件 def delete_old(self, backups, save, data_type=None, site_name=None): if type(backups) == str: self.echo_info('Failed to clean expired backup, error:{} '.format(backups)) return self.echo_info('Keep the latest number of backups: {} copies'.format(save)) # 跳过手动备份文件 new_backups = [] for i in range(len(backups)): if data_type == 'database': # 数据库备份 new_backups.append(backups[i]) elif data_type == 'site' and backups[i]['name'][:4] == 'web_' and backups[i]['name'][-7:] == '.tar.gz': # 网站备份 new_backups.append(backups[i]) elif data_type == 'path' and backups[i]['name'][:5] == 'path_': # 目录备份 new_backups.append(backups[i]) if new_backups: backups = new_backups[:] num = len(backups) - int(save) if num > 0: self.echo_info('-' * 88) for backup in backups: # 处理目录备份到远程的情况 if backup['filename'].find('|') != -1: tmp = backup['filename'].split('|') backup['filename'] = tmp[0] backup['name'] = tmp[-1] # 尝试删除本地文件 if os.path.exists(backup['filename']): os.remove(backup['filename']) self.echo_info(public.lang('Expired backup files have been cleaned from disk: {}',backup['filename'])) # 尝试删除远程文件 if self._cloud: if backup['name'].endswith("_split"): # 是否为切割文件 self._cloud_new.cloud_delete_dir(backup['name']) elif data_type == "database": self._cloud_new.cloud_delete_file(backup.get("cloud_path")) elif data_type == "site": try: siteName = public.M('sites').where('id=?', (backup['pid'],)).getField('name') site_backup_dir= self._cloud_new.backup_path fname = backup['name'] dfile = os.path.join(site_backup_dir, 'site', siteName, fname) self._cloud_new.cloud_delete_file(dfile) except: # import traceback # print(traceback.format_exc()) pass else: self._cloud.delete_file(backup['name'], data_type) self.echo_info(public.lang('Expired backup files have been cleaned from {}: {}', self._cloud._title, backup['name'])) # 从数据库清理 public.M('backup').where('id=?', (backup['id'],)).delete() num -= 1 if num < 1: break # if data_type=='site': # backup_path = public.get_backup_path()+'/site'.replace('//','/') # site_lists = os.listdir(backup_path) # file_info =[] # del_list=[] # check_name = 'web_{}_'.format(site_name) # for site_v in site_lists: # tmp_dict = {} # if check_name=='web__':continue # if site_v.find(check_name)==-1:continue # filename =os.path.join(backup_path,site_v) # if os.path.isfile(filename): # tmp_dict['name']=filename # tmp_dict['time']=int(os.path.getmtime(filename)) # file_info.append(tmp_dict) # if file_info and len(file_info)>int(save): # file_info=sorted(file_info,key=lambda keys:keys['time']) # del_list=file_info[:-int(save)] # for del_file in del_list: # if not del_file:continue # if os.path.isfile(del_file['name']): # os.remove(del_file['name']) # self.echo_info(u"已从磁盘清理过期备份文件:" + del_file['name']) # 压缩目录 def backup_path_to( self, spath, dfile, exclude=[], siteName: str = None, for_wp2: bool = False, cp_time: str = None ): if not os.path.exists(spath): error_msg = public.lang('The specified directory {} does not exist!', spath) self.echo_error(error_msg) return False if spath[-1] == '/': spath = spath[:-1] dirname = os.path.basename(spath) dpath = os.path.dirname(dfile) if not os.path.exists(dpath): os.makedirs(dpath, 384) self.get_exclude(exclude) if self._exclude: self._exclude = self._exclude.replace(spath + '/', '') exclude_config = self._exclude exclude_list = self.get_exclude_list(exclude) p_size = public.get_path_size(spath, exclude=exclude_list) if not self._exclude: exclude_config = "Not set" if siteName: self.echo_info(public.lang('Backup site: {}', siteName)) self.echo_info(public.lang('Website document root: {}', spath)) else: self.echo_info(public.lang('Backup directory: {}', spath)) self.echo_info(public.lang('Directory size: {}',str(public.to_size(p_size)))) self.echo_info(public.lang('Exclusion setting: {}', exclude_config)) disk_path, disk_free, disk_inode = self.get_disk_free(dfile) self.echo_info(public.lang( 'Partition {} available disk space is: {}, available Inode is: {}',disk_path, str(public.to_size(disk_free)), str(disk_inode))) if disk_path: if disk_free < p_size: self.echo_error(public.lang( 'The available disk space of the target partition is less than {}, and the backup cannot be completed. Please increase the disk capacity or change the default backup directory on the settings page!', str(public.to_size(p_size)) )) return False if disk_inode < self._inode_min: self.echo_error(public.lang( 'The available Inode of the target partition is less than {}, and the backup cannot be completed. Please increase the disk capacity or change the default backup directory on the settings page!', str(self._inode_min) )) return False compression_time = "" if not for_wp2: stime = time.time() if os.path.exists(dfile): os.remove(dfile) public.ExecShell("cd " + os.path.dirname( spath) + " && tar zcvhf '" + dfile + "' " + self._exclude + " '" + dirname + "' 2>{err_log} 1> /dev/null".format( err_log=self._err_log)) compression_time = str('{:.2f}'.format(time.time() - stime)) tar_size = os.path.getsize(dfile) if tar_size < 1: self.echo_error(public.lang("Compression failed!")) self.echo_info(public.readFile(self._err_log)) return False compression_time = cp_time if cp_time else compression_time self.echo_info(public.lang('Compression completed, took {} seconds, compressed package size: {}',compression_time, str(public.to_size(tar_size)))) if siteName: self.echo_info(public.lang('Site backed up to: {}', dfile)) else: self.echo_info(public.lang('Directory has been backed up to: {}', dfile)) if os.path.exists(self._err_log): try: os.remove(self._err_log) except FileNotFoundError: pass return dfile def remove_last_directory_if_directory(self,spath): # 获取路径的最后一个部分 last_part = os.path.basename(spath) # 获取路径的上一级目录 parent_dir = os.path.dirname(spath) # 构建完整路径 full_path = os.path.join(parent_dir, last_part) # 检查路径的最后一部分是否是文件 if os.path.isfile(full_path): # 如果是文件,返回上一级目录 return parent_dir + "/" else: # 如果不是文件,返回原路径 return spath def __backup_site_init_cloud(self, cloud_name: str, siteName: str) -> None: if self._cloud is None: from cloud_stora_upload_v2 import CloudStoraUpload self._cloud_new = CloudStoraUpload() self._cloud = self._cloud_new.run(cloud_name) if not self._cloud: if not(cloud_name == 'Local Disk' or cloud_name == 'localhost'): # 没有 self._cloud 云存储对象, 且不是本地存储 self.echo_info("Failed to link to cloud storage, please check if the configuration is correct!") self.send_failture_notification( '"Failed to link to cloud storage, please check if the configuration is correct!"', target="{}|site".format(siteName), remark='') def __backup_site_cloud_upload(self, backup_size, dfile, upload_path, siteName, fname) -> bool: # 判断是否 大于 5 GB 进行切割上传 if backup_size >= self._SPLIT_SIZE and self.cron_info.get("split_type") and self.cron_info.get( "split_value") and ( self.cron_info.get("split_type") != "size" or backup_size // 1024 // 1024 > self.cron_info.get("split_value") ): is_status, err = self.split_upload_file(dfile, upload_path) if is_status is False: remark = "Backup to " + self._cloud._title self.send_failture_notification(err, target="{}|site".format(siteName), remark=remark) return False # fname = err # 云存储保存路径 else: # 正常上传 error_msg = '' if self._cloud._title in [ "AWS S3", "AWS S3对象存储", "天翼云ZOS", "MinIO存储", "多吉云COS" ]: ## 由于aws s3上传函数独特,单独处理 if self._cloud._title == "AWS S3": res = self._cloud.upload_file1(dfile, os.path.join(self._cloud.backup_path, 'site', siteName, fname)) else: res = self._cloud.upload_file(dfile, os.path.join('site', siteName)) else: # ftp, google drive... res = self._cloud.upload_file(dfile, 'site') if not res: if hasattr(self._cloud, "error_msg"): if self._cloud.error_msg: error_msg = self._cloud.error_msg if not error_msg: error_msg = public.lang("Backup task execution failed.") self.echo_error(error_msg) # if os.path.exists(dfile): # os.remove(dfile) remark = public.lang("Backup to " + self._cloud._title) self.send_failture_notification(error_msg, target="{}|site".format(siteName), remark=remark) return False self.echo_info("Successfully uploaded to {}".format(self._cloud._title)) return True def __backup_site_check_local_flag(self) -> bool: if self.cron_info['backupTo'] == "localhost" or self.cron_info['backupTo'] == "Local Disk": if_save_local = True else: if_save_local = self.cron_info["save_local"] if self.cron_info else 0 if if_save_local and not self._is_save_local: if not os.path.exists('data/is_save_local_backup.pl'): os.mknod('data/is_save_local_backup.pl') save_local = True if self._is_save_local and if_save_local else False return save_local def __backup_site_delete_old(self, target: str, backups: list, save: int, data_type=None, if_wp: bool = False): self.echo_info('-' * 88) self.echo_info(public.lang('Keep the latest number of backups: {} copies for {}'.format(save, target))) num = len(backups) - int(save) if num > 0: for backup in backups: try: # 从数据库清理 if not if_wp: public.M('backup').where('id=?', (backup['id'],)).delete() else: public.M('wordpress_backups').where('id=?', (backup['id'],)).delete() if os.path.exists(backup.get('filename', '')): os.remove(backup['filename']) self.echo_info(public.lang( 'Expired backup files have been cleaned from disk: {}', backup['filename']) ) # 尝试删除远程site文件 elif backup.get('filename', '').find('|') != -1 and self._cloud and data_type == 'site': try: tmp = backup['filename'].split('|') backup['filename'] = tmp[0] backup['name'] = tmp[-1] siteName = public.M('sites').where('id=?', (backup['pid'],)).getField('name') site_backup_dir = self._cloud_new.backup_path fname = backup['name'] dfile = os.path.join(site_backup_dir, 'site', siteName, fname) res = self._cloud_new.cloud_delete_file(dfile, 'site') if res: remote_name = backup.get('name', '') if not remote_name: remote_name = backup.get('filename', '').split('|')[-1] remote_path = os.path.join(self._cloud.backup_path, 'site', siteName, remote_name) self.echo_info( public.lang( 'Expired backup files have been cleaned from {}: {}', target, remote_path )) return res except Exception: continue else: continue except Exception: continue num -= 1 if num < 1: break # 备份指定站点 def backup_site(self, siteName, save: int = 3, exclude: list = None, echo_id=None) -> str | bool: panelPath = '/www/server/panel/' os.chdir(panelPath) sys.path.insert(0, panelPath) echo_id = self.echo_id if echo_id is None else echo_id cron_id = public.M("crontab").where('echo=?', (echo_id,)).getField('id') if echo_id else 0 self.echo_start() self.cron_info = public.M('crontab').where('echo=?', (echo_id,)).field( 'backupTo,name,save_local,notice,notice_channel,id,sType,split_type,split_value,db_backup_path' ).select()[0] if not self.cron_info: return False save_local = self.__backup_site_check_local_flag() find = public.M('sites').where('name=?', (siteName,)).select()[0] if not find: error_msg = 'The specified website [{}] does not exist!'.format(siteName) self.echo_error(error_msg) self.send_failture_notification(error_msg, target=siteName) return False # if self.cron_info['stop_site']=="1" : # if find['project_type']=="PHP": # if find['status']=="0": # print("|-网站{}已停用,跳过备份".format(siteName)) # return False # if find['project_type']=="Java": # from mod.project.java.projectMod import main as java # if not java().get_project_stat(find)['pid']: # print("|-网站{}已停用,跳过备份".format(siteName)) # return False # if find['project_type']=="Node": # from projectModel.nodejsModel import main as nodejs # if not nodejs().get_project_run_state(project_name=find['name']): # print("|-网站{}已停用,跳过备份".format(siteName)) # return False # if find['project_type']=="Go": # from projectModel.goModel import main as go # if not go().get_project_run_state(project_name=find['name']): # print("|-网站{}已停用,跳过备份".format(siteName)) # return False # if find['project_type']=="Python": # from projectModel.pythonModel import main as python # if not python().get_project_run_state(project_name=find['name']): # print("|-网站{}已停用,跳过备份".format(siteName)) # return False # if find['project_type']=="Other": # from projectModel.otherModel import main as other # if not other().get_project_run_state(project_name=find['name']): # print("|-网站{}已停用,跳过备份".format(siteName)) # return False # if self.cron_info['stop_site']=="1" and find['status']=="0": # print("{}网站已停用,跳过备份".format(siteName)) # return False site_backup_dir = self.get_site_backup_dir( self.cron_info['backupTo'], self.cron_info['db_backup_path'], siteName ) backup_dir = os.path.join(site_backup_dir, 'site') # save local file at first, insert db depend on 'save_local' bak_info = None if find['project_type'] != 'WP2': spath = find['path'] pid = find['id'] fname = 'web_{}_{}_{}.tar.gz'.format( siteName, public.format_date("%Y%m%d_%H%M%S"), public.GetRandomString(6) ) # save dfile = os.path.join(site_backup_dir, 'site', siteName, fname) if find['project_type'] == "Go": spath = self.remove_last_directory_if_directory(spath) if not self.backup_path_to( spath, dfile, exclude, siteName=siteName ): error_msg = self._error_msg if self._error_msg else "" self.send_failture_notification(error_msg, target=siteName) return False # insert if save_local: public.M('backup').insert({ 'cron_id': cron_id, 'type': 0, 'name': fname, 'pid': pid, 'filename': dfile, 'addtime': public.format_date(), 'size': os.path.getsize(dfile) }) else: # WP2 try: from wp_toolkit import wpbackup # save & insert start = time.time() bak_info = wpbackup(find['id']).backup_full_get_data(self.cron_info) cp_time = str('{:.2f}'.format(time.time() - start)) spath = find['path'] pid = bak_info.id fname = bak_info.bak_file.split('/')[-1] dfile = bak_info.bak_file if not self.backup_path_to( spath, dfile, exclude, siteName=siteName, for_wp2=True, cp_time=cp_time ): error_msg = self._error_msg if self._error_msg else "" self.send_failture_notification(error_msg, target=siteName) return False except Exception as e: self.echo_info(str(e)) self.echo_end() return False backup_size = os.path.getsize(dfile) if not self._backup_all: self.check_disk_space(backup_size, backup_dir) # init_cloud self.__backup_site_init_cloud(cloud_name=self.cron_info['backupTo'], siteName=siteName) # cloud part if self._cloud: cloud_file_name = dfile + '|' + self._cloud._name + '|' + fname upload_path = os.path.join("site", siteName) self.echo_info(public.lang("Uploading to {}, please wait...".format(self._cloud._title))) res = None fial_count = 0 while fial_count <= 2: # 防止CloudApi限流 try: res = self.__backup_site_cloud_upload(backup_size, dfile, upload_path, siteName, fname) break except Exception as e: self.echo_info(public.lang( "upload to cloud fail...please check if the configuration is correct! error: %s" % e) ) self.echo_info(public.lang( "try again after %") ) public.WriteLog("Cloud_Upload", f"upload to cloud fail...error:{e}") time.sleep(10 * (fial_count + 1)) fial_count += 1 continue if res: # insert data if find['project_type'] != 'WP2': public.M('backup').insert({ 'cron_id': cron_id, 'type': 0, 'name': fname, 'pid': pid, 'filename': cloud_file_name, 'addtime': public.format_date(), 'size': backup_size }) else: # wp public.M('wordpress_backups').insert({ 's_id': bak_info.site_id, 'bak_type': 3, 'bak_file': cloud_file_name, 'bak_time': bak_info.bak_time, 'size': backup_size, 'cron_id': cron_id, }) self.echo_info(public.lang("Local backup has been kept")) if not save_local: # 选了云但没选择保存本地, 移除 public.M('wordpress_backups').where('id=?', (pid,)).delete() if os.path.exists(dfile): os.remove(dfile) self.echo_info(public.lang( 'User settings do not retain local backups, deleted {}', dfile )) # 清理多余备份 if find['project_type'] != 'WP2': if save_local: local_backups = public.M('backup').where( "cron_id=? AND type=? AND pid=? AND filename NOT LIKE '%|%' AND filename LIKE?", (cron_id, '0', pid, f'{self._BACKUP_DIR}%') ).order('addtime').select() # 清理local self.__backup_site_delete_old('Local Storage', local_backups, save, 'site') if self._cloud: cloud_backups = public.M('backup').where( 'cron_id=? and type=? and pid=? and filename LIKE ?', (cron_id, '0', pid, f"%{self._cloud._name}%") ).order('addtime').select() self.__backup_site_delete_old(self._cloud._title, cloud_backups, save, 'site') # 清理cloud else: # wp site_id = public.M('sites').where('name=?', (siteName,)).getField('id') if save_local: local_wp_backups = public.M('wordpress_backups').where( "cron_id=? AND s_id=? AND bak_file NOT LIKE '%|%' AND bak_file LIKE?", (cron_id, site_id, f'{self._BACKUP_DIR}%') ).order('bak_time').select() local_wp_backups = [ { 'id': x.get('id'), 'pid': x.get('s_id'), 'name': x.get('bak_file', '').split('/')[-1], # 纯文件名 'filename': x.get('bak_file', ''), # 绝对路径 } for x in local_wp_backups if x ] # 清理wp2 local self.__backup_site_delete_old('Local Storage', local_wp_backups, save, 'site', True) if self._cloud: cloud_wp_backups = public.M('wordpress_backups').where( "cron_id=? AND s_id=? AND bak_file LIKE ? AND bak_file LIKE '%|%'", (cron_id, site_id, f"%{self._cloud._name}%") ).order('bak_time').select() cloud_wp_backups = [ { 'id': x.get('id'), 'pid': x.get('s_id'), 'name': x.get('bak_file', '').split('|')[0].split('/')[-1], # 纯文件名 'filename': x.get('bak_file', ''), # 绝对路径 } for x in cloud_wp_backups if x and isinstance(x, dict) ] # 清理wp2 cloud self.__backup_site_delete_old(self._cloud._title, cloud_wp_backups, save, 'site', True) self.echo_end() return dfile # 备份所有站点 def backup_site_all(self, save=3, echo_id=None): if echo_id is None: echo_id = self.echo_id self.cron_info = public.M('crontab').where('echo=?', (echo_id,)).field( 'backupto,name,save_local,notice,notice_channel,id,sType').select()[0] sites = public.M('sites').field('name').select() self._backup_all = True failture_count = 0 results = [] backup_files = [] # 用于存储成功备份的文件路径 if self._cloud == None: try: cloud_name = self.cron_info['backupTo'] except: cloud_name = self.cron_info['backupto'] from cloud_stora_upload_v2 import CloudStoraUpload self._cloud_new = CloudStoraUpload() self._cloud = self._cloud_new.run(cloud_name) for site in sites: self._error_msg = "" result = self.backup_site(site['name'], save, echo_id=echo_id) if not result: failture_count += 1 else: backup_files.append(result) # 收集成功备份的文件路径 results.append((site['name'], result, self._error_msg,)) self.save_backup_status(result, target="{}|site".format(site['name']), msg=self._error_msg) if failture_count > 0: self.send_all_failture_notification("site", results) else: size = sum(os.path.getsize(file) for file in backup_files if os.path.exists(file)) backup_dir=self._BACKUP_DIR self.check_disk_space(size,backup_dir) self._backup_all = False # 配置 def mypass(self, act): conf_file = '/etc/my.cnf' conf_file_bak = '/etc/my.cnf.bak' if os.path.getsize(conf_file) > 2: public.writeFile(conf_file_bak, public.readFile(conf_file)) public.set_mode(conf_file_bak, 600) public.set_own(conf_file_bak, 'mysql') elif os.path.getsize(conf_file_bak) > 2: public.writeFile(conf_file, public.readFile(conf_file_bak)) public.set_mode(conf_file, 600) public.set_own(conf_file, 'mysql') public.ExecShell("sed -i '/user=root/d' {}".format(conf_file)) public.ExecShell("sed -i '/password=/d' {}".format(conf_file)) if act: password = public.M('config').where('id=?', (1,)).getField('mysql_root') mycnf = public.readFile(conf_file) if not mycnf: return False src_dump_re = r"\[mysqldump\][^.]" sub_dump = "[mysqldump]\nuser=root\npassword=\"{}\"\n".format(password) mycnf = re.sub(src_dump_re, sub_dump, mycnf) if len(mycnf) > 100: public.writeFile(conf_file, mycnf) return True return True # map to list def map_to_list(self, map_obj): try: if type(map_obj) != list and type(map_obj) != str: map_obj = list(map_obj) return map_obj except: return [] # 备份所有数据库 def backup_database_all(self, save, echo_id: str): if echo_id is None: echo_id = self.echo_id self.cron_info = public.M('crontab').where("echo=? and sType='database'", (echo_id,)).find() if not self.cron_info: print(public.lang("The scheduled task does not exist!")) sName = self.cron_info["sName"] save = self.cron_info["save"] db_type = str(self.cron_info.get("db_type", "mysql")).lower() if not db_type: public.M('crontab').where("echo=? and sType='database'", (echo_id,)).update({"db_type": "mysql"}) self.cron_info["db_type"] = "mysql" db_type = "mysql" if db_type == "mysql": try: from database import database database = database() mysql_server = public.M('database_servers').where("db_type=?", ('mysql',)).select() for i in mysql_server: database.SyncGetDatabases(public.to_dict_obj({'sid': i['id']})) except: pass database_list = [] if db_type in ["mysql", "mongodb", "pgsql"]: if sName == "ALL": # 备份所有数据库 data_list = public.M("databases").field("name").where("LOWER(type)=LOWER(?)", (db_type)).select() database_list = [data["name"] for data in data_list] elif db_type == "redis": database_list.append("redis") # 上传对象 self._cloud = None cloud_name = self.cron_info["backupTo"] if cloud_name.find("localhost") == -1: from cloud_stora_upload_v2 import CloudStoraUpload self._cloud_new = CloudStoraUpload() self._cloud = self._cloud_new.run(cloud_name) self._backup_all = True resp_list = [] failture_count = 0 for db_name in database_list: status, backup_path = self.backup_database(db_name, save, echo_id=echo_id) if status is False: failture_count += 1 resp_list.append((db_name, status, backup_path)) if failture_count > 0: self.send_all_failture_notification("database", resp_list) else: size = 0 backup_dir=self._DB_BACKUP_DIR self.check_disk_space(size,backup_dir) self._backup_all = False # 备份单个数据库 def backup_database(self, sName, save, echo_id: str): self.echo_start() if echo_id is None: echo_id = self.echo_id self.cron_info = public.M('crontab').where("echo=? and sType='database'", (echo_id,)).find() # print(self.cron_info) if not self.cron_info: error_msg = public.lang("The scheduled task does not exist! {}",str(echo_id)) self.echo_error(error_msg) self.send_failture_notification(error_msg) return False, error_msg sName = sName.strip() save = self.cron_info["save"] db_type = str(self.cron_info.get("db_type", "mysql")).lower() if not db_type: public.M('crontab').where("echo=? and sType='database'", (echo_id,)).update({"db_type": "mysql"}) self.cron_info["db_type"] = "mysql" db_type = "mysql" database = {} if db_type in ["mysql", "mongodb", "pgsql"]: database = public.M("databases").where("name=? and LOWER(type)=LOWER(?)", (sName, db_type)).find() if not database: error_msg = public.lang("{} Backup database {} does not exist",db_type, sName) self.echo_error(error_msg) self.send_failture_notification(error_msg, target="{}|database".format(sName)) return False, error_msg elif db_type == "redis": database = {"id": 0, "name": "redis", "db_type": 0} db_name = database["name"] # if not database: # error_msg = "{} 备份数据库 {} 不存在".format(db_type, sName) # self.echo_error(error_msg) # self.send_failture_notification(error_msg, target="{}|database".format(db_name)) # return False, error_msg func_dict = { "mysql": self.mysql_backup_database, "mongodb": self.mongodb_backup_database, "redis": self.redis_backup_database, "pgsql": self.pgsql_backup_database, } func_backup_database = func_dict[db_type] table_list=self.cron_info["table_list"] if table_list: # 备份 args = {"backup_mode": self.cron_info["backup_mode"], "db_backup_path": self.cron_info["db_backup_path"], "save_local": self.cron_info["save_local"],"table_list":[table_list]} else: # 备份 args = {"backup_mode": self.cron_info["backup_mode"], "db_backup_path": self.cron_info["db_backup_path"], "save_local": self.cron_info["save_local"]} status, msg = func_backup_database(database, args) if status is False: # 备份失败 self.send_failture_notification(msg, target="{}|database".format(db_name)) return status, msg backup_path = msg backup_size = os.path.getsize(backup_path) if not self._backup_all: self.check_disk_space(backup_size,backup_path) file_name = os.path.basename(backup_path) # 上传对象 if self._cloud is None: cloud_name = self.cron_info["backupTo"] if cloud_name.find("localhost") == -1: from cloud_stora_upload_v2 import CloudStoraUpload self._cloud_new = CloudStoraUpload() self._cloud = self._cloud_new.run(cloud_name) # 上传 if self._cloud is not None: if self._cloud is False: error_msg = public.lang("Failed to link to cloud storage, please check if the configuration is correct!") self.echo_info(error_msg) self.send_failture_notification(error_msg, target="{}|database".format(db_name)) return False, error_msg self.echo_info(public.lang("Uploading to {}, please wait...",self._cloud._title)) # 判断是否 大于 5 GB 进行切割上传 if backup_size >= self._SPLIT_SIZE and self.cron_info.get("split_type") and self.cron_info.get("split_value") and ( self.cron_info.get("split_type") != "size" or backup_size // 1024 // 1024 > self.cron_info.get("split_value")): upload_path = os.path.join("database", db_type, db_name) is_status, err = self.split_upload_file(backup_path, upload_path) if is_status is False: remark = public.lang("Backup to " + self._cloud._title) self.send_failture_notification(err, target="{}|database".format(db_name), remark=remark) return False, err file_name = err # 云存储保存路径 else: # 正常上传 if db_type=="redis": upload_path = os.path.join(self._cloud_new.backup_path, "database", db_type, file_name) else: upload_path = os.path.join(self._cloud_new.backup_path, "database", db_type, db_name, file_name) # print(upload_path) if self._cloud._title in ["AWS S3","AWS S3对象存储", "天翼云ZOS","WebDAV存储","MinIO存储","多吉云COS"]: ## 由于aws s3上传函数独特,单独处理 if self._cloud._title == "AWS S3": res = self._cloud.upload_file1(backup_path, upload_path) else: res = self._cloud.upload_file(backup_path, upload_path) if res: self.echo_info("Successfully uploaded to {}".format(self._cloud._title)) else: if hasattr(self._cloud, "error_msg"): if self._cloud.error_msg: error_msg = self._cloud.error_msg if not error_msg: error_msg = "Backup task execution failed." self.echo_error(error_msg) if os.path.exists(upload_path): os.remove(upload_path) remark = "Backup to " + self._cloud._title self.send_failture_notification(error_msg, target="{}|database".format(db_name), remark=remark) return False else: # if db_type=="redis": # upload_path = os.path.join(self._cloud_new.backup_path, "database", db_type, file_name) # else: # upload_path = os.path.join(self._cloud_new.backup_path, "database", db_type, db_name, file_name) upload_method = self._cloud.upload_file if self._cloud._title == "Google Drive" else self._cloud_new.cloud_upload_file if upload_method(backup_path, upload_path): self.echo_info("Successfully uploaded to {}".format(self._cloud._title)) else: error_msg = "Backup task execution failed." if hasattr(self._cloud, "error_msg"): if self._cloud.error_msg: error_msg = self._cloud.error_msg self.echo_error(error_msg) if os.path.exists(backup_path): os.remove(backup_path) remark = "Backup to " + self._cloud._title self.send_failture_notification(error_msg, target="{}|database".format(db_name), remark=remark) return False, error_msg cloud_backup_path = backup_path if self._cloud is not None: cloud_backup_path = backup_path + "|" + self._cloud._name + "|" + file_name self.echo_info("The database has been backed up to: {}".format(backup_path)) if os.path.exists(self._err_log): try: os.remove(self._err_log) except FileNotFoundError: pass # back_to = self._CLOUD_OBJ[self.cron_info.get("backupTo")] back_to = self.cron_info.get("backupTo") pdata = { "type": "1", "name": file_name, "pid": database["id"], "filename": cloud_backup_path, "size": backup_size, "addtime": public.format_date(), "ps": "Plan task backup to {}".format(back_to), "cron_id": self.cron_info["id"], } public.M("backup").insert(pdata) if self._cloud is not None: # 云端上传 if self.cron_info.get("save_local", 0) == 0 and self._is_save_local: # 本地保留 pdata = { "type": "1", "name": file_name, "pid": database["id"], "filename": backup_path, "size": backup_size, "addtime": public.format_date(), "ps": "Plan task backup to {}(Local reservation)".format(back_to), "cron_id": self.cron_info["id"], } public.M("backup").insert(pdata) if self.cron_info.get("save_local", 0) == 0 and not self._is_save_local: if os.path.exists(backup_path): os.remove(backup_path) self.echo_info("User setting to not keep local backup, deleted {}".format(backup_path)) else: # 本地不保留 self.echo_info("Local backup has been retained.") # 清理多余备份 if self._cloud is not None: backups = public.M("backup").where("cron_id=? and type=? and pid=? and filename LIKE ?", (self.cron_info["id"], "1", database["id"], "%{}%".format(self._cloud._name))).field( "id,name,filename").select() else: backups = public.M("backup").where("cron_id=? and type=? and pid=? and filename NOT LIKE '%|%'", (self.cron_info["id"], "1", database["id"])).field("id,name,filename").select() if self._cloud: for backup in backups: if db_type=="redis": backup["cloud_path"] = os.path.join(self._cloud_new.backup_path, "database", db_type, backup["name"]) else: backup["cloud_path"] = os.path.join(self._cloud_new.backup_path, "database", db_type, db_name, backup["name"]) self.delete_old(backups, save, "database") self.echo_end() self.save_backup_status(True, target="{}|database".format(db_name)) return True, backup_path # mysql 备份数据库 def mysql_backup_database(self, db_find: dict, args: dict) -> Tuple[bool, str]: import db_mysql storage_type = args.get("storage_type", "db") # 备份的文件数量, 按照数据库 | 按照表 table_list = args.get("table_list", []) # 备份的集合 db_name = db_find["name"] db_host = "localhost" db_user = "root" if db_find["db_type"] == 0: # 本地数据库 backup_mode = args.get("backup_mode", "0") if backup_mode == "1": # 使用非 root 账号 db_user = db_find["username"] # 获取数据库用户 db_password = db_find["password"] # 获取数据库密码 print("Using non root user {} to backup database {}".format(db_user, db_name)) else: # 使用 root 账号 # db_user = "root" db_password = public.M("config").where("id=?", (1,)).getField("mysql_root") if not db_password: error_msg = "The database password is empty! Please set the database password first!" self.echo_error(error_msg) return False, error_msg try: from panelMysql import panelMysql db_port = int(panelMysql().query("show global variables like 'port'")[0][1]) except: db_port = 3306 elif db_find["db_type"] == 1: # 远程数据库 conn_config = json.loads(db_find["conn_config"]) db_host = conn_config["db_host"] db_port = int(conn_config["db_port"]) db_user = conn_config["db_user"] db_password = conn_config["db_password"] elif db_find["db_type"] == 2: conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('mysql')", db_find["sid"]).find() db_host = conn_config["db_host"] db_port = int(conn_config["db_port"]) db_user = conn_config["db_user"] db_password = conn_config["db_password"] else: error_msg = "Unknown database type" self.echo_error(error_msg) return False, error_msg if not db_password: error_msg = "The database password cannot be empty" self.echo_error(error_msg) return False, error_msg db_charset = public.get_database_character(db_name) mysql_obj = db_mysql.panelMysql().set_host(db_host, db_port, None, db_user, db_password) if isinstance(mysql_obj, bool): error_msg = "Failed to connect to database [{}: {}]".format(db_host, db_port) self.echo_error(error_msg) return False, error_msg db_data = mysql_obj.query("select sum(DATA_LENGTH)+sum(INDEX_LENGTH) from information_schema.tables where table_schema='{}'".format(db_name)) db_size = 0 if isinstance(db_data, list) and len(db_data) != 0: if db_data[0][0] != None: db_size = db_data[0][0] if not db_size: error_msg = "The specified database '{}' has no data!".format(db_name) self.echo_error(error_msg) return False, error_msg self.echo_info('Backup MySQL database: {}'.format(db_name)) self.echo_info("Database size: {}".format(public.to_size(db_size))) self.echo_info("Database Character Set: {}".format(db_charset)) disk_path, disk_free, disk_inode = self.get_disk_free(self._MYSQL_BACKUP_DIR) self.echo_info("The available disk space for partition {} is: {}, and the available Inode is: {}".format(disk_path, public.to_size(disk_free), disk_inode)) if disk_path: if disk_free < db_size: error_msg = "The available disk space of the target partition is less than {}, and the backup cannot be completed. Please increase the disk capacity or change the default backup directory on the settings page!".format(public.to_size(db_size)) self.echo_error(error_msg) return False, error_msg if disk_inode < self._inode_min: error_msg = "The available Inode for the target partition is less than {}, and the backup cannot be completed. Please increase the disk capacity or change the default backup directory on the settings page!".format(self._inode_min) self.echo_error(error_msg) return False, error_msg stime = time.time() self.echo_info("Start exporting database:{}".format(public.format_date(times=stime))) mysql_backup_dir = self.get_backup_dir(db_find, args, "mysql") db_backup_dir = os.path.join(mysql_backup_dir, db_name) if not os.path.exists(db_backup_dir): os.makedirs(db_backup_dir) file_name = "{db_name}_{backup_time}_mysql_data".format(db_name=db_name, backup_time=time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())) backup_shell = "'{mysqldump_bin}' --opt --hex-blob --skip-lock-tables --single-transaction --routines --events --skip-triggers --default-character-set='{db_charset}' --force " \ "--host='{db_host}' --port={db_port} --user='{db_user}' --password='{db_password}' '{db_name}'".format( mysqldump_bin=self._MYSQLDUMP_BIN, db_charset=db_charset, db_user=db_user, db_password=db_password, db_host=db_host, db_port=db_port, db_name=db_name, ) if storage_type == "db": # 导出单个文件 file_name = file_name + ".sql.gz" backup_path = os.path.join(db_backup_dir, file_name) table_shell = "" if len(table_list) != 0: if table_list and isinstance(table_list[0], str): table_list = table_list[0].split(',') # print("正在备份表{}".format(table_list)) table_shell = "'" + "' '".join(table_list) + "'" backup_shell += " {table_shell} 2> '{err_log}' | gzip > '{backup_path}'".format(table_shell=table_shell, err_log=self._err_log, backup_path=backup_path) public.ExecShell(backup_shell, env={"MYSQL_PWD": db_password}) else: # 按表导出 export_dir = os.path.join(db_backup_dir, file_name) if not os.path.isdir(export_dir): os.makedirs(export_dir) for table_name in table_list: tb_backup_path = os.path.join(export_dir, "{table_name}.sql".format(table_name=table_name)) tb_shell = backup_shell + " '{table_name}' 2> '{err_log}' > '{tb_backup_path}'".format(table_name=table_name, err_log=self._err_log, tb_backup_path=tb_backup_path) public.ExecShell(tb_shell, env={"MYSQL_PWD": db_password}) backup_path = "{export_dir}.zip".format(export_dir=export_dir) public.ExecShell("cd '{backup_dir}' && zip -m '{backup_path}' -r '{file_name}'".format(backup_dir=db_backup_dir, backup_path=backup_path, file_name=file_name)) if not os.path.exists(backup_path): public.ExecShell("rm -rf {}".format(export_dir)) if not os.path.exists(backup_path): error_msg = "Database backup failed!!" self.echo_error(error_msg) self.echo_info(public.readFile(self._err_log)) return False, error_msg # self.echo_info('backup_path:{}'.format(backup_path)) gz_size = os.path.getsize(backup_path) if gz_size < 400: error_msg = "Backup file size abnormal, backup failed!" self.echo_error(error_msg) self.echo_info(public.readFile(self._err_log)) os.remove(backup_path) return False, error_msg self.echo_info("Database backup completed, taking {:.2f} seconds, compressed file size: {}".format(time.time() - stime, public.to_size(gz_size))) # self.check_disk_space(gz_size,self._MYSQL_BACKUP_DIR,type=1) return True, backup_path # mongodb 备份数据库 def mongodb_backup_database(self, db_find: dict, args: dict) -> Tuple[bool, str]: file_type = db_find.get("file_type", "bson") collection_list = args.get("collection_list", []) # 备份的集合 field_list = args.get("field_list", []) from databaseModel.mongodbModel import panelMongoDB db_name = db_find["name"] db_host = "127.0.0.1" db_user = db_find["username"] db_password = db_find["password"] conn_data = {} if db_find["db_type"] == 0: if panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled": if not db_password: error_msg = "MongoDB has enabled security authentication. The database password cannot be empty. Please set the password and try again!" self.echo_error(error_msg) return False, error_msg else: db_password = None db_port = int(panelMongoDB.get_config_options("net", "port", 27017)) elif db_find["db_type"] == 1: if not db_password: error_msg = "The database password is empty! Please set the database password first!" self.echo_error(error_msg) return False, error_msg # 远程数据库 conn_config = json.loads(db_find["conn_config"]) db_host = conn_config["db_host"] db_port = int(conn_config["db_port"]) conn_data["host"] = conn_config["db_host"] conn_data["port"] = conn_config["db_port"] conn_data["username"] = conn_config["db_user"] conn_data["password"] = conn_config["db_password"] elif db_find["db_type"] == 2: if not db_password: error_msg = "The database password is empty! Please set the database password first!" self.echo_error(error_msg) return False, error_msg conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('mongodb')", db_find["sid"]).find() db_host = conn_config["db_host"] db_port = int(conn_config["db_port"]) conn_data["host"] = conn_config["db_host"] conn_data["port"] = conn_config["db_port"] conn_data["username"] = conn_config["db_user"] conn_data["password"] = conn_config["db_password"] else: error_msg = "Unknown database type!" self.echo_error(error_msg) return False, error_msg mongodb_obj = panelMongoDB().set_host(**conn_data) status, err_msg = mongodb_obj.connect() if status is False: error_msg = "Failed to connect to database [{}: {}]".format(db_host, db_port) self.echo_error(error_msg) return False, error_msg db_collections = 0 db_storage_size = 0 try: status, db_obj = mongodb_obj.get_db_obj_new(db_name) if status is False: error_msg = db_obj self.echo_error(error_msg) return False, error_msg data = db_obj.command("dbStats") db_collections = data.get("collections", 0) # 获取集合数 db_storage_size = data.get("storageSize", 0) # 获取存储大小 # if len(collection_list) == 0: # collection_list = db_obj.list_collection_names() except: error_msg = "Failed to connect to database [{}: {}]".format(db_host, db_port) self.echo_error(error_msg) return False, error_msg if db_collections == 0: error_msg = "The specified database '{}' does not have any collections!".format(db_name) self.echo_error(error_msg) return False, error_msg self.echo_info("Backup MongoDB database: {}".format(db_name)) self.echo_info("Database size: {}".format(public.to_size(db_storage_size))) self.echo_info("Number of database collections: {}".format(db_collections)) disk_path, disk_free, disk_inode = self.get_disk_free(self._MONGODB_BACKUP_DIR) self.echo_info("The available disk space for partition {} is: {}, and the available Inode is: {}".format(disk_path, public.to_size(disk_free), disk_inode)) if disk_path: if disk_free < db_storage_size: error_msg = "The available disk space of the target partition is less than {}, and the backup cannot be completed. Please increase the disk capacity or change the default backup directory on the settings page!".format(public.to_size(db_storage_size)) self.echo_error(error_msg) return False, error_msg if disk_inode < self._inode_min: error_msg = "The available Inode for the target partition is less than {}, and the backup cannot be completed. Please increase the disk capacity or change the default backup directory on the settings page!".format(self._inode_min) self.echo_error(error_msg) return False, error_msg stime = time.time() self.echo_info("Start exporting database:{}".format(public.format_date(times=stime))) mongodb_backup_dir = self.get_backup_dir(db_find, args, "mongodb") db_backup_dir = os.path.join(mongodb_backup_dir, db_name) if not os.path.exists(db_backup_dir): os.makedirs(db_backup_dir) file_name = "{db_name}_{file_type}_{backup_time}_mongodb_data".format(db_name=db_find["name"], file_type=file_type, backup_time=time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())) export_dir = os.path.join(db_backup_dir, file_name) mongodump_shell = "'{mongodump_bin}' --host='{db_host}' --port={db_port} --db='{db_name}' --out='{out}' 2> '{err_log}'".format( mongodump_bin=self._MONGODBDUMP_BIN, db_host=db_host, db_port=db_port, db_name=db_name, out=export_dir, err_log=self._err_log, ) mongoexport_shell = "'{mongoexport_bin}' --host='{db_host}' --port={db_port} --db='{db_name}' 2> '{err_log}'".format( mongoexport_bin=self._MONGOEXPORT_BIN, db_host=db_host, db_port=db_port, db_name=db_name, err_log=self._err_log, ) if db_password is not None: # 本地未开启安全认证 mongodump_shell += " --username='{db_user}' --password={db_password}".format(db_user=db_user, db_password=public.shell_quote(str(db_password))) mongoexport_shell += " --username='{db_user}' --password={db_password}".format(db_user=db_user, db_password=public.shell_quote(str(db_password))) if file_type == "bson": if len(collection_list) == 0: public.ExecShell(mongodump_shell) else: for collection_name in collection_list: shell = "{mongodump_shell} --collection='{collection}'".format( mongodump_shell=mongodump_shell, collection=collection_name ) public.ExecShell(shell) else: # 导出 json csv 格式 fields = None if file_type == "csv": # csv fields = "--fields='{}'".format(",".join(field_list)) for collection_name in collection_list: file_path = os.path.join(export_dir, "{collection_name}.{file_type}".format(collection_name=collection_name, file_type=file_type)) shell = "{mongoexport_shell} --collection='{collection}' --type='{type}' --out='{out}'".format( mongoexport_shell=mongoexport_shell, collection=collection_name, type=file_type, out=file_path, ) if fields is not None: shell += " --fields='{fields}'".format(fields=fields) public.ExecShell(shell) backup_path = "{export_dir}.zip".format(export_dir=export_dir) public.ExecShell("cd {backup_dir} && zip -m {backup_path} -r {file_name}".format(backup_dir=db_backup_dir, backup_path=backup_path, file_name=file_name)) if not os.path.exists(backup_path): public.ExecShell("rm -rf {}", format(export_dir)) error_msg = "Database backup failed!" self.echo_error(error_msg) self.echo_info(public.readFile(self._err_log)) return False, error_msg zip_size = os.path.getsize(backup_path) # self.check_disk_space(zip_size,self._MONGODB_BACKUP_DIR,type=1) self.echo_info("Database backup completed, taking {:.2f} seconds, compressed file size: {}".format(time.time() - stime, public.to_size(zip_size))) return True, backup_path # redis 备份数据库 def redis_backup_database(self, db_find: dict, args: dict) -> Tuple[bool, str]: if db_find["db_type"] != 0: error_msg = 'We currently do not support backing up remote databases!' self.echo_error(error_msg) return False, error_msg from databaseModel.redisModel import panelRedisDB redis_obj = panelRedisDB() redis_config = redis_obj.get_options(None) db_num = redis_config.get("databases", 16) for db_idx in range(0, int(db_num)): if redis_obj.redis_conn(db_idx) is False: error_msg = "Failed to connect to local Redis database" self.echo_error(error_msg) return False, error_msg redis_obj.redis_conn(db_idx).save() redis_obj_t = redis_obj.redis_conn(0) if redis_obj_t is False: error_msg = "Failed to connect to local Redis database" self.echo_error(error_msg) return False, error_msg redis_dir = redis_obj_t.config_get().get("dir", "") src_path = os.path.join(redis_dir, "dump.rdb") if not os.path.exists(src_path): error_msg = 'The database file does not exist! {}'.format(src_path) self.echo_error(error_msg) return False, error_msg file_name = "{sid}_{backup_time}_redis_data.rdb".format(sid=db_find["id"], backup_time=time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())) # 调用 get_backup_dir 函数来获取备份目录的路径 redis_backup_dir = self.get_backup_dir(db_find, args, "redis") # 使用获取的路径来构建备份文件的路径 backup_path = os.path.join(redis_backup_dir, file_name) db_size = os.path.getsize(src_path) self.echo_info('Backup Redis database') self.echo_info("Backup file size: {}".format(public.to_size(db_size))) self.echo_info("Backup path: {}".format(src_path)) disk_path, disk_free, disk_inode = self.get_disk_free(self._REDIS_BACKUP_DIR) self.echo_info("The available disk space for partition {} is: {}, and the available Inode is: {}".format(disk_path, public.to_size(disk_free), disk_inode)) if disk_path: if disk_free < db_size: error_msg = "The available disk space of the target partition is less than {}, and the backup cannot be completed. Please increase the disk capacity or change the default backup directory on the settings page!".format(public.to_size(db_size)) self.echo_error(error_msg) return False, error_msg if disk_inode < self._inode_min: error_msg = "The available Inode for the target partition is less than {}, and the backup cannot be completed. Please increase the disk capacity or change the default backup directory on the settings page!".format(self._inode_min) self.echo_error(error_msg) return False, error_msg stime = time.time() self.echo_info("Start backing up the database:{}".format(public.format_date(times=stime))) import shutil shutil.copyfile(src_path, backup_path) if not os.path.exists(backup_path): error_msg = "Database backup failed!" self.echo_error(error_msg) self.echo_info(public.readFile(self._err_log)) return False, error_msg backup_size = os.path.getsize(backup_path) # self.check_disk_space(backup_size,self._REDIS_BACKUP_DIR,type=1) self.echo_info("Database backup completed, taking {:.2f} seconds, file size: {}".format(time.time() - stime, public.to_size(backup_size))) return True, backup_path # pgsql 备份数据库 def pgsql_backup_database(self, db_find: dict, args: dict) -> Tuple[bool, str]: from databaseModel.pgsqlModel import panelPgsql storage_type = args.get("storage_type", "db") # 备份的文件数量, 按照数据库 | 按照表 table_list = args.get("table_list", []) # 备份的集合 db_name = db_find["name"] db_user = "postgres" db_host = "127.0.0.1" if db_find["db_type"] == 0: db_port = panelPgsql.get_config_options("port", int, 5432) t_path = os.path.join(public.get_panel_path(), "data/postgresAS.json") if not os.path.isfile(t_path): error_msg = "Administrator password not set!" self.echo_error(error_msg) return False, error_msg db_password = json.loads(public.readFile(t_path)).get("password", "") if not db_password: error_msg = "The database password is empty! Please set the database password first!" self.echo_error(error_msg) return False, error_msg elif db_find["db_type"] == 1: # 远程数据库 conn_config = json.loads(db_find["conn_config"]) db_host = conn_config["db_host"] db_port = conn_config["db_port"] db_user = conn_config["db_user"] db_password = conn_config["db_password"] elif db_find["db_type"] == 2: conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('pgsql')", db_find["sid"]).find() db_host = conn_config["db_host"] db_port = conn_config["db_port"] db_user = conn_config["db_user"] db_password = conn_config["db_password"] else: error_msg = "Unknown database type" self.echo_error(error_msg) return False, error_msg pgsql_obj = panelPgsql().set_host(host=db_host, port=db_port, database=None, user=db_user, password=db_password) status, err_msg = pgsql_obj.connect() if status is False: error_msg = "Failed to connect to database [{}: {}]".format(db_host, int(db_port)) self.echo_error(error_msg) return False, error_msg db_size = 0 db_data = pgsql_obj.query("SELECT pg_database_size('{}') AS database_size;".format(db_name)) if isinstance(db_data, list) and len(db_data) != 0: db_size = db_data[0][0] if db_size == 0: error_msg = "The specified database '{}' has no data!".format(db_name) self.echo_error(error_msg) return False, error_msg self.echo_info('Backup PgSQL database: {}'.format(db_name)) self.echo_info("Database size: {}".format(public.to_size(db_size))) disk_path, disk_free, disk_inode = self.get_disk_free(self._PGSQL_BACKUP_DIR) self.echo_info("The available disk space for partition {} is: {}, and the available Inode is: {}".format(disk_path, public.to_size(disk_free), disk_inode)) if disk_path: if disk_free < db_size: error_msg = "The available disk space of the target partition is less than {}, and the backup cannot be completed. Please increase the disk capacity or change the default backup directory on the settings page!".format(public.to_size(db_size)) self.echo_error(error_msg) return False, error_msg if disk_inode < self._inode_min: error_msg = "The available Inode for the target partition is less than {}, and the backup cannot be completed. Please increase the disk capacity or change the default backup directory on the settings page!".format(self._inode_min) self.echo_error(error_msg) return False, error_msg stime = time.time() self.echo_info("Start exporting database:{}".format(public.format_date(times=stime))) # 调用 get_backup_dir 函数来获取备份目录的路径 pgsql_backup_dir = self.get_backup_dir(db_find, args, "pgsql") # 使用获取的路径来构建备份文件的路径 db_backup_dir = os.path.join(pgsql_backup_dir, db_name) if not os.path.exists(db_backup_dir): os.makedirs(db_backup_dir) file_name = "{db_name}_{backup_time}_pgsql_data".format(db_name=db_name, backup_time=time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())) shell = "'{pgdump_bin}' --host='{db_host}' --port={db_port} --username='{db_user}' --dbname='{db_name}' --clean".format( pgdump_bin=self._PGDUMP_BIN, db_host=db_host, db_port=int(db_port), db_user=db_user, db_name=db_name, ) if storage_type == "db": # 导出单个文件 file_name = file_name + ".sql.gz" backup_path = os.path.join(db_backup_dir, file_name) table_shell = "" if len(table_list) != 0: table_shell = "--table='" + "' --table='".join(table_list) + "'" shell += " {table_shell} | gzip > '{backup_path}'".format(table_shell=table_shell, backup_path=backup_path) public.ExecShell(shell, env={"PGPASSWORD": db_password}) else: # 按表导出 export_dir = os.path.join(db_backup_dir, file_name) if not os.path.isdir(export_dir): os.makedirs(export_dir) for table_name in table_list: tb_backup_path = os.path.join(export_dir, "{table_name}.sql".format(table_name=table_name)) tb_shell = shell + " --table='{table_name}' > '{tb_backup_path}'".format(table_name=table_name, tb_backup_path=tb_backup_path) public.ExecShell(tb_shell, env={"PGPASSWORD": db_password}) backup_path = "{export_dir}.zip".format(export_dir=export_dir) public.ExecShell("cd '{backup_dir}' && zip -m '{backup_path}' -r '{file_name}'".format(backup_dir=db_backup_dir, backup_path=backup_path, file_name=file_name)) if not os.path.exists(backup_path): public.ExecShell("rm -rf {}", format(export_dir)) # public.ExecShell(shell, env={"PGPASSWORD": db_password}) if not os.path.exists(backup_path): error_msg = "Database backup failed!" self.echo_error(error_msg) self.echo_info(public.readFile(self._err_log)) return False, error_msg gz_size = os.path.getsize(backup_path) # self.check_disk_space(gz_size,self._PGSQL_BACKUP_DIR,type=1) self.echo_info("Database backup completed, taking {:.2f} seconds, compressed file size: {}".format(time.time() - stime, public.to_size(gz_size))) return True, backup_path def generate_success_title(self, task_name): from send_mail import send_mail sm = send_mail() now = public.format_date(format="%Y-%m-%d %H:%M") server_ip = sm.GetLocalIp() title = "{}-{} Task execution successful".format(server_ip, task_name) return title def generate_failture_title(self, task_name): title = "aapanel Plan Task Backup Failure Reminder".format(task_name) return title def generate_all_failture_notice(self, task_name, msg, backup_type, remark=""): from send_mail import send_mail sm = send_mail() now = public.format_date(format="%Y-%m-%d %H:%M:%S") server_ip = sm.GetLocalIp() if remark: remark = "\n* Task note: {}".format(remark) notice_content = """####Planned task execution failed: * Server IP: {} * time: {} * Plan task name:{}{} * The following is a list of {} backup failures:{} Please handle it as soon as possible""".format( server_ip, now, task_name, remark, backup_type, msg) return notice_content def generate_failture_notice(self, task_name, msg, remark): from send_mail import send_mail sm = send_mail() now = public.format_date(format="%Y-%m-%d %H:%M:%S") server_ip = sm.GetLocalIp() if remark: remark = "\n* Task note: {}".format(remark) notice_content = """####Planned task execution failed: * Server IP: {} * time: {} * Plan task name:{}{} * Error message: {} Please handle it as soon as possible""".format( server_ip, now, task_name, remark, msg) return notice_content def generate_disk_notice(self, task_name,free_space_gb,remark=""): from send_mail import send_mail sm = send_mail() now = public.format_date(format="%Y-%m-%d %H:%M:%S") server_ip = sm.GetLocalIp() if remark: remark = "\n* Task note: {}".format(remark) notice_content = """####Warm reminder for backup files of planned tasks: * Server IP: {} * time: {} * Plan task name: {}{} * reminder: We have detected that your server's disk space is insufficient {} G, which may cause subsequent backup file failures. Please clean up the disk as soon as possible! """.format( server_ip, now, task_name, remark,free_space_gb) return notice_content def get_cron_info(self, cron_name): """ 通过计划任务名称查找计划任务配置参数 """ try: cron_info = public.M('crontab').where('echo=?', (cron_name,)) \ .field('name,save_local,notice,notice_channel,id,sType').find() return cron_info except Exception as e: pass return {} def send_success_notification(self, msg, target="", remark=""): pass def send_disk_notification(self,free_space_gb,remark=""): """发送任务失败消息 :error_msg 错误信息 :remark 备注 """ if not self.cron_info: return cron_info = self.cron_info cron_title = cron_info["name"] notice = cron_info["notice"] notice_channel = cron_info["notice_channel"] if notice == 0 or not notice_channel: return if notice == 1 or notice == 2: title = self.generate_failture_title(cron_title) task_name = cron_title msg=self.generate_disk_notice(task_name,free_space_gb,remark) res = self.send_notification(notice_channel, title, msg) if res: self.echo_info("The message notification has been sent.") def send_failture_notification(self, error_msg, target="", remark=""): """发送任务失败消息 :error_msg 错误信息 :remark 备注 """ if self._backup_all: return if not self.cron_info: return cron_info = self.cron_info cron_title = cron_info["name"] save_local = cron_info["save_local"] notice = cron_info["notice"] notice_channel = cron_info["notice_channel"] self.save_backup_status(False, target, msg=error_msg) if notice == 0 or not notice_channel: return if notice == 1 or notice == 2: title = self.generate_failture_title(cron_title) task_name = cron_title msg = self.generate_failture_notice(task_name, error_msg, remark) res = self.send_notification(notice_channel, title, msg) if res: self.echo_info("The message notification has been sent.") def send_all_failture_notification(self, backup_type, results, remark=""): """统一发送任务失败消息 :results [(备份对象, 备份结果,错误信息),...] :remark 备注 """ if not self.cron_info: return cron_info = self.cron_info cron_title = cron_info["name"] save_local = cron_info["save_local"] notice = cron_info["notice"] notice_channel = cron_info["notice_channel"] if notice == 0 or not notice_channel: return if notice == 1 or notice == 2: title = self.generate_failture_title(cron_title) type_desc = { "site": "site", "database": "database" } backup_type_desc = type_desc[backup_type] task_name = cron_title failture_count = 0 total = 0 content = "" for obj in results: total += 1 obj_name = obj[0] result = obj[1] if not result: failture_count += 1 # content += "<tr><td style='color:red'>{}</td><tr>".format(obj_name) content += "\n{}".format(obj_name) if failture_count > 0: if self._cloud: remark = "Backup to {}, total {} {}, failed {}.".format( self._cloud._title, total, backup_type_desc, failture_count) else: remark = "Backup failed for {}/total of {} sites".format( failture_count, total, backup_type_desc) msg = self.generate_all_failture_notice(task_name, content, backup_type_desc, remark) res = self.send_notification(notice_channel, title, msg=msg, total=total, failture_count=failture_count) if res: self.echo_info("The message notification has been sent.") else: self.echo_error("Message notification sending failed.") def send_notification(self, channel, title, msg="", total=0, failture_count=0,cron_info=None): """发送通知 Args: channel (str): 消息通道,多个用英文逗号隔开 title (str): 通知标题 msg (str, optional): 消息内容. Defaults to "". Returns: bool: 通知是否发送成功 """ try: from config import config from panelPush import panelPush tongdao = [] if channel.find(",") >= 0: tongdao = channel.split(",") else: tongdao = [channel] error_count = 0 con_obj = config() get = public.dict_obj() msg_channels = con_obj.get_msg_configs(get) error_channel = [] channel_data = {} msg_data = {} for ch in tongdao: # 根据不同的消息通道准备不同的内容 if ch == "mail": msg_data = { "msg": msg.replace("\n", "<br/>"), "title": title } if ch in ["dingding", "weixin", "feishu", "wx_account", "tg"]: msg_data["msg"] = msg # print("msg",msg_data["msg"]) if ch in ["sms"]: if not cron_info: task_name=self.cron_info["name"] else: task_name=cron_info["name"] if total > 0 and failture_count > 0: msg_data["sm_type"] = "backup_all" msg_data["sm_args"] = { "panel_name": public.GetConfigValue('title'), "task_name": task_name, "failed_count": failture_count, "total": total } else: msg_data["sm_type"] = "backup" msg_data["sm_args"] = { "panel_name": public.GetConfigValue('title'), "task_name": task_name } channel_data[ch] = msg_data # print("channel data:") # print(channel_data) # 即时推送 pp = panelPush() push_res = pp.push_message_immediately(channel_data) if push_res["status"]: channel_res = push_res["msg"] for ch, res in channel_res.items(): if not res["status"]: if ch in msg_channels: error_channel.append(msg_channels[ch]["title"]) error_count += 1 if not push_res["status"] or error_count: self.echo_error("Message channel: {} failed to send!".format(",".join(error_channel))) else: self.echo_info("Message sent successfully.") if error_count == len(tongdao): return False return True except Exception as e: import traceback print(traceback.format_exc()) return False def send_notification2(self, channel, title, msg=""): try: from send_mail import send_mail tongdao = [] if channel.find(",") >= 0: tongdao = channel.split(",") else: tongdao = [channel] sm = send_mail() send_res = [] error_count = 0 channel_names = { "mail": "mail", "dingidng": "dingidng" } error_channel = [] settings = sm.get_settings() for td in tongdao: _res = False if td == "mail": if len(settings["user_mail"]['mail_list']) == 0: continue mail_list = settings['user_mail']['mail_list'] if len(mail_list) == 1: mail_list = mail_list[0] _res = sm.qq_smtp_send(mail_list, title=title, body=msg.replace("\n", "<br/>")) if not _res: error_count += 1 error_channel.append(channel_names[td]) if td == "dingding": if len(settings["dingding"]['info']) == 0: continue _res = sm.dingding_send(msg) send_res.append(_res) if not _res: error_count += 1 error_channel.append(channel_names[td]) if error_count > 0: print("Message channel: {} failed to send!".format(",".join(error_channel))) if error_count == len(tongdao): return False return True except Exception as e: print(e) return False def save_backup_status(self, status, target="", msg=""): """保存备份的状态""" try: if not self.cron_info: return cron_id = self.cron_info["id"] sql = public.M("system").dbfile("system").table("backup_status") sql.add("id,target,status,msg,addtime", (cron_id, target, status, msg, time.time(),)) except Exception as e: print("Save backup status exception: {}.".format(e)) # 切割文件上传 def split_upload_file(self, backup_path: str, upload_path: str) -> Tuple[bool, str]: is_status = True data = "" self.echo_info("File splitting in progress, please wait...") if self._cloud is None or self._cloud_new is None: return False, "No cloud storage specified!" if self.cron_info.get("split_type") == "size": is_status, data = self.split_file( file_path=backup_path, split_size=self.cron_info.get("split_value"), ) elif self.cron_info.get("split_type") == "num": is_status, data = self.split_file( file_path=backup_path, split_num=self.cron_info.get("split_value"), ) if is_status is False: return False, data save_dir = data["save_dir"] upload_dir = os.path.join(self._cloud_new.backup_path, upload_path, os.path.basename(save_dir)) self.echo_info("File splitting is in progress. The size of each individual file after splitting is {} M, and the number of files after splitting is {}".format(data.get("split_size"), data.get("split_num"))) join_msg = """Split file recovery method: 1. Download cloud storage directory: {upload_dir} On the panel 2. Enter the directory and click on 'File Merge'""".format( upload_dir=upload_dir ) self.echo_info(join_msg) # upload_path = os.path.join(upload_path, os.path.basename(save_dir)) for name in os.listdir(save_dir): path = os.path.join(save_dir, name) if not os.path.isfile(path): continue upload_path_temp = os.path.join(upload_dir, name) if not self._cloud_new.cloud_upload_file(path, upload_path_temp): public.ExecShell("chattr -i -R {split_dir}".format(split_dir=save_dir)) public.ExecShell("rm -rf {}".format(save_dir)) error_msg = "Backup task execution failed." if hasattr(self._cloud, "error_msg"): if self._cloud.error_msg: error_msg = self._cloud.error_msg self.echo_error(error_msg) if os.path.exists(backup_path): os.remove(backup_path) return False, error_msg self.echo_info("Successfully uploaded to {}".format(self._cloud._title)) public.ExecShell("chattr -i -R {split_dir}".format(split_dir=save_dir)) public.ExecShell("rm -rf {}".format(save_dir)) return True, upload_dir def split_file(self, file_path: str, # 文件路径 split_size: int = None, # 切割大小 MB split_num: int = None, # 切割数量 split_file_name: str = None, # 切割文件名,默认为切割目标文件名 split_file_ext: str = None, # 切割文件后缀 save_path: str = None, # 切割后保存的路径 ) -> Tuple[bool, Union[str, dict]]: import math if not os.path.isfile(file_path): return False, "The file does not exist!" totoal_size = os.path.getsize(file_path) split_file_option = "" totoal_split_size = math.ceil(totoal_size / 1024 / 1024) if split_size: # 默认使用大小 split_file_option = "--bytes={}M".format(split_size) split_num = math.ceil(totoal_split_size / split_size) elif split_num: # 按照数量拆分 split_file_option = "--number={}".format(split_num) split_size = round(totoal_split_size / split_num) else: # 默认按照大小拆分 100M split_size = 100 split_file_option = "--bytes={}M".format(split_size) split_num = math.ceil(totoal_split_size / split_size) if split_num < 2: return False, "The minimum number of file splits is 2!" file_name = os.path.basename(file_path) if not save_path: # 保存路径 save_path = os.path.dirname(file_path) save_dir_name = "{file_name}_split".format(file_name=file_name) save_dir = os.path.join(save_path, save_dir_name) i = 1 while os.path.isdir(save_dir): save_dir_name = "{file_name}_split-({ectype})".format(file_name=file_name, ectype=i) save_dir = os.path.join(save_path, save_dir_name) i += 1 os.makedirs(save_dir) if not split_file_name: # 切割文件名,默认为切割目标文件名 file_ext_temp = file_name.split(".") if len(file_ext_temp) != 0: split_file_name = "{file_name}_sqlit_".format(file_name=".".join(file_ext_temp[:-1])) else: split_file_name = "{file_name}_sqlit_".format(file_name=file_name) if not split_file_ext: split_file_ext = ".bt_split" split_file_shell = "--additional-suffix={split_file_ext}".format(split_file_ext=split_file_ext) shell = "cd '{save_dir}' && split {split_file_option} --numeric-suffixes=1 --suffix-length={split_length} {split_file_shell} '{file_path}' {split_file_name}".format( save_dir=save_dir, split_file_option=split_file_option, split_length=len(str(split_num)), file_path=file_path, split_file_name=split_file_name, split_file_shell=split_file_shell, ) public.ExecShell(shell) split_file_list = [] for name in os.listdir(save_dir): path = os.path.join(save_dir, name) if not os.path.isfile(path): continue split_file_list.append(name) split_config_info = { "name": file_name, "size": totoal_size, "split_size": split_size, "split_num": split_num, "md5": public.FileMd5(file_path), "split_file": split_file_list, "save_dir": save_dir, } split_config_path = os.path.join(save_dir, "split_config.bt_split_json") public.writeFile(split_config_path, json.dumps(split_config_info)) public.ExecShell("chattr +i -R {split_dir}".format(split_dir=save_dir)) return True, split_config_info def get_backup_dir(self, db_find: dict, args: dict, db_type: str) -> str: backup_dir = args.get("db_backup_path", "") save_local = args.get("save_local", "") if db_find["db_type"] == 0 or save_local == 1: # 本地数据库 if backup_dir: db_backup_dir = os.path.join(backup_dir, "database") specific_db_backup_dir = os.path.join(db_backup_dir, db_type, "crontab_backup") else: specific_db_backup_dir = os.path.join(self._DB_BACKUP_DIR, db_type, "crontab_backup") else: specific_db_backup_dir = os.path.join(self._DB_BACKUP_DIR, db_type, "crontab_backup") return specific_db_backup_dir # def get_site_backup_dir(self, backupto: str, save_local: int, db_backup_path: str, site_name: str) -> str: def get_site_backup_dir(self, backupto: str, db_backup_path: str, site_name: str) -> str: site_backup_dir = self._BACKUP_DIR # 给 site_backup_dir 一个默认值 if backupto == "localhost" or backupto == "Local Disk": if db_backup_path: site_backup_dir = db_backup_path return site_backup_dir def check_disk_space(self,file_size,backup_dir): min_required_space = max(file_size* 5, 1* 1024 * 1024 * 1024) # 1GB disk_usage = public.get_disk_usage(backup_dir) if disk_usage.free < min_required_space: free_space_gb = int(min_required_space / (1024 * 1024 * 1024)) self.send_disk_notification(free_space_gb) def check_databases(self): """检查数据表是否存在""" tables = ["backup_status"] import sqlite3 conn = sqlite3.connect("/www/server/panel/data/system.db") cur = conn.cursor() table_key = ",".join(["'"+t+"'" for t in tables]) sel_res = cur.execute("SELECT name FROM sqlite_master WHERE type='table' and name in ({})".format(table_key)) res = sel_res.fetchall() to_commit = False exists_dbs = [] if res: exists_dbs = [d[0] for d in res] if "backup_status" not in exists_dbs: csql = '''CREATE TABLE IF NOT EXISTS `backup_status` ( `id` INTEGER, `target` TEXT, `status` INTEGER, `msg` TEXT DEFAULT "", `addtime` DATETIME DEFAULT CURRENT_TIMESTAMP )''' cur.execute(csql) to_commit = True if to_commit: conn.commit() cur.close() conn.close() return True