一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|

服務(wù)器之家 - 編程語言 - JAVA教程 - 詳解Java如何實現(xiàn)基于Redis的分布式鎖

詳解Java如何實現(xiàn)基于Redis的分布式鎖

2020-06-08 12:03daisy JAVA教程

在不同進程需要互斥地訪問共享資源時,分布式鎖是一種非常有用的技術(shù)手段。這篇文章運用圖文和實例代碼介紹了Java如何實現(xiàn)基于Redis的分布式鎖,文章介紹的很詳細(xì),對Java和Redis剛興趣的朋友們可以參考借鑒,下面來一起看看

前言

單JVM內(nèi)同步好辦, 直接用JDK提供的鎖就可以了,但是跨進程同步靠這個肯定是不可能的,這種情況下肯定要借助第三方,我這里實現(xiàn)用Redis,當(dāng)然還有很多其他的實現(xiàn)方式。其實基于Redis實現(xiàn)的原理還算比較簡單的,在看代碼之前建議大家先去看看原理,看懂了之后看代碼應(yīng)該就容易理解了。

我這里不實現(xiàn)JDK的java.util.concurrent.locks.Lock接口,而是自定義一個,因為JDK的有個newCondition方法我這里暫時沒實現(xiàn)。這個Lock提供了5個lock方法的變體,可以自行選擇使用哪一個來獲取鎖,我的想法是最好用帶超時返回的那幾個方法,因為不這樣的話,假如redis掛了,線程永遠都在那死循環(huán)了(關(guān)于這里,應(yīng)該還可以進一步優(yōu)化,如果redis掛了,Jedis的操作肯定會拋異常之類的,可以定義個機制讓redis掛了的時候通知使用這個lock的用戶,或者說是線程)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cc.lixiaohui.lock;
 
import java.util.concurrent.TimeUnit;
 
public interface Lock {
 
 /**
 * 阻塞性的獲取鎖, 不響應(yīng)中斷
 */
 void lock;
 
 /**
 * 阻塞性的獲取鎖, 響應(yīng)中斷
 *
 * @throws InterruptedException
 */
 void lockInterruptibly throws InterruptedException;
 
 /**
 * 嘗試獲取鎖, 獲取不到立即返回, 不阻塞
 */
 boolean tryLock;
 
 /**
 * 超時自動返回的阻塞性的獲取鎖, 不響應(yīng)中斷
 *
 * @param time
 * @param unit
 * @return {@code true} 若成功獲取到鎖, {@code false} 若在指定時間內(nèi)未???取到鎖
  *
 */
 boolean tryLock(long time, TimeUnit unit);
 
 /**
 * 超時自動返回的阻塞性的獲取鎖, 響應(yīng)中斷
 *
 * @param time
 * @param unit
 * @return {@code true} 若成功獲取到鎖, {@code false} 若在指定時間內(nèi)未獲取到鎖
 * @throws InterruptedException 在嘗試獲取鎖的當(dāng)前線程被中斷
 */
 boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;
 
 /**
 * 釋放鎖
 */
 void unlock;
 
}

看其抽象實現(xiàn):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package cc.lixiaohui.lock;
 
import java.util.concurrent.TimeUnit;
 
/**
 * 鎖的骨架實現(xiàn), 真正的獲取鎖的步驟由子類去實現(xiàn).
 *
 * @author lixiaohui
 *
 */
public abstract class AbstractLock implements Lock {
 
 /**
 * <pre>
 * 這里需不需要保證可見性值得討論, 因為是分布式的鎖,
 * 1.同一個jvm的多個線程使用不同的鎖對象其實也是可以的, 這種情況下不需要保證可見性
 * 2.同一個jvm的多個線程使用同一個鎖對象, 那可見性就必須要保證了.
 * </pre>
 */
 protected volatile boolean locked;
 
 /**
 * 當(dāng)前jvm內(nèi)持有該鎖的線程(if have one)
 */
 private Thread exclusiveOwnerThread;
 
 public void lock {
 try {
 lock(false, 0, null, false);
 } catch (InterruptedException e) {
 // TODO ignore
 }
 }
 
 public void lockInterruptibly throws InterruptedException {
 lock(false, 0, null, true);
 }
 
