場景
開發(fā)中經(jīng)常需要用到定時任務(wù),對于商城來說,定時任務(wù)尤其多,比如優(yōu)惠券定時過期、訂單定時關(guān)閉、微信支付2小時未支付關(guān)閉訂單等等,都需要用到定時任務(wù),但是定時任務(wù)本身有一個問題,一般來說我們都是通過定時輪詢查詢數(shù)據(jù)庫來判斷是否有任務(wù)需要執(zhí)行,也就是說不管怎么樣,我們需要先查詢數(shù)據(jù)庫,而且有些任務(wù)對時間準(zhǔn)確要求比較高的,需要每秒查詢一次,對于系統(tǒng)小倒是無所謂,如果系統(tǒng)本身就大而且數(shù)據(jù)也多的情況下,這就不大現(xiàn)實了,所以需要其他方式的,當(dāng)然實現(xiàn)的方式有多種多樣的,比如redis實現(xiàn)定時隊列、基于優(yōu)先級隊列的jdk延遲隊列、時間輪等。因為我們項目中本身就使用到了rabbitmq,所以基于方便開發(fā)和維護(hù)的原則,我們使用了rabbitmq延遲隊列來實現(xiàn)定時任務(wù),不知道rabbitmq是什么的和不知道springboot怎么集成rabbitmq的可以查看我之前的文章 spring boot集成rabbitmq
rabbitmq延遲隊列
rabbitmq本身是沒有延遲隊列的,只能通過rabbitmq本身隊列的特性來實現(xiàn),想要rabbitmq實現(xiàn)延遲隊列,需要使用rabbitmq的死信交換機(jī)(exchange)和消息的存活時間ttl(time to live)
死信交換機(jī)
一個消息在滿足如下條件下,會進(jìn)死信交換機(jī),記住這里是交換機(jī)而不是隊列,一個交換機(jī)可以對應(yīng)很多隊列。
- 一個消息被consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會被再次放在隊列里,被其他消費(fèi)者使用。
- 上面的消息的ttl到了,消息過期了。
- 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
死信交換機(jī)就是普通的交換機(jī),只是因為我們把過期的消息扔進(jìn)去,所以叫死信交換機(jī),并不是說死信交換機(jī)是某種特定的交換機(jī)
消息ttl(消息存活時間)
消息的ttl就是消息的存活時間。rabbitmq可以對隊列和消息分別設(shè)置ttl。對隊列設(shè)置就是隊列沒有消費(fèi)者連著的保留時間,也可以對每一個單獨(dú)的消息做單獨(dú)的設(shè)置。超過了這個時間,我們認(rèn)為這個消息就死了,稱之為死信。如果隊列設(shè)置了,消息也設(shè)置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設(shè)置)。這里單講單個消息的ttl,因為它才是實現(xiàn)延遲任務(wù)的關(guān)鍵。
1
2
3
4
|
byte [] messagebodybytes = "hello, world!" .getbytes(); amqp.basicproperties properties = new amqp.basicproperties(); properties.setexpiration( "60000" ); channel.basicpublish( "my-exchange" , "queue-key" , properties, messagebodybytes); |
可以通過設(shè)置消息的expiration字段或者x-message-ttl屬性來設(shè)置時間,兩者是一樣的效果。只是expiration字段是字符串參數(shù),所以要寫個int類型的字符串: 當(dāng)上面的消息扔到隊列中后,過了60秒,如果沒有被消費(fèi),它就死了。不會被消費(fèi)者消費(fèi)到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費(fèi)者消費(fèi)。死信在隊列中并不會被刪除和釋放,它會被統(tǒng)計到隊列的消息數(shù)中去
處理流程圖
創(chuàng)建交換機(jī)(exchanges)和隊列(queues)
創(chuàng)建死信交換機(jī)
如圖所示,就是創(chuàng)建一個普通的交換機(jī),這里為了方便區(qū)分,把交換機(jī)的名字取為:delay
創(chuàng)建自動過期消息隊列
這個隊列的主要作用是讓消息定時過期的,比如我們需要2小時候關(guān)閉訂單,我們就需要把消息放進(jìn)這個隊列里面,把消息過期時間設(shè)置為2小時
創(chuàng)建一個一個名為delay_queue1的自動過期的隊列,當(dāng)然圖片上面的參數(shù)并不會讓消息自動過期,因為我們并沒有設(shè)置x-message-ttl參數(shù),如果整個隊列的消息有消息都是相同的,可以設(shè)置,這里為了靈活,所以并沒有設(shè)置,另外兩個參數(shù)x-dead-letter-exchange代表消息過期后,消息要進(jìn)入的交換機(jī),這里配置的是delay,也就是死信交換機(jī),x-dead-letter-routing-key是配置消息過期后,進(jìn)入死信交換機(jī)的routing-key,跟發(fā)送消息的routing-key一個道理,根據(jù)這個key將消息放入不同的隊列
創(chuàng)建消息處理隊列
這個隊列才是真正處理消息的隊列,所有進(jìn)入這個隊列的消息都會被處理
消息隊列的名字為delay_queue2
消息隊列綁定到交換機(jī)
進(jìn)入交換機(jī)詳情頁面,將創(chuàng)建的2個隊列(delay queue1和delay queue2)綁定到交換機(jī)上面
自動過期消息隊列的routing key 設(shè)置為delay
綁定delay queue2
delay queue2 的key要設(shè)置為創(chuàng)建自動過期的隊列的x-dead-letter-routing-key參數(shù),這樣當(dāng)消息過期的時候就可以自動把消息放入delay_queue2這個隊列中了
綁定后的管理頁面如下圖:
當(dāng)然這個綁定也可以使用代碼來實現(xiàn),只是為了直觀表現(xiàn),所以本文使用的管理平臺來操作
發(fā)送消息
1
2
3
4
5
6
|
string msg = "hello word" ; messageproperties messageproperties = new messageproperties(); messageproperties.setexpiration( "6000" ); messageproperties.setcorrelationid(uuid.randomuuid().tostring().getbytes()); message message = new message(msg.getbytes(), messageproperties); rabbittemplate.convertandsend( "delay" , "delay" ,message); |
主要的代碼就是
1
|
messageproperties.setexpiration( "6000" ); |
設(shè)置了讓消息6秒后過期
注意:因為要讓消息自動過期,所以一定不能設(shè)置delay_queue1的監(jiān)聽,不能讓這個隊列里面的消息被接受到,否則消息一旦被消費(fèi),就不存在過期了
接收消息
接收消息配置好delay_queue2的監(jiān)聽就好了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
package wang.raye.rabbitmq.demo1; import org.springframework.amqp.core.acknowledgemode; import org.springframework.amqp.core.binding; import org.springframework.amqp.core.bindingbuilder; import org.springframework.amqp.core.directexchange; import org.springframework.amqp.core.message; import org.springframework.amqp.core.queue; import org.springframework.amqp.rabbit.connection.cachingconnectionfactory; import org.springframework.amqp.rabbit.connection.connectionfactory; import org.springframework.amqp.rabbit.core.channelawaremessagelistener; import org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configuration public class delayqueue { /** 消息交換機(jī)的名字*/ public static final string exchange = "delay" ; /** 隊列key1*/ public static final string routingkey1 = "delay" ; /** 隊列key2*/ public static final string routingkey2 = "delay_key" ; /** * 配置鏈接信息 * @return */ @bean public connectionfactory connectionfactory() { cachingconnectionfactory connectionfactory = new cachingconnectionfactory( "120.76.237.8" , 5672 ); connectionfactory.setusername( "kberp" ); connectionfactory.setpassword( "kberp" ); connectionfactory.setvirtualhost( "/" ); connectionfactory.setpublisherconfirms( true ); // 必須要設(shè)置 return connectionfactory; } /** * 配置消息交換機(jī) * 針對消費(fèi)者配置 fanoutexchange: 將消息分發(fā)到所有的綁定隊列,無routingkey的概念 headersexchange :通過添加屬性key-value匹配 directexchange:按照routingkey分發(fā)到指定隊列 topicexchange:多關(guān)鍵字匹配 */ @bean public directexchange defaultexchange() { return new directexchange(exchange, true , false ); } /** * 配置消息隊列2 * 針對消費(fèi)者配置 * @return */ @bean public queue queue() { return new queue( "delay_queue2" , true ); //隊列持久 } /** * 將消息隊列2與交換機(jī)綁定 * 針對消費(fèi)者配置 * @return */ @bean @autowired public binding binding() { return bindingbuilder.bind(queue()).to(defaultexchange()).with(delayqueue.routingkey2); } /** * 接受消息的監(jiān)聽,這個監(jiān)聽會接受消息隊列1的消息 * 針對消費(fèi)者配置 * @return */ @bean @autowired public simplemessagelistenercontainer messagecontainer2(connectionfactory connectionfactory) { simplemessagelistenercontainer container = new simplemessagelistenercontainer(connectionfactory()); container.setqueues(queue()); container.setexposelistenerchannel( true ); container.setmaxconcurrentconsumers( 1 ); container.setconcurrentconsumers( 1 ); container.setacknowledgemode(acknowledgemode.manual); //設(shè)置確認(rèn)模式手工確認(rèn) container.setmessagelistener( new channelawaremessagelistener() { public void onmessage(message message, com.rabbitmq.client.channel channel) throws exception { byte [] body = message.getbody(); system.out.println( "delay_queue2 收到消息 : " + new string(body)); channel.basicack(message.getmessageproperties().getdeliverytag(), false ); //確認(rèn)消息成功消費(fèi) } }); return container; } } |
在消息監(jiān)聽中處理需要定時處理的任務(wù)就好了,因為rabbitmq能發(fā)送消息,所以可以把任務(wù)特征碼發(fā)過來,比如關(guān)閉訂單就把訂單id發(fā)過來,這樣就避免了需要查詢一下那些訂單需要關(guān)閉而加重mysql負(fù)擔(dān)了,畢竟一旦訂單量大的話,查詢本身也是一件很費(fèi)io的事情
總結(jié)
基于rabbitmq實現(xiàn)定時任務(wù),就是將消息設(shè)置一個過期時間,放入一個沒有讀取的隊列中,讓消息過期后自動轉(zhuǎn)入另外一個隊列中,監(jiān)控這個隊列消息的監(jiān)聽處來處理定時任務(wù)具體的操作
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:http://raye.wang/2018/05/19/rabbitmqyan-chi-dui-lie-shi-xian-ding-shi-ren-wu/