kafka监听问题的解决和剖析

作者:知晓汝名,吓吾一跳! 时间:2021-06-28 04:41:25 

目录
  • 问题如下:

  • 一. 解决问题一(kafka监听不到数据)

  • 二. 解决问题二(kafka为什么会有重复数据发送)

  • 三. 解决问题三(kafka数据重复如何解决)

  • 四. 解决问题四(为什么kafka会出现俩个消费端都可以消费问题)

  • 五. 粘一下我的监听配置文件

  • 总结

问题如下:

  1. kafka为什么监听不到数据

  2. kafka为什么会有重复数据发送

  3. kafka数据重复如何解决

  4. 为什么kafka会出现俩个消费端都可以消费问题

  5. kafka监听配置文件 

一. 解决问题一(kafka监听不到数据)

首先kafka监听不得到数据,检查如下

  • 检查配置文件是否正确(可能会出现改了监听地址,监听Topic,监听的地址的数量问题)

  • 检查接收数据的正确性(比如原生的代码,可能是用byte序列化接收的数据,而你接收使用String。也是配置文件序列化问题,还有与发送者商量问题)

  • 检查kafka版本问题(一般的版本其实是没什么问题的,只有个别版本会出现监听不到问题)

  • 没有加
     @Component    犯了最不应该出差错的问题

如果出现监听不到数据的问题,那么就试试更改方法一二,如果不可以在去试试方法三,之前出现这个问题也是查过 一般查到都会说  “低版本的服务器接收不到高版本的生产者发送的消息”,但是净由测试使用 用1.0.5RELEASE 和 2.6.3反复测试,并没有任何的问题。

如果按照版本一致,那么根本就不现实,因为可能不同的项目,springboot版本不一致的话,可能有的springboot版本低,那么你还得要求自己维护项目版本升级?如果出现第四种情况就无话可说了。

二. 解决问题二(kafka为什么会有重复数据发送)

重复数据的发送问题如下

  1. 可能在发送者的那里的事务问题。mysql存储事务发生异常导致回滚操作,但是kafka消息却是已经发送到了服务器中。此事肯定会出现重复问题

  2. 生产者设置时间问题,生产发送设置的时间内,消息没完成发送,生产者以为消费者挂掉,便重新发送一个,导致重复

  3. offset问题,当项目重启,offset走到某一个位置已扔到kafka服务器中,但是项目被重启.那么offset会是在原本重启的那一个点的地方再次发送一次,这是kafka设计的问题,防止出现丢失数据问题

三. 解决问题三(kafka数据重复如何解决)

目前我是使用的Redis进行的排重法,用的是Redis中的set,保证里面不存在重复,保证Redis里面不会存入太多的脏数据。并定期清理

粘贴一下我的排重(Redis排重法)


//kafka prefix
 String cache = "kafka_cache";
 //kafka suffix
 Calendar c = Calendar.getInstance();
 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 //0点,目前是为了设置为这一天的固定时间。这个完全可以去写个工具类自己弄,为了看的更清楚,麻烦了一点的写入
 SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
 String gtimeStart = sdf2.format(c.getTime());
 long time = sdf.parse(gtimeStart).getTime();

//此位置为了设置是否是新的一天,新的一天需要设置定时时间,保证redis中不会存储太多无用数据
 Boolean flag = false;
 //数据接收
 Set<String> range = new HashSet<>();
 //判断是否存在
 if (redisTemplate.hasKey(cache + time)) {
 //存在则取出这个set
 range = redisTemplate.opsForSet().members(cache + time);
 }else {
 //不存在,则为下面过期时间的设置铺垫
 flag = true;
 }
 //判断监听到的数据是否是重复
 if (range.contains("测试需要")) {
 //重复则排出,根据逻辑自己修改
 continue;
 } else {
 //添加进去
 redisTemplate.opsForSet().add(cache + time, i+"");
 if (flag){
  //设置为24小时,保证新一天使用,之前使用的存储会消失掉
  redisTemplate.expire(cache + time,24,TimeUnit.HOURS);
  //不会在进入这个里面,如果多次的存入过期时间,那么这个key的过期时间就永远是24小时,一直就不会过期
  flag = false;
 }
 }

四. 解决问题四(为什么kafka会出现俩个消费端都可以消费问题)

原因是因为在不同groupId之下,kafka接收到以后,会给监听他的每一个组发送一个他所收到的消息,但是两个消费端监听同一个租,那么就只有一个消费端可以消费到。

五. 粘一下我的监听配置文件


# 指定kafka 代理地址,可以多个,用逗号间隔
spring.kafka.bootstrap-servers= localhost:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id= test
# 是否自动提交
spring.kafka.consumer.enable-auto-commit= true
# 提交间隔的毫秒
spring.kafka.consumer.auto-commit-interval.ms=60000
# 最大轮询的次数
spring.kafka.consumer.max-poll-records=1
# 将偏移量重置为最新偏移量
spring.kafka.consumer.auto-offset-reset=earliest
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

来源:https://www.cnblogs.com/honour1207/p/14078121.html

标签:kafka,监听
0
投稿

猜你喜欢

  • 使用Spring Data JDBC实现DDD聚合的示例代码

    2022-05-04 05:11:23
  • Spring利用@Validated注解实现参数校验详解

    2023-08-26 12:06:26
  • 关于JWT之token令牌认证登录

    2022-03-16 07:32:58
  • Mybatis-Plus的多数据源你了解吗

    2023-07-22 00:46:59
  • Java服务限流算法的6种实现

    2022-04-03 04:52:51
  • 关于Android实现简单的微信朋友圈分享功能

    2021-07-01 16:49:49
  • C#实现图形位置组合转换的方法

    2022-01-23 17:29:35
  • 浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

    2023-11-16 08:14:56
  • 重新启动IDEA时maven项目SSM框架文件变色所有@注解失效

    2021-12-08 06:42:23
  • JDK14性能管理工具之jstack使用介绍

    2022-10-11 18:48:01
  • Spring RabbitMQ死信机制原理实例详解

    2022-04-29 18:52:18
  • Java中的几种读取properties配置文件的方式

    2022-09-06 13:36:19
  • C# 多线程编程技术基础知识入门

    2023-05-27 08:00:24
  • 四种引用类型在JAVA Springboot中的使用详解

    2023-11-24 03:34:38
  • Android画板开发之橡皮擦功能

    2022-10-23 02:48:52
  • c#实现断点续传功能示例分享

    2022-05-03 04:39:49
  • C#创建windows系统用户的方法

    2022-07-16 20:30:12
  • idea推送项目到gitee中的创建方法

    2021-08-19 11:27:14
  • 一篇文章带你入门Java接口

    2023-11-06 02:07:55
  • 详解C#实现MD5加密的示例代码

    2023-11-28 21:01:54
  • asp之家 软件编程 m.aspxhome.com