消息队列

1.消息队列的介绍

消息队列(Message Queue),字面意思是存放消息的队列,最简单的消息队列模型包括3个角色:

  • 消息队列: 存储和管理信息,也被称为消息代理(Message Broker)
  • 生产者: 发送消息到消息队列
  • 消费者: 从消息队列获取信息并处理消息

消息队列的主要作用就是 解耦、消峰

针对MQ的解耦功能,这里举一个生活的例子帮助理解:
假设我们在网上购买了商品,在开会的时候,快递小哥打电话告知物品已经送到,需要现在立刻去取,只有我们取了快递小哥才会结束他的工作,我们才能收到物品(这个流程才能结束)
这个时候,我们只能先和领导打个招呼请假,才能回去签收交接,快递小哥也只能站在原地等待我们回去,整个流程是很不灵活的、很死板,在一次交接完成之前,双方都不能灵活处理其他事情了
例子类似于我们在业务流程中基于http/rpc发起的一次同步请求,上游(快递小哥)在发出请求后(打电话),会阻塞等待下游(作为签收方的我们)给到反馈(完成签收的操作),否则整个流程会一致阻塞住。
然而在实际场景中,还有一个快递的驿站来帮我们作为中转,快递小哥可以先将我们的物品存放在驿站中,登记号接收方的个人信息后,给接收方发完通知短信后,快递小哥就可以先行撤离,忙别的事情,接下来驿站会作为接收方承担起托管物品的职责,我们只需要选择在合适的时间去取就可以了
在这个流程下就变得灵活很多,由于有驿站作为中转,使得我们和快递小哥之间的交互流程可以实现解耦,在这个流程中快递小哥就类似于生产者producer,我们作为接收方,类似于消费者consumer,而负责承上启下、托管快递的驿站则是类似于消息队列MQ

使用MQ的优点:

  1. 有了MQ后,producer不需要过分关心consumer的身份信息,只需要把消息按照指定的协议投递到对应的topic即可
  2. producer在处理请求时,只需要把消息投递到MQ即可认为流程处理结束,相比于同步请求下游,整个流程会更加轻松灵活,拥有更高的吞吐量
  3. 因为有MQ作为中间层,下游consumer可以设定好合适的消费限流参数,按照指定的速率进行消费,能够在很大程度上对consumer起到保护作用

针对MQ的消峰功能 再用同样的例子帮助理解:
假设现在正值双十一时期,我们剁手一通买买买,导致同时有大量的快递在同一个时段到达. 这时候,快递超市就为我们起到“削峰”的效果. 不论快递数量的多少,我们不用第一时间立刻进行响应处理,而是能够选择在合适的时间到达快递超市进行取件. 如果快递数量很大,我们一次拿不完的话,我们也可以量力而行,每次只收取一部分,分成多个批次处理.

这个流程就类似于 MQ 所带来的消息削峰的能力. 在实际的生产环境中,倘若上游请求量很大,而下游都需要第一时间进行同步响应的话,这对于下游系统可能产生很大的负荷. 此时如果能把同步流程转为异步,把消息放到MQ组件中进行一轮缓冲,让下游可以根据自身的处理能力,按照自己的节奏消化这部分积攒的流量,这对于下游系统来说能起到很好的保护作用.

2.作为MQ应该具备什么能力

2.1 消息不丢失

  • producer 将 msg 投递到 mq 时不出现丢失
  • msg 存放在 mq 时不出现丢失
  • consumer 从 mq 消费 msg 时不出现丢失

针对于上述第二点,各 mq 组件在实现上大抵上是基于数据落盘+数据备份的方式保证的,而针对于上述的一、三点,则是通过两个交互环节中的ack 机制保证的. 以 producer 投递 msg 到 mq 的环节为例,只要 mq 没有给到投递成功的 ack 反馈,那么 producer 就应该把本次投递流程视为失败,执行重新投递的操作. consumer 的消费流程同样如此.
因此,mq 交互流程主要通过 ack 机制保证消息投递以及消费环节做到 at least once(至少一次)的语义,然而无法保证消息不重复的问题. 因此,处于最下游的消费者 consumer 需要能够具备消息幂等去重的能力,避免流程被重复处理.

2.2 支持消息存储

