Preface
Our new project is built on Spring Boot, and data created in it has to be synchronized into the old system. Part of the sync code already existed and was built on Kafka. Honestly, for plain data synchronization I don't think Kafka is a necessary choice of MQ: the data volume is small, while Kafka drags in a cluster and ZooKeeper. For simple data sync that is overkill.
Still, I'm just the hired hand; the boss said Kafka, so Kafka it is. At first I found spring-integration-kafka, whose description says it is a rewrite on top of spring-kafka. I read its official documentation, but it left my head spinning and I never got the feature working. The docs aren't well written either, perhaps because the project is young and still has many rough edges. I gave up on it and used spring-kafka directly instead.
Implementation
The pom.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.linuxsogood.sync</groupId>
    <artifactId>linuxsogood-sync</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <!-- dependency versions -->
        <mybatis.version>3.3.1</mybatis.version>
        <mybatis.spring.version>1.2.4</mybatis.spring.version>
        <mapper.version>3.3.6</mapper.version>
        <pagehelper.version>4.1.1</pagehelper.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.1.RELEASE</version>
            <scope>compile</scope>
        </dependency>-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.2.3.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>sqljdbc4</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.11</version>
        </dependency>
        <!-- MyBatis -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>${mybatis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>${mybatis.spring.version}</version>
        </dependency>
        <!--<dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>-->
        <!-- MyBatis Generator -->
        <dependency>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-core</artifactId>
            <version>1.3.2</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <!-- Pagination plugin -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper</artifactId>
            <version>${pagehelper.version}</version>
        </dependency>
        <!-- Common Mapper -->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>repo.spring.io.milestone</id>
            <name>Spring Framework Maven Milestone Repository</name>
            <url>https://repo.spring.io/libs-milestone</url>
        </repository>
    </repositories>

    <build>
        <finalName>mybatis_generator</finalName>
        <plugins>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.2</version>
                <configuration>
                    <verbose>true</verbose>
                    <overwrite>true</overwrite>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>org.linuxsogood.sync.Starter</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The ORM layer uses MyBatis, together with the common Mapper (tk.mybatis) and the PageHelper pagination plugin.
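The ORM details are not the focus of this post, but as a quick illustration of how the pagination plugin cooperates with a generated mapper, here is a minimal sketch; the page number, page size, and the empty query criteria are made-up examples:

import com.github.pagehelper.PageHelper;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.Store;
import org.linuxsogood.sync.model.StoreExample;

import java.util.List;

// Minimal sketch: PageHelper.startPage intercepts the next MyBatis query
// and rewrites it into a paged statement (page 1, 20 rows per page here).
public class StorePageQuery {

    public List<Store> firstPage(StoreMapper storeMapper) {
        PageHelper.startPage(1, 20);
        return storeMapper.selectByExample(new StoreExample());
    }
}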
Kafka consumer configuration
import org.linuxsogood.sync.listener.Listener;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Three consumer threads share the topic's partitions.
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        // Auto-commit is disabled, so the listener container commits offsets;
        // the auto-commit interval below only applies if auto-commit is re-enabled.
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
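Both the consumer and the producer configuration read the broker list from the kafka.broker.address property. A minimal application.properties entry might look like this; the host and port are placeholders:

kafka.broker.address=127.0.0.1:9092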
Producer configuration
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
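With the template registered as a bean, publishing a message is a single call. A minimal sketch follows; the SyncMessageSender bean is hypothetical, while the topic name matches the one the listener below subscribes to:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical helper bean illustrating KafkaTemplate usage.
@Component
public class SyncMessageSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String payload) {
        // Publishes asynchronously to the topic the listener consumes.
        kafkaTemplate.send("linuxsogood-topic", payload);
    }
}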
The listener is where the business logic lives: it decides how each piece of data pulled from Kafka is processed. To have Kafka deliver messages in broadcast mode, the listeners must belong to different groups, i.e. the group value in the @KafkaListener annotation must differ between them. If several listeners declare the same group, only one of them will handle any given message and the others will see nothing; that is Kafka's distributed (load-balanced) processing mode.
Listeners within one group share the incoming messages, which Kafka distributes among them according to its partition-assignment strategy. Listeners in different groups that subscribe to the same topic each receive every message, which yields broadcast mode. Note that at the Kafka level, group membership is ultimately determined by the consumer's group.id setting; a broadcast setup is sketched below.
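A minimal sketch of broadcast mode, assuming a hypothetical second listener with its own group: AuditListener, audit-group, and auditContainerFactory are invented names, and the extra container factory is assumed to be configured like kafkaListenerContainerFactory() above but with a different ConsumerConfig.GROUP_ID_CONFIG.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

// Hypothetical second listener: consuming the same topic under a different
// consumer group means it also receives every message (broadcast mode).
public class AuditListener {

    @KafkaListener(topics = "linuxsogood-topic", group = "audit-group",
            containerFactory = "auditContainerFactory")
    public void listen(ConsumerRecord<?, ?> record) {
        // Independent business logic for the second consumer goes here.
    }
}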
The actual listener used for the sync is shown below.

import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;
import java.util.Optional;

public class Listener {

    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

    @Autowired
    private StoreMapper storeMapper;

    /**
     * Listen for Kafka messages and, when one arrives, consume it and sync
     * the data into the new 烽火 system's database.
     * @param record the received message record
     */
    @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            try {
                MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
                CupMessageType type = messageWrapper.getType();
                // Dispatch on the message type: different data goes into different tables.
                if (CupMessageType.STORE == type) {
                    proceedStore(messageWrapper);
                }
            } catch (Exception e) {
                // Pass the exception as the last argument so slf4j logs the stack trace.
                LOGGER.error("Failed to save the received message to the database, message: {}", message.toString(), e);
            }
        }
    }

    /**
     * The message is of store type: process the store message and write it to the database.
     * @param messageWrapper the message obtained from Kafka
     */
    private void proceedStore(MessageWrapper messageWrapper) {
        Object data = messageWrapper.getData();
        Store cupStore = JSON.parseObject(data.toString(), Store.class);
        StoreExample storeExample = new StoreExample();
        String storeName = StringUtils.isBlank(cupStore.getStoreOldName())
                ? cupStore.getStoreName() : cupStore.getStoreOldName();
        storeExample.createCriteria().andStoreNameEqualTo(storeName);
        List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
        org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
        org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
        // Insert when no matching record exists, otherwise update by primary key.
        if (stores.size() == 0) {
            storeMapper.insert(store);
        } else {
            store.setStoreId(stores.get(0).getStoreId());
            storeMapper.updateByPrimaryKey(store);
        }
    }
}
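For completeness, here is a hedged sketch of the producing side, so the JSON the listener parses is concrete. The StoreSyncPublisher bean is hypothetical, and the MessageWrapper setters are assumptions mirrored from the getType()/getData() accessors used above:

import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical publisher; setType/setData are assumed counterparts of the
// getType()/getData() accessors the listener relies on.
@Component
public class StoreSyncPublisher {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void publishStore(Store store) {
        MessageWrapper wrapper = new MessageWrapper();
        wrapper.setType(CupMessageType.STORE);
        wrapper.setData(store);
        // Serialize the wrapper with fastjson and send it to the sync topic.
        kafkaTemplate.send("linuxsogood-topic", JSON.toJSONString(wrapper));
    }
}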
Summary
That is all for this article. I hope it is of some help for your study or work; if you have any questions, feel free to leave a comment. Thank you for supporting 服務器之家.
Original link: http://linuxsogood.org/1572.html