 public boolean tryLock(long time, TimeUnit unit) {
 try {
 return lock(true, time, unit, false);
 } catch (InterruptedException e) {
 // TODO ignore
 }
 return false;
 }
 
 public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {
 return lock(true, time, unit, true);
 }
 
 public void unlock {
 // TODO 檢查當(dāng)前線程是否持有鎖
 if (Thread.currentThread != getExclusiveOwnerThread) {
 throw new IllegalMonitorStateException("current thread does not hold the lock");
 }
 
 unlock0;
 setExclusiveOwnerThread(null);
 }
 
 protected void setExclusiveOwnerThread(Thread thread) {
 exclusiveOwnerThread = thread;
 }
 
 protected final Thread getExclusiveOwnerThread {
 return exclusiveOwnerThread;
 }
 
 protected abstract void unlock0;
 
 /**
 * 阻塞式獲取鎖的實現(xiàn)
 *
 * @param useTimeout
 * @param time
 * @param unit
 * @param interrupt 是否響應(yīng)中斷
 * @return
 * @throws InterruptedException
 */
 protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;
 
}

基于Redis的最終實現(xiàn),關(guān)鍵的獲取鎖,釋放鎖的代碼在這個類的lock方法和unlock0方法里,大家可以只看這兩個方法然后完全自己寫一個:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package cc.lixiaohui.lock;
 
import java.util.concurrent.TimeUnit;
 
import redis.clients.jedis.Jedis;
 
/**
 * <pre>
 * 基于Redis的SETNX操作實現(xiàn)的分布式鎖
 *
 * 獲取鎖時最好用lock(long time, TimeUnit unit), 以免網(wǎng)路問題而導(dǎo)致線程一直阻塞
 *
 * <a href="http://redis.io/commands/setnx">SETNC操作參考資料</a>
 * </pre>
 *
 * @author lixiaohui
 *
 */
public class RedisBasedDistributedLock extends AbstractLock {
 
 private Jedis jedis;
 
 // 鎖的名字
 protected String lockKey;
 
 // 鎖的有效時長(毫秒)
 protected long lockExpires;
 
 public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) {
 this.jedis = jedis;
 this.lockKey = lockKey;
 this.lockExpires = lockExpires;
 }
 
 // 阻塞式獲取鎖的實現(xiàn)
 protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{
 if (interrupt) {
 checkInterruption;
 }
 
 long start = System.currentTimeMillis;
 long timeout = unit.toMillis(time); // if !useTimeout, then it's useless
 
 while (useTimeout ? isTimeout(start, timeout) : true) {
 if (interrupt) {
 checkInterruption;
 }
 
 long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//鎖超時時間
 String stringOfLockExpireTime = String.valueOf(lockExpireTime);
 
 if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 獲取到鎖
 // TODO 成功獲取到鎖, 設(shè)置相關(guān)標(biāo)識
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }
 
 String value = jedis.get(lockKey);
 if (value != null && isTimeExpired(value)) { // lock is expired
 // 假設(shè)多個線程(非單jvm)同時走到這里
 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
 // 但是走到這里時每個線程拿到的oldValue肯定不可能一樣(因為getset是原子性的)
 // 加入拿到的oldValue依然是expired的,那么就說明拿到鎖了
 if (oldValue != null && isTimeExpired(oldValue)) {
  // TODO 成功獲取到鎖, 設(shè)置相關(guān)標(biāo)識
  locked = true;
  setExclusiveOwnerThread(Thread.currentThread);
  return true;
 }
 } else {
 // TODO lock is not expired, enter next loop retrying
 }
 }
 return false;
 }
 
 public boolean tryLock {
 long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//鎖超時時間
 String stringOfLockExpireTime = String.valueOf(lockExpireTime);
 
 if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 獲取到鎖
 // TODO 成功獲取到鎖, 設(shè)置相關(guān)標(biāo)識
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }
 
 String value = jedis.get(lockKey);
 if (value != null && isTimeExpired(value)) { // lock is expired
 // 假設(shè)多個線程(非單jvm)同時走到這里
 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
 // 但是走到這里時每個線程拿到的oldValue肯定不可能一樣(因為getset是原子性的)
 // 假如拿到的oldValue依然是expired的,那么就說明拿到鎖了
 if (oldValue != null && isTimeExpired(oldValue)) {
 // TODO 成功獲取到鎖, 設(shè)置相關(guān)標(biāo)識
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }
 } else {
 // TODO lock is not expired, enter next loop retrying
 }
 
 return false;
 }
 
 /**
 * Queries if this lock is held by any thread.
 *
 * @return {@code true} if any thread holds this lock and
  *   {@code false} otherwise
 */
 public boolean isLocked {
 if (locked) {
 return true;
 } else {
 String value = jedis.get(lockKey);
 // TODO 這里其實是有問題的, 想:當(dāng)get方法返回value后, 假設(shè)這個value已經(jīng)是過期的了,
 // 而就在這瞬間, 另一個節(jié)點set了value, 這時鎖是被別的線程(節(jié)點持有), 而接下來的判斷
 // 是檢測不出這種情況的.不過這個問題應(yīng)該不會導(dǎo)致其它的問題出現(xiàn), 因為這個方法的目的本來就
 // 不是同步控制, 它只是一種鎖狀態(tài)的報告.
 return !isTimeExpired(value);
 }
 }
 
 @Override
 protected void unlock0 {
 // TODO 判斷鎖是否過期
 String value = jedis.get(lockKey);
 if (!isTimeExpired(value)) {
 doUnlock;
 }
 }
 
 private void checkInterruption throws InterruptedException {
 if(Thread.currentThread.isInterrupted) {
 throw new InterruptedException;
 }
 }
 
 private boolean isTimeExpired(String value) {
 return Long.parseLong(value) < System.currentTimeMillis;
 }
 
 private boolean isTimeout(long start, long timeout) {
 return start + timeout > System.currentTimeMillis;
 }
 
 private void doUnlock {
 jedis.del(lockKey);
 }
 
}

