Server IP : 172.67.216.182 / Your IP : 162.158.170.97 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /www/server/panel/class_v2/crontabModelV2/ |
Upload File : |
#coding: utf-8 #------------------------------------------------------------------- # aapanel #------------------------------------------------------------------- # Copyright (c) 2015-2099 宝塔软件(http://aapanel.com) All rights reserved. #------------------------------------------------------------------- # Author: hwliang <[email protected]> #------------------------------------------------------------------- #------------------------------ # 任务编排 #------------------------------ import db import public import time import json import os import sys from crontabModelV2.base import crontabBase class main(crontabBase): _log_type = 'Task Scheduling - Tasks' _operator = ['=','!=','>','>=','<','<=','in','not in'] _exec_script_file = 'script/crontab_task_exec.py' _exec_log_file = 'logs/crontab_task_exec.log' custom_order_path = '/www/server/panel/data/custom_order.json' def __init__(self) -> None: super().__init__() self._sql = db.Sql().dbfile(self.dbfile) self.custom_order = self._load_custom_order() def _load_custom_order(self): if os.path.exists(self.custom_order_path): with open(self.custom_order_path, 'r') as f: return json.load(f) return [] def save_custom_order(self, args): ''' @name 保存自定义排序 @param args<dict_obj>{ order_list<str> 排序后的任务ID列表 (字符串形式的 JSON 数组) } @return dict ''' order_list_str = args.get('order_list') if not order_list_str: return public.returnMsg(False, '排序列表不能为空') try: # 将字符串转换为列表 order_list = json.loads(order_list_str) if not isinstance(order_list, list): return public.returnMsg(False, '排序列表格式不正确') # 保存排序结果到文件 public.writeFile(self.custom_order_path, json.dumps(order_list)) self.custom_order = order_list return public.returnMsg(True, '排序保存成功') except Exception as e: return public.returnMsg(False, '排序保存失败: ' + str(e)) def get_trigger_list_all(self, args=None): # 获取所有任务数据 data = self._sql.table('trigger').order('trigger_id desc').select() if self.custom_order: # 创建任务字典 id_to_task = {str(task['trigger_id']): task for task in data} # 根据自定义顺序重新排序任务 sorted_data = [id_to_task[str(task_id)] for task_id in self.custom_order if str(task_id) in id_to_task] # 获取未包含在自定义排序中的任务 remaining_tasks = [task for task in data if int(task['trigger_id']) not in self.custom_order] # 将剩余任务按照原排序顺序追加到排序后的列表中 sorted_data.extend(remaining_tasks) # 返回重新排序后的任务列表 return sorted_data # 如果没有自定义排序,返回原排序的任务列表 return data def get_trigger_list(self,args = None): ''' @name 获取任务列表 @author hwliang @param args<dict_obj>{ p<int> 分页 rows<int> 每页数量 search<string> 搜索关键字 } @return list ''' p = 1 if args and 'p' in args: p = int(args.p) rows = 10 if args and 'rows' in args: rows = int(args.rows) search = '' if args and 'search' in args: search = args.search tojs = '' if args and 'tojs' in args: tojs = args.tojs where = '' params = [] if search: where = "name like ? or ps like ?" params.append('%' + search + '%') params.append('%' + search + '%') if 'script_id' in args: where = "script_id = ?" params.append(args.script_id) if 'type_id' in args and args.type_id: where = "type_id = ?" params.append(args.type_id) if 'status' in args: where = "status = ?" params.append(args.status) count = self._sql.table('trigger').where(where,tuple(params)).count() data = public.get_page(count,p,rows,tojs) data['data'] = self._sql.table('trigger').where(where,tuple(params)).order('trigger_id desc').limit(data['row'],data['shift']).select() # 按自定义排序顺序重新排序任务列表 if self.custom_order: id_to_task = {str(task['trigger_id']): task for task in data['data']} # 确保每个任务ID唯一 sorted_data = [] seen = set() for task_id in self.custom_order: str_task_id = str(task_id) if str_task_id in id_to_task and str_task_id not in seen: sorted_data.append(id_to_task[str_task_id]) seen.add(str_task_id) remaining_tasks = [task for task_id, task in id_to_task.items() if str(task_id) not in seen] data['data'] = sorted_data + remaining_tasks for i in data['data']: i['operator_where'] = self._sql.table('operator_where').where('trigger_id=?',i['trigger_id']).select() i['return_type'] = 'all' i['script_info'] = {} if not i['args']: i['args'] = '' if i['script_id']: i['script_info'] = self._sql.table('scripts').where('script_id=?',(i['script_id'],)).find() if i['script_info']: i['return_type'] = i['script_info']['return_type'] else: i['script_info'] = {} if not i['return_type']: i['return_type'] = 'all' # 取上次执行时间 i['last_exec_time'] = self._sql.table('tasks').where('trigger_id=?',(i['trigger_id'],)).order('log_id desc').getField('start_time') if not i['last_exec_time']: i['last_exec_time']="" if 'order_param' in args and args.order_param: order_param=getattr(args,'order_param',None) data['data']=self._sort_triggers(data['data'],order_param) return public.return_message(0,0,data) def _sort_triggers(self, task, order_param): sort_key, order = order_param.split(' ') reverse_order = order == "desc" def safe_key(item): value = item[sort_key] # 将所有值转换为字符串进行比较 return str(value) if not isinstance(value, str) else value return sorted(task, key=safe_key, reverse=reverse_order) def sync_crontab(self,trigger_id): ''' @name 同步到计划任务 @param trigger_id<int> 任务ID @return dict ''' trigger_info = self._sql.table('trigger').where('trigger_id=?',(trigger_id,)).find() if not trigger_info: return public.returnMsg(False,'任务不存在') import crontab_v2 as crontab crontab_obj = crontab.crontab() # 检查是否使用了自定义 crontab 表达式 if trigger_info['cycle_type'] == 'custom' and trigger_info['crontab_expression']: cron_config = trigger_info['crontab_expression'] else: trigger_info['type'] = trigger_info['cycle_type'] trigger_info['hour'] = trigger_info['cycle_hour'] trigger_info['minute'] = trigger_info['cycle_minute'] trigger_info['week'] = trigger_info['cycle_where'] trigger_info['where1'] = trigger_info['cycle_where'] cron_config,param,name = crontab_obj.GetCrondCycle(trigger_info) panel_path = public.get_panel_path() # start_time = trigger_info['start_time'] if 'start_time' in trigger_info and trigger_info['start_time'] else '' # exec_count = trigger_info['exec_count'] if 'exec_count' in trigger_info and trigger_info['exec_count'] else 0 cron_line = "{} {} {}/{} {} >> {}/{}".format(cron_config,public.get_python_bin(),panel_path,self._exec_script_file,trigger_id,panel_path,self._exec_log_file) cron_file = crontab_obj.get_cron_file() if not os.path.exists(cron_file): public.writeFile(cron_file,'') cron_body = public.readFile(cron_file) cron_lines = cron_body.strip().split('\n') cron_lines.append(cron_line) cron_body = '\n'.join(cron_lines) cron_body += '\n' result = public.writeFile(cron_file, cron_body) if not result: return False crontab_obj.CrondReload() return True def remove_crontab(self,trigger_id): ''' @name 移除计划任务 @param trigger_id<int> 任务ID @return dict ''' import crontab_v2 as crontab crontab_obj = crontab.crontab() cron_file = crontab_obj.get_cron_file() if not os.path.exists(cron_file): return cron_body = public.readFile(cron_file) cron_lines = cron_body.strip().split('\n') cron_lines_new = [] tip_line = '{} {} >>'.format(self._exec_script_file,trigger_id) for cron_line in cron_lines: if not cron_line: continue if cron_line.find(tip_line) == -1: cron_lines_new.append(cron_line) cron_body = '\n'.join(cron_lines_new) cron_body += '\n' public.writeFile(cron_file,cron_body) crontab_obj.CrondReload() return True def get_trigger_info(self,args = None): ''' @name 获取任务信息 @author hwliang @param args<dict_obj>{ trigger_id<int> 任务ID } @return dict ''' if not args or not 'trigger_id' in args: return public.returnMsg(False,'parameter error') trigger_id = int(args.trigger_id) trigger_info = self._sql.table('trigger').where('trigger_id=?',(trigger_id,)).find() if not trigger_info: return public.returnMsg(False,'Task does not exist') trigger_info['operator_where'] = self._sql.table('operator_where').where('trigger_id=?',trigger_info['trigger_id']).select() return public.returnMsg(True,trigger_info) def create_trigger(self,args = None): ''' @name 添加任务信息 @author hwliang @param args<dict_obj>{ name<string> 任务名称 script_id<int> 脚本ID ps<string> 备注 script_body<string> 脚本内容 cycle_type<string> 任务类型 cycle_where<string> 任务条件 cycle_hour<string> 小时 cycle_minute<string> 运行分钟 operator_where<list>{ operator<string> 操作符 op_value<string> 值 run_script_id<int> 运行脚本ID run_script<string> 运行脚本内容 } } @return dict ''' if not args or not hasattr(args,'name') or not hasattr(args,'script_id'): return public.return_message(-1,0,public.lang('parameter error')) if not args.name: return public.return_message(-1,0,public.lang('The task name cannot be empty')) if not args.script_id and not args.script_body: return public.return_message(-1,0,public.lang('Script content and script ID cannot be empty at the same time')) if not args.cycle_type: return public.return_message(-1,0,public.lang('Task type cannot be empty')) crontab_expression=args.crontab_expression if hasattr(args,'crontab_expression') else '' if args.cycle_type=="custom" and not crontab_expression: return public.return_message(-1,0, public.lang('Crontab expression cannot be empty')) # 验证 crontab 表达式 if crontab_expression and not self.validate_crontab_expression(crontab_expression): return public.return_message(-1,0, public.lang('Invalid crontab expression')) script_args = args.get('args','') #检查脚本是否存在 if args.script_id: script_info = self._sql.table('scripts').where('script_id=?',(args.script_id,)).find() if not script_info: return public.return_message(-1,0,public.lang('Script does not exist')) if 'is_args' in script_info and script_info['is_args'] == 1 and not script_args: return public.return_message(-1,0,public.lang('This script requires script parameters')) #检查任务是否存在 if self._sql.table('trigger').where('name=?',(args.name,)).count(): return public.return_message(-1,0,public.lang('Task already exists')) # 根据用户是否提供 crontab 表达式来决定使用哪种方式 if crontab_expression: cycle_type = 'custom' cycle_where = "" cycle_hour = '' cycle_minute = '' else: cycle_type = args.cycle_type cycle_where = args.cycle_where cycle_hour = args.cycle_hour cycle_minute = args.cycle_minute # 获取开始执行时间和执行次数 start_time = args.start_time if hasattr(args, 'start_time') else 0 exec_count = args.exec_count if hasattr(args, 'exec_count') else 0 # 获取当前时间戳 current_time = int(time.time()) if start_time: # 检查开始时间是否小于当前时间 if start_time < current_time: return public.return_message(-1,0, public.lang('The set start time cannot be earlier than the current time')) pdata = { 'name':args.name, 'script_id':args.script_id, 'ps':args.ps, 'script_body':args.script_body, 'args':script_args, 'cycle_type':cycle_type, 'cycle_where':cycle_where, 'cycle_hour':cycle_hour, 'cycle_minute':cycle_minute, 'create_time': int(time.time()), # 'crontab_expression':crontab_expression, # 'start_time': start_time, # 'exec_count': exec_count } trigger_id = self._sql.table('trigger').insert(pdata) if not trigger_id: return public.return_message(-1,0,public.lang('Add failed')) result = self.sync_crontab(trigger_id) if not result: self._sql.table('trigger').where('trigger_id=?', trigger_id).delete() return public.return_message(-1,0, public.lang('Writing scheduled task failed, please check if the disk is writable or if system hardening is enabled!')) #增加任务条件 operator_where = args.operator_where for op in operator_where: op['trigger_id'] = trigger_id op['create_time'] = int(time.time()) self._sql.table('operator_where').insert(op) # # 更新自定义排序 # self.custom_order.insert(0,trigger_id) # public.writeFile(self.custom_order_path, json.dumps(self.custom_order)) public.set_module_logs('crontab_trigger', 'create_trigger', 1) public.WriteLog(self._log_type,'Task [{}] added successfully!' ,args.name,) return public.return_message(0,0,public.lang('Added successfully')) def validate_crontab_expression(self,expression): import re pattern = ( r'^(\*|([0-5]?\d)(-\d+)?)(,(\*|([0-5]?\d)(-\d+)?))*\s+' # Minute: *, 0-59, 0-59/step r'(\*|([01]?\d|2[0-3])(-\d+)?)(,(\*|([01]?\d|2[0-3])(-\d+)?))*\s+' # Hour: *, 0-23, 0-23/step r'(\*|([1-9]|[12]\d|3[01])(-\d+)?)(,(\*|([1-9]|[12]\d|3[01])(-\d+)?))*\s+' # Day of month: *, 1-31, 1-31/step r'(\*|([1-9]|1[0-2])(-\d+)?)(,(\*|([1-9]|1[0-2])(-\d+)?))*\s+' # Month: *, 1-12, 1-12/step r'(\*|([0-7])(-\d+)?)(,(\*|([0-7])(-\d+)?))*$' # Day of week: *, 0-7, 0-7/step ) return re.match(pattern, expression.strip()) is not None def modify_trigger(self,args = None): ''' @name 修改任务信息 @author hwliang @param args<dict_obj>{ trigger_id<int> 任务ID name<string> 任务名称 script_id<int> 脚本ID ps<string> 备注 script_body<string> 脚本内容 cycle_type<string> 任务类型 cycle_where<string> 任务条件 cycle_hour<string> 小时 cycle_minute<string> 运行分钟 operator_where<list>{ operator<string> 操作符 op_value<string> 值 run_script_id<int> 运行脚本ID run_script<string> 运行脚本内容 } } @return dict ''' if not args or not hasattr(args,'name') or not hasattr(args,'script_id'): return public.return_message(-1,0,public.lang('parameter error')) if not args.name: return public.return_message(-1,0,public.lang('The task name cannot be empty')) if not args.script_id and not args.script_body: return public.return_message(-1,0,public.lang('Script content and script ID cannot be empty at the same time')) if not args.cycle_type: return public.return_message(-1,0,public.lang('Task type cannot be empty')) crontab_expression=args.crontab_expression if hasattr(args,'crontab_expression') else '' if args.cycle_type=="custom" and not crontab_expression: return public.return_message(-1,0, public.lang('Crontab expression cannot be empty')) # 验证 crontab 表达式 if crontab_expression and not self.validate_crontab_expression(crontab_expression): return public.return_message(-1,0, public.lang('Invalid crontab expression')) script_args = args.get('args','') #检查脚本是否存在 if args.script_id: script_info = self._sql.table('scripts').where('script_id=?',(args.script_id,)).find() if not script_info: return public.return_message(-1,0,'Script does not exist') if 'is_args' in script_info and script_info['is_args'] and not script_args: return public.return_message(-1,0,public.lang('This script requires script parameters')) #检查任务是否存在 trigger_info = self._sql.table('trigger').where('trigger_id=?',(args.trigger_id,)).find() if not trigger_info: return public.return_message(-1,0,public.lang('Task does not exist')) #检查任务是否存在 if self._sql.table('trigger').where('name=? and trigger_id!=?',(args.name,args.trigger_id)).count(): return public.return_message(-1,0,public.lang('Task already exists')) # 根据用户是否提供 crontab 表达式来决定使用哪种方式 if crontab_expression: cycle_type = 'custom' cycle_where = "" cycle_hour = '' cycle_minute = '' else: cycle_type = args.cycle_type cycle_where = args.cycle_where cycle_hour = args.cycle_hour cycle_minute = args.cycle_minute # 获取开始执行时间和执行次数 start_time = args.start_time if hasattr(args, 'start_time') else 0 exec_count = args.exec_count if hasattr(args, 'exec_count') else 0 # 获取当前时间戳 current_time = int(time.time()) if start_time: # 检查开始时间是否小于当前时间 if start_time < current_time: return public.return_message(-1,0, public.lang('The set start time cannot be earlier than the current time')) pdata = { 'name':args.name, 'script_id':args.script_id, 'ps':args.ps, 'script_body':args.script_body, 'args':script_args, 'cycle_type':cycle_type, 'cycle_where':cycle_where, 'cycle_hour':cycle_hour, 'cycle_minute':cycle_minute, # 'crontab_expression':crontab_expression, # 'start_time': start_time, # 'exec_count': exec_count } self._sql.table('trigger').where('trigger_id=?',(args.trigger_id,)).update(pdata) # #删除任务条件 # self._sql.table('operator_where').where('trigger_id=?',(args.trigger_id,)).delete() #增加任务条件 operator_where = args.operator_where for op in operator_where: op['trigger_id'] = args.trigger_id op['create_time'] = int(time.time()) self._sql.table('operator_where').insert(op) self.remove_crontab(args.trigger_id) result = self.sync_crontab(args.trigger_id) if not result: return public.return_message(-1,0, public.lang('Writing scheduled task failed, please check if the disk is writable or if system hardening is enabled!')) # 删除 task_count.json 中对应的任务执行次数记录 count_file = '/www/server/panel/data/task_count.json' if os.path.exists(count_file): with open(count_file, 'r') as f: task_counts = json.load(f) if str(args.trigger_id) in task_counts: del task_counts[str(args.trigger_id)] with open(count_file, 'w') as f: json.dump(task_counts, f) # 更新数据库中的任务 self._sql.table('trigger').where('trigger_id=?', (args.trigger_id,)).update(pdata) public.WriteLog(self._log_type,'Task modification [{}] successful!',args.name) return public.return_message(0,0,public.lang('Modified successfully')) def remove_trigger(self,args = None): ''' @name 删除任务信息 @author hwliang @param args<dict_obj>{ trigger_id<int> 任务ID } @return dict ''' if not args or not 'trigger_id' in args: return public.return_message(-1,0,public.lang('parameter error')) trigger_info = self._sql.table('trigger').where('trigger_id=?',(args.trigger_id,)).find() if not trigger_info: return public.return_message(-1,0,public.lang('Task does not exist')) self._sql.table('trigger').where('trigger_id=?',(args.trigger_id,)).delete() self._sql.table('operator_where').where('trigger_id=?',(args.trigger_id,)).delete() self._sql.table("tasks").where('trigger_id=? AND script_id=0',(args.trigger_id,)).delete() self.remove_crontab(args.trigger_id) # 删除 task_count.json 中对应的任务执行次数记录 count_file = '/www/server/panel/data/task_count.json' if os.path.exists(count_file): with open(count_file, 'r') as f: task_counts = json.load(f) if str(args.trigger_id) in task_counts: del task_counts[str(args.trigger_id)] with open(count_file, 'w') as f: json.dump(task_counts, f) # 更新自定义排序 if args.trigger_id in self.custom_order: self.custom_order.remove(args.trigger_id) public.writeFile(self.custom_order_path, json.dumps(self.custom_order)) public.WriteLog(self._log_type,'Task [{}] deleted successfully!',trigger_info['name']) return public.return_message(0,0,public.lang('Delete successfully')) def get_operator_where_list(self,args): ''' @name 获取触发事件列表 @author hwliang @param args<dict_obj>{ trigger_id:<int> } @return list ''' trigger_id = args.get('trigger_id',0) if not trigger_id: return public.return_message(-1,0,public.lang("Task ID cannot be empty")) data = self._sql.table('operator_where').where('trigger_id=?',trigger_id).select() for d in data: d['script_info'] = {} if d['run_script_id']: d['script_info'] = self._sql.table('scripts').where('script_id=?',(d['run_script_id'],)).find() if not d['script_info']: d['script_info'] = {} if not d['args']: d['args'] = '' return public.return_message(0,0,data) def create_operator_where(self,args): ''' @name 创建触发事件 @author hwliang @param args<dict_obj>{ "trigger_id":<int>, 任务ID "operator":<string>, 运算符,支持: =,!=,>,>=,<,<=,in,not in "op_value":<mixed>, 比较值,如果运算符为in,not in 时,此处应当为string类型的数据; 运算符为=,!=时,可以是string/int/float类型的数据;运算符为>,>=,<,<=时,此处应当为: int/float类型的数据 "run_script_id":<int>, 触发成功后运行的脚本ID,此参数与run_script互斥 "run_script":<string> 触发触功后运行的脚本内容,仅支持bash脚本,此参数与run_script_id互斥 } @return dict ''' op = {} op['trigger_id'] = int(args.get('trigger_id',0)) op['operator'] = args.get('operator','=') op['op_value'] = args.get('op_value','') op['run_script_id'] = int(args.get('run_script_id',0)) op['run_script'] = args.get('run_script','') op['args'] = args.get('args','') if not op['run_script_id'] and not op['run_script']: return public.return_message(-1,0,public.lang('Run_stcript_id and run_stcript require at least one valid value')) if not op['operator'] in self._operator: return public.return_message(-1,0,public.lang('Please use one of the following operators:{}',str(self._operator))) trigger_info = self._sql.table('trigger').where('trigger_id=?',(op['trigger_id'],)).find() if not trigger_info: return public.return_message(-1,0,public.lang('The specified task does not exist')) if op['run_script_id']: script_info = self._sql.table('scripts').where('script_id=?',(op['run_script_id'],)).find() if not script_info: return public.return_message(-1,0,public.lang('The specified script does not exist')) if 'is_args' in script_info and script_info['is_args'] and not op['args']: return public.return_message(-1,0,public.lang('The script requires parameters, please fill in the parameters')) op['create_time'] = int(time.time()) if not self._sql.table('operator_where').insert(op): return public.return_message(-1,0,public.lang('Failed to add trigger event')) public.WriteLog(self._log_type,"Added a new event for task [{}]",trigger_info['name']) return public.return_message(0,0,public.lang('Added successfully')) def modify_operator_where(self,args): ''' @name 修改触发事件 @author hwliang @param args<dict_obj>{ "where_id": <int> 事件ID "operator":<string>, 运算符,支持: =,!=,>,>=,<,<=,in,not in "op_value":<mixed>, 比较值,如果运算符为in,not in 时,此处应当为string类型的数据; 运算符为=,!=时,可以是string/int/float类型的数据;运算符为>,>=,<,<=时,此处应当为: int/float类型的数据 "run_script_id":<int>, 触发成功后运行的脚本ID,此参数与run_script互斥 "run_script":<string> 触发触功后运行的脚本内容,仅支持bash脚本,此参数与run_script_id互斥 } @return dict ''' op = {} op['where_id'] = int(args.get('where_id',0)) op['operator'] = args.get('operator','=') op['op_value'] = args.get('op_value','') op['run_script_id'] = args.get('run_script_id',0) op['run_script'] = args.get('run_script','') op['args'] = args.get('args','') if not op['where_id']: return public.return_message(-1,0,public.lang('Event ID cannot be empty')) if not op['operator'] in self._operator: return public.return_message(-1,0,public.lang('Please use one of the following operators:{}',str(self._operator))) where_info = self._sql.table('operator_where').where('where_id=?',(op['where_id'],)).find() if not where_info: return public.return_message(-1,0,public.lang('The specified event does not exist')) if not op['run_script_id'] and not op['run_script']: return public.return_message(-1,0,public.lang('Run_stcript_id and run_stcript require at least one valid value')) if op['run_script_id']: script_info = self._sql.table('scripts').where('script_id=?',(op['run_script_id'],)).find() if not script_info: return public.return_message(-1,0,public.lang('The specified script does not exist')) if 'is_args' in script_info and script_info['is_args'] and not op['args']: return public.return_message(-1,0,public.lang('The script requires parameters, please fill in the parameters')) if not self._sql.table('operator_where').where('where_id=?',op['where_id']).update(op): return public.return_message(-1,0,public.lang('Event modification successful')) public.WriteLog(self._log_type,"Modify Event:{} ",str(op['where_id'])) return public.return_message(0,0,public.lang('Modified successfully')) def remove_operator_where(self,args): ''' @name 删除指定事件 @author hwliang @param args<dict_obj>{ "where_id": <int> 事件ID } @return dict ''' where_id = int(args.get('where_id',0)) if not where_id: return public.return_message(-1,0,public.lang('Event ID cannot be empty')) where_info = self._sql.table('operator_where').where('where_id=?',(where_id,)).find() if not where_info: return public.return_message(-1,0,public.lang('The specified event does not exist')) if not self._sql.table('operator_where').where('where_id=?',where_id).delete(): return public.return_message(-1,0,public.lang('Event deletion failed')) public.WriteLog(self._log_type,"Delete Event: {}",str(where_id)) return public.return_message(0,0,public.lang('Delete successfully')) def exec_script(self,script_id,script_body,s_args): ''' @name 执行脚本内容 @param script_id<int> 脚本ID @param script_body<string> 脚本内容 @return dict ''' script_type = 'bash' script_exts = {'bash':'sh','python':'py','php':'php','node':'js','ruby':'rb','perl':'pl'} if script_id: script_info = self._sql.table('scripts').where('script_id=?',(script_id,)).find() if not script_info: return False script_body = script_info['script'] script_type = script_info['script_type'] if not script_body: return False if s_args: s_args = ' ' + s_args tmp_file = '{}/tmp/{}.{}'.format(public.get_panel_path(),public.GetRandomString(32),script_exts[script_type]) public.writeFile(tmp_file,script_body) if script_type == 'bash': result = public.ExecShell('bash {}{}'.format(tmp_file,s_args)) elif script_type == 'python': result = public.ExecShell('{} {}{}'.format(public.get_python_bin(),tmp_file,s_args)) if os.path.exists(tmp_file): os.remove(tmp_file) return result def add_task_log(self,script_id,trigger_id,where_id,status,result_succ,result_err,start_time,end_time): ''' @name 添加任务日志 @param script_id<int> 脚本ID @param trigger_id<int> 触发器ID @param where_id<int> 事件ID @param status<int> 任务状态 @param result_succ<string> 成功结果 @param result_err<string> 错误结果 @param start_time<int> 开始时间 @param end_time<int> 结束时间 @return bool ''' return self._sql.table('tasks').add('script_id,trigger_id,where_id,status,result_succ,result_err,start_time,end_time', (script_id,trigger_id,where_id,status,result_succ,result_err,start_time,end_time)) def get_trigger_logs(self,args): ''' @name 获取任务执行日志 @param args<dict_obj>{ "trigger_id": <int> 任务ID "p": <int> 页码 "rows": <int> 每页数量 } ''' trigger_id = int(args.get('trigger_id',0)) if not trigger_id: return public.returnMsg(False,public.lang('Task ID cannot be empty')) p = 1 if 'p' in args: p = int(args['p']) rows = 10 if 'rows' in args: rows = int(args['rows']) tojs = '' if 'tojs' in args: tojs = args.tojs where = 'trigger_id=?' where_args = (trigger_id,) count = self._sql.table('tasks').where(where,where_args).count() page = public.get_page(count,p,rows,tojs) page['data'] = self._sql.table('tasks').where(where,where_args).limit(page['row'],page['shift']).order('log_id desc').select() return public.return_message(0,0,page) def get_operator_logs(self,args): ''' @name 获取任务事件执行日志 @param args<dict_obj>{ "where_id": <int> 任务事件ID "p": <int> 页码 "rows": <int> 每页数量 } ''' where_id = int(args.get('where_id',0)) if not where_id: return public.returnMsg(False,public.lang('Task event ID cannot be empty')) p = 1 if 'p' in args: p = int(args['p']) rows = 10 if 'rows' in args: rows = int(args['rows']) tojs = '' if 'tojs' in args: tojs = args.tojs where = 'where_id=?' where_args = (where_id,) count = self._sql.table('tasks').where(where,where_args).count() page = public.get_page(count,p,rows,tojs) page['data'] = self._sql.table('tasks').where(where,where_args).limit(page['row'],page['shift']).order('log_id desc').select() return public.return_message(0,0,page) def test_trigger(self,args): ''' @name 测试指定任务 @author hwliang @param args<dict_obj>{ trigger_id<int> 任务ID } @return dict ''' if not args or not 'trigger_id' in args: return {'status': -1, "timestamp": int(time.time()), "message": {'result':public.lang('parameter error')}} trigger_info = self._sql.table('trigger').where('trigger_id=?',(args.trigger_id,)).find() if not trigger_info: return {'status': -1, "timestamp": int(time.time()), "message": {'result':public.lang('Task does not exist')}} script_body = '' script_type = 'bash' return_type = 'string' script_exts = {'bash':'sh','python':'py','php':'php','node':'js','ruby':'rb','perl':'pl'} script_args = '' #检查脚本是否存在 if trigger_info['script_id']: script_info = self._sql.table('scripts').where('script_id=?',(trigger_info['script_id'],)).find() if not script_info: return {'status': -1, "timestamp": int(time.time()), "message": {'result':public.lang('Script does not exist')}} script_body = script_info['script'] script_type = script_info['script_type'] return_type = script_info['return_type'] if 'args' in trigger_info and trigger_info['args']: script_args = trigger_info['args'] else: script_body = trigger_info['script_body'] trigger_start_time = int(time.time()) result_msg = ['Executing task...'] if script_args: script_args = ' {}'.format(script_args) tmp_file = '{}/tmp/trigger_{}.{}'.format(public.get_panel_path(),trigger_info['trigger_id'],script_exts[script_type]) public.writeFile(tmp_file,script_body) #执行脚本 if script_type == 'bash': result = public.ExecShell("bash {}{}".format(tmp_file,script_args)) else: result = public.ExecShell("{} {}{}".format(public.get_python_bin(),tmp_file,script_args)) if os.path.exists(tmp_file): os.remove(tmp_file) if not result[0]: self.add_task_log(trigger_info['script_id'],trigger_info['trigger_id'],0,0,result[0],result[1],trigger_start_time,int(time.time())) return {'status': -1, "timestamp": int(time.time()), "message": {'result':public.lang('Script execution failed with error message:\n{}',str(result[1]))}} result_zero=result[0] search_string=' local variable start_time where it is not associated with a value\n' if result[0].find(search_string) != -1: result_zero = result[0].split(search_string)[1] result_msg.append("The task script has been executed and returned the result:\n{}".format(result_zero)) #检查任务事件 operator_where = self._sql.table('operator_where').where('trigger_id=?',(args.trigger_id,)).select() if not operator_where: result_msg.append("This task has no event set, skip event check.") self.add_task_log(trigger_info['script_id'],trigger_info['trigger_id'],0,1,'\n'.join(result_msg),result[1],trigger_start_time,int(time.time())) # return public.return_message(0,0,result_msg) return {'status': 0, "timestamp": int(time.time()), "message": result_msg} if return_type in ['string','json']: _line = result_zero.strip() else: _line = result_zero.strip().split('\n')[-1] _value = _line if return_type == 'string': _value = _line elif return_type == 'json': try: _value = json.loads(_line) except: result_msg.append("The script returns a result that is not in JSON format, skipping event checking.") self.add_task_log(trigger_info['script_id'],trigger_info['trigger_id'],0,0,'\n'.join(result_msg),result[1],trigger_start_time,int(time.time())) return {'status': -1, "timestamp": int(time.time()), "message": result_msg} elif return_type == 'int': try: _value = int(_line) except: result_msg.append("The last line of the script return result is not an integer, skip event check.") self.add_task_log(trigger_info['script_id'],trigger_info['trigger_id'],0,0,'\n'.join(result_msg),result[1],trigger_start_time,int(time.time())) return {'status': -1, "timestamp": int(time.time()), "message": result_msg} elif return_type == 'float': try: _value = float(_line) except: result_msg.append("The last line of the script return result is not a floating-point number, skip event check.") self.add_task_log(trigger_info['script_id'],trigger_info['trigger_id'],0,0,'\n'.join(result_msg),result[1],trigger_start_time,int(time.time())) return {'status': -1, "timestamp": int(time.time()), "message": result_msg} result_msg.append("There are {} events that need to be processed".format(len(operator_where))) n = 1 for op in operator_where: if return_type == 'int': op['op_value'] = int(op['op_value']) elif return_type == 'float': op['op_value'] = float(op['op_value']) _script = op['run_script_id'] if op['run_script_id'] else op['run_script'] result_msg.append("Processing {} th event: [Execute {} when returning value {} {}]".format(n,op['operator'],op['op_value'],_script)) op_start_time = int(time.time()) is_true = False if op['operator'] in ['==','=']: if _value == op['op_value']: is_true = True elif op['operator'] == '!=': if _value != op['op_value']: is_true = True elif op['operator'] == '>': if _value > op['op_value']: is_true = True elif op['operator'] == '>=': if _value >= op['op_value']: is_true = True elif op['operator'] == '<': if _value < op['op_value']: is_true = True elif op['operator'] == '<=': if _value <= op['op_value']: is_true = True elif op['operator'] == 'in': if str(_value).find(str(op['op_value'])) != -1: is_true = True elif op['operator'] == 'not in': if str(_value).find(str(op['op_value'])) == -1: is_true = True if is_true: result_msg.append("Event condition met, executing script...") public.WriteLog('Task arrangement',"Task [{}] triggers condition [returns result {} {}], the preset script has been executed!".format(trigger_info['name'],op['operator'],op['op_value'])) s_args = '' if 'args' in op and op['args']: s_args = op['args'] result_msg.append("Event condition met, executing event script...") result = self.exec_script(op['run_script_id'],op['run_script'],s_args) if not result[0]: public.WriteLog('Task arrangement',"Task [{}] triggers condition [returns result {} {}], execution of preset script fails:{} ".format(trigger_info['name'],op['operator'],op['op_value'],result[1])) result_msg.append("Event script execution failed with error message:\n{}".format("\n".join(result))) self.add_task_log(op['run_script_id'],0,op['where_id'],0,'\n'.join(result_msg),'\n'.join(result),op_start_time,int(time.time())) else: self.add_task_log(op['run_script_id'],0,op['where_id'],1,result[0],result[1],op_start_time,int(time.time())) result_msg.append("Event script execution successful, return result:\n{}".format(result[0])) else: result_msg.append("Event condition not met, skip execution.") result_msg.append("The {} th event has been processed.".format(n)) result_msg.append("-" * 20) n+=1 self.add_task_log(trigger_info['script_id'],trigger_info['trigger_id'],0,1,'\n'.join(result_msg),result[1],trigger_start_time,int(time.time())) return {'status': 0, "timestamp": int(time.time()), "message": {'result':public.lang("\n".join(result_msg))}} # 设置计划任务状态 def set_trigger_status(self, args): try: trigger_id=args.trigger_id triggerInfo = self._sql.table('trigger').where('trigger_id=?',(trigger_id,)).find() if not triggerInfo: return public.returnMsg(False, "未找到对应任务的数据,请刷新页面查看该任务是否存在!") status_msg = ['停用', '启用'] status = int(args.status) if status == 0: if not self.remove_crontab(trigger_id): return public.returnMsg(False, '写入计划任务失败,请检查磁盘是否可写或是否开启了系统加固!') else: if not self.sync_crontab(trigger_id): return public.returnMsg(False, '写入计划任务失败,请检查磁盘是否可写或是否开启了系统加固!') self._sql.table('trigger').where('trigger_id=?',(trigger_id,)).setField('status', status) return public.returnMsg(True, '处理成功') except : import traceback return public.returnMsg(False,traceback.format_exc()) def get_trigger_types(self, args): data = public.M("trigger_types").field("id,name,ps").order("id asc").select() return {'status': True, 'msg': data} def add_trigger_type(self, args): import re # get.name = html.escape(get.name.strip()) name = public.xsssec(args.name.strip()) if re.search('<.*?>', args.name): return public.returnMsg(False, "分类名称不能包含HTML语句") if not name: return public.returnMsg(False, "分类名称不能为空") if len(name) > 16: return public.returnMsg(False, "分类名称长度不能超过16位") trigger_type_sql = public.M('trigger_types') if trigger_type_sql.where('name=?', (name,)).count() > 0: return public.returnMsg(False, "指定分类名称已存在") # 添加新的计划任务分类 trigger_type_sql.add("name", (name,)) return public.returnMsg(True, 'Successfully added') def remove_trigger_type(self, args): trigger_type_sql = public.M('trigger_types') trigger_sql = public.M('trigger') trigger_type_id = args.id if trigger_type_sql.where('id=?', ( trigger_type_id,)).count() == 0: return public.returnMsg(False, "指定分类不存在") trigger_type_sql.where('id=?', (trigger_type_id,)).delete() try: trigger_sql.where('type_id=?', (trigger_type_id,)).save('type_id', (0)) except: pass return public.returnMsg(True, "分类已删除") def modify_trigger_type_name(self, args): import re # get.name = html.escape(get.name.strip()) name = public.xsssec(args.name.strip()) trigger_type_id = args.id if re.search('<.*?>', args.name): return public.returnMsg(False, "分类名称不能包含HTML语句") if not name: return public.returnMsg(False, "分类名称不能为空") if len(name) > 16: return public.returnMsg(False, "分类名称长度不能超过16位") trigger_type_sql = public.M('trigger_types') if trigger_type_sql.where('id=?', (trigger_type_id,)).count() == 0: return public.returnMsg(False, "指定分类不存在") if trigger_type_sql.where('name=? AND id!=?', (name, trigger_type_id)).count() > 0: return public.returnMsg(False, "指定分类名称已存在") # 修改指定的计划任务分类名称 trigger_type_sql.where('id=?', ( trigger_type_id,)).setField('name', name) return public.returnMsg(True, "修改成功") def set_trigger_type(self, args): try: trigger_ids = json.loads(args.trigger_ids) trigger_type_sql = public.M('trigger_types') trigger_sql = public.M('trigger') trigger_type_id = args.id if trigger_type_id=="0": return public.returnMsg(False,"不能设置为默认分类!") if trigger_type_sql.where('id=?', (trigger_type_id,)).count() == 0: return public.returnMsg(False, "指定分类不存在") for s_id in trigger_ids: trigger_sql.where("trigger_id=?", (s_id,)).save('type_id', (trigger_type_id)) return public.returnMsg(True, "设置成功!") except Exception as e: return public.returnMsg(False, "设置失败" + str(e))