一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|數(shù)據(jù)庫技術(shù)|

服務(wù)器之家 - 數(shù)據(jù)庫 - Redis - 使用 Redis 流實(shí)現(xiàn)消息隊(duì)列的代碼

使用 Redis 流實(shí)現(xiàn)消息隊(duì)列的代碼

2019-11-28 15:03問點(diǎn)事 Redis

這篇文章主要介紹了使用 Redis 流實(shí)現(xiàn)消息隊(duì)列,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

在介紹了 Redis 流的基本功能之后, 現(xiàn)在是時(shí)候使用這些功能來構(gòu)建一些實(shí)際的應(yīng)用了。 消息隊(duì)列作為流的典型應(yīng)用之一, 具有非常好的示范性, 因此我們將使用 Redis 流的相關(guān)功能構(gòu)建一個(gè)消息隊(duì)列應(yīng)用, 這個(gè)消息隊(duì)列跟我們之前使用其他 Redis 數(shù)據(jù)結(jié)構(gòu)構(gòu)建的消息隊(duì)列具有相似的功能。

代碼清單 10-1 展示了一個(gè)具有基本功能的消息隊(duì)列實(shí)現(xiàn):

  • 代碼最開頭的是幾個(gè)轉(zhuǎn)換函數(shù), 它們負(fù)責(zé)對程序的相關(guān)輸入輸出進(jìn)行轉(zhuǎn)換和格式化;
  • MessageQueue 類用于實(shí)現(xiàn)消息隊(duì)列, 它的添加消息、移除消息以及返回消息數(shù)量三個(gè)方法分別使用了流的 XADD 命令、 XDEL 命令和 XLEN 命令;
  • 消息隊(duì)列的兩個(gè)獲取方法 get_message() 和 get_by_range() 分別以兩種形式調(diào)用了流的 XRANGE 命令;
  • 最后, 用于迭代消息的 iterate() 方法使用了 XREAD 命令對流進(jìn)行迭代。

代碼清單 10-1 使用 Redis 流實(shí)現(xiàn)的消息隊(duì)列: /stream/message_queue.py

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def reconstruct_message_list(message_list):
  """
  為了讓多條消息能夠以更結(jié)構(gòu)化的方式返回給調(diào)用者,
  將 Redis 返回的多條消息從原來的格式:
  [(id1, {k1:v1, k2:v2, ...}), (id2, {k1:v1, k2:v2, ...}), ...]
  轉(zhuǎn)換成以下格式:
  [{id1: {k1:v1, k2:v2, ...}}, {id2: {k1:v1, k2:v2, ...}}, ...]
  """  result = []
  for id, kvs in message_list:
    result.append({id: kvs})
  return result
def get_message_from_nested_list(lst):
  """
  從嵌套列表中取出消息本體。
  """
  return lst[0][1]
class MessageQueue:
  """
  使用 Redis 流實(shí)現(xiàn)的消息隊(duì)列。
  """
  def __init__(self, client, stream_key):
    self.client = client
    self.stream = stream_key
  def add_message(self, key_value_pairs):
    """
    將給定的鍵值對存入到消息里面,并返回相應(yīng)的消息 ID 。
    """
    return self.client.xadd(self.stream, key_value_pairs)
  def get_message(self, message_id):
    """
    根據(jù)給定的消息 ID 返回相應(yīng)的消息,如果消息不存在則返回 None 。
    """
    reply = self.client.xrange(self.stream, message_id, message_id)
    if len(reply) == 1:
      return get_message_from_nested_list(reply)
 
  def remove_message(self, message_id):
    """
    根據(jù)給定的消息 ID 刪除相應(yīng)的消息,如果消息不存在則忽略該動(dòng)作。
    """
    self.client.xdel(self.stream, message_id)
 
  def len(self):
    """
    返回消息隊(duì)列的長度。
    """
    return self.client.xlen(self.stream)
 
  def get_by_range(self, start_id, end_id, max_item=10):
    """
    根據(jù)給定的 ID 區(qū)間范圍返回隊(duì)列中的消息。
    """
    reply = self.client.xrange(self.stream, start_id, end_id, max_item)
    return reconstruct_message_list(reply)
 
  def iterate(self, start_id=0, max_item=10):
    """
    對消息隊(duì)列進(jìn)行迭代,返回最多 N 條大于給定 ID 的消息。
    """
    reply = self.client.xread({self.stream: start_id}, max_item)
    if len(reply) == 0:
      return list()
    else:
      messages = get_message_from_nested_list(reply)
      return reconstruct_message_list(messages)

