流式图表拒绝增删改查之kafka核心消费逻辑上篇

作者:在下uptown 时间:2023-04-19 03:32:11 

消费逻辑

上文 流式图表框架搭建

框架搭建好之后着手开发下kafka的核心消费逻辑,流式图表的核心消费逻辑就是实现一个消费链接池维护消费者客户端链接,将kafka client封装成Runable任务提交到线程池里做一个常驻线程,实时消费数据,消费到数据后存到redis中,并通过websocket推送到浏览器,浏览器刷新图表实现流式图表功能。

代码设计

按照之前的代码划分,核心逻辑写在matrix-core子模块中,整体结构用maven的父子模块依赖继承的特性管理依赖。

maxtrix-core模块只做kafka client的管理和消费逻辑,尽量轻一点,只需要引入redis和kafka依赖即可。

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
   <groupId>com.uptown</groupId>
   <artifactId>matrix-common</artifactId>
   <version>1.0-SNAPSHOT</version>
</dependency>

反序列化工具、线程池工具、lombok都放到matrix-common中,具体用google的包,这样其他内部模块直接引用common模块即可使用。

<dependency>
   <groupId>com.google.code.gson</groupId>
   <artifactId>gson</artifactId>
</dependency>

消费池

首先要创建出一个线程池出来,由于我们的业务要实时监听数据,所以线程池提交的线程必须是个常驻线程。所以需要重写线程池的任务失败策略和异常处理器。

// 自定义异常处理器,捕获错误日志
@Slf4j
public class ConsumerExceptionHandler implements Thread.UncaughtExceptionHandler {
  @Override
  public void uncaughtException(Thread t, Throwable e) {
       log.error(e.getMessage(), e);
  }
}
// 任务失败策略
@Slf4j
class ConsumerThreadPoolExecutor extends ThreadPoolExecutor {
  ConsumerThreadPoolExecutor(int corePoolSize,
                       int maximumPoolSize,
                       long keepAliveTime,
                       TimeUnit unit,
                       BlockingQueue<Runnable> workQueue,
                       ThreadFactory threadFactory,
                       RejectedExecutionHandler rejectedExecutionHandler) {
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, rejectedExecutionHandler);
  }
  @Override
  protected void afterExecute(Runnable r, Throwable t) {
     super.afterExecute(r, t);
     //若线程执行某任务失败了,重新提交该任务
     if (t != null) {
        log.error("restart kafka consumer task for {}", (Object) t.getStackTrace());
     }
     execute(r);
  }
}

剩下的创建出线程池即可,消费逻辑中只需要注入到具体类中即可。

@Data
@Component
@Slf4j
public class KafkaConsumerConfig {
   // 线程池维护线程的最少数量
   @Value(value = "${kafka.core-pool-size:20}")
   private int corePoolSize;
   // 线程池维护线程的最大数量
   @Value(value = "${kafka.max-pool-size:20}")
   private int maxPoolSize;
   // 线程池维护线程所允许的空闲时间
   @Value(value = "${kafka.keep-alive-time:0}")
   private int keepAliveTime;
   // 线程池所使用的缓冲队列大小
   @Value(value = "${kafka.work-queue-size:0}")
   private int workQueueSize;
  // 统一存放kafka客户端的map
  @Bean
  public Map<String, KafkaConsumerRunnable> globalKafkaConsumerThreadMap() {
     return Maps.newConcurrentMap();
  }
   /**
    * kafka监听任务 线程池
    */
   @Bean(name = "defaultThreadPool")
   public ThreadPoolExecutor defaultThreadPool() {
      // 使用google线程工厂 线程挂掉重启策略
     ConsumerExceptionHandler exceptionHandler = new ConsumerExceptionHandler();
     ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d")
        .setUncaughtExceptionHandler(exceptionHandler).build();
     return new ConsumerThreadPoolExecutor(
        corePoolSize,                                
        maxPoolSize,                                  
        keepAliveTime,                                
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(maxPoolSize),
        threadFactory,
        new ThreadPoolExecutor.CallerRunsPolicy()
     );
   }
}

这么搞的主要原因是防止消费线程中出现消费异常,比如反序列化异常、客户端监听网络异常等,为啥不在任务中try catch住异常是因为这样做更优雅点,让kafka client和线程的生命绑定一块,比较好管理。

统一存放kafka客户端的map算是做一个统计,统计内存中已提交的kafka监听线程数,具体的Runable任务放在下一篇提供,毕竟上班写文章容易翻车。

来源:https://juejin.cn/post/7220235452291416123

标签:kafka,流式图表,拒绝增删改查
0
投稿

猜你喜欢

  • 一小时迅速入门Mybatis之初识篇

    2023-07-20 10:27:23
  • Java实现短信验证码的示例代码

    2023-11-09 03:22:47
  • Android 中Banner的使用详解

    2023-08-18 08:57:02
  • C#日期格式字符串的相互转换操作实例分析

    2021-09-01 10:06:56
  • 四种引用类型在JAVA Springboot中的使用详解

    2023-11-24 03:34:38
  • Android利用RecyclerView实现全选、置顶和拖拽功能示例

    2023-06-05 18:21:22
  • 超简单C#获取带汉字的字符串真实长度(单个英文长度为1,单个中文长度为2)

    2021-11-13 20:37:55
  • Java 开发的几个注意点总结

    2021-11-30 20:07:11
  • Android支付宝支付的示例代码

    2022-06-20 02:32:49
  • C#中XmlTextWriter读写xml文件详细介绍

    2022-01-26 05:13:48
  • C#中单问号(?)和双问号(??)的用法整理

    2021-08-27 02:50:53
  • Java设计模式编程之工厂方法模式的使用

    2021-10-01 03:09:15
  • spring bean.xml文件p标签使用报错的解决

    2022-03-12 23:16:16
  • android shape实现阴影或模糊边效果

    2022-10-14 02:01:09
  • C#异常捕获机制图文详解

    2023-05-22 19:39:20
  • mybatisPlus返回Map类型的集合

    2022-01-31 13:37:07
  • Android中Root权限获取的简单代码

    2021-10-22 20:42:52
  • Java中的Set、List、Map的用法与区别介绍

    2022-10-03 04:11:48
  • c#使用win32api实现获取光标位置

    2022-05-09 10:59:33
  • maven多个plugin相同phase的执行顺序

    2021-07-07 10:33:34
  • asp之家 软件编程 m.aspxhome.com