一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - springBoot整合RocketMQ及坑的示例代碼

springBoot整合RocketMQ及坑的示例代碼

2021-06-11 13:50龍俊潔 Java教程

這篇文章主要介紹了springBoot整合RocketMQ及坑的示例代碼,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

版本:

  • jdk:1.8
  • springboot:1.5.10
  • rocketmq:4.2.0

pom 配置:    

?
1
2
3
4
5
6
7
8
9
10
<parent>
 <groupid>org.springframework.boot</groupid>
 <artifactid>spring-boot-starter-parent</artifactid>
 <version>1.5.10.release</version>
</parent>
<dependency>
  <groupid>org.apache.rocketmq</groupid>
  <artifactid>rocketmq-client</artifactid>
  <version>4.2.0</version>
</dependency>

application.properties  配置:

?
1
2
3
4
5
6
# 消費者的組名
apache.rocketmq.consumer.pushconsumer=pushconsumer
# 生產者的組名
apache.rocketmq.producer.producergroup=producer
# nameserver地址
apache.rocketmq.namesrvaddr=localhost:9876

java代碼:

生產者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package test.config.rocketmq;
 
import org.apache.rocketmq.client.producer.defaultmqproducer;
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.common.message.message;
import org.apache.rocketmq.remoting.common.remotinghelper;
import org.springframework.beans.factory.annotation.value;
import org.springframework.stereotype.component;
import org.springframework.util.stopwatch;
import javax.annotation.postconstruct;
 
@component
public class rocketmqclient {
  /**
   * 生產者的組名
   */
  @value("${apache.rocketmq.producer.producergroup}")
  private string producergroup;
 
  /**
   * nameserver 地址
   */
  @value("${apache.rocketmq.namesrvaddr}")
  private string namesrvaddr;
 
  @postconstruct
  public void defaultmqproducer() {
    //生產者的組名
    defaultmqproducer producer = new defaultmqproducer(producergroup);
    //指定nameserver地址,多個地址以 ; 隔開
    producer.setnamesrvaddr(namesrvaddr);
    producer.setvipchannelenabled(false);
    try {
      /**
       * producer對象在使用之前必須要調用start初始化,初始化一次即可
       * 注意:切記不可以在每次發送消息時,都調用start方法
       */
      producer.start();
 
      //創建一個消息實例,包含 topic、tag 和 消息體
      //如下:topic 為 "topictest",tag 為 "push"
      message message = new message("topictest", "push", "發送消息----zhisheng-----".getbytes(remotinghelper.default_charset));
 
      stopwatch stop = new stopwatch();
      stop.start();
 
      for (int i = 0; i < 1; i++) {
        sendresult result = producer.send(message);
        system.out.println("發送響應:msgid:" + result.getmsgid() + ",發送狀態:" + result.getsendstatus());
      }
      stop.stop();
      system.out.println("----------------發送一萬條消息耗時:" + stop.gettotaltimemillis());
    } catch (exception e) {
      e.printstacktrace();
    } finally {
      producer.shutdown();
    }
  }
}

消費者: 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import org.apache.rocketmq.client.consumer.defaultmqpushconsumer;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlystatus;
import org.apache.rocketmq.client.consumer.listener.messagelistenerconcurrently;
import org.apache.rocketmq.common.consumer.consumefromwhere;
import org.apache.rocketmq.common.message.messageext;
import org.apache.rocketmq.remoting.common.remotinghelper;
import org.springframework.beans.factory.annotation.value;
import org.springframework.stereotype.component;
 
import javax.annotation.postconstruct;
 
 
@component
public class rocketmqserver {
  /**
   * 消費者的組名
   */
  @value("${apache.rocketmq.consumer.pushconsumer}")
  private string consumergroup;
 
  /**
   * nameserver 地址
   */
  @value("${apache.rocketmq.namesrvaddr}")
  private string namesrvaddr;
 
