SpringBoot中使用Redis Stream实现消息监听示例

作者:胡海龙Blog 时间:2021-09-02 04:42:07 

Demo环境

  • JDK8

  • Maven3.6.3

  • springboot2.4.3

  • redis_version:6.2.1

仓库地址

https://gitee.com/hlovez/redismq.git.

POM依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>vip.huhailong</groupId>
<artifactId>redismq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redismq</name>
<description>base redis stream mq</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

这里是一个简单的Demo,所以关于redis的一些序列化配置就省略了。

配置监听消息类

配置监听消息类,这里类需要实现StreamListener接口,该接口下只有一个要实现的方法&mdash;&mdash;onMessage方法,代码:

package vip.huhailong.redismq.redistool;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

/**
* @author Huhailong
* @Description 监听消息
* @Date 2021/3/10.
*/
@Slf4j
@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {

@Override
   public void onMessage(MapRecord<String, String, String> entries) {
       log.info("接受到来自redis的消息");
       System.out.println("message id "+entries.getId());
       System.out.println("stream "+entries.getStream());
       System.out.println("body "+entries.getValue());
   }

}

配置完该类后我们再创建一个类将该 * 注入进去,代码:

package vip.huhailong.redismq.config;

import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import vip.huhailong.redismq.redistool.ListenerMessage;

import java.time.Duration;

/**
* @author Huhailong
* @Description
* @Date 2021/3/12.
*/
@Configuration
public class RedisStreamConfig {

@Autowired
   private ListenerMessage streamListener;

@Bean
   public Subscription subscription(RedisConnectionFactory factory){
       var options = StreamMessageListenerContainer
               .StreamMessageListenerContainerOptions
               .builder()
               .pollTimeout(Duration.ofSeconds(1))
               .build();
       var listenerContainer = StreamMessageListenerContainer.create(factory,options);
       var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
               StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
       listenerContainer.start();
       return subscription;
   }
}

代码分析:

首先将我们实现了StreamListener的 * 类注入。

subscription方法返回的是一个Subscription类型,它是与当前正在运行任务的链接,可以理解为订阅的链接,它有俩个方法

  • boolean await(Duration timeout): 当订阅变为活动或超时时,同步阻塞呼叫将返回

  • boolean isActive(): 如果当前正在订阅则为true

代码中的var是使用了Lombok的可变局部变量。主要是为了方便

StreamMessageListenerContainer: 消息侦听容器,不能在外部实现。创建后,StreamMessageListenerContainer可以订阅Redis流并使用传入的消息。 StreamMessageListenerContainer允许多个流读取请求,并为每个读取请求返回一个Subscription句柄。取消订阅最终将终止后台轮询。使用键和值序列化器转换消息以支持各种序列化策略。具体文档

__StreamMessageListenerContainerOptions: __ 它是上面4的选项,代码中pollTimeout表示轮询超时时间

create方法使用给定的RedisConnectionFactory和上面的配置选项创建 * 。

接下来使用receiveAutoAck创建一个新订阅,注意这里接受到消息后会被自动的确认,如果不想自动确认请使用其他的创建订阅方式。该方法共有三个参数

  • 消费组 consumer group ,它不能为null (Consumer类型)

  • stream offset ,stream的偏移量(StreamOffset 类型)

  • listener 不能为null (StreamListener<K,V> 类型)

代码中表示消费者来自名称为mygroup的组,消费者名称为huhailong,这里偏移量设置为了lastConsumed,它表示读取ID大于消费者组使用的最后一个元素的所有新到达的元素。

现在就可以运行项目来验证了,将项目运行起来后通过终端给对应key的stream添加一条消息

> XADD mystream * message springboot

这时可以看到spring boot的控制台打印除了一下消息:

message id 1615532778588-0
stream mystream
body {message=springboot}

说明侦听成功,它会一直处于监听状态,只要对应key的stream添加了新的消息都会被侦听到,到此也就简单的实现了消息队列功能。

监听俩个stream的实现

有网友想实现俩个或俩个以上的stream监听,可以这样实现(有其他更好的方法请留言评论,谢谢)
在上面监听一个stream的基础上在RedisStreamConfig类里增加监听方法,如下:

@Configuration
public class RedisStreamConfig {

@Autowired
   private ListenerMessage streamListener;

@Bean
   public Subscription subscription(RedisConnectionFactory factory){
       var options = StreamMessageListenerContainer
               .StreamMessageListenerContainerOptions
               .builder()
               .pollTimeout(Duration.ofSeconds(1))
               .build();
initStream("mystream","mygroup");//详细描述请看下方的问题补充——初始化key和group
       var listenerContainer = StreamMessageListenerContainer.create(factory,options);
       var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
               StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
       listenerContainer.start();
       return subscription;
   }

@Bean
   public Subscription subscription2(RedisConnectionFactory factory){
       var options = StreamMessageListenerContainer
               .StreamMessageListenerContainerOptions
               .builder()
               .pollTimeout(Duration.ofSeconds(1))
               .build();
initStream("mystream","mygroup");//详细描述请看下方的问题补充——初始化key和group
       var listenerContainer = StreamMessageListenerContainer.create(factory,options);
       var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
               StreamOffset.create("mystream2", ReadOffset.lastConsumed()),streamListener);
       listenerContainer.start();
       return subscription;
   }
}

