Springboot-admin整合Quartz实现动态管理定时任务的过程详解

Quartz是一款Java编写的开源任务调度框架,同时它也是Spring默认的任务调度框架,它的作用其实类似于Timer定时器以及ScheduledExecutorService调度线程池,这篇文章主要介绍了Springboot-admin整合Quartz实现动态管理定时任务,需要的朋友可以参考下

boot-admin整合Quartz实现动态管理定时任务

淄博烧烤爆红出了圈,当你坐在八大局的烧烤摊,面前是火炉、烤串、小饼和蘸料,音乐响起,啤酒倒满,烧烤灵魂的party即将开场的时候,你系统中的Scheduler(调试器),也自动根据设定的Trigger(触发器),从容优雅的启动了一系列的Job(后台定时任务)。工作一切早有安排,又何须费心劳神呢?因为boot-admin早已将Quartz这块肉串在了烤签上!
项目源码仓库github
项目源码仓库gitee

Quartz是一款Java编写的开源任务调度框架,同时它也是Spring默认的任务调度框架。它的作用其实类似于Timer定时器以及ScheduledExecutorService调度线程池,当然Quartz作为一个独立的任务调度框架表现更为出色,功能更强大,能够定义更为复杂的执行规则。
boot-admin 是一款采用前后端分离模式、基于 SpringCloud 微服务架构 + vue-element-admin 的 SaaS 后台管理框架。
那么boot-admin怎样才能将Quartz串成串呢?一共分三步:

加入依赖

 org.quartz-schedulerquartz2.3.2

前端整合

vue页面以el-table作为任务的展示控件,串起任务的创建、修改、删除、挂起、恢复、状态查看等功能。

vue页面

api定义

job.js定义访问后台接口的方式

import request from '@/utils/request' //获取空任务 export function getBlankJob() { return request({ url: '/api/system/auth/job/blank', method: 'get' }) } //获取任务列表(分页) export function fetchJobPage(data) { return request({ url: '/api/system/auth/job/page', method: 'post', data }) } //获取用于修改的任务信息 export function getUpdateObject(data) { return request({ url: '/api/system/auth/job/dataforupdate', method: 'post', data }) } //保存任务 export function saveJob(data) { return request({ url: '/api/system/auth/job/save', method: 'post', data }) } //暂停任务 export function pauseJob(data) { return request({ url: '/api/system/auth/job/pause', method: 'post', data }) } //恢复任务 export function resumeJob(data) { return request({ url: '/api/system/auth/job/resume', method: 'post', data }) } //删除任务 export function deleteJob(data) { return request({ url: '/api/system/auth/job/delete', method: 'post', data }) }

后端整合配置类单独数据源配置

Quartz会自动创建11张数据表,数据源可以与系统主数据源相同,也可以独立设置。

笔者建议单独设置Quartz数据源。在配置文件 application.yml 添加以下内容

base2048: job: enable: true datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/base2048job?useSSL=false&serverTimezone=UTC&autoReconnect=true&allowPublicKeyRetrieval=true&useOldAliasMetadataBehavior=true username: root password: mysql

数据源配置类如下:

@Configuration public class QuartzDataSourceConfig { @Primary @Bean(name = "defaultDataSource") @ConfigurationProperties(prefix = "spring.datasource") public DruidDataSource druidDataSource() { return new DruidDataSource(); } @Bean(name = "quartzDataSource") @QuartzDataSource @ConfigurationProperties(prefix = "base2048.job.datasource") public DruidDataSource quartzDataSource() { return new DruidDataSource(); } }

调度器配置

在 resources 下添加 quartz.properties 文件,内容如下:

# 固定前缀org.quartz # 主要分为scheduler、threadPool、jobStore、plugin等部分 # # org.quartz.scheduler.instanceName = DefaultQuartzScheduler org.quartz.scheduler.rmi.export = false org.quartz.scheduler.rmi.proxy = false org.quartz.scheduler.wrapJobExecutionInUserTransaction = false  org.quartz.scheduler.instanceId = 'AUTO' # 实例化ThreadPool时,使用的线程类为SimpleThreadPool org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool # threadCount和threadPriority将以setter的形式注入ThreadPool实例 # 并发个数 org.quartz.threadPool.threadCount = 15 # 优先级 org.quartz.threadPool.threadPriority = 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true org.quartz.jobStore.misfireThreshold = 5000 # 默认存储在内存中 #org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore #持久化 org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.dataSource = qzDS org.quartz.dataSource.qzDS.maxConnections = 10