以我们前面提到的取快递的例子来说,快递超市需要有一个实体店面,店面具有着一定的容量能够存放一定数量的快递. 这样当下游 consumer 没来得及第一时间消费消息时,消息能缓存在 mq 组件中一段时间,让消费方自由选择合适的时间过来进行消费.

消息流程类型

  • push 型:
    push 型指的是当 producer 将消息投递到 mq 时,由 mq 负责将消息以推送的形式主动发送给各个建立了订阅关系的消费方
  • pull 型:
    pull 型指的是当 mq 中存在消息时,由 consumer 主动执行拉取消息的操作

关于以上两种 mq 类型的认知:

  1. 对于 push 型
  • 优势

    • 流程实时性比较强,消息来了就执行推送

    • 比较契合发布/订阅的模型

  • 劣势

    • 对下游 consumer 的保护力度不够. mq 的核心功能是解耦、削峰,本质上是提供了一个缓冲的空间,让 consumer 能根据自己的消费能力在合适的时机进行消息处理. 所以 push 型在这方面体现的优势不够明显,消息到达后就需要向各个 consumer 发起推送. 不过这个问题可以在一定程度上通过消费限流的方式加以弥补.
  1. 对于 pull 型则刚好相反:
  • 优势

    • 下游握有消费操作的主动权,能选择在合适的时机执行消费操作
  • 劣势

    • 实时性会弱一些,和主动 pull 的轮询机制有关

Redis消息队列

基于 redis 实现 mq 存在的一类通用问题:

  1. 存储昂贵

    redis 本身是基于内存实现的缓存组件,因此在存储消息时总容量相对有限.

  2. 数据丢失

    此外,redis 存储消息时会不可避免地存在数据丢失的风险,可以从两个方面出发考虑:

    • 内存是易失性存储. 即便 redis 中有 rdb/aof 之类的持久化机制加以弥补,但这个持久化流程是异步执行的,无法提供百分百的保证力度

    • redis 走的是 ap 高可用流派,数据的主从复制流程是异步执行的,主从切换时数据存在弱一致问题

以上问题,不论是在 redis 缓存数据还是实现 mq 的流程中都是存在的

Redis提供了三种不同的方式来实现消息队列:

  • list结构: 基于List结构模拟消息队列
  • PubSub: 基本的点对点消息模型
  • Stream: 比较完善的消息队列模型

1.基于List实现消息队列

Redis的List数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用: LPUSH结合RPOP、或者RPUSH结合LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。
因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

基于List的消息队列有哪些缺点?

  • 优点:

    1. 利用Redis存储,不受限于JVM内存上限
    2. 基于Redis的持久化机制,数据安全性有保证
    3. 可以满足消息有序性
  • 缺点:

    1. 无法避免消息丢失
    2. 只支持单消费者

2.基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息

  • SUBSCRIBE channel:订阅一个或多个频道
  • PUBLISH channel msg:向一个频道发送消息
  • PSUBSCRIBE pattern:订阅与pattern格式匹配的所有频道

优缺点:
优点:

  • 采用发布订阅模型,支持多生产、多消费
    缺点:
  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

3.基于Stream的消息队列

Stream是Redis 5.0引入的一种新的数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:

例如:

3.1 XREAD

读取消息的方式之一:XREAD

XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

3.2 消费者组

**消费者组(Consumer Group):**将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

  1. 创建消费者组:
    XGROUP CREATE key groupName ID [MKSTREAM]
  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
  • MKSTREAM:队列不存在时自动创建队列

其他常见命令:

2. 从消费者组读取消息:

  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLock milliseconds:当没有消息时的最长等待时间
  • NOACK:无需手动ACK,获取信息后自动确认
  • STRAMS key:指定队列名称
  • ID:获取消息的起始ID:
    • “>”:从下一个未消费的消息开始
    • 其他:根据指定id从pending-list中获取已消费但未确定的消息,例如0,是从pending-list中的第一个消息开始

消费者监听消息的基本思路:

XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争夺消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

整体对比分析

mq 实现方案 发布/订阅能力 消费端ACK机制 消息缓存能力 数据丢失风险
list 不支持 不支持 支持
pub/sub 支持 不支持 不支持
streams 支持 支持 支持