一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 淺談spring-boot-rabbitmq動態管理的方法

淺談spring-boot-rabbitmq動態管理的方法

2021-03-12 14:32J猿 Java教程

這篇文章主要介紹了淺談spring-boot-rabbitmq動態管理的方法,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

使用spring boot + rabbitmq的時候,在開發過程中,可能會想要臨時停用/啟用監聽,或修改監聽消費者數量。如果每次修改都重啟比較浪費時間,所以研究了一下不停機就啟用停用監聽或修改一些配置

一. 關于rabbitmq監聽的配置

  1. 配置屬性類:RabbitProperties,包含rabbitmq的認證、監聽、發送者以及其他的一些配置
  2. 自動配置類:RabbitAutoConfiguration,主要配置rabbitmq的連接工廠和發送者等,不包含監聽的配置
  3. rabbitmq監聽的配置是RabbitAnnotationDrivenConfiguration,是通過RabbitAutoConfiguration引入的
?
1
2
3
4
5
6
7
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
  ...
}

RabbitAnnotationDrivenConfiguration中主要就是監聽工廠的配置、監聽工廠,但是這里也只是創建bean,并沒有真正的初始化

通過配置里的bean類名,分析一下,rabbitmq的監聽肯定是由監聽工廠創建的,所以找到監聽工廠SimpleRabbitListenerContainerFactory

?
1
2
3
4
5
6
7
8
9
@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
 SimpleRabbitListenerContainerFactoryConfigurer configurer,
 ConnectionFactory connectionFactory) {
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  configurer.configure(factory, connectionFactory);
  return factory;
}

既然自動配置里面沒有初始化監聽,那就應該是在其他地方調用的,進入監聽工廠類中,發現有initializeContainer(SimpleMessageListenerContainer instance)方法,猜測初始化肯定與這個方法有關,所以查看有哪些地方調用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有創建監聽容器和初始化的代碼

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Create and start a new {@link MessageListenerContainer} using the specified factory.
 * @param endpoint the endpoint to create a {@link MessageListenerContainer}.
 * @param factory the {@link RabbitListenerContainerFactory} to use.
 * @return the {@link MessageListenerContainer}.
 */
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
 RabbitListenerContainerFactory<?> factory) {
  MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
  if (listenerContainer instanceof InitializingBean) {
   try {
      ((InitializingBean) listenerContainer).afterPropertiesSet();
   }
   catch (Exception ex) {
      throw new BeanInitializationException("Failed to initialize message listener container", ex);
   }
  }
  int containerPhase = listenerContainer.getPhase();
  if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
   if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
      throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
       this.phase + " vs " + containerPhase);
   }
   this.phase = listenerContainer.getPhase();
  
  return listenerContainer;
}

繼續找調用這個方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之后,發現調用的地方很多了

淺談spring-boot-rabbitmq動態管理的方法

看看afterPropertiesSet方法,是InitializingBean接口中的,猜測應該是spring容器創建bean之后都會調用的bean初始化的方法,所以查找找到RabbitListenerEndpointRegistrar是在哪里創建的實例。原來是在RabbitListenerAnnotationBeanPostProcessor中的私有屬性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration這個自動配置里面初始化的,所以這就找到rabbitmq初始化監聽的源頭了

二. 動態管理rabbitmq監聽

回到最初的問題,想要動態的啟用停用mq的監聽,所以先看看初始化配置的類,既然有初始化,那可能會有相關的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,里面有對監聽容器進行操作,主要源碼如下

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * @return the managed {@link MessageListenerContainer} instance(s).
 */
public Collection<MessageListenerContainer> getListenerContainers() {
  return Collections.unmodifiableCollection(this.listenerContainers.values());
}
 
@Override
public void start() {
  for (MessageListenerContainer listenerContainer : getListenerContainers()) {
   startIfNecessary(listenerContainer);
  }
}
 
/**
 * Start the specified {@link MessageListenerContainer} if it should be started
 * on startup or when start is called explicitly after startup.
 * @see MessageListenerContainer#isAutoStartup()
 */
private void startIfNecessary(MessageListenerContainer listenerContainer) {
  if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
   listenerContainer.start();
  }
}
 
@Override
public void stop() {
  for (MessageListenerContainer listenerContainer : getListenerContainers()) {
   listenerContainer.stop();
  }
}

寫個controller,注入RabbitListenerEndpointRegistry,使用start()和stop()對監聽進行啟用停用的操作,并且RabbitListenerEndpointRegistry實例還可以獲取監聽容器,對監聽的一些參數也能進行修改,比如消費者數量。代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.Set;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.itopener.framework.ResultMap;
/**
 * Created by fuwei.deng on 2017年7月24日.
 */
@RestController
@RequestMapping("rabbitmq/listener")
public class RabbitMQController {
 
  @Resource
  private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
  
  @RequestMapping("stop")
  public ResultMap stop(){
   rabbitListenerEndpointRegistry.stop();
   return ResultMap.buildSuccess();
  }
  
  @RequestMapping("start")
  public ResultMap start(){
   rabbitListenerEndpointRegistry.start();
   return ResultMap.buildSuccess();
  }
  
  @RequestMapping("setup")
  public ResultMap setup(int consumer, int maxConsumer){
   Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds();
   SimpleMessageListenerContainer container = null;
   for(String id : containerIds){
   container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id);
   if(container != null){
    container.setConcurrentConsumers(consumer);
    container.setMaxConcurrentConsumers(maxConsumer);
   }
   }
   return ResultMap.buildSuccess();
  }
}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:https://my.oschina.net/dengfuwei/blog/1595044

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 天堂69亚洲精品中文字幕 | 成人不卡在线 | 99久久国语露脸精品国产 | 免费91麻豆精品国产自产在线观看 | 国产欧美精品一区二区三区–老狼 | narutomanga玖辛奈之乳 | 乌克兰黄色录像 | 日本免费久久久久久久网站 | 日本在线一区二区 | 色综合天天综合中文网 | 国产精品青青青高清在线 | 九九国产视频 | 精品欧美一区二区三区在线观看 | 欧美高清乌克兰精品另类 | 国产亚洲人成网站在线观看不卡 | 性xxxx直播放免费 | 亚洲啊v| 91污污视频| 国产高清一区二区 | 亚洲男女网站 | 国产99久久九九精品免费 | 男女男精品视频 | 国产成人精品一区二三区在线观看 | 国产91第一页 | 免费观看大片毛片 | 全黄一级裸片视频免费 | 91免费精品国自产拍在线不卡 | 精品福利视频一区二区三区 | 手机在线伦理片 | 扒开斗罗美女了的胸罩和内裤漫画 | 美女吃jj| 香蕉精品国产高清自在自线 | 香蕉tv国产在线永久播放 | 国产精品合集一区二区 | 亚洲男人的天堂在线 | 国产成人精品高清不卡在线 | 国产第一福利影院 | 国产拍拍视频一二三四区 | 色噜噜 男人的天堂在线观看 | 成人免费视屏 | 欧美日韩亚洲综合久久久 |