Redis 流教程

评论 0 浏览 0 2023-08-12

Redis 流的综合教程

如果您不熟悉流,请参阅 Redis 流。如需更全面的教程,请继续阅读。

介绍

Redis 5.0 引入了 Redis 流数据类型。流以日志数据结构为模型,但也实现了一些操作,以克服典型的纯附加日志的一些限制。这些操作包括在 O(1) 时间内进行随机访问,以及复杂的消耗策略(如消费者分组)。

流基础知识

流是一种仅附加的数据结构。基本的写入命令称为 XADD,将新条目附加到指定的流。

每个流条目由一个或多个字段值对组成,有点像一条记录或 Redis 哈希:

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

上面对 XADD 命令的调用使用自动生成的条目 ID(该命令返回的 ID)将条目 sensor-id: 1234, temperature: 19.8 添加到键 mystream 处的流中,具体为 1518951480106-0。它的第一个参数是键名 mystream,第二个参数是标识流中每个条目的条目 ID。然而,在本例中,我们传递了*,因为我们希望服务器为我们生成一个新的ID。每个新的 ID 都会单调递增,因此更简单地说,与所有过去的条目相比,添加的每个新条目都将具有更高的 ID。由服务器自动生成 ID 几乎总是您想要的,并且显式指定 ID 的原因非常罕见。我们稍后会详细讨论这一点。每个 Stream 条目都有一个 ID 的事实是与日志文件的另一个相似之处,可以使用行号或文件内的字节偏移量来识别给定的条目。回到我们的 XADD 示例,在键名称和 ID 之后,下一个参数是组成流条目的字段值对。

只需使用 XLEN 命令即可获取 Stream 中的项目数:

> XLEN mystream
(integer) 1

条目 ID

XADD 命令返回的条目 ID,并明确标识给定流中的每个条目,由两部分组成:

<millisecondsTime>-<sequenceNumber>

毫秒时间部分实际上是生成流ID的本地Redis节点中的本地时间,但是如果当前毫秒时间恰好小于前一个条目时间,则使用前一个条目时间,因此如果时钟向后跳单调递增的 ID 属性仍然成立。序列号用于在同一毫秒内创建的条目。由于序列号是64位宽,因此实际上同一毫秒内可以生成的条目数量没有限制。

这种 ID 的格式一开始可能看起来很奇怪,温和的读者可能会想知道为什么时间是 ID 的一部分。原因是Redis流支持按ID进行范围查询。由于ID与条目生成的时间相关,因此基本上可以免费查询时间范围。我们很快就会在介绍 XRANGE 命令时看到这一点。

如果由于某种原因用户需要与时间无关但实际上与另一个外部系统 ID 相关联的增量 ID,如前所述,XADD 命令可以采用显式 ID 而不是触发自动生成的 * 通配符 ID,如以下示例所示:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

请注意,在这种情况下,最小 ID 为 0-1,并且该命令不会接受等于或小于前一个 ID 的 ID:

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

如果您运行的是 Redis 7 或更高版本,您还可以提供仅包含毫秒部分的显式 ID。在这种情况下,ID 的序列部分将自动生成。为此,请使用以下语法:

> XADD somestream 0-* baz qux
0-3

从 Stream 获取数据

现在我们终于可以通过 XADD 在流中附加条目。然而,虽然将数据附加到流中是相当明显的,但查询流以提取数据的方式却不是那么明显。如果我们继续类比日志文件,一种明显的方法是模仿我们通常使用 Unix 命令 tail -f 所做的事情,也就是说,我们可以开始监听以获取附加到流中的新消息。需要注意的是,与 Redis 的阻塞式列表操作(在这种操作中,给定元素将到达单个客户端,而该客户端在像 BLPOP 这样的弹出式操作中处于阻塞状态)不同,对于流,我们希望多个使用者能够看到附加到流中的新消息(就像许多 tail -f 进程可以看到添加到日志中的内容一样)。使用传统术语,我们希望流能够向多个客户端发送消息

然而,这只是一种潜在的访问模式。我们还可以以完全不同的方式看待流:不是作为消息传递系统,而是作为时间序列存储。在这种情况下,也许附加新消息也很有用,但另一种自然的查询模式是按时间范围获取消息,或者使用游标迭代消息以增量检查所有历史记录。这绝对是另一种有用的访问模式。

最后,如果我们从消费者的角度来看一个流,我们可能希望以另一种方式访问​​该流,即作为一个消息流,可以将其划分给多个正在处理此类消息的消费者,以便消费者组只能看到到达单个流的消息的子集。通过这种方式,可以跨不同的消费者扩展消息处理,而无需单个消费者处理所有消息:每个消费者只会处理不同的消息。这基本上就是 Kafka (TM) 对消费者群体所做的事情。通过消费者组读取消息是另一种有趣的 Redis 流读取模式。

Redis Streams 通过不同的命令支持上述所有三种查询模式。接下来的部分将展示所有这些,从最简单和最直接使用的:范围查询开始。

按范围查询:XRANGE 和 XREVRANGE

要按范围查询流,我们只需要指定两个 ID:startend。返回的范围将包括以开始或结束作为 ID 的元素,因此该范围是包含在内的。两个特殊ID-+分别表示可能的最小和最大ID。

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

