实战派 | Java项目中玩转Redis6.0客户端缓存
nanshan 2025-05-11 17:26 28 浏览 0 评论
铺垫
首先介绍一下今天要使用到的工具Lettuce,它是一个可伸缩线程安全的redis客户端。多个线程可以共享同一个RedisConnection,利用nio框架Netty来高效地管理多个连接。
放眼望向现在常用的redis客户端开发工具包,虽然能用的不少,但是目前率先拥抱redis6.0,支持客户端缓存功能的却不多,而lettuce就是其中的领跑者。
我们先在项目中引入最新版本的依赖,下面正式开始实战环节:
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.8.RELEASE</version>
</dependency>
实战
在项目中应用lettuce,开启并使用客户端缓存功能,只需要下面这一段代码:
public static void main(String[] args) throws InterruptedException {
// 创建 RedisClient 连接信息
RedisURI redisURI= RedisURI.builder()
.withHost("127.0.0.1")
.withPort(6379)
.build();
RedisClient client = RedisClient.create(redisURI);
StatefulRedisConnection<String, String> connect = client.connect();
Map<String, String> map = new HashMap<>();
CacheFrontend<String,String> frontend=ClientSideCaching.enable(CacheAccessor.forMap(map),
connect, TrackingArgs.Builder.enabled().noloop());
String key="user";
while (true){
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(10);
}
}
上面的代码主要完成了几项工作:
- 通过RedisURI配置redis连接的标准信息,并建立连接
- 创建用于充当本地缓存的Map,开启客户端缓存功能,建立一个缓存访问器CacheFrontend
- 在循环中使用CacheFrontend,不断查询同一个key对应的值并打印
启动上面的程序,控制台会不断的打印user对应的缓存,在启动一段时间后,我们在其他的客户端修改user对应的值,运行的结果如下:
可以看到,在其他客户端修改了key所对应的值后,打印结果也发生了变化。但是到这里,我们也不知道lettuce是不是真的使用了客户端缓存,虽然结果正确,但是说不定是它每次都重新执行了get命令呢?
所以我们下面来看看源码,分析一下具体的代码执行流程。
分析
在上面的代码中,最关键的类就是CacheFrontend了,我们再来仔细看一下上面具体实例化时的语句:
CacheFrontend<String,String> frontend=ClientSideCaching.enable(
CacheAccessor.forMap(map),
connect,
TrackingArgs.Builder.enabled().noloop()
);
首先调用了ClientSideCaching的enable()方法,我们看一下它的源码:
解释一下传入的3个参数:
- CacheAccessor:一个定义对客户端缓存进行访问接口,上面调用它的forMap方法返回的是一个MapCacheAccessor,它的底层使用的我们自定义的Map来存放本地缓存,并且提供了get、put、evict等方法操作Map
- StatefulRedisConnection:使用到的redis连接
- TrackingArgs:客户端缓存的参数配置,使用noloop后不会接收当前连接修改key后的通知
向redis服务端发送开启tracking的命令后,继续向下调用create()方法:
这个过程中实例化了一个重要对象,它就是实现了RedisCache接口的DefaultRedisCache对象,实际向redis执行查询时的get请求、写入的put请求,都是由它来完成。
实例化完成后,继续向下调用同名的create()方法:
在这个方法中,实例化了ClientSideCaching对象,注意一下传入的两个参数,通过前面的介绍也很好理解它们的分工:
- 当本地缓存存在时,直接从CacheAccessor中读取
- 当本地缓存不存在时,使用RedisCache从服务端读取
需要额外注意一下的是返回前的两行代码,先看第一句(行号114的那行)。
这里向RedisCache添加了一个监听,当监听到类型为invalidate的作废消息时,拿到要作废的key,传递给消费者。一般情况下,keys中只会有一个元素。
消费时会遍历当前ClientSideCaching的消费者列表invalidationListeners:
而这个列表中的所有,就是在上面的第二行代码中(行号115的那行)添加的,看一下方法的定义:
而实际传入的方法引用则是下面MapCacheAccessor的evict()方法,也就是说,当收到key作废的消息后,会移除掉本地缓存Map中缓存的这个数据。
客户端缓存的作废逻辑我们梳理清楚了,再来看看它是何时写入的,直接看ClientSideCaching的get()方法:
可以看到,get方法会先从本地缓存MapCacheAccessor中尝试获取,如果取到则直接返回,如果没有再使用RedisCache读取redis中的缓存,并将返回的结果存入到MapCacheAccessor中。
图解
源码看到这里,是不是基本逻辑就串联起来了,我们再画两张图来梳理一下这个流程。先看get的过程:
再来看一下通知客户端缓存失效的过程:
怎么样,配合这两张图再理解一下,是不是很完美?
其实也不是…回忆一下我们之前使用两级缓存Caffeine+Redis时,当时使用的通知机制,会在修改redis缓存后通知所有主机修改本地缓存,修改成为最新的值。目前的lettuce看来,显然不满足这一功能,只能做到作废删除缓存但是不会主动更新。
扩展
那么,如果想实现本地客户端缓存的实时更新,我们应该如何在现在的基础上进行扩展呢?仔细想一下的话,思路也很简单:
- 首先,移除掉lettuce的客户端缓存本身自带的作废消息监听器
- 然后,添加我们自己的作废消息监听器
回顾一下上面源码分析的图,在调用DefaultRedisCache的addInvalidationListener()方法时,其实是调用的是StatefulRedisConnection的addListener()方法,也就是说,这个监听器其实是添加在redis连接上的。
如果我们再看一下这个方法源码的话,就会发现,在它的附近还有一个对应的removeListener()方法,一看就是我们要找的东西,准备用它来移除消息监听。
不过再仔细看看,这个方法是要传参数的啊,我们明显不知道现在里面已经存在的PushListener有什么,所以没法直接使用,那么无奈只能再接着往下看看这个pushHandler是什么玩意…
通过注释可以知道,这个PushHandler就是一个用来操作PushListener的处理工具,虽然我们不知道具体要移除的PushListener是哪一个,但是惊喜的是,它提供了一个getPushListeners()方法,可以获取当前所有的监听器。
这样一来就简单了,我上来直接清除掉这个集合中的所有监听器,问题就迎刃而解了~
不过,在StatefulRedisConnectionImpl中的pushHandler是一个私有对象,也没有对外进行暴露,想要操作起来还是需要费上一点功夫的。下面,我们就在分析的结果上进行代码的修改。
魔改
首先,我们需要自定义一个工具类,它的主要功能是操作监听器,所以就命名为ListenerChanger好了。它要完成的功能主要有三个:
- 移除原有的全部消息监听
- 添加新的自定义消息监听
- 更新本地缓存MapCacheAccessor中的数据
首先定义构造方法,需要传入StatefulRedisConnection和CacheAccessor作为参数,在后面的方法中会用到,并且创建一个RedisCommands,用于后面向redis服务端发送get命令请求。
public class ListenerChanger<K, V> {
private StatefulRedisConnection<K, V> connection;
private CacheAccessor<K, V> mapCacheAccessor;
private RedisCommands<K, V> command;
public ListenerChanger(StatefulRedisConnection<K, V> connection,
CacheAccessor<K, V> mapCacheAccessor) {
this.connection = connection;
this.mapCacheAccessor = mapCacheAccessor;
this.command = connection.sync();
}
//其他方法先省略……
}
移除监听
前面说过,pushHandler是一个私有对象,我们无法直接获取和操作,所以只能先使用反射获得。PushHandler中的监听器列表存储在一个CopyOnWriteArrayList中,我们直接使用迭代器移除掉所有内容即可。
public void removeAllListeners() {
try {
Class connectionClass = StatefulRedisConnectionImpl.class;
Field pushHandlerField = connectionClass.getDeclaredField("pushHandler");
pushHandlerField.setAccessible(true);
PushHandler pushHandler = (PushHandler) pushHandlerField.get(this.connection);
CopyOnWriteArrayList<PushListener> pushListeners
= (CopyOnWriteArrayList) pushHandler.getPushListeners();
Iterator<PushListener> it = pushListeners.iterator();
while (it.hasNext()) {
PushListener listener = it.next();
pushListeners.remove(listener);
}
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}
添加监听
这里我们模仿DefaultRedisCache中addInvalidationListener()方法的写法,添加一个监听器,除了最后处理的代码基本一致。对于监听到的要作废的keys集合,另外启动一个线程更新本地数据。
public void addNewListener() {
this.connection.addListener(new PushListener() {
@Override
public void onPushMessage(PushMessage message) {
if (message.getType().equals("invalidate")) {
List<Object> content = message.getContent(StringCodec.UTF8::decodeKey);
List<K> keys = (List<K>) content.get(1);
System.out.println("modifyKeys:"+keys);
// start a new thread to update cacheAccessor
new Thread(()-> updateMap(keys)).start();
}
}
});
}
本地更新
使用RedisCommands重新从redis服务端获取最新的数据,并更新本地缓存mapCacheAccessor中的数据。
private void updateMap(List<K> keys){
for (K key : keys) {
V newValue = this.command.get(key);
System.out.println("newValue:"+newValue);
mapCacheAccessor.put(key, newValue);
}
}
至于为什么执行这个方法时额外启动了一个新线程,是因为我在测试中发现,当在PushListener的onPushMessage方法中执行RedisCommands的get()方法时,会一直取不到值,但是像这样新启动一个线程就没有问题。
测试
下面,我们来写一段测试代码,来测试上面的改动。
public static void main(String[] args) throws InterruptedException {
// 省略之前创建连接代码……
Map<String, String> map = new HashMap<>();
CacheAccessor<String, String> mapCacheAccessor = CacheAccessor.forMap(map);
CacheFrontend<String, String> frontend = ClientSideCaching.enable(mapCacheAccessor,
connect,
TrackingArgs.Builder.enabled().noloop());
ListenerChanger<String, String> listenerChanger
= new ListenerChanger<>(connect, mapCacheAccessor);
// 移除原有的listeners
listenerChanger.removeAllListeners();
// 添加新的监听器
listenerChanger.addNewListener();
String key = "user";
while (true) {
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(30);
}
}
可以看到,代码基本上在之前的基础上没有做什么改动,只是在创建完ClientSideCaching后,执行了我们自己实现的ListenerChanger的两个方法。先移除所有监听器、再添加新的监听器。下面我们以debug模式启动测试代码,简单看一下代码的执行逻辑。
首先,在未执行移除操作前,pushHandler中的监听器列表中有一个监听器:
移除后,监听器列表为空:
在添加完自定义监听器、并且执行完第一次查询操作后,在另外一个redis客户端中修改user的值,这时PushListener会收到作废类型的消息监听:
启动一个新线程,查询redis中user对应的最新值,并放入cacheAccessor中:
当循环中CacheFrontend的get()方法再被执行时,会直接从cacheAccessor中取到刷新后的值,不需要再次去访问redis服务端了:
总结
到这里,我们基于lettuce的客户端缓存的基本使用、以及在这个基础上进行的魔改就基本完成了。可以看到,lettuce客户端已经在底层封装了一套比较成熟的API,能让我们在将redis升级到6.0以后,开箱即用式地使用客户端缓存这一新特性。在使用中,不需要我们关注底层原理,也不用做什么业务逻辑的改造,总的来说,使用起来还是挺香的。
来源:
https://mp.weixin.qq.com/s/QOq2zUI0AFGqc8lNtSmShQ
相关推荐
- 使用nginx配置域名及禁止直接通过IP访问网站
-
前段时间刚搭建好这个网站,一直没有关注一个问题,那就是IP地址也可以访问我的网站,今天就专门研究了一下nginx配置问题,争取把这个问题研究透彻。1.nginx配置域名及禁止直接通过IP访问先来看n...
- 如何在 Linux 中使用 PID 号查找进程名称?
-
在Linux的复杂世界中,进程是系统运行的核心,每个进程都由一个唯一的「进程ID」(PID)标识。无论是系统管理员在排查失控进程,还是开发者在调试应用程序,知道如何将PID映射到对应的进程名称都是一项...
- Linux服务器硬件信息查询与日常运维命令总结
-
1.服务器硬件信息查询1.1CPU信息查询命令功能描述示例lscpu显示CPU架构、核心数、线程数等lscpucat/proc/cpuinfo详细CPU信息(型号、缓存、频率)cat/proc/c...
- Ubuntu 操作系统常用命令详解(ubuntu常用的50个命令)
-
UbuntuLinux是一款流行的开源操作系统,广泛应用于服务器、开发、学习等场景。命令行是Ubuntu的灵魂,也是高效、稳定管理系统的利器。本文按照各大常用领域,详细总结Ubuntu必学...
- 从 0 到 1:打造基于 Linux 的私有 API 网关平台
-
在当今微服务架构盛行的时代,API网关作为服务入口和安全屏障,其重要性日益凸显。你是否想过,不依赖商业方案,完全基于开源组件,在Linux上构建一个属于自己的私有API网关平台?今天就带你...
- Nginx搭建简单直播服务器(nginx 直播服务器搭建)
-
前言使用Nginx+Nginx-rtmp-module在Ubuntu中搭建简单的rtmp推流直播服务器。服务器环境Ubuntu16.04相关概念RTMP:RTMP协议是RealTi...
- Linux连不上网?远程卡?这篇网络管理指南你不能错过!
-
大家好!今天咱们聊个所有Linux用户都躲不开的“老大难”——网络管理。我猜你肯定遇到过这些崩溃时刻:新装的Linux系统连不上Wi-Fi,急得直拍桌子;远程服务器SSH连不上,提示“Connecti...
- 7天从0到上线!手把手教你用Python Flask打造爆款Web服务
-
一、为什么全网开发者都在疯学Flask?在当今Web开发的战场,Flask就像一把“瑞士军刀”——轻量级架构让新手3天速成,灵活扩展能力又能支撑百万级用户项目!对比Django的“重型装甲”,Flas...
- nginx配置文件详解(nginx反向代理配置详解)
-
Nginx是一个强大的免费开源的HTTP服务器和反向代理服务器。在Web开发项目中,nginx常用作为静态文件服务器处理静态文件,并负责将动态请求转发至应用服务器(如Django,Flask,et...
- 30 分钟搞定 Docker 安装与 Nginx 部署,轻松搭建高效 Web 服务
-
在云计算时代,利用容器技术快速部署应用已成为开发者必备技能。本文将手把手教你在阿里云轻量应用服务器上,通过Docker高效部署Nginx并发布静态网站,全程可视化操作,新手也能轻松上手!一、准...
- Nginx 配置实战:从摸鱼到部署,手把手教你搞定生产级配置
-
各位摸鱼搭子们!今天咱不聊代码里的NullPointerException,改聊点「摸鱼必备生存技能」——Nginx配置!先灵魂拷问一下:写了一堆接口却不会部署?服务器被恶意请求打崩过?静态资源加载...
- 如何使用 Daphne + Nginx + supervisor部署 Django
-
前言:从Django3.0开始支持ASGI应用程序运行,使Django完全具有异步功能。Django目前已经更新到5.0,对异步支持也越来越好。但是,异步功能将仅对在ASGI下运行的应用程序可用...
- Docker命令最全详解(39个最常用命令)
-
Docker是云原生的核心,也是大厂的必备技能,下面我就全面来详解Docker核心命令@mikechen本文作者:陈睿|mikechen文章来源:mikechen.cc一、Docker基本命令doc...
- ubuntu中如何查看是否已经安装了nginx
-
在Ubuntu系统中,可以通过以下几种方法检查是否已安装Nginx:方法1:使用dpkg命令(适用于Debian/Ubuntu)bashdpkg-l|grepnginx输出...
- OVN 概念与实践(德育概念的泛化在理论和实践中有什么弊端?)
-
今天我们来讲解OVN的概念和基础实践,要理解本篇博客的内容,需要前置学习:Linux网络设备-Bridge&VethPairLinux网络设备-Bridge详解OVS+Fa...
你 发表评论:
欢迎- 一周热门
-
-
UOS服务器操作系统防火墙设置(uos20关闭防火墙)
-
极空间如何无损移机,新Z4 Pro又有哪些升级?极空间Z4 Pro深度体验
-
手机如何设置与显示准确时间的详细指南
-
NAS:DS video/DS file/DS photo等群晖移动端APP远程访问的教程
-
如何在安装前及安装后修改黑群晖的Mac地址和Sn系列号
-
如何修复用户配置文件服务在 WINDOWS 上登录失败的问题
-
一加手机与电脑互传文件的便捷方法FileDash
-
日本海上自卫队的军衔制度(日本海上自卫队的军衔制度是什么)
-
10个免费文件中转服务站,分享文件简单方便,你知道几个?
-
爱折腾的特斯拉车主必看!手把手教你TESLAMATE的备份和恢复
-
- 最近发表
-
- 使用nginx配置域名及禁止直接通过IP访问网站
- 如何在 Linux 中使用 PID 号查找进程名称?
- Linux服务器硬件信息查询与日常运维命令总结
- Ubuntu 操作系统常用命令详解(ubuntu常用的50个命令)
- 从 0 到 1:打造基于 Linux 的私有 API 网关平台
- Nginx搭建简单直播服务器(nginx 直播服务器搭建)
- Linux连不上网?远程卡?这篇网络管理指南你不能错过!
- 7天从0到上线!手把手教你用Python Flask打造爆款Web服务
- nginx配置文件详解(nginx反向代理配置详解)
- 30 分钟搞定 Docker 安装与 Nginx 部署,轻松搭建高效 Web 服务
- 标签列表
-
- linux 查询端口号 (58)
- docker映射容器目录到宿主机 (66)
- 杀端口 (60)
- yum更换阿里源 (62)
- internet explorer 增强的安全配置已启用 (65)
- linux自动挂载 (56)
- 禁用selinux (55)
- sysv-rc-conf (69)
- ubuntu防火墙状态查看 (64)
- windows server 2022激活密钥 (56)
- 无法与服务器建立安全连接是什么意思 (74)
- 443/80端口被占用怎么解决 (56)
- ping无法访问目标主机怎么解决 (58)
- fdatasync (59)
- 405 not allowed (56)
- 免备案虚拟主机zxhost (55)
- linux根据pid查看进程 (60)
- dhcp工具 (62)
- mysql 1045 (57)
- 宝塔远程工具 (56)
- ssh服务器拒绝了密码 请再试一次 (56)
- ubuntu卸载docker (56)
- linux查看nginx状态 (63)
- tomcat 乱码 (76)
- 2008r2激活序列号 (65)