Redis监听过期的key实现流程详解

作者:嘉禾嘉宁papa 时间:2023-12-12 02:41:28 

一、简介

  本文今天主要是讲Redis中对过期key的监听,可能很多小伙伴不会,或者使用会出现一些不可思议的问题,比如在系统中设置了一个缓存,希望在缓存失效后去做什么操作,但是实际中可能又出现了操作重复的问题。所以今天来讨论下怎么正确使用。我们来个最简单的集群架构,如下图:

Redis监听过期的key实现流程详解

  我们上面图中看到是服务A和服务B就是同一个服务的不同实例。

二、maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>2.6.0</version>
       <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>com.alian</groupId>
   <artifactId>expiration</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>expiration</name>
   <description>redis-key-expiration-listener</description>
   <properties>
       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
       <project.package.directory>target</project.package.directory>
       <java.version>1.8</java.version>
       <!--com.fasterxml.jackson 版本-->
       <jackson.version>2.9.10</jackson.version>
       <!--阿里巴巴fastjson 版本-->
       <fastjson.version>1.2.68</fastjson.version>
   </properties>
   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
       </dependency>
       <!--redis依赖-->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-data-redis</artifactId>
           <version>${parent.version}</version>
       </dependency>
       <!--用于序列化-->
       <dependency>
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-databind</artifactId>
           <version>${jackson.version}</version>
       </dependency>
       <!--java 8时间序列化-->
       <dependency>
           <groupId>com.fasterxml.jackson.datatype</groupId>
           <artifactId>jackson-datatype-jsr310</artifactId>
           <version>${jackson.version}</version>
       </dependency>
       <dependency>
           <groupId>com.alibaba</groupId>
           <artifactId>fastjson</artifactId>
           <version>1.2.68</version>
       </dependency>
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>1.16.14</version>
       </dependency>
       <dependency>
           <groupId>junit</groupId>
           <artifactId>junit</artifactId>
           <version>4.13.2</version>
           <scope>test</scope>
       </dependency>
   </dependencies>
   <build>
       <plugins>
           <plugin>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-maven-plugin</artifactId>
           </plugin>
       </plugins>
   </build>
</project>

三、编码实现

3.1、application.properties

# 端口
server.port=8090
# 上下文路径
server.servlet.context-path=/expiration

# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=192.168.0.193
#spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=10
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=20000
# 读时间(毫秒)
spring.redis.timeout=10000
# 连接超时时间(毫秒)
spring.redis.connect-timeout=10000

3.2、Redis配置类

RedisConfig

package com.alian.expiration.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
@Configuration
public class RedisConfig {
   /**
    * redis配置
    *
    * @param redisConnectionFactory
    * @return
    */
   @Bean
   public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
       // 实例化redisTemplate
       RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
       //设置连接工厂
       redisTemplate.setConnectionFactory(redisConnectionFactory);
       // key采用String的序列化
       redisTemplate.setKeySerializer(keySerializer());
       // value采用jackson序列化
       redisTemplate.setValueSerializer(valueSerializer());
       // Hash key采用String的序列化
       redisTemplate.setHashKeySerializer(keySerializer());
       // Hash value采用jackson序列化
       redisTemplate.setHashValueSerializer(valueSerializer());
       // 支持事务
       // redisTemplate.setEnableTransactionSupport(true);
       //执行函数,初始化RedisTemplate
       redisTemplate.afterPropertiesSet();
       return redisTemplate;
   }
   /**
    * key类型采用String序列化
    *
    * @return
    */
   private RedisSerializer<String> keySerializer() {
       return new StringRedisSerializer();
   }
   /**
    * value采用JSON序列化
    *
    * @return
    */
   private RedisSerializer<Object> valueSerializer() {
       //设置jackson序列化
       Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
       //设置序列化对象
       jackson2JsonRedisSerializer.setObjectMapper(getMapper());
       return jackson2JsonRedisSerializer;
   }
   /**
    * 使用com.fasterxml.jackson.databind.ObjectMapper
    * 对数据进行处理包括java8里的时间
    *
    * @return
    */
   private ObjectMapper getMapper() {
       ObjectMapper mapper = new ObjectMapper();
       //设置可见性
       mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
       //默认键入对象
       mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
       //设置Java 8 时间序列化
       JavaTimeModule timeModule = new JavaTimeModule();
       timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
       timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
       timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
       timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
       timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
       timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
       //禁用把时间转为时间戳
       mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
       mapper.registerModule(timeModule);
       return mapper;
   }
   @Bean
   RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
       RedisMessageListenerContainer container = new RedisMessageListenerContainer();
       container.setConnectionFactory(connectionFactory);
       return container;
   }
}