返回的每个条目都是一个包含两项的数组:ID 和字段值对列表。我们已经说过条目ID与时间有关,因为-字符左边的部分是创建流条目的本地节点的Unix时间(以毫秒为单位),在该条目被创建的那一刻(但请注意,流是使用完全指定的 XADD 命令进行复制的,因此副本将具有与主服务器相同的 ID)。这意味着我可以使用 XRANGE 查询时间范围。然而,为了做到这一点,我可能想省略 ID 的序列部分:如果省略,则在范围的开始部分将假定为 0,而在结束部分将假定为最大值序列号可用。这样,仅使用两毫秒的 Unix 时间进行查询,我们就能以包容的方式获得在该时间范围内生成的所有条目。例如,如果我想查询两毫秒的时间段,我可以使用:

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

我在这个范围内只有一个条目,但是在实际数据集中,我可以查询小时范围,或者在短短两毫秒内可能有很多项目,并且返回的结果可能会很大。因此,XRANGE 支持末尾可选的 COUNT 选项。通过指定计数,我可以获取前 N 项。如果我想要更多,我可以获取返回的最后一个 ID,将序列部分加一,然后再次查询。让我们在下面的示例中看看这一点。我们开始使用 XADD 添加 10 个项目(我不会显示这一点,我们假设流 mystream 填充了 10 个项目)。为了开始迭代,每个命令获取 2 个项目,我从整个范围开始,但计数为 2。

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

