一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|數據庫技術|

服務器之家 - 數據庫 - Sql Server - 解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

2022-01-10 17:15山治先生 Sql Server

這篇文章主要介紹了SQL Server CDC配合Kafka Connect監聽數據變化,除了數據庫開啟CDC支持以外,主要還是要將變更的數據通過Kafka Connect傳輸數據,Debezium是目前官方推薦的連接器,本文給大家分享實現步驟,感興趣的朋友跟隨小編一起看

寫在前面

  好久沒更新blog了,從crud boy轉型大數據開發,拉寬了不少的知識面,從今年年初開始籌備、組建、招兵買馬,到現在穩定開搞中,期間踏過無數的火坑,也許除了這篇還很寫上三四篇。

  進入主題,通常企業為了實現數據統計、數據分析、數據挖掘、解決信息孤島等全局數據的系統化運作管理 ,為bi、經營分析、決策支持系統等深度開發應用奠定基礎,挖掘數據價值 ,企業會開始著手建立數據倉庫,數據中臺。而這些數據來源則來自于企業的各個業務系統的數據或爬取外部的數據,從業務系統數據到數據倉庫的過程就是一個etl(extract-transform-load)行為,包括了采集、清洗、數據轉換等主要過程,通常異構數據抽取轉換使用sqoop、datax等,日志采集flume、logstash、filebeat等。

  數據抽取分為全量抽取和增量抽取,全量抽取類似于數據遷移或數據復制,全量抽取很好理解;增量抽取在全量的基礎上做增量,只監聽、捕捉動態變化的數據。如何捕捉數據的變化是增量抽取的關鍵,一是準確性,必須保證準確的捕捉到數據的動態變化,二是性能,不能對業務系統造成太大的壓力。

增量抽取方式

  通常增量抽取有幾種方式,各有優缺點。

1. 觸發器

  在源數據庫上的目標表創建觸發器,監聽增、刪、改操作,捕捉到數據的變更寫入臨時表。

優點:操作簡單、規則清晰,對源表不影響;

缺點:對源數據庫有侵入,對業務系統有一定的影響;

2. 全表比對

  在etl過程中,抽取方建立臨時表待全量抽取存儲,然后在進行比對數據。

優點:對源數據庫、源表都無需改動,完全交付etl過程處理,統一管理;

缺點:etl效率低、設計復雜,數據量越大,速度越慢,時效性不確定;

3. 全表刪除后再插入

  在抽取數據之前,先將表中數據清空,然后全量抽取。

優點:etl 操作簡單,速度快。

缺點:全量抽取一般采取t+1的形式,抽取數據量大的表容易對數據庫造成壓力;

4. 時間戳

  時間戳的方式即在源表上增加時間戳列,對發生變更的表進行更新,然后根據時間戳進行提取。

優點:操作簡單,elt邏輯清晰,性能比較好;

缺點:對業務系統有侵入,數據庫表也需要額外增加字段。對于老的業務系統可能不容易做變更。

5. cdc方式

  變更數據捕獲change data capture(簡稱cdc),sqlserver為實時更新數據同步提供了cdc機制,類似于mysql的binlog,將數據更新操作維護到一張cdc表中。開啟cdc的源表在插入insert、更新update和刪除delete活動時會插入數據到日志表中。cdc通過捕獲進程將變更數據捕獲到變更表中,通過cdc提供的查詢函數,可以捕獲這部分數據。詳情可以查看官方介紹:關于變更數據捕獲 (sql server)

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

優點:提供易于使用的api 來設置cdc 環境,縮短etl 的時間,無需修改業務系統表結構。

缺點:受數據庫版本的限制,實現過程相對復雜。

cdc增量抽取

先決條件

1. 已搭建好kafka集群,zookeeper集群;

2. 源數據庫支持cdc,版本采用開發版或企業版。

案例環境:

ubuntu 20.04

kafka2.13-2.7.0

zookeeper 3.6.2

sql server 2012

步驟

  除了數據庫開啟cdc支持以外,主要還是要將變更的數據通過kafka connect傳輸數據,debezium是目前官方推薦的連接器,它支持絕大多數主流數據庫:mysql、postgresql、sql server、oracle等等,詳情查看connectors

1. 數據庫步驟

開啟數據庫cdc支持

  在源數據庫執行以下命令:

exec sys.sp_cdc_enable_db go

  附上關閉語句:

exec sys.sp_cdc_disable_db

查詢是否啟用

?
1
select * from sys.databases where is_cdc_enabled = 1

