簡介
quartz是一款功能強大的任務調度器,可以實現較為複雜的調度功能,如每月一號執行、每天凌晨執行、每周五執行等等,還支持分布式調度。本文使用springboot+mybatis+quartz實現對定時任務的增、刪、改、查、啟用、停用等功能,並把定時任務持久化到數據庫以及支持集群。
quartz的3個基本要素
- scheduler:調度器。所有的調度都是由它控制。
- trigger: 觸發器。決定什麼時候來執行任務。
- jobdetail & job: jobdetail定義的是任務數據,而真正的執行邏輯是在job中。使用jobdetail + job而不是job,這是因為任務是有可能並發執行,如果scheduler直接使用job,就會存在對同一個job實例並發訪問的問題。而jobdetail & job 方式,scheduler每次執行,都會根據jobdetail創建一個新的job實例,這樣就可以規避並發訪問的問題。
如何使用quartz
1.添加依賴
1
2
3
4
5
6
7
8
9
10
|
<!-- Quartz core plus the optional bundled job implementations. -->
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.2.3</version>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz-jobs</artifactId>
    <version>2.2.3</version>
</dependency>
2.創建配置文件
在maven項目的resources目錄下創建quartz.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# Quartz property keys are case-sensitive; one entry per line.
org.quartz.scheduler.instanceName = myscheduler
# AUTO lets Quartz generate a unique instance id for each cluster node.
org.quartz.scheduler.instanceId = AUTO
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false

# Thread pool
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

# Persistence
org.quartz.jobStore.misfireThreshold = 50000
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

# Clustering
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.useProperties = true
org.quartz.jobStore.clusterCheckinInterval = 15000

# Driver delegate for Oracle accessed through WebLogic
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate
#org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = qrtz_
org.quartz.jobStore.dataSource = qzds

# Data source connection settings. Quartz uses a c3p0 pool by default;
# it can be overridden by a custom data source.
org.quartz.dataSource.qzds.driver = oracle.jdbc.driver.OracleDriver
org.quartz.dataSource.qzds.URL = jdbc:oracle:thin:@localhost:1521/xe
org.quartz.dataSource.qzds.user = root
org.quartz.dataSource.qzds.password = 123456
org.quartz.dataSource.qzds.maxConnections = 10
說明:在使用quartz做持久化的時候需要用到quartz的11張表,可以去quartz官網下載對應版本的quartz,解壓打開docs/dbtables裡面有對應數據庫的建表語句。關於quartz.properties配置的詳細解釋可以查看quartz官網。另外新建一張表tb_app_quartz用於存放定時任務基本信息和描述等信息,定時任務的增、刪、改、執行等功能與此表沒有任何關係。
quartz的11張表:
1
2
3
4
5
6
7
8
9
10
|
/**
 * Entity for the {@code tb_app_quartz} table, which stores the basic
 * description of a scheduled task.
 */
public class appquartz {

    /** Primary key. */
    private Integer quartzid;
    /** Task name. */
    private String jobname;
    /** Task group. */
    private String jobgroup;
    /** Task start time, kept as text. */
    private String starttime;
    /** Cron expression that drives the task's trigger. */
    private String cronexpression;
    /** Parameter handed to the task when it runs. */
    private String invokeparam;

    public Integer getquartzid() {
        return quartzid;
    }

    public void setquartzid(Integer quartzid) {
        this.quartzid = quartzid;
    }

    public String getjobname() {
        return jobname;
    }

    public void setjobname(String jobname) {
        this.jobname = jobname;
    }

    public String getjobgroup() {
        return jobgroup;
    }

    public void setjobgroup(String jobgroup) {
        this.jobgroup = jobgroup;
    }

    public String getstarttime() {
        return starttime;
    }

    public void setstarttime(String starttime) {
        this.starttime = starttime;
    }

    public String getcronexpression() {
        return cronexpression;
    }

