Lettuce –Java Redis客户端简介
1.概述
本文是对Lettuce的介绍,这是一个Redis的Java客户端。
Redis是一个内存中的键值存储,可以作为数据库、缓存或消息代理使用。数据的添加、查询、修改和删除都是通过对Redis内存数据结构中的键进行操作的命令。
Lettuce支持同步和异步通信使用完整的Redis API,包括其数据结构、pub/sub消息传递和高可用性的服务器连接。
2.为什么是Lettuce?
我们已经在以前的一篇文章中介绍了Jedis 。是什么让Lettuce与众不同呢?
最显著的区别是它通过Java 8的CompletionStage接口提供的异步支持和对Reactive Streams的支持。正如我们将在下面看到的,Lettuce为从Redis数据库服务器发出异步请求和创建流提供了一个自然的接口。
它还使用Netty来与服务器进行通信。这使得一个“更重的API,但也使得它更适合与一个以上的线程共享一个连接。
3.设置
依赖
让我们首先声明我们在pom.xml中需要的唯一的依赖关系:
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.10.RELEASE</version>
</dependency>
库的最新版本可以在GitHub仓库或Maven Central.上查看。
3.2.Redis安装
我们需要安装并运行至少一个Redis实例,如果我们想测试集群或哨兵模式,则需要两个实例(尽管哨兵模式需要三个服务器才能正常运行)。 对于这篇文章,我们使用4.0.x –目前最新的稳定版本。
有关开始使用 Redis 的更多信息可在此处找到,包括用于 Linux 和 MacOS 的下载。
Redis并不正式支持Windows,但有一个服务器的帖子这里,从2021年2月开始存档。我们还可以在Docker中运行Redis,这对Windows 10来说是一个更好的选择,也是一个快速启动和运行的方法。
4.连接
4.1.连接到服务器
连接到Redis由四个步骤组成:
- 创建一个Redis URI
- 使用URI连接到RedisClient
- 打开一个Redis的连接
- 生成一组RedisCommands的命令
让我们来看看实现:
RedisClient redisClient = RedisClient
.create("redis://[email protected]:6379/");
StatefulRedisConnection<String, String> connection
= redisClient.connect();
StatefulRedisConnection就是它听起来的样子;一个到Redis服务器的线程安全连接,它将保持与服务器的连接并在需要时重新连接。一旦我们有了一个连接,我们就可以用它来同步或异步地执行 Redis 命令。
RedisClient使用了大量的系统资源,因为它持有Netty资源用于与Redis服务器进行通信。需要多个连接的应用程序应该使用一个RedisClient.。
4.2. Redis URI
我们通过向静态工厂方法传递一个URI来创建一个RedisClient。
Lettuce利用了Redis URI的自定义语法。这是模式:
redis :// [[email protected]] host [: port] [/ database]
[? [timeout=timeout[d|h|m|s|ms|us|ns]]
[&_database=database_]]
有四种URI方案:
- redis – 一个独立的Redis服务器
- rediss – 通过SSL连接的独立Redis服务器
- redis-socket – 通过Unix域套接字的独立的Redis服务器
- redis-sentinel – 一个Redis Sentinel服务器
Redis数据库实例可以作为URL路径的一部分被指定,也可以作为一个额外的参数。如果两者都提供,参数具有更高的优先权。
在上面的例子中,我们使用的是String表示。Lettuce也有一个RedisURI类用于建立连接。它提供了Builder模式:
RedisURI.Builder
.redis("localhost", 6379).auth("password")
.database(1).build();
还有一个构造函数:
new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);
4.3.同步命令
与Jedis类似,Lettuce以方法的形式提供了一个完整的Redis命令集。
然而,Lettuce同时实现了同步和异步版本。我们将简要地看一下同步版本,然后在教程的其余部分使用异步实现。
在我们创建了一个连接后,我们用它来创建一个命令集:
RedisCommands<String, String> syncCommands = connection.sync();
现在,我们有了一个直观的接口来与Redis进行通信。
我们可以设置和获取字符串值:
syncCommands.set("key", "Hello, Redis!");
String value = syncommands.get(“key”);
我们可以用哈希值来工作:
syncCommands.hset("recordName", "FirstName", "John");
syncCommands.hset("recordName", "LastName", "Smith");
Map<String, String> record = syncCommands.hgetall("recordName");
我们将在文章的后面介绍更多的Redis。
Lettuce的同步API使用异步API。阻断是在命令层面为我们完成的。这意味着多个客户端可以共享一个同步连接。
4.4.异步命令
让我们来看看异步命令:
RedisAsyncCommands<String, String> asyncCommands = connection.async();
我们从连接中检索一组RedisAsyncCommands,类似于我们检索同步集的方式。这些命令返回一个RedisFuture(内部是一个CompletableFuture):。
RedisFuture<String> result = asyncCommands.get("key");
可在此处找到使用CompletableFuture 的指南。
4.5.反应式API
最后,让我们看看如何使用非阻塞式的反应式API:
RedisStringReactiveCommands<String, String> reactiveCommands = connection.reactive();
这些命令返回的结果被包裹在Mono或Flux中,来自Project Reactor.。
5.Redis数据结构
上面我们简单看了一下字符串和哈希值,让我们看看Lettuce如何实现Redis的其他数据结构。正如我们所期望的,每个Redis命令都有一个名称相似的方法。
5.1.列表
Lists是保留了插入顺序的字符串的列表。值从两端插入或检索:
asyncCommands.lpush("tasks", "firstTask");
asyncCommands.lpush("tasks", "secondTask");
RedisFuture<String> redisFuture = asyncCommands.rpop("tasks");
String nextTask = redisFuture.get();
在这个例子中,nextTask等于“firstTask“。Lpush将数值推到列表的头部,然后rpop将数值从列表的末端弹出。
我们也可以从另一端弹出元素:
asyncCommands.del("tasks");
asyncCommands.lpush("tasks", "firstTask");
asyncCommands.lpush("tasks", "secondTask");
redisFuture = asyncCommands.lpop("tasks");
String nextTask = redisFuture.get();
在第二个例子中,我们先用del删除列表。然后我们再次插入相同的值,但我们用lpop从列表的头部弹出这些值,所以nextTask持有“secondTask”文本。
5.2.Set
Redis Sets是字符串的无序集合,类似于Java的Set;没有重复的元素:
asyncCommands.sadd("pets", "dog");
asyncCommands.sadd("pets", "cat");
asyncCommands.sadd("pets", "cat");
RedisFuture<Set<String>> pets = asyncCommands.smembers("nicknames");
RedisFuture<Boolean> exists = asyncCommands.sismember("pets", "dog");
当我们以Set的形式检索Redis集合时,大小为2,因为重复的“cat”被忽略了。当我们用sismember查询Redis是否存在“dog”时,响应是true。
5.3.哈希值
我们在前面简要地看了一个哈希值的例子。它们值得快速解释一下。
Redis Hash是具有String字段和值的记录。每条记录在主索引中也有一个键:
asyncCommands.hset("recordName", "FirstName", "John");
asyncCommands.hset("recordName", "LastName", "Smith");
RedisFuture<String> lastName
= syncCommands.hget("recordName", "LastName");
RedisFuture<Map<String, String>> record
= syncCommands.hgetall("recordName");
我们使用hset将字段添加到哈希中,传入哈希的名称、字段的名称和一个值。
然后,我们用hget记录的名称和字段来检索一个单独的值。最后,我们用hgetall.将整个记录作为哈希值来获取。
5.4.排序的Set
排序集包含数值和一个等级,通过这个等级进行排序。等级是64位的浮点值。
项目被添加到一个等级中,并在一个范围内被检索:
asyncCommands.zadd("sortedset", 1, "one");
asyncCommands.zadd("sortedset", 4, "zero");
asyncCommands.zadd("sortedset", 2, "two");
RedisFuture<List<String>> valuesForward = asyncCommands.zrange(key, 0, 3);
RedisFuture<List<String>> valuesReverse = asyncCommands.zrevrange(key, 0, 3);
zadd 的第二个参数是一个等级。我们用zrange表示升序,zrevrange表示降序,按等级检索出一个范围。
我们添加了“zero”,其等级为4,所以它将出现在valuesForward的末尾和valuesReverse.的开头。
6.事务
事务允许在一个单一的原子步骤中执行一组命令。这些命令被保证按顺序完全执行。来自另一个用户的命令将不会被执行,直到事务完成。
要么所有的命令都被执行,要么一个都不执行。如果其中一个命令失败,Redis不会执行回滚。一旦exec()被调用,所有的命令将按照指定的顺序执行。
我们来看看一个例子:
asyncCommands.multi();
RedisFuture<String> result1 = asyncCommands.set("key1", "value1");
RedisFuture<String> result2 = asyncCommands.set("key2", "value2");
RedisFuture<String> result3 = asyncCommands.set("key3", "value3");
RedisFuture<TransactionResult> execResult = asyncCommands.exec();
TransactionResult transactionResult = execResult.get();
String firstResult = transactionResult.get(0);
String secondResult = transactionResult.get(0);
String thirdResult = transactionResult.get(0);
对multi的调用开始了事务。当一个事务被启动时,后续的命令不会被执行,直到exec()被调用。
在同步模式下,这些命令返回null。在异步模式下,这些命令返回RedisFuture。Exec返回一个包含响应列表的TransactionResult。
由于RedisFutures也会收到它们的结果,所以异步API客户端会在两个地方收到事务结果。
7.批处理
在正常情况下,Lettuce一旦被API客户端调用,就会立即执行命令。
这是大多数普通应用所希望的,特别是如果它们依赖于串行接收命令结果。
然而,如果应用程序不需要立即得到结果,或者大量的数据被上传,这种行为就不太有效了。
异步应用可以覆盖这一行为:
commands.setAutoFlushCommands(false);
List<RedisFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < iterations; i++) {
futures.add(commands.set("key-" + i, "value-" + i);
}
commands.flushCommands();
boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS,
futures.toArray(new RedisFuture[0]));
当setAutoFlushCommands设置为false时,应用程序必须手动调用flushCommands。在这个例子中,我们排查了多个set 命令,然后刷新了通道。AwaitAll等待所有的RedisFutures完成。
这个状态是在每个连接的基础上设置的,并影响到所有使用该连接的线程。这个功能不适用于同步命令。
8.发布/订阅
Redis提供了一个简单的发布/订阅消息系统。订阅者通过subscribe命令从通道中消费消息。消息是不持久的;它们只在用户订阅了一个通道后才会被传递给他们。
Redis使用pub/sub系统来通知Redis数据集,使客户能够接收关于键的设置、删除、过期等的事件。
更多细节请参见文档这里。
8.1.订阅者
一个RedisPubSubListener接收pub/sub-messages。这个接口定义了几个方法,但我们在这里只展示接收消息的方法:
public class Listener implements RedisPubSubListener<String, String> {
@Override
public void message(String channel, String message) {
log.debug("Got {} on channel {}", message, channel);
message = new String(s2);
}
}
我们使用RedisClient来连接一个pub/sub通道,并安装监听器:
StatefulRedisPubSubConnection<String, String> connection
= client.connectPubSub();
connection.addListener(new Listener())
RedisPubSubAsyncCommands<String, String> async
= connection.async();
async.subscribe("channel");
安装了监听器后,我们检索一组RedisPubSubAsyncCommands,并订阅一个频道。
8.2.发布者
发布只是一个连接Pub/Sub通道和检索命令的问题:
StatefulRedisPubSubConnection<String, String> connection
= client.connectPubSub();
RedisPubSubAsyncCommands<String, String> async
= connection.async();
async.publish("channel", "Hello, Redis!");
发布需要有一个渠道和一个信息。
8.3.反应式订阅
Lettuce还提供了一个反应式接口,用于订阅pub/sub消息:
StatefulRedisPubSubConnection<String, String> connection = client
.connectPubSub();
RedisPubSubAsyncCommands<String, String> reactive = connection
.reactive();
reactive.observeChannels().subscribe(message -> {
log.debug("Got {} on channel {}", message, channel);
message = new String(s2);
});
reactive.subscribe("channel").subscribe();
由observeChannels返回的Flux接收所有通道的消息,但由于这是一个流,过滤是很容易做到的。
9.高可用性
Redis为高可用性和可扩展性提供了几种选择。完整的理解需要Redis服务器配置的知识,但我们会简单介绍一下Lettuce如何支持它们。
9.1 主站/从站
Redis 服务器以主/从配置进行自我复制。主服务器向从属服务器发送命令流,将主缓存复制到从属服务器。 Redis 不支持双向复制,所以从属是只读的。
Lettuce可以连接到主/从系统,查询它们的拓扑结构,然后选择从属系统进行读取操作,这可以提高吞吐量:
RedisClient redisClient = RedisClient.create();
StatefulRedisMasterSlaveConnection<String, String> connection
= MasterSlave.connect(redisClient,
new Utf8StringCodec(), RedisURI.create("redis://localhost"));
connection.setReadFrom(ReadFrom.SLAVE);
9.2.哨兵
Redis Sentinel监控主站和从站实例,并在主站发生故障时协调对从站的故障恢复工作。
Lettuce可以连接到Sentinel,用它来发现当前主站的地址,然后返回一个连接给它。
要做到这一点,我们建立一个不同的RedisURI,并将我们的RedisClient与它连接起来:
RedisURI redisUri = RedisURI.Builder
.sentinel("sentinelhost1", "clustername")
.withSentinel("sentinelhost2").build();
RedisClient client = new RedisClient(redisUri);
RedisConnection<String, String> connection = client.connect();
我们用第一个哨兵的主机名(或地址)和一个集群名称构建URI,后面是第二个哨兵地址。当我们连接到哨兵时,Lettuce会查询它的拓扑结构,并为我们返回一个与当前主服务器的连接。
完整的文件可在这里获得。
9.3.集群
Redis Cluster使用分布式配置,以提供高可用性和高吞吐量。
集群将键分散在多达1000个节点上,因此,事务在集群中是不可用的:
RedisURI redisUri = RedisURI.Builder.redis("localhost")
.withPassword("authentication").build();
RedisClusterClient clusterClient = RedisClusterClient
.create(rediUri);
StatefulRedisClusterConnection<String, String> connection
= clusterClient.connect();
RedisAdvancedClusterCommands<String, String> syncCommands = connection
.sync();
RedisAdvancedClusterCommands持有集群所支持的Redis命令集,将它们路由到持有密钥的实例。
完整的规范可在这里获得。
10.结语
在本教程中,我们研究了如何使用Lettuce从我们的应用程序中连接和查询Redis服务器。
Lettuce 支持完整的 Redis 功能集,并具有完全线程安全的异步接口。它还广泛使用了Java 8的CompletionStage接口,使应用程序可以精细地控制它们如何接收数据。
像往常一样,代码样本可以在GitHub上找到。