什么是spring batch
spring batch 是一個輕量級的、完善的批處理框架,旨在幫助企業建立健壯、高效的批處理應用。spring batch是spring的一個子項目,使用java語言并基于spring框架為基礎開發,使的已經使用 spring 框架的開發者或者企業更容易訪問和利用企業服務。
spring batch 提供了大量可重用的組件,包括了日志、追蹤、事務、任務作業統計、任務重啟、跳過、重復、資源管理。對于大數據量和高性能的批處理任務,spring batch 同樣提供了高級功能和特性來支持,比如分區功能、遠程功能。總之,通過 spring batch 能夠支持簡單的、復雜的和大數據量的批處理作業。
spring batch 使用
我們首先配置spring batch 在spring boot 中的使用,數據庫用的是mysql,pom文件如下,因為spring boot 中的spring batch 包含 hsqsldb 所以我們將其去除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-batch</artifactid> <exclusions> <!-- 注意這里--> <exclusion> <groupid>org.hsqldb</groupid> <artifactid>hsqldb</artifactid> </exclusion> </exclusions> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-jdbc</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.hibernate</groupid> <artifactid>hibernate-validator</artifactid> </dependency> <dependency> <groupid>mysql</groupid> <artifactid>mysql-connector-java</artifactid> <version> 5.1 . 21 </version> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> |
配置好我們需要的實體類。頁面就不展示了。
如果有數據校驗添加的話那么我們需要配置自定義的檢驗器。若果沒有課略過該步驟
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public class csvbeanvalidator<t> implements validator<t>,initializingbean { private javax.validation.validator validator; @override public void validate(t value) throws validationexception { set<constraintviolation<t >> constraintviolations=validator.validate(value); if (constraintviolations.size()> 0 ){ stringbuilder message= new stringbuilder(); for (constraintviolation<t> constraintviolation:constraintviolations){ message.append(constraintviolation.getmessage() + "\n" ); } throw new validationexception(message.tostring()); } } //在這里我們使用的是jsr-303校驗數據,在此進行初始化 @override public void afterpropertiesset() throws exception { validatorfactory validatorfactory= validation.builddefaultvalidatorfactory(); validator=validatorfactory.usingcontext().getvalidator(); } } public class csvitemprocessor extends validatingitemprocessor<person> { @override public person process(person item) throws validationexception { super .process(item); // 在這里啟動 然后才會調用我們自定義的校驗器,否則不能通過 。 if (item.getnation().equals( "漢族" )){ item.setname( "01" ); } else { item.setnation( "02" ); } return item; } } |
進行job任務監聽 自定義類實現jobexecutionlistener 即可
1
2
3
4
5
6
7
8
9
10
11
12
13
|
long starttime; long endtime; @override public void beforejob(jobexecution jobexecution) { starttime = system.currenttimemillis(); system.out.println( "任務處理開始" ); } @override public void afterjob(jobexecution jobexecution) { endtime = system.currenttimemillis(); system.out.println( "耗時多長時間:" + (endtime - starttime) + "ms" ); system.out.println( "任務處理結束" ); } |
進行spring batch 的注入 方法有xml文件注入bean ,在這里選擇java注入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
@configuration @enablebatchprocessing //開啟批處理 public class csvbatchconfig { /**1 首先我們通過 flatfileitemreader 讀取我們需要的文件 通過setresource來實現 * 2 設置map 在這里通過先設置解析器 setlinetokenizer 來解析我們csv文件中的數 據 * 3 setfieldsetmapper 將我們需要的數據轉化為我們的實體對象 存儲 * 4 如果想 跳過前面的幾行 需要使用setlinestoskip就可以實現 */ @bean public itemreader<person> reader() throws exception { flatfileitemreader<person> reader = new flatfileitemreader<person>(); //1 reader.setresource( new classpathresource( "people.csv" )); //2 reader.setlinemapper( new defaultlinemapper<person>() {{ //3 setlinetokenizer( new delimitedlinetokenizer() {{ setnames( new string[] { "name" , "age" , "nation" , "address" }); }}); setfieldsetmapper( new beanwrapperfieldsetmapper<person>() {{ settargettype(person. class ); }}); }}); reader.setlinestoskip( 3 ); return reader; } @bean public itemprocessor<person, person> processor() { csvitemprocessor processor = new csvitemprocessor(); //1 processor.setvalidator(csvbeanvalidator()); //2 return processor; } /** *寫入數據到數據庫中 * 1執行的sql 語句 2 設置數據源 */ @bean public itemwriter<person> writer(datasource datasource) { //1 jdbcbatchitemwriter<person> writer = new jdbcbatchitemwriter<person>(); //2 writer.setitemsqlparametersourceprovider( new beanpropertyitemsqlparametersourceprovider<person>()); string sql = "insert into person " + "(id,name,age,nation,address) " + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)" ; writer.setsql(sql); //3 writer.setdatasource(datasource); return writer; } // 作業的倉庫 就是設置數據源 @bean public jobrepository jobrepository(datasource datasource, platformtransactionmanager transactionmanager) throws exception { jobrepositoryfactorybean jobrepositoryfactorybean = new jobrepositoryfactorybean(); jobrepositoryfactorybean.setdatasource(datasource); jobrepositoryfactorybean.settransactionmanager(transactionmanager); jobrepositoryfactorybean.setdatabasetype( "mysql" ); return jobrepositoryfactorybean.getobject(); } //調度器 使用它來執行 我們的批處理 @bean public simplejoblauncher joblauncher(datasource datasource, platformtransactionmanager transactionmanager) throws exception { simplejoblauncher joblauncher = new simplejoblauncher(); joblauncher.setjobrepository(jobrepository(datasource, transactionmanager)); return joblauncher; } //將監聽器加入到job中 @bean public job importjob(jobbuilderfactory jobs, step s1) { return jobs.get( "importjob" ) .incrementer( new runidincrementer()) .flow(s1) //1 .end() .listener(csvjoblistener()) //2 .build(); } //步驟綁定 reader 與writer 一次性處理65000條記錄 @bean public step step1(stepbuilderfactory stepbuilderfactory, itemreader<person> reader, itemwriter<person> writer, itemprocessor<person,person> processor) { return stepbuilderfactory .get( "step1" ) .<person, person>chunk( 65000 ) //1 .reader(reader) //2 .processor(processor) //3 .writer(writer) //4 .build(); } @bean public csvjoblistener csvjoblistener() { return new csvjoblistener(); } @bean public validator<person> csvbeanvalidator() { return new csvbeanvalidator<person>(); } } |
在配置文件中 啟動自動執行批處理
spring.batch.job.names = job1,job2 #啟動時要執行的job,默認執行全部job
spring.batch.job.enabled=true #是否自動執行定義的job,默認是
spring.batch.initializer.enabled=true #是否初始化spring batch的數據庫,默認為是
spring.batch.schema=
spring.batch.table-prefix= #設置springbatch的數據庫表的前綴
項目匯總
從 項目中我們可以看到 總的步驟就是 首先讀取我們需要實現的文件進行解析,然后轉換成需要的實體類并且綁定到reader中,二 實現我們需要的writer 并且幫到到數據庫上,三實現job監聽器將其綁定到步驟中 。最后開啟批處理 自動執行入庫即可 。這個簡單步驟主要是配置中用到的 理解流程 自己也可以方便實現 批處理的流程。
總結
以上所述是小編給大家介紹的springboot和springbatch 使用,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!
原文鏈接:https://yq.aliyun.com/articles/619189