spring Batch是一個基于Spring的企業級批處理框架,它通過配合定時器Quartz來輕易實現大批量的數據讀取或插入,并且全程自動化,無需人員管理。
在使用spring batch之前,得對spring batch的流程有一個基本了解
每個batch它都包含了一個job,而一個job中卻有可能包含多個step,整個batch中干活的是step,batch主要是用來對數據的操作,所以step就有三個操作數據的東西,一個是ItemReader用來讀取數據的,一個是ItemProcessor用來處理數據的,一個是ItemWriter用來寫數據(可以是文件也可以是插入sql語句),JobLauncher用來啟動Job,JobRepository是上述處理提供的一種持久化機制,它為JobLauncher,Job,和Step實例提供CRUD操作。
pom.xml 三個batch的jar包
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
< dependency > < groupId >org.springframework</ groupId > < artifactId >spring-batch-core</ artifactId > < version >2.1.8.RELEASE</ version > </ dependency > < dependency > < groupId >org.springframework</ groupId > < artifactId >spring-batch-infrastructure</ artifactId > < version >2.1.8.RELEASE</ version > < dependency > < dependency > < groupId >org.springframework</ groupId > < artifactId >spring-batch-test</ artifactId > < version >2.1.8.RELEASE</ version > </ dependency > |
batch.xml
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
< beans xmlns = " http://www.springframework.org/schema/beans " xmlns:batch = " http://www.springframework.org/schema/batch " xmlns:xsi = " http://www.w3.org/2001/XMLSchema-instance " xsi:schemaLocation=" http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd "> < bean id = "jobLauncher" class = "org.springframework.batch.core.launch.support.SimpleJobLauncher" > < property name = "jobRepository" ref = "jobRepository" /> </ bean > < bean id = "jobRepository" class = "org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" > < property name = "validateTransactionState" value = "false" /> </ bean > <!--一個job--> < batch:job id = "writerteacherInterview" > < batch:step id = "teacherInterview" > < batch:tasklet > < batch:chunk reader = "jdbcItemReaderTeacherInterview" writer = "teacherInterviewItemWriter" processor = "teacherInterviewProcessor" commit-interval = "10" > </ batch:chunk > </ batch:tasklet > </ batch:step > </ batch:job > <!--job的讀取數據操作--> < bean id = "jdbcItemReaderTeacherInterview" class = "org.springframework.batch.item.database.JdbcCursorItemReader" scope = "step" > < property name = "dataSource" ref = "dataSource" /> < property name = "sql" value="select distinct teacherName ,count(teacherName) as num from examininterviewrecord where pdate >'${detail_startime}' and pdate < '${detail_endtime}' GROUP BY teacherName " /> < property name = "rowMapper" ref = "teacherInterviewMapper" > </ property > </ bean > </ beans > |
讀取數據 teacherInterviewMapper
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package com.yc.batch; import java.sql.ResultSet; import java.sql.SQLException; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Component; import com.yc.vo.TeacherInterviewdetail; import com.yc.vo.TeacherWorkdetail; import com.yc.vo.Workdetail; @Component ( "teacherInterviewMapper" ) public class TeacherInterviewMapper implements RowMapper { @Override public Object mapRow(ResultSet rs, int rowNum) throws SQLException { TeacherInterviewdetail TId= new TeacherInterviewdetail(); TId.setTeacherName(rs.getString( "teacherName" )); TId.setNum(rs.getInt( "num" )); return TId; } } |
處理數據 teacherInterviewProcessor ,這個處理數據方法,一般都是在這里在這里進行一些數據的加工,比如有些數據沒有讀到,你也可以在這個方法和后面那個寫入數據的類里面寫,所以就導致了這個類里面你可以什么都不敢,直接把數據拋到后面去,讓后面的寫數據類來處理;我這里就是處理數據的這個類什么都沒寫,但是最好還是按它的規則來!
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package com.yc.batch; import org.hibernate.engine.transaction.jta.platform.internal.SynchronizationRegistryBasedSynchronizationStrategy; import org.springframework.batch.item.ItemProcessor; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import com.yc.vo.TeacherInterviewdetail; import com.yc.vo.TeacherWorkdetail; import com.yc.vo.Workdetail; //業務層 @Component ( "teacherInterviewProcessor" ) public class TeacherInterviewProcessor implements ItemProcessor<TeacherInterviewdetail, TeacherInterviewdetail> { @Override public TeacherInterviewdetail process(TeacherInterviewdetail teacherInterviewdetail) throws Exception { return teacherInterviewdetail; } } |
寫數據 teacherInterviewItemWriter 這個類里面主要是把數據寫進一個文件里,同時我這個類里面還有一些數據處理
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package com.yc.batch; import java.io.InputStream; import java.text.NumberFormat; import java.util.ArrayList; import java.util.List; import java.util.Properties; import javax.annotation.Resource; import org.springframework.batch.item.ItemWriter; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import com.yc.biz.ExamineeClassBiz; import com.yc.biz.WorkBiz; import com.yc.utils.CsvUtils; import com.yc.vo.TeacherInterviewdetail; import com.yc.vo.TeacherWorkdetail; import com.yc.vo.Workdetail; import net.sf.ehcache.util.PropertyUtil; //寫 @Component ( "teacherInterviewItemWriter" ) public class TeacherInterviewItemWriter implements ItemWriter<TeacherInterviewdetail>{ @Override public void write(List<? extends TeacherInterviewdetail> teacherInterviewdetails) throws Exception { Properties props = new Properties(); InputStream in= PropertyUtil. class .getClassLoader().getResourceAsStream( "connectionConfig.properties" ); props.load(in); String time=props.getProperty( "detail_time" ); CsvUtils cu= new CsvUtils(); List<Object> works= new ArrayList<Object>(); for (TeacherInterviewdetail t:teacherInterviewdetails){ works.add(t); } String path= this .getClass().getResource( "/" ).getPath(); path=path.substring( 0 ,path.lastIndexOf( "/" )); path=path.substring( 0 ,path.lastIndexOf( "/" )); path=path.substring( 0 ,path.lastIndexOf( "/" )); path=path.substring( 0 ,path.lastIndexOf( "/" )); cu.writeCsv(path+ "/csv/teacherInterview_" +time+ ".csv" ,works ); } } |
我這里有用到一個吧數據寫進CSV文件的jar包
1
|
2
3
4
5
|
< dependency > < groupId >net.sourceforge.javacsv</ groupId > < artifactId >javacsv</ artifactId > < version >2.0</ version > </ dependency > |
CsvUtils幫助類的寫入CSV文件方法
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
/** * 寫入CSV文件 * @throws IOException */ public void writeCsv(String path,List<Object> t) throws IOException{ String csvFilePath = path; String filepath=path.substring( 0 ,path.lastIndexOf( "/" )); File f= new File(filepath); if (!f.exists()){ f.mkdirs(); } File file= new File(path); if (!file.exists()){ file.createNewFile(); } CsvWriter wr = new CsvWriter(csvFilePath, ',' ,Charset.forName( "GBK" )); try { for (Object obj:t){ String[] contents=obj.toString().split( "," ); wr.writeRecord(contents); } wr.close(); } catch (IOException e) { e.printStackTrace(); } } |
就這樣一個基本的batch流程就跑起來了,它通過從數據里讀取一些數據,然后經過處理后,被存進服務器下的一個文件里面,之后像這種數據的讀取就不需要去數據庫里面查詢了,而是可以直接通過讀取CSV文件來處理這個業務。一般使用這個的都會配一個定時器,讓它們每隔一段時間跑一次,從而獲得較新的數據
下面是定時器的配置
定時器的配置非常簡單,我是使用注解方式來配置的
定時器任務類
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package com.yc.task.impl; import javax.transaction.Transactional; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.batch.item.ItemProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import com.yc.batch.ClassBatch; import com.yc.batch.MessageItemBatch; import com.yc.batch.TeacherInterviewBatch; import com.yc.batch.TearcherBatch; import com.yc.po.Work; import com.yc.task.WorkTask; import com.yc.vo.Workdetail; @Service public class WorkTaskImpl implements WorkTask{ @Autowired private TeacherInterviewBatch teacherInterviewBatch; //教師訪談記錄 public void setTeacherInterviewBatch(TeacherInterviewBatch teacherInterviewBatch) { this .teacherInterviewBatch = teacherInterviewBatch; } @Scheduled (cron= "0 30 22 * * ?" ) //每天晚上十點30執行一次 這個注解會讓框架會自動把這個方法看成任務啟動方法 @Override public void task() { try { teacherInterviewBatch.test(); //教師訪談 } catch (Exception e) { e.printStackTrace(); } } } |
定時器所真正要執行的方法
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
package com.yc.batch; import javax.annotation.Resource; import org.apache.commons.jexl2.Main; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class TeacherInterviewBatch { private Job job; private JobLauncher launcher; @Resource (name= "writerteacherInterview" ) public void setJob(Job job) { this .job = job; } @Autowired public void setLauncher(JobLauncher launcher) { this .launcher = launcher; } |
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://blog.csdn.net/pttaoge/article/details/76684656