结合线程池实现apache kafka消费者组的误区及解决方法
作者:字母哥哥 时间:2023-08-06 15:40:31
一个错误:多线程使用单一消费者
下图显现了一种错误的使用KafkaConsumer的方法
创建多个线程用来消费kafka数据
多线程使用同一个KafkaConsumer对象
在单线程中使用这个KafkaConsumer对象,完成数据拉取、处理、提交偏移量。
这种方式之所以错误的原因是:KafkaConsumer是线程不安全的,可能出现把同一批数据既给线程A处理,也交给线程B处理重复消费的问题。
一个误区:多线程就是消费者组
下图中体现的是一种正常的KafkaConsumer使用方式
使用一个KafkaConsumer拉取数据
拉取数据后将一个批次的数据交给一个线程去处理
这个处理方式不是错误,但是他只是一个消费者在消费kafka消息队列中的数据,不是消费者组的方式消费数据。无法充分利用kafka分区提升消息处理的吞吐量。
常规正确做法:使用线程池实现消费者组
下面的方法是常规的正确实现方式
因为KafkaConsumer是线程不安全的,所以不能跨线程使用KafkaConsumer
每个线程持有一个KafkaConsumer对象
多个线程的实现可以使用线程池,线程池的线程数量等于消费者组内消费者的数量
public class MyConsumerGroup {
public void groupConsumer(){
ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
MyConsumer myConsumer = new MyConsumer();
executorService.execute(myConsumer);
}
}
}
MyConsumer方法需要实现Runnable接口,并在run方法中调用MyConsumer#pollData。MyConsumer的代码参考本专栏的《消费者Java实现》( 集成apache kafka-clients实现数据消费者)
@Override
public void run() {
MyConsumer myConsumer = new MyConsumer();
myConsumer.pollData();
}
来源:https://blog.csdn.net/hanxiaotongtong/article/details/125646483
标签:线程池,apache,kafka,消费者
0
投稿
猜你喜欢
C# wpf 无边框窗口添加阴影效果的实现
2023-11-05 01:15:09
Spring Bean创建流程分析讲解
2022-04-20 22:53:07
全面解析Android之ANR日志
2023-12-18 23:30:21
SpringBoot中获取微信用户信息的方法
2023-05-26 21:40:55
JVM 运行时数据区与JMM 内存模型
2022-08-12 10:49:27
java开发RocketMQ消息中间件原理基础详解
2023-10-01 21:04:49
Java Kryo,Protostuff,Hessian序列化方式对比
2023-10-24 12:41:15
运行java的class文件方法详解
2021-07-29 03:53:48
在unity脚本中控制Inspector面板的参数操作
2023-07-22 11:39:58
解决spring mvc 多数据源切换,不支持事务控制的问题
2022-09-30 03:39:56
C#获取局域网MAC地址的简单实例
2022-08-04 11:26:12
解决Java API不能远程访问HBase的问题
2023-11-27 04:17:48
Android itemDecoration接口实现吸顶悬浮标题
2023-03-14 00:58:31
详解Java中对象池的介绍与使用
2023-07-25 13:13:48
springmvc 分页查询的简单实现示例代码
2022-01-09 11:08:22
Android定时器和倒计时实现淘宝秒杀功能
2023-01-18 02:17:04
浅析java 10中的var关键字用法
2021-12-23 15:18:17
C# 修改文件的创建、修改和访问时间的示例
2023-06-09 23:18:50
JAVA用递归实现全排列算法的示例代码
2023-06-01 09:09:58
Android ViewPager中显示图片与播放视频的填坑记录
2023-12-23 14:28:40