創建測試數據表:(已有表則跳過此步驟)

?
1
2
3
4
5
6
7
8
create  table t_liocdc
(
    id int identity(1,1) primary key ,
    name nvarchar(16),
    sex bit,
    createtime datetime,
    updatetime datetime
);

對源表開啟cdc支持:

?
1
2
3
4
5
exec sp_cdc_enable_table
@source_schema='dbo',
@source_name='t_liocdc',
@role_name=null,
@supports_net_changes = 1;

確認是否有權限訪問cdc table:

exec sys.sp_cdc_help_change_data_capture

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

確認sql server agent已開啟:

exec master.dbo.xp_servicecontrol n'querystate',n'sqlserveragent'

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

  以上則完成對數據庫的cdc操作。

2. kafka步驟

  kafka connect的工作模式分為兩種,分別是standalone模式和distributed模式。standalone用于單機測試,本文用distributed模式,用于生產環境。(kafka必須先運行啟動,再進行以下步驟進行配置。)

下載sql server connector

  下載連接器后,創建一個文件夾來存放,解壓到該目錄下即可,例子路徑:/usr/soft/kafka/kafka_2.13_2.7.0/plugins(記住這個路徑,配置中要用到)

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

下載地址:debezium-connector-sqlserver-1.5.0.final-plugin.tar.gz

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

編輯connect-distributed.properties配置

  修改kafka connect配置文件,$kafka_home/config/connect-distributed.properties,變更內容如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//kafka集群ip+portbootstrap.servers=172.192.10.210:9092,172.192.10.211:9092,172.192.10.212:9092
 
key.converter.schemas.enable=false
value.converter.schemas.enable=false
 
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
offset.storage.partitions=3
offset.storage.cleanup.policy=compact
 
config.storage.topic=connect-configs
config.storage.replication.factor=1
 
status.storage.topic=connect-status
status.storage.replication.factor=1
status.storage.partitions=3
//剛剛下載連接器解壓的路徑
plugin.path=/usr/soft/kafka/kafka_2.13_2.7.0/plugins

看到配置中有三個topic,分別是

config.storage.topic:用以保存connector和task的配置信息,需要注意的是這個主題的分區數只能是1,而且是有多副本的。

offset.storage.topic:用以保存offset信息。

status.storage.topic:用以保存connetor的狀態信息。

這些topic可以不用創建,啟動后會默認創建。

啟動kafka集群

  保存配置之后,將connect-distributed.properties分發到集群中,然后啟動:

bin/connect-distributed.sh config/connect-distributed.properties

檢查是否啟動

  connector支持rest api的方式進行管理,所以用post man或者fiddler可以調用相關接口進行管理。檢查是否啟動:

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

不用奇怪,上面配置集群的ip是172段,這里的192.168.1.177仍是我的集群中的一個服務器,因為服務器都使用了雙網卡。因為還沒有連接器相關配置,所以接口返回是一個空數組,接下來將新增一個連接器。

編寫sqlserver-cdc-source.json

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
    "name": "sqlserver-cdc-source",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.sqlserverconnector",
        "database.server.name" : "jnserver",
        "database.hostname" : "172.192.20.2", --目標數據庫的ip
        "database.port" : "1433"--目標數據庫的端口
        "database.user" : "sa",   --目標數據庫的賬號
        "database.password" : "123456"--密碼
        "database.dbname" : "dis"--目標數據庫的數據庫名稱
        "table.whitelist": "dbo.t_liocdc", --監聽表名
         "schemas.enable" : "false"
         "mode":"incrementing"--增量模式
         "incrementing.column.name": "id", --增量列名
        "database.history.kafka.bootstrap.servers" : "172.192.10.210:9092,172.192.10.211:9092,172.192.10.212", --kafka集群
        "database.history.kafka.topic": "topictliocdc"--kafka topic內部使用,不是由消費者使用
        "value.converter.schemas.enable":"false",
        "value.converter":"org.apache.kafka.connect.json.jsonconverter"
    }
}
//源文地址: https://www.cnblogs.com/eminemjk/p/14688907.html

還有其他額外的配置,可以參考官方文檔。然后執行

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

繼續執行檢查,就發現連接器已經成功配置了:

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

其他api

