Server IP : 172.67.216.182 / Your IP : 172.68.242.82 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/ |
Upload File : |
#coding: utf-8 # +------------------------------------------------------------------- # | aaPanel # +------------------------------------------------------------------- # | Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved. # +------------------------------------------------------------------- # | Author: hwliang <[email protected]> # +------------------------------------------------------------------- import sqlite3 import os,time,sys os.chdir('/www/server/panel') if not 'class/' in sys.path: sys.path.insert(0,'class/') import public class Sql(): #------------------------------ # 数据库操作类 For sqlite3 #------------------------------ __DB_FILE = None # 数据库文件 __DB_CONN = None # 数据库连接对象 __DB_TABLE = "" # 被操作的表名称 __OPT_WHERE = "" # where条件 __OPT_LIMIT = "" # limit条件 __OPT_ORDER = "" # order条件 __OPT_FIELD = "*" # field条件 __OPT_PARAM = () # where值 __LOCK = '/dev/shm/sqlite_lock.pl' def __init__(self): self.__DB_FILE = 'data/default.db' def __enter__(self): return self def __exit__(self,exc_type,exc_value,exc_trackback): self.close() def __GetConn(self): #取数据库对象 try: if self.__DB_CONN == None: self.__DB_CONN = sqlite3.connect(self.__DB_FILE) self.__DB_CONN.text_factory = str except Exception as ex: return "error: " + str(ex) def connect(self): #连接数据库 self.__GetConn() return self def dbfile(self,name): #设置数据库文件 if name[0] == '/': self.__DB_FILE = name else: self.__DB_FILE = 'data/' + name + '.db' return self def table(self,table): #设置表名 self.__DB_TABLE = table return self def where(self,where,param): #WHERE条件 if where: self.__OPT_WHERE = " WHERE " + where self.__OPT_PARAM = self.__to_tuple(param) return self def __to_tuple(self,param): #将参数转换为tuple if type(param) != tuple: if type(param) == list: param = tuple(param) else: param = (param,) return param def order(self,order): #ORDER条件 if len(order): self.__OPT_ORDER = " ORDER BY "+order return self def limit(self,limit,offset = 0): #LIMIT条件 if limit and not offset: self.__OPT_LIMIT = " LIMIT {}".format(limit) elif limit and offset: self.__OPT_LIMIT = " LIMIT {},{}".format(offset,limit) return self def field(self,field): #FIELD条件 if len(field): self.__OPT_FIELD = field return self def select(self): #查询数据集 self.__GetConn() try: self.__get_columns() sql = "SELECT " + self.__OPT_FIELD + " FROM " + self.__DB_TABLE + self.__OPT_WHERE + self.__OPT_ORDER + self.__OPT_LIMIT result = self.__DB_CONN.execute(sql,self.__OPT_PARAM) data = result.fetchall() #构造字典系列 if self.__OPT_FIELD != "*": fields = self.__format_field(self.__OPT_FIELD.split(',')) tmp = [] for row in data: i=0 tmp1 = {} for key in fields: tmp1[key.strip('`')] = row[i] i += 1 tmp.append(tmp1) del(tmp1) data = tmp del(tmp) else: #将元组转换成列表 tmp = list(map(list,data)) data = tmp del(tmp) self._close() return data except Exception as ex: return "error: " + str(ex) def get(self): self.__get_columns() return self.select() def __format_field(self,field): import re fields = [] for key in field: s_as = re.search(r'\s+as\s+',key,flags=re.IGNORECASE) if s_as: as_tip = s_as.group() key = key.split(as_tip)[1] fields.append(key) return fields def __get_columns(self): if self.__OPT_FIELD == '*': tmp_cols = self.query('PRAGMA table_info('+self.__DB_TABLE+')',()) cols = [] for col in tmp_cols: if len(col) > 2: cols.append('`' + col[1] + '`') if len(cols) > 0: self.__OPT_FIELD = ','.join(cols) def getField(self,keyName): #取回指定字段 try: result = self.field(keyName).select() if len(result) != 0: return result[0][keyName] return result except: return None def setField(self,keyName,keyValue): #更新指定字段 return self.save(keyName,(keyValue,)) def find(self): #取一行数据 try: result = self.limit("1").select() if len(result) == 1: return result[0] return result except:return None def count(self): #取行数 key="COUNT(*)" data = self.field(key).select() try: return int(data[0][key]) except: return 0 def add(self,keys,param): #插入数据 self.write_lock() self.__GetConn() self.__DB_CONN.text_factory = str try: values="" for key in keys.split(','): values += "?," values = values[0:len(values)-1] sql = "INSERT INTO "+self.__DB_TABLE+"("+keys+") "+"VALUES("+values+")" result = self.__DB_CONN.execute(sql,self.__to_tuple(param)) id = result.lastrowid self._close() self.__DB_CONN.commit() self.rm_lock() return id except Exception as ex: return "error: " + str(ex) #插入数据 def insert(self,pdata): if not pdata: return False keys,param = self.__format_pdata(pdata) return self.add(keys,param) #更新数据 def update(self,pdata): if not pdata: return False keys,param = self.__format_pdata(pdata) return self.save(keys,param) #构造数据 def __format_pdata(self,pdata): keys = pdata.keys() keys_str = ','.join(keys) param = [] for k in keys: param.append(pdata[k]) return keys_str,tuple(param) def addAll(self,keys,param): #插入数据 self.write_lock() self.__GetConn() self.__DB_CONN.text_factory = str try: values="" for key in keys.split(','): values += "?," values = values[0:len(values)-1] sql = "INSERT INTO "+self.__DB_TABLE+"("+keys+") "+"VALUES("+values+")" result = self.__DB_CONN.execute(sql,self.__to_tuple(param)) self.rm_lock() return True except Exception as ex: return "error: " + str(ex) def commit(self): self._close() self.__DB_CONN.commit() def save(self,keys,param): #更新数据 self.write_lock() self.__GetConn() self.__DB_CONN.text_factory = str try: opt = "" for key in keys.split(','): opt += key + "=?," opt = opt[0:len(opt)-1] sql = "UPDATE " + self.__DB_TABLE + " SET " + opt+self.__OPT_WHERE #处理拼接WHERE与UPDATE参数 tmp = list(self.__to_tuple(param)) for arg in self.__OPT_PARAM: tmp.append(arg) self.__OPT_PARAM = tuple(tmp) result = self.__DB_CONN.execute(sql,self.__OPT_PARAM) self._close() self.__DB_CONN.commit() self.rm_lock() return result.rowcount except Exception as ex: return "error: " + str(ex) def delete(self,id=None): #删除数据 self.write_lock() self.__GetConn() try: if id: self.__OPT_WHERE = " WHERE id=?" self.__OPT_PARAM = (id,) sql = "DELETE FROM " + self.__DB_TABLE + self.__OPT_WHERE result = self.__DB_CONN.execute(sql,self.__OPT_PARAM) self._close() self.__DB_CONN.commit() self.rm_lock() return result.rowcount except Exception as ex: return "error: " + str(ex) def execute(self,sql,param = ()): #执行SQL语句返回受影响行 self.write_lock() self.__GetConn() try: result = self.__DB_CONN.execute(sql,self.__to_tuple(param)) self.__DB_CONN.commit() self.rm_lock() return result.rowcount except Exception as ex: return "error: " + str(ex) #是否有锁 def is_lock(self): return # n = 0 # while os.path.exists(self.__LOCK): # n+=1 # if n > 100: # self.rm_lock() # break # time.sleep(0.01) #写锁 def write_lock(self): return # self.is_lock() # with open(self.__LOCK,'wb+') as f: # f.close() #解锁 def rm_lock(self): return # if os.path.exists(self.__LOCK): # os.remove(self.__LOCK) def query(self,sql,param = ()): #执行SQL语句返回数据集 self.__GetConn() try: result = self.__DB_CONN.execute(sql,self.__to_tuple(param)) #将元组转换成列表 data = list(map(list,result)) return data except Exception as ex: return "error: " + str(ex) def create(self,name): #创建数据表 self.write_lock() self.__GetConn() script = public.readFile('data/' + name + '.sql') result = self.__DB_CONN.executescript(script) self.__DB_CONN.commit() self.rm_lock() return result.rowcount def fofile(self,filename): #执行脚本 self.write_lock() self.__GetConn() script = public.readFile(filename) result = self.__DB_CONN.executescript(script) self.__DB_CONN.commit() self.rm_lock() return result.rowcount def _close(self): #清理条件属性 self.__OPT_WHERE = "" self.__OPT_FIELD = "*" self.__OPT_ORDER = "" self.__OPT_LIMIT = "" self.__OPT_PARAM = () def is_connect(self): #检查是否连接数据库 if not self.__DB_CONN: return False return True def close(self): #释放资源 try: self.__DB_CONN.close() self.__DB_CONN = None except: pass