一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|JavaScript|易語言|

服務器之家 - 編程語言 - JAVA教程 - 淺談使用java實現阿里云消息隊列簡單封裝

淺談使用java實現阿里云消息隊列簡單封裝

2021-04-09 11:45狂盜一枝梅 JAVA教程

這篇文章主要介紹了淺談使用java實現阿里云消息隊列簡單封裝,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

一、前言

最近公司有使用阿里云消息隊列的需求,為了更加方便使用,本人用了幾天時間將消息隊列封裝成api調用方式以方便內部系統的調用,現在已經完成,特此記錄其中過程和使用到的相關技術,與君共勉。

現在阿里云提供了兩種消息服務:mns服務和ons服務,其中我認為mns是簡化版的ons,而且mns的消息消費需要自定義輪詢策略的,相比之下,ons的發布與訂閱模式功能更加強大(比如相對于mns,ons提供了消息追蹤、日志、監控等功能),其api使用起來更加方便,而且聽聞阿里內部以后不再對mns進行新的開發,只做維護,ons服務則會逐步替代mns服務成為阿里消息服務的主打產品,所以,如果有使用消息隊列的需求,建議不要再使用mns,使用ons是最好的選擇。

涉及到的技術:Spring,反射、動態代理、Jackson序列化和反序列化

在看下面的文章之前,需要先看上面的文檔以了解相關概念(Topic、Consumer、Producer、Tag等)以及文檔中提供的簡單的發送和接收代碼實現。

該博文只針對有消息隊列知識基礎的朋友看,能幫上大家的忙我自然很高興,看不懂的也不要罵,說明你路子不對。

二、設計方案

1.消息發送

在一個簡單的cs架構中,假設server會監聽一個Topic的Producer發送的消息,那么它首先應該提供client一個api,client只需要簡單的調用該api,就可以通過producer來生產消息

2.消息接收

由于api是server制定的,所以server當然也知道如何消費這些消息

在這個過程中,server實際充當著消費者的角色,client實際充當著生產者的角色,但是生產者生產消息的規則則由消費者制定以滿足消費者消費需求。

3.最終目標

我們要創建一個單獨的jar包,起名為queue-core為生產者和消費者提供依賴和發布訂閱的具體實現。

三、消息發送

1.消費者提供接口

java" id="highlighter_443537">
?
1
2
3
4
5
6
7
8
9
@Topic(name="kdyzm",producerId="kdyzm_producer")
public interface UserQueueResource {
  
  @Tag("test1")
  public void handleUserInfo(@Body @Key("userInfoHandler") UserModel user);
  
  @Tag("test2")
  public void handleUserInfo1(@Body @Key("userInfoHandler1") UserModel user);
}

由于Topic和producer之間是N:1的關系,所以這里直接將producerId作為Topic的一個屬性;Tag是一個很關鍵的過濾條件,消費者通過它進行消息的分類做不同的業務處理,所以,這里使用Tag作為路由條件。

2.生產者使用消費者提供的api發送消息

由于消費者只提供了接口給生產者使用,接口是沒有辦法直接使用的,因為沒有辦法實例化,這里使用動態代理生成對象,在消費者提供的api中,添加如下config,以方便生產者直接導入config即可使用,這里使用了基于java的spring config,請知悉。

?
1
2
3
4
5
6
7
8
9
@Configuration
public class QueueConfig {
 
  @Autowired
  @Bean
  public UserQueueResource userQueueResource() {
    return QueueResourceFactory.createProxyQueueResource(UserQueueResource.class);
  }
}

3.queue-core對生產者發送消息的封裝

以上1中所有的注解(Topic、Tag、Body 、Key)以及2中使用到的QueueResourceFactory類都要在queue-core中定義,其中注解的定義只是定義了規則,真正的實現實際上是在QueueResourceFactory中

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.wy.queue.core.api.MQConnection;
import com.wy.queue.core.utils.JacksonSerializer;
import com.wy.queue.core.utils.MQUtils;
import com.wy.queue.core.utils.QueueCoreSpringUtils;
 
public class QueueResourceFactory implements InvocationHandler {
 
  private static final Logger logger=LoggerFactory.getLogger(QueueResourceFactory.class);
  
  private String topicName;
 
  private String producerId;
  
  private JacksonSerializer serializer=new JacksonSerializer();
  
  private static final String PREFIX="PID_";
  
  public QueueResourceFactory(String topicName,String producerId) {
    this.topicName = topicName;
    this.producerId=producerId;
  }
 