如果將來還換一種實現(xiàn)方式(比如zookeeper之類的),到時直接繼承AbstractLock并實現(xiàn)lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt)unlock0方法即可(所謂抽象嘛)

測試

模擬全局ID增長器,設(shè)計一個IDGenerator類,該類負(fù)責(zé)生成全局遞增ID,其代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cc.lixiaohui.lock;
 
import java.math.BigInteger;
import java.util.concurrent.TimeUnit;
 
/**
 * 模擬ID生成
 * @author lixiaohui
 *
 */
public class IDGenerator {
 
 private static BigInteger id = BigInteger.valueOf(0);
 
 private final Lock lock;
 
 private static final BigInteger INCREMENT = BigInteger.valueOf(1);
 
 public IDGenerator(Lock lock) {
 this.lock = lock;
 }
 
 public String getAndIncrement {
 if (lock.tryLock(3, TimeUnit.SECONDS)) {
 try {
 // TODO 這里獲取到鎖, 訪問臨界區(qū)資源
 return getAndIncrement0;
 } finally {
 lock.unlock;
 }
 }
 return null;
 //return getAndIncrement0;
 }
 
 private String getAndIncrement0 {
 String s = id.toString;
 id = id.add(INCREMENT);
 return s;
 }
}

測試主邏輯:同一個JVM內(nèi)開兩個線程死循環(huán)地(循環(huán)之間無間隔,有的話測試就沒意義了)獲取ID(我這里并不是死循環(huán)而是跑20s),獲取到ID存到同一個Set里面,在存之前先檢查該IDset中是否存在,如果已存在,則讓兩個線程都停止。如果程序能正常跑完20s,那么說明這個分布式鎖還算可以滿足要求,如此測試的效果應(yīng)該和不同JVM(也就是真正的分布式環(huán)境中)測試的效果是一樣的,下面是測試類的代碼:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package cc.lixiaohui.DistributedLock.DistributedLock;
 
import java.util.HashSet;
import java.util.Set;
 
import org.junit.Test;
 
import redis.clients.jedis.Jedis;
import cc.lixiaohui.lock.IDGenerator;
import cc.lixiaohui.lock.Lock;
import cc.lixiaohui.lock.RedisBasedDistributedLock;
 
public class IDGeneratorTest {
 
 private static Set<String> generatedIds = new HashSet<String>;
 
 private static final String LOCK_KEY = "lock.lock";
 private static final long LOCK_EXPIRE = 5 * 1000;
 
