一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|JavaScript|易語言|

服務器之家 - 編程語言 - Java教程 - SpringAOP+RabbitMQ+WebSocket實戰詳解

SpringAOP+RabbitMQ+WebSocket實戰詳解

2021-06-11 13:21little-sheep Java教程

這篇文章主要介紹了SpringAOP+RabbitMQ+WebSocket實戰詳解,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

背景

最近公司的客戶要求,分配給員工的任務除了有微信通知外,還希望pc端的網頁也能實時收到通知。管理員分配任務是在我們的系統a,而員工接受任務是在系統b。兩個系統都是現在已投入使用的系統。

技術選型

根據需求我們最終選用springaop+rabbitmq+websocket。

springaop可以讓我們不修改原有代碼,直接將原有service作為切點,加入切面。rabbitmq可以讓a系統和b系統解耦。websocket則可以達到實時通知的要求。

SpringAOP+RabbitMQ+WebSocket實戰詳解

springaop

aop稱為面向切面編程,在程序開發中主要用來解決一些系統層面上的問題,比如日志,事務,權限等待。是spring的核心模塊,底層是通過動態代理來實現(動態代理將在之后的文章重點介紹)。

基本概念

aspect(切面):通常是一個類,里面可以定義切入點和通知。

jointpoint(連接點):程序執行過程中明確的點,一般是方法的調用。

advice(通知):aop在特定的切入點上執行的增強處理,有before,after,afterreturning,afterthrowing,around。

pointcut(切入點):就是帶有通知的連接點,在程序中主要體現為書寫切入點表達式。

通知類型

before:在目標方法被調用之前做增強處理。

@before只需要指定切入點表達式即可

afterreturning:在目標方法正常完成后做增強。

@afterreturning除了指定切入點表達式后,還可以指定一個返回值形參名returning,代表目標方法的返回值

afterthrowing:主要用來處理程序中未處理的異常。

@afterthrowing除了指定切入點表達式后,還可以指定一個throwing的返回值形參名,可以通過該形參名

來訪問目標方法中所拋出的異常對象

after:在目標方法完成之后做增強,無論目標方法時候成功完成。

@after可以指定一個切入點表達式

around:環繞通知,在目標方法完成前后做增強處理,環繞通知是最重要的通知類型,像事務,日志等都是環繞通知,注意編程中核心是一個proceedingjoinpoint。

rabbitmq

SpringAOP+RabbitMQ+WebSocket實戰詳解

從圖中我們可以看到rabbitmq主要的結構有:routing、binding、exchange、queue。

queue

queue(隊列)rabbitmq的作用是存儲消息,隊列的特性是先進先出。

exchange

生產者產生的消息并不是直接發送給消息隊列queue的,而是要經過exchange(交換器),由exchange再將消息路由到一個或多個queue,還會將不符合路由規則的消息丟棄。

routing

用于標記或生產者尋找exchange。

binding

用于exchange和queue做關聯。

exchange type fanout

fanout類型的exchange路由規則非常簡單,它會把所有發送到該exchange的消息路由到所有與它綁定的queue中。

direct

direct會把消息路由到那些binding key與routing key完全匹配的queue中。

topic

direct規則是嚴格意義上的匹配,換言之routing key必須與binding key相匹配的時候才將消息傳送給queue,那么topic這個規則就是模糊匹配,可以通過通配符滿足一部分規則就可以傳送。

headers

headers類型的exchange不依賴于routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。

websocket

了解websocket必須先知道幾個常用的web通信技術及其區別。

短輪詢

短輪詢的基本思路就是瀏覽器每隔一段時間向瀏覽器發送http請求,服務器端在收到請求后,不論是否有數據更新,都直接進行響應。這種方式實現的即時通信,本質上還是瀏覽器發送請求,服務器接受請求的一個過程,通過讓客戶端不斷的進行請求,使得客戶端能夠模擬實時地收到服務器端的數據的變化。

