|
|
51CTO旗下网站
|
|
移步端
  • Redis5新特点Streams房消息队列

    本文所采取 Redis 本子为 5.0.5 。如果采取更早的 5.x 本子,局部 API 采用效益,与资金文中描述略有不同。

    笔者:ytao-杨滔 来源:中华开源| 2020-01-14 15:08

    前言

    Redis 5 新特点中,Streams 数量结构的引入,可以说他是在此次迭代中最大特性。他使本次 5.x 本子迭代中,Redis 表现信息队列使用时,得到更全面,更有力的原生支持,其中尤为明显的是持久化消息队列。同时,stream 借鉴了 kafka 的花费组模型概念和计划,使消费消息处理上更加便捷快速。本文就 Streams 数量结构中滥用 API 拓展分析。

    未雨绸缪

    本文所采取 Redis 本子为 5.0.5 。如果采取更早的 5.x 本子,局部 API 采用效益,与资金文中描述略有不同。

    补充消息

    Streams 补充数据使用 XADD 指令进行补充,信息中的数据以 K-V 键值对的样式展开操作。一枝信息可以存在多个键值对,补充命令格式:

          
    1. XADD key ID field string [field string ...] 

    其中 key 为 Streams 的称谓,ID 为消息的专门标志,不得重复,field string 就为键值对。下我们就添加以 person 为名称的流,拓展操作。

          
    1. XADD person * name ytao des https://ytao.top 

    地方添加案例中,ID 采用 * 号复制,此地代表着劳动端自动生成 Id,补充后归来数据 "1578238486193-0"

    此地自动生成的 Id 分立式为 <millisecondstime>-<sequencenumber> Id 是由两部分构成:

    1.  millisecondsTime 为眼前服务器时间毫秒时间戳。
    2.  sequenceNumber 眼前序列号,取值来源于当前零点内,浮动消息的程序,默认从 0 起来加 1 递增。

    比如:1578238486193-3 表示在 1578238486193 毫秒的年华戳时,补充的程序 4 条信息。

    除了服务端自动生成 Id 方式外,也支持指定 Id 的变通,但是指定 Id 有以下条件限制:

    1.  Id 中的前后部分必须为数字。
    2.  最小 Id 为 0-1,决不能为 0-0,但是 2-0,3-0 .... 是把允许的。
    3.  补充的信息,Id 的明天半部分不能比存在 Id 最大的值小,Id 此后半部分不能比存在未来半部分相同的最大后半部分小。

    否则,顶不满足上述条件时,补充后会抛出异常:

          
    1. (error) ERR The ID specified in XADD is equal or smaller than the target stream top item 

    实际上,顶添加一枝信息时,会进展两部操作。着重地,先判断如果不存在 Streams,则创造 Streams 的称谓,再补消息到 Streams 官方。即使添加消息时,出于 Id 独特,也得以在 Redis 官方生存以目前 Streams 的称谓。 Streams 官方 Id 也可作为指针使用,因为他是一番有序的标志。

    生产中,如果这样使用添加消息,会存在一番问题,那就是信息数量太大时,会使服务宕机。此地 Streams 的计划初期也有考虑到这个题目,那就是可以指定 Streams 的增量。如果容量操作这个设定的值,就会对调旧的信息。在添加消息时,安装 MAXLEN 数。

          
    1. XADD person MAXLEN 5 * name ytao des https://ytao.top 

    这样就指定该了 Streams 中的容量为 5 条信息。也可采取 XTRIM 截取消息,连年剔除多余的信息:

          
    1. XTRIM person MAXLEN 8 

    信息数量

    翻开消息数量使用 XLEN 指令进行操作。

          
    1. XLEN key 

    例:翻开 person 流中的消息数量:

          
    1. > XLEN person  
    2. (integer) 5 

    查询消息

    查询 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。

    XRANGE

    查询数据时,可以按照指定 Id 规模开展查询,XRANGE 查询指令格式:

          
    1. XRANGE key start end [COUNT count] 

    数说明:

  •  key 为 Streams 的称谓
  •  start 为规模查询开始 Id,包含本 Id。
  •  start 为规模查询结束 Id,包含本 Id。
  •  Count 为查询返回最大的信息数量,非必填。
  • 此地 start 和 end 有-和+两个非指定值,她们分别表示无穷小和无穷大,故此当使用这个两个值时,会查询出全部之信息。

          
    1. > XRANGE person - +  
    2. 1) 1) "0-1"  
    3.    2) 1) "name"  
    4.       2) "ytao"  
    5.       3) "des"  
    6.       4) "https://ytao.top"  
    7. 2) 1) "0-2"  
    8.    2) 1) "name"  
    9.       2) "luffy"  
    10.       3) "des"  
    11.       4) "valiant!"  
    12. 3) 1) "2-0"  
    13.    2) 1) "name"  
    14.       2) "gaga"  
    15.       3) "des"  
    16.       4) "fishion!" 

    地方查询的信息数据,可以看出是按照先进先出的程序查询出来的。

    采用 COUNT 指定查询返回的多寡:

          
    1. # 查询所有的信息,并且返回一枝数据  
    2. > XRANGE person - + COUNT 1  
    3. 1) 1) "0-1"  
    4.    2) 1) "name"  
    5.       2) "ytao"  
    6.       3) "des"  
    7.       4) "https://ytao.top" 

    在规模查询中,Id 的下半部分可概括,此后半部分中的数据会全部询问到。

    XREVRANGE

    XREVRANGE 的询问和 XRANGE 指令中的使用类似,但查询的 start 和 end 数顺序进行了调换:

          
    1. XREVRANGE key end start [COUNT count] 

    采用案例:

          
    1. > XREVRANGE person +  -  
    2. 1) 1) "2-0"  
    3.    2) 1) "name"  
    4.       2) "gaga"  
    5.       3) "des"  
    6.       4) "fishion!"  
    7. 2) 1) "0-2"  
    8.    2) 1) "name"  
    9.       2) "luffy"  
    10.       3) "des"  
    11.       4) "valiant!"  
    12. 3) 1) "0-1"  
    13.    2) 1) "name"  
    14.       2) "ytao"  
    15.       3) "des"  
    16.       4) "https://ytao.top" 

    查询后的结果与 XRANGE 的结果顺序刚好相反,其它都一样,这两个指令可进行消息的升序和降序的返回。

    剔除消息

    剔除消息使用 XDEL 指令操作,只需指定将要删除的 Streams 名称和 Id 即可,支持一次删除多个消息 。

          
    1. XDEL key ID [ID ...] 

    剔除案例:

          
    1. # 查询所有消息  
    2. > XRANGE person - +  
    3. 1) 1) "0-1"  
    4.    2) 1) "name"  
    5.       2) "ytao"  
    6.       3) "des"  
    7.       4) "https://ytao.top"  
    8. 2) 1) "0-2"  
    9.    2) 1) "name"  
    10.       2) "luffy"  
    11.       3) "des"  
    12.       4) "valiant!"  
    13. 3) 1) "2-0"  
    14.    2) 1) "name"  
    15.       2) "gaga"  
    16.       3) "des"  
    17.       4) "fishion!"  
    18. # 剔除消息        
    19. > XDEL person 2-0  
    20. (integer) 1  
    21. # 再次查询删除后的一切消息  
    22. > XRANGE person - +  
    23. 1) 1) "0-1"  
    24.    2) 1) "name"  
    25.       2) "ytao"  
    26.       3) "des"  
    27.       4) "https://ytao.top"  
    28. 2) 1) "0-2"  
    29.    2) 1) "name"  
    30.       2) "luffy"  
    31.       3) "des"  
    32.       4) "valiant!"  
    33. # 查询删除后的长短        
    34. > XLEN person  
    35. (integer) 2        

    副上面可以看出,剔除消息后,长也会减少相应的多寡。

    消费消息

    在 Redis 的 PUB/SUB 官方,咱们是通过订阅来消费消息,在 Streams 数量结构中,同样也能实现同等功能,顶没有新的消息时,可进行阻塞等待。不仅支持单独消费,而且还可以支持群组消费。

    单独消费

    单独消费使用 XREAD 指令。可以看出,下命令中,STREAMS,key, 以及 ID 为必填项。ID 表示将要读取大于该 ID 的信息。顶 ID 值使用 $ 赋予时,表示已存在消息的最大 Id 值。

          
    1. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] 

    地方的 COUNT 数用来指定读取的最大数量,与 XRANGE 的用法一样。

          
    1. > XREAD COUNT 1 STREAMS person 0  
    2. 1) 1) "person"  
    3.    2) 1) 1) "0-1"  
    4.          2) 1) "name"  
    5.             2) "ytao"  
    6.             3) "des"  
    7.             4) "https://ytao.top"  
    8. > XREAD COUNT 2 STREAMS person 0  
    9. 1) 1) "person"  
    10.    2) 1) 1) "0-1"  
    11.          2) 1) "name"  
    12.             2) "ytao"  
    13.             3) "des"  
    14.             4) "https://ytao.top"  
    15.       2) 1) "0-2"  
    16.          2) 1) "name"  
    17.             2) "luffy"  
    18.             3) "des"  
    19.             4) "valiant!" 

    在 XREAD 其中还有个 BLOCK 数,其一是用来阻塞订阅消息的,BLOCK 带走的底数为阻塞时间,单位为毫秒,如果在这个日子内没有新的消息消费,这就是说就会释放该阻塞。顶这里的年华指定为 0 时,会一直阻塞,直到有新的消息来消费到。

          
    1. # 入海口 1 起来阻塞,等待新信息的赶来  
    2. > XREAD BLOCK 0 STREAMS person $  
    3. # 另开一个连接窗口 2,补充一枝新的消息  
    4. > XADD person 2-2 name tao des coder  
    5. "2-2"  
    6. # 入海口 1,获取到有新的消息来消费,并且带有阻塞的年华  
    7. > XREAD BLOCK 0 STREAMS person $  
    8. 1) 1) "person"  
    9.    2) 1) 1) "2-2"  
    10.          2) 1) "name"  
    11.             2) "tao"  
    12.             3) "des"  
    13.             4) "coder"  
    14. (60.81s) 

    顶使用 XREAD 拓展顺序消费时,要求额外记录下读取到位置的 Id,富有下次持续消费。

    群组消费

    群组消费的首要目的也就是为了分流消息给不同之客户端处理,以更便捷的准确率处理消息。为达到这一肝功能需求,咱们需要做三件事:创造群组,群组读取信息,向劳动端确认消息以拍卖。

    群组操作

    借鉴群组使用 XGROUP 指令:

          
    1. XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername] 

    地方命令中,包含操作有:

  •  CREATE 创造消费组。
  •  SETID 修改下一个处理消息的 Id。
  •  DESTROY 销毁消费组。
  •  DELCONSUMER 剔除消费组中指定的顾客。
  • 咱们目前需要采取的是创造消费组:

          
    1. # 以目前存在的最大 Id 表现消费起始   
    2. > XGROUP CREATE person group1 $  
    3. OK 

    群组读取信息

    群组读取使用 XREADGROUP 指令,COUNT和BLOCK的采取类似 XREAD 的借鉴,只是多了个群组和顾客之指定:

          
    1. XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] 

    出于群组消费和单独消费类似,此地只进行个闭塞分析,此地 Id 也有个突出值>,表示还未进行消费的信息:

          
    1. # 入海口 1,消费群组中,taotao 顾客建立阻塞监听  
    2. XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  
    3. # 入海口 2,消费群组中,yangyang 顾客建立阻塞监听   
    4. XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  
    5. # 入海口 3,补充消费消息  
    6. > XADD person 3-1 name tony des 666  
    7. "3-1"  
    8. # 入海口 1,读取到新信息,此刻 入海口 2 没有其他反应  
    9. > XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  
    10. 1) 1) "person"  
    11.    2) 1) 1) "3-1"  
    12.          2) 1) "name"  
    13.             2) "tony"  
    14.             3) "des"  
    15.             4) "666"  
    16. (77.54s)  
    17. # 入海口 3,再次添加消费消息  
    18. > XADD person 3-2 name james des abc!  
    19. "3-2"  
    20. # 入海口 2,读取到新信息,此刻 入海口 1 没有其他反应  
    21. > XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  
    22. 1) 1) "person"  
    23.    2) 1) 1) "3-2"  
    24.          2) 1) "name"  
    25.             2) "james"  
    26.             3) "des"  
    27.             4) "abc!"  
    28. (76.36s) 

    上述执行流程中,group1 群组中有两个消费者,顶添加两枝信息后,这两个消费者轮流消费。

    信息ACK

    信息消费后,为避免再次重申消费,这是要求向劳动端发送 ACK,确保消息被消费后的标志。 例如下列情形,咱们上面我们将最新两枝信息已开展了消费,但是当我们再次读取信息时,还是被读到:

          
    1. >  XREADGROUP GROUP group1 yangyang STREAMS person 0  
    2. 1) 1) "person"  
    3.    2) 1) 1) "3-2"  
    4.          2) 1) "name"  
    5.             2) "james"  
    6.             3) "des"  
    7.             4) "abc!" 

    这会儿,咱们采用 XACK 指令告诉服务器,咱们已处理的信息:

          
    1. XACK key group ID [ID ...]0 

    让服务器标记 3-2 已处理:

          
    1. > XACK person group1 3-2  
    2. (integer) 1 

    再次获取群组读取信息:

          
    1. >  XREADGROUP GROUP group1 yangyang STREAMS person 0  
    2. 1) 1) "person"  
    3.    2) (empty list or set) 

    列中没有了可读消息。 除了上面以教学到的 API 外,翻开消费群组信息可采取 XINFO 指令查看,本文不做分析。

    总结

    上面对 Streams 公用 API 拓展了分析,咱们可以感受到 Redis 在信息队列支持的征程上,也越来越强大。如果采取过他的 PUB/SUB 效益的话,就会感受到 5.x 迭代正是将你的组成部分痛点进行了多元化。

    【编纂推荐】

    1. 一文看懂金融级分布式必发娱乐登录架构设计
    2. 详解MySQL必发娱乐登录累积聚合原理与用法实例分析
    3. 必发娱乐登录主从复制,读写分离,国库分表,分区讲解
    4. 必发娱乐登录分库分表后,带来的这个题目,如何解决?
    5. 必发娱乐手机版全球最受欢迎必发娱乐登录新鲜出炉,你猜中了吗?
    【义务编辑: 庞桂玉 TEL:(010)68476606】

    点赞 0
  • Redis5  Streams  必发娱乐登录
  • 分享:
    大家都在看
    猜你喜欢
  • 订阅专栏+更多

    Python使用场景实战手册

    Python使用场景实战手册

    Python使用场景实战手册
    共3章 | KaliArch

    115人口订阅学习

    一步到位玩儿透Ansible

    一步到位玩儿透Ansible

    Ansible
    共17章 | 骏马金龙1

    182人口订阅学习

    云架构师修炼手册

    云架构师修炼手册

    云架构师之必不可少技能
    共3章 | Allen在路上

    131人口订阅学习

    读 书 +更多

    网管员必读―传感器与数据存储

    《网管员必读―传感器与数据存储》圆满、系统地介绍了在我党、尖端网络管理和网络工程实施中两个基本点方面的激流技术和运用:硬件服务器和数量...

    订阅51CTO邮刊

    点击这里查看样刊

    订阅51CTO邮刊

    51CTO劳务号

    51CTO官微


    
       
       
       
       
  • &lt;div id="854af14a"&gt;&lt;/div&gt;