为了继续迭代接下来的两项,我必须选择最后返回的 ID,即 1519073279157-0 并向其添加前缀 (。生成的独占范围间隔(在本例中为 (1519073279157-0)现在可以用作下一个 XRANGE 调用的新 start 参数:

> XRANGE mystream (1519073279157-0 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

等等。由于XRANGE的复杂度是O(log(N))来查找,然后O(M) > 返回 M 个元素,计数较小时,该命令具有对数时间复杂度,这意味着迭代的每一步都很快。因此,XRANGE 也是事实上的流迭代器,并且不需要 XSCAN 命令。

命令 XREVRANGE 相当于 XRANGE 但以相反的顺序返回元素,因此 XREVRANGE 的实际用途是检查 Stream 中的最后一项是什么:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

请注意,XREVRANGE 命令以相反的顺序采用startstop 参数。

使用 XREAD 监听新项目

当我们不想按流中的某个范围访问项目时,通常我们想要的是订阅到达流的新项目。这一概念可能与 Redis 的 Pub/Sub 或 Redis 的阻塞列表有关,前者让你订阅一个通道,后者让你等待一个键来获取新的元素:

  1. 一个流可以有多个客户端(消费者)等待数据。默认情况下,每个新项目都会传递给正在等待给定流中数据的每个消费者。这种行为与阻塞列表不同,阻塞列表中每个消费者都会获得不同的元素。然而,发送给多个消费者的能力类似于 Pub/Sub。
  2. 在 Pub/Sub 中,消息是即发即忘,并且永远不会被存储,而在使用阻塞列表时,当客户端收到消息时,它会弹出(实际上从列表中删除),流以根本不同的方式工作。所有消息都无限期地追加到流中(除非用户明确要求删除条目):不同的消费者将通过记住最后收到的消息的 ID 从其角度知道什么是新消息。
  3. 流消费者组提供了 Pub/Sub 或阻塞列表无法实现的控制级别,同一流具有不同的组、已处理项目的显式确认、检查待处理项目的能力、未处理消息的声明以及每个项目的一致历史记录可见性单个客户端,只能看到其私有的过去消息历史记录。

提供侦听到达流的新消息的功能的命令称为 XREAD。它比 XRANGE 稍微复杂一些,因此我们将开始显示简单的表单,稍后将提供整个命令布局。

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

上面是XREAD的非阻塞形式。请注意,COUNT选项不是强制性的,事实上该命令唯一的强制性选项是STREAMS选项,它指定了一个键列表以及调用消费者已看到的每个流的相应最大 ID,因此该命令将仅向客户端提供 ID 大于我们指定的消息。

在上面的命令中,我们写了STREAMS mystream 0,因此我们希望mystream流中的所有消息的ID大于0-0。正如您在上面的示例中看到的,该命令返回键名称,因为实际上可以使用多个键调用此命令以同时从不同的流中读取。例如,我可以写:STREAMS mystream otherstream 0 0。请注意,在 STREAMS 选项之后,我们需要提供键名称,然后提供 ID。因此,STREAMS 选项必须始终是最后一个选项。 任何其他选项都必须位于STREAMS 选项之前。

除了 XREAD 可以同时访问多个流,并且我们能够指定我们拥有的最后一个 ID 来获取更新的消息之外,在这个简单的形式中,该命令没有做任何事情,所以与XRANGE 不同。然而,有趣的是,通过指定 BLOCK 参数,我们可以轻松地将 XREAD 转换为阻塞命令

> XREAD BLOCK 0 STREAMS mystream $

请注意,在上面的示例中,除了删除 COUNT 之外,我还指定了新的 BLOCK 选项,超时时间为 0 毫秒(这意味着永远不会超时)。此外,我没有传递流 mystream 的普通 ID,而是传递了特殊 ID $。这个特殊的 ID 意味着 XREAD 应该使用流 mystream 中已经存储的最大 ID 作为最后一个 ID,这样我们就只会收到新的 消息,从我们开始收听的时间开始。这在某种程度上类似于tail -f Unix 命令。

请注意,当使用BLOCK选项时,我们不必使用特殊ID $。我们可以使用任何有效的 ID。如果该命令能够立即满足我们的请求而不会阻塞,它将这样做,否则它将阻塞。通常,如果我们想从新条目开始消费流,我们会从 ID $ 开始,然后继续使用收到的最后一条消息的 ID 进行下一次调用,依此类推。

XREAD 的阻塞形式也可以监听多个 Stream,只需指定多个键名即可。如果由于至少有一个流的元素大于我们指定的相应 ID,因此可以同步处理请求,则它会返回结果。否则,该命令将阻塞并返回获取新数据的第一个流的项目(根据指定的 ID)。

与阻塞列表操作类似,从等待数据的客户端的角度来看,阻塞流读取是公平的,因为语义是 FIFO 风格。当新项目可用时,第一个为给定流阻塞的客户端将第一个被解除阻塞。

XREAD 除了 COUNTBLOCK 之外没有其他选项,因此这是一个非常基本的命令,具有特定的功能目的是将消费者附加到一个或多个流。使用消费者组 API 可以使用更强大的消费流功能,但是通过消费者组进行读取是通过名为 XREADGROUP 的不同命令来实现的,下一节将对此进行介绍本指南的。

消费者组

当手头的任务是使用来自不同客户端的相同流时,XREAD 已经提供了一种向N 个客户端扇出信息流的方法,还可能使用副本来提供更高的读取可扩展性。然而,在某些问题中,我们想要做的不是向许多客户端提供相同的消息流,而是向许多客户端提供来自同一流的不同的消息子集。一个明显有用的例子是处理速度慢的消息:拥有 N 个不同的工作线程来接收流的不同部分的能力使我们能够扩展消息处理,通过将不同的消息路由到准备好处理的不同工作线程。做更多的工作。

实际上,如果我们想象有三个消费者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的流,那么我们想要的是根据下图提供消息:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

为了实现这一目标,Redis 使用了一个名为“消费者组”的概念。从实现的角度来看,Redis 消费者组与 Kafka (TM) 消费者组没有任何关系,了解这一点非常重要。然而它们在功能上是相似的,所以我决定保留 Kafka 的 (TM) 术语,因为它最初普及了这个想法。

消费者组就像一个从流中获取数据的伪消费者,实际上为多个消费者提供服务,并提供一定的保证:

  1. 每条消息都提供给不同的消费者,因此同一条消息不可能传递给多个消费者。
  2. 消费者在消费者组中通过名称来标识,该名称是实现消费者的客户端必须选择的区分大小写的字符串。这意味着即使在断开连接后,流消费者组仍保留所有状态,因为客户端将再次声明自己是同一个消费者。然而,这也意味着客户端需要提供唯一的标识符。
  3. 每个消费者组都有“第一个 ID 从未被消费过”的概念,这样,当消费者请求新消息时,它可以只提供之前未传递的消息。
  4. 然而,使用消息需要使用特定命令进行显式确认。 Redis 将确认解释为:此消息已正确处理,因此可以将其从消费者组中逐出。
  5. 消费者组跟踪当前待处理的所有消息,即已传递给消费者组的某个消费者但尚未确认已处理的消息。由于此功能,当访问流的消息历史记录时,每个使用者只会看到传递给它的消息

在某种程度上,消费者组可以被想象成关于流的一些状态

+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

如果您从这个角度来看这一点,就很容易理解消费者组可以做什么,它如何能够向消费者提供其待处理消息的历史记录,以及如何为请求新消息的消费者提供服务消息 ID 大于last_delivered_id。同时,如果将消费者组视为 Redis 流的辅助数据结构,很明显,单个流可以有多个消费者组,这些消费者组具有不同的消费者组。实际上,同一个流甚至可以让客户端在没有消费者组的情况下通过 XREAD 进行阅读,并让客户端通过 XREADGROUP在不同的消费群体中。

现在是时候放大看看基本的消费者组命令了。它们是:

  • XGROUP 用于创建、销毁和管理消费者组。
  • XREADGROUP 用于通过消费者组从流中读取。
  • XACK 是允许使用者将待处理消息标记为已正确处理的命令。

创建消费者组

假设我已经存在类型流的密钥mystream,为了创建一个消费者组,我只需要执行以下操作:

> XGROUP CREATE mystream mygroup $
OK

正如您在上面的命令中看到的,在创建消费者组时,我们必须指定一个 ID,在示例中为 $。这是必需的,因为消费者组以及其他状态必须知道在第一个消费者连接时接下来要提供什么消息,也就是说,当该组刚刚连接时,最后一条消息 ID 是什么?创建的。如果我们像以前那样提供$,那么从现在开始只有到达流中的新消息才会提供给组中的消费者。如果我们指定0,消费者组将首先消费流历史记录中的所有消息。当然,您可以指定任何其他有效 ID。您所知道的是,消费者组将开始传递大于您指定的 ID 的消息。因为$表示流中当前最大的ID,所以指定$将具有仅消费新消息的效果。

XGROUP CREATE 还支持自动创建流(如果不存在),使用可选的 MKSTREAM 子命令作为最后一个参数:

> XGROUP CREATE newstream mygroup $ MKSTREAM
OK

现在消费者组已创建,我们可以立即尝试使用 XREADGROUP 命令通过消费者组读取消息。我们将从消费者那里读取信息,我们将其称为 Alice 和 Bob,以了解系统如何向 Alice 或 Bob 返回不同的消息。

XREADGROUPXREAD 非常相似,并提供相同的BLOCK 选项,否则为同步命令。然而,有一个必须始终指定的强制选项,即GROUP并且有两个参数:消费者组的名称和正在尝试的消费者的名称读书。还支持选项 COUNT,该选项与 XREAD 中的选项相同。

在从流中读取之前,让我们在其中放入一些消息:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

注意:这里的 message 是字段名称,fruit 是关联值,记住流项是小字典。

是时候尝试使用消费者组来阅读一些内容了:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

XREADGROUP 回复与 XREAD 回复类似。但请注意上面提供的GROUP <group-name> <consumer-name>。它指出我想使用消费者组mygroup从流中读取数据,而我是消费者Alice。每次消费者对消费者组执行操作时,它都必须指定其名称,以在组内唯一标识该消费者。

上面的命令行中还有一个非常重要的细节,在强制的STREAMS选项之后,为密钥mystream请求的ID是特殊ID>。这个特殊的ID仅在消费者组的上下文中有效,它意味着:消息到目前为止从未传递给其他消费者

这几乎总是您想要的,但是也可以指定一个真实的 ID,例如 0 或任何其他有效的 ID,但是在这种情况下,会发生我们从 XREADGROUP 仅向我们提供待处理消息的历史记录,在这种情况下,永远不会在组中看到新消息。所以基本上 XREADGROUP 根据我们指定的 ID 具有以下行为:

  • 如果 ID 是特殊 ID >,则该命令将仅返回迄今为止从未传递给其他消费者的新消息,并且作为副作用,将更新消费者组的最后一个 ID
  • 如果 ID 是任何其他有效的数字 ID,则该命令将允许我们访问待处理消息的历史记录。也就是说,传递给此指定使用者(由提供的名称标识)且迄今为止从未使用 XACK 进行确认的消息集。

我们可以立即测试此行为,指定 ID 0,而不使用任何 COUNT 选项:我们只会看到唯一的待处理消息,即有关苹果的消息:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

但是,如果我们确认消息已处理,它将不再是待处理消息历史记录的一部分,因此系统将不再报告任何内容:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

如果您还不知道 XACK 的工作原理,请不要担心,这个想法只是处理后的消息不再是我们可以访问的历史记录的一部分。

现在轮到鲍勃读一些东西了:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob 请求最多两条消息,并且正在通过同一群组mygroup 阅读。因此,Redis 只报告消息。正如你所看到的,“苹果”消息没有被传递,因为它已经传递给了爱丽丝,所以鲍勃得到了橙子和草莓,等等。

这样,Alice、Bob 和组中的任何其他消费者都能够从同一流中读取不同的消息,读取其尚未处理消息的历史记录,或将消息标记为已处理。这允许创建不同的拓扑和语义来消费来自流的消息。

有几点需要记住:

  • 消费者在第一次被提及时会自动创建,无需显式创建。
  • 即使使用 XREADGROUP 您也可以同时读取多个键,但是要使其工作,您需要在每个键中创建一个具有相同名称的消费者组溪流。这不是常见需求,但值得一提的是,该功能在技术上是可用的。
  • XREADGROUP 是一个写入命令,因为即使它从流中读取,消费者组也会被修改作为读取的副作用,所以它只能在主实例上调用。

下面是使用 Ruby 语言编写的使用消费者组的消费者实现示例。 Ruby 代码的目标是让几乎所有有经验的程序员都可以阅读,即使他们不了解 Ruby:

require 'redis'

if ARGV.length == 0
    puts "Please specify a consumer name"
    exit 1
end

ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new

def process_message(id,msg)
    puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end

$lastid = '0-0'

puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
    # Pick the ID based on the iteration: the first time we want to
    # read our pending messages, in case we crashed and are recovering.
    # Once we consumed our history, we can start getting new messages.
    if check_backlog
        myid = $lastid
    else
        myid = '>'
    end

    items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)

    if items == nil
        puts "Timeout!"
        next
    end

    # If we receive an empty reply, it means we were consuming our history
    # and that the history is now empty. Let's start to consume new messages.
    check_backlog = false if items[0][1].length == 0

    items[0][1].each{|i|
        id,fields = i

        # Process the message
        process_message(id,fields)

        # Acknowledge the message as processed
        r.xack(:my_stream_key,GroupName,id)

        $lastid = id
    }
end

正如您所看到的,这里的想法是从消耗历史记录开始,即我们的待处理消息列表。这很有用,因为消费者之前可能已经崩溃了,所以在重新启动的情况下,我们希望重新读取在没有得到确认的情况下传递给我们的消息。请注意,我们可能会多次或一次处理一条消息(至少在消费者失败的情况下,但也涉及 Redis 持久性和复制的限制,请参阅有关本主题的具体部分)。

一旦历史记录被消耗,我们得到一个空的消息列表,我们可以切换到使用>特殊ID来消耗新消息。

从永久性故障中恢复

上面的示例允许我们编写参与同一消费者组的消费者,每个消费者都会处理一部分消息,并在从故障中恢复时重新读取仅传递给它们的待处理消息。然而在现实世界中,消费者可能会永久失败并且永远无法恢复。由于任何原因停止后永远不会恢复的消费者的待处理消息会发生什么?

Redis 消费者组提供了在这些情况下使用的功能,以便声明给定消费者的待处理消息,以便这些消息将更改所有权并重新分配给不同的消费者。特点非常明确。消费者必须检查待处理消息列表,并且必须使用特殊命令来声明特定消息,否则服务器将永远保留待处理消息并分配给旧消费者。这样,不同的应用程序可以选择是否使用该功能,以及如何使用它。

此过程的第一步只是一个命令,它提供消费者组中待处理条目的可观察性,称为 XPENDING。 这是一个只读命令,调用始终安全,并且不会更改任何消息的所有权。 最简单的形式是使用两个参数调用该命令,即流的名称和消费者组的名称。

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

当以这种方式调用时,该命令会输出消费者组中待处理消息的总数(本例中为两条)、待处理消息中较低和较高的消息 ID,最后输出消费者列表以及他们的待处理消息的数量。有。 我们只有 Bob 有两条待处理消息,因为 Alice 请求的单条消息是使用 XACK 确认的。

我们可以通过向 XPENDING 提供更多参数来请求更多信息,因为完整的命令签名如下:

XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]

通过提供开始和结束 ID(可以是 -+,如 XRANGE 中)和计数来控制返回的信息量通过该命令,我们能够了解更多有关待处理消息的信息。如果我们想将输出限制为仅针对给定消费者的待处理消息,则使用可选的最后一个参数(消费者名称),但在以下示例中不会使用此功能。

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

现在我们有了每条消息的详细信息:ID、消费者名称、以毫秒为单位的空闲时间,即自上次将消息传递给某个消费者以来已经过去了多少毫秒,最后给定消息的传递次数。 我们有两条来自 Bob 的消息,它们空闲了 74170458 毫秒,大约 20 小时。

请注意,没有人阻止我们仅使用 XRANGE 检查第一条消息内容是什么。

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

我们只需在参数中重复相同的 ID 两次即可。现在我们有了一些想法,Alice 可能会认为,在 20 小时没有处理消息后,Bob 可能无法及时恢复,是时候认领此类消息并代替 Bob 恢复处理了。为此,我们使用 XCLAIM 命令。

该命令非常复杂,并且在完整形式中充满了选项,因为它用于复制消费者组更改,但我们将仅使用通常需要的参数。在这种情况下,它很简单:

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

基本上我们说,对于这个特定的键和组,我希望指定的消息 ID 将更改所有权,并将分配给指定的使用者名称 <consumer>。但是,我们还提供了最小空闲时间,因此只有当提到的消息的空闲时间大于指定的空闲时间时,操作才会起作用。这很有用,因为可能有两个客户端同时重试声明消息:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

然而,作为一个副作用,声明消息将重置其空闲时间并增加其传递计数器的数量,因此第二个客户端将无法声明它。通过这种方式,我们可以避免对消息进行琐碎的重新处理(即使在一般情况下您无法获得一次处理)。

这是命令执行的结果:

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

该消息已被 Alice 成功认领,她现在可以处理该消息并确认该消息,并且即使原始消费者没有恢复,也可以继续前进。

从上面的示例可以清楚地看出,作为成功声明给定消息的副作用,XCLAIM 命令也会返回该消息。但这不是强制性的。可以使用 JUSTID 选项来仅返回成功声明的消息的 ID。如果您想减少客户端和服务器之间使用的带宽(以及命令的性能)并且您对消息不感兴趣,那么这很有用,因为您的消费者的实现方式是重新扫描挂起的历史记录不时发消息。

声明也可以通过一个单独的过程来实现:仅检查待处理消息列表,并将空闲消息分配给看似活跃的消费者。可以使用 Redis 流的可观察性功能之一来获取活跃消费者。这是下一节的主题。

自动认领

Redis 6.2 中添加的 XAUTOCLAIM 命令实现了我们上面描述的声明过程。 XPENDINGXCLAIM 为不同类型的恢复机制提供基本构建块。 此命令通过让 Redis 管理通用流程来优化通用流程,并为大多数恢复需求提供简单的解决方案。

XAUTOCLAIM 标识空闲的待处理消息并将其所有权转移给使用者。 该命令的签名如下所示:

XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]

