一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - SpringBoot+SpringBatch+Quartz整合定時批量任務方式

SpringBoot+SpringBatch+Quartz整合定時批量任務方式

2021-12-22 13:19止步前行 Java教程

這篇文章主要介紹了SpringBoot+SpringBatch+Quartz整合定時批量任務方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

一、引言

最近一周,被借調到其他部門,趕一個緊急需求,需求內容如下:

PC網頁觸發一條設備升級記錄(下圖),后臺要定時批量設備更新。這里定時要用到Quartz,批量數據處理要用到SpringBatch,二者結合,可以完成該需求。

由于之前,沒有用過SpringBatch,于是上網查了下資料,發現可參考的不是很多,于是只能去慢慢的翻看官方文檔

遇到不少問題,就記錄一下吧。

SpringBoot+SpringBatch+Quartz整合定時批量任務方式

二、代碼具體實現

1、pom文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
  </dependency>
 </dependencies>

2、application.yaml文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
  datasource:
    username: thinklink
    password: thinklink
    url: jdbc:postgresql://172.16.205.54:5432/thinklink
    driver-class-name: org.postgresql.Driver
  batch:
    job:
      enabled: false
server:
  port: 8073
#upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/
# 每次批量處理的數據量,默認為5000
batch-size: 5000

3、Service實現類

觸發批處理任務的入口,執行一個job

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service("batchService")
public class BatchServiceImpl implements BatchService {
    // 框架自動注入
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job updateDeviceJob;
    /**
     * 根據 taskId 創建一個Job
     * @param taskId
     * @throws Exception
     */
    @Override
    public void createBatchJob(String taskId) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("taskId", taskId)
                .addString("uuid", UUID.randomUUID().toString().replace("-",""))
                .toJobParameters();
        // 傳入一個Job任務和任務需要的參數
        jobLauncher.run(updateDeviceJob, jobParameters);
    }
}

4、SpringBatch配置類

此部分最重要(☆☆☆☆☆)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
@Configuration
public class BatchConfiguration {
    private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);
    @Value("${batch-size:5000}")
    private int batchSize;
    // 框架自動注入
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    // 框架自動注入
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    // 數據過濾器,對從數據庫讀出來的數據,注意進行操作
    @Autowired
    public TaskItemProcessor taskItemProcessor;
    // 接收job參數
    public Map<String, JobParameter> parameters;
    public Object taskId;
    @Autowired
    private JdbcTemplate jdbcTemplate;
    // 讀取數據庫操作
    @Bean
    @StepScope
    public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {
        String querySql = " SELECT " +
                " e. ID AS taskId, " +
                " e.user_id AS userId, " +
                " e.timing_startup AS startTime, " +
                " u.device_id AS deviceId, " +
                " d.app_name AS appName, " +
                " d.compose_file AS composeFile, " +
                " e.failure_retry AS failureRetry, " +
                " e.tetry_times AS retryTimes, " +
                " e.device_managered AS deviceManagered " +
                " FROM " +
                " eiot_upgrade_task e " +
                " LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +
                " LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +
                " WHERE " +
                " ( " +
                " u.device_upgrade_status = 0 " +
                " OR u.device_upgrade_status = 2" +
                " )" +
                " AND e.tetry_times > u.retry_times " +
                " AND e. ID = ?";
        return new JdbcCursorItemReaderBuilder<DispatchRequest>()
                .name("itemReader")
                .sql(querySql)
                .dataSource(dataSource)
                .queryArguments(new Object[]{parameters.get("taskId").getValue()})
                .rowMapper(new DispatchRequest.DispatchRequestRowMapper())
                .build();
    }
    // 將結果寫回數據庫
    @Bean
    @StepScope
    public ItemWriter<ProcessResult> itemWriter() {
        return new ItemWriter<ProcessResult>() {
            private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {
                log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);
                Integer retryTimes = jdbcTemplate.queryForObject(
                        "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",
                        new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class
                );
                retryTimes += 1;
                int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +
                        "where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());
                if (updateCount <= 0) {
                    log.warn("no task updated");
                } else {
                    log.info("count of {} task updated", updateCount);
                }
                // 最后一次重試
                if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {
                    log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());
                    return 1;
                } else {
                    return 0;
                }
            }
            @Override
            @Transactional
            public void write(List<? extends ProcessResult> list) throws Exception {
                Map taskMap = jdbcTemplate.queryForMap(
                        "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",
                        list.get(0).getDispatchRequest().getTaskId() // 我們認定一個批量里面,taskId都是一樣的
                        );
                int deviceManagered = (int)taskMap.get("device_managered");
                Integer deviceCount = (Integer) taskMap.get("device_count");
                if (deviceCount == null) {
                    log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());
                }
                int taskStatus = (int)taskMap.get("task_status");
                for (ProcessResult result: list) {
                    deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());
                }
                if (deviceCount != null && deviceManagered == deviceCount) {
                    taskStatus = 2; //任務狀態 0:待升級,1:升級中,2:已完成
                }
                jdbcTemplate.update("update eiot_upgrade_task  set device_managered = ?, task_status = ? " +
                        "where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());
            }
        };
    }
    /**
     * 定義一個下發更新的 job
     * @return
     */
    @Bean
    public Job updateDeviceJob(Step updateDeviceStep) {
        return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
                .listener(new JobListener()) // 設置Job的監聽器
                .flow(updateDeviceStep)// 執行下發更新的Step
                .end()
                .build();
    }
    /**
     * 定義一個下發更新的 step
     * @return
     */
    @Bean
    public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,ItemWriter<ProcessResult> itemWriter) {
        return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
                .<DispatchRequest, ProcessResult> chunk(batchSize)
                .reader(itemReader) //根據taskId從數據庫讀取更新設備信息
                .processor(taskItemProcessor) // 每條更新信息,執行下發更新接口
                .writer(itemWriter)
                .build();
    }
    // job 監聽器
    public class JobListener implements JobExecutionListener {
        @Override
        public void beforeJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName() + " before... ");
            parameters = jobExecution.getJobParameters().getParameters();
            taskId = parameters.get("taskId").getValue();
            log.info("job param taskId : " + parameters.get("taskId"));
        }
        @Override
        public void afterJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName() + " after... ");
            // 當所有job執行完之后,查詢設備更新狀態,如果有失敗,則要定時重新執行job
            String sql = " SELECT " +
                    " count(*) " +
                    " FROM " +
                    " eiot_upgrade_device d " +
                    " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +
                    " WHERE " +
                    " u. ID = ? " +
                    " AND d.retry_times < u.tetry_times " +
                    " AND ( " +
                    " d.device_upgrade_status = 0 " +
                    " OR d.device_upgrade_status = 2 " +
                    " ) ";
            // 獲取更新失敗的設備個數
            Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
            log.info("update device failure count : " + count);
            // 下面是使用Quartz觸發定時任務
            // 獲取任務時間,單位秒
