Server IP : 104.21.38.3 / Your IP : 162.158.163.252 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/ |
Upload File : |
# coding: utf-8
# + -------------------------------------------------------------------
# | aaPanel cloud storage upload interface
# + -------------------------------------------------------------------
# | Copyright (c) 2015-2016 aapanel(http:aapanel.com) All rights reserved.
# + -------------------------------------------------------------------
# | Author: sww
# + -------------------------------------------------------------------
'''
Adding a new storage backend:
    1. Add its entry to /www/server/panel/data/libList.conf.
    2. Add a wrapper class for the storage below. The wrapper checks whether
       the connection succeeds and wraps the upload interface: it yields
       False when the connection fails or the user is not logged in, and
       the client object on success.
       ``authorize`` is the plugin's permission-check function.
    3. Register the wrapper in ``CloudStoraUpload.cloud_obj``.
'''
import json
import os
import re
import shlex
import sys
import time
import traceback

import public

# Directory that holds one sub-directory per installed panel plugin.
_PLUGIN_ROOT = '/www/server/panel/plugin'


def _add_plugin_path(plugin_name):
    """Put the plugin's directory at the front of ``sys.path`` if it is absent."""
    plugin_dir = '{}/{}'.format(_PLUGIN_ROOT, plugin_name)
    if plugin_dir not in sys.path:
        sys.path.insert(0, plugin_dir)


# Qiniu Cloud
class qiniu:
    """Loads the ``qiniu`` plugin client and calls its ``authorize()``."""

    flag = True  # turns False when import / construction / authorize raises
    qc_obj = None

    def __init__(self):
        _add_plugin_path('qiniu')
        try:
            from qiniu_main import QiNiuClient as qc
            self.qc_obj = qc()
            self.qc_obj.authorize()
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.qc_obj if self.flag else False


# Upload to Tianyi Cloud
class tianyiyun:
    """Loads the ``tianyiyun`` plugin client (no ``authorize()`` call is made)."""

    flag = True
    tyy_obj = None

    def __init__(self):
        _add_plugin_path('tianyiyun')
        try:
            from tianyiyun_main import tianyiyun_main as tyy
            self.tyy_obj = tyy()
        except Exception:
            print(traceback.format_exc())
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.tyy_obj if self.flag else False


# Baidu Cloud
class bos:
    """Loads the ``bos`` plugin client and calls its ``authorize()``."""

    flag = True
    bc_obj = None

    def __init__(self):
        _add_plugin_path('bos')
        try:
            from bos_main import BOSClient as bc
            self.bc_obj = bc()
            self.bc_obj.authorize()
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.bc_obj if self.flag else False


# Tencent Cloud
class cos:
    """Loads the ``txcos`` plugin client and calls its ``authorize()``."""

    flag = True
    cc_obj = None

    def __init__(self):
        _add_plugin_path('txcos')
        try:
            from txcos_main import COSClient as cc
            self.cc_obj = cc()
            self.cc_obj.authorize()
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.cc_obj if self.flag else False


# UpYun
class upyun:
    """Loads the ``upyun`` plugin client and calls its ``authorize()``."""

    flag = True
    uc_obj = None

    def __init__(self):
        _add_plugin_path('upyun')
        try:
            from upyun_main import UpYunClient as uc
            self.uc_obj = uc()
            self.uc_obj.authorize()
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.uc_obj if self.flag else False


# Alibaba Cloud
class alioss:
    """Loads the ``alioss`` plugin client and calls its ``authorize()``."""

    flag = True
    oc_obj = None

    def __init__(self):
        _add_plugin_path('alioss')
        try:
            from alioss_main import OSSClient as oc
            self.oc_obj = oc()
            self.oc_obj.authorize()
        except Exception:
            print(traceback.format_exc())
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.oc_obj if self.flag else False