因此,在上面的示例中,我可以使用自动声明来声明一条消息,如下所示:

> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 1
1) 1526569498055-0
2) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

XCLAIM 类似,该命令会回复一组已声明的消息,但它还会返回一个允许迭代待处理条目的流 ID。 流 ID 是一个游标,我可以在下一次调用中使用它来继续声明空闲待处理消息:

> XAUTOCLAIM mystream mygroup Lora 3600000 1526569498055-0 COUNT 1
1) 0-0
2) 1) 1526569506935-0
   2) 1) "message"
      2) "strawberry"

XAUTOCLAIM返回“0-0”流ID作为光标时,这意味着它到达了消费者组待处理条目列表的末尾。 这并不意味着没有新的空闲待处理消息,因此该过程通​​过从流的开头调用 XAUTOCLAIM 继续。

领取及送货柜台

您在 XPENDING 输出中观察到的计数器是每条消息的传送次数。计数器以两种方式递增:当通过 XCLAIM 成功声明消息时或当 XREADGROUP 调用用于访问待处理消息的历史记录。

当出现故障时,消息会被多次传递是正常的,但最终它们通常会得到处理和确认。但是,处理某些特定消息可能会出现问题,因为它已损坏或以触发处理代码中的错误的方式制作。在这种情况下,消费者将持续无法处理该特定消息。因为我们有传递尝试的计数器,所以我们可以使用该计数器来检测由于某种原因无法处理的消息。因此,一旦递送计数器达到您选择的给定大数字,将此类消息放入另一个流中并向系统管理员发送通知可能是更明智的做法。这基本上就是 Redis Streams 实现死信概念的方式。