對于這個(gè)消息隊(duì)列實(shí)現(xiàn), 我們可以通過執(zhí)行以下代碼, 創(chuàng)建出它的實(shí)例:

?
1
2
3
4
>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> client = Redis(decode_responses=True)
>>> mq = MessageQueue(client, "mq")

然后通過執(zhí)行以下代碼, 向隊(duì)列里面添加十條消息:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
>>> for i in range(10):
...  key = "key{0}".format(i)
...  value = "value{0}".format(i)
...  msg = {key:value}
...  mq.add_message(msg)
...
'1554113926280-0'
'1554113926280-1'
'1554113926281-0'
'1554113926281-1'
'1554113926281-2'
'1554113926281-3'
'1554113926281-4'
'1554113926281-5'
'1554113926281-6'
'1554113926282-0'

還可以根據(jù) ID 獲取指定的消息, 又或者使用 get_by_range() 方法同時(shí)獲取多條消息:

?
1
2
3
4
5
6
>>> mq.get_message('1554113926280-0')
{'key0': 'value0'}
>>> mq.get_message('1554113926280-1')
{'key1': 'value1'}
>>> mq.get_by_range("-", "+", 3)
[{'1554113926280-0': {'key0': 'value0'}}, {'1554113926280-1': {'key1': 'value1'}}, {'1554113926281-0': {'key2': 'value2'}}]

又或者使用 iterate() 方法對消息隊(duì)列進(jìn)行迭代, 等等:

?
1
2
3
4
>>> mq.iterate(0, 3)
[{'1554113926280-0': {'key0': 'value0'}}, {'1554113926280-1': {'key1': 'value1'}}, {'1554113926281-0': {'key2': 'value2'}}]
>>> mq.iterate('1554113926281-0', 3)
[{'1554113926281-1': {'key3': 'value3'}}, {'1554113926281-2': {'key4': 'value4'}}, {'1554113926281-3': {'key5': 'value5'}}]

總結(jié)

以上所述是小編給大家介紹的使用 Redis 流實(shí)現(xiàn)消息隊(duì)列的代碼,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對服務(wù)器之家網(wǎng)站的支持!
如果你覺得本文對你有幫助,歡迎轉(zhuǎn)載,煩請注明出處,謝謝!

原文鏈接:http://blog.huangz.me/2019/redis-message-queue.html

延伸 · 閱讀

