一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|數據庫技術|

服務器之家 - 數據庫 - Redis - 基于redis分布式鎖實現秒殺功能

基于redis分布式鎖實現秒殺功能

2019-11-11 14:52lsfire Redis

這篇文章主要為大家詳細介紹了基于redis分布式鎖實現秒殺功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下

最近在項目中遇到了類似“秒殺”的業務場景,在本篇博客中,我將用一個非常簡單的demo,闡述實現所謂“秒殺”的基本思路。

業務場景

所謂秒殺,從業務角度看,是短時間內多個用戶“爭搶”資源,這里的資源在大部分秒殺場景里是商品;將業務抽象,技術角度看,秒殺就是多個線程對資源進行操作,所以實現秒殺,就必須控制線程對資源的爭搶,既要保證高效并發,也要保證操作的正確。

一些可能的實現

剛才提到過,實現秒殺的關鍵點是控制線程對資源的爭搶,根據基本的線程知識,可以不加思索的想到下面的一些方法:
1、秒殺在技術層面的抽象應該就是一個方法,在這個方法里可能的操作是將商品庫存-1,將商品加入用戶的購物車等等,在不考慮緩存的情況下應該是要操作數據庫的。那么最簡單直接的實現就是在這個方法上加上synchronized關鍵字,通俗的講就是鎖住整個方法;
2、鎖住整個方法這個策略簡單方便,但是似乎有點粗暴。可以稍微優化一下,只鎖住秒殺的代碼塊,比如寫數據庫的部分;
3、既然有并發問題,那我就讓他“不并發”,將所有的線程用一個隊列管理起來,使之變成串行操作,自然不會有并發問題。

上面所述的方法都是有效的,但是都不好。為什么?第一和第二種方法本質上是“加鎖”,但是鎖粒度依然比較高。什么意思?試想一下,如果兩個線程同時執行秒殺方法,這兩個線程操作的是不同的商品,從業務上講應該是可以同時進行的,但是如果采用第一二種方法,這兩個線程也會去爭搶同一個鎖,這其實是不必要的。第三種方法也沒有解決上面說的問題。

那么如何將鎖控制在更細的粒度上呢?可以考慮為每個商品設置一個互斥鎖,以和商品ID相關的字符串為唯一標識,這樣就可以做到只有爭搶同一件商品的線程互斥,不會導致所有的線程互斥。分布式鎖恰好可以幫助我們解決這個問題。

何為分布式鎖

分布式鎖是控制分布式系統之間同步訪問共享資源的一種方式。在分布式系統中,常常需要協調他們的動作。如果不同的系統或是同一個系統的不同主機之間共享了一個或一組資源,那么訪問這些資源的時候,往往需要互斥來防止彼此干擾來保證一致性,在這種情況下,便需要使用到分布式鎖。

我們來假設一個最簡單的秒殺場景:數據庫里有一張表,column分別是商品ID,和商品ID對應的庫存量,秒殺成功就將此商品庫存量-1。現在假設有1000個線程來秒殺兩件商品,500個線程秒殺第一個商品,500個線程秒殺第二個商品。我們來根據這個簡單的業務場景來解釋一下分布式鎖。
通常具有秒殺場景的業務系統都比較復雜,承載的業務量非常巨大,并發量也很高。這樣的系統往往采用分布式的架構來均衡負載。那么這1000個并發就會是從不同的地方過來,商品庫存就是共享的資源,也是這1000個并發爭搶的資源,這個時候我們需要將并發互斥管理起來。這就是分布式鎖的應用。
而key-value存儲系統,如redis,因為其一些特性,是實現分布式鎖的重要工具。

具體的實現

先來看看一些redis的基本命令:
SETNX key value
如果key不存在,就設置key對應字符串value。在這種情況下,該命令和SET一樣。當key已經存在時,就不做任何操作。SETNX是”SET if Not eXists”。
expire KEY seconds
設置key的過期時間。如果key已過期,將會被自動刪除。
del KEY
刪除key
由于筆者的實現只用到這三個命令,就只介紹這三個命令,更多的命令以及redis的特性和使用,可以參考redis官網。

需要考慮的問題

