Server IP : 172.67.216.182 / Your IP : 172.70.143.85 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/ |
Upload File : |
#coding: utf-8 # +------------------------------------------------------------------- # | aaPanel # +------------------------------------------------------------------- # | Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved. # +------------------------------------------------------------------- # | Author: hwliang <[email protected]> # +------------------------------------------------------------------- import json import time import os import sys import socket import threading import re from itertools import chain if not 'class/' in sys.path: sys.path.insert(0,'class/') from io import BytesIO, StringIO def returnMsg(status,msg,value=None): if value: msg = public.get_msg_gettext(msg,value) return {'status':status,'msg':msg} import public try: import chardet except: os.system('btpip install chardet') import chardet class ssh_terminal: _panel_path = '/www/server/panel' _save_path = _panel_path + '/config/ssh_info/' _host = None _port = 22 _user = None _pass = None _pkey = None _ws = None _ssh = None _last_cmd = "" _last_cmd_tip = 0 _log_type = public.lang("aaPanel terminal") _history_len = 0 _client = "" _rep_ssh_config = False _sshd_config_backup = None _rep_ssh_service = False _tp = None _old_conf = None _debug_file = 'logs/terminal.log' _s_code = None _last_num = 0 _key_passwd = None _video_addr = "" _host_row_id = "" def __init__(self): # 创建jp_login_record表记录ssh登录记录 if not public.M('sqlite_master').where('type=? AND name=?', ('table', 'ssh_login_record')).count(): public.M('').execute('''CREATE TABLE ssh_login_record ( id INTEGER PRIMARY KEY AUTOINCREMENT, addr TEXT, server_ip TEXT, user_agent TEXT, ssh_user TEXT, login_time INTEGER DEFAULT 0, close_time INTEGER DEFAULT 0, video_addr TEXT);''') public.M('').execute('CREATE INDEX ssh_login_record ON ssh_login_record (addr);') self.time = time.time() def record(self, rtype, data): if os.path.exists(public.get_panel_path() + "/data/open_ssh_login.pl") and self._video_addr: path=self._video_addr if rtype == 'header': with open(path, 'w') as fw: fw.write(json.dumps(data) + '\n') return True else: with open(path, 'r') as fr: content = json.loads(fr.read()) stdout = content["stdout"] atime = time.time() iodata = [atime - self.time, data] stdout.append(iodata) content["stdout"] = stdout with open(path, 'w') as fw: fw.write(json.dumps(content) + '\n') self.time = atime return True return False def connect(self): ''' @name 连接服务器 @author hwliang<2020-08-07> @return dict{ status: bool 状态 msg: string 详情 } ''' if not self._host: return public.return_message(-1, 0, public.lang("Wrong connection address")) if not self._user: self._user = 'root' if not self._port: self._port = 22 self.is_local() if self._host in ['127.0.0.1','localhost']: self._port = public.get_sshd_port() # self.set_sshd_config(True) num = 0 while num < 5: num +=1 try: self.debug(public.get_msg_gettext('Reconnection attempts:{}',(num,))) if self._rep_ssh_config: time.sleep(0.1) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(2 + num) sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 8192) sock.connect((self._host, self._port)) break except Exception as e: if num == 5: self.set_sshd_config(True) self.debug(public.get_msg_gettext('Retry connection failed, {}',(e,))) if self._host in ['127.0.0.1','localhost']: return public.return_message(-1, 0, public.lang('Connection failure: {}', ("Authentication failed ," + self._user + "@" + self._host + ":" +str(self._port)))) return public.return_message(-1, 0, public.lang('Connection failure: {} {}', self._host,self._port)) else: time.sleep(0.2) try: try: import paramiko except: public.ExecShell('btpip uninstall paramiko') public.ExecShell('btpip uninstall cryptography') public.ExecShell('btpip install paramiko==2.7.2') public.ExecShell('btpip install cryptography==42.0.5') import paramiko except: return public.return_message(-1, 0, public.lang('The paramiko module does not exist and the installation fails!')) self._tp = paramiko.Transport(sock) print(self._tp.banner_timeout) pkey = None try: self._tp.start_client() if not self._pass and not self._pkey: self.set_sshd_config(True) return public.return_message(-1, 0, public.lang('Password or private key cannot both be empty: {}:{}',self._host,str(self._port))) self._tp.banner_timeout=60 print(self._tp.banner_timeout) if self._pkey: self.debug(public.lang("Authenticating private key")) if sys.version_info[0] == 2: try: self._pkey = self._pkey.encode('utf-8') except: pass p_file = BytesIO(self._pkey) else: p_file = StringIO(self._pkey) try: if self._key_passwd: pkey = paramiko.RSAKey.from_private_key(p_file,password=self._key_passwd) else: pkey = paramiko.RSAKey.from_private_key(p_file) self.debug("Try using RSA private key authentication") except Exception as ex: try: p_file.seek(0) # 重置游标 if self._key_passwd: pkey = paramiko.Ed25519Key.from_private_key(p_file,password=self._key_passwd) else: pkey = paramiko.Ed25519Key.from_private_key(p_file) self.debug("Try to use Ed 25519 private key authentication") except: try: p_file.seek(0) if self._key_passwd: pkey = paramiko.ECDSAKey.from_private_key(p_file,password=self._key_passwd) else: pkey = paramiko.ECDSAKey.from_private_key(p_file) self.debug("Try using ECDSA private key authentication") except: p_file.seek(0) if self._key_passwd: try: pkey = paramiko.DSSKey.from_private_key(p_file,password=self._key_passwd) except Exception as ex: ex = str(ex) if ex.find('OpenSSH private key file checkints do not match') != -1: return public.returnMsg(False, public.lang("Incorrect private key password:{}", ex)) elif ex.find('encountered RSA key, expected DSA key') != -1: pkey = paramiko.RSAKey.from_private_key(p_file,password=self._key_passwd) else: return public.returnMsg(False, public.lang("Private key error: {}", ex)) else: pkey = paramiko.DSSKey.from_private_key(p_file) if not pkey: return public.returnMsg(False, public.lang("Private key error!")) self._tp.auth_publickey(username=self._user, key=pkey) else: try: self._tp.auth_none(self._user) except Exception as e: e = str(e) if e.find('keyboard-interactive') >= 0: self._auth_interactive() else: self.debug('Authenticating password') self._tp.auth_password(username=self._user, password=self._pass) # self._tp.auth_password(username=self._user, password=self._pass) except Exception as e: if self._old_conf: s_file = '/www/server/panel/config/t_info.json' if os.path.exists(s_file): os.remove(s_file) self.set_sshd_config(True) self._tp.close() e = str(e) if e.find('websocket error!') != -1: return public.return_message(0, 0, public.lang("connection succeeded")) if e.find('Authentication timeout') != -1: self.debug("Authentication timed out:{}".format(e)) return public.return_message(-1, 0, public.lang('Authentication timed out, please press enter to try again!{}',e)) if e.find('Authentication failed') != -1: self.debug('Authentication failed:{}'.format(e)) if self._key_passwd: sshd_config = public.readFile('/etc/ssh/sshd_config') if sshd_config and sshd_config.find('ssh-dss') == -1: return public.return_message(-1, 0, public.lang('The private key verification fails, the private key may be incorrect, or the ssh-dss private key authentication type may not be enabled in the /etc/ssh/sshd_config configuration file')) return public.return_message(-1, 0,public.lang('Authentication failed, please check whether the private key is correct: {}',(e + "," + self._user + "@" + self._host + ":" +str(self._port)))) return public.return_message(-1, 0, public.lang('Account or Password incorrect: {}',str(e + "," + self._user + "@" + self._host + ":" +str(self._port)))) if e.find('Bad authentication type; allowed types') != -1: self.debug(public.get_msg_gettext('Authentication failed {}',(str(e),))) if self._host in ['127.0.0.1','localhost'] and self._pass == 'none': return public.return_message(-1, 0, public.lang('Username or Password incorrect: {}',str("Authentication failed ," + self._user + "@" + self._host + ":" +str(self._port)))) return public.return_message(-1, 0, public.lang('Unsupported authentication type: {}',str(e))) if e.find('Connection reset by peer') != -1: self.debug(public.lang("The target server actively refused the connection")) return public.return_message(-1, 0, public.lang("The target server actively refused the connection")) if e.find('Error reading SSH protocol banner') != -1: self.debug('The protocol header response timed out') return public.return_message(-1, 0, public.lang('The protocol header response timed out, and the network quality with the target server was too bad: {}',str(e))) if e.find('encountered RSA key, expected DSA key') != -1: self.debug('Private keys may require password access') return public.return_message(-1, 0, public.lang('Private keys may require password access: {}',str(e))) if e.find('password and salt must not be empty') != -1: self.debug('Private keys may require password access') return public.return_message(-1, 0, public.lang('Private keys may require password access: {}',str(e))) if not e: self.debug('The SSH protocol handshake timed out') return public.return_message(-1, 0, public.lang("The SSH protocol handshake timed out, and the network quality with the target server is too bad")) err = public.get_error_info() self.debug(err) return public.return_message(-1, 0, public.lang('unknown error: {}',str(err))) self.debug(public.lang("The authentication is successful and the session channel is being constructed")) self._ssh = self._tp.open_session() self._ssh.get_pty(term='xterm', width=100, height=34) self._ssh.invoke_shell() self._connect_time = time.time() self._last_send = [] from BTPanel import request self._client = public.GetClientIp() +':' + str(public.get_remote_port()) public.write_log_gettext(self._log_type,'Successfully logged in to the SSH server [{}:{}]',(self._host, self._port)) self.history_send(public.lang("Login success\n")) self.set_sshd_config(True) self.debug(public.lang("Login success")) from BTPanel import session self._video_addr = "/www/server/panel/plugin/jumpserver/static/video/%s.json" % str(int(self._connect_time)) if not os.path.exists("/www/server/panel/plugin/jumpserver/static/video/"): os.makedirs("/www/server/panel/plugin/jumpserver/static/video/") # 如果开启了录像功能 user_agent = str(request.headers.get('User-Agent')) if os.path.exists(public.get_panel_path() + "/data/open_ssh_login.pl"): self._host_row_id = public.M('ssh_login_record').add( 'addr,server_ip,ssh_user,user_agent,login_time,video_addr', (self._client, self._host, self._user, user_agent , int(self._connect_time), self._video_addr)) self.record('header', { "version": 1, "width": 100, "height": 29, "timestamp": int(self._connect_time), "env": { "TERM": "xterm", "SHELL": "/bin/bash", }, "stdout": [] }) return public.return_message(0, 0, public.lang("connection succeeded")) def _auth_interactive(self): self.debug('Verification Code') self.brk = False def handler(title, instructions, prompt_list): if not self._ws: raise public.PanelError('websocket error!') if instructions: self._ws.send(instructions) if title: self._ws.send(title) resp = [] for pr in prompt_list: if str(pr[0]).strip() == "Password:": resp.append(self._pass) elif str(pr[0]).strip() == "Verification code:": # 获取前段传入的验证码 self._ws.send("Verification code# ") self._s_code = True code = "" while True: data = self._ws.receive() if data.find('"resize":1') != -1: self.resize(data) continue self._ws.send(data) if data in ["\n", "\r"]: break code += data resp.append(code) self._ws.send("\n") self._s_code = None return tuple(resp) self._tp.auth_interactive(self._user, handler) def get_login_user(self): ''' @name 获取本地登录用户 @author hwliang<2020-08-07> @return string ''' if self._user != 'root': return self._user l_user = 'root' ssh_config_file = '/etc/ssh/sshd_config' ssh_config = public.readFile(ssh_config_file) if not ssh_config: return l_user if public.get_os_version().lower().find('centos') >= 0: return l_user # 检查是不是 【允许root登录】 或 【允许root用秘钥登录】 rep = re.compile(r"\n[ \t]*PermitRootLogin +((yes)|(without-password))") if rep.search(ssh_config) is not None: return l_user user_list = self.get_ulist() can_login = [] root_group_user = [] # 过滤出能登陆的root组用户 和 能登陆的普通用户 for u_info in user_list: if u_info['user'] == 'root': continue if u_info['login'] in ('/bin/bash', '/bin/sh', '/bin/dash'): if u_info['gid'] == "0": root_group_user.append(u_info) continue can_login.append(u_info) # 找出能登陆的root组用户的可用的 for u_info in chain(root_group_user, can_login): # 通过chain链接并优先使用root组的用户 if os.path.exists(u_info["home"]): # 有家目录的优先 return u_info["user"] # 如果没有则使用 root_group_user 的第1个 if len(root_group_user) >= 1: return root_group_user[0]["user"] # 如果没有则使用 can_login 的第1个 if len(can_login) >= 1: return root_group_user[0]["user"] # return l_user @staticmethod def _get_user_info_by_name(user_name: str): u_data = public.readFile('/etc/passwd') for i in u_data.split("\n"): u_tmp = i.split(':') if len(u_tmp) < 3: continue if u_tmp[0] == user_name: return { 'user': u_tmp[0], 'pass': u_tmp[1], 'uid': u_tmp[2], 'gid': u_tmp[3], 'user_msg': u_tmp[4], 'home': u_tmp[5], 'login': u_tmp[6] } return None def get_ulist(self): ''' @name 获取本地用户列表 @author hwliang<2020-08-07> @return list ''' u_data = public.readFile('/etc/passwd') u_list = [] for i in u_data.split("\n"): u_tmp = i.split(':') if len(u_tmp) < 3: continue u_info = { 'user': u_tmp[0], 'pass': u_tmp[1], 'uid': u_tmp[2], 'gid': u_tmp[3], 'user_msg': u_tmp[4], 'home': u_tmp[5], 'login': u_tmp[6] } u_list.append(u_info) return u_list def is_local(self): ''' @name 处理本地连接 @author hwliang<2020-08-07> @ps 如果host为127.0.0.1或localhost,则尝试自动使用publicKey登录 @return void ''' if self._pass: return if self._pkey: return if self._host in ['127.0.0.1','localhost']: try: self._port = public.get_sshd_port() self.set_sshd_config() s_file = '/www/server/panel/config/t_info.json' if os.path.exists(s_file): ssh_info = json.loads(public.en_hexb(public.readFile(s_file))) self._host = ssh_info['host'].strip() if 'username' in ssh_info: self._user = ssh_info['username'] if 'pkey' in ssh_info: self._pkey = ssh_info['pkey'] if 'password' in ssh_info: self._pass = ssh_info['password'] self._old_conf = True return ssh_key_type_file = '{}/data/ssh_key_type.pl'.format(public.get_panel_path()) ssh_key_type = '' if os.path.exists(ssh_key_type_file): ssh_key_type_new = public.readFile(ssh_key_type_file) if ssh_key_type_new: ssh_key_type = ssh_key_type_new.strip() login_user = self.get_login_user() if self._user == 'root' and login_user == 'root': id_rsa_file = ['/root/.ssh/id_ed25519','/root/.ssh/id_ecdsa','/root/.ssh/id_rsa','/root/.ssh/id_rsa_bt'] if ssh_key_type: id_rsa_file.insert(0,'/root/.ssh/id_{}'.format(ssh_key_type)) for ifile in id_rsa_file: if os.path.exists(ifile): self._pkey = public.readFile(ifile) host_path = self._save_path + self._host if not os.path.exists(host_path): os.makedirs(host_path,384) return # 没有找到key文件时,自动创建 self.create_ssh_key("ed25519") ssh_key_type = "ed25519" for ifile in id_rsa_file: if os.path.exists(ifile): self._pkey = public.readFile(ifile) host_path = self._save_path + self._host if not os.path.exists(host_path): os.makedirs(host_path, 384) return # 登录用户是root, 但root用户不能登录其他用户可以登录时,结合这次登录请求不包含 密码或者秘钥,则必然是初次请求终端 # 则尝试以 login_user 登录,且自动创建所需秘钥 if (self._user == 'root' and login_user != "root") and not (self._pass or self._pkey): self._user = login_user login_user_info = self._get_user_info_by_name(login_user) id_rsa_file = ['.ssh/id_ed25519', '.ssh/id_ecdsa', '.ssh/id_rsa', '.ssh/id_rsa_bt'] for ifile in id_rsa_file: k_file = "{}/{}".format(login_user_info["home"], ifile) if os.path.exists(k_file): self._pkey = public.readFile(k_file) host_path = self._save_path + self._host if not os.path.exists(host_path): os.makedirs(host_path, 384) return # 没有找到key文件时,自动创建 self.create_ssh_key_for_other_user(login_user, login_user_info["home"], "ed25519") k_file = "{}/{}".format(login_user_info["home"], '.ssh/id_ed25519') if os.path.exists(k_file): self._pkey = public.readFile(k_file) host_path = self._save_path + self._host if not os.path.exists(host_path): os.makedirs(host_path, 384) return if not self._pass or not self._pkey or not self._user: home_path = '/home/' + login_user if login_user == 'root': home_path = '/root' self._user = login_user id_rsa_file = [home_path + '/.ssh/id_ed25519',home_path + '/.ssh/id_ecdsa',home_path + '/.ssh/id_rsa',home_path + '/.ssh/id_rsa_bt'] if ssh_key_type: id_rsa_file.insert(0,home_path + '/.ssh/id_{}'.format(ssh_key_type)) for ifile in id_rsa_file: if os.path.exists(ifile): self._pkey = public.readFile(ifile) return self._pass = 'none' return except: return def get_sys_version(self): ''' @name 获取操作系统版本 @author hwliang<2020-08-13> @return bool ''' version = public.readFile('/etc/redhat-release') if not version: version = public.readFile('/etc/issue').strip().split("\n")[0].replace('\\n','').replace(r'\l','').strip() else: version = version.replace('release ','').replace('Linux','').replace('(Core)','').strip() return version def get_ssh_status(self): ''' @name 获取SSH服务状态 @author hwliang<2020-08-13> @return bool ''' version = self.get_sys_version() if os.path.exists('/usr/bin/apt-get'): if os.path.exists('/etc/init.d/sshd'): status = public.ExecShell("service sshd status | grep -P '(dead|stop|not running)'|grep -v grep") else: status = public.ExecShell("service ssh status | grep -P '(dead|stop|not running)'|grep -v grep") else: if version.find(' 7.') != -1 or version.find(' 8.') != -1 or version.find('Fedora') != -1: status = public.ExecShell("systemctl status sshd.service | grep 'dead'|grep -v grep") else: status = public.ExecShell("/etc/init.d/sshd status | grep -e 'stopped' -e '已停'|grep -v grep") if len(status[0]) > 3: status = False else: status = True return status def is_running(self,rep = False): ''' @name 处理SSH服务状态 @author hwliang<2020-08-13> @param rep<bool> 是否恢复原来的SSH服务状态 @return bool ''' try: if rep and self._rep_ssh_service: self.restart_ssh('stop') return True ssh_status = self.get_ssh_status() if not ssh_status: self.restart_ssh('start') self._rep_ssh_service = True return True return False except: return False def set_sshd_config(self,rep = False): ''' @name 设置本地SSH配置文件,以支持pubkey认证 @author hwliang<2020-08-13> @param rep<bool> 是否恢复ssh配置文件 @return bool ''' self.is_running(rep) if rep and not self._rep_ssh_config: return False try: sshd_config_file = '/etc/ssh/sshd_config' if not os.path.exists(sshd_config_file): return False sshd_config = public.readFile(sshd_config_file) if not sshd_config: return False if rep: if self._sshd_config_backup: public.writeFile(sshd_config_file,self._sshd_config_backup) self.restart_ssh() return True pin = r'^\s*PubkeyAuthentication\s+(yes|no)' pubkey_status = re.findall(pin,sshd_config,re.I) if pubkey_status: if pubkey_status[0] == 'yes': pubkey_status = True else: pubkey_status = False pin = r'^\s*RSAAuthentication\s+(yes|no)' rsa_status = re.findall(pin,sshd_config,re.I) if rsa_status: if rsa_status[0] == 'yes': rsa_status = True else: rsa_status = False self._sshd_config_backup = sshd_config is_write = False if not pubkey_status: sshd_config = re.sub(r'\n#?PubkeyAuthentication\s\w+','\nPubkeyAuthentication yes',sshd_config) is_write = True if not rsa_status: sshd_config = re.sub(r'\n#?RSAAuthentication\s\w+','\nRSAAuthentication yes',sshd_config) is_write = True if is_write: public.writeFile(sshd_config_file,sshd_config) self._rep_ssh_config = True self.restart_ssh() else: self._sshd_config_backup = None return True except: return False def restart_ssh(self,act = 'reload'): ''' 重启ssh 无参数传递 ''' version = public.readFile('/etc/redhat-release') if not os.path.exists('/etc/redhat-release'): public.ExecShell('service ssh ' + act) elif version.find(' 7.') != -1 or version.find(' 8.') != -1: public.ExecShell("systemctl " + act + " sshd.service") else: public.ExecShell("/etc/init.d/sshd " + act) def resize(self, data): ''' @name 调整终端大小 @author hwliang<2020-08-07> @param data<dict> 终端尺寸数据 { cols: int 列 rows: int 行 } @return bool ''' try: data = json.loads(data) self._ssh.resize_pty(width=data['cols'], height=data['rows']) return True except: return False def recv(self): ''' @name 读取tty缓冲区数据 @author hwliang<2020-08-07> @return void ''' n = 0 try: while self._ws.connected: resp_line = self._ssh.recv(1024) if not resp_line: if not self._tp.is_active(): self.debug(public.lang("Channel disconnected")) self._ws.send(public.lang("The connection is disconnected, press enter to try to reconnect!")) self.close() return if not resp_line: n+=1 if n > 5: break continue n = 0 if not self._ws.connected: return try: result = resp_line.decode('utf-8','ignore') except: try: result = resp_line.decode() except: result = str(resp_line) self.record('iodata', result) self._ws.send(result) # self.history_recv(result) except Exception as e: e = str(e) if e.find('closed') != -1: self.debug(public.getMsg('SSH_LOGIN_INFO')) elif self._ws.connected: self.debug(public.get_msg_gettext('Error reading tty buffer data, {}',(str(e),))) if not self._ws.connected: self.debug(public.lang("The client has actively disconnected")) self.close() def send(self): ''' @name 写入数据到缓冲区 @author hwliang<2020-08-07> @return void ''' try: while self._ws.connected: if self._s_code: time.sleep(0.1) continue client_data = self._ws.receive() if not client_data: continue if client_data == '{}': continue if len(client_data) > 10: if client_data.find('{"host":"') != -1: continue if client_data.find('"resize":1') != -1: self.resize(client_data) continue self._ssh.send(client_data) # self.history_send(client_data) except Exception as ex: ex = str(ex) if ex.find('_io.BufferedReader') != -1: self.debug(public.lang("An error occurred while reading data from websocket. Retrying")) self.send() return elif ex.find('closed') != -1: self.debug(public.lang("SSH_LOGIN_INFO")) else: self.debug(public.get_msg_gettext('An error occurred while writing data to the buffer: {}',(str(ex),))) if not self._ws.connected: self.debug(public.lang("The client has actively disconnected")) self.close() def history_recv(self,recv_data): ''' @name 从接收实体保存命令 @author hwliang<2020-08-12> @param recv_data<string> 数据实体 @return void ''' #处理TAB补登 if self._last_cmd_tip == 1: if not recv_data.startswith('\r\n'): self._last_cmd += recv_data.replace('\u0007','').replace("\x07","").strip() self._last_cmd_tip = 0 #上下切换命令 if self._last_cmd_tip == 2: self._last_cmd = recv_data.strip().replace("\x08","").replace("\x07","").replace("\x1b[K","") self._last_cmd_tip = 0 def history_send(self,send_data): ''' @name 从发送实体保存命令 @author hwliang<2020-08-12> @param send_data<string> 数据实体 @return void ''' if not send_data: return his_path = self._save_path + self._host if not os.path.exists(his_path): return his_file = his_path + '/history.pl' #上下切换命令 if send_data in ["\x1b[A","\x1b[B"]: self._last_cmd_tip = 2 return #左移光标 if send_data in ["\x1b[C"]: self._last_num -= 1 return # 右移光标 if send_data in ["\x1b[D"]: self._last_num += 1 return #退格 if send_data == "\x7f": self._last_cmd = self._last_cmd[:-1] return #过滤特殊符号 if send_data in ["\x1b[C","\x1b[D","\x1b[K","\x07","\x08","\x03","\x01","\x02","\x04","\x05","\x06","\x1bOB","\x1bOA","\x1b[8P","\x1b","\x1b[4P","\x1b[6P","\x1b[5P"]: return #Tab补全处理 if send_data == "\t": self._last_cmd_tip = 1 return if str(send_data).find("\x1b") != -1: return if send_data[-1] in ['\r','\n']: if not self._last_cmd: return his_shell = [int(time.time()),self._client,self._user,self._last_cmd] public.writeFile(his_file, json.dumps(his_shell) + "\n","a+") self._last_cmd = "" #超过50M则保留最新的20000行 if os.stat(his_file).st_size > 52428800: his_tmp = public.GetNumLines(his_file,20000) public.writeFile(his_file, his_tmp) else: if self._last_num >= 0: self._last_cmd += send_data def close(self): ''' @name 释放连接 @author hwliang<2020-08-07> @return void ''' try: if self._host_row_id: public.M('ssh_login_record').where('id=?', self._host_row_id).update( {'close_time': int(time.time())}) if self._ssh: self._ssh.close() if self._tp: # 关闭宿主服务 self._tp.close() if self._ws.connected: self._ws.close() except: pass def set_attr(self,ssh_info): ''' @name 设置对象属性,并连接服务器 @author hwliang<2020-08-07> @return void ''' self._host = ssh_info['host'].strip() self._port = int(ssh_info['port']) if 'username' in ssh_info: self._user = ssh_info['username'] if 'pkey' in ssh_info: self._pkey = ssh_info['pkey'] if 'password' in ssh_info: self._pass = ssh_info['password'] if 'pkey_passwd' in ssh_info: self._key_passwd = ssh_info['pkey_passwd'] try: result = self.connect() except Exception as ex: if str(ex).find("NoneType") == -1: raise public.PanelError(ex) return result def heartbeat(self): ''' @name 心跳包 @author hwliang<2020-09-10> @return void ''' while True: time.sleep(30) if self._tp.is_active(): self._tp.send_ignore() else: break if self._ws.connected: self._ws.send("") else: break def debug(self,msg): ''' @name 写debug日志 @author hwliang<2020-09-10> @return void ''' msg = "{} - {}:{} => {} \n".format(public.format_date(),self._host,self._port,msg) self.history_send(msg) public.writeFile(self._debug_file,msg,'a+') def run(self,web_socket, ssh_info=None): ''' @name 启动SSH客户端对象 @author hwliang<2020-08-07> @param web_socket<websocket> websocket句柄对像 @param ssh_info<dict> SSH信息{ host: 主机地址, port: 端口 username: 用户名 password: 密码 pkey: 密钥(如果不为空,将使用密钥连接) } @return void ''' self._ws = web_socket if not self._ssh: if not ssh_info: return result = self.set_attr(ssh_info) else: result = public.return_message(0, 0, 'ALREADY_CONNECTED') # 兼容新返回 if result['status'] == 0: sendt = threading.Thread(target=self.send) recvt = threading.Thread(target=self.recv) ht = threading.Thread(target=self.heartbeat) sendt.start() recvt.start() ht.start() sendt.join() recvt.join() ht.join() self.close() else: # 兼容新返回 self._ws.send(result['message']) self.close() def __del__(self): ''' 自动释放 ''' self.close() @staticmethod def create_ssh_key(key_type: str): """在没有秘钥时,自动创建""" public.ExecShell("ssh-keygen -t {s_type} -P '' -f /root/.ssh/id_{s_type} |echo y".format(s_type=key_type)) authorized_keys = '/root/.ssh/authorized_keys' pub_file = "/root/.ssh/id_{s_type}.pub".format(s_type=key_type) public.ExecShell('cat %s >> %s && chmod 600 %s' % (pub_file, authorized_keys, authorized_keys)) key_type_file = '{}/data/ssh_key_type.pl'.format(public.get_panel_path()) public.writeFile(key_type_file, key_type) @staticmethod def create_ssh_key_for_other_user(user_name: str, user_home: str, key_type: str): tmp_sh_file = "/tmp/create_ssh_key_{}.sh".format(int(time.time())) public.writeFile(tmp_sh_file, """#!/bin/bash HOME=$1 HASH_TYPE=$2 # Check if the home directory exists and create it if it doesn't if [ ! -d "${HOME}" ]; then mkdir "${HOME}" fi # Check if the .ssh directory exists, and if it doesn't, create it if [ ! -d "${HOME}/.ssh" ]; then mkdir "${HOME}/.ssh" fi # Set the correct permissions chmod 700 "${HOME}/.ssh" # Generate a key pair # Check if the ${HASH_TYPE} file exists, and if it doesn't, generate a key pair if [ ! -f "${HOME}/.ssh/id_${HASH_TYPE}" ]; then ssh-keygen -t ed25519 -f "${HOME}/.ssh/id_${HASH_TYPE}" -P "" # Add the public key to the authorized_keys file cat "${HOME}/.ssh/id_${HASH_TYPE}.pub" >> "${HOME}/.ssh/authorized_keys" chmod 600 "${HOME}/.ssh/id_${HASH_TYPE}" chmod 644 "${HOME}/.ssh/id_${HASH_TYPE}.pub" chmod 600 "${HOME}/.ssh/authorized_keys" fi """) public.ExecShell("sudo -u {user_name} bash {tmp_sh_file} {uer_home} {key_type}".format( user_name=user_name, tmp_sh_file=tmp_sh_file, uer_home=user_home, key_type=key_type, )) if os.path.exists(tmp_sh_file): os.remove(tmp_sh_file) class ssh_host_admin(ssh_terminal): _panel_path = '/www/server/panel' _save_path = _panel_path + '/config/ssh_info/' _pass_file = _panel_path + '/data/a_pass.pl' _user_command_file = _save_path + '/user_command.json' _sys_command_file = _save_path + '/sys_command.json' _pass_str = None def __init__(self): self.__create_aes_pass() def __create_aes_pass(self): ''' @name 创建AES密码 @author @return string ''' if not os.path.exists(self._save_path): os.makedirs(self._save_path,384) if not os.path.exists(self._pass_file): public.writeFile(self._pass_file,public.GetRandomString(16)) public.set_mode(self._pass_file,600) if not self._pass_str: self._pass_str = public.readFile(self._pass_file) if not self._pass_str: self._pass_str = public.GetRandomString(16) public.writeFile(self._pass_file,self._pass_str) public.set_mode(self._pass_file,600) def get_host_list(self,args = None): ''' @name 获取本机保存的SSH信息列表 @author hwliang<2020-08-07> @param args<dict_obj or None> @return list ''' host_list = [] for name in os.listdir(self._save_path): info_file = self._save_path + name +'/info.json' if not os.path.exists(info_file): continue try: info_tmp = self.get_ssh_info(name) host_info = {} host_info['host'] = name host_info['port'] = info_tmp['port'] host_info['ps'] = info_tmp['ps'] host_info['sort'] = int(info_tmp['sort']) except: if os.path.exists(info_file): os.remove(info_file) continue host_list.append(host_info) host_list = sorted(host_list,key=lambda x: x['sort'],reverse=False) return public.return_message(0, 0, host_list) def get_host_find(self,args): ''' @name 获取指定SSH信息 @author hwliang<2020-08-07> @param args<dict_obj>{ host: 主机地址 } @return dict ''' args.host = args.host.strip() info_file = self._save_path + args.host +'/info.json' if not os.path.exists(info_file): return public.return_message(-1, 0, public.lang("The specified SSH information does not exist!")) info_tmp = self.get_ssh_info(args.host) host_info = {} host_info['host'] = info_tmp['host'] host_info['port'] = info_tmp['port'] host_info['ps'] = info_tmp['ps'] host_info['sort'] = info_tmp['sort'] host_info['username'] = info_tmp['username'] host_info['password'] = info_tmp['password'] host_info['pkey'] = info_tmp['pkey'] host_info['pkey_passwd'] = '' if 'pkey_passwd' in info_tmp: host_info['pkey_passwd'] = info_tmp['pkey_passwd'] return public.return_message(0, 0, host_info) def modify_host(self,args): ''' @name 修改SSH信息 @author hwliang<2020-08-07> @param args<dict_obj>{ host: 被修改的主机地址, new_host: 新的主机地址, port: 端口 ps: 备注 sort: 排序(可选) username: 用户名 password: 密码 pkey: 密钥(如果不为空,将使用密钥连接) pkey_passwd: 密钥的密码 } @return dict ''' args.new_host = args.new_host.strip() args.host = args.host.strip() old_host_path = self._save_path + args.new_host + "_" + args.port + '/info.json' new_host_path = self._save_path + args.host + "_" + args.port + '/info.json' if args.host != args.new_host: info_file = self._save_path + args.new_host +'/info.json' if os.path.exists(new_host_path): return public.return_message(-1, 0, public.lang("The specified host address has been added to other SSH information!")) info_file = self._save_path + args.host +'/info.json' if not os.path.exists(old_host_path): return public.return_message(-1, 0, public.lang("The specified SSH information does not exist!")) if not 'sort' in args: r_data = public.aes_decrypt(public.readFile(old_host_path),self._pass_str) info_tmp = json.loads(r_data) args.sort = info_tmp['sort'] host_info = {} host_info['host'] = args.new_host host_info['port'] = int(args['port']) host_info['ps'] = args['ps'] host_info['sort'] = args['sort'] host_info['username'] = args['username'] host_info['password'] = args['password'] host_info['pkey'] = args['pkey'] if 'pkey_passwd' in args: host_info['pkey_passwd'] = args['pkey_passwd'] else: host_info['pkey_passwd'] = '' if not host_info['pkey']: host_info['pkey'] = '' # result = self.set_attr(host_info) # if not result['status']: return result self.save_ssh_info(args.host, host_info, self._save_path + args.new_host + "_" + args.port) if args.host != args.new_host: public.ExecShell('mv {} {}'.format(old_host_path, new_host_path)) public.write_log_gettext(self._log_type,'Modify the SSH information of HOST: {}',(args.host)) return public.return_message(0, 0, public.lang("Setup successfully!")) def create_host(self,args): ''' @name 添加SSH信息 @author hwliang<2020-08-07> @param args<dict_obj>{ host: 主机地址, port: 端口 ps: 备注 sort: 排序(可选,默认0) username: 用户名 password: 密码 pkey: 密钥(如果不为空,将使用密钥连接) pkey_passwd: 密钥的密码 } @return dict ''' args.host = args.host.strip() host_path = self._save_path + args.host + "_" + args.port info_file = host_path +'/info.json' if os.path.exists(info_file): args.new_host = args.host return self.modify_host(args) #return public.returnMsg(False,'Specify that SSH information has been added!') if not os.path.exists(host_path): os.makedirs(host_path,384) if not 'sort' in args: args.sort = 0 if not 'ps' in args: args.ps = args.host host_info = {} host_info['host'] = args.host host_info['port'] = int(args['port']) host_info['ps'] = args['ps'] host_info['sort'] = int(args['sort']) host_info['username'] = args['username'] host_info['password'] = args['password'] host_info['pkey'] = args['pkey'] host_info['pkey_passwd'] = '' if 'pkey_passwd' in args: host_info['pkey_passwd'] = args['pkey_passwd'] #result = self.set_attr(host_info) #if result['status'] == -1: return result self.save_ssh_info(args.host, host_info, host_path) public.write_log_gettext(self._log_type,'Add the SSH information of HOST: {}',(str(args.host),)) return public.return_message(0, 0, public.lang("Setup successfully!")) def remove_host(self,args): ''' @name 删除指定SSH信息 @author hwliang<2020-08-07> @param args<dict_obj>{ host: 主机地址 } @return dict ''' args.host = args.host.strip() if not args.host: return public.return_message(-1, 0, public.lang("Parameter ERROR!")) host_path = self._save_path + args.host if not os.path.exists(host_path): return public.return_message(-1, 0, public.lang("The specified SSH information does not exist!")) public.ExecShell("rm -rf {}".format(host_path)) public.write_log_gettext(self._log_type,'Delete the SSH information of HOST: {}',(str(args.host),)) return public.return_message(0, 0, public.lang("Setup successfully!")) def get_ssh_info(self,host): ''' @name 获取并解密指定SSH信息 @author hwliang<2020-08-07> @param host<string> 主机地址 @return dict or False ''' info_file = self._save_path + host + '/info.json' if not os.path.exists(info_file): return False try: r_data = public.aes_decrypt(public.readFile(info_file),self._pass_str) except ValueError as ex: r_data = '{}' if str(ex).find('Incorrect AES key length') != -1: if os.path.exists(self._pass_file): os.remove(self._pass_file) self.__create_aes_pass() r_data = public.aes_decrypt(public.readFile(info_file),self._pass_str) return json.loads(r_data) def save_ssh_info(self, host, host_info, host_path=None): ''' @name 获取并解密指定SSH信息 @author hwliang<2020-08-07> @param host<string> 主机地址 @param host_info<dict> ssh信息字典 @return bool ''' host_path = host_path if host_path else self._save_path + host if not os.path.exists(host_path): os.makedirs(host_path,384) info_file = host_path +'/info.json' r_data = public.aes_encrypt(json.dumps(host_info),self._pass_str) public.writeFile(info_file,r_data) return True def set_sort(self,args): ''' @name 获取并解密指定SSH信息 @author hwliang<2020-08-07> @param args<dict_obj>{ sort_list<json>{ 主机host : 排序编号, 主机host : 排序编号, ... } } @return bool ''' if not 'sort_list' in args: return public.return_message(-1, 0, public.lang("Please pass in the [sort_list] field")) sort_list = json.loads(args.sort_list) for name in sort_list.keys(): info_file = self._save_path + name + '/info.json' if not os.path.exists(info_file): continue ssh_info = self.get_ssh_info(name) ssh_info['sort'] = int(sort_list[name]) self.save_ssh_info(name,ssh_info) return public.return_message(0, 0, public.lang("Setup successfully!")) def get_command_list(self,args = None, user_cmd = False , sys_cmd = False): ''' @name 获取常用命令列表 @author hwliang<2020-08-08> @param args<dict_obj> @param user_cmd<bool> 是否不获取用户配置 @param sys_cmd<bool> 是否不获取系统配置 @return list ''' sys_command = [] if not sys_cmd: if os.path.exists(self._sys_command_file): sys_command = json.loads(public.readFile(self._sys_command_file)) user_command = [] if not user_cmd: if os.path.exists(self._user_command_file): user_command = json.loads(public.readFile(self._user_command_file)) command = sys_command + user_command return public.return_message(0, 0,command) def _get_command_list(self,args = None, user_cmd = False , sys_cmd = False): ''' @name 获取常用命令列表 @author hwliang<2020-08-08> @param args<dict_obj> @param user_cmd<bool> 是否不获取用户配置 @param sys_cmd<bool> 是否不获取系统配置 @return list ''' sys_command = [] if not sys_cmd: if os.path.exists(self._sys_command_file): sys_command = json.loads(public.readFile(self._sys_command_file)) user_command = [] if not user_cmd: if os.path.exists(self._user_command_file): user_command = json.loads(public.readFile(self._user_command_file)) command = sys_command + user_command return command def command_exists(self,command,title): ''' @name 判断命令是否存在 @author hwliang<2020-08-08> @param command<list> 常用命令列表 @param title<string> 命令标题 @return bool ''' for cmd in command: if cmd['title'] == title: return True return False def save_command(self,command,sys_cmd=False): ''' @name 保存常用命令 @author hwliang<2020-08-08> @param command<list> 常用命令列表 @param sys_cmd<bool> 是否为系统配置 @return void ''' s_file = self._user_command_file if sys_cmd: s_file = self._sys_command_file public.writeFile(s_file,json.dumps(command)) def create_command(self,args): ''' @name 创建常用命令 @author hwliang<2020-08-08> @param args<dict_obj>{ title<string> 标题 shell<string> 命令文本 } @return dict ''' args.title = args.title.strip() command = self._get_command_list(sys_cmd=True) if self.command_exists(command,args.title): return public.return_message(-1, 0, public.lang("The specified command name already exists")) cmd = { "title": args.title, "shell": args.shell.strip() } command.append(cmd) self.save_command(command) public.write_log_gettext(self._log_type,'Add common commands [{}]',(str(args.title))) return public.return_message(0, 0, public.lang("Setup successfully!")) def get_command_find(self,args = None, title=None): ''' @name 获取指定命令信息 @author hwliang<2020-08-08> @param args<dict_obj>{ title<string> 标题 } 可选 @param title 标题 可选 @return dict ''' if args: title = args.title.strip() command = self._get_command_list() for cmd in command: if cmd['title'] == title or cmd['title'] == args.title: return cmd return public.return_message(-1, 0, public.lang("The specified command does not exist")) def modify_command(self,args): ''' @name 修改常用命令 @author hwliang<2020-08-08> @param args<dict_obj>{ title<string> 标题 new_title<string> 新标题 shell<string> 命令文本 } @return dict ''' title = args.title.strip() command = self._get_command_list(sys_cmd=True) if not self.command_exists(command,args.title): return public.return_message(-1, 0, public.lang("The specified command does not exist")) for i in range(len(command)): if command[i]['title'] == args.title or command[i]['title'] == title: command[i]['title'] = args.new_title.strip() command[i]['shell'] = args.shell.strip() break self.save_command(command) public.write_log_gettext(self._log_type,'Modify common commands [{}]',(str(args.title),)) return public.return_message(0, 0, public.lang("Setup successfully!")) def remove_command(self,args): ''' @name 删除指定命令 @author hwliang<2020-08-08> @param args<dict_obj>{ title<string> 标题 } @return dict ''' args.title = args.title.strip() command = self._get_command_list(sys_cmd=True) if not self.command_exists(command,args.title): return public.return_message(-1, 0, public.lang("The specified command does not exist")) for i in range(len(command)): if command[i]['title'] == args.title: del(command[i]) break self.save_command(command) public.write_log_gettext(self._log_type,'Delete common commands [{}]',(str(args.title),)) return public.return_message(0, 0, public.lang("Setup successfully!")) def into_command(self, args): ''' @name 导入命令 @author law<2023-11-13> @param args @return ''' command_file_path = "/tmp/incommand.csv" from files import files fileObj = files() ff = fileObj.upload(args) if ff["status"]: command = self.get_command_list(sys_cmd=True) import csv import chardet encoding = "utf-8" with open(command_file_path, "rb") as f: encoding = chardet.detect(f.read())['encoding'] with open(command_file_path, 'r', encoding=encoding) as f: reader = csv.reader(f) next(reader) for row in reader: cmd = { "title": row[0].strip(), "shell": row[1].strip() } if self.command_exists(command, cmd['title']): continue command.append(cmd) # 写日志 titles = [t["title"] for t in command] public.WriteLog(self._log_type, 'Import frequently used commands: [{}]'.format(titles)) self.save_command(command) # 删除临时文件 if os.path.exists(command_file_path): os.remove(command_file_path) return public.return_message(0, 0, public.lang('The import was successful')) return public.return_message(0, 0, public.lang('The import failed')) def out_command(self, args): ''' @name 导出命令 @author law<2023-11-13> @return .csv ''' export_file_path = "/tmp/outcommand.csv" # 删除临时文件 if os.path.exists(export_file_path): os.remove(export_file_path) try: command = self.get_command_list(sys_cmd=True) if not command: return public.return_message(-1, 0, public.lang('There are no exportable commands')) for i in command: i["title"] = i["title"].strip() i["shell"] = i["shell"].strip() # 写日志 titles = [t["title"] for t in command] public.WriteLog(self._log_type, 'Export common commands:{}'.format(titles)) # 写入临时文件 with open(export_file_path, mode="w+", encoding="utf-8") as fp: fp.write("Name, command\n") for line in command: tmp = ( line["title"], line["shell"], ) fp.write(",".join(tmp)) fp.write("\n") return public.return_message(0, 0, export_file_path) except Exception as e: return public.return_message(-1, 0, public.lang('The export failed'))