這種方式的優點是比較簡單,易于理解,實現起來也沒有什么技術難點。缺點是顯而易見的,這種方式由于需要不斷的建立http連接,嚴重浪費了服務器端和客戶端的資源。尤其是在客戶端,距離來說,如果有數量級想對比較大的人同時位于基于短輪詢的應用中,那么每一個用戶的客戶端都會瘋狂的向服務器端發送http請求,而且不會間斷。人數越多,服務器端壓力越大,這是很不合理的。

因此短輪詢不適用于那些同時在線用戶數量比較大,并且很注重性能的web應用。

長輪詢/ comet

comet指的是,當服務器收到客戶端發來的請求后,不會直接進行響應,而是先將這個請求掛起,然后判斷服務器端數據是否有更新。如果有更新,則進行響應,如果一直沒有數據,則到達一定的時間限制(服務器端設置)后關閉連接。

長輪詢和短輪詢比起來,明顯減少了很多不必要的http請求次數,相比之下節約了資源。長輪詢的缺點在于,連接掛起也會導致資源的浪費。

sse

sse是html5新增的功能,全稱為server-sent events。它可以允許服務推送數據到客戶端。sse在本質上就與之前的長輪詢、短輪詢不同,雖然都是基于http協議的,但是輪詢需要客戶端先發送請求。而sse最大的特點就是不需要客戶端發送請求,可以實現只要服務器端數據有更新,就可以馬上發送到客戶端。

sse的優勢很明顯,它不需要建立或保持大量的客戶端發往服務器端的請求,節約了很多資源,提升應用性能。并且sse的實現非常簡單,不需要依賴其他插件。

websocket

websocket是html5定義的一個新協議,與傳統的http協議不同,該協議可以實現服務器與客戶端之間全雙工通信。簡單來說,首先需要在客戶端和服務器端建立起一個連接,這部分需要http。連接一旦建立,客戶端和服務器端就處于平等的地位,可以相互發送數據,不存在請求和響應的區別。

websocket的優點是實現了雙向通信,缺點是服務器端的邏輯非常復雜。現在針對不同的后臺語言有不同的插件可以使用。

四種web即時通信技術比較

從兼容性角度考慮,短輪詢>長輪詢>長連接sse>websocket;

從性能方面考慮,websocket>長連接sse>長輪詢>短輪詢。

實戰

項目使用springboot搭建。rabbitmq的安裝這里不講述。

rabbitmq配置

兩個系統a、b都需要操作rabbitmq,其中a生產消息,b消費消息。故都需要配置。

1、首先引入rabbitmq的dependency:

?
1
2
3
4
<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

這個dependency中包含了rabbitmq相關dependency。

2、在項目的配置文件里配置為使用rabbitmq及其參數。

application-pro.yml

?
1
2
3
4
5
6
7
8
#消息隊列
message.queue.type: rabbitmq
## rabbit mq properties
rabbitmq:
 host: localhost
 port: 5672
 username: guest
 password: guest

application.properties

?
1
2
#將要使用的隊列名
rabbitmq.websocket.msg.queue=websocket_msg_queue

3、創建配置文件。隊列的創建交給spring。

rabbitmqconfig.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@configuration
@enablerabbit
public class rabbitmqconfig {
 
  @value("${rabbitmq.host}")
  private string host;
  @value("${rabbitmq.port}")
  private string port;
  @value("${rabbitmq.username}")
  private string username;
  @value("${rabbitmq.password}")
  private string password;
  @value("${rabbitmq.websocket.msg.queue}")
  private string websocketmsgqueue;
 
  @bean
  public connectionfactory connectionfactory() throws ioexception {
    cachingconnectionfactory factory = new cachingconnectionfactory();
    factory.setusername(username);
    factory.setpassword(password);
//    factory.setvirtualhost("test");
    factory.sethost(host);
    factory.setport(integer.valueof(port));
    factory.setpublisherconfirms(true);
 
    //設置隊列參數,是否持久化、隊列ttl、隊列消息ttl等
    factory.createconnection().createchannel(false).queuedeclare(websocketmsgqueue, true, false, false, null);
    return factory;
  }
 
  @bean
  public messageconverter messageconverter() {
    return new jackson2jsonmessageconverter();
  }
 