&emsp;&emsp;和我们之前整合redis差不多,只不过在最后增加了一个redis消息监听监听容器RedisMessageListenerContainer

3.3、 *

RedisKeyExpirationListener

package com.alian.expiration.listener;
import com.alian.expiration.service.RedisExpirationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
   @Autowired
   private RedisExpirationService redisExpirationService;
// 把我们上面一步配置的bean注入进去
   public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
       super(listenerContainer);
   }
   /**
    * 针对redis数据失效事件,进行数据处理
    *
    * @param message
    * @param pattern
    */
   @Override
   public void onMessage(Message message, byte[] pattern) {
       // 用户做自己的业务处理即可,注意message.toString()可以获取失效的key
       String expiredKey = message.toString();
       log.info("onMessage --> redis 过期的key是:{}", expiredKey);
       try {
           // 对过期key进行处理
           redisExpirationService.processingExpiredKey(expiredKey);
           log.info("过期key处理完成:{}", expiredKey);
       } catch (Exception e) {
           e.printStackTrace();
           log.error("处理redis 过期的key异常:{}", expiredKey, e);
       }
   }
}

&emsp;&emsp;实现的步骤如下:

  • 继承KeyExpirationEventMessageListener

  • 把redis消息监听监听容器RedisMessageListenerContainer 注入到密钥空间事件消息侦 听器中

  • 重写onMessage方法

  • 通过Message 的 toString() 方法就可以获取到过期的key

  • 对key中关键信息进行业务处理,比如 id

3.4、服务类

RedisExpirationService

package com.alian.expiration.service;
import com.alian.expiration.util.SignUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class RedisExpirationService {
   @Autowired
   private RedisTemplate<String, Object> redisTemplate;
   public void processingExpiredKey(String expiredKey) {
       // 如果是优惠券的key(一定要规范命名)
       if (expiredKey.startsWith("com.mall.coupon.id")) {
           // 临时key,此key可以在业务处理完,然后延迟一定时间删除,或者不处理
           String tempKey = SignUtils.md5(expiredKey, "UTF-8");
           // 临时key不存在才设置值,key超时时间为10秒(此处相当于分布式锁的应用)
           Boolean exist = redisTemplate.opsForValue().setIfAbsent(tempKey, "1", 10, TimeUnit.SECONDS);
           if (Boolean.TRUE.equals(exist)) {
               log.info("Business Handing...");
               // 比如截取里面的id,然后关联数据库进行处理
           } else {
               log.info("Other service is handing...");
           }
       } else {
           log.info("Expired keys without processing");
       }
   }
}

&emsp;&emsp;基本流程如下:

  • 判断是否是需要处理的key,一般这种key通过命名规范加以处理

  • 以当前key生成一个新的key作为分布式key

  • 如果redis中不存在这个新的key,则为新的key设置一个值,达到分布式服务处理(核心)

  • 设置成功的,进行业务处理;设置失败了,说明其他服务正在处理这个key

  • 根据 key 的关键信息(比如截取id),进行业务处理

3.5、工具类

SignUtils

package com.alian.expiration.util;
import java.security.MessageDigest;
public class SignUtils {
   public static final String md5(String s, String charset) {
       char[] hexDigits = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
       try {
           byte[] btInput = s.getBytes(charset);
           MessageDigest mdInst = MessageDigest.getInstance("MD5");
           mdInst.update(btInput);
           byte[] md = mdInst.digest();
           int j = md.length;
           char[] str = new char[j * 2];
           int k = 0;
           for (byte byte0 : md) {
               str[k++] = hexDigits[byte0 >>> 4 & 15];
               str[k++] = hexDigits[byte0 & 15];
           }
           return new String(str);
       } catch (Exception var11) {
           return "";
       }
   }
}

