1、原始需求
既要同步原始全量數(shù)據(jù),也要實時同步mysql特定庫的特定表增量數(shù)據(jù),同時對應的修改、刪除也要對應。
數(shù)據(jù)同步不能有侵入性:不能更改業(yè)務程序,并且不能對業(yè)務側有太大性能壓力。
應用場景:數(shù)據(jù)etl同步、降低業(yè)務服務器壓力。
2、解決方案
3、canal介紹、安裝
canal是阿里巴巴旗下的一款開源項目,純java開發(fā)。基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費,目前主要支持了mysql(也支持mariadb)。
工作原理:mysql主備復制實現(xiàn)
從上層來看,復制分成三步:
- master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
- slave將master的binary log events拷貝到它的中繼日志(relay log);
- slave重做中繼日志中的事件,將改變反映它自己的數(shù)據(jù)。
canal的工作原理
原理相對比較簡單:
- canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議
- mysql master收到dump請求,開始推送binary log給slave(也就是canal)
- canal解析binary log對象(原始為byte流)
架構
說明:
- server代表一個canal運行實例,對應于一個jvm
- instance對應于一個數(shù)據(jù)隊列 (1個server對應1..n個instance)
instance模塊:
- eventparser (數(shù)據(jù)源接入,模擬slave協(xié)議和master進行交互,協(xié)議解析)
- eventsink (parser和store鏈接器,進行數(shù)據(jù)過濾,加工,分發(fā)的工作)
- eventstore (數(shù)據(jù)存儲)
- metamanager (增量訂閱&消費信息管理器)
安裝
1、mysql、kafka環(huán)境準備
2、canal下載:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
3、解壓:tar -zxvf canal.deployer-1.1.3.tar.gz
4、對目錄conf里文件參數(shù)配置
對canal.properties配置:
進入conf/example里,對instance.properties配置:
5、啟動:bin/startup.sh
6、日志查看:
4、驗證
1、開發(fā)對應的kafka消費者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
package org.kafka; import java.util.arrays; import java.util.properties; import org.apache.kafka.clients.consumer.consumerrecord; import org.apache.kafka.clients.consumer.consumerrecords; import org.apache.kafka.clients.consumer.kafkaconsumer; import org.apache.kafka.common.serialization.stringdeserializer; /** * * title: kafkaconsumertest * description: * kafka消費者 demo * version:1.0.0 * @author pancm * @ date 2018年1月26日 */ public class kafkaconsumertest implements runnable { private final kafkaconsumer<string, string> consumer; private consumerrecords<string, string> msglist; private final string topic; private static final string groupid = "groupa" ; public kafkaconsumertest(string topicname) { properties props = new properties(); props.put( "bootstrap.servers" , "192.168.7.193:9092" ); props.put( "group.id" , groupid); props.put( "enable.auto.commit" , "true" ); props.put( "auto.commit.interval.ms" , "1000" ); props.put( "session.timeout.ms" , "30000" ); props.put( "auto.offset.reset" , "latest" ); props.put( "key.deserializer" , stringdeserializer.class.getname()); props.put( "value.deserializer" , stringdeserializer.class.getname()); this.consumer = new kafkaconsumer<string, string>(props); this.topic = topicname; this.consumer.subscribe(arrays.aslist(topic)); } @override public void run() { int messageno = 1; system. out .println( "---------開始消費---------" ); try { for (; ; ) { msglist = consumer.poll(1000); if ( null != msglist && msglist. count () > 0) { for (consumerrecord<string, string> record : msglist) { //消費100條就打印 ,但打印的數(shù)據(jù)不一定是這個規(guī)律的 system. out .println(messageno + "=======receive: key = " + record. key () + ", value = " + record.value() + " offset===" + record.offset()); // string v = decodeunicode(record.value()); // system. out .println(v); //當消費了1000條就退出 if (messageno % 1000 == 0) { break; } messageno++; } } else { thread.sleep(11); } } } catch (interruptedexception e) { e.printstacktrace(); } finally { consumer. close (); } } public static void main(string args[]) { kafkaconsumertest test1 = new kafkaconsumertest( "sample-data" ); thread thread1 = new thread(test1); thread1.start(); } /* * 中文轉unicode編碼 */ public static string gbencoding(final string gbstring) { char [] utfbytes = gbstring.tochararray(); string unicodebytes = "" ; for ( int i = 0; i < utfbytes.length; i++) { string hexb = integer .tohexstring(utfbytes[i]); if (hexb.length() <= 2) { hexb = "00" + hexb; } unicodebytes = unicodebytes + "\\u" + hexb; } return unicodebytes; } /* * unicode編碼轉中文 */ public static string decodeunicode(final string datastr) { int start = 0; int end = 0; final stringbuffer buffer = new stringbuffer(); while (start > -1) { end = datastr.indexof( "\\u" , start + 2); string charstr = "" ; if ( end == -1) { charstr = datastr. substring (start + 2, datastr.length()); } else { charstr = datastr. substring (start + 2, end ); } char letter = ( char ) integer .parseint(charstr, 16); // 16進制parse整形字符串。 buffer.append(new character (letter).tostring()); start = end ; } return buffer.tostring(); } } |
2、對表bak1進行增加數(shù)據(jù)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
create table `bak1` ( `vin` varchar (20) not null , `p1` double default null , `p2` double default null , `p3` double default null , `p4` double default null , `p5` double default null , `p6` double default null , `p7` double default null , `p8` double default null , `p9` double default null , `p0` double default null ) engine=innodb default charset=utf8mb4 show create table bak1; insert into bak1 select '李雷abcv' , `p1` , `p2` , `p3` , `p4` , `p5` , `p6` , `p7` , `p8` , `p9` , `p0` from moci limit 10 |
3、查看輸出結果:
到此這篇關于mysql特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案的文章就介紹到這了,更多相關mysql特定表數(shù)據(jù)同步內(nèi)容請搜索服務器之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://www.cnblogs.com/lilei2blog/p/15608206.html