Lettuce –Java Redis客户端简介

评论 0 浏览 0 2018-01-28

1.概述

本文是对Lettuce的介绍,这是一个Redis的Java客户端。

Redis是一个内存中的键值存储,可以作为数据库、缓存或消息代理使用。数据的添加、查询、修改和删除都是通过对Redis内存数据结构中的键进行操作的命令

Lettuce支持同步和异步通信使用完整的Redis API,包括其数据结构、pub/sub消息传递和高可用性的服务器连接。

2.为什么是Lettuce?

我们已经在以前的一篇文章中介绍了Jedis 是什么让Lettuce与众不同呢?

最显著的区别是它通过Java 8的CompletionStage接口提供的异步支持和对Reactive Streams的支持。正如我们将在下面看到的,Lettuce为从Redis数据库服务器发出异步请求和创建流提供了一个自然的接口。

它还使用Netty来与服务器进行通信。这使得一个“更重的API,但也使得它更适合与一个以上的线程共享一个连接。

3.设置

依赖

让我们首先声明我们在pom.xml中需要的唯一的依赖关系:

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>6.1.10.RELEASE</version>
</dependency>

库的最新版本可以在GitHub仓库Maven Central.上查看。

3.2.Redis安装

我们需要安装并运行至少一个Redis实例,如果我们想测试集群或哨兵模式,则需要两个实例(尽管哨兵模式需要三个服务器才能正常运行)。 对于这篇文章,我们使用4.0.x –目前最新的稳定版本。

有关开始使用 Redis 的更多信息可在此处找到,包括用于 Linux 和 MacOS 的下载。

Redis并不正式支持Windows,但有一个服务器的帖子这里,从2021年2月开始存档。我们还可以在Docker中运行Redis,这对Windows 10来说是一个更好的选择,也是一个快速启动和运行的方法。

4.连接

4.1.连接到服务器

连接到Redis由四个步骤组成:

  1. 创建一个Redis URI
  2. 使用URI连接到RedisClient
  3. 打开一个Redis的连接
  4. 生成一组RedisCommands的命令

让我们来看看实现:

RedisClient redisClient = RedisClient
  .create("redis://[email protected]:6379/");
StatefulRedisConnection<String, String> connection
 = redisClient.connect();

StatefulRedisConnection就是它听起来的样子;一个到Redis服务器的线程安全连接,它将保持与服务器的连接并在需要时重新连接。一旦我们有了一个连接,我们就可以用它来同步或异步地执行 Redis 命令。

RedisClient使用了大量的系统资源,因为它持有Netty资源用于与Redis服务器进行通信。需要多个连接的应用程序应该使用一个RedisClient.

4.2. Redis URI

我们通过向静态工厂方法传递一个URI来创建一个RedisClient

Lettuce利用了Redis URI的自定义语法。这是模式:

redis :// [[email protected]] host [: port] [/ database]
  [? [timeout=timeout[d|h|m|s|ms|us|ns]]
  [&_database=database_]]

有四种URI方案:

  • redis – 一个独立的Redis服务器
  • rediss – 通过SSL连接的独立Redis服务器
  • redis-socket – 通过Unix域套接字的独立的Redis服务器
  • redis-sentinel – 一个Redis Sentinel服务器

Redis数据库实例可以作为URL路径的一部分被指定,也可以作为一个额外的参数。如果两者都提供,参数具有更高的优先权。

在上面的例子中,我们使用的是String表示。Lettuce也有一个RedisURI类用于建立连接。它提供了Builder模式:

RedisURI.Builder
  .redis("localhost", 6379).auth("password")
  .database(1).build();

还有一个构造函数:

new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

4.3.同步命令

与Jedis类似,Lettuce以方法的形式提供了一个完整的Redis命令集。

然而,Lettuce同时实现了同步和异步版本。我们将简要地看一下同步版本,然后在教程的其余部分使用异步实现。

在我们创建了一个连接后,我们用它来创建一个命令集:

RedisCommands<String, String> syncCommands = connection.sync();

现在,我们有了一个直观的接口来与Redis进行通信。

我们可以设置和获取字符串值:

syncCommands.set("key", "Hello, Redis!");

String value = syncommands.get(“key”);

我们可以用哈希值来工作:

syncCommands.hset("recordName", "FirstName", "John");
syncCommands.hset("recordName", "LastName", "Smith");
Map<String, String> record = syncCommands.hgetall("recordName");

我们将在文章的后面介绍更多的Redis。

Lettuce的同步API使用异步API。阻断是在命令层面为我们完成的。这意味着多个客户端可以共享一个同步连接。

4.4.异步命令

让我们来看看异步命令:

RedisAsyncCommands<String, String> asyncCommands = connection.async();