调度器配置类内容如下:

@Configuration public class SchedulerConfig { @Autowired private MyJobFactory myJobFactory; @Value("${base2048.job.enable:false}") private Boolean JOB_LOCAL_RUNING; @Value("${base2048.job.datasource.driver-class-name}") private String dsDriver; @Value("${base2048.job.datasource.url}") private String dsUrl; @Value("${base2048.job.datasource.username}") private String dsUser; @Value("${base2048.job.datasource.password}") private String dsPassword; @Bean public SchedulerFactoryBean schedulerFactoryBean() throws IOException { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setOverwriteExistingJobs(true); // 延时启动 factory.setStartupDelay(20); // 用于quartz集群,QuartzScheduler 启动时更新己存在的Job // factory.setOverwriteExistingJobs(true); // 加载quartz数据源配置 factory.setQuartzProperties(quartzProperties()); // 自定义Job Factory,用于Spring注入 factory.setJobFactory(myJobFactory); // 在com.neusoft.jn.gpbase.quartz.job.BaseJobTemplate 同样出现该配置 //原因 : qrtz 在集群模式下 存在 同一个任务 一个在A服务器任务被分配出去 另一个B服务器任务不再分配的情况. // if(!JOB_LOCAL_RUNING){ // 设置调度器自动运行 factory.setAutoStartup(false); } return factory; } @Bean public Properties quartzProperties() throws IOException { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties")); propertiesFactoryBean.afterPropertiesSet(); Properties properties = propertiesFactoryBean.getObject(); properties.setProperty("org.quartz.dataSource.qzDS.driver",dsDriver); properties.setProperty("org.quartz.dataSource.qzDS.URL",dsUrl); properties.setProperty("org.quartz.dataSource.qzDS.user",dsUser); properties.setProperty("org.quartz.dataSource.qzDS.password",dsPassword); return properties; } /* * 通过SchedulerFactoryBean获取Scheduler的实例 */ @Bean(name="scheduler") public Scheduler scheduler() throws Exception { return schedulerFactoryBean().getScheduler(); } }

任务模板

Job基类

public abstract class BaseJob implements Job, Serializable { private static final String JOB_MAP_KEY = "self"; public static final String STATUS_RUNNING = "1"; public static final String STATUS_NOT_RUNNING = "0"; public static final String CONCURRENT_IS = "1"; public static final String CONCURRENT_NOT = "0"; /** * 任务名称 */ private String jobName; /** * 任务分组 */ private String jobGroup; /** * 任务状态 是否启动任务 */ private String jobStatus; /** * cron表达式 */ private String cronExpression; /** * 描述 */ private String description; /** * 任务执行时调用哪个类的方法 包名+类名 */ private Class beanClass = this.getClass(); /** * 任务是否有状态 */ private String isConcurrent; /** * Spring bean */ private String springBean; /** * 任务调用的方法名 */ private String methodName; /** * 为了将执行后的任务持久化到数据库中 */ @JsonIgnore private JobDataMap dataMap = new JobDataMap(); public JobKey getJobKey(){ return JobKey.jobKey(jobName, jobGroup);// 任务名称和组构成任务key } public JobDataMap getDataMap(){ if(dataMap.size() == 0){ dataMap.put(JOB_MAP_KEY,this); } return dataMap; } public String getJobName() { return jobName; } public void setJobName(String jobName) { this.jobName = jobName; } public String getJobGroup() { return jobGroup; } public void setJobGroup(String jobGroup) { this.jobGroup = jobGroup; } public String getJobStatus() { return jobStatus; } public void setJobStatus(String jobStatus) { this.jobStatus = jobStatus; } public String getCronExpression() { return cronExpression; } public void setCronExpression(String cronExpression) { this.cronExpression = cronExpression; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public Class getBeanClass() { return beanClass; } public void setBeanClass(Class beanClass) { this.beanClass = beanClass; } public String getIsConcurrent() { return isConcurrent; } public void setIsConcurrent(String isConcurrent) { this.isConcurrent = isConcurrent; } public String getSpringBean() { return springBean; } public void setSpringBean(String springBean) { this.springBean = springBean; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } }

Job模板类

@Slf4j public abstract class BaseJobTemplate extends BaseJob { @Value("${base2048.job.enable:false}") private Boolean JOB_LOCAL_RUNING; @Override public final void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { if (JOB_LOCAL_RUNING) { try { this.runing(jobExecutionContext); } catch (Exception ex) { throw new JobExecutionException(ex); } } else { log.info("配置参数不允许在本机执行定时任务"); } } public abstract void runing(JobExecutionContext jobExecutionContext); }

Job示例类

业务Job从模板类继承。

@Slf4j @Component @DisallowConcurrentExecution public class TestJob extends BaseJobTemplate { @Override public void runing(JobExecutionContext jobExecutionContext)  { try { log.info("测试任务开始:【{}】", Instant.now().atOffset(ZoneOffset.ofHours(8))); System.out.println("============= 测试任务正在运行 ====================="); System.out.println("============= Test job is running ==============="); log.info("测试任务结束:【{}】", Instant.now().atOffset(ZoneOffset.ofHours(8))); } catch (Exception ex) { log.error("测试任务异常:【{}】", Instant.now().atOffset(ZoneOffset.ofHours(8))); log.error(ex.getMessage(), ex); } } }

管理功能

Controller

@RestController @RequestMapping("/api/system/auth/job") @Slf4j public class QuartzJobController { @Resource private QuartzService quartzService; @PostMapping("/save") @ApiOperation(value = "保存添加或修改任务",notes = "保存添加或修改任务") public ResultDTO addOrUpdate(@RequestBody JobUpdateDTO jobUpdateDTO) throws Exception { if (StringUtils.isBlank(jobUpdateDTO.getOldJobName())) { ResultDTO resultDTO = this.addSave(jobUpdateDTO); return resultDTO; } else { /** * 先删除后添加 */ JobDTO jobDTO = new JobDTO(); jobDTO.setJobName(jobUpdateDTO.getOldJobName()); jobDTO.setJobGroup(jobUpdateDTO.getOldJobGroup()); this.delete(jobDTO); ResultDTO resultDTO = this.addSave(jobUpdateDTO); return resultDTO; } } private ResultDTO addSave(@RequestBody JobUpdateDTO jobUpdateDTO) throws Exception { BaseJob job = (BaseJob) Class.forName(jobUpdateDTO.getJobClassName()).newInstance(); job.setJobName(jobUpdateDTO.getJobName()); job.setJobGroup(jobUpdateDTO.getJobGroup()); job.setDescription(jobUpdateDTO.getDescription()); job.setCronExpression(jobUpdateDTO.getCronExpression()); try { quartzService.addJob(job); return  ResultDTO.success(); }catch (Exception ex){ log.error(ex.getMessage(),ex); return ResultDTO.failureCustom("保存添加任务时服务发生意外情况。"); } } @PostMapping("/page") @ApiOperation(value = "查询任务",notes = "查询任务") public ResultDTO getJobPage(@RequestBody BasePageQueryVO basePageQueryVO) { try { IPage jobDtoPage = quartzService.queryJob(basePageQueryVO.getCurrentPage(),basePageQueryVO.getPageSize()); return  ResultDTO.success(jobDtoPage); }catch (Exception ex){ log.error(ex.getMessage(),ex); return ResultDTO.failureCustom("查询任务时服务发生意外情况。"); } } @PostMapping("/pause") @ApiOperation(value = "暂停任务",notes = "暂停任务") public ResultDTO pause(@RequestBody JobDTO jobDTO) { try { quartzService.pauseJob(jobDTO.getJobName(),jobDTO.getJobGroup()); return ResultDTO.success(); }catch (Exception ex){ log.error(ex.getMessage(),ex); return ResultDTO.failureCustom("暂停任务时服务发生意外情况。"); } } @PostMapping("/resume") @ApiOperation(value = "恢复任务",notes = "恢复任务") public ResultDTO resume(@RequestBody JobDTO jobDTO) { try { quartzService.resumeJob(jobDTO.getJobName(),jobDTO.getJobGroup()); return ResultDTO.success(); }catch (Exception ex){ log.error(ex.getMessage(),ex); return ResultDTO.failureCustom("恢复任务时服务发生意外情况。"); } } @PostMapping("/delete") @ApiOperation(value = "删除任务",notes = "删除任务") public ResultDTO delete(@RequestBody JobDTO jobDTO) { try { if(quartzService.deleteJob(jobDTO.getJobName(),jobDTO.getJobGroup())) { return ResultDTO.failureCustom("删除失败。"); }else{ return ResultDTO.success(); } }catch (Exception ex){ log.error(ex.getMessage(),ex); return ResultDTO.failureCustom("删除任务时服务发生意外情况。"); } } @GetMapping("/blank") public ResultDTO getBlankJobDTO(){ JobUpdateDTO jobUpdateDTO = new JobUpdateDTO(); jobUpdateDTO.setJobClassName("com.qiyuan.base2048.quartz.job.jobs."); jobUpdateDTO.setCronExpression("*/9 * * * * ?"); return ResultDTO.success(jobUpdateDTO); } @PostMapping("/dataforupdate") public ResultDTO getUpdateJobDTO(@RequestBody JobDTO jobDTO){ JobUpdateDTO jobUpdateDTO = JobDtoTransMapper.INSTANCE.map(jobDTO); jobUpdateDTO.setOldJobName(jobDTO.getJobName()); jobUpdateDTO.setOldJobGroup(jobDTO.getJobGroup()); return ResultDTO.success(jobUpdateDTO); } }

JobDTO

@Data public class JobDTO { private String jobClassName; private String jobName; private String jobGroup; private String description; private String cronExpression; private String triggerName; private String triggerGroup; private String timeZoneId; private String triggerState; private Date startTime; private Date nextFireTime; private Date previousFireTime; }

JobUpdateDTO

@Data public class JobUpdateDTO  extends JobDTO{ private String oldJobName; private String oldJobGroup; }

Service

@Service @Slf4j public class QuartzServiceImpl implements QuartzService { /** * Scheduler代表一个调度容器,一个调度容器可以注册多个JobDetail和Trigger.当Trigger和JobDetail组合,就可以被Scheduler容器调度了 */ @Autowired private Scheduler scheduler; @Resource private QrtzJobDetailsMapper qrtzJobDetailsMapper; @Autowired private SchedulerFactoryBean schedulerFactoryBean; @Autowired public QuartzServiceImpl(Scheduler scheduler){ this.scheduler = scheduler; } @Override public IPage queryJob(int pageNum, int pageSize) throws Exception{ List jobList = null; try { Scheduler scheduler = schedulerFactoryBean.getScheduler(); GroupMatcher matcher = GroupMatcher.anyJobGroup(); Set jobKeys = scheduler.getJobKeys(matcher); jobList = new ArrayList<>(); for (JobKey jobKey : jobKeys) { List triggers = scheduler.getTriggersOfJob(jobKey); for (Trigger trigger : triggers) { JobDTO jobDetails = new JobDTO(); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; jobDetails.setCronExpression(cronTrigger.getCronExpression()); jobDetails.setTimeZoneId(cronTrigger.getTimeZone().getDisplayName()); } jobDetails.setTriggerGroup(trigger.getKey().getName()); jobDetails.setTriggerName(trigger.getKey().getGroup()); jobDetails.setJobGroup(jobKey.getGroup()); jobDetails.setJobName(jobKey.getName()); jobDetails.setStartTime(trigger.getStartTime()); jobDetails.setJobClassName(scheduler.getJobDetail(jobKey).getJobClass().getName()); jobDetails.setNextFireTime(trigger.getNextFireTime()); jobDetails.setPreviousFireTime(trigger.getPreviousFireTime()); jobDetails.setTriggerState(scheduler.getTriggerState(trigger.getKey()).name()); jobList.add(jobDetails); } } } catch (SchedulerException e) { e.printStackTrace(); } IPage jobDTOPage = new Page<>(pageNum,pageSize); jobDTOPage.setRecords(jobList); jobDTOPage.setTotal(jobList.size()); jobDTOPage.setCurrent(1); jobDTOPage.setPages(1); jobDTOPage.setSize(jobList.size()); return jobDTOPage; } /** * 添加一个任务 * @param job * @throws SchedulerException */ @Override public void addJob(BaseJob job) throws SchedulerException { /** 创建JobDetail实例,绑定Job实现类 * JobDetail 表示一个具体的可执行的调度程序,job是这个可执行调度程序所要执行的内容 * 另外JobDetail还包含了这个任务调度的方案和策略**/ // 指明job的名称,所在组的名称,以及绑定job类 JobDetail jobDetail = JobBuilder.newJob(job.getBeanClass()) .withIdentity(job.getJobKey()) .withDescription(job.getDescription()) .usingJobData(job.getDataMap()) .build(); /** * Trigger代表一个调度参数的配置,什么时候去调度 */ //定义调度触发规则, 使用cronTrigger规则 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(job.getJobName(),job.getJobGroup()) .withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression())) .startNow() .build(); //将任务和触发器注册到任务调度中去 scheduler.scheduleJob(jobDetail,trigger); //判断调度器是否启动 if(!scheduler.isStarted()){ scheduler.start(); } log.info(String.format("定时任务:%s.%s-已添加到调度器!", job.getJobGroup(),job.getJobName())); } /** * 根据任务名和任务组名来暂停一个任务 * @param jobName * @param jobGroupName * @throws SchedulerException */ @Override public void pauseJob(String jobName,String jobGroupName) throws SchedulerException { scheduler.pauseJob(JobKey.jobKey(jobName,jobGroupName)); } /** * 根据任务名和任务组名来恢复一个任务 * @param jobName * @param jobGroupName * @throws SchedulerException */ @Override public void resumeJob(String jobName,String jobGroupName) throws SchedulerException { scheduler.resumeJob(JobKey.jobKey(jobName,jobGroupName)); } public void rescheduleJob(String jobName,String jobGroupName,String cronExpression,String description) throws SchedulerException { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName); // 表达式调度构建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); // 按新的cronExpression表达式重新构建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withDescription(description).withSchedule(scheduleBuilder).build(); // 按新的trigger重新设置job执行 scheduler.rescheduleJob(triggerKey, trigger); } /** * 根据任务名和任务组名来删除一个任务 * @param jobName * @param jobGroupName * @throws SchedulerException */ @Override public boolean deleteJob(String jobName,String jobGroupName) throws SchedulerException { TriggerKey triggerKey = TriggerKey.triggerKey(jobName,jobGroupName); scheduler.pauseTrigger(triggerKey); //先暂停 scheduler.unscheduleJob(triggerKey); //取消调度 boolean flag = scheduler.deleteJob(JobKey.jobKey(jobName,jobGroupName)); return flag; } private JobDTO createJob(String jobName, String jobGroup, Scheduler scheduler, Trigger trigger) throws SchedulerException { JobDTO job = new JobDTO(); job.setJobName(jobName); job.setJobGroup(jobGroup); job.setDescription("触发器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setTriggerState(triggerState.name()); if(trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger)trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } return job; } }

至此,烤串完毕,火侯正好,外酥里嫩!

项目源码仓库github
项目源码仓库gitee

到此这篇关于Springboot-admin整合Quartz实现动态管理定时任务的文章就介绍到这了,更多相关Springboot-admin整合Quartz内容请搜索0133技术站以前的文章或继续浏览下面的相关文章希望大家以后多多支持0133技术站!

以上就是Springboot-admin整合Quartz实现动态管理定时任务的过程详解的详细内容,更多请关注0133技术站其它相关文章!

赞(0) 打赏
未经允许不得转载:0133技术站首页 » Java