第1步:生成我們的項目: spring initializr 來生成我們的項目。我們的項目將提供spring mvc / web支持和apache kafka支持。
第2步:發布/讀取kafka主題中的消息:
1
2
3
4
5
6
7
8
|
<b> public </b> <b> class </b> user { <b> private </b> string name; <b> private </b> <b> int </b> age; <b> public </b> user(string name, <b> int </b> age) { <b> this </b>.name = name; <b> this </b>.age = age; } } |
第3步:通過application.yml
配置文件配置kafka:
我們需要創建配置文件。我們需要以某種方式配置我們的kafka生產者和消費者,以便能夠發布和讀取與主題相關的消息。相比建立一個使用@configuration
標注的java類,我們可以直接使用配置文件application.properties或application.yml。spring boot讓我們避免像過去一樣編寫的所有樣板代碼,同時為我們提供了更加智能的配置應用程序的方法,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
server: port: 9000 spring: kafka: consumer: bootstrap: localhost: 9092 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.stringdeserializer value-deserializer: org.apache.kafka.common.serialization.stringdeserializer producer: bootstrap: localhost: 9092 key-serializer: org.apache.kafka.common.serialization.stringserializer value-serializer: org.apache.kafka.common.serialization.stringserializer |
第4步:創建一個生產者,創建生產者會將我們的消息寫入該主題。
1
2
3
4
5
6
7
8
9
10
11
|
<b> public </b> <b> class </b> producer { <b> private </b> <b> static </b> <b> final </b> logger logger = loggerfactory.getlogger(producer.<b> class </b>); <b> private </b> <b> static </b> <b> final </b> string topic = <font> "users" </font><font>; @autowired <b> private </b> kafkatemplate<string, string> kafkatemplate; <b> public </b> <b> void </b> sendmessage(string message) { logger.info(string.format(</font><font> "#### -> producing message -> %s" </font><font>, message)); <b> this </b>.kafkatemplate.send(topic, message); } } </font> |
自動連接autowire
到 kafkatemplate
,使用它將消息發布到主題 - 這就是消息的生產者!
第5步:創建一個消費者,消費者是負責根據您自己的業務邏輯的需求閱讀處理消息的消息的服務。要進行設置,請輸入以下內容:
1
2
3
4
5
6
7
8
9
10
11
|
@service <b> public </b> <b> class </b> consumer { <b> private </b> <b> final </b> logger logger = loggerfactory.getlogger(producer.<b> class </b>); @kafkalistener (topics = <font> "users" </font><font>, groupid = </font><font> "group_id" </font><font>) <b> public </b> <b> void </b> consume(string message) throws ioexception { logger.info(string.format(</font><font> "#### -> consumed message -> %s" </font><font>, message)); } } </font> |
在這里,我們告訴我們的方法void consume(string message)
訂閱用戶的主題,并將每條消息發送到應用程序日志。在您的實際應用程序中,您可以按照業務需要的方式處理消息。
第6步:創建rest控制器,們已經擁有了能夠消費kafka消息所需的全部內容。
為了充分展示我們創建的所有內容的工作原理,我們需要創建一個具有單一端點的控制器。消息將發布到此端點,然后由我們的生產者處理。然后,我們的消費者將通過登錄到控制臺來捕獲并處理它。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@restcontroller @requestmapping (value = <font> "/kafka" </font><font>) <b> public </b> <b> class </b> kafkacontroller { <b> private </b> <b> final </b> producer producer; @autowired kafkacontroller(producer producer) { <b> this </b>.producer = producer; } @postmapping (value = </font><font> "/publish" </font><font>) <b> public </b> <b> void </b> sendmessagetokafkatopic( @requestparam (</font><font> "message" </font><font>) string message) { <b> this </b>.producer.sendmessage(message); } } </font> |
讓我們使用curl將消息發送給kafka:
curl -x post -f 'message=test' http://localhost:9000/kafka/publish
基本上就是這樣!在不到10個步驟中,您了解了將apache kafka添加到spring boot項目是多么容易。如果您遵循本指南,您現在知道如何將kafka集成到spring boot項目中,并且您已準備好使用這個超級工具!
總結
以上所述是小編給大家介紹的在spring boot應用程序中使用apache kafka的方法步驟詳解,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!
原文鏈接:https://www.jdon.com/50613