 @Test
 public void test throws InterruptedException {
 Jedis jedis1 = new Jedis("localhost", 6379);
 Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE);
 IDGenerator g1 = new IDGenerator(lock1);
 IDConsumeMission consume1 = new IDConsumeMission(g1, "consume1");
 
 Jedis jedis2 = new Jedis("localhost", 6379);
 Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE);
 IDGenerator g2 = new IDGenerator(lock2);
 IDConsumeMission consume2 = new IDConsumeMission(g2, "consume2");
 
 Thread t1 = new Thread(consume1);
 Thread t2 = new Thread(consume2);
 t1.start;
 t2.start;
 
 Thread.sleep(20 * 1000); //讓兩個線程跑20秒
 
 IDConsumeMission.stop;
 
 t1.join;
 t2.join;
 }
 
 static String time {
 return String.valueOf(System.currentTimeMillis / 1000);
 }
 
 static class IDConsumeMission implements Runnable {
 
 private IDGenerator idGenerator;
 
 private String name;
 
 private static volatile boolean stop;
 
 public IDConsumeMission(IDGenerator idGenerator, String name) {
 this.idGenerator = idGenerator;
 this.name = name;
 }
 
 public static void stop {
 stop = true;
 }
 
 public void run {
 System.out.println(time + ": consume " + name + " start ");
 while (!stop) {
 String id = idGenerator.getAndIncrement;
 if(generatedIds.contains(id)) {
  System.out.println(time + ": duplicate id generated, id = " + id);
  stop = true;
  continue;
 }
 
 generatedIds.add(id);
 System.out.println(time + ": consume " + name + " add id = " + id);
 }
 System.out.println(time + ": consume " + name + " done ");
 }
 
 }
 
}

說明一點,我這里停止兩個線程的方式并不是很好,我是為了方便才這么做的,因為只是測試,最好不要這么做。

測試結(jié)果

跑20s打印的東西太多,前面打印的被clear了,只有差不多跑完的時候才有,下面截圖。說明了這個鎖能正常工作:

詳解Java如何實現(xiàn)基于Redis的分布式鎖

當(dāng)IDGererator沒有加鎖(即IDGereratorgetAndIncrement方法內(nèi)部獲取id時不上鎖)時,測試是不通過的,非常大的概率中途就會停止,下面是不加鎖時的測試結(jié)果:

這個1秒都不到:

詳解Java如何實現(xiàn)基于Redis的分布式鎖

這個也1秒都不到:

詳解Java如何實現(xiàn)基于Redis的分布式鎖

結(jié)束語

好了,以上就是Java實現(xiàn)基于Redis的分布式鎖的全部內(nèi)容,各位如果發(fā)現(xiàn)問題希望能指正,希望這篇文章能對大家的學(xué)習(xí)和工作帶來一定的幫助,如果有疑問可以留言交流。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 亚洲欧美一区二区三区在饯 | 亚洲精品国偷拍自产在线观看蜜臀 | 99年水嫩漂亮粉嫩在线播放 | 国产精品久久久久久久久免费 | 插得好舒服| 果冻传媒天美传媒在线小视频播放 | 脱jk裙的美女露小内内无遮挡 | 日本强不卡在线观看 | 波多野结衣快播 | 天天爱天天操天天射 | 色老板在线免费观看 | 特黄视频 | 色综合天天五月色 | 垫底辣妹免费观看完整版 | 欧美亚洲一区二区三区在线 | 日韩精品久久不卡中文字幕 | 久久久无码精品无码国产人妻丝瓜 | 欧美一区二区三区免费不卡 | juy_661佐佐木明希在线播放 | 毛片亚洲毛片亚洲毛片 | 亚洲bt区 | 高h孕交 | 激情三级做爰在线观看激情 | 国产精品中文 | 无限在线看免费视频大全 | 婷婷激情综合五月天 | 欧美国产日韩在线播放 | 天天爱天天做天天爽天天躁 | 国产视频久久久久 | xvideoscom极品肌肉警察 | 国产成人精品高清在线观看99 | 日朝欧美亚洲精品 | 情乱奶水欲 | 69av美女| 亚洲欧美日韩国产综合专区 | 黑人巨荃大战乌克兰美女 | 视频精品一区二区三区 | 国产探花在线视频 | 青草视频久久 | 四虎最新网址在线观看 | 成人高辣h视频一区二区在线观看 |