403Webshell
Server IP : 104.21.38.3  /  Your IP : 162.158.107.55
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /www/server/panel/class_v2/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /www/server/panel/class_v2/ftp_v2.py
#coding: utf-8
#  + -------------------------------------------------------------------
# | aaPanel
#  + -------------------------------------------------------------------
# | Copyright (c) 2015-2016 aaPanel(www.aapanel.com) All rights reserved.
#  + -------------------------------------------------------------------
# | Author: hwliang <[email protected]>
#  + -------------------------------------------------------------------
import public,db,re,os,firewalls
import firewalls_v2 as firewalls
from public.validate import Param
try:
    from BTPanel import session
except: pass
class ftp:
    __runPath = None
    
    def __init__(self):
        self.__runPath = '/www/server/pure-ftpd/bin'
        
    
    #添加FTP
    def AddUser(self,get):
        """
        Create a Pure-FTPd virtual user and store it in the panel's ``ftps`` table.

        Parameters read from ``get``:
            ftp_username: account name; word characters only, at least 3 long
            ftp_password: account password; at least 6 characters, single line
            path:         home directory; handed to files_v2's CreateDir before use
            ps:           free-text remark; a default text is stored when empty
            pid:          optional value stored in the ``pid`` column (default 0)

        Returns:
            The value built by ``public.return_message``: status 0 on success,
            status -1 with a message when a check or the creation itself fails.
        """
        # Validate the request parameters
        try:
            get.validate([
                Param('ftp_username').String(),
                Param('ftp_password').String(),
                Param('path').String(),
                Param('ps').String(),

            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))
        # Bound up front: the except handler below formats it into the log
        # entry and must not raise NameError when the failure happens early.
        username = ''
        try:
            if not os.path.exists('/www/server/pure-ftpd/sbin/pure-ftpd'):
                return_message=public.return_msg_gettext(False,public.lang('Please install the Pure-FTPd service in the software store first.'))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            import files_v2,time
            import shlex
            fileObj=files_v2.files()
            if get['ftp_username'].strip().find(' ') != -1:
                return_message=public.returnMsg(False,public.lang('Username cannot contain spaces'))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            if re.search(r"\W+",get['ftp_username']):
                return_message={'code':501,'msg':public.get_msg_gettext('Username is illegal, special characters are NOT allowed!')}
                return public.return_message(-1,0, return_message)
            if len(get['ftp_username']) < 3:
                return_message={'code':501,'msg':public.get_msg_gettext('Username is illegal, cannot be less than 3 characters!')}
                return public.return_message(-1,0, return_message)
            if not fileObj.CheckDir(get['path']):
                return_message={'code':501,'msg':public.get_msg_gettext('System critical directory cannot be used as FTP directory!')}
                return public.return_message(-1,0, return_message)
            if public.M('ftps').where('name=?',(get.ftp_username.strip(),)).count():
                return public.return_message(-1, 0, public.lang("User [{}] exists!", get.ftp_username))
            username = get['ftp_username'].strip()
            if re.search("[\\/\\\\:\\*\\?\"\'\\<\\>\\|]+",username):
                return_message=public.return_msg_gettext(False,public.lang("Name cannot contain /\\:*?\"<>| symbol"))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            password = get['ftp_password'].strip()
            if len(password) < 6:
                # The placeholder value is passed as a plain argument, the same
                # way every other public.lang call in this class does it.
                return_message=public.return_msg_gettext(False, public.lang('Password must be at least [{}] characters',"6"))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            if '\n' in password or '\r' in password:
                # The password is fed to pure-pw line by line through a here-document;
                # an embedded line break would split it and could end the document early.
                return public.return_message(-1, 0, public.lang('Password cannot contain line breaks'))
            get.path = get['path'].replace(' ','')
            get.path = get.path.replace("\\", "/")
            fileObj.CreateDir(get)
            # Quote everything that reaches the shell so metacharacters in the
            # path cannot be interpreted as shell syntax.
            quoted_path = shlex.quote(get.path)
            public.ExecShell('chown www.www ' + quoted_path)
            # <<'EOF' (quoted delimiter) makes the shell pass the password through
            # verbatim; an unquoted delimiter would expand $, backticks and backslashes.
            public.ExecShell(self.__runPath + '/pure-pw useradd ' + shlex.quote(username) + ' -u www -d ' + quoted_path + " <<'EOF'\n" + password + '\n' + password + '\nEOF')
            self.FtpReload()
            ps = public.xssencode2(get['ps'])
            if get['ps']=='': ps= public.lang('Edit notes')
            addtime=time.strftime('%Y-%m-%d %X',time.localtime())

            pid = 0
            if hasattr(get,'pid'): pid = get.pid
            public.M('ftps').add('pid,name,password,path,status,ps,addtime',(pid,username,password,get.path,1,ps,addtime))
            public.write_log_gettext('FTP manager', 'Successfully added FTP user [{}]!',(username,))
            return_message=public.return_msg_gettext(True,public.lang('Setup successfully!'))
            del return_message['status']
            return public.return_message(0,0, return_message['msg'])
        except Exception as ex:
            public.write_log_gettext('FTP manager', 'Failed to add FTP user[{}]! => {}',(username,str(ex)))
            return_message=public.return_msg_gettext(False,public.lang('Failed to add'))
            del return_message['status']
            return public.return_message(-1,0, return_message['msg'])
    
    #删除用户
    def DeleteUser(self,get):
        """
        Delete a Pure-FTPd virtual user and its row in the ``ftps`` table.

        Parameters read from ``get``:
            username: account name
            id:       row id of the account in the ``ftps`` table

        The account is only removed when a row with this id AND name exists.

        Returns:
            The value built by ``public.return_message``: status 0 on success,
            status -1 with a message otherwise.
        """
        # Validate the request parameters
        try:
            get.validate([
                Param('username').String(),
                Param('id').Integer(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))
        # Bound up front so the except handler can always format its log entry
        username = ''
        try:
            import shlex
            username = get['username']
            ftp_id = get['id']
            if public.M('ftps').where("id=? and name=?", (ftp_id,username, )).count()==0:
                return_message=public.return_msg_gettext(False, public.lang('Delete error'))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            # shlex.quote keeps the name a single literal shell word; plain double
            # quotes would still let $(...) and backticks run.
            public.ExecShell(self.__runPath + '/pure-pw userdel ' + shlex.quote(username))
            self.FtpReload()
            public.M('ftps').where("id=?",(ftp_id,)).delete()
            public.write_log_gettext('FTP manager', 'Successfully deleted FTP user[{}]!',(username,))
            return_message=public.return_msg_gettext(True, public.lang('Successfully deleted'))
            del return_message['status']
            return public.return_message(0,0, return_message['msg'])
        except Exception as ex:
            public.write_log_gettext('FTP manager', 'Faided to delete FTP user[{}]! => {}',(username,str(ex)))
            return_message=public.return_msg_gettext(False,public.lang('Failed to delete'))
            del return_message['status']
            return public.return_message(-1,0, return_message['msg'])

    
    #修改用户密码
    def SetUserPassword(self,get):
        """
        Change the password of an existing Pure-FTPd virtual user.

        Parameters read from ``get``:
            ftp_username: account name
            new_password: new password; at least 6 characters, single line
            id:           row id of the account in the ``ftps`` table

        The password is only changed when a row with this id AND name exists.

        Returns:
            The value built by ``public.return_message``: status 0 on success,
            status -1 with a message otherwise.
        """
        # Validate the request parameters
        try:
            get.validate([
                Param('ftp_username').String(),
                Param('new_password').String(),
                Param('id').Integer(),

            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))
        # Bound up front so the except handler can always format its log entry
        username = ''
        try:
            import shlex
            ftp_id = get['id']
            username = get['ftp_username'].strip()
            password = get['new_password'].strip()
            if public.M('ftps').where("id=? and name=?", (ftp_id,username, )).count()==0:
                return_message=public.return_msg_gettext(False, public.lang('DEL_ERROR'))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            if len(password) < 6:
                return_message=public.return_msg_gettext(False,public.lang('Password must be at least [{}] characters',"6"))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            if '\n' in password or '\r' in password:
                # The password is fed to pure-pw line by line through a here-document;
                # an embedded line break would split it and could end the document early.
                return public.return_message(-1, 0, public.lang('Password cannot contain line breaks'))
            # <<'EOF' (quoted delimiter) passes the password through verbatim; with an
            # unquoted delimiter the shell expands $, backticks and backslashes, so the
            # account would get a different password than the one stored below.
            public.ExecShell(self.__runPath + '/pure-pw passwd ' + shlex.quote(username) + " <<'EOF'\n" + password + '\n' + password + '\nEOF')
            self.FtpReload()
            public.M('ftps').where("id=?",(ftp_id,)).setField('password',password)
            public.write_log_gettext('FTP manager', 'Successfully changed password for FTP user[{}]!',(username,))
            return_message=public.return_msg_gettext(True,public.lang('Setup successfully!'))
            del return_message['status']
            return public.return_message(0,0, return_message['msg'])
        except Exception as ex:
            public.write_log_gettext('FTP manager', 'Failed to change password FTP user[{}]! => {}',(username,str(ex)))
            return_message=public.return_msg_gettext(False,public.lang('Failed to modify'))
            del return_message['status']
            return public.return_message(-1,0, return_message['msg'])
    
    
    #设置用户状态
    def SetStatus(self,get):
        """
        Turn an FTP account on or off.

        Parameters read from ``get``:
            id:       row id of the account in the ``ftps`` table
            username: account name
            status:   a value equal to 0 turns the account off, anything else on

        Turning off runs ``pure-pw usermod <user> -r 1``; turning on runs it with
        ``-r ''``. NOTE(review): presumably -r restricts the allowed client
        addresses so that login is blocked - confirm against the pure-pw manual.

        Returns:
            The value built by ``public.return_message``: status 0 on success,
            status -1 with a message otherwise.
        """
        msg = public.get_msg_gettext('Turn off')
        if get.status != '0': msg = public.get_msg_gettext('Turn on')
        # Bound up front so the except handler can always format its log entry
        username = ''
        try:
            import shlex
            ftp_id = get['id']
            username = get['username']
            status = get['status']
            if public.M('ftps').where("id=? and name=?", (ftp_id,username, )).count()==0:
                return_message=public.return_msg_gettext(False, public.lang('DEL_ERROR'))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            # shlex.quote keeps the name a single literal shell word; plain double
            # quotes would still let $(...) and backticks run.
            usermod_cmd = self.__runPath + '/pure-pw usermod ' + shlex.quote(username)
            if int(status)==0:
                public.ExecShell(usermod_cmd + ' -r 1')
            else:
                public.ExecShell(usermod_cmd + " -r ''")
            self.FtpReload()
            public.M('ftps').where("id=?",(ftp_id,)).setField('status',status)
            public.write_log_gettext('FTP manager','Successfully {} FTP user [{}]!', (msg,username))
            return_message=public.return_msg_gettext(True, public.lang('Setup successfully!'))
            del return_message['status']
            return public.return_message(0,0, return_message['msg'])
        except Exception as ex:
            public.write_log_gettext('FTP manager','Failed to {} FTP user [{}]! => {}', (msg,username,str(ex)))
            return_message=public.return_msg_gettext(False,public.lang('{} FTP user failed!',msg))
            del return_message['status']
            return public.return_message(-1,0, return_message['msg'])
    
    '''
     * 设置FTP端口
     * @param Int _GET['port'] 端口号
     * @return bool
     '''
    def setPort(self,get):
        """
        Change the port Pure-FTPd listens on.

        Parameters read from ``get``:
            port: the new port number, 1-65535

        Steps: reject the port when it is out of range or already in use,
        rewrite the ``Bind`` line of pure-ftpd.conf, restart the service, open
        the port through ``firewalls.firewalls().AddAcceptPort`` and store it
        in the session.

        Returns:
            The value built by ``public.return_message``: status 0 on success,
            status -1 with a message otherwise.
        """
        # Validate the request parameters
        try:
            get.validate([
                Param('port').Integer(),
            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))
        try:
            # str() keeps .strip() working even if the validator hands back an int
            # NOTE(review): confirm what Param(...).Integer() leaves in get['port']
            port = str(get['port']).strip()
            if not port:
                return public.return_message(-1, 0, public.lang("Please enter an integer for the port"))
            try:
                port_number = int(port)
            except ValueError:
                # Report bad input with the dedicated message instead of falling
                # through to the generic "Failed to modify" handler below.
                return public.return_message(-1, 0, public.lang("Please enter an integer for the port"))
            if port_number < 1 or port_number > 65535:
                return public.return_message(-1, 0, public.lang("Port range is incorrect!"))
            # Canonical decimal form, so inputs such as "+21" or "021" are not
            # written verbatim into the config file and the shell command.
            port = str(port_number)
            check_used = public.check_port_stat(port_number,public.GetLocalIp())
            if check_used == 2:
                return public.return_message(-1, 0, public.lang("Port[{}] is used!", port))
            # Any lsof output means something is already using the port
            data = public.ExecShell('lsof -i:' + port)[0]
            if len(data) !=0:
                return public.return_message(-1, 0, public.lang("Port[{}] is used!", port))
            file = '/www/server/pure-ftpd/etc/pure-ftpd.conf'
            conf = public.readFile(file)
            # Matches an optionally commented-out "Bind <ipv4>,<port>" line
            rep = u"\n#?\\s*Bind\\s+[0-9]+\\.[0-9]+\\.[0-9]+\\.+[0-9]+,([0-9]+)"
            conf = re.sub(rep,"\nBind        0.0.0.0," + port,conf)
            public.writeFile(file,conf)
            public.ExecShell('/etc/init.d/pure-ftpd restart')
            public.write_log_gettext('FTP manager', "Successfully modified FTP port to [{}]!",(port,))
            # Open the new port in the firewall
            get.port=port
            get.ps = public.get_msg_gettext('FTP port')
            firewalls.firewalls().AddAcceptPort(get)
            session['port']=port
            return public.return_message(0, 0, public.lang("Setup successfully!"))
        except Exception as ex:
            public.write_log_gettext('FTP manager', 'Failed to modify FTP port! => {}',(str(ex),))
            return public.return_message(-1, 0, public.lang("Failed to modify"))

    #重载配置
    def FtpReload(self):
        """Run ``pure-pw mkdb`` against the pureftpd.pdb file via public.ExecShell."""
        mkdb_command = self.__runPath + '/pure-pw mkdb /www/server/pure-ftpd/etc/pureftpd.pdb'
        public.ExecShell(mkdb_command)

    def get_login_logs(self, get):
        """Forward ``get`` to ftp_log_v2's ``ftplog.get_login_log`` and return its result."""
        from ftp_log_v2 import ftplog
        return ftplog().get_login_log(get)
    def get_action_logs(self, get):
        """Forward ``get`` to ftp_log_v2's ``ftplog.get_action_log`` and return its result."""
        from ftp_log_v2 import ftplog
        return ftplog().get_action_log(get)

    def set_ftp_logs(self, get):
        """Forward ``get`` to ftp_log_v2's ``ftplog.set_ftp_log`` and return its result."""
        from ftp_log_v2 import ftplog
        log_handler = ftplog()
        return log_handler.set_ftp_log(get)

    # Change the user's home directory
    def set_user_home(self,get):
        """
        Change the home directory of an existing FTP user.

        Parameters read from ``get``:
            id:           row id of the account in the ``ftps`` table
            path:         the new ftp user home (created when it does not exist)
            ftp_username: ftp username, used to find the pureftpd.passwd entry
            migrate:      '1' copies the contents of the old home to the new one

        The old home is taken from the ``ftps`` row and replaced by the new one
        inside the user's line of pureftpd.passwd.

        Returns:
            The value built by ``public.return_message``: status 0 on success,
            status -1 with a message otherwise.
        """
        # Validate the request parameters
        try:
            get.validate([
                Param('ftp_username').String(),
                Param('path').String(),
                Param('id').Integer(),
                Param('migrate').Integer(),

            ], [
                public.validate.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))
        # Bound up front so the except handler can always format its log entry
        username = ''
        try:
            import shlex
            ftp_id = get['id']
            path = get['path']
            username = get['ftp_username']
            # get the old path in the panel sqlite db
            old_path = public.M("ftps").where("id=?",(ftp_id,)).getField('path')
            # check the auth ftp user if exists
            auth_conf_file = '/www/server/pure-ftpd/etc/pureftpd.passwd'
            auth_conf = public.readFile(auth_conf_file)
            if not auth_conf:
                return_message=public.returnMsg(False, public.lang('FTP account has not been set up'))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            # get the user specified conf; a literal prefix test is used instead of a
            # regex so characters in the name are never treated as pattern syntax
            auth_conf_list = auth_conf.split('\n')
            user_prefix = '{}:'.format(username)
            match_indexes = [idx for idx, line in enumerate(auth_conf_list) if line.startswith(user_prefix)]
            if not match_indexes:
                return_message=public.returnMsg(False, public.lang('FTP account has not been set up1'))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            if len(match_indexes) > 1:
                return_message=public.returnMsg(False, public.lang('Matching multiple configurations, this operation has been stopped!'))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            if not old_path or not isinstance(old_path, str):
                # Without a stored old home there is nothing to substitute; an empty
                # string would make str.replace insert the new path between every
                # character of the passwd line.
                return_message=public.returnMsg(False, public.lang('FTP account has not been set up'))
                del return_message['status']
                return public.return_message(-1,0, return_message['msg'])
            if not os.path.exists(path):
                os.makedirs(path)
                public.ExecShell('chown www.www ' + shlex.quote(path))
            # replace the old path, touching only the matched user's own line
            line_no = match_indexes[0]
            auth_conf_list[line_no] = auth_conf_list[line_no].replace(old_path,path)
            public.writeFile(auth_conf_file,'\n'.join(auth_conf_list))
            # str() so the flag matches whether it arrives as '1' or as the integer 1
            if str(get.migrate) == '1':
                # Both paths are quoted; the /* glob stays outside the quotes so the
                # shell still expands it.
                public.ExecShell('cp -rp {}/* {}'.format(shlex.quote(old_path),shlex.quote(path)))
            self.FtpReload()
            public.M('ftps').where("id=?",(ftp_id,)).setField('path',path)
            public.write_log_gettext('FTP manager', 'Successfully changed home directory for FTP user[{}] to [{}]!',(username,path))
            return_message=public.return_msg_gettext(True,public.lang('Setup successfully!'))
            del return_message['status']
            return public.return_message(0,0, return_message['msg'])
        except Exception as ex:
            # Log the failure and answer in the same shape as every other method,
            # instead of handing the raw traceback back to the caller.
            public.write_log_gettext('FTP manager', 'Failed to change home directory for FTP user[{}]! => {}',(username,str(ex)))
            return_message=public.returnMsg(False, public.lang('Editing error'))
            del return_message['status']
            return public.return_message(-1,0, return_message['msg'])

Youez - 2016 - github.com/yon3zu
LinuXploit