我们从连接中检索一组RedisAsyncCommands,类似于我们检索同步集的方式。这些命令返回一个RedisFuture(内部是一个CompletableFuture:

RedisFuture<String> result = asyncCommands.get("key");

可在此处找到使用CompletableFuture 的指南。

4.5.反应式API

最后,让我们看看如何使用非阻塞式的反应式API:

RedisStringReactiveCommands<String, String> reactiveCommands = connection.reactive();

这些命令返回的结果被包裹在MonoFlux中,来自Project Reactor.

使用Project Reactor的指南可以在这里找到

5.Redis数据结构

上面我们简单看了一下字符串和哈希值,让我们看看Lettuce如何实现Redis的其他数据结构。正如我们所期望的,每个Redis命令都有一个名称相似的方法。

5.1.列表

Lists是保留了插入顺序的字符串的列表。值从两端插入或检索:

asyncCommands.lpush("tasks", "firstTask");
asyncCommands.lpush("tasks", "secondTask");
RedisFuture<String> redisFuture = asyncCommands.rpop("tasks");

String nextTask = redisFuture.get();

在这个例子中,nextTask等于“firstTask“。Lpush将数值推到列表的头部,然后rpop将数值从列表的末端弹出。

我们也可以从另一端弹出元素:

asyncCommands.del("tasks");
asyncCommands.lpush("tasks", "firstTask");
asyncCommands.lpush("tasks", "secondTask");
redisFuture = asyncCommands.lpop("tasks");

String nextTask = redisFuture.get();

在第二个例子中,我们先用del删除列表。然后我们再次插入相同的值,但我们用lpop从列表的头部弹出这些值,所以nextTask持有“secondTask”文本。

5.2.Set

Redis Sets是字符串的无序集合,类似于Java的Set;没有重复的元素:

asyncCommands.sadd("pets", "dog");
asyncCommands.sadd("pets", "cat");
asyncCommands.sadd("pets", "cat");
 
RedisFuture<Set<String>> pets = asyncCommands.smembers("nicknames");
RedisFuture<Boolean> exists = asyncCommands.sismember("pets", "dog");

当我们以Set的形式检索Redis集合时,大小为2,因为重复的“cat”被忽略了。当我们用sismember查询Redis是否存在“dog”时,响应是true

5.3.哈希值

我们在前面简要地看了一个哈希值的例子。它们值得快速解释一下。

Redis Hash是具有String字段和值的记录。每条记录在主索引中也有一个键:

asyncCommands.hset("recordName", "FirstName", "John");
asyncCommands.hset("recordName", "LastName", "Smith");

RedisFuture<String> lastName 
  = syncCommands.hget("recordName", "LastName");
RedisFuture<Map<String, String>> record 
  = syncCommands.hgetall("recordName");

我们使用hset将字段添加到哈希中,传入哈希的名称、字段的名称和一个值。

然后,我们用hget记录的名称和字段来检索一个单独的值。最后,我们用hgetall.将整个记录作为哈希值来获取。

5.4.排序的Set

排序集包含数值和一个等级,通过这个等级进行排序。等级是64位的浮点值。

项目被添加到一个等级中,并在一个范围内被检索:

asyncCommands.zadd("sortedset", 1, "one");
asyncCommands.zadd("sortedset", 4, "zero");
asyncCommands.zadd("sortedset", 2, "two");

RedisFuture<List<String>> valuesForward = asyncCommands.zrange(key, 0, 3);
RedisFuture<List<String>> valuesReverse = asyncCommands.zrevrange(key, 0, 3);

zadd 的第二个参数是一个等级。我们用zrange表示升序,zrevrange表示降序,按等级检索出一个范围。

我们添加了“zero”,其等级为4,所以它将出现在valuesForward的末尾和valuesReverse.的开头。

6.事务

事务允许在一个单一的原子步骤中执行一组命令。这些命令被保证按顺序完全执行。来自另一个用户的命令将不会被执行,直到事务完成。

要么所有的命令都被执行,要么一个都不执行。如果其中一个命令失败,Redis不会执行回滚。一旦exec()被调用,所有的命令将按照指定的顺序执行。

我们来看看一个例子:

asyncCommands.multi();
    
RedisFuture<String> result1 = asyncCommands.set("key1", "value1");
RedisFuture<String> result2 = asyncCommands.set("key2", "value2");
RedisFuture<String> result3 = asyncCommands.set("key3", "value3");

RedisFuture<TransactionResult> execResult = asyncCommands.exec();

TransactionResult transactionResult = execResult.get();

String firstResult = transactionResult.get(0);
String secondResult = transactionResult.get(0);
String thirdResult = transactionResult.get(0);

multi的调用开始了事务。当一个事务被启动时,后续的命令不会被执行,直到exec()被调用。

在同步模式下,这些命令返回null。在异步模式下,这些命令返回RedisFutureExec返回一个包含响应列表的TransactionResult

由于RedisFutures也会收到它们的结果,所以异步API客户端会在两个地方收到事务结果。

7.批处理

在正常情况下,Lettuce一旦被API客户端调用,就会立即执行命令。

这是大多数普通应用所希望的,特别是如果它们依赖于串行接收命令结果。

然而,如果应用程序不需要立即得到结果,或者大量的数据被上传,这种行为就不太有效了。

异步应用可以覆盖这一行为:

commands.setAutoFlushCommands(false);

List<RedisFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < iterations; i++) {
    futures.add(commands.set("key-" + i, "value-" + i);
}
commands.flushCommands();

boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS,
  futures.toArray(new RedisFuture[0]));

