JAVA公共调度流程以及实现

原创  郑建华   2021-10-14   95人阅读  0 条评论

    在现有的产品结构中,已有调度平台,其中大多调度用于系统内部数据抽取,实现一个定期同步数据的功能。而调度平台仅仅只有同步数据是远远不够的,还需要一种定期执行某项业务的功能。基于这种需求,在原有的基础上,对调度平台进行了扩展。

image.png

    在这个产品结构中,承载定时调度的服务是接口平台,该服务启动时会初始化调度容器。在原有的流程中,仅获取系统的定时任务列表,将其添加至调度中。扩展后,同步查询公共类型的定时调度任务。在公共的处理类中,通过fegin调用基础服务中的接口。然后在基础服务中,根据不同的业务调用不同的实现类。而每一个实现类可能调用其他的业务服务。

   

     调度基于quartz实现,公共处理类中使用了简单工厂模式+策略模式


初始化容器

@Configuration
public class ScheduleTriggerManage {

private static final SchedulerFactory schedulerFactory = new org.quartz.impl.StdSchedulerFactory();
/**
 * 容器初始化执行
 *
 * @param
 */
public void initContentStart() throws Exception {
    // 获取scheduler
    Scheduler scheduler = schedulerFactory.getScheduler();
    scheduler.clear();
    CommonTimerTaskQuery query = new CommonTimerTaskQuery();
    query.setTaskStatus(1);
    // 查询启用状态的定时任务
    R<List<CommonTimerTask>> listR = commonTimerTaskFeignApi.queryTimerTaskByParam(query);
    if (listR == null && !listR.getIsSuccess()) {
        throw new BizException("获取公共定时任务接口异常");
    }
    // 获取启用的定时任务数据
    List<CommonTimerTask> commonTimerTasks = listR.getData();
    if (CollectionUtils.isNotEmpty(commonTimerTasks)) {
        for (CommonTimerTask timerTask : commonTimerTasks) {
            // 获取表达式
            String timerCrons = timerTask.getTimerCrons();
            // 表达式转集合
            List<String> crons = new ArrayList<String>(Arrays.asList(timerCrons.split("\\|")));
            // 遍历表达式
            for (String cron : crons) {
                try {
                    addCommonTimerTask(timerTask, cron);
                } catch (Exception e) {
                    logger.error("添加公共定时调度异常,继续执行"+e.getMessage(),e);
                }
            }
        }
   }
}


// 添加公共定时任务调度
public void addCommonTimerTask(CommonTimerTask timerTask, String cron) throws Exception {
    // 获取taskUUID值 任务key+'&'+表达式
    String taskUUID = timerTask.getTaskKey() + "&" + cron;
    // 获取scheduler
    Scheduler scheduler = schedulerFactory.getScheduler();
    // 添加触发器监听
    TriggerListener triggerListener = new MonitorTriggerListener();
    scheduler.getListenerManager().addTriggerListener(triggerListener);
    // 添加job监听
    JobListener jobListener = new MonitorJobListener();
    scheduler.getListenerManager().addJobListener(jobListener);

    // 创建jobDetail对象,指定taskKey  和 任务执行类
    JobKey jobKey = new JobKey(taskUUID);
    JobDetail jobDetail = JobBuilder.newJob(CommonTimerTaskScheduleExec.class)
            .withIdentity(jobKey)
            .usingJobData("taskKey", timerTask.getTaskKey())
            .withDescription(timerTask.getTaskName())
            .build();

    // 创建触发器对象
    TriggerKey triggerKey = new TriggerKey(taskUUID);
    Trigger trigger = TriggerBuilder.newTrigger()
            .withIdentity(triggerKey)
            .withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing())
            .build();

    // 装配定时任务
    scheduler.scheduleJob(jobDetail, trigger);

    // 启动定时任务
    logger.info("---------启动调度任务" + timerTask.getTaskName() + '&' + cron + "成功-------");
    scheduler.start();

}
}

    

公共定时调度处理类

/**
 * 公共定时任务调度
 *
 * @author zhengjianhua
 * @Create 2021-10-09
 */
@Component
public class CommonTimerTaskScheduleExec extends AbstractScheduleExec implements Job {

    //调用类
    private static CommonTimerTaskFeignApi commonTimerTaskFeignApi;

    @Resource
    public void setCommonTimerTaskFeignApi(CommonTimerTaskFeignApi commonTimerTaskFeignApi) {
        CommonTimerTaskScheduleExec.commonTimerTaskFeignApi = commonTimerTaskFeignApi;
    }