  @bean
  @scope(configurablebeanfactory.scope_prototype)
  // 必須是prototype類型
  public rabbittemplate rabbittemplate() throws ioexception {
    return new rabbittemplate(connectionfactory());
  }
 
  @bean
  public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory() throws ioexception {
    simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory();
    factory.setconnectionfactory(connectionfactory());
    factory.setconcurrentconsumers(3);
    factory.setmaxconcurrentconsumers(10);
    factory.setacknowledgemode(acknowledgemode.manual);
    return factory;
  }
}

4、系統b中創建隊列監聽,當隊列有消息時,發送websocket通知。

rabbitmqlistener.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@component
public class rabbitmqlistener {
 
  @autowired
  private rabbitmqservice mqservice;
 
  /**
   * websocket推送監聽器
   * @param socketentity
   * @param deliverytag
   * @param channel
   */
  @rabbitlistener(queues = "websocket_msg_queue")
  public void websocketmsglistener(@payload websocketmsgentity socketmsgentity, @header(amqpheaders.delivery_tag) long deliverytag, channel channel) throws ioexception {
    mqservice.handlewebsocketmsg(socketmsgentity, deliverytag, channel);
  }
 
}

rabbitmqservice.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class rabbitmqservice {
  @autowired
  private messagewebsockethandler messagewebsockethandler;
 
  /**
   * @param socketmsgentity
   * @param deliverytag
   * @param channel
   * @throws ioexception
   */
  void handlewebsocketmsg(websocketmsgentity socketmsgentity, long deliverytag, channel channel) throws ioexception {
    try {
      messagewebsockethandler.sendmessagetousers(socketmsgentity.tojsonstring(), socketmsgentity.gettouserids());
      channel.basicack(deliverytag, false);
    } catch (exception e) {
      channel.basicnack(deliverytag, false, false);
    }
  }
}

websocketmsgentity為mq中傳送的實體。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public class websocketmsgentity implements serializable {
  public enum ordertype{
    repair("維修"),
    maintain("保養"),
    measure("計量");
 
    ordertype(string value){
      this.value = value;
    }
    string value;
 
    public string getvalue() {
      return value;
    }
  }
  //設備名稱
  private string equname;
  //設備編號
  private string equid;
  //工單類型
  private ordertype ordertype;
  //工單單號
  private string orderid;
  //工單狀態
  private string orderstatus;
  //創建時間
  private date createtime;
  //消息接收人id
  private list<string> touserids;
 
  public string getequname() {
    return equname;
  }
 
  public void setequname(string equname) {
    equname = equname;
  }
 
  public string getorderid() {
    return orderid;
  }
 
  public void setorderid(string orderid) {
    this.orderid = orderid;
  }
 
  public string getequid() {
    return equid;
  }
 
  public void setequid(string equid) {
    equid = equid;
  }
 
  public string getorderstatus() {
    return orderstatus;
  }
 
  public void setorderstatus(string orderstatus) {
    this.orderstatus = orderstatus;
  }
 
 
  public ordertype getordertype() {
    return ordertype;
  }
 
  public void setordertype(ordertype ordertype) {
    this.ordertype = ordertype;
  }
 
  public date getcreatetime() {
    return createtime;
  }
 
  public void setcreatetime(date createtime) {
    this.createtime = createtime;
  }
 
  public list<string> gettouserids() {
    return touserids;
  }
 
  public void settouserids(list<string> touserids) {
    this.touserids = touserids;
  }
 
  public string tojsonstring(){
    return json.tojsonstring(this);
  }
}

springaop

1、系統a中創建一個切面類datainterceptor.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@aspect
@component
public class datainterceptor {
  @autowired
  private messagequeueservice queueservice;
 
 
  //維修工單切點
  @pointcut("execution(* com.zhishang.hes.common.service.impl.repairserviceimpl.executeflow(..))")
  private void repairmsg() {
  }
 