1、用什么操作redis?幸虧redis已經提供了jedis客戶端用于java應用程序,直接調用jedis API即可。
2、怎么實現加鎖?“鎖”其實是一個抽象的概念,將這個抽象概念變為具體的東西,就是一個存儲在redis里的key-value對,key是于商品ID相關的字符串來唯一標識,value其實并不重要,因為只要這個唯一的key-value存在,就表示這個商品已經上鎖。
3、如何釋放鎖?既然key-value對存在就表示上鎖,那么釋放鎖就自然是在redis里刪除key-value對。
4、阻塞還是非阻塞?筆者采用了阻塞式的實現,若線程發現已經上鎖,會在特定時間內輪詢鎖。
5、如何處理異常情況?比如一個線程把一個商品上了鎖,但是由于各種原因,沒有完成操作(在上面的業務場景里就是沒有將庫存-1寫入數據庫),自然沒有釋放鎖,這個情況筆者加入了鎖超時機制,利用redis的expire命令為key設置超時時長,過了超時時間redis就會將這個key自動刪除,即強制釋放鎖(可以認為超時釋放鎖是一個異步操作,由redis完成,應用程序只需要根據系統特點設置超時時間即可)。

talk is cheap,show me the code

在代碼實現層面,注解有并發的方法和參數,通過動態代理獲取注解的方法和參數,在代理中加鎖,執行完被代理的方法后釋放鎖。

幾個注解定義:

cachelock是方法級的注解,用于注解會產生并發問題的方法:

?
1
2
3
4
5
6
7
8
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
 String lockedPrefix() default "";//redis 鎖key的前綴
 long timeOut() default 2000;//輪詢鎖的時間
 int expireTime() default 1000;//key在redis里存在的時間,1000S
}

lockedObject是參數級的注解,用于注解商品ID等基本類型的參數:

?
1
2
3
4
5
6
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
 //不需要值
}

LockedComplexObject也是參數級的注解,用于注解自定義類型的參數:

?
1
2
3
4
5
6
7
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
 String field() default "";//含有成員變量的復雜對象中需要加鎖的成員變量,如一個商品對象的商品ID
 
}

CacheLockInterceptor實現InvocationHandler接口,在invoke方法中獲取注解的方法和參數,在執行注解的方法前加鎖,執行被注解的方法后釋放鎖:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class CacheLockInterceptor implements InvocationHandler{
 public static int ERROR_COUNT = 0;
 private Object proxied;
 
 public CacheLockInterceptor(Object proxied) {
 this.proxied = proxied;
 }
 
 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 
 CacheLock cacheLock = method.getAnnotation(CacheLock.class);
 //沒有cacheLock注解,pass
 if(null == cacheLock){
  System.out.println("no cacheLock annotation"); 
  return method.invoke(proxied, args);
 }
 //獲得方法中參數的注解
 Annotation[][] annotations = method.getParameterAnnotations();
 //根據獲取到的參數注解和參數列表獲得加鎖的參數
 Object lockedObject = getLockedObject(annotations,args);
 String objectValue = lockedObject.toString();
 //新建一個鎖
 RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
 //加鎖
 boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
 if(!result){//取鎖失敗
  ERROR_COUNT += 1;
  throw new CacheLockException("get lock fail");
 
 }
 try{
  //加鎖成功,執行方法
  return method.invoke(proxied, args);
 }finally{
  lock.unlock();//釋放鎖
 }
 
 }
 /**
 *
 * @param annotations
 * @param args
 * @return
 * @throws CacheLockException
 */
 private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
 if(null == args || args.length == 0){
  throw new CacheLockException("方法參數為空,沒有被鎖定的對象");
 }
 
 if(null == annotations || annotations.length == 0){
  throw new CacheLockException("沒有被注解的參數");
 }
 //不支持多個參數加鎖,只支持第一個注解為lockedObject或者lockedComplexObject的參數
 int index = -1;//標記參數的位置指針
 for(int i = 0;i < annotations.length;i++){
  for(int j = 0;j < annotations[i].length;j++){
  if(annotations[i][j] instanceof LockedComplexObject){//注解為LockedComplexObject
   index = i;
   try {
   return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
   } catch (NoSuchFieldException | SecurityException e) {
   throw new CacheLockException("注解對象中沒有該屬性" + ((LockedComplexObject)annotations[i][j]).field());
   }
  }
 
  if(annotations[i][j] instanceof LockedObject){
   index = i;
   break;
  }
  }
  //找到第一個后直接break,不支持多參數加鎖
  if(index != -1){
  break;
  }
 }
 
 if(index == -1){
  throw new CacheLockException("請指定被鎖定參數");
 }
 
 return args[index];
 }
}

