一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - SpringBoot整合分布式消息平臺Pulsar

SpringBoot整合分布式消息平臺Pulsar

2022-01-10 23:00君哥聊技術朱晉君 Java教程

從 SpringBoot 整合 Java 客戶端使用來看,Pulsar 的 api 是非常友好的,使用起來方便簡潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。

SpringBoot整合分布式消息平臺Pulsar

大家好,我是君哥。

作為優秀的消息流平臺,Pulsar 的使用越來越多,這篇文章講解 Pulsar 的 Java 客戶端。

部署 Pulsar

Pulsar 的部署方式主要有 3 種,本地安裝二進制文件、docker 部署、在 Kubernetes 上部署。

本文采用 docker 部署一個單節點的 Pulsar 集群。實驗環境是 2 核 CPU 和 4G 內存。

部署命令如下:

  1. docker run -it -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone 

安裝過程可能會出現下面的錯誤:

  1. unknown flag: --mount 
  2. See 'docker run --help'

這是因為 docker 版本低,不支持 mount 參數,把 docker 版本升級到 17.06 以上就可以了。

部署過程中可能會因為網絡的原因失敗,多試幾次就可以成功了。如果看到下面的日志,就說明啟動成功了。

  1. 2022-01-08T22:27:58,726+0000 [main] INFO  org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone 

本地單節點集群啟動后,會創建一個 namespace,名字叫 public/default

Pulsar 客戶端

目前 Pulsar 支持多種語言的客戶端,包括:

Java 客戶端Go 客戶端Python 客戶端C++ 客戶端Node.js 客戶端WebSocket 客戶端C# 客戶端

SpringBoot 配置

使用 SpringBoot 整合 Pulsar 客戶端,首先引入 Pulsar 客戶端依賴,代碼如下:

  1. <dependency> 
  2.     <groupId>org.apache.pulsar</groupId> 
  3.     <artifactId>pulsar-client</artifactId> 
  4.     <version>2.9.1</version> 
  5. </dependency> 

然后在 properties 文件中添加配置:

  1. # Pulsar 地址 
  2. pulsar.url=pulsar://192.168.59.155:6650 
  3. # topic 
  4. pulsar.topic=testTopic 
  5. # consumer group 
  6. pulsar.subscription=topicGroup 

創建 Client

創建客戶端非常簡單,代碼如下:

  1. client = PulsarClient.builder() 
  2.                 .serviceUrl(url) 
  3.                 .build(); 

上面的 url 就是 properties 文件中定義的 pulsar.url 。

創建 Client 時,即使集群沒有啟成功,程序也不會報錯,因為這時還沒有真正地去連接集群。

創建 Producer

  1. producer = client.newProducer() 
  2.                 .topic(topic) 
  3.                 .compressionType(CompressionType.LZ4) 
  4.                 .sendTimeout(0, TimeUnit.SECONDS) 
  5.                 .enableBatching(true
  6.                 .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) 
  7.                 .batchingMaxMessages(1000) 
  8.                 .maxPendingMessages(1000) 
  9.                 .blockIfQueueFull(true
  10.                 .roundRobinRouterBatchingPartitionSwitchFrequency(10) 
  11.                 .batcherBuilder(BatcherBuilder.DEFAULT
  12.                 .create(); 

創建 Producer,會真正的連接集群,這時如果集群有問題,就會報連接錯誤。

下面解釋一下創建 Producer 的參數:

topic:Producer 要寫入的 topic。

compressionType:壓縮策略,目前支持 4 種策略 (NONE、LZ4、ZLIB、ZSTD),從 Pulsar2.3 開始,只有 Consumer 的版本在 2.3 以上,這個策略才會生效。

sendTimeout:超時時間,如果 Producer 在超時時間為收到 ACK,會進行重新發送。

enableBatching:是否開啟消息批量處理,這里默認 true,這個參數只有在異步發送 (sendAsync) 時才能生效,選擇同步發送會失效。

batchingMaxPublishDelay:批量發送消息的時間段,這里定義的是 10ms,需要注意的是,設置了批量時間,就不會受消息數量的影響。批量發送會把要發送的批量消息放在一個網絡包里發送出去,減少網絡 IO 次數,大大提高網卡的發送效率。

batchingMaxMessages:批量發送消息的最大數量。

maxPendingMessages:等待從 broker 接收 ACK 的消息隊列最大長度。如果這個隊列滿了,producer 所有的 sendAsync 和 send 都會失敗,除非設置了 blockIfQueueFull 值是 true。

blockIfQueueFull:Producer 發送消息時會把消息先放入本地 Queue 緩存,如果緩存滿了,就會阻塞消息發送。

roundRobinRouterBatchingPartition-SwitchFrequency:如果發送消息時沒有指定 key,那默認采用 round robin 的方式發送消息,使用 round robin 的方式,切換 partition 的周期是 (frequency * batchingMaxPublishDelay)。

創建 Consumer

Pulsar 的消費模型如下圖:

SpringBoot整合分布式消息平臺Pulsar

從圖中可以看到,Consumer 要綁定一個 subscription 才能進行消費。

  1. consumer = client.newConsumer() 
  2.         .topic(topic) 
  3.         .subscriptionName(subscription) 
  4.         .subscriptionType(SubscriptionType.Shared) 
  5.         .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) 
  6.         .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS) 
  7.         .receiverQueueSize(1000) 
  8.         .subscribe(); 