    @Override
    protected void startExec(JobExecutionContext context) throws JobExecutionException {
        StringBuffer sbf = new StringBuffer();
        sbf.append("执行公共定时任务:{");
        //  获取jobDataMap
        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        // 获取taskKey
        String taskKey = StrHelper.getObjectValue(jobDataMap.get("taskKey"));
        // 根据taskKey 触发方法
        R<String> rResult = commonTimerTaskFeignApi.triggerCommonTimerTask(taskKey);
        // 执行成功
        if (ApiReqResultVerifyUtil.verify(rResult)) {
            sbf.append(rResult.getData());
        } else {
            sbf.append("triggerCommonTimerTask公共定时任务触发失败:").append(rResult.getMsg());
        }
        sbf.append("}");
        context.setResult(sbf.toString());
    }


    @Override
    public Object resultService(Map<String, Object> interfaceMap, Object result) {
        return null;
    }

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        //执行父类
        startExec(context);
    }
}


基础服务中的接收方法

@ApiOperation(value = "根据taskKey触发公共定时任务", notes = "根据taskKey触发公共定时任务")
@GetMapping("/triggerCommonTimerTask")
public R<String> triggerCommonTimerTask(@RequestParam("taskKey") String taskKey) {
    try {
        // 根据任务参数查询 定时任务数据
        CommonTimerTask commonTimerTask = commonTimerTaskService.queryCommonTimerTaskByKey(taskKey);
        // 创建公共定时任务环境类
        CommonTimerTaskContext timerTaskContext = new CommonTimerTaskContext(commonTimerTask.getTaskType());
        // 执行定时任务处理逻辑
        String result = timerTaskContext.execTimerTask(taskKey);
        // 返回结果
        return success(result);
    } catch (BizException e) {
        return fail(ExceptionCode.SERVICE_EX.getCode(), e.getMessage());
    } catch (Exception e) {
        log.error("根据taskKey触发公共定时任务错误:[CommonTimerTaskController.triggerCommonTimerTask] Failed to get sequence. exception[{}-{}]",
                new Object[]{e.getMessage(), Arrays.deepToString(e.getStackTrace())});
        return fail(ExceptionCode.OPERATION_EX.getCode(), "根据taskKey触发公共定时任务错误,请联系系统管理员处理!" + e.getMessage());
    }
}


公共定时任务环境类

/**
 * 公共定时任务环境类
 */
public class CommonTimerTaskContext {

    /**
     *  持有公共定时任务策略类
     */
    private CommonTimerTaskStrategy commonTimerTaskStrategy;

    /**
     * 构造函数
     * @param taskType 定时任务类型
     */
    public CommonTimerTaskContext(String taskType){
        // 根据任务类型 获取策略类
        this.commonTimerTaskStrategy = new CommonTimerTaskStrategyFactory().getCommonTimerTaskStrategy(taskType);
    }

    /**
     * 返回执行结果
     * @return 直接结果
     */
    public String execTimerTask(String taskKey){
        return this.commonTimerTaskStrategy.execTimerTask(taskKey);
    }
}


公共定时任务策略工厂

/**
 * 公共定时任务策略工厂
 */
public class CommonTimerTaskStrategyFactory {


    /**
     * 根据类型获取策略实现
     *
     * @param type
     * @return
     */
    public CommonTimerTaskStrategy getCommonTimerTaskStrategy(String type) {
        CommonTimerTaskStrategy commonTimerTaskStrategy = null;
        switch (type) {
            case "qc_hospital_plan":
                commonTimerTaskStrategy = SpringUtils.getBean(QualityCheckHospitalPlanStrategy.class);
                break;
            case "":
                break;
            default:
                throw new BizException("未找到匹配策略类型");
        }
        return commonTimerTaskStrategy;
    }
}


公共定时任务策略类

/**
 * 公共定时任务策略类
 */
public interface CommonTimerTaskStrategy {

    /**
     * 执行订单任务
     * @return 执行结果
     */
    String execTimerTask(String taskKey);
}


院级质控计划定时任务执行类

/**
 * 院级质控计划定时任务执行类
 */
@Component
public class QualityCheckHospitalPlanStrategy implements CommonTimerTaskStrategy {

    @Autowired
    private QualityCheckTaskFeignApi qualityCheckTaskFeignApi;

    @Override
    public String execTimerTask(String taskKey) {
        String result = "";
        // 调用护管接口 生成院级质控计划 并返回结果
        R<String> rResult = qualityCheckTaskFeignApi.generateQualityCheckHospitalPlanByTimerTask(taskKey);
        if (ApiReqResultVerifyUtil.verify(rResult)) {
            result = "业务执行成功:" + rResult.getData();
        } else {
            result = "业务执行失败:" + rResult.getMsg();
        }
        return result;
    }
}


本文地址:https://www.zjh336.cn/?id=2063
版权声明:本文为原创文章,版权归 郑建华 所有,欢迎分享本文,转载请保留出处!

发表评论


表情

还没有留言,还不快点抢沙发?