最關鍵的RedisLock類中的lock方法和unlock方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * 加鎖
 * 使用方式為:
 * lock();
 * try{
 * executeMethod();
 * }finally{
 * unlock();
 * }
 * @param timeout timeout的時間范圍內輪詢鎖
 * @param expire 設置鎖超時時間
 * @return 成功 or 失敗
 */
 public boolean lock(long timeout,int expire){
 long nanoTime = System.nanoTime();
 timeout *= MILLI_NANO_TIME;
 try {
  //在timeout的時間范圍內不斷輪詢鎖
  while (System.nanoTime() - nanoTime < timeout) {
  //鎖不存在的話,設置鎖并設置鎖過期時間,即加鎖
  if (this.redisClient.setnx(this.key, LOCKED) == 1) {
   this.redisClient.expire(key, expire);//設置鎖過期時間是為了在沒有釋放
   //鎖的情況下鎖過期后消失,不會造成永久阻塞
   this.lock = true;
   return this.lock;
  }
  System.out.println("出現鎖等待");
  //短暫休眠,避免可能的活鎖
  Thread.sleep(3, RANDOM.nextInt(30));
  }
 } catch (Exception e) {
  throw new RuntimeException("locking error",e);
 }
 return false;
 }
 
 public void unlock() {
 try {
  if(this.lock){
  redisClient.delKey(key);//直接刪除
  }
 } catch (Throwable e) {
 
 }
 }

上述的代碼是框架性的代碼,現在來講解如何使用上面的簡單框架來寫一個秒殺函數。

先定義一個接口,接口里定義了一個秒殺方法:

?
1
2
3
4
5
6
7
8
public interface SeckillInterface {
/**
*現在暫時只支持在接口方法上注解
*/
 //cacheLock注解可能產生并發的方法
 @CacheLock(lockedPrefix="TEST_PREFIX")
 public void secKill(String userID,@LockedObject Long commidityID);//最簡單的秒殺方法,參數是用戶ID和商品ID。可能有多個線程爭搶一個商品,所以商品ID加上LockedObject注解
}

上述SeckillInterface接口的實現類,即秒殺的具體實現:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class SecKillImpl implements SeckillInterface{
 static Map<Long, Long> inventory ;
 static{
 inventory = new HashMap<>();
 inventory.put(10000001L, 10000l);
 inventory.put(10000002L, 10000l);
 }
 
 @Override
 public void secKill(String arg1, Long arg2) {
 //最簡單的秒殺,這里僅作為demo示例
 reduceInventory(arg2);
 }
 //模擬秒殺操作,姑且認為一個秒殺就是將庫存減一,實際情景要復雜的多
 public Long reduceInventory(Long commodityId){
 inventory.put(commodityId,inventory.get(commodityId) - 1);
 return inventory.get(commodityId);
 }
 
}

模擬秒殺場景,1000個線程來爭搶兩個商品:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Test
 public void testSecKill(){
 int threadCount = 1000;
 int splitPoint = 500;
 CountDownLatch endCount = new CountDownLatch(threadCount);
 CountDownLatch beginCount = new CountDownLatch(1);
 SecKillImpl testClass = new SecKillImpl();
 
 Thread[] threads = new Thread[threadCount];
 //起500個線程,秒殺第一個商品
 for(int i= 0;i < splitPoint;i++){
  threads[i] = new Thread(new Runnable() {
  public void run() {
   try {
   //等待在一個信號量上,掛起
   beginCount.await();
   //用動態代理的方式調用secKill方法
   SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
    new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
   proxy.secKill("test", commidityId1);
   endCount.countDown();
   } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
   }
  }
  });
  threads[i].start();
 
 }
 //再起500個線程,秒殺第二件商品
 for(int i= splitPoint;i < threadCount;i++){
  threads[i] = new Thread(new Runnable() {
  public void run() {
   try {
   //等待在一個信號量上,掛起
   beginCount.await();
   //用動態代理的方式調用secKill方法
   SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
    new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
   proxy.secKill("test", commidityId2);
   //testClass.testFunc("test", 10000001L);
   endCount.countDown();
   } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
   }
  }
  });
  threads[i].start();
 
 }
 
 
 long startTime = System.currentTimeMillis();
 //主線程釋放開始信號量,并等待結束信號量,這樣做保證1000個線程做到完全同時執行,保證測試的正確性
 beginCount.countDown();
 
 try {
  //主線程等待結束信號量
  endCount.await();
  //觀察秒殺結果是否正確
  System.out.println(SecKillImpl.inventory.get(commidityId1));
  System.out.println(SecKillImpl.inventory.get(commidityId2));
  System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
  System.out.println("total cost " + (System.currentTimeMillis() - startTime));
 } catch (InterruptedException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
 }
 }