  @postconstruct
  public void defaultmqpushconsumer() {
    //消費者的組名
    defaultmqpushconsumer consumer = new defaultmqpushconsumer(consumergroup);
 
    //指定nameserver地址,多個地址以 ; 隔開
    consumer.setnamesrvaddr(namesrvaddr);
    consumer.setvipchannelenabled(false);
    try {
      //訂閱pushtopic下tag為push的消息
      consumer.subscribe("topictest", "push");
 
      //設置consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
      //如果非第一次啟動,那么按照上次消費的位置繼續消費
      consumer.setconsumefromwhere(consumefromwhere.consume_from_first_offset);
      consumer.registermessagelistener((messagelistenerconcurrently) (list, context) -> {
        try {
          for (messageext messageext : list) {
 
            system.out.println("messageext: " + messageext);//輸出消息內容
 
            string messagebody = new string(messageext.getbody(), remotinghelper.default_charset);
 
            system.out.println("消費響應:msgid : " + messageext.getmsgid() + ", msgbody : " + messagebody);//輸出消息內容
          }
        } catch (exception e) {
          e.printstacktrace();
          return consumeconcurrentlystatus.reconsume_later; //稍后再試
        }
        return consumeconcurrentlystatus.consume_success; //消費成功
      });
      consumer.start();
    } catch (exception e) {
      e.printstacktrace();
    }
  }
}

掉坑總結:

1.rocketmq啟動時,命令不是  mqbroker -n 127.0.0.1:9876

         正確應該是:mqbroker -n 127.0.0.1:9876 butiautocreatetopicenable=true

         否則會拋出:no route info of this topic, topictest

2.客戶端連接時拋出異常

        org.apache.rocketmq.client.exception.mqclientexception: 

        send [3] times, still failed, cost [3180]ms, topic: topictest, brokerssent: \

        [win-93cgo0s5g25, win-93cgo0s5g25, win-93cgo0s5g25]

解決方式兩種

1.producer.setvipchannelenabled(false); 生產者和消費者添加這行代買。

2.降rocketmq版本,降成3.2.6

關于spring.rocketmq.name-server的坑

看下圖:

springBoot整合RocketMQ及坑的示例代碼

注意:

如果你是springboot2.0+的框架,或者是jdk10。

你需要將你自己的項目配置文件中的,spring.rocketmq.name-server改成

spring.rocketmq.nameserver。注意是nameserver。

不然就會報各種稀奇古怪的bug。

關于啟動報內存不足的錯

在安裝啟動name server和broker的時候,一定要修改配置文件,不然內存會爆炸。

native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory 

springBoot整合RocketMQ及坑的示例代碼

將下面的配置文件根據你的需要改

我這里以前默認是xms4g,都是g,我修改到m就行了。

java_opt="${java_opt} -server -xms256m -xmx256m -xmn128m -xx:metaspacesize=128m -xx:maxmetaspacesize=320m"

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:https://blog.csdn.net/qq_24853627/article/details/79443437

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 私人影院免费观看 | 91麻豆精品 | 亚洲国产午夜看片 | 免费看国产精品麻豆 | 美女跪式抽搐gif动态图 | 日韩精品免费一区二区 | chinesemature精品 chinesefree普通对话 | 奇米888在线看奇米999 | 亚洲欧美日韩国产一区二区精品 | bt7086新片速递亚洲最新合集 | 久久99re2热在线播放7 | 成人国产精品一区二区不卡 | 国产视频在线一区 | 国产卡一卡二卡三乱码手机 | 啊啊啊好大好爽视频 | 国产自拍偷拍自拍 | 欧美gay xxxx| 亚洲国产精品综合久久一线 | 喜爱夜蒲2三级做爰 | 欧美a一片xxxx片与善交 | 青青视频国产依人在线 | 国产免费一区二区三区 | 亚洲国产成人久久77 | 国产成人在线免费视频 | 欧美另类videos另类粗暴 | kk4444了欧美 | 国产巨大bbbb俄罗斯 | 亚洲2017天堂色无码 | 国产成人精品视频午夜 | 91麻豆精品国产自产在线 | 国产播放器一区 | 护士的小嫩嫩好紧好舒服 | 久久亚洲免费视频 | 亚洲国产成人久久综合区 | 日韩欧美亚洲一区二区综合 | 糖心视频在线观看 | 偷拍综合网 | 免费观看成年肉动漫网站 | 四虎音影 | 本站只有精品 | 国产网站免费观看 |