    public void setcronexpression(String cronexpression) {
        this.cronexpression = cronexpression;
    }

    public String getinvokeparam() {
        return invokeparam;
    }

    public void setinvokeparam(String invokeparam) {
        this.invokeparam = invokeparam;
    }
}
3.quartz配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
/** * 創(chuàng)建job 實例工廠,解決spring注入問題,如果使用默認會導致spring的@autowired 無法注入問題 * @author llq * */ @component public class jobfactory extends adaptablejobfactory{ @autowired private autowirecapablebeanfactory capablebeanfactory; @override protected object createjobinstance(triggerfiredbundle bundle) throws exception { //調用父類的方法 object jobinstance = super .createjobinstance(bundle); //進行注入 capablebeanfactory.autowirebean(jobinstance); return jobinstance; } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
@configuration public class schedulerconfig implements applicationlistener<contextrefreshedevent>{ @autowired private jobfactory jobfactory; @autowired @qualifier ( "datasource" ) private datasource primarydatasource; @override public void onapplicationevent(contextrefreshedevent event) { system.out.println( "任務已經啟動..." +event.getsource()); } @bean public schedulerfactorybean schedulerfactorybean() throws ioexception { //獲取配置屬性 propertiesfactorybean propertiesfactorybean = new propertiesfactorybean(); propertiesfactorybean.setlocation( new classpathresource( "/quartz.properties" )); //在quartz.properties中的屬性被讀取并注入后再初始化對象 propertiesfactorybean.afterpropertiesset(); //創(chuàng)建schedulerfactorybean schedulerfactorybean factory = new schedulerfactorybean(); factory.setquartzproperties(propertiesfactorybean.getobject()); //使用數據源,自定義數據源 factory.setdatasource( this .primarydatasource); factory.setjobfactory(jobfactory); factory.setwaitforjobstocompleteonshutdown( true ); //這樣當spring關閉時,會等待所有已經啟動的quartz job結束后spring才能完全shutdown。 factory.setoverwriteexistingjobs( false ); factory.setstartupdelay( 1 ); return factory; } /* * 通過schedulerfactorybean獲取scheduler的實例 */ @bean (name= "scheduler" ) public scheduler scheduler() throws ioexception { return schedulerfactorybean().getscheduler(); } @bean public quartzinitializerlistener executorlistener() { return new quartzinitializerlistener(); } } |
4.創建定時任務服務
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
@service public class jobutil { @autowired @qualifier ( "scheduler" ) private scheduler scheduler; /** * 新建一個任務 * */ public string addjob(appquartz appquartz) throws exception { simpledateformat df = new simpledateformat( "yyyy-mm-dd hh:mm:ss" ); date date=df.parse(appquartz.getstarttime()); if (!cronexpression.isvalidexpression(appquartz.getcronexpression())) { return "illegal cron expression" ; //表達式格式不正確 } jobdetail jobdetail= null ; //構建job信息 if ( "jobone" .equals(appquartz.getjobgroup())) { jobdetail = jobbuilder.newjob(jobone. class ).withidentity(appquartz.getjobname(), appquartz.getjobgroup()).build(); } if ( "jobtwo" .equals(appquartz.getjobgroup())) { jobdetail = jobbuilder.newjob(jobtwo. class ).withidentity(appquartz.getjobname(), appquartz.getjobgroup()).build(); } //表達式調度構建器(即任務執(zhí)行的時間,不立即執(zhí)行) cronschedulebuilder schedulebuilder = cronschedulebuilder.cronschedule(appquartz.getcronexpression()).withmisfirehandlinginstructiondonothing(); //按新的cronexpression表達式構建一個新的trigger crontrigger trigger = triggerbuilder.newtrigger().withidentity(appquartz.getjobname(), appquartz.getjobgroup()).startat(date) .withschedule(schedulebuilder).build(); //傳遞參數 if (appquartz.getinvokeparam()!= null && ! 
"" .equals(appquartz.getinvokeparam())) { trigger.getjobdatamap().put( "invokeparam" ,appquartz.getinvokeparam()); } scheduler.schedulejob(jobdetail, trigger); // pausejob(appquartz.getjobname(),appquartz.getjobgroup()); return "success" ; } /** * 獲取job狀態(tài) * @param jobname * @param jobgroup * @return * @throws schedulerexception */ public string getjobstate(string jobname, string jobgroup) throws schedulerexception { triggerkey triggerkey = new triggerkey(jobname, jobgroup); return scheduler.gettriggerstate(triggerkey).name(); } //暫停所有任務 public void pausealljob() throws schedulerexception { scheduler.pauseall(); } //暫停任務 public string pausejob(string jobname, string jobgroup) throws schedulerexception { jobkey jobkey = new jobkey(jobname, jobgroup); jobdetail jobdetail = scheduler.getjobdetail(jobkey); if (jobdetail == null ) { return "fail" ; } else { scheduler.pausejob(jobkey); return "success" ; } } //恢復所有任務 public void resumealljob() throws schedulerexception { scheduler.resumeall(); } // 恢復某個任務 public string resumejob(string jobname, string jobgroup) throws schedulerexception { jobkey jobkey = new jobkey(jobname, jobgroup); jobdetail jobdetail = scheduler.getjobdetail(jobkey); if (jobdetail == null ) { return "fail" ; } else { scheduler.resumejob(jobkey); return "success" ; } } //刪除某個任務 public string deletejob(appquartz appquartz) throws schedulerexception { jobkey jobkey = new jobkey(appquartz.getjobname(), appquartz.getjobgroup()); jobdetail jobdetail = scheduler.getjobdetail(jobkey); if (jobdetail == null ) { return "jobdetail is null" ; } else if (!scheduler.checkexists(jobkey)) { return "jobkey is not exists" ; } else { scheduler.deletejob(jobkey); return "success" ; } } //修改任務 public string modifyjob(appquartz appquartz) throws schedulerexception { if (!cronexpression.isvalidexpression(appquartz.getcronexpression())) { return "illegal cron expression" ; } triggerkey triggerkey = triggerkey.triggerkey(appquartz.getjobname(),appquartz.getjobgroup()); 
jobkey jobkey = new jobkey(appquartz.getjobname(),appquartz.getjobgroup()); if (scheduler.checkexists(jobkey) && scheduler.checkexists(triggerkey)) { crontrigger trigger = (crontrigger) scheduler.gettrigger(triggerkey); //表達式調度構建器,不立即執(zhí)行 cronschedulebuilder schedulebuilder = cronschedulebuilder.cronschedule(appquartz.getcronexpression()).withmisfirehandlinginstructiondonothing(); //按新的cronexpression表達式重新構建trigger trigger = trigger.gettriggerbuilder().withidentity(triggerkey) .withschedule(schedulebuilder).build(); //修改參數 if (!trigger.getjobdatamap().get( "invokeparam" ).equals(appquartz.getinvokeparam())) { trigger.getjobdatamap().put( "invokeparam" ,appquartz.getinvokeparam()); } //按新的trigger重新設置job執(zhí)行 scheduler.reschedulejob(triggerkey, trigger); return "success" ; } else { return "job or trigger not exists" ; } } } |
1
2
3
4
5
6
7
8
9
10
11
|
@persistjobdataafterexecution @disallowconcurrentexecution @component public class jonone implements job{ @override public void execute(jobexecutioncontext context) throws jobexecutionexception{ jobdatamap data=context.gettrigger().getjobdatamap(); string invokeparam =(string) data.get( "invokeparam" ); //在這里實現業(yè)務邏輯 } } |
1
2
3
4
5
6
7
8
9
10
11
|
@persistjobdataafterexecution @disallowconcurrentexecution @component public class jobtwo implements job{ @override public void execute(jobexecutioncontext context) throws jobexecutionexception{ jobdatamap data=context.gettrigger().getjobdatamap(); string invokeparam =(string) data.get( "invokeparam" ); //在這里實現業(yè)務邏輯 } } |
說明:每個定時任務都必須有一個分組、名稱和cron表達式,cron表達式也就是定時任務的觸發時間,關於cron表達式格式以及含義可以參考一些網絡資源。每個定時任務都有一個入口類,在這里我把類名當成定時任務的分組名稱,例如:只要創建定時任務的分組是jobone的都會執行jobone這個任務類里面的邏輯。如果定時任務需要額外的參數可以使用jobdatamap傳遞參數,當然也可以從數據庫中獲取需要的數據。@DisallowConcurrentExecution注解是不讓同一個定時任務並發執行,只有等當前這次執行完成,下一次才會去執行;@PersistJobDataAfterExecution注解則是在任務執行完成後把對jobdatamap的修改保存下來,兩者通常一起使用。
5.封裝定時任務接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
@restcontroller public class jobcontroller { @autowired private jobutil jobutil; @autowired private appquartzservice appquartzservice; //添加一個job @requestmapping (value= "/addjob" ,method=requestmethod.post) public returnmsg addjob( @requestbody appquartz appquartz) throws exception { appquartzservice.insertappquartzser(appquartz); result=jobutil.addjob(appquartz); } //暫停job @requestmapping (value= "/pausejob" ,method=requestmethod.post) public returnmsg pausejob( @requestbody integer[]quartzids) throws exception { appquartz appquartz= null ; if (quartzids.length> 0 ){ for (integer quartzid:quartzids) { appquartz=appquartzservice.selectappquartzbyidser(quartzid).get( 0 ); jobutil.pausejob(appquartz.getjobname(), appquartz.getjobgroup()); } return new returnmsg( "200" , "success pausejob" ); } else { return new returnmsg( "404" , "fail pausejob" ); } } //恢復job @requestmapping (value= "/resumejob" ,method=requestmethod.post) public returnmsg resumejob( @requestbody integer[]quartzids) throws exception { appquartz appquartz= null ; if (quartzids.length> 0 ) { for (integer quartzid:quartzids) { appquartz=appquartzservice.selectappquartzbyidser(quartzid).get( 0 ); jobutil.resumejob(appquartz.getjobname(), appquartz.getjobgroup()); } return new returnmsg( "200" , "success resumejob" ); } else { return new returnmsg( "404" , "fail resumejob" ); } } //刪除job @requestmapping (value= "/deletjob" ,method=requestmethod.post) public returnmsg deletjob( @requestbody integer[]quartzids) throws exception { appquartz appquartz= null ; for (integer quartzid:quartzids) { appquartz=appquartzservice.selectappquartzbyidser(quartzid).get( 0 ); string ret=jobutil.deletejob(appquartz); if ( "success" .equals(ret)) { appquartzservice.deleteappquartzbyidser(quartzid); } } return new returnmsg( "200" , "success deletejob" ); } //修改 @requestmapping (value= "/updatejob" ,method=requestmethod.post) public returnmsg modifyjob( @requestbody appquartz appquartz) throws exception { string ret= 
jobutil.modifyjob(appquartz); if ( "success" .equals(ret)) { appquartzservice.updateappquartzser(appquartz); return new returnmsg( "200" , "success updatejob" ,ret); } else { return new returnmsg( "404" , "fail updatejob" ,ret); } } //暫停所有 @requestmapping (value= "/pauseall" ,method=requestmethod.get) public returnmsg pausealljob() throws exception { jobutil.pausealljob(); return new returnmsg( "200" , "success pauseall" ); } //恢復所有 @requestmapping (value= "/repauseall" ,method=requestmethod.get) public returnmsg repausealljob() throws exception { jobutil.resumealljob(); return new returnmsg( "200" , "success repauseall" ); } } |
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://segmentfault.com/a/1190000016554033