  /**
   * 返回通知,方法執行正常返回時觸發
   *
   * @param joinpoint
   * @param result
   */
  @afterreturning(value = "repairmsg()", returning = "result")
  public void afterreturning(joinpoint joinpoint, object result) {
    //此處可以獲得切點方法名
    //string methodname = joinpoint.getsignature().getname();
    equipmentrepair equipmentrepair = (equipmentrepair) result;
    websocketmsgentity websocketmsgentity = this.generaterepairmsgentity(equipmentrepair);
    if (websocketmsgentity == null) {
      return;
    }
    queueservice.send(websocketmsgentity);
  }
 
  /**
   * 生成發送到mq的維修消息
   *
   * @param equipmentrepair
   * @return
   */
  private websocketmsgentity generaterepairmsgentity(equipmentrepair equipmentrepair) {
    websocketmsgentity websocketmsgentity = generaterepairmsgfromtasks(equipmentrepair);
    return websocketmsgentity;
  }
 
  /**
   * 從任務中生成消息
   *
   * @param equipmentrepair
   * @return
   */
  private websocketmsgentity generaterepairmsgfromtasks(equipmentrepair equipmentrepair) {
    //業務代碼略
  }
 
}

2、發送消息到mq。這里只貼了發送的核心代碼

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class rabbitmessagequeue extends abstractmessagequeue {
 
  @value("${rabbitmq.websocket.msg.queue}")
  private string websocketmsgqueue;
 
  @autowired
  private rabbittemplate rabbittemplate;
 
  @override
  public void send(websocketmsgentity entity) {
    //沒有指定exchange,則使用默認名為“”的exchange,binding名與queue名相同
    rabbittemplate.convertandsend(websocketmsgqueue, entity);
  }
}

websocket

1、 系統b中引入websocket服務端dependency

?
1
2
3
4
5
<dependency>
  <groupid>org.springframework</groupid>
  <artifactid>spring-websocket</artifactid>
  <version>4.3.10.release</version>
</dependency>

2、 配置websocket,添加處理類

websocketconfigurer.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@configuration
@enablewebsocket
public class websocketconfig extends webmvcconfigureradapter implements websocketconfigurer {
 
  private static logger logger = loggerfactory.getlogger(websocketconfig.class);
 
  @override
  public void registerwebsockethandlers(websockethandlerregistry registry) {
    //配置websocket路徑
    registry.addhandler(messagewebsockethandler(),"/msg-websocket").addinterceptors(new myhandshakeinterceptor()).setallowedorigins("*");
    //配置websocket路徑 支持前端使用socketjs
    registry.addhandler(messagewebsockethandler(), "/sockjs/msg-websocket").setallowedorigins("*").addinterceptors(new myhandshakeinterceptor()).withsockjs();
  }
 
  @bean
  public messagewebsockethandler messagewebsockethandler() {
    logger.info("......創建messagewebsockethandler......");
    return new messagewebsockethandler();
  }
 
}