# Untested
class gdrive:
    """Loads the ``gdrive`` plugin client and attaches adapter methods to it."""

    flag = True
    gc_obj = None

    def __init__(self):
        _add_plugin_path('gdrive')
        try:
            from gdrive_main import gdrive_main as gc
            self.gc_obj = gc()
            # Attach this wrapper's adapters onto the plugin object.
            self.gc_obj.resumable_upload = self.resumable_upload
            self.gc_obj.delete_object = self.delete_object
            self.gc_obj.build_object_name = self.build_object_name
            # NOTE(review): name mangling turns both sides of this assignment
            # into `_gdrive__get_folder_id`, so it does not replace a private
            # `__get_folder_id` defined inside the plugin class -- confirm intent.
            self.gc_obj.__get_folder_id = self.__get_folder_id
            if not self.gc_obj.set_creds():
                self.flag = False
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.gc_obj if self.flag else False

    def resumable_upload(self, file_name, object_name, *args, **kwargs):
        """Forward an upload to the plugin's ``upload_file``.

        :param file_name: passed to the plugin as ``filename``
        :param object_name: passed to the plugin as ``filepath``
        :return: whatever the plugin's ``upload_file`` returns
        """
        get = public.to_dict_obj(
            {
                "filename": file_name,
                "filepath": object_name
            }
        )
        return self.gc_obj.upload_file(get)

    def build_object_name(self, data_type, file_name):
        """Build the ``bt_backup/<prefix>/<sub path>/`` object prefix for a backup.

        :param data_type: "site", "database" or "path"; anything else maps to
            the "other" prefix
        :param file_name: backup file name or path
        :return: the object name prefix (never starts with "/")
        """
        # Prefix used for each backup data type.
        prefix_dict = {
            "site": "web",
            "database": "db",
            "path": "path",
        }
        # Unknown data types fall back to the "other" prefix.
        data_prefix = prefix_dict.get(data_type, "other")

        if "database" in file_name:
            # Database backups: keep the last two directory components.
            dir_path = '/'.join(file_name.split('/')[:-1])
            sub_path_name = '/'.join(dir_path.split('/')[-2:]) + '/'
            object_name = f'bt_backup/{data_prefix}/{sub_path_name}'
        else:
            # Other backups: pull the item name out of the file name.
            file_regx = f"{data_prefix}_(.+)_20\\d+_\\d+(?:\\.|_)"
            sub_search = re.search(file_regx.lower(), file_name)
            sub_path_name = ""
            if sub_search:
                sub_path_name = sub_search.groups()[0] + '/'
            object_name = f'bt_backup/{data_prefix}/{sub_path_name}'

        if object_name[:1] == "/":
            object_name = object_name[1:]
        return object_name

    def delete_object(self, filename=None, data_type=None):
        """Delete the remote file whose name is the last component of *filename*."""
        filename = filename.split('/')[-1]
        file_id = self.gc_obj._get_file_id(filename)
        self.gc_obj._delete_file(file_id)

    def __get_folder_id(self, floder_name):
        """Return the id of the first folder named *floder_name*, or [] if none.

        NOTE(review): `build` is not imported anywhere in this file and
        `self.__creds` mangles to `self._gdrive__creds`, which this wrapper
        never sets -- calling this method as written would raise. Confirm
        whether it is still used before relying on it.
        """
        service = build('drive', 'v3', credentials=self.__creds)
        results = service.files().list(
            pageSize=10,
            q="name='{}' and mimeType='application/vnd.google-apps.folder'".format(floder_name),
            fields="nextPageToken, files(id, name)").execute()
        items = results.get('files', [])
        if not items:
            return []
        return items[0]["id"]


# Amazon
class aws_s3:
    """Loads the ``aws_s3`` plugin client, aliases its upload methods and authorizes."""

    flag = True
    cc_obj = None

    def __init__(self):
        _add_plugin_path('aws_s3')
        try:
            from s3lib.client.aws_s3 import COSClient as cc
            self.cc_obj = cc()
            # Expose the plugin's methods under the names CloudStoraUpload uses.
            self.cc_obj.upload_file = self.cc_obj.upload_file1
            self.cc_obj.resumable_upload = self.cc_obj.multipart_upload
            self.cc_obj.authorize()
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.cc_obj if self.flag else False


# Untested; login is not verified
class gcloud_storage:
    """Loads the ``gcloud_storage`` plugin client and adapts its upload call."""

    flag = True
    gsc_obj = None

    def __init__(self):
        _add_plugin_path('gcloud_storage')
        try:
            from gcloud_storage_main import gcloud_storage_main as gsc
            self.gsc_obj = gsc()
            self.gsc_obj.resumable_upload = self.resumable_upload
            self.gsc_obj.backup_path = "bt_backup"
        except Exception:
            print(traceback.format_exc())
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.gsc_obj if self.flag else False

    def resumable_upload(self, file_name, object_name, *args, **kwargs):
        """Forward an upload to the plugin's ``upload_file``.

        :param file_name: passed to the plugin as ``filename``
        :param object_name: passed to the plugin as ``path``
        :return: whatever the plugin's ``upload_file`` returns
        """
        get = public.to_dict_obj(
            {
                "path": object_name,
                "filename": file_name
            }
        )
        return self.gsc_obj.upload_file(get)


