Server IP : 172.67.216.182 / Your IP : 162.158.170.172 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/ |
Upload File : |
#coding: utf-8 # +------------------------------------------------------------------- # | aaPanel # +------------------------------------------------------------------- # | Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved. # +------------------------------------------------------------------- # | Author: hwliang <[email protected]> # +------------------------------------------------------------------- import re,os,sys,public,json import pymysql class panelMysql: __DB_PASS = None __DB_USER = 'root' __DB_NAME = None __DB_PORT = 3306 __DB_HOST = 'localhost' __DB_PREFIX = '' __DB_CONN = None __DB_CUR = None __DB_ERR = None __DB_TABLE = "" # 被操作的表名称 __OPT_WHERE = "" # where条件 __OPT_LIMIT = "" # limit条件 __OPT_ORDER = "" # order条件 __OPT_FIELD = "*" # field条件 __OPT_PARAM = () # where值 _USER = None _ex = None def __init__(self): pass def set_name(self,name): self.__DB_NAME = str(name) return self def set_prefix(self,prefix): self.__DB_PREFIX = prefix return self def set_host(self,host,port,name,username,password,prefix = ''): self.__DB_HOST = host self.__DB_PORT = int(port) self.__DB_NAME = name if self.__DB_NAME: self.__DB_NAME = str(self.__DB_NAME) self.__DB_USER = str(username) self._USER = str(username) self.__DB_PASS = str(password) self.__DB_PREFIX = prefix if not self.__GetConn(): return False return self #连接MYSQL数据库 def __GetConn(self): try: # print(self.__DB_HOST,self.__DB_PORT,self.__DB_NAME,self.__DB_USER,self.__DB_PASS) self.__DB_CONN = pymysql.connect(host=self.__DB_HOST,user=self.__DB_USER,passwd=str(self.__DB_PASS),db=self.__DB_NAME,port=self.__DB_PORT,connect_timeout=15,read_timeout=60,write_timeout=60) except Exception as ex: self.__DB_ERR = "error: " + str(ex) self._ex = ex print(ex) if self.__DB_ERR.find("timed out") != -1 or self.__DB_ERR.find("is not allowed to connect") != -1: return False try: self.__DB_CONN = pymysql.connect(host=self.__DB_HOST,user=self.__DB_USER,passwd=str(self.__DB_PASS),db=self.__DB_NAME,port=self.__DB_PORT) except Exception as ex: self.__DB_ERR = "error: " + str(ex) self._ex = ex print(ex) return False self.__DB_CUR = self.__DB_CONN.cursor() return True def table(self,table): #设置表名 self.__DB_TABLE = self.__DB_PREFIX + table return self def where(self,where,param): #WHERE条件 if where: self.__OPT_WHERE = " WHERE " + where self.__OPT_PARAM = self.__to_tuple(param) return self def __to_tuple(self,param): #将参数转换为tuple if type(param) != tuple: if type(param) == list: param = tuple(param) else: param = (param,) return param def order(self,order): #ORDER条件 if len(order): self.__OPT_ORDER = " ORDER BY "+order return self def limit(self,limit): #LIMIT条件 limit = str(limit) if len(limit): self.__OPT_LIMIT = " LIMIT "+ limit return self def field(self,field): #FIELD条件 if len(field): self.__OPT_FIELD = field return self def select(self): #查询数据集 self.__GetConn() if not self.__DB_CUR: return self.__DB_ERR try: self.__get_columns() sql = "SELECT " + self.__OPT_FIELD + " FROM " + self.__DB_TABLE + self.__OPT_WHERE + self.__OPT_ORDER + self.__OPT_LIMIT self.__DB_CUR.execute(sql,self.__OPT_PARAM) data = self.__DB_CUR.fetchall() #构造字典系列 if self.__OPT_FIELD != "*": fields = self.__format_field(self.__OPT_FIELD.split(',')) tmp = [] for row in data: i=0 tmp1 = {} for key in fields: tmp1[key.strip('`')] = row[i] i += 1 tmp.append(tmp1) del(tmp1) data = tmp del(tmp) else: #将元组转换成列表 tmp = list(map(list, data)) data = tmp del(tmp) self.__close() return data except Exception as ex: self._ex = ex return "error: " + str(ex) def get(self): self.__get_columns() return self.select() def __format_field(self, field): import re fields = [] for key in field: s_as = re.search(r'\s+as\s+', key, flags=re.IGNORECASE) if s_as: as_tip = s_as.group() key = key.split(as_tip)[1] fields.append(key) return fields def __get_columns(self): if self.__OPT_FIELD == '*': tmp_cols = self.query( "select COLUMN_NAME from information_schema.COLUMNS where table_name = '{}' and table_schema = '{}';" .format(self.__DB_TABLE, self.__DB_NAME), False) cols = [] for col in tmp_cols: cols.append('`' + col[0] + '`') if len(cols) > 0: self.__OPT_FIELD = ','.join(cols) def getField(self, keyName): #取回指定字段 try: result = self.field(keyName).select() if len(result) != 0: return result[0][keyName] return result except: return None def setField(self, keyName, keyValue): #更新指定字段 return self.save(keyName, (keyValue, )) def find(self): #取一行数据 try: result = self.limit("1").select() if len(result) == 1: return result[0] return result except: return None def count(self): #取行数 key = "COUNT(*)" data = self.field(key).select() try: return int(data[0][key]) except: return 0 def add(self, keys, param): #插入数据 self.__GetConn() self.__DB_CONN.text_factory = str try: values = "" for key in keys.split(','): values += "%s," values = values[0:len(values) - 1] sql = "INSERT INTO " + self.__DB_TABLE + "(" + keys + ") " + "VALUES(" + values + ")" self.__DB_CUR.execute(sql, self.__to_tuple(param)) id = self.__DB_CUR.lastrowid self.__close() self.__DB_CONN.commit() return id except Exception as ex: self._ex = ex return "error: " + str(ex) #插入数据 def insert(self, pdata): if not pdata: return False keys, param = self.__format_pdata(pdata) return self.add(keys, param) #更新数据 def update(self, pdata): if not pdata: return False keys, param = self.__format_pdata(pdata) return self.save(keys, param) #构造数据 def __format_pdata(self, pdata): keys = pdata.keys() keys_tmp = [] for k in keys: keys_tmp.append("`{}`".format(k)) keys_str = ','.join(keys_tmp) param = [] for k in keys: #if pdata[k] == None: pdata[k] = '' param.append(pdata[k]) return keys_str, tuple(param) def addAll(self, keys, param): #插入数据 self.__GetConn() self.__DB_CONN.text_factory = str try: values = "" for key in keys.split(','): values += "%s," values = values[0:len(values) - 1] sql = "INSERT INTO " + self.__DB_TABLE + "(" + keys + ") " + "VALUES(" + values + ")" result = self.__DB_CUR.execute(sql, self.__to_tuple(param)) return True except Exception as ex: self._ex = ex return "error: " + str(ex) def commit(self): self.__close() self.__DB_CONN.commit() def save(self, keys, param): #更新数据 self.__GetConn() self.__DB_CONN.text_factory = str try: opt = "" for key in keys.split(','): opt += key + "=%s," opt = opt[0:len(opt) - 1] sql = "UPDATE " + self.__DB_TABLE + " SET " + opt + self.__OPT_WHERE #处理拼接WHERE与UPDATE参数 if param: tmp = list(self.__to_tuple(param)) for arg in self.__OPT_PARAM: tmp.append(arg) self.__OPT_PARAM = tuple(tmp) self.__DB_CUR.execute(sql,self.__OPT_PARAM) else: self.__DB_CUR.execute(sql) self.__close() self.__DB_CONN.commit() return self.__DB_CUR.rowcount except Exception as ex: self._ex = ex return "error: " + str(ex) def delete(self, id=None): #删除数据 self.__GetConn() try: if id: self.__OPT_WHERE = " WHERE id=%s" self.__OPT_PARAM = (id, ) sql = "DELETE FROM " + self.__DB_TABLE + self.__OPT_WHERE self.__DB_CUR.execute(sql, self.__OPT_PARAM) self.__close() self.__DB_CONN.commit() return self.__DB_CUR.rowcount except Exception as ex: return "error: " + str(ex) def execute(self,sql,param = ()): #执行SQL语句返回受影响行 if not self.__GetConn(): return self.__DB_ERR try: if param: self.__OPT_PARAM = list(self.__to_tuple(param)) result = self.__DB_CUR.execute(sql,self.__OPT_PARAM) else: result = self.__DB_CUR.execute(sql) self.__DB_CONN.commit() self.__close() return result except Exception as ex: self._ex = ex return ex def query(self,sql,is_close=True,param=()): #执行SQL语句返回数据集 if not self.__GetConn(): return self.__DB_ERR try: if param: self.__OPT_PARAM = list(self.__to_tuple(param)) self.__DB_CUR.execute(sql,self.__OPT_PARAM) else: self.__DB_CUR.execute(sql) result = self.__DB_CUR.fetchall() #将元组转换成列表 data = list(map(list,result)) if is_close: self.__Close() return data except Exception as ex: self._ex = ex return ex #关闭连接 def __Close(self): self.__DB_CUR.close() self.__DB_CONN.close() def __close(self): #清理条件属性 self.__OPT_WHERE = "" self.__OPT_FIELD = "*" self.__OPT_ORDER = "" self.__OPT_LIMIT = "" self.__OPT_PARAM = () def close(self): #释放资源 try: self.__DB_CUR.close() self.__DB_CUR.close() except: pass