Server IP : 172.67.216.182 / Your IP : 162.158.88.11 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/ |
Upload File : |
# coding: utf-8 # ------------------------------------------------------------------- # aaPanel # ------------------------------------------------------------------- # Copyright (c) 2015-2019 aaPanel(www.aapanel.com) All rights reserved. # ------------------------------------------------------------------- # Author: hwliang <[email protected]> # ------------------------------------------------------------------- # ------------------------------ # AUTH验证接口 # ------------------------------ import public, time, json, os, requests from BTPanel import session, cache from public.validate import Param class panelAuth: __request_url = None __product_list_path = 'data/product_list.pl' __product_bay_path = 'data/product_bay.pl' __product_id = '100000011' __official_url = public.OfficialApiBase() __failed_connect_server='Failed to connect to the server!' def create_serverid(self, get): try: userPath = 'data/userInfo.json' if not os.path.exists(userPath): return public.return_message(-1, 0, public.lang("Please login with account first")) tmp = public.readFile(userPath) if len(tmp) < 2: tmp = '{}' data = json.loads(tmp) data['uid'] = data['id'] if not data: return public.return_message(-1, 0, public.lang("Please login with account first")) if not 'server_id' in data: s1 = public.get_mac_address() + public.get_hostname() s2 = self.get_cpuname() serverid = public.md5(s1) + public.md5(s2) data['server_id'] = serverid public.writeFile(userPath, json.dumps(data)) return data except: return public.return_message(-1, 0, public.lang("Please login with account first")) def create_plugin_other_order(self, get): pdata = self.create_serverid(get) pdata['pid'] = get.pid pdata['cycle'] = get.cycle p_url = public.GetConfigValue('home') + '/api/Pluginother/create_order' if get.type == '1': pdata['renew'] = 1 p_url = public.GetConfigValue('home') + '/api/Pluginother/renew_order' return public.return_message(0, 0, json.loads(public.httpPost(p_url,pdata))) def get_order_stat(self, get): pdata = self.create_serverid(get) pdata['order_id'] = get.oid p_url = public.GetConfigValue('home') + '/api/Pluginother/order_stat' if get.type == '1': p_url = public.GetConfigValue('home') + '/api/Pluginother/re_order_stat' return public.return_message(0, 0, json.loads(public.httpPost(p_url,pdata))) def check_serverid(self, get): if get.serverid != self.create_serverid(get): return public.return_message(-1,0,False) return public.return_message(0,0,True) # 获取价格列表 新增多机购买 def get_plugin_price(self, get): # 校验参数 try: get.validate([ Param('product_id').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) try: userPath = 'data/userInfo.json' if not 'pluginName' in get and not 'product_id' in get: return public.return_message(-1, 0, public.lang("Parameter ERROR!")) if not os.path.exists(userPath): return public.return_message(-1, 0, public.lang("Please login with account first")) params = {} if not hasattr(get, 'product_id'): params['product_id'] = self.get_plugin_info(get.pluginName)['id'] else: params['product_id'] = get.product_id data = self.send_cloud('{}/api/product/pricesV3'.format(self.__official_url), params) if data['status'] ==-1: return data data=data['message'] if not data: return public.return_message(-1, 0, public.lang("Please log in to your aaPanel account on the panel first!")) if not data['success']: return public.return_message(-1, 0,data['msg']) # if len(data['res']) == 6: # return data['res'][3:] return public.return_message(0, 0,data['res']) except: public.print_log(public.get_error_info()) del(session['get_product_list']) return public.return_message(-1, 0,'Syncing information, please try again!\n {}',(public.get_error_info(),)) def get_plugin_info(self, pluginName): data = self.get_business_plugin(None) if data['status']==-1: return None for d in data['message']: if d['name'] == pluginName: return d return None def get_plugin_list(self, get): try: if not session.get('get_product_bay') or not os.path.exists(self.__product_bay_path): data = self.send_cloud('get_order_list_byuser', {}) if data['status']==-1:return data if data['message']: public.writeFile(self.__product_bay_path, json.dumps(data['message'])) session['get_product_bay'] = True data = json.loads(public.readFile(self.__product_bay_path)) return public.return_message(0,0,data) except: return public.return_message(-1,0,None) def get_buy_code(self, get): # 校验参数 try: get.validate([ Param('pid').Integer(), Param('cycle').Integer(), Param('source').Integer(), Param('num').Integer(), Param('charge_type').Integer(), Param('src').Integer(), Param('is_ipv6').Integer(), Param('coupon_id').Integer(), Param('cycle_unit').String(), Param('pay_channel').String(), Param('ip').String(), Param('os').String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) cycle = getattr(get, 'cycle', 1) params = {} params['cycle'] = cycle params['product_id'] = get.pid params['src'] = 2 params['trigger_entry'] = get.source params['pay_channel'] = 2 # 0.管理后台生成 1.Ping++ 2.Stripe 3.Paypal 10.抵扣券 if hasattr(get, 'coupon_id'): params['coupon_id'] = get.coupon_id if hasattr(get, 'pay_channel'): params['pay_channel'] = get.pay_channel #优惠券检测 if get.pay_channel==10 and not hasattr(get, 'coupon_id'): return public.return_message(-1, 0, public.lang("parameter error: coupon_id")) if hasattr(get, 'charge_type'): params['charge_type'] = get.charge_type if int(get.charge_type) == 1: if not hasattr(get, 'cycle_unit'):return public.return_message(-1, 0, public.lang("parameter error: cycle_unit")) params['cycle_unit'] = get.cycle_unit env_info = public.fetch_env_info() params['environment_info'] = json.dumps(env_info) params['server_id'] = env_info['install_code'] # 多机购买 数量 if not hasattr(get, 'num'): return public.return_message(-1, 0, public.lang("parameter error: num")) params['num'] = get.num # 添加购买来源 # params['source'] = get.source data = self.send_cloud('{}/api/order/product/create'.format(self.__official_url), params) if data['status'] == -1: return data if int(params.get('pay_channel', 0)) == 10: # 刷新授权状态 public.load_soft_list() public.refresh_pd() return public.return_message(0, 0, data['message']['res']) def get_stripe_session_id(self, get): params = {} if hasattr(get, 'order_no'): params['order_no'] = get.order_no if hasattr(get, 'order_id'): params['order_id'] = get.order_id if hasattr(get, 'subscribe'): params['subscribe'] = get.subscribe # 开启本地支付 if hasattr(get, 'adaptive_pricing'): params['adaptive_pricing'] = get.adaptive_pricing if not params.get('order_no', None) and not params.get('order_id', None): return public.return_message(-1, 0, public.lang("parameter error")) data = self.send_cloud('{}/api/order/product/pay'.format(self.__official_url), params) session['focre_cloud'] = True return public.return_message(0, 0, data['message']['res']) # paypal支付 def get_paypal_session_id(self, get): params = {} if hasattr(get, 'oid'): params['oid'] = get.oid if not params.get('oid', None): return public.return_message(-1, 0, public.lang("parameter error")) data = self.send_cloud('{}/api/paypal/create_order'.format(self.__official_url), params) if data['status']==-1: return data session['focre_cloud'] = True data=data['message'] data2 = { "status": data.get("success", False), "res": data.get("res", ""), "nonce": data.get("nonce", 0), } return public.return_message(0, 0, data2) # paypal 支付确认 def check_paypal_status(self, get): params = {} if hasattr(get, 'paypal_order_id'): params['paypal_order_id'] = get.paypal_order_id if not params.get('paypal_order_id', None): return public.return_message(-1, 0, public.lang("parameter error")) data = self.send_cloud('{}/api/paypal/capture_order'.format(self.__official_url), params) data=data['message'] status_code=-1 status=data.get("success", False) if status: status_code=0 # 刷新授权状态 public.load_soft_list() public.refresh_pd() data2 = { "res": data.get("res", ""), "nonce": data.get("nonce", 0), } return public.return_message(status_code, 0, data2) def check_pay_status(self, get): params = {} params['id'] = get.id data = self.send_cloud('check_product_pays', params) if data['status']==-1:return data data=data['message'] if not data: return public.return_message(-1, 0, public.lang("Fail to connect to the server!")) if data['status'] == True: self.flush_pay_status(get) if 'get_product_bay' in session: del (session['get_product_bay']) return public.return_message(0, 0, data) def flush_pay_status(self, get): if 'get_product_bay' in session: del (session['get_product_bay']) data = self.get_plugin_list(get) if data['status']==-1: return public.return_message(-1, 0, public.lang("Fail to connect to the server!")) return public.return_message(0, 0, public.lang("Flush status success")) def get_renew_code(self): pass def check_renew_code(self): pass def get_business_plugin(self, get): try: if not session.get('get_product_list') or not os.path.exists(self.__product_list_path): data = self.send_cloud('{}/api/product/chargeProducts'.format(self.__official_url), {}) if data['message']['success']: public.writeFile(self.__product_list_path, json.dumps(data['message']['res'])) session['get_product_list'] = True data = json.loads(public.readFile(self.__product_list_path)) return public.return_message(0,0,data) except: return public.return_message(-1,0,None) def get_ad_list(self): pass def check_plugin_end(self): pass def get_re_order_status_plugin(self, get): params = {} params['pid'] = getattr(get, 'pid', 0) data = self.send_cloud('get_re_order_status', params) if data['status']==-1:return data data=data['message'] if not data: return public.return_message(-1, 0, public.lang("Fail to connect to the server!")) if data['status'] == True: self.flush_pay_status(get) if 'get_product_bay' in session: del (session['get_product_bay']) return public.return_message(0, 0, data) def get_voucher_plugin(self, get): # 校验参数 try: get.validate([ Param('pid').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) params = {} params['product_id'] = getattr(get, 'pid', 0) params['status'] = '0' data = self.send_cloud('{}/api/user/productVouchers'.format(self.__official_url), params) if data['status']==-1:return data if not data['message']: return public.return_message(0, 0,[]) return public.return_message(0, 0,data['message']['res']) def get_voucher_plugin_all(self, get): params = {} params['status'] = '0' data = self.send_cloud('{}/api/user/productVouchers'.format(self.__official_url), params) if data['status']==-1:return data if not data['message']: return public.return_message(0, 0,[]) return public.return_message(0, 0,data['message']['res']) def create_order_voucher_plugin(self, get): # 校验参数 try: get.validate([ Param('cycle_unit').String(), Param('pid').Integer(), Param('coupon_id').Integer(), Param('cycle').Integer(), Param('charge_type').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) cycle = getattr(get, 'cycle', '1') params = {} params['cycle'] = cycle params['cycle_unit'] = get.cycle_unit params['coupon_id'] = get.coupon_id params['src'] = 2 params['pay_channel'] = 10 params['charge_type'] = get.charge_type env_info = public.fetch_env_info() params['environment_info'] = json.dumps(env_info) params['server_id'] = env_info['install_code'] data = self.send_cloud('{}/api/order/product/create'.format(self.__official_url), params) session['focre_cloud'] = True if data['message']['success']: # 刷新授权状态 public.load_soft_list() public.refresh_pd() return public.return_message(0, 0, public.lang("Activate successfully")) return public.return_message(-1, 0, public.lang("Activate failed")) def send_cloud(self, cloudURL, params): try: userInfo = self.create_serverid(None) if 'token' not in userInfo: return public.return_message(-1,0,None) url_headers = {"Content-Type": "application/json", "authorization": "bt {}".format(userInfo['token']) } resp = requests.post(cloudURL, params=params, headers=url_headers) resp_json = resp.json() if resp.status_code != 200 or not resp_json.get('success', False): # 当接口错误信息存在时,返回接口错误信息 if isinstance(resp_json.get('res', None), str): raise public.HintException(str(resp_json['res'])) # return public.return_message(-1, 0, str(resp_json['res'])) # 否则统一返回连接服务器失败 return public.return_message(-1, 0, self.__failed_connect_server) # if not resp['res']: # public.print_log('not res:') # return public.return_message(-1,0,self.__failed_connect_server) return public.return_message(0, 0, resp_json) except public.HintException: raise except Exception: return public.return_message(-1,0,self.__failed_connect_server) def send_cloud_v2(self, cloudURL, params): try: userInfo = self.create_serverid(None) if 'token' not in userInfo: return public.return_message(-1,0,None) url_headers = {"Content-Type": "application/json", "authorization": "bt {}".format(userInfo['token']) } resp = requests.post(cloudURL, params=params, headers=url_headers) resp = resp.json() if not resp or 'res' not in resp: return public.return_message(-1,0,self.__failed_connect_server) if not resp['res']: return public.return_message(-1,0,None) if resp['success'] == False: return public.return_message(-1,0,self.__failed_connect_server) return public.return_message(0,0,resp['res']) except: return public.return_message(-1,0,self.__failed_connect_server) def send_cloud_v3(self, module, params): userInfo = self.create_serverid(None); if 'status' in userInfo: params['uid'] = 0 params['serverid'] = '' else: params['uid'] = userInfo['uid'] params['serverid'] = userInfo['serverid'] params['access_key'] = userInfo['access_key'] params['os'] = 'Windows' data = self.send_cloud_pro_2('obtain_coupons', params) return data def send_cloud_get(self, cloudURL, params): try: userInfo = self.create_serverid(None) if 'token' not in userInfo: return public.return_message(-1,0,None) url_headers = {"Content-Type": "application/json", "authorization": "bt {}".format(userInfo['token']) } resp = requests.get(cloudURL,headers=url_headers, stream=True).json() if not resp or 'res' not in resp: return public.return_message(-1,0,self.__failed_connect_server) if resp['success'] == False: return public.return_message(-1,0,self.__failed_connect_server) return public.return_message(0,0,resp['res']) except: return public.return_message(-1,0,self.__failed_connect_server) def send_cloud_pro(self, module, params): try: cloudURL = '{}/api/order/product/'.format(self.__official_url) userInfo = self.create_serverid(None) params['os'] = 'Linux' if 'status' in userInfo: params['server_id'] = '' else: params['server_id'] = userInfo['server_id'] url_headers = {"authorization": "bt {}".format(userInfo['token'])} resp = requests.post(cloudURL, params=params, headers=url_headers) resp = resp.json()['res'] if not resp: return None return resp except: return None def send_cloud_pro_2(self, module, params): try: cloudURL = '{}/api/user/{}'.format(self.__official_url,module) userInfo = self.create_serverid(None) params['os'] = 'Linux' if 'status' in userInfo: params['server_id'] = '' else: params['server_id'] = userInfo['server_id'] url_headers = {"authorization": "bt {}".format(userInfo['token'])} resp = requests.post(cloudURL, params=params, headers=url_headers) resp = resp.json() if not resp or 'res' not in resp: return public.return_message(-1,0,self.__failed_connect_server) if not resp['res']: return public.return_message(-1,0,self.__failed_connect_server) if resp['success'] == False: return public.return_message(-1,0,self.__failed_connect_server) return public.return_message(0,0,resp['res']) except: return public.return_message(-1,0,self.__failed_connect_server) def get_voucher(self, get): params = {} params['product_id'] = self.__product_id params['status'] = '0' data = self.send_cloud_pro('get_voucher', params) return data def get_order_status(self, get): params = {} data = self.send_cloud_pro('get_order_status', params) return data def get_product_discount_by(self, get): params = {} data = self.send_cloud_pro('get_product_discount_by', params) return data def get_re_order_status(self, get): params = {} data = self.send_cloud_pro('get_re_order_status', params) return data def create_order_voucher(self, get): code = getattr(get, 'code', '1') params = {} params['code'] = code data = self.send_cloud_pro('create_order_voucher', params) public.return_message(0, 0, data) def create_order(self, get): cycle = getattr(get, 'cycle', '1') params = {} params['cycle'] = cycle params['cycle'] = 'month' params['product_id'] = 100000012 params['src'] = 2 params['pay_channel'] = 2 params['charge_type'] = 1 params['environment_info'] = json.dumps(public.fetch_env_info()) data = self.send_cloud_pro('create', params) return data def get_cpuname(self): return public.ExecShell("cat /proc/cpuinfo|grep 'model name'|cut -d : -f2")[0].strip() def get_product_auth(self, get): # 校验参数 try: get.validate([ Param('pid').Integer(), Param('page').Integer(), Param('pageSize').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) params = {} params['page'] = get.page if 'page' in get else 1 params['pageSize'] = get.pageSize if 'pageSize' in get else 15 data = self.send_cloud('{}/api/user/productAuthorizes'.format(self.__official_url), params) if not data: return public.return_message(0, 0,[]) if 'success' not in data['message']: return public.return_message(0, 0,[]) data = data['message']['res'] # return [i for i in data['list'] if i['status'] != 'activated' and get.pid == i['product_id']] res = list() for i in data['list']: if i['status'] != 'activated' and str(get.pid) == str(i['product_id']): res.append(i) return public.return_message(0, 0,res) def get_product_auth_all(self, get): # 校验参数 try: get.validate([ Param('page').Integer(), Param('pageSize').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) params = {} params['page'] = get.page if 'page' in get else 1 params['pageSize'] = get.pageSize if 'pageSize' in get else 15 data = self.send_cloud('{}/api/user/productAuthorizes'.format(self.__official_url), params) if not data: return public.return_message(0, 0,[]) if 'success' not in data['message']: return public.return_message(0, 0,[]) data = data['message']['res'] res = list() for i in data['list']: if i['status'] != 'activated': res.append(i) return public.return_message(0, 0,res) def auth_activate(self, get): params = {} params['serial_no'] = get.serial_no params['environment_info'] = json.dumps(public.fetch_env_info()) data = self.send_cloud('{}/api/authorize/product/activate'.format(self.__official_url), params) if 'success' not in data['message'] or not data['message']['success']: return public.return_message(-1, 0, public.lang("Activate Failed")) session['focre_cloud'] = True # 刷新授权状态 public.load_soft_list() public.refresh_pd() return public.return_message(0, 0, public.lang("Activate successfully")) def renew_product_auth(self, get): params = {} params['serial_no'] = get.serial_no params['pay_channel'] = get.pay_channel params['cycle'] = get.cycle params['cycle_unit'] = get.cycle_unit params['src'] = 2 params['trigger_entry'] = get.source params['environment_info'] = json.dumps(public.fetch_env_info()) if hasattr(get, 'coupon_id') and get.pay_channel == '10': params['coupon_id'] = get.coupon_id data = self.send_cloud('{}/api/authorize/product/renew'.format(self.__official_url), params) if not data['message']['success']: data['message']['res'] = 'Invalid authorize OR authorize not found!' return public.return_message(-1, 0, data['message']['res']) session['focre_cloud'] = True # 使用抵扣券续费直接返回续费结果 if get.pay_channel == '10': if not data['message']['success']: return public.return_message(-1, 0, public.lang("Renew Failed")) # 刷新授权状态 public.load_soft_list() public.refresh_pd() return public.return_message(0, 0, public.lang("Renew successfully")) # 使用支付续费返回stripe的请求数据 return public.return_message(0, 0,data['message']['res']) def free_trial(self, get): """ 每个账号有一次免费试用专业版15天的机会 :return: """ params = {} params['environment_info'] = json.dumps(public.fetch_env_info()) data = self.send_cloud('{}/api/product/obtainProfessionalMemberFree'.format(self.__official_url), params) session['focre_cloud'] = True # 使用抵扣券续费直接返回续费结果 if not data['message']['success']: return public.return_message(-1, 0, public.lang("Apply Failed")) return public.return_message(0, 0, public.lang("Apply successfully")) # 获取专业版特权信息 或插件信息? def get_plugin_remarks(self, get): # 校验参数 try: get.validate([ Param('product_id').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) if not hasattr(get, 'product_id'): return public.return_message(-1, 0, public.lang("product_id Parameter ERROR!")) product_id = get.product_id # ikey = 'plugin_remarks' + product_id # if ikey in session: # return session.get(ikey) try: url = '{}/api/panel/get_advantages/{}'.format(self.__official_url, product_id) data = requests.get(url).json() except: return public.return_message(-1, 0, public.lang("Failed to connect to the server!")) if not data: return public.return_message(-1, 0, public.lang("Failed to connect to the server!")) # session[ikey] = data return public.return_message(0,0,data) def res_request_error(self): return public.return_message(-1, 0, public.lang("Interface request failed ({})!", self.__request_url)) def get_apply_copon(self, get): """ 领取优惠券 @get.coupon 优惠券 """ # 校验参数 try: get.validate([ Param('obtain_id').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) params = {} params['obtain_id']= get.obtain_id cloudUrl=self.__official_url+'/api/user/obtain_coupons' # data = self.send_cloud_v2('obtain_coupons', params) data = self.send_cloud_v3(cloudUrl, params) return data def get_ignore_time(self, get): """ 获取忽略时间 @get.coupon 优惠券 """ try: limit = int(public.readFile('data/ignore_coupon_time.pl')) except: limit = public.return_message(0,0,0) if limit == -100 or limit > time.time(): return public.return_message(0,0,1) return public.return_message(0,0,0) def get_coupon_list(self, get): """ 获取可用的优惠券 @get.coupon 优惠券 """ params = {} cloudUrl=self.__official_url+'/api/user/coupons' data = self.send_cloud_get(cloudUrl, params) return data # if not data: # if type(data) == list: # return public.return_message(0,0,[]) # return public.return_message(-1,0,self.res_request_error()) # return public.return_message(0,0,data) def ignore_coupon_time(self, get): """ 获取优惠券 @get.coupon 优惠券 @get.limit_time 限制时间 永久 -100 """ if not hasattr(get, 'limit_time')or not get.limit_time: return public.return_message(-1, 0, public.lang("Missing parameter limit_time or parameter cannot be empty!")) limit_time = int(get.limit_time) if limit_time < time.time() and limit_time > 0: return public.return_message(-1, 0, public.lang("Time cannot be less than the current time!")) public.writeFile('data/ignore_coupon_time.pl', str(limit_time)) msg = 'Ignoring success, coupon information will not be displayed in the future' if limit_time > 0: msg = 'Ignoring success, coupon information will no longer be displayed to you before {}'.format(public.format_date(times=limit_time)) return public.return_message(0,0, msg) def get_coupons(self, get): """ @name 获取可领取的优惠券列表 @param uid 用户id """ # 用户是否忽略 if self.get_ignore_time(get)['message']['result']: return public.return_message(0,0,[]) params = {} cloudURL = self.__official_url+'/api/user/obtainable_coupons' data = self.send_cloud_v2(cloudURL, params) if not data: return public.return_message(-1,0,None) if data['status']==-1: return data return data def get_all_coupons(self,get): """ @name 获取所有优惠券 """ #获取可领取的优惠券列表 params = {} cloudURL = self.__official_url+'/api/user/obtainable_coupons' data = self.send_cloud_v2(cloudURL, params) if not data['message']: return public.return_message(-1,0,None) if data['status']==0: if data['message']['status'] ==1: data['message']['interface_type']=1 return data else: data['message']['interface_type']=2 # 获取可用的优惠券列表 params = {} cloudUrl=self.__official_url+'/api/user/coupons' tmp_data = self.send_cloud_get(cloudUrl, params) data['message']['total']=0 if isinstance(tmp_data['message'],list) and len(tmp_data['message'])>0: data['message']['total']=len(tmp_data['message']) data['message']['end_time']=tmp_data['message'][0]['end_time'] return data else: # 接口请求失败,返回默认值 return public.success_v2({ 'status': 0, 'obtain_id': 0, 'end_time': 0, 'type': 0, 'coupons': [], 'usable_coupon_num': 0, }) return data """ @name 统一请求接口 @param url 返回URL不是www.bt.cn,修改config/config.json的home字段 """ def request_post(self, url, params): params = {} data = self.send_cloud_pro_2('obtainable_coupons', params) return data # 检测订单支付状态 def detect_order_status(self, args: public.dict_obj): args.validate([ public.Param('order_id').Require().Integer('>', 0).Filter(int), ]) resp = self.send_cloud('{}/api/order/{}/status'.format(public.OfficialApiBase(), args.order_id), {}) # 订单支付成功时,刷新授权信息 if int(resp.get('message', {}).get('res', 0)) == 1: # 刷新授权状态 public.load_soft_list() public.refresh_pd() return resp # get expansion pricing def get_expand_pack_prices(self, args: public.dict_obj): expand_pack_type = str(args.get('expand_pack_type', 'mail')).strip() # 取缓存 cache_key = '{}:get_expand_pack_prices'.format(expand_pack_type) cache = public.cache_get(cache_key) if cache: # return cache return public.return_message(0, 0, cache) try: userPath = 'data/userInfo.json' if not os.path.exists(userPath): return public.return_message(-1, 0, public.lang("Please login with account first")) params = {'expand_pack_type': expand_pack_type} data = self.send_cloud('{}/api/product/expandPackPrices'.format(self.__official_url), params) if data['status'] == -1: # public.print_log('0') return data data = data['message'] if not data: return public.return_message(-1, 0, public.lang("Please log in to your aaPanel account on the panel first!")) if not data['success']: # public.print_log('1') return public.return_message(-1, 0,data['msg']) dat = {"prices": data['res'],} public.cache_set(cache_key, dat, 600) return public.return_message(0, 0, dat) except: public.print_log(public.get_error_info()) del(session['get_product_list']) return public.return_message(-1, 0,'Syncing information, please try again!\n {}',(public.get_error_info(),))