創建批處理服務
本指南將引導您完成創建基本批處理驅動解決方案的過程。
你將建造什么
您將構建一個從 CSV 電子表格導入數據、使用自定義代碼對其進行轉換并將最終結果存儲在數據庫中的服務。
你需要什么
- 約15分鐘
- 最喜歡的文本編輯器或 IDE
- JDK 1.8或更高版本
- Gradle 4+或Maven 3.2+
- 您還可以將代碼直接導入 IDE:
- 彈簧工具套件 (STS)
- IntelliJ IDEA
如何完成本指南
像大多數 Spring入門指南一樣,您可以從頭開始并完成每個步驟,也可以繞過您已經熟悉的基本設置步驟。無論哪種方式,您最終都會得到工作代碼。
要從頭開始,請繼續從 Spring Initializr 開始。
要跳過基礎知識,請執行以下操作:
- 下載并解壓本指南的源代碼庫,或使用Git克隆它:git clone https://github.com/spring-guides/gs-batch-processing.git
- 光盤進入gs-batch-processing/initial
- 繼續創建商務艙。
完成后,您可以對照中的代碼檢查結果
gs-batch-processing/complete。
業務數據
通常,您的客戶或業務分析師會提供電子表格。對于這個簡單的示例,您可以在以下位置找到一些虛構的數據src/main/resources/sample-data.csv:
Jill,Doe Joe,Doe Justin,Doe Jane,Doe John,Doe
此電子表格的每一行都包含名字和姓氏,以逗號分隔。這是一種相當常見的模式,Spring 無需定制即可處理。
接下來,您需要編寫一個 SQL 腳本來創建一個表來存儲數據。您可以在以下位置找到這樣的腳本src/main/resources/schema-all.sql:
DROP TABLE people IF EXISTS; CREATE TABLE people ( person_id BIGINT IDENTITY NOT NULL PRIMARY KEY, first_name VARCHAR(20), last_name VARCHAR(20) );
Spring Bootschema-@@platform@@.sql在啟動期間自動運行。-all是所有平臺的默認設置。
從 Spring Initializr 開始
您可以使用這個預先初始化的項目并單擊 Generate 下載 ZIP 文件。此項目配置為適合本教程中的示例。
手動初始化項目:
- 導航到https://start.spring.io。該服務提取應用程序所需的所有依賴項,并為您完成大部分設置。
- 選擇 Gradle 或 Maven 以及您要使用的語言。本指南假定您選擇了 Java。
- 單擊Dependencies并選擇Spring Batch和HyperSQL Database。
- 單擊生成。
- 下載生成的 ZIP 文件,該文件是根據您的選擇配置的 Web 應用程序的存檔。
如果您的 IDE 具有 Spring Initializr 集成,您可以從您的 IDE 完成此過程。
你也可以從 Github 上 fork 項目并在你的 IDE 或其他編輯器中打開它。
創建商務艙
現在您可以看到數據輸入和輸出的格式,您可以編寫代碼來表示一行數據,如以下示例(來自src/main/java/com/example/batchprocessing/Person.java)所示:
package com.example.batchprocessing; public class Person { private String lastName; private String firstName; public Person() { } public Person(String firstName, String lastName) { this.firstName = firstName; this.lastName = lastName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getFirstName() { return firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } @Override public String toString() { return "firstName: " + firstName + ", lastName: " + lastName; } }
創建中間處理器
批處理中的一個常見范例是攝取數據,對其進行轉換,然后將其通過管道輸出到其他地方。在這里,您需要編寫一個簡單的轉換器,將名稱轉換為大寫。以下清單(來自src/main/java/com/example/batchprocessing/PersonItemProcessor.java)顯示了如何執行此操作:
package com.example.batchprocessing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.ItemProcessor; public class PersonItemProcessor implements ItemProcessor { private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class); @Override public Person process(final Person person) throws Exception { final String firstName = person.getFirstName().toUpperCase(); final String lastName = person.getLastName().toUpperCase(); final Person transformedPerson = new Person(firstName, lastName); log.info("Converting (" + person + ") into (" + transformedPerson + ")"); return transformedPerson; } }
PersonItemProcessor實現 Spring Batch 的ItemProcessor接口。這使得將代碼連接到您將在本指南后面定義的批處理作業變得很容易。根據界面,您會收到一個傳入的Person對象,然后將其轉換為大寫的Person.
輸入和輸出類型不必相同。事實上,在讀取一個數據源之后,有時應用程序的數據流需要不同的數據類型。
將批處理作業放在一起
現在您需要將實際的批處理作業放在一起。Spring Batch 提供了許多實用程序類來減少編寫自定義代碼的需要。相反,您可以專注于業務邏輯。
要配置您的作業,您必須首先創建一個 Spring@Configuration類,如下例所示src/main/java/com/exampe/batchprocessing/BatchConfiguration.java:
@Configuration @EnableBatchProcessing public class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; ... }
對于初學者,@EnableBatchProcessing注釋添加了許多支持作業并為您節省大量工作的關鍵 bean。此示例使用基于內存的數據庫(由 提供@EnableBatchProcessing),這意味著完成后,數據就消失了。它還自動連接下面需要的幾個工廠?,F在將以下 bean 添加到您的BatchConfiguration類中以定義讀取器、處理器和寫入器:
@Bean public FlatFileItemReader reader() { return new FlatFileItemReaderBuilder() .name("personItemReader") .resource(new ClassPathResource("sample-data.csv")) .delimited() .names(new String[]{"firstName", "lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(Person.class); }}) .build(); } @Bean public PersonItemProcessor processor() { return new PersonItemProcessor(); } @Bean public JdbcBatchItemWriter writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)") .dataSource(dataSource) .build(); }
第一段代碼定義了輸入、處理器和輸出。
- reader()創建一個ItemReader. 它查找一個名為的文件sample-data.csv,并使用足夠的信息解析每個行項目,以將其轉換為Person.
- processor()創建一個PersonItemProcessor您之前定義的實例,用于將數據轉換為大寫。
- writer(DataSource)創建一個ItemWriter. 這個針對 JDBC 目標,并自動獲取由@EnableBatchProcessing. 它包括插入單個 所需的 SQL 語句Person,由 Java bean 屬性驅動。
最后一個塊(來自src/main/java/com/example/batchprocessing/BatchConfiguration.java)顯示了實際的作業配置:
@Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter writer) { return stepBuilderFactory.get("step1") .reader(reader()) .processor(processor()) .writer(writer) .build(); }
第一種方法定義了作業,第二種方法定義了一個步驟。作業是由步驟構建的,其中每個步驟都可能涉及讀取器、處理器和寫入器。
在此作業定義中,您需要一個增量器,因為作業使用數據庫來維護執行狀態。然后列出每個步驟(盡管此作業只有一個步驟)。作業結束,Java API 生成一個完美配置的作業。
在步驟定義中,您定義一次寫入多少數據。在這種情況下,它一次最多寫入十個記錄。接下來,您使用之前注入的 bean 配置讀取器、處理器和寫入器。
chunk()是前綴,因為它是一個通用方法。這表示每個處理“塊”的輸入和輸出類型,并與ItemReader和對齊ItemWriter。
批處理配置的最后一點是在作業完成時獲得通知的一種方式。以下示例(來自src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java)顯示了這樣一個類:
package com.example.batchprocessing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; @Component public class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution) { if(jobExecution.getStatus() == BatchStatus.COMPLETED) { log.info("!!! JOB FINISHED! Time to verify the results"); jdbcTemplate.query("SELECT first_name, last_name FROM people", (rs, row) -> new Person( rs.getString(1), rs.getString(2)) ).forEach(person -> log.info("Found <" + person + "> in the database.")); } } }
JobCompletionNotificationListener監聽作業的時間,BatchStatus.COMPLETED然后用于JdbcTemplate檢查結果。
使應用程序可執行
盡管批處理可以嵌入到 Web 應用程序和 WAR 文件中,但下面演示的更簡單的方法可以創建一個獨立的應用程序。您將所有內容打包在一個可執行的 JAR 文件中,由一個很好的舊 Javamain()方法驅動。
Spring Initializr 為您創建了一個應用程序類。對于這個簡單的示例,它無需進一步修改即可工作。以下清單(來自
src/main/java/com/example/batchprocessing/BatchProcessingApplication.java)顯示了應用程序類:
JobCompletionNotificationListener監聽作業的時間,BatchStatus.COMPLETED然后用于JdbcTemplate檢查結果。
使應用程序可執行
盡管批處理可以嵌入到 Web 應用程序和 WAR 文件中,但下面演示的更簡單的方法可以創建一個獨立的應用程序。您將所有內容打包在一個可執行的 JAR 文件中,由一個很好的舊 Javamain()方法驅動。
Spring Initializr 為您創建了一個應用程序類。對于這個簡單的示例,它無需進一步修改即可工作。以下清單(來自
src/main/java/com/example/batchprocessing/BatchProcessingApplication.java)顯示了應用程序類:
package com.example.batchprocessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class BatchProcessingApplication { public static void main(String[] args) throws Exception { System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args))); } }
@SpringBootApplication是一個方便的注釋,它添加了以下所有內容:
- @Configuration: 將類標記為應用程序上下文的 bean 定義源。
- @EnableAutoConfiguration:告訴 Spring Boot 根據類路徑設置、其他 bean 和各種屬性設置開始添加 bean。例如,如果spring-webmvc位于類路徑上,則此注釋將應用程序標記為 Web 應用程序并激活關鍵行為,例如設置DispatcherServlet.
- @ComponentScan: 告訴 Spring 在包中查找其他組件、配置和服務com/example,讓它找到控制器。
該main()方法使用 Spring Boot 的SpringApplication.run()方法來啟動應用程序。您是否注意到沒有一行 XML?也沒有web.xml文件。這個 Web 應用程序是 100% 純 Java,您不必處理任何管道或基礎設施的配置。
請注意SpringApplication.exit()并System.exit()確保 JVM 在作業完成后退出。有關更多詳細信息,請參閱Spring Boot 參考文檔中的應用程序退出部分。
出于演示目的,有代碼可以創建一個JdbcTemplate、查詢數據庫并打印出批處理作業插入的人員姓名。
構建一個可執行的 JAR
您可以使用 Gradle 或 Maven 從命令行運行應用程序。您還可以構建一個包含所有必要依賴項、類和資源的單個可執行 JAR 文件并運行它。構建可執行 jar 可以在整個開發生命周期、跨不同環境等中輕松地作為應用程序交付、版本化和部署服務。
如果您使用 Gradle,則可以使用./gradlew bootRun. 或者,您可以使用構建 JAR 文件./gradlew build,然后運行 JAR 文件,如下所示:
java -jar build/libs/gs-batch-processing-0.1.0.jar
如果您使用 Maven,則可以使用./mvnw spring-boot:run. 或者,您可以使用構建 JAR 文件,./mvnw clean package然后運行該 JAR 文件,如下所示:
java -jar 目標/gs-batch-processing-0.1.0.jar
此處描述的步驟創建了一個可運行的 JAR。您還可以構建經典的 WAR 文件。
該作業為每個被轉換的人打印一行。作業運行后,您還可以看到查詢數據庫的輸出。它應該類似于以下輸出:
java -jar 目標/gs-batch-processing-0.1.0.jar
概括
恭喜!您構建了一個批處理作業,該作業從電子表格中提取數據,對其進行處理,然后將其寫入數據庫。
原文地址:https://www.toutiao.com/a7070000624930259459/