前言
由於業務需求,需要把 Storm 與 Kafka 整合到 Spring Boot 項目裡:其他服務把日誌輸出到 Kafka 的訂閱話題,Storm 實時處理該話題,完成數據監控及其他數據統計。但是網上相關教程較少,今天想寫的就是如何把 Storm + Kafka 整合到 Spring Boot,順帶說一說我遇到的坑。
使用工具及環境配置
1. Java 版本:JDK 1.8
2. 編譯工具:IntelliJ IDEA 2017
3. 使用 Maven 作為項目管理工具
4. Spring Boot 1.5.8.RELEASE
需求體現
1.為什么需要整合到spring boot
為了使用spring boot 統一管理各種微服務,及同時避免多個分散配置
2.具體思路及整合原因
使用 Spring Boot 統一管理 Kafka、Storm、Redis 等所需要的 bean;其他服務的日誌收集至 Kafka,Kafka 實時發送日誌至 Storm,在 Storm 的 bolt 中進行相應的處理操作
遇到的問題
1. Spring Boot 并沒有提供整合 Storm 的相關支持
2. 以 Spring Boot 方式啟動時,不知道如何觸發提交 topology
3. 提交 topology 時遇到 NimbusLeaderNotFoundException(could not find leader nimbus from seed hosts [localhost])問題
4. Storm bolt 中無法通過注解獲得實例化的 bean 進行相應的操作
解決思路
在整合之前我們需要知道相應的spring boot 的啟動方式及配置(如果你在閱讀本文時,默認你已經對storm,kafka及spring boot有相關了解及使用)
spring boot 對storm進行整合的例子在網上很少,但是因為有相應的需求,因此我們還是需要整合.
首先導入所需要jar包:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
|
<!-- Kafka client: pinned explicitly and excluded from the starters below so only one version is on the classpath -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.jettison</groupId>
            <artifactId>jettison</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-jaxrs</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-xc</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.4</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-annotations</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-common</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
        </exclusion>
        <exclusion>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-examples</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Storm -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>${provided.scope}</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
其中排除部分 jar 包,是因為它們與項目構建所需的依賴存在重復依賴(版本衝突)問題。Storm 版本為 1.1.0,Spring Boot 相關依賴為:
```java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
<!-- Spring Boot -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <!-- default Logback starter is excluded in favour of spring-boot-starter-log4j2 below -->
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>${mybatis-spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
ps:maven的jar包僅因為項目使用需求,不是最精簡,僅供大家參考.
項目結構:
config-存儲不同環境配置文件
存儲構建spring boot 相關實現類 其他如構建名
啟動spring boot的時候我們會發現
其實開始整合前,我對 Storm 了解得較少,屬於剛開始接觸。後面參考資料時發現,整合到 Spring Boot 裡面之後,啟動 Spring Boot 并沒有相應的方式去觸發提交 topology 的函數。我一開始以為啟動 Spring Boot 之後就完事了,結果等了半個小時什麼事情都沒發生,才發現沒有實現觸發提交的函數。
為了解決這個問題,我最初的想法是:啟動 Spring Boot -> 創建 Kafka 監聽 topic -> 然後啟動 topology 完成啟動。可是這樣做的問題是,Kafka 每次監聽到這個主題的消息都會重復觸發 topology,這明顯不是我們想要的。看了一會後發現,Spring 提供了在啟動完成之後執行某個事件方法的機制(監聽 ContextRefreshedEvent),這對我來說簡直是救星。所以現在觸發 topology 的思路變為:
啟動spring boot ->執行觸發方法->完成相應的觸發條件
構建方法為:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
/** * @author leezer * @date 2017/12/28 * spring加載完后自動自動提交topology **/ @configuration @component public class autoload implements applicationlistener<contextrefreshedevent> { private static string brokerzkstr; private static string topic; private static string host; private static string port; public autoload( @value ( "${storm.brokerzkstr}" ) string brokerzkstr, @value ( "${zookeeper.host}" ) string host, @value ( "${zookeeper.port}" ) string port, @value ( "${kafka.default-topic}" ) string topic ){ brokerzkstr = brokerzkstr; host= host; topic= topic; port= port; } @override public void onapplicationevent(contextrefreshedevent event) { try { //實例化topologybuilder類。 topologybuilder topologybuilder = new topologybuilder(); //設置噴發節點并分配并發數,該并發數將會控制該對象在集群中的線程數。 brokerhosts brokerhosts = new zkhosts(brokerzkstr); // 配置kafka訂閱的topic,以及zookeeper中數據節點目錄和名字 spoutconfig spoutconfig = new spoutconfig(brokerhosts, topic, "/storm" , "s32" ); spoutconfig.scheme = new schemeasmultischeme( new stringscheme()); spoutconfig.zkservers = collections.singletonlist(host); spoutconfig.zkport = integer.parseint(port); //從kafka最新輸出日志讀取 spoutconfig.startoffsettime = offsetrequest.latesttime(); kafkaspout receiver = new kafkaspout(spoutconfig); topologybuilder.setspout( "kafka-spout" , receiver, 1 ).setnumtasks( 2 ); topologybuilder.setbolt( "alarm-bolt" , new alarmbolt(), 1 ).setnumtasks( 2 ).shufflegrouping( "kafka-spout" ); config config = new config(); config.setdebug( false ); /*設置該topology在storm集群中要搶占的資源slot數,一個slot對應這supervisor節點上的以個worker進程,如果你分配的spot數超過了你的物理節點所擁有的worker數目的話,有可能提交不成功,加入你的集群上面已經有了一些topology而現在還剩下2個worker資源,如果你在代碼里分配4個給你的topology的話,那么這個topology可以提交但是提交以后你會發現并沒有運行。 而當你kill掉一些topology后釋放了一些slot后你的這個topology就會恢復正常運行。 */ config.setnumworkers( 1 ); localcluster cluster = new localcluster(); cluster.submittopology( "kafka-spout" , config, topologybuilder.createtopology()); } catch (exception e) { e.printstacktrace(); } } } |
? 注:
啟動項目時因為使用的是內嵌tomcat進行啟動,可能會報如下錯誤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
[tomcat-startstop- 1 ] error o.a.c.c.containerbase - a child container failed during start java.util.concurrent.executionexception: org.apache.catalina.lifecycleexception: failed to start component [standardengine[tomcat].standardhost[localhost].tomcatembeddedcontext[]] at java.util.concurrent.futuretask.report(futuretask.java: 122 ) ~[?: 1.8 .0_144] at java.util.concurrent.futuretask.get(futuretask.java: 192 ) ~[?: 1.8 .0_144] at org.apache.catalina.core.containerbase.startinternal(containerbase.java: 939 ) [tomcat-embed-core- 8.5 . 23 .jar: 8.5 . 23 ] at org.apache.catalina.core.standardhost.startinternal(standardhost.java: 872 ) [tomcat-embed-core- 8.5 . 23 .jar: 8.5 . 23 ] at org.apache.catalina.util.lifecyclebase.start(lifecyclebase.java: 150 ) [tomcat-embed-core- 8.5 . 23 .jar: 8.5 . 23 ] at org.apache.catalina.core.containerbase$startchild.call(containerbase.java: 1419 ) [tomcat-embed-core- 8.5 . 23 .jar: 8.5 . 23 ] at org.apache.catalina.core.containerbase$startchild.call(containerbase.java: 1409 ) [tomcat-embed-core- 8.5 . 23 .jar: 8.5 . 23 ] at java.util.concurrent.futuretask.run$$$capture(futuretask.java: 266 ) [?: 1.8 .0_144] at java.util.concurrent.futuretask.run(futuretask.java) [?: 1.8 .0_144] at java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java: 1149 ) [?: 1.8 .0_144] at java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java: 624 ) [?: 1.8 .0_144] at java.lang.thread.run(thread.java: 748 ) [?: 1.8 .0_144] |
這是因為有相應導入的jar包引入了servlet-api版本低于內嵌版本,我們需要做的就是打開maven依賴把其去除
1
2
3
4
|
<!-- Remove the transitive servlet-api that is older than the one used by embedded Tomcat -->
<exclusion>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
</exclusion>
然后重新啟動就可以了.
啟動過程中還有可能報:
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds? at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90)
這個問題我思考了很久,發現網上的解釋都是因為storm配置問題導致不對,可是我的storm是部署在服務器上的.并沒有相關的配置,按理也應該去服務器上讀取相關配置,可是結果并不是這樣的。最后嘗試了幾個做法發現都不對,這里才發現,在構建集群的時候storm提供了相應的本地集群
1
|
localcluster cluster = new localcluster(); |
進行本地測試,如果在本地測試就使用其進行部署測試,如果部署到服務器上需要把:
1
2
3
|
// Local testing:
cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
// When deploying to a server, change it to:
StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
進行任務提交;
以上解決了上面所述的問題1-3
問題4:是在 bolt 中使用相關 bean 實例。我發現即使把 bolt 用 @Component 加入 Spring 中,也無法獲取到實例。我的猜想是,在我們構建提交 topology 的時候,它會在:
topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");
執行bolt相關:
1
2
3
4
5
6
7
|
@override public void prepare(map stormconf, topologycontext context, outputcollector collector) { this .collector = collector; stormlauncher stormlauncher = stormlauncher.getstormlauncher(); datarepositorys =(alarmdatarepositorys) stormlauncher.getbean( "alarmdatarepositorys" ); } |
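而不會由 Spring 來實例化 bolt,導致執行 bolt 的線程與 Spring 不一致,Spring 也就無法為它注入實例。(這裡我也不是太明白,如果有大佬知道可以分享一波。)補充說明:bolt 對象是在構建 topology 時通過 new AlarmBolt() 手動創建的,并不是 Spring 容器管理的 bean,所以 @Autowired 等注解不會對它生效。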
而我們使用spring boot的意義就在于這些獲取這些繁雜的對象,這個問題困擾了我很久.最終想到,我們可以通過上下文getbean獲取實例不知道能不能行,然后我就開始了定義:
例如我需要在bolt中使用一個服務:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
/** * @author leezer * @date 2017/12/27 * 存儲操作失敗時間 **/ @service ( "alarmdatarepositorys" ) public class alarmdatarepositorys extends redisbase implements ialarmdatarepositorys { private static final string erro = "erro" ; /** * @param type 類型 * @param key key值 * @return 錯誤次數 **/ @override public string geterrnumfromredis(string type,string key) { if (type== null || key == null ){ return null ; } else { valueoperations<string, string> valueoper = primarystringredistemplate.opsforvalue(); return valueoper.get(string.format( "%s:%s:%s" ,erro,type,key)); } } /** * @param type 錯誤類型 * @param key key值 * @param value 存儲值 **/ @override public void seterrnumtoredis(string type, string key,string value) { try { valueoperations<string, string> valueoper = primarystringredistemplate.opsforvalue(); valueoper.set(string.format( "%s:%s:%s" , erro,type, key), value, dictionaries.apikeydayoflifecycle, timeunit.seconds); } catch (exception e){ logger.info(dictionaries.redis_error_prefix+string.format( "key為%s存入redis失敗" ,key)); } } |
這里我指定了該bean的名稱,則在bolt執行prepare時:使用getbean方法獲取了相關bean就能完成相應的操作.
然后kafka訂閱主題發送至我bolt進行相關的處理.而這里getbean的方法是在啟動bootmain函數定義:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
@springbootapplication @enabletransactionmanagement @componentscan ({ "service" , "storm" }) @enablemongorepositories (basepackages = { "storm" }) @propertysource (value = { "classpath:service.properties" , "classpath:application.properties" , "classpath:storm.properties" }) @importresource (locations = { "classpath:/configs/spring-hadoop.xml" , "classpath:/configs/spring-hbase.xml" }) public class stormlauncher extends springbootservletinitializer { //設置 安全線程launcher實例 private volatile static stormlauncher stormlauncher; //設置上下文 private applicationcontext context; public static void main(string[] args) { springapplicationbuilder application = new springapplicationbuilder(stormlauncher. class ); // application.web(false).run(args);該方式是spring boot不以web形式啟動 application.run(args); stormlauncher s = new stormlauncher(); s.setapplicationcontext(application.context()); setstormlauncher(s); } private static void setstormlauncher(stormlauncher stormlauncher) { stormlauncher.stormlauncher = stormlauncher; } public static stormlauncher getstormlauncher() { return stormlauncher; } @override protected springapplicationbuilder configure(springapplicationbuilder application) { return application.sources(stormlauncher. class ); } /** * 獲取上下文 * * @return the application context */ public applicationcontext getapplicationcontext() { return context; } /** * 設置上下文. * * @param appcontext 上下文 */ private void setapplicationcontext(applicationcontext appcontext) { this .context = appcontext; } /** * 通過自定義name獲取 實例 bean. * * @param name the name * @return the bean */ public object getbean(string name) { return context.getbean(name); } /** * 通過class獲取bean. 
* * @param <t> the type parameter * @param clazz the clazz * @return the bean */ public <t> t getbean( class <t> clazz) { return context.getbean(clazz); } /** * 通過name,以及clazz返回指定的bean * * @param <t> the type parameter * @param name the name * @param clazz the clazz * @return the bean */ public <t> t getbean(string name, class <t> clazz) { return context.getbean(name, clazz); } |
到此集成storm 和kafka至spring boot已經結束了,相關kafka及其他配置我會放入github上面
對了這里還有一個kafkaclient的坑:
Async loop died! java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.
項目會報 Kafka client 問題。這是因為 storm-kafka 中使用的 Kafka 是 0.8 版本,而 NetworkSend 是 0.9 以上版本才有的類,所以這裡集成的 kafka-clients 版本需要與你實際使用的 Kafka 版本保持一致。
雖然集成比較簡單,但是參考都比較少,加之剛開始接觸storm所以思考比較多,也在這記錄一下.
項目地址 - github
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://juejin.im/post/5a4755ea51882538d31043be