  public static <T> T createProxyQueueResource(Class<T> clazz) {
    String topicName = MQUtils.getTopicName(clazz);
    String producerId = MQUtils.getProducerId(clazz);
    T target = (T) Proxy.newProxyInstance(QueueResourceFactory.class.getClassLoader(),
        new Class<?>[] { clazz }, new QueueResourceFactory(topicName,producerId));
    return target;
  }
 
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if(args.length == 0 || args.length>1){
      throw new RuntimeException("only accept one param at queueResource interface.");
    }
    String tagName=MQUtils.getTagName(method);
    ProducerFactory producerFactory = QueueCoreSpringUtils.getBean(ProducerFactory.class);
    MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class);
    
    Producer producer = producerFactory.createProducer(PREFIX+connectionInfo.getPrefix()+"_"+producerId);
    
    //發送消息
    Message msg = new Message( //
        // 在控制臺創建的 Topic,即該消息所屬的 Topic 名稱
        connectionInfo.getPrefix()+"_"+topicName,
        // Message Tag,
        // 可理解為 Gmail 中的標簽,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾
        tagName,
        // Message Body
        // 任何二進制形式的數據, MQ 不做任何干預,
        // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
        serializer.serialize(args[0]).getBytes());
    SendResult sendResult = producer.send(msg);
    logger.info("Send Message success. Message ID is: " + sendResult.getMessageId());
    return null;
  
}

這里特意將自定義包和第三方使用的包名都貼過來了,以便于區分。

這里到底做了哪些事情呢?

發送消息的過程就是動態代理創建一個代理對象,該對象調用方法的時候會被攔截,首先解析所有的注解,比如topicName、producerId、tag等關鍵信息從注解中取出來,然后調用阿里sdk發送消息,過程很簡單,但是注意,這里發送消息的時候是分環境的,一般來講現在企業中會區分QA、staging、product三種環境,其中QA和staging是測試環境,對于消息隊列來講,也是會有三種環境的,但是QA和staging環境往往為了降低成本使用同一個阿里賬號,所以創建的topic和productId會放到同一個區域下,這樣同名的TopicName是不允許存在的,所以加上了環境前綴加以區分,比如QA_TopicName,PID_Staging_ProducerId等等;另外,queue-core提供了MQConnection接口,以獲取配置信息,生產者服務只需要實現該接口即可。

4.生產者發送消息

?
1
2
3
4
5
6
7
8
9
10
@Autowired
private UserQueueResource userQueueResource;
 
@Override
public void sendMessage() {
  UserModel userModel=new UserModel();
  userModel.setName("kdyzm");
  userModel.setAge(25);
  userQueueResource.handleUserInfo(userModel);
}

只需要數行代碼即可將消息發送到指定的Topic,相對于原生的發送代碼,精簡了太多。

四、消息消費

相對于消息發送,消息的消費要復雜一些。

1.消息消費設計

由于Topic和Consumer之間是N:N的關系,所以將ConsumerId放到消費者具體實現的方法上

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Controller
@QueueResource
public class UserQueueResourceImpl implements UserQueueResource {
 
  private Logger logger = LoggerFactory.getLogger(this.getClass());
 
  @Override
  @ConsumerAnnotation("kdyzm_consumer")
  public void handleUserInfo(UserModel user) {
    logger.info("收到消息1:{}", new Gson().toJson(user));
  }
 
  @Override
  @ConsumerAnnotation("kdyzm_consumer1")
  public void handleUserInfo1(UserModel user) {
    logger.info("收到消息2:{}", new Gson().toJson(user));
  }
}

這里又有兩個新的注解@QueueResource和@ConsumerAnnotation,這兩個注解后續會討論如何使用。有人會問我為什么要使用ConsumerAnnotation這個名字而不使用Consumer這個名字,因為Consumer這個名字和aliyun提供的sdk中的名字沖突了。。。。

在這里, 消費者提供api 接口給生產者以方便生產者發送消息,消費者則實現該接口以消費生產者發送的消息,如何實現api接口就實現了監聽,這點是比較關鍵的邏輯。

2.queue-core實現消息隊列監聽核心邏輯

第一步:使用sping 容器的監聽方法獲取所有加上QueueResource注解的Bean

第二步:分發處理Bean