下面解釋一下創建 Consumer 的參數:

topic:Consumer 要訂閱的 topic。

subscriptionName:consumer 要關聯的 subscription 名字。

subscriptionType:訂閱類型,Pulsar 支持四種類型訂閱:

Exclusive:獨占模式,同一個 Topic 只能有一個消費者,如果多個消費者,就會出錯。Failover:災備模式,同一個 Topic 可以有多個消費者,但是只能有一個消費者消費,其他消費者作為故障轉移備用,如果當前消費者出了故障,就從備用消費者中選擇一個進行消費。如下圖:

SpringBoot整合分布式消息平臺Pulsar

Shared:共享模式,同一個 Topic 可以由多個消費者訂閱和消費。消息通過 round robin 輪詢機制分發給不同的消費者,并且每個消息僅會被分發給一個消費者。當消費者斷開,如果發送給它消息沒有被消費,這些消息會被重新分發給其它存活的消費者。如下圖:

SpringBoot整合分布式消息平臺Pulsar

Key_Shared:消息和消費者都會綁定一個key,消息只會發送給綁定同一個key的消費者。如果有新消費者建立連接或者有消費者斷開連接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好處是既可以讓消費者并發地消費消息,又能保證同一Key下的消息順序。如下圖:

SpringBoot整合分布式消息平臺Pulsar

subscriptionInitialPosition:創建新的 subscription 時從哪里開始消費,有兩個選項:

Latest:從最新的消息開始消費Earliest:從最早的消息開始消費

negativeAckRedeliveryDelay:消費失敗后間隔多久 broker 重新發送。

receiverQueueSize:在調用 receive 方法之前,最多能累積多少條消息。可以設置為 0,這樣每次只從 broker 拉取一條消息。在 Shared 模式下,receiverQueueSize 設置為 0,可以防止批量消息多發給一個 Consumer 而導致其他 Consumer 空閑。

Consumer 接收消息有四種方式:同步單條、同步批量、異步單條和異步批量,代碼如下:

  1. Message message = consumer.receive() 
  2. CompletableFuture<Message> message = consumer.receiveAsync(); 
  3. Messages message = consumer.batchReceive(); 
  4. CompletableFuture<Messages> message = consumer.batchReceiveAsync(); 

對于批量接收,也可以設置批量接收的策略,代碼如下:

  1. consumer = client.newConsumer() 
  2.     .topic(topic) 
  3.     .subscriptionName(subscription) 
  4.         .batchReceivePolicy(BatchReceivePolicy.builder() 
  5.         .maxNumMessages(100) 
  6.         .maxNumBytes(1024 * 1024) 
  7.         .timeout(200, TimeUnit.MILLISECONDS) 
  8.         .build()) 
  9.     .subscribe(); 

代碼中的參數說明如下:

maxNumMessages:批量接收的最大消息數量。maxNumBytes:批量接收消息的大小,這里是 1MB。

測試