精彩推薦
  • Redisredis實(shí)現(xiàn)排行榜功能

    redis實(shí)現(xiàn)排行榜功能

    排行榜在很多地方都能使用到,redis的zset可以很方便地用來實(shí)現(xiàn)排行榜功能,本文就來簡單的介紹一下如何使用,具有一定的參考價(jià)值,感興趣的小伙伴們...

    乘月歸5022021-08-05
  • RedisRedis如何實(shí)現(xiàn)數(shù)據(jù)庫讀寫分離詳解

    Redis如何實(shí)現(xiàn)數(shù)據(jù)庫讀寫分離詳解

    Redis的主從架構(gòu),能幫助我們實(shí)現(xiàn)讀多,寫少的情況,下面這篇文章主要給大家介紹了關(guān)于Redis如何實(shí)現(xiàn)數(shù)據(jù)庫讀寫分離的相關(guān)資料,文中通過示例代碼介紹...

    羅兵漂流記6092019-11-11
  • Redis詳解Redis復(fù)制原理

    詳解Redis復(fù)制原理

    與大多數(shù)db一樣,Redis也提供了復(fù)制機(jī)制,以滿足故障恢復(fù)和負(fù)載均衡等需求。復(fù)制也是Redis高可用的基礎(chǔ),哨兵和集群都是建立在復(fù)制基礎(chǔ)上實(shí)現(xiàn)高可用的...

    李留廣10222021-08-09
  • RedisRedis全量復(fù)制與部分復(fù)制示例詳解

    Redis全量復(fù)制與部分復(fù)制示例詳解

    這篇文章主要給大家介紹了關(guān)于Redis全量復(fù)制與部分復(fù)制的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Redis爬蟲具有一定的參考學(xué)習(xí)...

    豆子先生5052019-11-27
  • Redisredis中如何使用lua腳本讓你的靈活性提高5個(gè)逼格詳解

    redis中如何使用lua腳本讓你的靈活性提高5個(gè)逼格詳解

    這篇文章主要給大家介紹了關(guān)于redis中如何使用lua腳本讓你的靈活性提高5個(gè)逼格的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具...

    一線碼農(nóng)5812019-11-18
  • Redisredis 交集、并集、差集的具體使用

    redis 交集、并集、差集的具體使用

    這篇文章主要介紹了redis 交集、并集、差集的具體使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友...

    xiaojin21cen10152021-07-27
  • RedisRedis 事務(wù)知識點(diǎn)相關(guān)總結(jié)

    Redis 事務(wù)知識點(diǎn)相關(guān)總結(jié)

    這篇文章主要介紹了Redis 事務(wù)相關(guān)總結(jié),幫助大家更好的理解和學(xué)習(xí)使用Redis,感興趣的朋友可以了解下...

    AsiaYe8232021-07-28
  • RedisRedis的配置、啟動(dòng)、操作和關(guān)閉方法

    Redis的配置、啟動(dòng)、操作和關(guān)閉方法

    今天小編就為大家分享一篇Redis的配置、啟動(dòng)、操作和關(guān)閉方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧 ...

    大道化簡5312019-11-14
主站蜘蛛池模板: 精品国产自在现线久久 | 女教师被学生糟蹋三天 | 大肥臀风间由美 中文字幕 大东北chinesexxxx露脸 | 国产在线xvideos | 青青青国产精品国产精品久久久久 | 视频一区二区三区在线观看 | 青草视频免费 | juliaann大战两个黑人 | 超级乱淫伦短篇小说做车 | 欧美精品亚洲精品日韩1818 | 我年轻漂亮的继坶2中字在线播放 | 欧美xxxxx九色视频免费观看 | 国产精品吹潮香蕉在线观看 | 国产精品久久久精品视频 | 色播影院性播影院私人影院 | 动漫美女人物被黄漫小说 | 女人又色又爽又黄 | hd在线观看免费高清视频 | 无罩看奶禁18 | 茄子视频懂你更多apl | 国产成人一区二区三区小说 | 极品虎白女在线观看一线天 | 厨房里摸着乳丰满在线观看 | 荡女人人爱 | 美女舒服好紧太爽了视频 | 国产成人无精品久久久久国语 | 九九精品视频一区二区三区 | xxxxxx日本处大片免费看 | 免费一级毛片完整版在线看 | 9久热久爱免费精品视频在线观看 | 国产极品久久 | a级毛片毛片免费观看永久 a级黄色片免费 | 国产欧美一区二区精品性色 | 国产精品久久久久久吹潮 | 欧美日韩亚洲综合久久久 | 色综合久久中文字幕综合网 | 亚洲午夜精品久久久久久人妖 | 成年女人毛片免费观看中文w | 亚洲夜色夜色综合网站 | 国内精品久久久久影院嫩草 | 天天操天天射天天爽 |