Spring boot集成Kafka消息中间件代码实例

作者:墨营 时间:2022-11-06 21:53:48 

一.创建Spring boot项目,添加如下依赖


<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <optional>true</optional>
   </dependency>
   <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
   <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
   </dependency>
   <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
   <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
   </dependency>
   <dependency>
     <groupId>com.alibaba</groupId>
     <artifactId>fastjson</artifactId>
     <version>1.2.41</version>
   </dependency>

二.配置文件

server.port=4400

#kafka配置
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.102.88:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=jkafka.demo
#earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
#none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
# 指定消费者消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

三.编辑消息实体


@Data
public class Message implements Serializable{

/**
  *
  */
 private static final long serialVersionUID = 2522280475099635810L;

//消息ID
 private String id;

//消息内容
 private String msg;

// 消息发送时间
 private Date sendTime;

}

四.消息发送类


@Component
public class KfkaProducer {

private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);

@Autowired
 private KafkaTemplate<String, String> kafkaTemplate;

public void send(String topic,Message message) {
   try {
     logger.info("正在发送消息...");
     kafkaTemplate.send(topic,JSON.toJSONString(message));
     logger.info("发送消息成功 ----->>>>> message = {}", JSON.toJSONString(message));
   } catch (Exception e) {
     e.getMessage();
   }

}
}

五.发现监听接收类


@Component
public class KfkaListener {

private static Logger logger = LoggerFactory.getLogger(KfkaListener.class);

@KafkaListener(topics = {"hello"})
 public void listen(ConsumerRecord<?, ?> record) {
   Optional<?> kafkaMessage = Optional.ofNullable(record.value());
   if (kafkaMessage.isPresent()) {
     Object message = kafkaMessage.get();
     logger.info("接收消息------------ record =" + record);
     logger.info("接收消息----------- message =" + message);
   }
 }
}

六.定时发送信息测试类


@EnableScheduling
@Component
public class PublisherController {

private static final Logger log = LoggerFactory.getLogger(PublisherController.class);

@Autowired
 private KfkaProducer kfkaProducer;

@Scheduled(fixedRate = 5000)
 public void pubMsg() {
   Message msg=new Message();
   msg.setId(UUID.randomUUID().toString());
   msg.setMsg("发送这条消息给你,你好啊!!!!!!");
   msg.setSendTime(new Date());
   kfkaProducer.send("hello", msg);;
   log.info("Publisher sendes Topic... ");
 }
}

七.测试结果

Spring boot集成Kafka消息中间件代码实例

来源:https://blog.51cto.com/13501268/2494869

标签:Spring,boot,Kafka,消息,中间件
0
投稿

猜你喜欢

  • springboottest测试依赖和使用方式

    2021-11-21 13:41:38
  • Android WorkManager使用以及源码分析

    2022-02-04 01:10:26
  • Android实现退出时关闭所有Activity的方法

    2021-10-03 00:15:00
  • Android launcher中模拟按home键的实现

    2023-03-25 02:33:48
  • 基于Java代码操作Redis过程详解

    2022-03-28 06:53:58
  • 利用Spring boot+LogBack+MDC实现链路追踪

    2023-10-03 16:02:53
  • Java实现插入排序算法可视化的示例代码

    2021-08-06 19:35:50
  • Spring如何消除代码中的if-else/switch-case

    2021-12-12 03:04:47
  • springboot中使用rabbitt的详细方法

    2023-06-17 09:57:43
  • 基于C语言实现井字棋游戏

    2023-06-28 21:23:18
  • C#使用opencv截取旋转矩形区域图像的实现示例

    2023-11-12 22:20:44
  • 使用windows控制台调试服务的方法

    2023-08-19 03:34:30
  • C#用记事本编写简单WinForm窗体程序

    2021-12-19 13:02:31
  • Android短信验证码(用的Mob短信验证)

    2022-12-16 15:22:41
  • SpringBoot实现配置文件的替换

    2023-11-21 22:27:16
  • RecyclerView实现拖拽排序效果

    2022-09-14 01:23:40
  • Android封装的http请求实用工具类

    2021-09-16 03:03:52
  • MybatisX-Generator自动代码生成插件教程

    2022-01-08 10:50:48
  • 一篇文章弄懂Java和Kotlin的泛型难点

    2022-11-19 11:37:19
  • Java Swing中JTable渲染器与编辑器用法示例

    2022-11-02 09:23:13
  • asp之家 软件编程 m.aspxhome.com