记得创建该stream和对应的组,让后启动程序,在控制台模拟添加数据,如图:

SpringBoot中使用Redis Stream实现消息监听示例

然后idea控制台的打印可以看到分别接收到了来自不同的stream的消息

SpringBoot中使用Redis Stream实现消息监听示例

其他类和配置与监听一个的时候相同,不需要改变。

【问题补充】确认完消息删除消息

消息确认完后消息实际上没有在redis中消失,这也是redis stream中的一个特性,如果要想真正的删除该消息,需要我们进行指定字段ID删除,如下:

在RedisUtil中添加删除方法:

public void delField(String key, String fieldId){
       redisTemplate.opsForStream().delete(key,fieldId);
}

然后再ListenerMessage中的onMessage方法里处理完消息后添加删除该消息的方法:

@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {

@Autowired
   RedisUtil redisUtil;

@Override
   public void onMessage(MapRecord<String, String, String> entries) {
       log.info("接受到来自redis的消息");
       System.out.println("message id "+entries.getId().getValue());
       System.out.println("stream "+entries.getStream());
       System.out.println("body "+entries.getValue());
       //下面是删除该消息的方法
       redisUtil.delField("mystream",entries.getId().getValue());
   }

}

【问题补充】自动初始化stream的key和group问题-最新更新-2021年12月4日

最近看到评论大部分有报找不到key和group的错信息,这个是因为没有初始化导致的,当时在写这个demo的时候为了简单测试直接写死了我已经存在的key和group,部分朋友不知道这个是需要初始化的,所以我更新了一下代码,增加了初始化key和group的方法,这样大家直接运行代码就可以了,不需要在做多余的操作。
RedisUtil中增加增加的代码:

public RecordId addStream(String key, Map<String,Object> message){
       RecordId add = redisTemplate.opsForStream().add(key, message);
       return add;//返回增加后的id
   }

public void addGroup(String key, String groupName){
    redisTemplate.opsForStream().createGroup(key,groupName);
}

/**
* 用来判断key是否存在
*/
public boolean hasKey(String key){
    if(key==null){
        return false;
    }else{
        return redisTemplate.hasKey(key);
    }

}

然后再RedisStreamConfig中增加初始化方法,这里注意要判断key是否存在:

private void initStream(String key, String group){
//判断key是否存在,如果不存在则创建
   boolean hasKey = redisUtil.hasKey(key);
   if(!hasKey){
       Map<String,Object> map = new HashMap<>();
       map.put("field","value");
       RecordId recordId = redisUtil.addStream(key, map);
       redisUtil.addGroup(key,group);
       //将初始化的值删除掉
       redisUtil.delField(key,recordId.getValue());
       log.info("stream:{}-group:{} initialize success",key,group);
   }
}

然后再上面配置监听的方法里调用这个初始化方法就可以了

@Bean
   public Subscription subscription(RedisConnectionFactory factory){
       var options = StreamMessageListenerContainer
               .StreamMessageListenerContainerOptions
               .builder()
               .pollTimeout(Duration.ofSeconds(1))
               .build();
       initStream("mystream","mygroup");//调用初始化
       var listenerContainer = StreamMessageListenerContainer.create(factory,options);
       var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
               StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
       listenerContainer.start();
       return subscription;
   }

https://gitee.com/hlovez/redismq.git已更新,代码已优化整理上传

来源:https://blog.csdn.net/hhl18730252820/article/details/114826366

标签:SpringBoot,消息监听
0
投稿

猜你喜欢

  • 详解Java中HashSet和TreeSet的区别

    2022-01-17 09:47:47
  • Java方法重载Overload原理及使用解析

    2021-11-21 14:23:14
  • IDEA集成MyBatis Generator插件的使用

    2023-08-12 00:28:47
  • 教你怎么用java一键自动生成数据库文档

    2021-08-01 02:34:36
  • Maven+Tomcat8 实现自动化部署的方法

    2023-01-03 06:44:20
  • mybatis自定义类型处理器TypehHandler示例详解

    2023-10-11 04:30:40
  • springboot如何实现自动装配源码解读

    2023-11-10 15:44:20
  • spring cglib 与 jdk 动态代理

    2021-07-19 20:28:43
  • Java设计模式之享元模式实例详解

    2021-12-19 17:54:00
  • Java emoji持久化mysql过程详解

    2023-10-10 23:11:49
  • c++中的string常用函数用法总结

    2023-11-02 16:44:02
  • idea 打包maven项目忽略test文件的操作

    2021-10-16 07:21:14
  • Java中常见的编码集问题总结

    2023-11-29 01:40:04
  • 浅谈Java字符串比较的三种方法

    2023-05-13 12:26:51
  • Centos中安装jdk案例讲解

    2023-04-30 00:37:50
  • Java客户端调用.NET的WebService实例

    2023-11-03 17:22:00
  • @CacheEvict 清除多个key的实现方式

    2023-11-21 08:28:04
  • 基于Java生成GUID的实现方法

    2022-04-09 02:44:09
  • Spring与Mybatis基于注解整合Redis的方法

    2022-09-19 09:19:56
  • Java 数据结构与算法系列精讲之排序算法

    2023-11-01 13:25:40
  • asp之家 软件编程 m.aspxhome.com