# Huawei Cloud
class obs:
    """Loads the ``obs`` plugin client and calls its ``authorize()``."""

    flag = True
    oc_obj = None

    def __init__(self):
        _add_plugin_path('obs')
        try:
            from obs_main import OBSClient as oc
            self.oc_obj = oc()
            self.oc_obj.authorize()
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.oc_obj if self.flag else False


# Untested
class msonedrive:
    """Loads the ``msonedrive`` plugin client and calls its ``authorize()``."""

    flag = True
    oc_obj = None

    def __init__(self):
        _add_plugin_path('msonedrive')
        try:
            from msonedrive_main import OneDriveClient as oc
            self.oc_obj = oc()
            self.oc_obj.authorize()
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.oc_obj if self.flag else False


# ftp; login is not verified
class ftp:
    """Obtains a client from the ``ftp`` plugin's ``get_client(use_sftp=None)``."""

    flag = True
    ftp_obj = None

    def __init__(self):
        _add_plugin_path('ftp')
        try:
            from ftp_main import get_client
            self.ftp_obj = get_client(use_sftp=None)
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.ftp_obj if self.flag else False


# JD Cloud
class jdcloud:
    """Loads the ``jdcloud`` plugin client and calls its ``authorize()``."""

    flag = True
    oc_obj = None

    def __init__(self):
        _add_plugin_path('jdcloud')
        try:
            from jdcloud_main import OBSClient as oc
            self.oc_obj = oc()
            self.oc_obj.authorize()
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.oc_obj if self.flag else False


class webdav:
    """Loads the ``webdav`` plugin client and aliases its delete/backup members."""

    flag = True
    webdav_obj = None

    def __init__(self):
        _add_plugin_path('webdav')
        try:
            from webdav_main import webdav_main as webdav_cls
            self.webdav_obj = webdav_cls()
            # NOTE(review): each alias below is taken from a freshly built
            # plugin instance rather than from self.webdav_obj; kept as-is
            # because the plugin code is not visible here -- confirm whether
            # a single shared instance is intended.
            self.webdav_obj.backup_path = webdav_cls().default_backup_path
            self.webdav_obj.delete_file = webdav_cls().delete_object
            self.webdav_obj.cloud_delete_file = webdav_cls().delete_object
        except Exception:
            self.flag = False

    def get_obj(self):
        """Return the WebDAV object, or False when initialisation failed."""
        return self.webdav_obj if self.flag else False


# Upload to minio
class minio:
    """Loads the ``minio`` plugin client and aliases its delete/backup members."""

    flag = True
    minio_obj = None

    def __init__(self):
        _add_plugin_path('minio')
        try:
            from minio_main import minio_main as minio_cls
            self.minio_obj = minio_cls()
            # NOTE(review): aliases come from fresh plugin instances, as in
            # the original code -- confirm whether sharing one is intended.
            self.minio_obj.backup_path = minio_cls().default_backup_path
            self.minio_obj.delete_file = minio_cls().delete_object
            self.minio_obj.cloud_delete_file = minio_cls().delete_object
        except Exception:
            print(traceback.format_exc())
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.minio_obj if self.flag else False


# Upload to DogeCloud
class dogecloud:
    """Loads the ``dogecloud`` plugin client and aliases its delete members."""

    flag = True
    dgc_obj = None

    def __init__(self):
        _add_plugin_path('dogecloud')
        try:
            from dogecloud_main import dogecloud_main as dgc
            self.dgc_obj = dgc()
            # NOTE(review): aliases come from fresh plugin instances, as in
            # the original code -- confirm whether sharing one is intended.
            self.dgc_obj.delete_file = dgc().delete_object
            self.dgc_obj.cloud_delete_file = dgc().delete_object
        except Exception:
            print(traceback.format_exc())
            self.flag = False

    def get_obj(self):
        """Return the client object, or False when initialisation failed."""
        return self.dgc_obj if self.flag else False