?
1
2
3
4
5
6
7
8
9
10
11
12
13
get /connectors – 返回所有正在運行的connector名。
post /connectors – 新建一個connector; 請求體必須是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必須包含你的connector的配置信息。
get /connectors/{name} – 獲取指定connetor的信息。
get /connectors/{name}/config – 獲取指定connector的配置信息。
put /connectors/{name}/config – 更新指定connector的配置信息。
get /connectors/{name}/status – 獲取指定connector的狀態,包括它是否在運行、停止、或者失敗,如果發生錯誤,還會列出錯誤的具體信息。
get /connectors/{name}/tasks – 獲取指定connector正在運行的task。
get /connectors/{name}/tasks/{taskid}/status – 獲取指定connector的task的狀態信息。
put /connectors/{name}/pause – 暫停connector和它的task,停止數據處理知道它被恢復。
put /connectors/{name}/resume – 恢復一個被暫停的connector。
post /connectors/{name}/restart – 重啟一個connector,尤其是在一個connector運行失敗的情況下比較常用
post /connectors/{name}/tasks/{taskid}/restart – 重啟一個task,一般是因為它運行失敗才這樣做。
delete /connectors/{name} – 刪除一個connector,停止它的所有task并刪除配置。//源文地址: https://www.cnblogs.com/eminemjk/p/14688907.html

查看topic

/usr/soft/kafka/kafka_2.13_2.7.0# bin/kafka-topics.sh --list --zookeeper localhost:2000

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

topicjnserver.dbo.t_liocdc則是供我們消費的主題,啟動一個消費者進行監聽測試:

bin/kafka-console-consumer.sh --bootstrap-server 172.192.10.210:9092  --consumer-property group.id=group1 --consumer-property client.id=consumer-1  --topic jnserver.dbo.t_liocdc

然后再源表進行一些列增刪改操作,

?
1
2
3
4
5
6
7
8
9
10
11
12
--測試代碼
insert into t_liocdc(name, sex, createtime,updatetime)  values ('a',1,getdate(),getdate())
insert into t_liocdc(name, sex, createtime,updatetime)  values ('b',0,getdate(),getdate())
insert into t_liocdc(name, sex, createtime,updatetime)  values ('c',1,getdate(),getdate())
insert into t_liocdc(name, sex, createtime,updatetime)  values ('d',0,getdate(),getdate())
insert into t_liocdc(name, sex, createtime,updatetime)  values ('e',1,getdate(),getdate())
insert into t_liocdc(name, sex, createtime,updatetime)  values ('f',1,getdate(),getdate())
insert into t_liocdc(name, sex, createtime,updatetime)  values ('g',0,getdate(),getdate())
 
update t_liocdc
set name='lio.huang',updatetime=getdate()
where id=7

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

已經成功捕捉到數據的變更,對比幾個操作json,依次是insert、update、delete:

解析SQL Server CDC配合Kafka Connect監聽數據變化的問題解析SQL Server CDC配合Kafka Connect監聽數據變化的問題解析SQL Server CDC配合Kafka Connect監聽數據變化的問題

到此這篇關于sql server cdc配合kafka connect監聽數據變化的文章就介紹到這了,更多相關sql server cdc監聽數據變化內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://www.cnblogs.com/EminemJK/p/14688907.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 王淑兰与铁柱全文免费阅读 | 国产欧美二区三区 | 好大好硬好深好爽gif图 | 粉嫩国产14xxxxx0000 | 国产色视频一区二区三区 | 毛片视频网站 | 日韩欧美一级大片 | 欧美性色老妇人 | 欧美在线视频免费播放 | 奶大逼紧| 北条麻妃黑人正在播放 | 亚洲va欧美va天堂v国产综合 | 成年美女黄网色大观看全 | 久久久久免费视频 | 精品高潮呻吟99AV无码 | 色老板在线免费观看 | 五月婷婷丁香在线视频 | 欧美a一片xxxx片与善交 | 秋霞午夜视频在线观看 | 国产成人刺激视频在线观看 | 色播艾小青国产专区在线播放 | 日韩在线成人 | 成年人网站免费在线观看 | 网站色小妹 | 乖女的嫩奶水h文孕妇 | 国产裸露片段精华合集链接 | 97影视| 精品日本一区二区 | 69福利区| 四虎永久在线精品国产 | 色人阁小说 | 欧美激情精品久久久久久不卡 | 桃色视频破解版 | 日韩欧美推理片免费看完整版 | 国产精品永久免费自在线观看 | 四缺一的小说 | 操一操影院 | 欧美高清在线精品一区二区不卡 | 欧美日韩精品一区二区三区高清视频 | 欧美交换乱理伦片120秒 | 日本一区二区不卡久久入口 |