//            String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
            // 此處方便測試,應該從數據庫中取taskId對應的重試間隔,單位秒
            Integer millSecond = 10;
            if(count != null && count > 0){
                String jobName = "UpgradeTask_" + taskId;
                String reTaskId = taskId.toString();
                Map<String,Object> params = new HashMap<>();
                params.put("jobName",jobName);
                params.put("taskId",reTaskId);
                if (QuartzManager.checkNameNotExist(jobName))
                {
                    QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);
                }
            }
        }
    }
}

5、Processor,處理每條數據

可以在此對數據進行過濾操作

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Component("taskItemProcessor")
public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {
    public static final int STATUS_DISPATCH_FAILED = 2;
    public static final int STATUS_DISPATCH_SUCC = 1;
    private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);
    @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
    private String dispatchUrl;
    @Autowired
    JdbcTemplate jdbcTemplate;
    /**
     * 在這里,執行 下發更新指令 的操作
     * @param dispatchRequest
     * @return
     * @throws Exception
     */
    @Override
    public ProcessResult process(final DispatchRequest dispatchRequest) {
        // 調用接口,下發指令
        String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();
        log.info("request url:" + url);
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
        MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
        JSONObject jsonOuter = new JSONObject();
        JSONObject jsonInner = new JSONObject();
        try {
            jsonInner.put("jobId",dispatchRequest.getTaskId());
            jsonInner.put("name",dispatchRequest.getName());
            jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
            jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));
            jsonInner.put("timestamp",dispatchRequest.getTimestamp());
            jsonOuter.put("method","updateApp");
            jsonOuter.put("params",jsonInner);
        } catch (JSONException e) {
            log.info("JSON convert Exception :" + e);
        }catch (IOException e) {
            log.info("Base64Util bytesToBase64Str :" + e);
        }
        log.info("request body json :" + jsonOuter);
        HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(),headers);
        int status;
        try {
            ResponseEntity<String> response = restTemplate.postForEntity(url,requestEntity,String.class);
            log.info("response :" + response);
            if (response.getStatusCode() == HttpStatus.OK) {
                status = STATUS_DISPATCH_SUCC;
            } else {
                status = STATUS_DISPATCH_FAILED;
            }
        }catch (Exception e){
            status = STATUS_DISPATCH_FAILED;
        }
        return new ProcessResult(dispatchRequest, status);
    }
}