messagewebsockethandler.java 主要用于websocket連接及消息發送處理。配置中還使用了連接握手時的處理,主要是取用戶登陸信息,這里不多講述。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public class messagewebsockethandler extends textwebsockethandler {
  private static logger logger = loggerfactory.getlogger(systemwebsockethandler.class);
  private static concurrenthashmap<string, copyonwritearrayset<websocketsession>> users = new concurrenthashmap<>();
 
  @override
  public void afterconnectionestablished(websocketsession session) throws exception {
    string userid = session.getattributes().get("websocket_userid").tostring();
    logger.info("......afterconnectionestablished......");
    logger.info("session.getid:" + session.getid());
    logger.info("session.getlocaladdress:" + session.getlocaladdress().tostring());
    logger.info("userid:" + userid);
    //websocket連接后記錄連接信息
    if (users.keyset().contains(userid)) {
      copyonwritearrayset<websocketsession> websocketsessions = users.get(userid);
      websocketsessions.add(session);
    } else {
      copyonwritearrayset<websocketsession> websocketsessions = new copyonwritearrayset<>();
      websocketsessions.add(session);
      users.put(userid, websocketsessions);
    }
  }
 
  @override
  public void handletransporterror(websocketsession session, throwable throwable) throws exception {
    removeusersession(session);
    if (session.isopen()) {
      session.close();
    }
    logger.info("異常出現handletransporterror" + throwable.getmessage());
  }
 
  @override
  public void afterconnectionclosed(websocketsession session, closestatus closestatus) throws exception {
    removeusersession(session);
    logger.info("關閉afterconnectionclosed" + closestatus.getreason());
  }
 
  @override
  public boolean supportspartialmessages() {
    return false;
  }
 
  /**
   * 給符合要求的在線用戶發送消息
   *
   * @param message
   */
  public void sendmessagetousers(string message, list<string> userids) throws ioexception{
    if (stringutils.isempty(message) || collectionutils.isempty(userids)) {
      return;
    }
    if (users.isempty()) {
      return;
    }
    for (string userid : userids) {
      if (!users.keyset().contains(userid)) {
        continue;
      }
      copyonwritearrayset<websocketsession> websocketsessions = users.get(userid);
      if (websocketsessions == null) {
        continue;
      }
      for (websocketsession websocketsession : websocketsessions) {
        if (websocketsession.isopen()) {
          try {
            websocketsession.sendmessage(new textmessage(message));
          } catch (ioexception e) {
            logger.error(" websocket server send message error " + e.getmessage());
            try {
              throw e;
            } catch (ioexception e1) {
              e1.printstacktrace();
            }
          }
        }
      }
    }
  }
 
  /**
   * websocket清除連接信息
   *
   * @param session
   */
  private void removeusersession(websocketsession session) {
    string userid = session.getattributes().get("websocket_userid").tostring();
    if (users.keyset().contains(userid)) {
      copyonwritearrayset<websocketsession> websocketsessions = users.get(userid);
      websocketsessions.remove(session);
      if (websocketsessions.isempty()) {
        users.remove(userid);
      }
    }
  }
}

整個功能完成后,a系統分配任務時,系統b登陸用戶收到的消息如圖:

SpringAOP+RabbitMQ+WebSocket實戰詳解

總體流程:

1、對于系統b,每個登陸的用戶都會和服務器建立websocket長連接。

2、系統a生成任務,aop做出響應,將封裝的消息發送給mq。

3、系統b中的mq監聽發現隊列有消息到達,消費消息。

4、系統b通過websocket長連接將消息發給指定的登陸用戶。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://www.cnblogs.com/little-sheep/p/9934887.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 亚洲邪恶天堂影院在线观看 | 天使萌痴汉在线中文字幕 | 亚洲成在人网站天堂一区二区 | 成人中文字幕在线观看 | 成人精品一区二区三区中文字幕 | 91久久国产视频 | 午夜精品久视频在线观看 | 欧美穿高跟鞋做爰 | 波多野结衣两女调教 | 99年水嫩漂亮粉嫩在线播放 | 日本视频在线免费播放 | 日本免费一区二区三区四区五六区 | 成人午夜影院在线观看 | 亚洲视频在线一区二区三区 | 婷婷综合久久中文字幕 | 国产成人精品视频一区二区不卡 | bt天堂在线最新版在线 | sao虎影院桃红视频在线观看 | 国产一级在线观看视频 | 亚洲精品免费在线观看 | 3d动漫美女物被遭强视频 | 91制片厂制作传媒免费版樱花 | 99久久久无码国产精品 | 国产草逼视频 | 国产精品乱码高清在线观看 | 99久久免费国产香蕉麻豆 | 国产成人a v在线影院 | 非洲一级毛片又粗又长aaaa | 国产精品亚洲精品青青青 | 美女扒开腿让男生捅 | 日本春菜花在线中文字幕 | 国产精品激情综合久久 | 国产欧美一区二区精品性色99 | gogort99人体专业网站 | 侮辱丰满美丽的人妻 | 奇米777四色精品综合影院 | 欧美特黄三级在线观看 | 99午夜 | 色婷丁香| 放荡警察巨r麻麻出轨小说 范冰冰特黄xx大片 饭冈加奈子在线播放观看 法国老妇性xx在线播放 | 国产成+人+综合+亚洲不卡 |