流可观测性

缺乏可观察性的消息系统很难使用。不知道谁在消费消息、哪些消息正在等待处理、给定流中活跃的消费者组集合,使得一切变得不透明。因此,Redis Streams 和消费者群体有不同的方式来观察正在发生的事情。我们已经介绍过 XPENDING,它允许我们检查给定时刻正在处理的消息列表,以及它们的空闲时间和传送数量。

然而,我们可能想做的不止这些,XINFO命令是一个可观察性接口,可以与子命令一起使用以获得有关流的信息或消费群体。

此命令使用子命令来显示有关流及其使用者组状态的不同信息。例如XINFO STREAM 报告有关流本身的信息。

> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1638125141232-0"
 9) "max-deleted-entryid"
10) "0-0"
11) "entries-added"
12) (integer) 2
13) "groups"
14) (integer) 1
15) "first-entry"
16) 1) "1638125133432-0"
    2) 1) "message"
       2) "apple"
17) "last-entry"
18) 1) "1638125141232-0"
    2) 1) "message"
       2) "banana"

输出显示有关流内部编码方式的信息,还显示流中的第一条和最后一条消息。另一条可用信息是与该流关联的消费者组的数量。我们可以进一步挖掘,询问有关消费者群体的更多信息。

> XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 2
    5) "pending"
    6) (integer) 2
    7) "last-delivered-id"
    8) "1638126030001-0"
    9) "entries-read"
   10) (integer) 2
   11) "lag"
   12) (integer) 0