6、封裝數據庫返回數據的實體Bean

注意靜態內部類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class DispatchRequest {
    private String taskId;
    private String deviceId;
    private String userId;
    private String name;
    private byte[] composeFile;
    private String policy;
    private String timestamp;
    private String md5;
    private int failureRetry;
    private int retryTimes;
    private int deviceManagered;
   // 省略構造函數,setter/getter/tostring方法
   //......
   
    public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {
        @Override
        public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {
            DispatchRequest dispatchRequest = new DispatchRequest();
            dispatchRequest.setTaskId(resultSet.getString("taskId"));
            dispatchRequest.setUserId(resultSet.getString("userId"));
            dispatchRequest.setPolicy(resultSet.getString("startTime"));
            dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
            dispatchRequest.setName(resultSet.getString("appName"));
            dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
            dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));
            dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
            dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
            dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
            return dispatchRequest;
        }
    }
}

7、啟動類上要加上注解

?
1
2
3
4
5
6
7
@SpringBootApplication
@EnableBatchProcessing
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

三、小結一下

其實SpringBatch并沒有想象中那么好用,當從數據庫中每次取5000條數據后,進入processor中是逐條處理的,這個時候不能不行操作,等5000條數據處理完之后,再一次性執行ItemWriter方法。

在使用的過程中,最坑的地方是ItemReader和ItemWriter這兩個地方,如何執行自定義的Sql,參考文中代碼就行。至于Quartz定時功能,很簡單,只要定時創建SpringBatch里面的Job,讓這個job啟動就好了,此處就不在給出了,貼的代碼太多了。由于公司一些原因,代碼不能放到GitHub上。

spring-batch與quartz集成過程中遇到的問題

問題

啟動時報Exception

Driver's Blob representation is of an unsupported type: weblogic.jdbc.wrapper.Blob_oracle_sql_BLOB

原因

quartz的driverDelegateClass配置的是OracleDelegate,應用運行在weblogic上

解決

driverDelegateClass對應配置改為

?
1
org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持服務器之家。

原文鏈接:https://blog.csdn.net/zxd1435513775/article/details/99677223

延伸 · 閱讀

精彩推薦
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程Java BufferWriter寫文件寫不進去或缺失數據的解決

    Java BufferWriter寫文件寫不進去或缺失數據的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經有好久沒有升過級了。升級完畢重啟之后,突然發現好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發現了對于集合操作轉換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關于Java8中S...

    阿杜7482021-02-04
  • Java教程xml與Java對象的轉換詳解

    xml與Java對象的轉換詳解

    這篇文章主要介紹了xml與Java對象的轉換詳解的相關資料,需要的朋友可以參考下...

    Java教程網2942020-09-17
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程Java實現搶紅包功能

    Java實現搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關于小米推送Java代碼,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩中求8032021-07-12
主站蜘蛛池模板: 精品日韩二区三区精品视频 | 精品无人区麻豆乱码1区2 | 无限时间看片在线观看 | 福利视频久久 | 国产成人免费片在线观看 | 欧美 亚洲 综合 卡通 另类 区 | 国产精品国产香蕉在线观看网 | 国产123区在线视频观看 | 午夜视频一区二区 | 久久久久久免费高清电影 | 精品国产91久久久久久久 | 天天综合色天天综合色sb | 日韩精品免费一级视频 | 性夜影院爽黄A爽免费动漫 性色欲情网站IWWW九文堂 | 国产精品久久久久久久久99热 | 成在线人免费视频一区二区三区 | 久久精品WWW人人爽人人 | 男女肉文高h | 男人疯狂擦进女人下面 | 放荡的女老板bd中文字幕 | 欧美影院天天5g天天爽 | 亚洲精品在线看 | 先锋资源av | oneday日本在线观看完整版 | 国产91精品在线播放 | 爆操| 成年视频在线播放 | 欧美国产影院 | 人阁色第四影院在线观看 | 午夜伦午夜伦锂电影 | 国产乱插 | 精品国产一级在线观看 | 日本护士撒尿xxxxhd | 国产99精品成人免费视频 | 亚洲精品一线二线三线 | 国产v日韩v欧美v精品专区 | 欧美ⅹxxxhd3d | 精品久久久久久久久久久久久久久 | 精品久久久久亚洲 | 四虎影视在线永久免费观看 | 精品手机在线1卡二卡3卡四卡 |