在正確的預想下,應該每個商品的庫存都減少了500,在多次試驗后,實際情況符合預想。如果不采用鎖機制,會出現庫存減少499,498的情況。
這里采用了動態代理的方法,利用注解和反射機制得到分布式鎖ID,進行加鎖和釋放鎖操作。當然也可以直接在方法進行這些操作,采用動態代理也是為了能夠將鎖操作代碼集中在代理中,便于維護。
通常秒殺場景發生在web項目中,可以考慮利用spring的AOP特性將鎖操作代碼置于切面中,當然AOP本質上也是動態代理。

小結

這篇文章從業務場景出發,從抽象到實現闡述了如何利用redis實現分布式鎖,完成簡單的秒殺功能,也記錄了筆者思考的過程,希望能給閱讀到本篇文章的人一些啟發。

源碼倉庫

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

延伸 · 閱讀

精彩推薦
  • Redis詳解Redis復制原理

    詳解Redis復制原理

    與大多數db一樣,Redis也提供了復制機制,以滿足故障恢復和負載均衡等需求。復制也是Redis高可用的基礎,哨兵和集群都是建立在復制基礎上實現高可用的...

    李留廣10222021-08-09
  • RedisRedis如何實現數據庫讀寫分離詳解

    Redis如何實現數據庫讀寫分離詳解

    Redis的主從架構,能幫助我們實現讀多,寫少的情況,下面這篇文章主要給大家介紹了關于Redis如何實現數據庫讀寫分離的相關資料,文中通過示例代碼介紹...

    羅兵漂流記6092019-11-11
  • RedisRedis全量復制與部分復制示例詳解

    Redis全量復制與部分復制示例詳解

    這篇文章主要給大家介紹了關于Redis全量復制與部分復制的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Redis爬蟲具有一定的參考學習...

    豆子先生5052019-11-27
  • Redisredis中如何使用lua腳本讓你的靈活性提高5個逼格詳解

    redis中如何使用lua腳本讓你的靈活性提高5個逼格詳解

    這篇文章主要給大家介紹了關于redis中如何使用lua腳本讓你的靈活性提高5個逼格的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具...

    一線碼農5812019-11-18
  • Redisredis實現排行榜功能

    redis實現排行榜功能

    排行榜在很多地方都能使用到,redis的zset可以很方便地用來實現排行榜功能,本文就來簡單的介紹一下如何使用,具有一定的參考價值,感興趣的小伙伴們...

    乘月歸5022021-08-05
  • RedisRedis的配置、啟動、操作和關閉方法

    Redis的配置、啟動、操作和關閉方法

    今天小編就為大家分享一篇Redis的配置、啟動、操作和關閉方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧 ...

    大道化簡5312019-11-14
  • Redisredis 交集、并集、差集的具體使用

    redis 交集、并集、差集的具體使用

    這篇文章主要介紹了redis 交集、并集、差集的具體使用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友...

    xiaojin21cen10152021-07-27
  • RedisRedis 事務知識點相關總結

    Redis 事務知識點相關總結

    這篇文章主要介紹了Redis 事務相關總結,幫助大家更好的理解和學習使用Redis,感興趣的朋友可以了解下...

    AsiaYe8232021-07-28
主站蜘蛛池模板: julia ann一hd| 97影院网| 亚洲va韩国va欧美va天堂 | 欧美成人tv| 久久高清一级毛片 | 国产一区国产二区国产三区 | 国产成人综合网亚洲欧美在线 | 国产精品毛片va一区二区三区 | 丝瓜草莓香蕉绿巨人幸福宝 | 强女明星系列小说 | 欧美一区二区三区在线观看不卡 | 国产精品极品美女自在线 | 国产精品国色综合久久 | 国产免费一区二区 | 日日草视频 | 亚洲成人伦理 | 久久伊人在 | 国产一区二区三区福利 | 无遮挡h肉动漫高清在线 | 日韩精品亚洲专区在线影视 | 久久www免费人成高清 | 亚洲国产精品自在在线观看 | 免费看男女污污完整版 | 男女视频在线观看 | 久久re亚洲在线视频 | 国产成人愉拍免费视频 | 富士av105| 亚洲欧美精品一区二区 | 精品国产成人AV在线看 | 石原莉奈被店长侵犯免费 | 皇上撞着太子妃的秘密小说 | 亚洲va在线va天堂成人 | 日本中文字幕一区二区有码在线 | 国产欧美精品专区一区二区 | 996热精品视频在线观看 | 贤妻良母电影日本 | 午夜免费无码福利视频麻豆 | 肉搏潘金莲三级18春 | 免费网址在线观看入口推荐 | 美女扒开屁股让我桶免费 | 久久99r66热这里只有精品 |