2)  1) "name"
    2) "some-other-group"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1638126028070-0"
    9) "entries-read"
   10) (integer) 1
   11) "lag"
   12) (integer) 1

正如您在此和之前的输出中所看到的,XINFO 命令输出一系列字段值项。因为它是一个可观察性命令,所以允许人类用户立即了解报告的信息,并允许该命令在将来通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他必须提高带宽效率的命令,例如 XPENDING,仅报告不带字段名称的信息。

上面示例的输出(其中使用了 GROUPS 子命令)应该可以清楚地观察到字段名称。我们可以通过检查组中注册的消费者来更详细地检查特定消费者组的状态。

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

如果您不记得命令的语法,只需向命令本身寻求帮助:

> XINFO HELP
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3)     Show consumers of <groupname>.
4) GROUPS <key>
5)     Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7)     Show information about the stream.
8) HELP
9)     Prints this help.

与 Kafka(TM) 分区的差异

Redis 流中的消费者组可能在某种程度上类似于基于 Kafka (TM) 分区的消费者组,但请注意,Redis 流实际上非常不同。分区只是逻辑分区,消息只是放入单个 Redis 键中,因此为不同客户端提供服务的方式取决于谁准备好处理新消息,而不是来自哪个分区客户端阅读。例如,如果消费者 C3 在某个时刻永久失败,Redis 将继续为 C1 和 C2 提供所有新到达的消息,就好像现在只有两个逻辑分区一样。

类似地,如果给定的消费者处理消息的速度比其他消费者快得多,则该消费者将在相同的时间单位内按比例收到更多的消息。这是可能的,因为 Redis 显式跟踪所有未确认的消息,并记住谁收到了哪条消息以及从未传递给任何消费者的第一条消息的 ID。

然而,这也意味着在 Redis 中,如果您确实想将同一流中的消息分区到多个 Redis 实例中,则必须使用多个键和一些分片系统(例如 Redis Cluster 或其他特定于应用程序的分片系统)。单个 Redis 流不会自动分区到多个实例。

