一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 基于ZooKeeper實現隊列源碼

基于ZooKeeper實現隊列源碼

2021-01-05 11:11MassiveStars Java教程

這篇文章主要介紹了基于ZooKeeper實現隊列源碼的相關內容,包括其實現原理和應用場景,以及對隊列的簡單介紹,具有一定參考價值,需要的朋友可以了解下。

實現原理

先進先出隊列是最常用的隊列,使用Zookeeper實現先進先出隊列就是在特定的目錄下創建PERSISTENT_EQUENTIAL節點,創建成功時Watcher通知等待的隊列,隊列刪除序列號最小的節點用以消費。此場景下Zookeeper的znode用于消息存儲,znode存儲的數據就是消息隊列中的消息內容,SEQUENTIAL序列號就是消息的編號,按序取出即可。由于創建的節點是持久化的,所以不必擔心隊列消息的丟失問題。

隊列(Queue)

分布式隊列是通用的數據結構,為了在 Zookeeper 中實現分布式隊列,首先需要指定一個 Znode 節點作為隊列節點(queue node), 各個分布式客戶端通過調用 create() 函數向隊列中放入數據,調用create()時節點路徑名帶"qn-"結尾,并設置順序(sequence)節點標志。 由于設置了節點的順序標志,新的路徑名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增號。需要從隊列中獲取數據/移除數據的客戶端首先調用 getChildren() 函數,有數據則獲取(獲取數據后可以刪除也可以不刪),沒有則在隊列節點(queue node)上將 watch 設置為 true,等待觸發并處理最小序號的節點(即從序號最小的節點中取數據)。

應用場景

Zookeeper隊列不太適合要求高性能的場合,但可以在數據量不大的情況下考慮使用。比如已在項目中使用Zookeeper又需要小規模的隊列應用,這時可以使用Zookeeper實現的隊列;畢竟引進一個消息中間件會增加系統的復雜性和運維的壓力。

詳細代碼

ZookeeperClient工具類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package org.massive.common;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Created by Massive on 2016/12/18.
 */
public class ZooKeeperClient {
 private static String connectionString = "localhost:2181";
 private static int sessionTimeout = 10000;
 public static ZooKeeper getInstance() throws IOException, InterruptedException {
 //--------------------------------------------------------------
 // 為避免連接還未完成就執行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)
 // 這里等Zookeeper的連接完成才返回實例
 //--------------------------------------------------------------
 final CountDownLatch connectedSignal = new CountDownLatch(1);
 ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
  @Override
  public void process(WatchedEvent event) {
   if (event.getState() == Event.KeeperState.SyncConnected) {
   connectedSignal.countDown();
   } else if (event.getState() == Event.KeeperState.Expired) {
   }
  }
  });
 connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);
 return zk;
 }
 public static int getSessionTimeout() {
 return sessionTimeout;
 }
 public static void setSessionTimeout(int sessionTimeout) {
 ZooKeeperClient.sessionTimeout = sessionTimeout;
 }
}

ZooKeeperQueue

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package org.massive.queue;
import org.apache.commons.lang3.RandomUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
/**
 * Created by Allen on 2016/12/22.
 */