四、测试

4.1、测试类

&emsp;&emsp;简单模拟下发送一个优惠券数据到redis,然后设置超时时间

package com.alian.expiration;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RedisKeyExpirationTest {
   @Autowired
   private RedisTemplate<String, Object> redisTemplate;
   @Test
   public void keyExpiration() {
       // 优惠券信息
       String id = "2023021685264735";
       Map<String, String> map = new HashMap<>();
       map.put("id", id);
       map.put("amount", "1000");
       map.put("type", "1001");
       map.put("describe", "满减红包");
       // 缓存到redis
       redisTemplate.opsForHash().putAll("com.mall.coupon.id." + id, map);
       // 设置过期时间
       redisTemplate.expire("com.mall.coupon.id." + id, 10, TimeUnit.SECONDS);
   }
}

4.2、单实例

&emsp;&emsp;单实例就是服务只部署了一份,我们启动一份,端口是8090,然后通过上面的测试类,发送一个消息,结果如下:

10:23:39 701 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
10:23:39 988 INFO [container-2]:Business Handing...
10:23:39 989 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
10:23:50 005 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
10:23:50 005 INFO [container-3]:Expired keys without processing
10:23:50 005 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

4.3、多实例

&emsp;&emsp;多实例就是服务部署了多份,比如我们启动两份,端口分别为8090和8091,然后通过上面的测试类,发送一个消息,8090端口的服务结果如下(Business Handing&hellip;):

11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Business Handing...
11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

&emsp;&emsp;8091端口的服务结果如下(Other service is handing&hellip;):

11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Other service is handing...
11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

&emsp;&emsp;结果分析:

  • 多实例的情况下,每个实例都会收到过期key通知

  • 通过redis分布式锁,实现只有一个实例会进行业务处理,防止重复

  • 使用分布式锁会有一个新的key过期,并且收到该key的通知,你可以业务执行完延迟一定时间(避免重复执行),再删除,也可以不处理(因为本就不是要处理业务的key)

来源:https://blog.csdn.net/Alian_1223/article/details/129047717

标签:Redis,监听,key
0
投稿

猜你喜欢

  • Springboot使用redis实现接口Api限流的示例代码

    2023-11-29 02:11:05
  • C#设置开机启动项、取消开机启动项

    2023-02-19 19:40:32
  • java实现求两个字符串最长公共子串的方法

    2023-04-03 12:15:05
  • Android开发中解析xml文件XmlUtils工具类与用法示例

    2023-03-26 01:05:45
  • 关于C#数强转会不会抛出异常详解

    2021-11-09 05:44:48
  • 详解Java编程中线程同步以及定时启动线程的方法

    2021-08-31 10:59:20
  • idea热部署插件jrebel正式版及破解版安装详细图文教程

    2023-05-27 03:28:59
  • C#记录消息到日志文件的方法

    2021-09-17 23:10:39
  • java开源调度如何给xxljob加k8s执行器

    2021-09-17 16:41:50
  • 浅谈一下SpringCloud中Hystrix服务熔断和降级原理

    2021-10-02 08:46:41
  • Java二维数组实现数字拼图效果

    2021-11-21 20:39:17
  • Spring Boot Admin 进行项目监控管理的方法

    2021-09-01 23:39:19
  • Docker下搭建一个JAVA Tomcat运行环境的方法

    2022-01-13 14:13:29
  • 解析android中include标签的使用

    2022-08-17 06:59:59
  • Android禁止横屏竖屏切换的有效方法

    2023-05-05 10:56:16
  • Android 控件GridView使用案例讲解

    2023-07-14 17:33:28
  • C#编程实现自定义热键的方法

    2023-12-05 23:57:18
  • Android SQLite数据库增删改查操作的案例分析

    2022-06-06 02:55:03
  • Android实现APP秒表功能

    2022-11-13 13:58:26
  • SpringBoot全局异常与数据校验的方法

    2023-12-11 10:46:06
  • asp之家 软件编程 m.aspxhome.com