I. Introduction
Last week I was seconded to another department to rush out an urgent requirement:
A PC web page triggers a device-upgrade record (figure below), and the back end then has to update the devices in scheduled batches. The scheduling is done with Quartz and the batch data processing with Spring Batch; combining the two covers the requirement.
I had never used Spring Batch before, and a web search turned up little to go on, so I slowly worked through the official documentation instead.
I ran into quite a few problems along the way, so I am writing them down here.
II. Implementation
1. The pom file
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
```
2. The application.yaml file
```yaml
spring:
  datasource:
    username: thinklink
    password: thinklink
    url: jdbc:postgresql://172.16.205.54:5432/thinklink
    driver-class-name: org.postgresql.Driver
  batch:
    job:
      enabled: false   # do not launch batch jobs automatically on startup

server:
  port: 8073

#upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/

# number of records handled per chunk, defaults to 5000
batch-size: 5000
```
3. The Service implementation class
This is the entry point that triggers the batch processing: it launches a single job.
```java
@Service("batchService")
public class BatchServiceImpl implements BatchService {

    // injected by the framework
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job updateDeviceJob;

    /**
     * Create and launch a job for the given taskId.
     * @param taskId
     * @throws Exception
     */
    @Override
    public void createBatchJob(String taskId) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("taskId", taskId)
                .addString("uuid", UUID.randomUUID().toString().replace("-", ""))
                .toJobParameters();
        // launch the job, passing in the parameters it needs
        jobLauncher.run(updateDeviceJob, jobParameters);
    }
}
```
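One detail worth calling out: Spring Batch identifies a JobInstance by its job parameters, and a completed instance with the same parameters will not run again, which is why the service above adds a random uuid parameter on every launch. Below is a minimal plain-Java sketch of that identity idea (no Spring classes involved; `JobIdentityDemo`, `instanceKey`, and `paramsFor` are made-up names for illustration only):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

public class JobIdentityDemo {

    // Spring Batch identifies a JobInstance by its parameter set; this helper
    // mimics that by building a canonical key from the parameters.
    public static String instanceKey(Map<String, String> params) {
        return new TreeMap<>(params).toString(); // sorted, so key order does not matter
    }

    public static Map<String, String> paramsFor(String taskId) {
        Map<String, String> p = new TreeMap<>();
        p.put("taskId", taskId);
        // the random uuid makes every launch a fresh parameter set
        p.put("uuid", UUID.randomUUID().toString().replace("-", ""));
        return p;
    }

    public static void main(String[] args) {
        String run1 = instanceKey(paramsFor("task-1"));
        String run2 = instanceKey(paramsFor("task-1"));
        // same taskId, but the keys differ, so both launches would be accepted
        System.out.println(!run1.equals(run2)); // prints true (uuids differ)
    }
}
```

With the uuid in place, launching the same taskId twice produces two distinct parameter sets, so the second launch is not rejected as an already-completed instance.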
4. The Spring Batch configuration class
This is the most important part (☆☆☆☆☆).
```java
@Configuration
public class BatchConfiguration {

    private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);

    @Value("${batch-size:5000}")
    private int batchSize;

    // injected by the framework
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    // injected by the framework
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // item processor that operates on each record read from the database
    @Autowired
    public TaskItemProcessor taskItemProcessor;

    // receives the job parameters
    public Map<String, JobParameter> parameters;
    public Object taskId;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // reads the devices to upgrade from the database
    @Bean
    @StepScope
    public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {
        String querySql = " SELECT "
                + " e.ID AS taskId, "
                + " e.user_id AS userId, "
                + " e.timing_startup AS startTime, "
                + " u.device_id AS deviceId, "
                + " d.app_name AS appName, "
                + " d.compose_file AS composeFile, "
                + " e.failure_retry AS failureRetry, "
                + " e.tetry_times AS retryTimes, "
                + " e.device_managered AS deviceManagered "
                + " FROM eiot_upgrade_task e "
                + " LEFT JOIN eiot_upgrade_device u ON e.ID = u.upgrade_task_id "
                + " LEFT JOIN eiot_app_detail d ON e.app_id = d.ID "
                + " WHERE (u.device_upgrade_status = 0 OR u.device_upgrade_status = 2) "
                + " AND e.tetry_times > u.retry_times "
                + " AND e.ID = ?";
        return new JdbcCursorItemReaderBuilder<DispatchRequest>()
                .name("itemReader")
                .sql(querySql)
                .dataSource(dataSource)
                .queryArguments(new Object[]{parameters.get("taskId").getValue()})
                .rowMapper(new DispatchRequest.DispatchRequestRowMapper())
                .build();
    }

    // writes the results back to the database
    @Bean
    @StepScope
    public ItemWriter<ProcessResult> itemWriter() {
        return new ItemWriter<ProcessResult>() {

            private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {
                log.info("update taskId: {}, deviceId: {} to status {}",
                        dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);
                Integer retryTimes = jdbcTemplate.queryForObject(
                        "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",
                        new Object[]{dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()},
                        Integer.class);
                retryTimes += 1;
                int updateCount = jdbcTemplate.update(
                        "update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? "
                                + "where device_id = ? and upgrade_task_id = ?",
                        status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());
                if (updateCount <= 0) {
                    log.warn("no task updated");
                } else {
                    log.info("count of {} task updated", updateCount);
                }
                // the last retry has just failed, so count this device as handled
                if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {
                    log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());
                    return 1;
                } else {
                    return 0;
                }
            }

            @Override
            @Transactional
            public void write(List<? extends ProcessResult> list) throws Exception {
                Map taskMap = jdbcTemplate.queryForMap(
                        "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",
                        // we assume every item in one chunk carries the same taskId
                        list.get(0).getDispatchRequest().getTaskId());
                int deviceManagered = (int) taskMap.get("device_managered");
                Integer deviceCount = (Integer) taskMap.get("device_count");
                if (deviceCount == null) {
                    log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());
                }
                int taskStatus = (int) taskMap.get("task_status");
                for (ProcessResult result : list) {
                    deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());
                }
                if (deviceCount != null && deviceManagered == deviceCount) {
                    taskStatus = 2; // task status: 0 pending, 1 upgrading, 2 finished
                }
                jdbcTemplate.update(
                        "update eiot_upgrade_task set device_managered = ?, task_status = ? where id = ?",
                        deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());
            }
        };
    }

    /**
     * Defines the job that dispatches the upgrades.
     * @return
     */
    @Bean
    public Job updateDeviceJob(Step updateDeviceStep) {
        return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
                .listener(new JobListener())  // register the job listener
                .flow(updateDeviceStep)       // run the upgrade step
                .end()
                .build();
    }

    /**
     * Defines the step that dispatches the upgrades.
     * @return
     */
    @Bean
    public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,
                                 ItemWriter<ProcessResult> itemWriter) {
        return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
                .<DispatchRequest, ProcessResult>chunk(batchSize)
                .reader(itemReader)           // read the devices of this taskId from the database
                .processor(taskItemProcessor) // call the dispatch interface for each record
                .writer(itemWriter)
                .build();
    }

    // job listener
    public class JobListener implements JobExecutionListener {

        @Override
        public void beforeJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName() + " before... ");
            parameters = jobExecution.getJobParameters().getParameters();
            taskId = parameters.get("taskId").getValue();
            log.info("job param taskId : " + parameters.get("taskId"));
        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName() + " after... ");
            // after the job finishes, count the devices whose upgrade still failed;
            // if there are any, schedule the job to run again via Quartz
            String sql = " SELECT count(*) FROM eiot_upgrade_device d "
                    + " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u.ID "
                    + " WHERE u.ID = ? "
                    + " AND d.retry_times < u.tetry_times "
                    + " AND (d.device_upgrade_status = 0 OR d.device_upgrade_status = 2) ";
            // number of devices whose upgrade failed
            Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
            log.info("update device failure count : " + count);
            // hard-coded here for easy testing; the retry interval (in seconds)
            // should really be read from the database for this taskId
            Integer millSecond = 10;
            if (count != null && count > 0) {
                String jobName = "UpgradeTask_" + taskId;
                String reTaskId = taskId.toString();
                Map<String, Object> params = new HashMap<>();
                params.put("jobName", jobName);
                params.put("taskId", reTaskId);
                if (QuartzManager.checkNameNotExist(jobName)) {
                    QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class, params, millSecond);
                }
            }
        }
    }
}
```
5. The Processor: handling each record
Records can be filtered or transformed here.
```java
@Component("taskItemProcessor")
public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {

    public static final int STATUS_DISPATCH_FAILED = 2;
    public static final int STATUS_DISPATCH_SUCC = 1;

    private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);

    @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
    private String dispatchUrl;

    @Autowired
    JdbcTemplate jdbcTemplate;

    /**
     * Dispatches the upgrade command for a single record.
     * @param dispatchRequest
     * @return
     */
    @Override
    public ProcessResult process(final DispatchRequest dispatchRequest) {
        // call the remote interface to dispatch the command
        String url = dispatchUrl + dispatchRequest.getDeviceId() + "/" + dispatchRequest.getUserId();
        log.info("request url:" + url);
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
        JSONObject jsonOuter = new JSONObject();
        JSONObject jsonInner = new JSONObject();
        try {
            jsonInner.put("jobId", dispatchRequest.getTaskId());
            jsonInner.put("name", dispatchRequest.getName());
            jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
            jsonInner.put("policy", new JSONObject().put("startTime", dispatchRequest.getPolicy()));
            jsonInner.put("timestamp", dispatchRequest.getTimestamp());
            jsonOuter.put("method", "updateApp");
            jsonOuter.put("params", jsonInner);
        } catch (JSONException e) {
            log.info("JSON convert Exception :" + e);
        } catch (IOException e) {
            log.info("Base64Util bytesToBase64Str :" + e);
        }
        log.info("request body json :" + jsonOuter);
        HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(), headers);
        int status;
        try {
            ResponseEntity<String> response = restTemplate.postForEntity(url, requestEntity, String.class);
            log.info("response :" + response);
            if (response.getStatusCode() == HttpStatus.OK) {
                status = STATUS_DISPATCH_SUCC;
            } else {
                status = STATUS_DISPATCH_FAILED;
            }
        } catch (Exception e) {
            status = STATUS_DISPATCH_FAILED;
        }
        return new ProcessResult(dispatchRequest, status);
    }
}
```
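The success/failure decision in the processor boils down to a small pure function; here is a sketch under the assumption, matching the code above, that only HTTP 200 counts as a successful dispatch (`DispatchStatusDemo` and `statusFor` are made-up names, not part of the project):

```java
public class DispatchStatusDemo {

    // same status codes as TaskItemProcessor
    public static final int STATUS_DISPATCH_SUCC = 1;
    public static final int STATUS_DISPATCH_FAILED = 2;

    // maps an HTTP status code to the upgrade status written back to the
    // database; anything other than 200 OK counts as a failed dispatch
    public static int statusFor(int httpStatusCode) {
        return httpStatusCode == 200 ? STATUS_DISPATCH_SUCC : STATUS_DISPATCH_FAILED;
    }

    public static void main(String[] args) {
        System.out.println(statusFor(200)); // 1
        System.out.println(statusFor(500)); // 2
    }
}
```

Note that the processor also maps any thrown exception (timeouts, connection refused) to the failed status, so a device is retried later rather than lost.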
6. The entity bean that wraps the query results
Note the static inner RowMapper class.
```java
public class DispatchRequest {

    private String taskId;
    private String deviceId;
    private String userId;
    private String name;
    private byte[] composeFile;
    private String policy;
    private String timestamp;
    private String md5;
    private int failureRetry;
    private int retryTimes;
    private int deviceManagered;

    // constructors, setters/getters and toString omitted
    // ......

    public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {
        @Override
        public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {
            DispatchRequest dispatchRequest = new DispatchRequest();
            dispatchRequest.setTaskId(resultSet.getString("taskId"));
            dispatchRequest.setUserId(resultSet.getString("userId"));
            dispatchRequest.setPolicy(resultSet.getString("startTime"));
            dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
            dispatchRequest.setName(resultSet.getString("appName"));
            dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
            dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));
            dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
            dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
            dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
            return dispatchRequest;
        }
    }
}
```
7. Annotations required on the application class
```java
@SpringBootApplication
@EnableBatchProcessing
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
```
III. Summary
Spring Batch is not quite as convenient as I had imagined. After each chunk of 5000 records is read from the database, the processor handles them strictly one at a time, so no bulk operation is possible at that stage; only once all 5000 records have been processed is the ItemWriter method invoked a single time for the whole chunk.
The trickiest parts in practice were the ItemReader and the ItemWriter, in particular how to run custom SQL in them; the code above shows one way to do it. The Quartz side is simple: just create the Spring Batch job on a schedule and launch it, so I won't include that code here, as this post already contains plenty. For company reasons the code cannot be published on GitHub.
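The read-one / process-one / write-chunk behaviour described above can be simulated in a few lines of plain Java (a sketch only; `ChunkFlowDemo` is a made-up class and not part of Spring Batch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkFlowDemo {

    // runs items through a processor one at a time, then hands each full
    // chunk to the writer in a single call, mirroring a chunk-oriented step;
    // returns the number of writer invocations
    public static <I, O> int run(List<I> items, int chunkSize,
                                 Function<I, O> processor, Consumer<List<O>> writer) {
        int writes = 0;
        List<O> buffer = new ArrayList<>();
        for (I item : items) {
            buffer.add(processor.apply(item)); // the processor sees ONE item per call
            if (buffer.size() == chunkSize) {
                writer.accept(buffer);         // the writer sees the WHOLE chunk
                buffer = new ArrayList<>();
                writes++;
            }
        }
        if (!buffer.isEmpty()) {
            writer.accept(buffer);             // final, possibly short, chunk
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 12; i++) items.add(i);
        int writes = run(items, 5, x -> x * 2,
                chunk -> System.out.println("write " + chunk.size() + " items"));
        System.out.println(writes); // 12 items in chunks of 5 -> 3 writer calls
    }
}
```

This is why per-item work (the HTTP dispatch) belongs in the processor, while the aggregated database update belongs in the writer.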
Problems encountered while integrating spring-batch with quartz
Problem
An exception is thrown at startup:
Driver's Blob representation is of an unsupported type: weblogic.jdbc.wrapper.Blob_oracle_sql_BLOB
Cause
Quartz's driverDelegateClass was configured as the OracleDelegate, while the application was running on WebLogic.
Solution
Change the driverDelegateClass configuration to:
```
org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate
```
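In a typical JDBC-job-store setup this setting lives in quartz.properties; the fragment below is a sketch of where it goes. The jobStore class line is illustrative context only, the driverDelegateClass line is the actual fix described above:

```properties
# illustrative quartz.properties fragment
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate
```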
The above is my personal experience; I hope it can serve as a useful reference.
Original article: https://blog.csdn.net/zxd1435513775/article/details/99677223