前言碎語
首先說明下需求,一個用戶中心產(chǎn)品,用戶在試用產(chǎn)品有三天的期限,三天到期后準(zhǔn)時準(zhǔn)點通知用戶,試用產(chǎn)品到期了。這個需求如果不是準(zhǔn)時通知,而是每天定點通知就簡單了。如果需要準(zhǔn)時通知就只能上延遲隊列了。使用場景除了如上,典型的業(yè)務(wù)場景還有電商中的延時未支付訂單失效等等。
延遲隊列多種實現(xiàn)方式
- 1.如基于RabbitMQ的隊列ttl+死信路由策略:通過設(shè)置一個隊列的超時未消費時間,配合死信路由策略,到達時間未消費后,回會將此消息路由到指定隊列
- 2.基于RabbitMQ延遲隊列插件(rabbitmq-delayed-message-exchange):發(fā)送消息時通過在請求頭添加延時參數(shù)(headers.put("x-delay", 5000))即可達到延遲隊列的效果
- 3.使用redis的zset有序性,輪詢zset中的每個元素,到點后將內(nèi)容遷移至待消費的隊列,(redisson已有實現(xiàn))
- 4.使用redis的key的過期通知策略,設(shè)置一個key的過期時間為延遲時間,過期后通知客戶端
redisson中的延遲隊列實現(xiàn)
怎么封裝便于業(yè)務(wù)使用。
1.首先定義一個延遲job,里面包含一個map參數(shù),和隊列執(zhí)行器的具體實現(xiàn)class,觸發(fā)任務(wù)執(zhí)行時,map參數(shù)會被傳遞到具體的業(yè)務(wù)執(zhí)行器實現(xiàn)內(nèi)
1
2
3
4
5
6
7
8
|
/** * Created by kl on 2018/7/20. * Content :延時job */ public class DelayJob { private Map jobParams; //job執(zhí)行參數(shù) private Class aClass; //具體執(zhí)行實例實現(xiàn) } |
2.定義一個延遲job執(zhí)行器接口,業(yè)務(wù)需要實現(xiàn)這個接口,然后在execute方法內(nèi)寫自己的業(yè)務(wù)邏輯
1
2
3
4
5
6
7
|
/** * Created by kl on 2018/7/20. * Content :延時job執(zhí)行器接口 */ public interface ExecuteJob { void execute(DelayJob job); } |
3.消費已經(jīng)到點的延時job服務(wù),通過job參數(shù)調(diào)用業(yè)務(wù)執(zhí)行器實現(xiàn)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
@Component public class JobTimer { static final String jobsTag = "customer_jobtimer_jobs" ; @Autowired private RedissonClient client; @Autowired private ApplicationContext context; ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2 ); @PostConstruct public void startJobTimer() { RBlockingQueueblockingQueue = client.getBlockingQueue(jobsTag); new Thread() { @Override public void run() { while ( true ) { try { DelayJob job = blockingQueue.take(); executorService.execute( new ExecutorTask(context, job)); } catch (Exception e) { e.printStackTrace(); try { TimeUnit.SECONDS.sleep( 60 ); } catch (Exception ex) { } } } } }.start(); } class ExecutorTask implements Runnable { private ApplicationContext context; private DelayJob delayJob; public ExecutorTask(ApplicationContext context, DelayJob delayJob) { this .context = context; this .delayJob = delayJob; } @Override public void run() { ExecuteJob service = (ExecuteJob) context.getBean(delayJob.getaClass()); service.execute(delayJob); } } } |
4.封裝延時job服務(wù)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
/** * Created by kl on 2018/7/20. * Content :延時job服務(wù) */ @Component public class DelayJobService { @Autowired private RedissonClient client; public void submitJob(DelayJob job, Long delay, TimeUnit timeUnit){ RBlockingQueueblockingQueue = client.getBlockingQueue(JobTimer.jobsTag); RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue); delayedQueue.offer(job,delay,timeUnit); } } |
文末結(jié)語
redisson作為一個分布式利器,這么好用的工具沒人用有點可惜,還有一個原因是有個想法,想將延遲隊列這個功能封裝成一個spring boot的start依賴,然后開源出來,造福四方,希望大家以后多多支持服務(wù)器之家!
原文鏈接:http://www.kailing.pub/article/index/arcid/207.html