Kafka常用命令之kafka-console-consumer.sh解读

作者:Ernest.Wu 时间:2022-06-11 00:20:32 

kafka-console-consumer.sh解读

kafka-console-consumer.sh 脚本是一个简易的消费者控制台。

该 shell 脚本的功能通过调用 kafka.tools 包下的 ConsoleConsumer 类,并将提供的命令行参数全部传给该类实现。

  • 注意:Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。

  • 若读者依旧使用的是 2.1 及以下版本,请将下述的 --bootstrap-server 参数及其值手动替换为 --zookeeper zk1:2181,zk2:2181,zk:2181。

  • 一定要注意两者参数值所指向的集群地址是不同的。

消息消费

bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName

表示从 latest 位移位置开始消费该主题的所有分区消息,即仅消费正在写入的消息。

从开始位置消费

bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic topicName

表示从指定主题中有效的起始位移位置开始消费所有分区的消息。

显示key消费

bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName

消费出的消息结果将打印出消息体的 key 和 value。

若还需要为你的消息添加其他属性

请参考下述列表

参数值类型说明有效值
--topicstring被消费的topic
--whiteliststring正则表达式,指定要包含以供使用的主题的白名单
--partitioninteger指定分区
除非指定’–offset’,否则从分区结束(latest)开始消费

--offsetstring执行消费的起始offset位置
默认值:latest
latest
earliest
<offset>
--consumer-propertystring将用户定义的属性以key=value的形式传递给使用者
--consumer.configstring消费者配置属性文件
请注意,[consumer-property]优先于此配置

--formatterstring用于格式化kafka消息以供显示的类的名称
默认值:kafka.tools.DefaultMessageFormatter
kafka.tools.DefaultMessageFormatter
kafka.tools.LoggingMessageFormatter
kafka.tools.NoOpMessageFormatter
kafka.tools.ChecksumMessageFormatter
--propertystring初始化消息格式化程序的属性print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.deserializer>
--from-beginning
从存在的最早消息开始,而不是从最新消息开始
--max-messagesinteger消费的最大数据量,若不指定,则持续消费下去
--timeout-msinteger在指定时间间隔内没有消息可用时退出
--skip-message-on-error
如果处理消息时出错,请跳过它而不是暂停
--bootstrap-serverstring必需(除非使用旧版本的消费者),要连接的服务器
--key-deserializerstring

--value-deserializerstring

--enable-systest-events
除记录消费的消息外,还记录消费者的生命周期
(用于系统测试)

--isolation-levelstring设置为read_committed以过滤掉未提交的事务性消息
设置为read_uncommitted以读取所有消息
默认值:read_uncommitted

--groupstring指定消费者所属组的ID
--blackliststring要从消费中排除的主题黑名单
--csv-reporter-enabled
如果设置,将启用csv metrics报告器
--delete-consumer-offsets
如果指定,则启动时删除zookeeper中的消费者信息
--metrics-dirstring输出csv度量值
需与[csv-reporter-enable]配合使用

--zookeeperstring必需(仅当使用旧的使用者时)连接zookeeper的字符串。
可以给出多个URL以允许故障转移

来源:https://blog.csdn.net/qq_29116427/article/details/80206125

标签:Kafka,命令,kafka-console-consumer.sh
0
投稿

猜你喜欢

  • mybatis-plus @DS实现动态切换数据源原理

    2023-07-09 21:44:23
  • 谈谈你可能并不了解的java枚举

    2023-11-09 21:08:55
  • servlet异步请求的实现

    2023-07-14 17:11:38
  • C#实现对Json字符串处理实例

    2023-06-21 08:26:24
  • java实现五子棋小游戏

    2021-12-25 06:58:56
  • 【Java IO流】字节流和字符流的实例讲解

    2023-08-08 20:45:58
  • 基于Java信号量解决死锁过程解析

    2023-05-13 22:23:02
  • Java的Hibernate框架中Criteria查询使用的实例讲解

    2023-08-22 23:25:47
  • Java 反射类型Type的用法说明

    2023-01-23 04:15:44
  • java进阶之了解SpringBoot的配置原理

    2022-05-08 05:10:36
  • C++实现无重复字符的最长子串

    2023-11-02 22:49:00
  • SpringBoot多数据源配置详细教程(JdbcTemplate、mybatis)

    2023-08-26 01:59:33
  • IntellJ IDEA神器使用技巧(小结)

    2023-08-08 22:01:44
  • 怎么把本地jar包放入本地maven仓库和远程私服仓库

    2023-12-05 20:13:00
  • 分析JVM源码之Thread.interrupt系统级别线程打断

    2023-07-31 17:15:23
  • slf4j jcl jul log4j1 log4j2 logback各组件系统日志切换

    2023-08-08 13:00:41
  • Java集合之Set接口及其实现类精解

    2022-01-23 17:27:44
  • Java利用栈实现简易计算器功能

    2022-05-08 16:57:25
  • Java利用POI读取、写入Excel的方法指南

    2023-11-23 15:39:23
  • SpringBoot在一定时间内限制接口请求次数的实现示例

    2021-10-12 04:28:52
  • asp之家 软件编程 m.aspxhome.com