当setAutoFlushCommands设置为false时,应用程序必须手动调用flushCommands。在这个例子中,我们排查了多个set 命令,然后刷新了通道。AwaitAll等待所有的RedisFutures完成。

这个状态是在每个连接的基础上设置的,并影响到所有使用该连接的线程。这个功能不适用于同步命令。

8.发布/订阅

Redis提供了一个简单的发布/订阅消息系统。订阅者通过subscribe命令从通道中消费消息。消息是不持久的;它们只在用户订阅了一个通道后才会被传递给他们。

Redis使用pub/sub系统来通知Redis数据集,使客户能够接收关于键的设置、删除、过期等的事件。

更多细节请参见文档这里

8.1.订阅者

一个RedisPubSubListener接收pub/sub-messages。这个接口定义了几个方法,但我们在这里只展示接收消息的方法:

public class Listener implements RedisPubSubListener<String, String> {

    @Override
    public void message(String channel, String message) {
        log.debug("Got {} on channel {}",  message, channel);
        message = new String(s2);
    }
}

我们使用RedisClient来连接一个pub/sub通道,并安装监听器:

StatefulRedisPubSubConnection<String, String> connection
 = client.connectPubSub();
connection.addListener(new Listener())

RedisPubSubAsyncCommands<String, String> async
 = connection.async();
async.subscribe("channel");

安装了监听器后,我们检索一组RedisPubSubAsyncCommands,并订阅一个频道。

8.2.发布者

发布只是一个连接Pub/Sub通道和检索命令的问题:

StatefulRedisPubSubConnection<String, String> connection 
  = client.connectPubSub();

RedisPubSubAsyncCommands<String, String> async 
  = connection.async();
async.publish("channel", "Hello, Redis!");

发布需要有一个渠道和一个信息。

8.3.反应式订阅

Lettuce还提供了一个反应式接口,用于订阅pub/sub消息:

StatefulRedisPubSubConnection<String, String> connection = client
  .connectPubSub();

RedisPubSubAsyncCommands<String, String> reactive = connection
  .reactive();

reactive.observeChannels().subscribe(message -> {
    log.debug("Got {} on channel {}",  message, channel);
    message = new String(s2);
});
reactive.subscribe("channel").subscribe();

observeChannels返回的Flux接收所有通道的消息,但由于这是一个流,过滤是很容易做到的。

9.高可用性

Redis为高可用性和可扩展性提供了几种选择。完整的理解需要Redis服务器配置的知识,但我们会简单介绍一下Lettuce如何支持它们。

9.1 主站/从站

Redis 服务器以主/从配置进行自我复制。主服务器向从属服务器发送命令流,将主缓存复制到从属服务器。 Redis 不支持双向复制,所以从属是只读的。

Lettuce可以连接到主/从系统,查询它们的拓扑结构,然后选择从属系统进行读取操作,这可以提高吞吐量:

RedisClient redisClient = RedisClient.create();

StatefulRedisMasterSlaveConnection<String, String> connection
 = MasterSlave.connect(redisClient, 
   new Utf8StringCodec(), RedisURI.create("redis://localhost"));
 
connection.setReadFrom(ReadFrom.SLAVE);

9.2.哨兵

Redis Sentinel监控主站和从站实例,并在主站发生故障时协调对从站的故障恢复工作。

Lettuce可以连接到Sentinel,用它来发现当前主站的地址,然后返回一个连接给它。

要做到这一点,我们建立一个不同的RedisURI,并将我们的RedisClient与它连接起来:

RedisURI redisUri = RedisURI.Builder
  .sentinel("sentinelhost1", "clustername")
  .withSentinel("sentinelhost2").build();
RedisClient client = new RedisClient(redisUri);

RedisConnection<String, String> connection = client.connect();

我们用第一个哨兵的主机名(或地址)和一个集群名称构建URI,后面是第二个哨兵地址。当我们连接到哨兵时,Lettuce会查询它的拓扑结构,并为我们返回一个与当前主服务器的连接。

完整的文件可在这里获得。

9.3.集群

Redis Cluster使用分布式配置,以提供高可用性和高吞吐量。

集群将键分散在多达1000个节点上,因此,事务在集群中是不可用的:

RedisURI redisUri = RedisURI.Builder.redis("localhost")
  .withPassword("authentication").build();
RedisClusterClient clusterClient = RedisClusterClient
  .create(rediUri);
StatefulRedisClusterConnection<String, String> connection
 = clusterClient.connect();
RedisAdvancedClusterCommands<String, String> syncCommands = connection
  .sync();

RedisAdvancedClusterCommands持有集群所支持的Redis命令集,将它们路由到持有密钥的实例。

完整的规范可在这里获得。

10.结语

在本教程中,我们研究了如何使用Lettuce从我们的应用程序中连接和查询Redis服务器。

Lettuce 支持完整的 Redis 功能集,并具有完全线程安全的异步接口。它还广泛使用了Java 8的CompletionStage接口,使应用程序可以精细地控制它们如何接收数据。

像往常一样,代码样本可以在GitHub上找到。

最后更新2023-06-28
0 个评论
标签