kafka并发写大消息异常TimeoutException排查记录

作者:kl 时间:2023-11-27 23:07:11 

前言

先简单介绍下我们的使用场景,线上5台Broker节点的kafka承接了所有binlog订阅的数据,用于Flink组件接收数据做数据中台的原始数据。昨儿开发反馈,线上的binlog大量报错,都是kafka的异常,而且都是同一条topic抛的错,特征也很明显,发送的消息体非常大,主观判断肯定是写入大消息导致的超时了,异常详情如下:

thread:  kafka-producer-network-thread | producer-1
throwable:  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for BIN-LOG-DATA-center-dmz2-TABLE-kk-data-center-ods_contract_finance_info-0 due to 56352 ms has passed since last append

定位异常点

应用抛一个不常见的异常,一般操作是先去百度or谷歌搜索一番的,就上面这个timeout超时的异常,搜索引擎的结果都是producer连不上Borker导致的问题,根本不是我们这个场景的,所以其次我们就需要从源码中寻找答案了。博主使用的开发工具是IDEA,借助IDEA很容易定位到异常抛出点。首先定位TimeoutException异常类,然后按住ctrl键,点击这个类,会出现如下图所有抛TimeoutException异常的点,然后根据异常message内容,寻找相匹配的点击进去就是抛异常的地方了,如图,红色箭头所指即代码位置:

kafka并发写大消息异常TimeoutException排查记录

分析抛异常的逻辑

程序中的异常,一定是符合某些条件才会抛出的,想要解决异常,只要让运行时的环境不满足抛异常的条件即可,下面就是抛异常的代码:

boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
       if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
           expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
       else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
           expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
       else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
           expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";
boolean expired = expiryErrorMessage != null;
       if (expired)
           abortRecordAppends();
       return expired;
   }

可以看到,我们的异常是在第一个逻辑判断时候就满足了所以抛异常了。在此处有可能会抛出三个不同的timeout异常,用中文语义翻译条件分别是:

  • 没设置重试,并且发送批次(batch.size)满了,并且配置请求超时时间(request.timeout.ms)小于【当前时间减去最后追加批次的时间】

  • 没设置重试,并且配置请求超时时间(request.timeout.ms)小于【创建批次时间减去配置的等待发送的时间(linger.ms)】

  • 设置重试,并且配置请求超时时间(request.timeout.ms)小于【当前时间-最后重试时间-重试需要等待的时间(retry.backoff.ms)】

上面括号中的参数就是kafka producer中配置的相关的参数,这些参数都没有重新设置过,batch.size默认是10kb大小,而引发报错的消息都是36kb的大小,默认的request.timeout.ms超时设置是30s,所以在这个判断可能过期了的方法中,引发我们异常的主要原因是batch.size和request.timeout.ms的参数设置问题了。

真实原因-解决方案

从上面代码看表面原因是参数设置不够了,实际上呢,博主使用kafka-test启动了五个Borker集群做复现验证测试,测试写入相同的36kb的message,在所有配置也保持默认的情况下,也根本毫无压力。后面查找相关的错误日志,发现所有的TimeoutException集中在几乎同一时刻,经查明,是因为业务批量导入了数据到mysql中,造成binlog消息突然增加,高并发的往kafka写大消息导致Borker处理不过来,造成的TimeoutException超时,所以真正解决问题也可以从两个方面入手:

  • 服务端:增加Borker,并设置多个TopicPartition,平摊写入压力,这个是根本的解决问题

  • 客户端:加大request.timeout.ms、batch.size参数,或者开启消息重试,这种方案治标不治本,但是也能大概率的减少因为此类场景导致的TimeoutException

来源:http://www.kailing.pub/article/index/arcid/259.html

标签:kafka,并发异常,TimeoutException
0
投稿

猜你喜欢

  • Eclipse自定义启动画面和图标的方法介绍

    2022-05-14 09:27:13
  • Java中Cookie和Session的那些事儿

    2022-09-24 08:24:54
  • Android:Field can be converted to a local varible.的解决办法

    2022-01-23 16:53:14
  • 一文详解如何在控制台显示MyBatis的SQL语句

    2023-01-09 06:43:38
  • springboot之端口设置和contextpath的配置方式

    2023-10-05 14:16:20
  • C#模拟window操作鼠标的方法

    2021-07-17 01:50:22
  • Android MediaPlayer 播放音频的方式

    2021-09-07 07:27:09
  • C# ComboBox控件“设置 DataSource 属性后无法修改项集合”的完美解决方法

    2023-01-30 04:11:58
  • Java动态代理详解及实例

    2023-12-08 15:51:03
  • Java Scanner类用法及nextLine()产生的换行符问题实例分析

    2022-12-22 21:44:04
  • Mapper类中存在名称相同的方法重载报错问题

    2023-04-04 02:44:39
  • Java实现添加条形码到PDF表格的方法详解

    2023-04-26 12:37:25
  • Android自定义控制条效果

    2023-01-23 23:59:12
  • Java集合教程之Collection实例详解

    2022-12-01 23:45:13
  • c#实现多线程局域网聊天系统

    2022-12-01 23:34:25
  • AndroidStudio替换项目图标ic_launcher操作

    2023-03-20 09:48:07
  • 解析spring cloud ouath2中的Eureka

    2023-10-12 04:07:54
  • Java GC 机制与内存分配策略详解

    2022-06-12 05:36:44
  • c#使用S22.Imap收剑灵激活码邮件代码示例(imap收邮件)

    2022-11-27 20:59:37
  • 完美解决客户端webview持有的页面缓存,不会立即释放的问题

    2021-09-23 03:30:50
  • asp之家 软件编程 m.aspxhome.com