在现有的产品结构中,已有调度平台,其中大多调度用于系统内部数据抽取,实现一个定期同步数据的功能。而调度平台仅仅只有同步数据是远远不够的,还需要一种定期执行某项业务的功能。基于这种需求,在原有的基础上,对调度平台进行了扩展。
在这个产品结构中,承载定时调度的服务是接口平台,该服务启动时会初始化调度容器。在原有的流程中,仅获取系统的定时任务列表,将其添加至调度中。扩展后,同步查询公共类型的定时调度任务。在公共的处理类中,通过fegin调用基础服务中的接口。然后在基础服务中,根据不同的业务调用不同的实现类。而每一个实现类可能调用其他的业务服务。
调度基于quartz实现,公共处理类中使用了简单工厂模式+策略模式
初始化容器
@Configuration
public class ScheduleTriggerManage {
private static final SchedulerFactory schedulerFactory = new org.quartz.impl.StdSchedulerFactory();
/**
* 容器初始化执行
*
* @param
*/
public void initContentStart() throws Exception {
// 获取scheduler
Scheduler scheduler = schedulerFactory.getScheduler();
scheduler.clear();
CommonTimerTaskQuery query = new CommonTimerTaskQuery();
query.setTaskStatus(1);
// 查询启用状态的定时任务
R<List<CommonTimerTask>> listR = commonTimerTaskFeignApi.queryTimerTaskByParam(query);
if (listR == null && !listR.getIsSuccess()) {
throw new BizException("获取公共定时任务接口异常");
}
// 获取启用的定时任务数据
List<CommonTimerTask> commonTimerTasks = listR.getData();
if (CollectionUtils.isNotEmpty(commonTimerTasks)) {
for (CommonTimerTask timerTask : commonTimerTasks) {
// 获取表达式
String timerCrons = timerTask.getTimerCrons();
// 表达式转集合
List<String> crons = new ArrayList<String>(Arrays.asList(timerCrons.split("\\|")));
// 遍历表达式
for (String cron : crons) {
try {
addCommonTimerTask(timerTask, cron);
} catch (Exception e) {
logger.error("添加公共定时调度异常,继续执行"+e.getMessage(),e);
}
}
}
}
}
// 添加公共定时任务调度
public void addCommonTimerTask(CommonTimerTask timerTask, String cron) throws Exception {
// 获取taskUUID值 任务key+'&'+表达式
String taskUUID = timerTask.getTaskKey() + "&" + cron;
// 获取scheduler
Scheduler scheduler = schedulerFactory.getScheduler();
// 添加触发器监听
TriggerListener triggerListener = new MonitorTriggerListener();
scheduler.getListenerManager().addTriggerListener(triggerListener);
// 添加job监听
JobListener jobListener = new MonitorJobListener();
scheduler.getListenerManager().addJobListener(jobListener);
// 创建jobDetail对象,指定taskKey 和 任务执行类
JobKey jobKey = new JobKey(taskUUID);
JobDetail jobDetail = JobBuilder.newJob(CommonTimerTaskScheduleExec.class)
.withIdentity(jobKey)
.usingJobData("taskKey", timerTask.getTaskKey())
.withDescription(timerTask.getTaskName())
.build();
// 创建触发器对象
TriggerKey triggerKey = new TriggerKey(taskUUID);
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing())
.build();
// 装配定时任务
scheduler.scheduleJob(jobDetail, trigger);
// 启动定时任务
logger.info("---------启动调度任务" + timerTask.getTaskName() + '&' + cron + "成功-------");
scheduler.start();
}
}
公共定时调度处理类
/**
* 公共定时任务调度
*
* @author zhengjianhua
* @Create 2021-10-09
*/
@Component
public class CommonTimerTaskScheduleExec extends AbstractScheduleExec implements Job {
//调用类
private static CommonTimerTaskFeignApi commonTimerTaskFeignApi;
@Resource
public void setCommonTimerTaskFeignApi(CommonTimerTaskFeignApi commonTimerTaskFeignApi) {
CommonTimerTaskScheduleExec.commonTimerTaskFeignApi = commonTimerTaskFeignApi;
}
@Override
protected void startExec(JobExecutionContext context) throws JobExecutionException {
StringBuffer sbf = new StringBuffer();
sbf.append("执行公共定时任务:{");
// 获取jobDataMap
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
// 获取taskKey
String taskKey = StrHelper.getObjectValue(jobDataMap.get("taskKey"));
// 根据taskKey 触发方法
R<String> rResult = commonTimerTaskFeignApi.triggerCommonTimerTask(taskKey);
// 执行成功
if (ApiReqResultVerifyUtil.verify(rResult)) {
sbf.append(rResult.getData());
} else {
sbf.append("triggerCommonTimerTask公共定时任务触发失败:").append(rResult.getMsg());
}
sbf.append("}");
context.setResult(sbf.toString());
}
@Override
public Object resultService(Map<String, Object> interfaceMap, Object result) {
return null;
}
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
//执行父类
startExec(context);
}
}基础服务中的接收方法
@ApiOperation(value = "根据taskKey触发公共定时任务", notes = "根据taskKey触发公共定时任务")
@GetMapping("/triggerCommonTimerTask")
public R<String> triggerCommonTimerTask(@RequestParam("taskKey") String taskKey) {
try {
// 根据任务参数查询 定时任务数据
CommonTimerTask commonTimerTask = commonTimerTaskService.queryCommonTimerTaskByKey(taskKey);
// 创建公共定时任务环境类
CommonTimerTaskContext timerTaskContext = new CommonTimerTaskContext(commonTimerTask.getTaskType());
// 执行定时任务处理逻辑
String result = timerTaskContext.execTimerTask(taskKey);
// 返回结果
return success(result);
} catch (BizException e) {
return fail(ExceptionCode.SERVICE_EX.getCode(), e.getMessage());
} catch (Exception e) {
log.error("根据taskKey触发公共定时任务错误:[CommonTimerTaskController.triggerCommonTimerTask] Failed to get sequence. exception[{}-{}]",
new Object[]{e.getMessage(), Arrays.deepToString(e.getStackTrace())});
return fail(ExceptionCode.OPERATION_EX.getCode(), "根据taskKey触发公共定时任务错误,请联系系统管理员处理!" + e.getMessage());
}
}公共定时任务环境类
/**
* 公共定时任务环境类
*/
public class CommonTimerTaskContext {
/**
* 持有公共定时任务策略类
*/
private CommonTimerTaskStrategy commonTimerTaskStrategy;
/**
* 构造函数
* @param taskType 定时任务类型
*/
public CommonTimerTaskContext(String taskType){
// 根据任务类型 获取策略类
this.commonTimerTaskStrategy = new CommonTimerTaskStrategyFactory().getCommonTimerTaskStrategy(taskType);
}
/**
* 返回执行结果
* @return 直接结果
*/
public String execTimerTask(String taskKey){
return this.commonTimerTaskStrategy.execTimerTask(taskKey);
}
}公共定时任务策略工厂
/**
* 公共定时任务策略工厂
*/
public class CommonTimerTaskStrategyFactory {
/**
* 根据类型获取策略实现
*
* @param type
* @return
*/
public CommonTimerTaskStrategy getCommonTimerTaskStrategy(String type) {
CommonTimerTaskStrategy commonTimerTaskStrategy = null;
switch (type) {
case "qc_hospital_plan":
commonTimerTaskStrategy = SpringUtils.getBean(QualityCheckHospitalPlanStrategy.class);
break;
case "":
break;
default:
throw new BizException("未找到匹配策略类型");
}
return commonTimerTaskStrategy;
}
}公共定时任务策略类
/**
* 公共定时任务策略类
*/
public interface CommonTimerTaskStrategy {
/**
* 执行订单任务
* @return 执行结果
*/
String execTimerTask(String taskKey);
}院级质控计划定时任务执行类
/**
* 院级质控计划定时任务执行类
*/
@Component
public class QualityCheckHospitalPlanStrategy implements CommonTimerTaskStrategy {
@Autowired
private QualityCheckTaskFeignApi qualityCheckTaskFeignApi;
@Override
public String execTimerTask(String taskKey) {
String result = "";
// 调用护管接口 生成院级质控计划 并返回结果
R<String> rResult = qualityCheckTaskFeignApi.generateQualityCheckHospitalPlanByTimerTask(taskKey);
if (ApiReqResultVerifyUtil.verify(rResult)) {
result = "业务执行成功:" + rResult.getData();
} else {
result = "业务执行失败:" + rResult.getMsg();
}
return result;
}
}

还没有评论,来说两句吧...