# Main entry point
class CloudStoraUpload:
    """Facade that selects an installed cloud-storage plugin and drives it.

    Call :meth:`run` with a storage name first; the other methods operate on
    the client stored in ``self.obj`` by that call.
    """

    # Class-level default kept for backward compatibility; every instance
    # gets its own list in __init__ so entries are not shared or duplicated.
    cloud_list = []
    # Storage name -> wrapper class.
    cloud_obj = {
        'qiniu': qiniu,
        'alioss': alioss,
        'ftp': ftp,
        'bos': bos,
        'obs': obs,
        'aws_s3': aws_s3,
        'gdrive': gdrive,
        'msonedrive': msonedrive,
        'gcloud_storage': gcloud_storage,
        'upyun': upyun,
        'jdcloud': jdcloud,
        'txcos': cos,
        'tianyiyun': tianyiyun,
        'webdav': webdav,
        'minio': minio,
        'dogecloud': dogecloud
    }
    backup_path = "/backup_path/"
    # Storage name -> display title.
    __CLOUD_TITLE = {
        "qiniu": "Qiniu Cloud Storage",
        "alioss": "Alibaba Cloud OSS",
        "ftp": "FTP Storage",
        "bos": "Baidu Cloud BOS",
        "obs": "Huawei Cloud OBS",
        "aws_s3": "AWS S3",
        "gdrive": "Google Drive",
        "msonedrive": "Microsoft OneDrive",
        "gcloud_storage": "Google Cloud Storage",
        "upyun": "upyun storage",
        "jdcloud": "JD Cloud Storage",
        "txcos": "Tencent Cloud COS",
        'tianyiyun': "Tianyi Cloud ZOS",
        'webdav': "WebDav",
        'minio': "MinIO storage",
        'dogecloud': "Duoji Cloud COS"
    }

    def __init__(self):
        """Collect the storages listed in libList.conf whose plugin directory exists."""
        self.obj = None
        # Per-instance list: appending to the class attribute would make the
        # list grow with duplicates every time an instance is created.
        self.cloud_list = []
        tmp = public.readFile('/www/server/panel/data/libList.conf')
        if tmp:
            libs = json.loads(tmp)
            for lib in libs:
                if 'opt' not in lib:
                    continue
                filename = '/www/server/panel/plugin/{}'.format(lib['opt'])
                if not os.path.exists(filename):
                    continue
                self.cloud_list.append(lib['opt'])

    def run(self, cloud_name):
        """Initialise the client for *cloud_name* and store it in ``self.obj``.

        Attributes the rest of this class relies on are filled in with
        defaults when the client does not define them.

        :param cloud_name: key of ``cloud_obj`` (e.g. ``'alioss'``)
        :return: the client object, or False when the storage is not
            installed, not supported, or failed to initialise
        """
        if cloud_name not in self.cloud_list or cloud_name not in self.cloud_obj:
            return False
        wrapper = self.cloud_obj[cloud_name]()
        if not wrapper.flag:
            return False
        self.obj = wrapper.get_obj()
        # A wrapper may hand back None/False even though its flag is still
        # True; attributes cannot be attached to those values.
        if self.obj is None or self.obj is False:
            return False

        fallbacks = (
            ("_name", cloud_name),
            ("_title", self.__CLOUD_TITLE.get(cloud_name, "")),
            ("backup_path", self.backup_path),
            ("error_msg", ""),
            ("upload_file", self.__upload_api),
            ("resumable_upload", self.__upload_api),
            ("delete_object", self.__upload_api),
        )
        for attr_name, default in fallbacks:
            if not hasattr(self.obj, attr_name):
                setattr(self.obj, attr_name, default)

        if isinstance(self.obj.backup_path, str):
            self.backup_path = self.obj.backup_path
        return self.obj

    def __upload_api(self, *args, **kwargs):
        """Fallback for missing client methods: record an error and return False."""
        self.obj.error_msg = public.lang("Failed to link to cloud storage!")
        return False

    def cloud_upload_file(self, file_name: str, upload_path: str, *args, **kwargs):
        """Upload a file through the client's ``resumable_upload``.

        Called by panelBackup v1.2 and above.

        :param file_name: name of the file to upload
        :param upload_path: passed to the client as ``object_name``
        :return: the client's return value, or False when it raises
        """
        try:
            return self.obj.resumable_upload(file_name, *args, object_name=upload_path, **kwargs)
        except Exception:
            return False

    def cloud_delete_dir(self, file_path: str, *args, **kwargs):
        """Delete every entry that ``get_list(file_path)`` reports.

        :return: True when all deletions ran, False when anything raised
        """
        try:
            dir_list = self.obj.get_list(file_path)
            for info in dir_list["list"]:
                path = os.path.join(file_path, info['name'])
                self.obj.delete_object(path, *args, **kwargs)
            return True
        except Exception:
            return False

    def cloud_delete_file(self, file_path: str, *args, **kwargs):
        """Delete one remote file.

        :return: the client's ``delete_object`` result, or False when it raises
        """
        try:
            return self.obj.delete_object(file_path, *args, **kwargs)
        except Exception as e:
            print(e)
            return False

    def cloud_download_file(self, clould_path, loacl_path, *args, **kwargs):
        """Download a remote file (or directory) to *loacl_path*.

        :param clould_path: remote path
        :param loacl_path: local destination path
        :return: False when no client is active, otherwise a
            ``public.returnMsg`` result
        """
        try:
            if not self.obj:
                return False
            if self.obj.get_list(clould_path)['list']:
                # The remote path is a directory: download it recursively and
                # stop here. Falling through to the single-file path below
                # always reported a failure for directories.
                dir_result = self.cloud_download_dir(clould_path, loacl_path, *args, **kwargs)
                if dir_result is None:
                    return public.returnMsg(True, '云存储下载文件成功')
                return dir_result
            url = self.get_file_download_url(clould_path)
            if not url['status']:
                return public.returnMsg(False, '云存储下载文件失败')
            # Quote both arguments: download URLs commonly contain shell
            # metacharacters such as '&', and paths may contain spaces.
            command = 'wget --no-check-certificate -T 30 -t 5 -d -O {} {} '.format(
                shlex.quote(str(loacl_path)), shlex.quote(str(url['msg'])))
            public.ExecShell(command)
            time.sleep(1)
            if os.path.exists(loacl_path):
                return public.returnMsg(True, '云存储下载文件成功')
            return public.returnMsg(False, '云存储下载文件失败')
        except Exception:
            return public.returnMsg(False, '云存储下载文件失败')

    def cloud_download_dir(self, clould_path, loacl_path, *args, **kwargs):
        """Recursively download a remote directory into *loacl_path*.

        :return: None on success, False when no client is active, or a
            failure ``public.returnMsg`` result when anything raises
        """
        try:
            if not self.obj:
                return False
            data = self.obj.get_list(clould_path)
            if not os.path.exists(loacl_path):
                os.makedirs(loacl_path)
            for entry in data['list']:
                remote_item = os.path.join(data['path'], entry['name'])
                local_item = os.path.join(loacl_path, entry['name'])
                # An entry with a non-empty listing is treated as a folder.
                if self.obj.get_list(remote_item)['list']:
                    if not os.path.exists(local_item):
                        os.makedirs(local_item)
                    self.cloud_download_dir(remote_item, local_item, *args, **kwargs)
                else:
                    self.obj.download_file(remote_item, local_item, *args, **kwargs)
            return
        except Exception:
            return public.returnMsg(False, '云存储下载文件失败')

    def get_file_download_url(self, down_load_path):
        """Look up the download URL of a remote file from its parent listing.

        :param down_load_path: remote file path
        :return: ``public.returnMsg(True, url)`` on success, otherwise a
            failure ``public.returnMsg`` result (folders are not supported)
        """
        if self.obj.get_list(down_load_path)['list']:
            return public.returnMsg(False, '不支持文件夹下载,请手动到云存储服务端下载')
        file_name = os.path.basename(down_load_path)
        down_load_path = os.path.dirname(down_load_path)
        data = self.obj.get_list(down_load_path)
        if data['list']:
            for entry in data['list']:
                if entry['name'] == file_name:
                    return public.returnMsg(True, entry['download'])
        return public.returnMsg(False, '获取下载链接失败')

# Manual usage example (disabled):
# if __name__ == '__main__':
#     c = CloudStoraUpload()
#     a = c.run('alioss')
#     x = 'bt_backup/database/test_cron_back_1_2023-10-10_02-30-03_mysql_data.sql.gz'
#     print(c.obj.backup_path)
#     c.cloud_download_file('/bt_backup/', '/xiaopacai/backup')