如何操作Redis和zookeeper實現分布式鎖
在分布式場景下,有很多種情況都需要實現最終一致性。在設計遠程上下文的領域事件的時候,為了保證最終一致性,在通過領域事件進行通訊的方式中,可以共享存儲(領域模型和消息的持久化數據源),或者做全局XA事務(兩階段提交,數據源可分開),也可以借助消息中間件(消費者處理需要能冪等)。通過Observer模式來發布領域事件可以提供很好的高并發性能,并且事件存儲也能追溯更小粒度的事件數據,使各個應用系統擁有更好的自治性。
1.分布式鎖
分布式鎖一般用在分布式系統或者多個應用中,用來控制同一任務是否執行或者任務的執行順序。在項目中,部署了多個tomcat應用,在執行定時任務時就會遇到同一任務可能執行多次的情況,我們可以借助分布式鎖,保證在同一時間只有一個tomcat應用執行了定時任務。
2.分布式鎖的實現方式
- 使用redis的setnx()和expire()
- 使用redis的getset()
- 使用zookeeper的創建節點node
- 使用zookeeper的創建臨時序列節點
3.使用redis的setnx()和expire()來實現分布式鎖
1
2
|
setnx( key ,value) 如果 key 不存在,設置為當前 key 的值為value;如果 key 存在,直接返回。 expire()來設置超時時間 |
定義注解類:
1
2
3
4
5
6
7
8
9
10
|
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface Lockable{ // redis緩存 key String key (); // redis緩存 key 中的數據 String value() default "" ; // 過期時間(秒),默認為一分鐘 long expire() default 60; } |
定時任務增加注解@Lockable:
1
2
3
|
@Lockable( key = "DistributedLock:dealExpireRecords" ) public void dealExpireRecords() { } |
定義一個aop切面LockAspect,使用@Around處理所有注解為@Lockable的方法,通過連接點確認此注解是用在方法上,通過方法獲取注解信息,使用setIfAbsent來判斷是否獲取分布式鎖,如果沒有獲取分布式鎖,直接返回;如果獲取到分布式鎖,通過expire設置過期時間,并調用指定方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
@Component @Slf4j @Aspect public class LockAspect { @Autowired private RedisTemplate redisTemplate; @Around( "@annotation(com.records.aop.Lockable)" ) public Object distributeLock(ProceedingJoinPoint pjp) { Object resultObject = null ; //確認此注解是用在方法上 Signature signature = pjp.getSignature(); if (!(signature instanceof MethodSignature)) { log.error( "Lockable is method annotation!" ); return resultObject; } MethodSignature methodSignature = (MethodSignature) signature; Method targetMethod = methodSignature.getMethod(); //獲取注解信息 Lockable lockable = targetMethod.getAnnotation(Lockable.class); String key = lockable. key (); String value = lockable.value(); long expire = lockable.expire(); // 分布式鎖,如果沒有此 key ,設置此值并返回 true ;如果有此 key ,則返回 false boolean result = redisTemplate.boundValueOps( key ).setIfAbsent(value); if (!result) { //其他程序已經獲取分布式鎖 return resultObject; } //設置過期時間,默認一分鐘 redisTemplate.boundValueOps( key ).expire(expire, TimeUnit.SECONDS); try { resultObject = pjp.proceed(); //調用對應方法執行 } catch (Throwable throwable) { throwable.printStackTrace(); } return resultObject; } } |
4.使用redis的getset()來實現分布式鎖
此方法使redisTemplate.boundValueOps(key).getAndSet(value)的方法,如果返回空,表示獲取了分布式鎖;如果返回不為空,表示分布式鎖已經被其他程序占用
5.使用zookeeper的創建節點node
使用zookeeper創建節點node,如果創建節點成功,表示獲取了此分布式鎖;如果創建節點失敗,表示此分布式鎖已經被其他程序占用(多個程序同時創建一個節點node,只有一個能夠創建成功)
6.使用zookeeper的創建臨時序列節點
使用zookeeper創建臨時序列節點來實現分布式鎖,適用于順序執行的程序,大體思路就是創建臨時序列節點,找出最小的序列節點,獲取分布式鎖,程序執行完成之后此序列節點消失,通過watch來監控節點的變化,從剩下的節點的找到最小的序列節點,獲取分布式鎖,執行相應處理,依次類推......
感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!