在RedisInsight中管理流和消费者组
了解如何在RedisInsight中管理数据流和消费者组
stream是一个只附加的日志文件。 当你向它添加数据时,你不能改变它。 这似乎是一个缺点;但是,流可以作为一个日志或单一的真相来源。 它也可以用作以不同速度工作的进程之间的缓冲区,并且不需要知道彼此的情况。 有关流的更多概念性信息,请参阅Redis 流。
在本专题中,你将学习如何在RedisInsight中添加和使用流以及消费者组。
这里有一个模拟温度和湿度传感器的流。与流交互的过程执行两个角色中的一个:consumer和producer。流的意义在于它不会结束,所以你不能捕获整个数据集并对其进行一些处理。
在这个流中,传感器被认为是生产者,它们广播数据。 一个消费者从流中读取并对其做一些工作。 例如,如果温度超过某个阈值,它就会放出消息,打开该单位的空调或通知维修人员。
有可能有多个消费者做不同的工作,一个测量湿度,另一个在一段时间内测量温度。 Redis在内存中存储了整个数据集的副本,这是一种有限的资源。 为了避免数据失控,当你向流中添加东西时,可以对其进行修剪。当用XADD
添加到一个流时,你可以选择性地指定流应该被修剪到一个特定的或近似的最新条目的数量,或者只包括ID高于指定ID的条目。 你也可以使用密钥过期来管理流数据所需的存储。一个ID可以是任何数字,但流中的每个新条目必须有一个ID,其值高于最后加入流中的ID。
添加新的条目
使用XADD
和*
作为ID,让Redis自动为你生成一个新的ID,由一个毫秒级精度的时间戳、一个破折号和一个序列号组成。例如:1656416957625-0
。然后提供要存储在新流条目中的字段名和值。
有几种检索的方法。你可以按时间范围检索条目,或者你可以要求获得自你指定的时间戳或ID以来发生的所有事情。使用一个命令,你可以要求从某一天的10:30到11:15的任何东西。
消费者组
一个更现实的用例是一个有许多温度传感器的系统,这些传感器的数据被Redis放在一个流中,记录它们到达的时间,并对它们进行排序。
在右边,我们有两个读取数据流的消费者。其中一个是在温度超过某个数字时发出警报,并给维护人员发短信说他们需要做一些事情,另一个是数据仓库,它正在获取数据并将其放入数据库。
它们彼此独立运行。 在右边,我们有另一种任务。 让我们假设警报和数据仓库真的很快。 你会收到温度是否大于特定值的消息,这可能需要一毫秒。 而警报可以跟上数据流。 你可以扩展消费者的一种方式是消费者组,它允许同一消费者的多个实例或同一代码作为一个团队来处理数据流。
在RedisInsight中管理数据流
你可以通过两种方式在RedisInsight中添加一个流:创建一个新的流或添加到一个现有的流中。
要创建一个流,首先选择密钥类型(流)。 你不能设置生存时间(TTL),因为它不能放在流中的消息上;它只能放在Redis密钥上。将流命名为mystream。 然后,将Entry ID设置为*
,默认为时间戳。 如果你有自己的ID生成策略,从你的序列输入下一个ID。记住,该ID必须高于流中任何其他条目的ID。
然后,用 "+"来输入字段和值,以添加多个字段和值(例如,姓名和位置)。 现在,你有一个出现在Streams视图中的流,你可以继续向它添加字段和值。
RedisInsight为你运行读取命令,所以你可以在Streams视图中看到流条目。 而Consumer Groups视图显示了给定消费者组中的每个消费者和Redis最后一次分配消息,它的ID是什么以及该过程发生了多少次,以及消费者是否你已经使用XACK
命令告诉Redis你已经完成该任务的处理。
在RedisInsight中监控来自传感器的温度和湿度
这个例子展示了如何将一个现有的流带入RedisInsight,并对其进行处理。
设置方法
- 安装RedisInsight。
- 下载并安装Node.js(LTS版本)。
- 安装Redis。在Docker中,检查Redis是否在本地的默认端口6379上运行(没有设置密码)。
- 克隆本例的代码库。请参阅README,以了解有关本例的更多信息和安装提示。
- 在你的命令行上,导航到包含代码库的文件夹,并安装Node.js包管理器(npm)。
npm install
运行生产者
要启动生产者,它将每隔几秒钟向流中添加一个新的条目,请输入:
npm run producer
> streams@1.0.0 producer
> node producer.js
Starting producer...
Adding reading for location: 62, temperature: 40.3, humidity: 36.5
Added as 1632771056648-0
Adding reading for location: 96, temperature: 15.4, humidity: 70
Added as 1632771059039-0
...
生产者无限期地运行。 选择Ctrl+C
来停止它。 如果你想更快地将条目添加到流中,你可以启动生产者的多个实例。
运行消费者
要启动消费者,即每隔几秒钟从流中读取数据,请输入:
npm run consumer
> streams@1.0.0 consumer
> node consumer.js
Starting consumer...
Resuming from ID 1632744741693-0
Reading stream...
Received entry 1632771056648-0:
[ 'location', '62', 'temp', '40.3', 'humidity', '36.5' ]
Finished working with entry 1632771056648-0
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]
消费者将其读取的最后一个条目ID存储在Redis字符串中的键consumer:lastid
。它使用这个字符串在重启后从它离开的地方继续前进。试着用Ctrl+C
停止它并重新启动它。
一旦消费者处理了流中的每一个条目,它将无限期地等待生产者的实例来增加更多的条目:
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...
用Ctrl+C
来阻止它。
运行消费者组
一个消费者组由多个消费者实例组成,它们一起工作。Redis管理从流中读取的条目分配给消费者组的成员。一个组中的消费者将收到一个子集的条目,而整个组将收到所有的条目。当在一个消费者组中工作时,消费者进程必须确认收到/处理每个条目。
使用多个终端窗口,启动消费者组消费者的三个实例,给每个实例起一个独特的名字:
npm run consumergroup consumer1
> streams@1.0.0 consumergroup
> node consumer_group.js -- "consumer1"
Starting consumer consumer1...
Consumer group temphumidity_consumers exists, not created.
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]
Acknowledged processing of entry 1632771059039-0.
Reading stream...
在第二个终端中:
npm run consumergroup consumer2
而在第三个终端下:
npm run consumergroup consumer3
消费者将无限期地运行,等待生产者实例将新的消息添加到流中,当他们集体消耗了整个流时。 注意,在这个模型中,每个消费者实例并不接收流中的所有条目,但该组的三个成员各自接收一个子集。
在RedisInsight中查看数据流
- 启动RedisInsight。
- 选择
localhost:6379
。 - 选择STREAM。可以选择从右上角选择全屏来扩大视野。
你现在可以在Stream和Consumer Groups视图之间切换,以查看你的数据。 正如本主题前面提到的,流是一个只附加的日志,所以你不能修改一个条目的内容,但你可以删除整个条目。 一个有用的情况是在一个所谓的毒丸消息,可能导致消费者崩溃。你可以在Streams视图中物理地删除这些信息,或者在命令行界面(CLI)使用XDEL
命令。
你可以继续在CLI上与你的流进行交互。例如,要获得一个流的当前长度,请使用XLEN
命令:
XLEN ingest:temphumidity
在银行、游戏、供应链、物联网、社交媒体等领域使用流进行审计和处理事件。