前言
changestream是monggodb的3.6版本之后出現(xiàn)的一種基于collection(數(shù)據(jù)庫集合)的變更事件流,應(yīng)用程序通過db.collection.watch()這樣的命令可以獲得被監(jiān)聽對象的實時變更
想必對mysql主從復(fù)制原理比較熟悉的同學應(yīng)該知道,其根本就是從節(jié)點通過監(jiān)聽binlog日志,然后解析binlog日志數(shù)據(jù)達到數(shù)據(jù)同步的目的,于是,基于mysql主從復(fù)制原理,阿里開源了canal這樣的數(shù)據(jù)同步中間件工具
Change Stream 介紹
Chang Stream(變更記錄流) 是指collection(數(shù)據(jù)庫集合)的變更事件流,應(yīng)用程序通過db.collection.watch()這樣的命令可以獲得被監(jiān)聽對象的實時變更。
關(guān)于changestream做如下說明,提供參考
- 在該特性出現(xiàn)之前,開發(fā)者可通過拉取 oplog達到同樣的目的;
- 但 oplog 的處理及解析相對復(fù)雜,而且存在被回滾的風險,如果使用不當?shù)脑掃€會帶來性能問題;
- Change Stream 可以與aggregate framework結(jié)合使用,對變更集進行進一步的過濾或轉(zhuǎn)換;
- 由于Change Stream 利用了存儲在 oplog 中的信息,因此對于單進程部署的MongoDB無法支持Change Stream功能,其只能用于啟用了副本集的獨立集群或分片集群
changestream可用于監(jiān)聽的mongodb目標類型
- 單個集合,除系統(tǒng)庫(admin/local/config)之外的集合,3.6版本支持
- 單個數(shù)據(jù)庫,除系統(tǒng)庫(admin/local/config)之外的數(shù)據(jù)庫集合,4.0版本支持
- 整個集群,整個集群內(nèi)除去系統(tǒng)庫( (admin/local/config)之外的集合 ,4.0版本支持
一個Change Stream Event的基本結(jié)構(gòu)如下所示:
{ _id : { <BSON Object> }, "operationType" : "<operation>", "fullDocument" : { <document> }, "ns" : { "db" : "<database>", "coll" : "<collection" }, "documentKey" : { "_id" : <ObjectId> }, "updateDescription" : { "updatedFields" : { <document> }, "removedFields" : [ "<field>", ... ] } "clusterTime" : <Timestamp>, "txnNumber" : <NumberLong>, "lsid" : { "id" : <UUID>, "uid" : <BinData> } }
關(guān)于上面的數(shù)據(jù)結(jié)構(gòu),做簡單的解釋說明,
- _id,變更事件的Token對象
- operationType,變更類型(見下面介紹)
- fullDocument,文檔內(nèi)容
- ns,監(jiān)聽的目標
- ns.db,變更的數(shù)據(jù)庫
- ns.coll,變更的集合
- documentKey,變更文檔的鍵值,含_id字段
- updateDescription,變更描述
- updateDescription.updatedFields,變更中更新字段
- updateDescription.removedFields,變更中刪除字段
- clusterTime,對應(yīng)oplog的時間戳
- txnNumber,事務(wù)編號,僅在多文檔事務(wù)中出現(xiàn),4.0版本支持
- lsid,事務(wù)關(guān)聯(lián)的會話編號,僅在多文檔事務(wù)中出現(xiàn),4.0版本支持
Change Steram支持的變更類型,對于上面的operationType 這個參數(shù),主要包括有以下幾個:
- insert,插入文檔
- delete,刪除文檔
- replace,替換文檔,當執(zhí)行replace操作指定upsert時,可能是insert事件
- update,更新文檔,當執(zhí)行update操作指定upsert時,可能是insert事件
- invalidate,失效事件,比如執(zhí)行了collection.drop或collection.rename
以上的幾種類型,可以簡單理解為,監(jiān)聽的mongo用戶操作的事件類型,比如新增數(shù)據(jù),刪除數(shù)據(jù),修改數(shù)據(jù)等
以上為changestream的必備理論知識,想要深入學習的話無比要了解,下面通過實操來展示下changestream的使用
環(huán)境準備
mongdb復(fù)制集群,本例的復(fù)制集群對應(yīng)的mongodb版本為 4.0.X
登錄primary節(jié)點,創(chuàng)建一個數(shù)據(jù)庫
友情提醒:數(shù)據(jù)庫需要提前創(chuàng)建
1、啟動兩個Mongo shell,一個操作數(shù)據(jù)庫,一個watch
在其中一個窗口執(zhí)行如下命令,開啟監(jiān)聽
cursor = db.comment.watch()
2、在另一個窗口下,給上面的articledb插入一條數(shù)據(jù)
數(shù)據(jù)寫入成功后,在第一個窗口下,執(zhí)行下面的命令:
cursor.next()
說明已經(jīng)成功監(jiān)聽到新增的數(shù)據(jù),修改、刪除事件可以做類似的操作即可
以上先通過shell窗口展示了一下changestream的使用效果,接下來,將通過程序演示下如何在客戶端集成并使用changestream
Java客戶端操作changestream
1、引入maven依賴
<dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId> <version>3.12.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency>
2、測試類核心代碼
import com.mongodb.*; import com.mongodb.client.MongoDatabase; import org.bson.conversions.Bson; import java.util.List; import static java.util.Collections.singletonList; import com.alibaba.fastjson.JSONObject; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.ChangeStreamDocument; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static java.util.Arrays.asList; public class MongoTest { private static Logger logger = LoggerFactory.getLogger(MongoTest.class); public static void main(String[] args) { showmogodbdata(); } private static void showmogodbdata() { String sURI = "mongodb://IP:27017"; MongoClient mongoClient = new MongoClient(new MongoClientURI(sURI)); MongoDatabase database = mongoClient.getDatabase("articledb"); MongoCollection<Document> collec = database.getCollection("comment"); List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.articleid': '100007'}"), Filters.in("operationType", asList("insert", "update", "delete"))))); MongoCursor<ChangeStreamDocument<Document>> cursor = collec.watch(pipeline).iterator(); while (cursor.hasNext()) { ChangeStreamDocument<Document> next = cursor.next(); logger.info("輸出mogodb的next的對應(yīng)的值" + next.toString()); String Operation = next.getOperationType().getValue(); String tableNames = next.getNamespace().getCollectionName(); System.out.println(tableNames); //獲取主鍵id的值 String pk_id = next.getDocumentKey().toString(); //同步修改數(shù)據(jù)的操作 if (next.getUpdateDescription() != null) { JSONObject jsonObject = JSONObject.parseObject(next.getUpdateDescription().getUpdatedFields().toJson()); System.out.println(jsonObject); } //同步插入數(shù)據(jù)的操作 if (next.getFullDocument() != null) { JSONObject jsonObject = JSONObject.parseObject(next.getFullDocument().toJson()); //同步刪除數(shù)據(jù)的操作 if (next.getUpdateDescription() == null && Operation.matches("delete")) { JSONObject jsonObject = JSONObject.parseObject(pk_id); } }
這段程序主要分為幾個核心部分,做如下解釋說明,
- 連接mogodb服務(wù)端及相關(guān)配置
- 通過pipline開啟watch監(jiān)聽
- 監(jiān)聽到特定數(shù)據(jù)庫下集合的數(shù)據(jù)變化,然后打印出變化的數(shù)據(jù)
啟動這段程序,觀察控制臺日志數(shù)據(jù)
在未對articledb數(shù)據(jù)庫下的comment集合做任何操作之前,由于watch為檢測到任何數(shù)據(jù)變化,所以無法進入到while循環(huán)中,接下來,從shell端給comment集合新增一條數(shù)據(jù),然后再次觀察控制臺數(shù)據(jù)變化
可以看到,控制臺很快就檢測到變化的數(shù)據(jù)
以下為完整的日志數(shù)據(jù)
{ operationType=OperationType{value='insert'}, resumeToken={"_data": "8262138891000000022B022C0100296E5A1004B9065629412942F8852D592B9FD441B946645F696400646213889158B116A29C3FD1140004"}, namespace=articledb.comment, destinationNamespace=null, fullDocument=Document{{_id=6213889158b116a29c3fd114, articleid=100010, content=hello kafka, userid=1010, nickname=marry}}, documentKey={"_id": {"$oid": "6213889158b116a29c3fd114"}}, clusterTime=Timestamp{value=7067142396626075650, seconds=1645447313, inc=2}, updateDescription=null, txnNumber=null, lsid=null}
至于在業(yè)務(wù)中的具體使用,可以結(jié)合自身的情況,舉例來說,應(yīng)用程序只想監(jiān)聽修改數(shù)據(jù)的事件,那么就可以在修改數(shù)據(jù)事件的監(jiān)聽邏輯中,解析變化后的數(shù)據(jù)做后續(xù)的操作
springboot整合changestream
在實際開發(fā)中,更通用的場景是整合到springboot工程中使用,有過一定的開發(fā)經(jīng)驗的同學應(yīng)該很容易想到核心的邏輯長什么樣了,和canal的客戶端操作類似,需要在一個配置類去監(jiān)聽即可
下面來看看具體的整合步驟
1、引入核心依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <artifactId>spring-boot-starter</artifactId>
2、核心配置文件
本例演示的是基于上文搭建的mongodb復(fù)制集群
server.port=8081 #mongodb配置 spring.data.mongodb.uri=mongodb://IP:27017,IP:27018,IP:27019/articledb?maxPoolSize=512
3、編寫實體類,映射comment集合中的字段
import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; @Document(collection="comment") public class Comment { @Id private String articleid; private String content; private String userid; private String nickname; private Date createdatetime; public String getArticleid() { return articleid; } public void setArticleid(String articleid) { this.articleid = articleid; public String getContent() { return content; public void setContent(String content) { this.content = content; public String getUserid() { return userid; public void setUserid(String userid) { this.userid = userid; public String getNickname() { return nickname; public void setNickname(String nickname) { this.nickname = nickname; public Date getCreatedatetime() { return createdatetime; public void setCreatedatetime(Date createdatetime) { this.createdatetime = createdatetime; }
4、編寫一個服務(wù)類
簡單的添加2個用接口測試的方法
import com.congge.entity.Comment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Service; import java.util.List; @Service public class MongoDbService { private static final Logger logger = LoggerFactory.getLogger(MongoDbService.class); @Autowired private MongoTemplate mongoTemplate; /** * 查詢所有 * @return */ public List<Comment> findAll() { return mongoTemplate.findAll(Comment.class); } /*** * 根據(jù)id查詢 * @param id public Comment getBookById(String id) { Query query = new Query(Criteria.where("articleid").is(id)); return mongoTemplate.findOne(query, Comment.class); }
5、編寫一個接口
@RestController public class CommentController { @Autowired private MongoDbService mongoDbService; @GetMapping("/listAll") public Object listAll(){ return mongoDbService.findAll(); } @GetMapping("/findById") public Object findById(String id){ return mongoDbService.getBookById(id); }
啟動本工程,然后瀏覽器調(diào)用下查詢所有數(shù)據(jù)的接口,數(shù)據(jù)能正常返回,說明工程的基礎(chǔ)結(jié)構(gòu)就完成了
6、接下來,只需要依次添加下面3個配置類即可
MongoMessageListener 類 ,顧名思義,該類用于監(jiān)聽特定數(shù)據(jù)庫下的集合數(shù)據(jù)變化使用的,在實際開發(fā)中,該類的作用也是非常重要的,類似于許多中間件的客戶端監(jiān)聽程序,當監(jiān)聽到數(shù)據(jù)變化后,做出后續(xù)的業(yè)務(wù)響應(yīng),比如,數(shù)據(jù)入庫、推送消息到kafka、發(fā)送相關(guān)的事件等等
import com.congge.entity.Comment; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.OperationType; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.mongodb.core.messaging.Message; import org.springframework.data.mongodb.core.messaging.MessageListener; import org.springframework.stereotype.Component; @Component public class MongoMessageListener implements MessageListener<ChangeStreamDocument<Document>,Comment> { private static Logger logger = LoggerFactory.getLogger(MongoMessageListener.class); @Override public void onMessage(Message<ChangeStreamDocument<Document>, Comment> message) { OperationType operationType = message.getRaw().getOperationType(); System.out.println("操作類型為 :" + operationType); System.out.println("變更數(shù)據(jù)主體 :" + message.getBody().getArticleid()); System.out.println("變更數(shù)據(jù)主體 :" + message.getBody().getContent()); System.out.println("變更數(shù)據(jù)主體 :" + message.getBody().getNickname()); System.out.println("變更數(shù)據(jù)主體 :" + message.getBody().getUserid()); System.out.println(); /*logger.info("Received Message in collection: {},message raw: {}, message body:{}", message.getProperties().getCollectionName(), message.getRaw(), message.getBody());*/ } }
ChangeStream 類 ,事件注冊類,即開篇中提到的那幾種事件類型的操作等
import com.congge.entity.Comment; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.context.annotation.Configuration; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest; import org.springframework.data.mongodb.core.messaging.MessageListenerContainer; import org.springframework.data.mongodb.core.query.Criteria; @Configuration public class ChangeStream implements CommandLineRunner { @Autowired private MongoMessageListener mongoMessageListener; private MessageListenerContainer messageListenerContainer; @Override public void run(String... args) throws Exception{ ChangeStreamRequest<Comment> request = ChangeStreamRequest.builder(mongoMessageListener) .collection("comment") .filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert","update","replace")))) .build(); messageListenerContainer.register(request,Comment.class); } }
MongoConfig 配置MessageListenerContainer 容器的相關(guān)參數(shù)
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer; import org.springframework.data.mongodb.core.messaging.MessageListenerContainer; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @Configuration public class MongoConfig { @Bean MessageListenerContainer messageListenerContainer(MongoTemplate mongoTemplate){ Executor executor = Executors.newFixedThreadPool(5); return new DefaultMessageListenerContainer(mongoTemplate,executor){ @Override public boolean isAutoStartup(){ return true; } }; } }
3個類添加完成后,再次啟動程序,并觀察控制臺數(shù)據(jù)日志
測試1:通過shell窗口登錄primary節(jié)點,并給comment集合添加一條數(shù)據(jù)
幾乎是實時的監(jiān)聽到事件操作的數(shù)據(jù)變化,下面是完整的輸出日志
測試2:通過shell窗口刪除上面新增的這條數(shù)據(jù)
典型應(yīng)用場景
數(shù)據(jù)遷移
如果一個系統(tǒng)的數(shù)據(jù)需要遷移到另一個系統(tǒng),可以考慮使用mongodb changestream這種方式,試想,如果老系統(tǒng)數(shù)據(jù)非常雜亂,并且文檔中存在一些臟數(shù)據(jù)時,為了確保遷移后的數(shù)據(jù)能較快的投產(chǎn),通過應(yīng)用程序的方式,能夠原始的數(shù)據(jù)做類似ETL的處理,這樣更加方便
應(yīng)用監(jiān)控
如果您的系統(tǒng)對數(shù)據(jù)監(jiān)管較為嚴格,可以考慮使用changestream這種方式,訂閱特定事件的數(shù)據(jù)操作,比如修改和刪除數(shù)據(jù)的事件,然后及時的發(fā)送告警通知
對接大數(shù)據(jù)應(yīng)用
我們知道,mongodb作為一款性能優(yōu)秀的分布式文檔型數(shù)據(jù)庫,其實是可以存儲海量數(shù)據(jù)的,在一些大數(shù)據(jù)場景下,比如下游其他的應(yīng)用采用大數(shù)據(jù)技術(shù),需要對mongo中的數(shù)據(jù)做軌跡行為分析,changestream就是一種不錯的選擇,當監(jiān)聽到特定事件的數(shù)據(jù)變化時,向消息隊列,比如kafka推送相應(yīng)的消息,下游相關(guān)的大數(shù)據(jù)應(yīng)用就可以做后續(xù)的業(yè)務(wù)處理了