如何處理這些Bean呢,每個Bean實際上都是一個對象,有了對象,比如上面例子中的UserQueueResourceImpl 對象,我們可以拿到該對象實現的接口字節碼對象,進而可以拿到該接口UserQueueRerousce上的注解以及方法上和方法中的注解,當然UserQueueResourceImpl實現方法上的注解也能拿得到,這里我將獲取到的信息以consumerId為key,其余相關信息封裝為Value緩存到了一個Map對象中,核心代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Class<?> clazz = resourceImpl.getClass();
    Class<?> clazzIf = clazz.getInterfaces()[0];
    Method[] methods = clazz.getMethods();
    String topicName = MQUtils.getTopicName(clazzIf);
    for (Method m : methods) {
      ConsumerAnnotation consumerAnno = m.getAnnotation(ConsumerAnnotation.class);
 
      if (null == consumerAnno) {
//        logger.error("method={} need Consumer annotation.", m.getName());
        continue;
      }
      String consuerId = consumerAnno.value();
      if (StringUtils.isEmpty(consuerId)) {
        logger.error("method={} ConsumerId can't be null", m.getName());
        continue;
      }
      Class<?>[] parameterTypes = m.getParameterTypes();
      Method resourceIfMethod = null;
      try {
        resourceIfMethod = clazzIf.getMethod(m.getName(), parameterTypes);
      } catch (NoSuchMethodException | SecurityException e) {
        logger.error("can't find method={} at super interface={} .", m.getName(), clazzIf.getCanonicalName(),
            e);
        continue;
      }
      String tagName = MQUtils.getTagName(resourceIfMethod);
      consumersMap.put(consuerId, new MethodInfo(topicName, tagName, m));
    }

第三步:通過反射實現消費的動作

首先,先確定好反射動作執行的時機,那就是監聽到了新的消息

其次,如何執行反射動作?不贅述,有反射相關基礎的童鞋都知道怎么做,核心代碼如下所示:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class);
    String topicPrefix=connectionInfo.getPrefix()+"_";
    String consumerIdPrefix=PREFIX+connectionInfo.getPrefix()+"_";
    for(String consumerId:consumersMap.keySet()){
      MethodInfo methodInfo=consumersMap.get(consumerId);
      Properties connectionProperties=convertToProperties(connectionInfo);
      // 您在控制臺創建的 Consumer ID
      connectionProperties.put(PropertyKeyConst.ConsumerId, consumerIdPrefix+consumerId);
      Consumer consumer = ONSFactory.createConsumer(connectionProperties);
      consumer.subscribe(topicPrefix+methodInfo.getTopicName(), methodInfo.getTagName(), new MessageListener() { //訂閱多個Tag
        public Action consume(Message message, ConsumeContext context) {
          try {
            String messageBody=new String(message.getBody(),"UTF-8");
            logger.info("receive message from topic={},tag={},consumerId={},message={}",topicPrefix+methodInfo.getTopicName(),methodInfo.getTagName(),consumerIdPrefix+consumerId,messageBody);
            Method method=methodInfo.getMethod();
            Class<?> parameType = method.getParameterTypes()[0];
            Object arg = jacksonSerializer.deserialize(messageBody, parameType);
            Object[] args={arg};
            method.invoke(resourceImpl, args);
          } catch (Exception e) {
            logger.error("",e);
          }
          return Action.CommitMessage;
        }
      });
      consumer.start();
      logger.info("consumer={} has started.",consumerIdPrefix+consumerId);
    }

五、完整代碼見下面的git鏈接

 https://github.com/kdyzm/queue-core.git

 以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://www.cnblogs.com/kuangdaoyizhimei/p/8508357.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 日本三级香港三级久久99 | 二区三区视频 | 亚洲国产高清一区二区三区 | 欧美日韩一品道 | 嫩草视频在线观看视频播放 | 国产在线观看精品 | 精品国产一二三区在线影院 | 精品一区二区国语对白 | 免费看视频 | 四虎e234hcom| 9191免费永久观看 | 国产99精品| 精品99在线观看 | 久九九精品免费视频 | 日本捏胸吃奶视频免费 | 风间由美m3u8在线 | 我和老丈洗澡同性 | 成年人免费在线播放 | 人人爽人人射 | 久久综合老色鬼网站 | 色婷婷六月丁香在线观看 | 亚飞与亚基国语1080p在线观看 | 国产精品露脸国语对白河北 | 免费网站看v片在线成人国产系列 | 国产午夜精品一区二区三区 | 日日碰日日操 | 99精品国产成人一区二区在线 | 91久久福利国产成人精品 | 亚洲成综合 | 国产第9页 | 四虎免费在线观看视频 | 欧美一卡2卡3卡无卡 | 青草草视频在线观看 | 四虎tv在线观看884aa | 久久久久国产一级毛片高清片 | 日韩免费一级 | 精品国产免费一区二区三区 | 青草青草视频 | 欧美视频在线播放观看免费福利资源 | 色戒完整版2小时38分钟 | 亚洲性视频在线观看 |