我们可以概括地说以下内容是正确的:

  • 如果您使用 1 个流 -> 1 个消费者,您将按顺序处理消息。
  • 如果您将 N 个流与 N 个消费者一起使用,以便只有给定的消费者访问 N 个流的子集,则可以扩展上述 1 个流 -> 1 个消费者的模型。
  • 如果您使用 1 个流 -> N 个消费者,则负载平衡到 N 个消费者,但是在这种情况下,有关同一逻辑项的消息可能会无序消费,因为给定消费者处理消息 3 的速度可能比另一个消费者处理的速度快消息 4.

所以基本上 Kafka 分区更类似于使用 N 个不同的 Redis 键,而 Redis 消费者组是一个服务器端负载平衡系统,将消息从给定的流发送到 N 个不同的消费者。

限制流

许多应用程序不希望永远将数据收集到流中。有时,流中最多包含给定数量的项目是有用的,其他时候,一旦达到给定的大小,将数据从 Redis 移动到不在内存中且速度不那么快但适合存储的存储中是有用的可能是未来几十年的历史。 Redis 流对此有一些支持。其中之一是 XADD 命令的 MAXLEN 选项。这个选项的使用非常简单:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

使用MAXLEN,当达到指定长度时,旧条目将被自动逐出,以便流保持恒定大小。目前没有选项可以告诉流只保留不早于给定时间段的项目,因为这样的命令为了一致地运行,可能会阻塞很长时间以逐出项目。想象一下,如果有一个插入尖峰,然后是一个长时间的暂停,然后是另一个插入,所有这些都具有相同的最大时间,会发生什么。流将阻塞以驱逐在暂停期间变得太旧的数据。因此,用户需要做一些规划并了解所需的最大流长度是多少。此外,虽然流的长度与使用的内存成正比,但按时间修剪不太容易控制和预测:它取决于插入速率,插入速率经常随时间变化(当它不改变时,则只需按时间修剪)大小是微不足道的)。

然而,使用MAXLEN进行修剪可能会很昂贵:流由宏节点表示为基数树,以便非常内存高效。改变由几十个元素组成的单个宏节点并不是最佳选择。因此可以通过以下特殊形式使用该命令:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

MAXLEN 选项和实际计数之间的 ~ 参数意味着,我实际上并不需要恰好是 1000 个项目。它可以是 1000 或 1010 或 1030,只要确保保存至少 1000 个项目即可。通过这个参数,只有当我们可以删除整个节点时才会执行修剪。这使得它更加高效,而这通常正是您想要的。

还有 XTRIM 命令,它执行的操作与上面的 MAXLEN 选项非常相似,只是它可以运行通过它自己:

> XTRIM mystream MAXLEN 10

或者,对于 XADD 选项:

> XTRIM mystream MAXLEN ~ 10

但是,XTRIM 旨在接受不同的修剪策略。另一种修剪策略是 MINID,它会逐出 ID 低于指定值的条目。

由于 XTRIM 是一个显式命令,因此用户应该了解不同修剪策略可能存在的缺点。

将来可能会添加到 XTRIM 中的另一个有用的驱逐策略是通过一系列 ID 进行删除,以方便使用 XRANGEXTRIM 用于将数据从 Redis 移动到其他存储系统(如果需要)。

流 API 中的特殊 ID

您可能已经注意到,有几个特殊的 ID 可以在 Redis API 中使用。这是一个简短的回顾,以便将来能够更有意义。

前两个特殊 ID 是 -+,用于通过 XRANGE 命令进行范围查询。这两个ID分别表示可能的最小ID(基本上是0-1)和可能的最大ID(即18446744073709551615-18446744073709551615)。正如您所看到的,写-+而不是那些数字要干净得多。

然后还有我们想要说的 API,即流中 ID 最大的项目的 ID。这就是$的意思。因此,例如,如果我只想要带有 XREADGROUP 的新条目,我使用此 ID 来表示我已经拥有所有现有条目,但不是将要添加的新条目插入到未来。类似地,当我创建或设置消费者组的 ID 时,我可以将最后交付的项目设置为 $,以便仅向该组中的消费者交付新条目。

正如您所看到的,$并不意味着+,它们是两个不同的东西,因为+是每个可能的流中可能的最大ID,而$是包含给定条目的给定流中的最大ID。此外,API 通常只能理解 +$,但避免加载具有多种含义的给定符号很有用。

另一个特殊的ID是>,它是仅与消费者组相关且仅在使用XREADGROUP命令时才具有的特殊含义。这个特殊的 ID 意味着我们只需要迄今为止从未交付给其他消费者的条目。所以基本上 > ID 是消费者组的最后交付的 ID

最后,特殊ID*只能与XADD命令一起使用,意味着为我们自动为新条目选择一个ID。

所以我们有-+$>*,它们都有不同的含义,而且大多数时候,可以在不同的上下文中使用。

持久性、复制和消息安全

与任何其他 Redis 数据结构一样,Stream 会异步复制到副本并持久保存到 AOF 和 RDB 文件中。然而,可能不太明显的是,消费者组的完整状态也会传播到 AOF、RDB 和副本,因此如果消息在主服务器中待处理,副本也将具有相同的信息。同样,重启后,AOF 将恢复消费者组的状态。