首先編寫 Producer 發送消息的代碼,如下:

  1. public void sendMsg(String key, String data) { 
  2.     CompletableFuture<MessageId> future = producer.newMessage() 
  3.         .key(key
  4.         .value(data.getBytes()).sendAsync(); 
  5.     future.handle((v, ex) -> { 
  6.         if (ex == null) { 
  7.             logger.info("發送消息成功, key:{}, msg: {}"key, data); 
  8.         } else { 
  9.             logger.error("發送消息失敗, key:{}, msg: {}"key, data); 
  10.         } 
  11.         return null
  12.     }); 
  13.     future.join(); 
  14.     logger.info("發送消息完成, key:{}, msg: {}"key, data); 

然后編寫一個 Consumer 消費消息的代碼,如下:

  1. public void start() throws Exception{ 
  2.     while (true) { 
  3.         Message message = consumer.receive(); 
  4.         String key = message.getKey(); 
  5.         String data = new String(message.getData()); 
  6.         String topic = message.getTopicName(); 
  7.         if (StringUtils.isNotEmpty(data)) { 
  8.             try{ 
  9.                 logger.info("收到消息, topic:{}, key:{}, data:{}", topic, key, data); 
  10.             }catch(Exception e){ 
  11.                 logger.error("接收消息異常,topic:{}, key:{}, data:{}", topic, key, data, e); 
  12.             } 
  13.         } 
  14.         consumer.acknowledge(message); 
  15.     } 

最后編寫一個 Controller 類,調用 Producer 發送消息,代碼如下:

  1. @RequestMapping("/send"
  2. @ResponseBody 
  3. public String send(@RequestParam String key, @RequestParam String data) { 
  4.     logger.info("收到消息發送請求, key:{}, value:{}"key, data); 
  5.     pulsarProducer.sendMsg(key, data); 
  6.     return "success"

調用 Producer 發送一條消息,key=key1,data=data1,具體操作為在瀏覽器中輸入下面的 url 后回車:

  1. http://192.168.157.1:8083/pulsar/send?key=key1&data=data1 

可以看到控制臺輸出下面日志:

  1. 2022-01-08 22:42:33,199 [pulsar-client-io-6-1] [INFO] boot.pulsar.PulsarProducer - 發送消息成功, key:key1, msg: data1 
  2. 2022-01-08 22:42:33,200 [http-nio-8083-exec-1] [INFO] boot.pulsar.PulsarProducer - 發送消息完成, key:key1, msg: data1 
  3. 2022-01-08 22:42:33,232 [Thread-22] [INFO] boot.pulsar.PulsarConsumer - 收到消息, topic:persistent://public/default/testTopic, key:key1, data:data1 
  4. 2022-01-08 22:43:14,498 [pulsar-timer-5-1] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0 
  5. 2022-01-08 22:43:14,961 [pulsar-timer-9-1] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-9-0] Pending messages: 0 --- Publish throughput: 0.02 msg/s --- 0.00 Mbit/s --- Latency: med: 69.000 ms - 95pct: 69.000 ms - 99pct: 69.000 ms - 99.9pct: 69.000 ms - max: 69.000 ms --- Ack received rate: 0.02 ack/s --- Failed messages: 0 

從日志中看到,這里使用的 namespace 就是創建集群時生成的public/default。

總結

從 SpringBoot 整合 Java 客戶端使用來看,Pulsar 的 api 是非常友好的,使用起來方便簡潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。

原文鏈接:https://mp.weixin.qq.com/s/4w0eucDNcrYrsiDXHzLwuQ

延伸 · 閱讀

精彩推薦
  • Java教程Java實現搶紅包功能

    Java實現搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程xml與Java對象的轉換詳解

    xml與Java對象的轉換詳解

    這篇文章主要介紹了xml與Java對象的轉換詳解的相關資料,需要的朋友可以參考下...

    Java教程網2942020-09-17
  • Java教程Java BufferWriter寫文件寫不進去或缺失數據的解決

    Java BufferWriter寫文件寫不進去或缺失數據的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關于小米推送Java代碼,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩中求8032021-07-12
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發現了對于集合操作轉換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關于Java8中S...

    阿杜7482021-02-04
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經有好久沒有升過級了。升級完畢重啟之后,突然發現好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
主站蜘蛛池模板: 性色香蕉AV久久久天天网 | 香蕉精品国产高清自在自线 | 五月桃花网婷婷亚洲综合 | 国产欧美日韩精品高清二区综合区 | 五月天久久久 | 农村美女沟厕嘘嘘被偷看 | 五月天网站 | 欧美亚洲另类在线观看 | 欧美日韩高清一区 | 国产午夜精品一区二区 | 啊哈~嗯哼~用力cao我小说 | 亚欧日韩 | 亚洲成av人在线视 | 日本精品中文字幕在线播放 | 日本一区视频在线 | 毛片网站大全 | 情趣内衣在线观看 | 国产精品久久亚洲一区二区 | www.四虎影| 成人在线观看网站 | 视频一区二区三区在线 | 亚洲天堂一区二区在线观看 | 日本护士xxxx爽爽爽 | 爱爱亚洲| 成人福利影院 | 欧美成a人片免费看久久 | 99热视频| 欧美福利在线观看 | 无码欧美喷潮福利XXXX | 91久久夜色精品国产九色 | 国产一级持黄大片99久久 | 国内精品99| avav一区| 91一个人的在线观看www | 小向美奈子av | 东北老妇露脸xxxxx | 亚洲福利一区二区 | 天天操天天干天天舔 | 国产亚洲综合久久 | 久久精品美女 | 国产亚洲毛片在线 |