前面我們介紹了Spring Boot 整合 Elasticsearch 實現數據查詢檢索的功能,在實際項目中,我們的數據一般存儲在數據庫中,而且隨著業務的發送,數據也會隨時變化。
那么如何保證數據庫中的數據與Elasticsearch存儲的索引數據保持一致呢? 最原始的方案就是:當數據發生增刪改操作時同步更新Elasticsearch。但是這樣的設計耦合太高。接下來我們介紹一種非常簡單的數據同步方式:Logstash 數據同步。
一、Logstash簡介
1.什么是Logstash
logstash是一個開源的服務器端數據處理工具。簡單來說,就是一根具備實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可以讓你根據自己的需求在中間加上濾網,Logstash提供里很多功能強大的濾網以滿足你的各種應用場景。
Logstash常用于日志系統中做日志采集設備,最常用于ELK中作為日志收集器使用。
2.Logstash的架構原理
Logstash的基本流程架構:input=》 filter =》 output 。
input(輸入):采集各種樣式,大小和來源數據,從各個服務器中收集數據。常用的有:jdbc、file、syslog、redis等。
filter(過濾器)負責數據處理與轉換。主要是將event通過output發出之前對其實現的某些處理功能。
output(輸出):將我們過濾出的數據保存到那些數據庫和相關存儲中,。

3.Logstash如何與Elasticsearch數據同步
實際項目中,我們不可能通過手動添加的方式將數據插入索引庫,所以需要借助第三方工具,將數據庫的數據同步到索引庫。此時,Logstash出現了,它可以將不同數據庫的數據同步到Elasticsearch中。保證數據庫與Elasticsearch的數據保持一致。

目前支持數據庫與ES數據同步的插件有很多,個人認為Logstash是眾多同步mysql數據到es的插件中,最穩定并且最容易配置的一個。
二、安裝Logstash
Logstash的使用方法也很簡單,下面講解一下,Logstash是如何使用的。需要說明的是:這里以windows 環境為例,演示Logstash的安裝和配置。
1.下載Logstash
首先,下載對應版本的Logstash包,可以通過上面提供下載elasticsearch的地址進行下載,完成后解壓。

上面是Logstash解壓后的目錄,我們需要關注是bin目錄中的執行文件和config中的配置文件。一般生產情況下,會使用Linux服務器,并且會將Logstash配置成自啟動的服務。這里測試的話,直接啟動。
2.配置Logstash
接下來,配置Logstash。需要我們編寫配置文件,根據官網和網上提供的配置文件,將其進行修改。
第一步:在Logstash根目錄下創建mysql文件夾,添加mysql.conf配置文件,配置Logstash需要的相應信息,具體配置如下:
- input {
- stdin {
- }
- jdbc {
- # mysql數據庫連接
- jdbc_connection_string => "jdbc:mysql://localhost:3306/book_test?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
- # mysqly用戶名和密碼
- jdbc_user => "root"
- jdbc_password => "root"
- # 驅動配置
- jdbc_driver_library => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql-connector-java-8.0.20.jar"
- # 驅動類名
- jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
- #jdbc_paging_enabled => "true"
- #jdbc_page_size => "50000"
- jdbc_default_timezone => "Asia/Shanghai"
- # 執行指定的sql文件
- statement_filepath => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\bookquery.sql"
- use_column_value => true
- # 是否將字段名轉換為小寫,默認true(如果有數據序列化、反序列化需求,建議改為false);
- lowercase_column_names => false
- # 需要記錄的字段,用于增量同步,需是數據庫字段
- tracking_column => updatetime
- # Value can be any of: numeric,timestamp,Default value is "numeric"
- tracking_column_type => timestamp
- # record_last_run上次數據存放位置;
- record_last_run => true
- #上一個sql_last_value值的存放文件路徑, 必須要在文件中指定字段的初始值
- last_run_metadata_path => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\logstash_default_last_time.log"
- # 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false;
- clean_run => false
- # 設置監聽 各字段含義 分 時 天 月 年 ,默認全部為*代表含義:每分鐘都更新
- schedule => "* * * * *"
- # 索引類型
- type => "id"
- }
- }
- output {
- elasticsearch {
- #es服務器
- hosts => ["10.2.1.231:9200"]
- #ES索引名稱
- index => "book"
- #自增ID
- document_id => "%{id}"
- }
- stdout {
- codec => json_lines
- }
- }
第二步:將mysql-connector-java.jar 拷貝到前面配置的目錄下。上面的mysql.conf配置的是:C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\
mysql-connector-java-8.0.20.jar。那么jar包拷貝到此目錄下即可:

上面是mysql的驅動,如果是sqlserver數據庫,下載SqlServer對應的驅動即可。放置的位置要與mysql.conf 配置文件中的jdbc_driver_library 地址保持一致。
第三步:創建sql目錄,創建bookquery.sql文件用于保存需要執行的sql 腳本。示例代碼如下:
- select * from book where updatetime >= :sql_last_value
- order by updatetime desc
這里使用的增量更新,所以使用:sql_last_value 記錄上一次記錄的最后時間。
3.啟動Logstash
進入logstash的bin目錄,執行如下命令:
- logstash.bat -f C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql.conf
啟動成功之后,Logstash就會自動定時將數據寫入到Elasticsearch。如下圖所示:

同步完成后,我們使用Postman查詢Elasticsearch,驗證索引是否都創建成功。在postman中,發送 Get 請求:
http://10.2.1.231:9200/book/_search 。返回結果如下圖所示:

可以看到,數據庫中的數據已經通過Logstash同步至Elasticsearch。說明Logstash配置成功。
三、創建查詢服務
數據同步完成后,接下來我們使用Spring Boot 構建Elasticsearch查詢服務。首先創建Spring Boot項目并整合Elasticsearch,這個之前都已經介紹過,不清楚的朋友可以看我之前的文章。
接下來演示如何封裝完整的數據查詢服務。
1.數據實體
- @Document( indexName = "book" , replicas = 0)
- public class Book {
- @Id
- private Long id;
- @Field(analyzer = "ik_max_word",type = FieldType.Text)
- private String bookName;
- @Field(analyzer = "ik_max_word",type = FieldType.Text)
- private String author;
- private float price;
- private int page;
- @Field(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
- private Date createTime;
- @Field(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
- private Date updateTime;
- @Field(analyzer = "ik_max_word",type = FieldType.Text)
- private String category;
- public Long getId() {
- return id;
- }
- public void setId(Long id) {
- this.id = id;
- }
- public String getBookName() {
- return bookName;
- }
- public void setBookName(String bookName) {
- this.bookName = bookName;
- }
- public String getAuthor() {
- return author;
- }
- public void setAuthor(String author) {
- this.author = author;
- }
- public float getPrice() {
- return price;
- }
- public void setPrice(float price) {
- this.price = price;
- }
- public int getPage() {
- return page;
- }
- public void setPage(int page) {
- this.page = page;
- }
- public String getCategory() {
- return category;
- }
- public void setCategory(String category) {
- this.category = category;
- }
- public Book(){
- }
- public Date getCreateTime() {
- return createTime;
- }
- public void setCreateTime(Date createTime) {
- this.createTime = createTime;
- }
- public Date getUpdateTime() {
- return updateTime;
- }
- public void setUpdateTime(Date updateTime) {
- this.updateTime = updateTime;
- }
- }
2.請求封裝類
- public class BookQuery {
- public String category;
- public String bookName;
- public String author;
- public int priceMin;
- public int priceMax;
- public int pageMin;
- public int pageMax;
- public String sort;
- public String sortType;
- public int page;
- public int limit;
- }
3.創建Controller控制器
- @RestController
- public class ElasticSearchController {
- @Autowired
- private ElasticsearchRestTemplate elasticsearchRestTemplate;
- /**
- * 查詢信息
- * @param
- * @return
- */
- @PostMapping(value = "/book/query")
- public JSONResult query(@RequestBody BookQuery bookQuery){
- Query query= getQueryBuilder(bookQuery);
-
SearchHits
searchHits = elasticsearchRestTemplate.search(query, Book.class); -
List
> result = searchHits.getSearchHits(); - return JSONResult.ok(result);
- }
- public Query getQueryBuilder(BookQuery query) {
- BoolQueryBuilder builder = boolQuery();
- // 匹配器 模糊查詢部分,分析器使用ik (ik_max_word)
-
List
must = builder.must(); - if (query.getBookName()!=null && !query.getBookName().isEmpty())
- must.add(wildcardQuery("bookName", "*" +query.getBookName()+ "*"));
- if (query.getCategory()!=null && !query.getCategory().isEmpty())
- must.add(wildcardQuery("category", "*" +query.getCategory()+ "*"));
- if (query.getAuthor()!=null && !query.getAuthor().isEmpty())
- must.add(wildcardQuery("author", "*" +query.getAuthor()+ "*"));
- // 篩選器 精確查詢部分
-
List
filter = builder.filter(); - // 范圍查詢
- if (query.getPriceMin()>0 && query.getPriceMax()>0) {
- RangeQueryBuilder price = rangeQuery("price").gte(query.getPriceMin()).lte(query.getPriceMax());
- filter.add(price);
- }
- // 范圍查詢
- if (query.getPageMin()>0 && query.getPageMax()>0) {
- RangeQueryBuilder page = rangeQuery("page").gte(query.getPageMin()).lte(query.getPageMax());
- filter.add(page);
- }
- // 分頁
- PageRequest pageable = PageRequest.of(query.getPage() - 1, query.getLimit());
- // 排序
- SortBuilder sort = SortBuilders.fieldSort("price").order(SortOrder.DESC);
- //設置高亮效果
- String preTag = "";//google的色值
- String postTag = "";
- HighlightBuilder.Field highlightFields = new HighlightBuilder.Field("category").preTags(preTag).postTags(postTag);
- Query searchQuery = new NativeSearchQueryBuilder()
- .withQuery(builder)
- .withHighlightFields(highlightFields)
- .withPageable(pageable)
- .withSort(sort)
- .build();
- return searchQuery;
- }
- }
4.測試驗證
啟動項目,在Postman中,請求
http://localhost:8080/book/query 接口查詢書籍信息數據。查看接口返回情況。

我們看到接口成功返回數據。說明數據查詢服務創建成功。
最后
以上,我們就把使用Spring Boot + Elasticsearch + Logstash 實現完整的數據查詢檢索服務介紹完了。
原文鏈接:https://mp.weixin.qq.com/s/Y5Wq0Q8CAHgc_6aYHr7CTA