但请注意,Redis 流和使用者组是使用 Redis 默认复制进行持久化和复制的,因此:

  • 如果消息持久性在您的应用程序中很重要,则 AOF 必须与强大的 fsync 策略一起使用。
  • 默认情况下,异步复制不会保证 XADD 命令或使用者组状态更改被复制:故障转移后,某些内容可能会丢失,具体取决于副本的能力接收来自主站的数据。
  • 可以使用WAIT命令来强制将更改传播到一组副本。但请注意,虽然这使得数据丢失的可能性很小,但由 Sentinel 或 Redis Cluster 操作的 Redis 故障转移过程仅执行尽最大努力检查以故障转移到最新更新的副本,并且在某些特定的故障条件下,可能会提升缺少某些数据的副本。

因此,在使用 Redis 流和消费者组设计应用程序时,请确保了解应用程序在故障期间应具有的语义属性,并进行相应的配置,评估它对于您的用例是否足够安全。

从流中删除单个项目

流还有一个特殊的命令,用于仅通过 ID 从流中间删除项目。通常,对于仅附加数据结构,这可能看起来像是一个奇怪的功能,但它实际上对于涉及隐私法规等应用程序很有用。该命令称为 XDEL 并接收流的名称,后跟要删除的 ID:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

然而,在当前的实现中,直到宏节点完全为空时,内存才真正被回收,因此您不应该滥用此功能。

零长度流

流和其他 Redis 数据结构之间的区别在于,当其他数据结构不再具有任何元素时,作为调用删除元素的命令的副作用,键本身将被删除。例如,当调用 ZREM 将删除排序集中的最后一个元素时,排序集将被完全删除。另一方面,流允许保持零元素,这都是由于使用计数为零的 MAXLEN 选项 (XADDXTRIM 命令),或者因为 XDEL 被调用。

之所以存在这种不对称性,是因为流可能有关联的消费者组,我们不希望仅仅因为流中不再有任何项目而丢失消费者组定义的状态。目前,即使流没有关联的消费者组,也不会被删除。

消费一条消息的总延迟

非阻塞流命令,例如 XRANGEXREADXREADGROUP 不带 BLOCK 选项与任何其他 Redis 命令一样同步服务,因此讨论此类命令的延迟是没有意义的:检查 Redis 文档中命令的时间复杂度更有趣。应该足以说,在提取范围时,流命令至少与排序集命令一样快,并且 XADD 非常快并且可以轻松地从如果使用流水线,普通机器每秒处理 50 万到 100 万个项目。

然而,如果我们想了解处理消息的延迟,在消费者组中阻塞消费者的情况下,从通过 XADD 生成消息的那一刻到那一刻,延迟就成为一个有趣的参数。该消息由消费者获取,因为 XREADGROUP 随消息一起返回。

为被封锁的消费者提供服务如何运作

在提供执行测试的结果之前,了解 Redis 使用什么模型来路由流消息(以及通常如何管理任何等待数据的阻塞操作)是很有趣的。

  • 被阻止的客户端在哈希表中被引用,该哈希表将至少有一个阻塞消费者的键映射到正在等待该键的消费者列表。这样,给定接收数据的密钥,我们就可以解析所有正在等待此类数据的客户端。
  • 当发生写入时,在本例中,当调用 XADD 命令时,它会调用 signalKeyAsReady() 函数。该函数会将密钥放入需要处理的密钥列表中,因为这样的密钥可能有被阻止的消费者的新数据。请注意,此类“就绪键”将在稍后处理,因此在同一事件循环周期的过程中,该键可能会接收其他写入。
  • 最后,在返回事件循环之前,就绪键最终被处理。对于每个密钥,都会扫描等待数据的客户端列表,如果适用,此类客户端将接收到达的新数据。在流的情况下,数据是消费者请求的适用范围内的消息。

正如您所看到的,基本上,在返回事件循环之前,调用 XADD 的客户端和被阻止使用消息的客户端都会在输出中得到回复缓冲区,因此 XADD 的调用者应该在消费者收到新消息的同时收到来自 Redis 的回复。

该模型是基于推送的,因为将数据添加到消费者缓冲区将直接通过调用 XADD 的操作来执行,因此延迟往往是可以预测的。

延迟测试结果

为了检查这些延迟特征,使用 Ruby 程序的多个实例来执行测试,该程序推送具有计算机毫秒时间作为附加字段的消息,并且 Ruby 程序从消费者组读取消息并处理它们。消息处理步骤包括将当前计算机时间与消息时间戳进行比较,以了解总延迟。

获得的结果:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

因此,99.9% 的请求的延迟时间 <= 2 毫秒,异常值仍然非常接近平均值。

向流中添加几百万条未确认的消息不会改变基准测试的要点,大多数查询仍然以非常短的延迟进行处理。

几点说明:

  • 这里我们每次迭代最多处理 10k 条消息,这意味着 XREADGROUPCOUNT 参数设置为 10000。这增加了很多延迟,但为了让缓慢的消费者能够跟上消息流,这是必需的。因此,您可以预期现实世界的延迟要小得多。
  • 与当今的标准相比,用于此基准测试的系统非常慢。
最后更新2023-10-04
0 个评论
标签