public class ZooKeeperQueue {
 private ZooKeeper zk;
 private int sessionTimeout;
 private static byte[] ROOT_QUEUE_DATA = {0x12,0x34};
 private static String QUEUE_ROOT = "/QUEUE";
 private String queueName;
 private String queuePath;
 private Object mutex = new Object();
 public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException {
 this.queueName = queueName;
 this.queuePath = QUEUE_ROOT + "/" + queueName;
 this.zk = ZooKeeperClient.getInstance();
 this.sessionTimeout = zk.getSessionTimeout();
 //----------------------------------------------------
 // 確保隊列根目錄/QUEUE和當前隊列的目錄的存在
 //----------------------------------------------------
 ensureExists(QUEUE_ROOT);
 ensureExists(queuePath);
 }
 public byte[] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException {
 List<String> nodes = null;
 byte[] returnVal = null;
 Stat stat = null;
 do {
  synchronized (mutex) {
  nodes = zk.getChildren(queuePath, new ProduceWatcher());
  //----------------------------------------------------
  // 如果沒有消息節點,等待生產者的通知
  //----------------------------------------------------
  if (nodes == null || nodes.size() == 0) {
   mutex.wait();
  } else {
   SortedSet<String> sortedNode = new TreeSet<String>();
   for (String node : nodes) {
   sortedNode.add(queuePath + "/" + node);
   }
   //----------------------------------------------------
   // 消費隊列里序列號最小的消息
   //----------------------------------------------------
   String first = sortedNode.first();
   returnVal = zk.getData(first, false, stat);
   zk.delete(first, -1);
   System.out.print(Thread.currentThread().getName() + " ");
   System.out.print("consume a message from queue:" + first);
   System.out.println(", message data is: " + new String(returnVal,"UTF-8"));
   return returnVal;
  }
  }
 } while (true);
 }
 class ProduceWatcher implements Watcher {
 @Override
 public void process(WatchedEvent event) {
  //----------------------------------------------------
  // 生產一條消息成功后通知一個等待線程
  //----------------------------------------------------
  synchronized (mutex) {
  mutex.notify();
  }
 }
 }
 public void produce(byte[] data) throws KeeperException, InterruptedException, UnsupportedEncodingException {
 //----------------------------------------------------
 // 確保當前隊列目錄存在
 // example: /QUEUE/queueName
 //----------------------------------------------------
 ensureExists(queuePath);
 String node = zk.create(queuePath + "/", data,
  ZooDefs.Ids.OPEN_ACL_UNSAFE,
  CreateMode.PERSISTENT_SEQUENTIAL);
 System.out.print(Thread.currentThread().getName() + " ");
 System.out.print("produce a message to queue:" + node);
 System.out.println(" , message data is: " + new String(data,"UTF-8"));
 }
 public void ensureExists(String path) {
 try {
  Stat stat = zk.exists(path, false);
  if (stat == null) {
  zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  }
 } catch (KeeperException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
 }
 public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
 String queueName = "test";
 final ZooKeeperQueue queue = new ZooKeeperQueue(queueName);
 for (int i = 0; i < 10; i++) {
  new Thread(new Runnable() {
  @Override
  public void run() {
   try {
   queue.consume();
   System.out.println("--------------------------------------------------------");
   System.out.println();
   } catch (InterruptedException e) {
   e.printStackTrace();
   } catch (KeeperException e) {
   e.printStackTrace();
   } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
   }
  }
  }).start();
 }
 new Thread(new Runnable() {
  @Override
  public void run() {
  for (int i = 0; i < 10; i++) {
   try {
   Thread.sleep(RandomUtils.nextInt(100 * i, 200 * i));
   queue.produce(("massive" + i).getBytes());
   } catch (InterruptedException e) {
   e.printStackTrace();
   } catch (KeeperException e) {
   e.printStackTrace();
   } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
   }
  }
  }
 },"Produce-thread").start();
 }
}

測試

運行main方法,本機器的某次輸出結果

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Produce-thread produce a message to queue:/QUEUE/test/0000000000 , message data is: massive0
Thread-8 consume a message from queue:/QUEUE/test/0000000000, message data is: massive0
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000001 , message data is: massive1
Thread-6 consume a message from queue:/QUEUE/test/0000000001, message data is: massive1
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000002 , message data is: massive2
Thread-3 consume a message from queue:/QUEUE/test/0000000002, message data is: massive2
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000003 , message data is: massive3
Thread-0 consume a message from queue:/QUEUE/test/0000000003, message data is: massive3
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000004 , message data is: massive4
Thread-5 consume a message from queue:/QUEUE/test/0000000004, message data is: massive4
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000005 , message data is: massive5
Thread-2 consume a message from queue:/QUEUE/test/0000000005, message data is: massive5
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000006 , message data is: massive6
Thread-4 consume a message from queue:/QUEUE/test/0000000006, message data is: massive6
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000007 , message data is: massive7
Thread-9 consume a message from queue:/QUEUE/test/0000000007, message data is: massive7
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000008 , message data is: massive8
Thread-7 consume a message from queue:/QUEUE/test/0000000008, message data is: massive8
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000009 , message data is: massive9
Thread-1 consume a message from queue:/QUEUE/test/0000000009, message data is: massive9

總結

以上就是本文有關于隊列和基于ZooKeeper實現隊列源碼介紹的全部內容,希望對大家有所幫助。

感謝朋友們對本站的支持!

原文鏈接:https://www.2cto.com/kf/201612/582914.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 视频在线观看大片 | 乌克兰bbw | 校园全肉高h湿一女多男 | 黄网在线观看免费网站台湾swag | 波多野结衣中文字幕 | 欧美成黑人性猛交xxoo | 国产久热香蕉在线观看 | 国产rpg迷雾之风冷狐破解 | 免费看a片毛片 | 99在线在线视频免费视频观看 | 国产区1| 男人添女人 | 先锋影音 av | 黑人双渗透| 99久久精品99999久久 | 久久精品一区二区三区资源网 | 午夜伦理:伦理片 | 小鸟酱喷水| 奇米777狠狠 | 色哟哟国产成人精品 | 猛h辣h高h文湿校园1v1 | 桥本有菜ssni-677在线观看 | 白白国产永久免费视频 | 国产精品女同久久免费观看 | 欧美性bbbbbxxxxxxx | 秋霞黄色大片 | 欧美牛逼aa| 亚洲国产精品91 | 精品国产91久久久久久久 | 日本中文字幕一区二区三区不卡 | 亚洲 欧美 在线观看 | 日本在线精品视频 | 国产久视频 | 99re这里只有精品在线观看 | 7777奇米 | 美女翘臀内疯狂进出 | 边摸边吃奶边做爽gif动态图 | 亚洲系列第一页 | 美女脱得一二净无内裤全身的照片 | 亚洲欧美日韩另类在线一 | 国色天香社区在线 |