Spring Boot整合Kafka教程详解

作者:qianmoq 时间:2023-06-14 06:02:54 

本教程将介绍如何在 Spring Boot 应用程序中使用 Kafka。Kafka 是一个分布式的发布-订阅消息系统,它可以处理大量数据并提供高吞吐量。

在本教程中,我们将使用 Spring Boot 2.5.4Kafka 2.8.0

步骤一:添加依赖项

在 pom.xml 中添加以下依赖项:

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>2.8.0</version>
</dependency>

步骤二:配置 Kafka

application.yml 文件中添加以下配置:

sping:
 kafka:
   bootstrap-servers: localhost:9092
   consumer:
     group-id: my-group
     auto-offset-reset: earliest
   producer:
     value-serializer: org.apache.kafka.common.serialization.StringSerializer
     key-serializer: org.apache.kafka.common.serialization.StringSerializer

这里我们配置了 Kafka 的服务地址为 localhost:9092,配置了一个消费者组 ID 为 my-group,并设置了一个最早的偏移量来读取消息。在生产者方面,我们配置了消息序列化程序为 StringSerializer

步骤三:创建一个生产者

现在,我们将创建一个 Kafka 生产者,用于发送消息到 Kafka 服务器。在这里,我们将创建一个 RESTful 端点,用于接收 POST 请求并将消息发送到 Kafka。

首先,我们将创建一个 KafkaProducerConfig 类,用于配置 Kafka 生产者:

@Configuration
public class KafkaProducerConfig {
   @Value("${spring.kafka.bootstrap-servers}")
   private String bootstrapServers;
   @Bean
   public Map<String, Object> producerConfigs() {
       Map<String, Object> props = new HashMap<>();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       return props;
   }
   @Bean
   public ProducerFactory<String, String> producerFactory() {
       return new DefaultKafkaProducerFactory<>(producerConfigs());
   }
   @Bean
   public KafkaTemplate<String, String> kafkaTemplate() {
       return new KafkaTemplate<>(producerFactory());
   }
}

在上面的代码中,我们使用 @Configuration 注解将 KafkaProducerConfig 类声明为配置类。然后,我们使用 @Value 注解注入配置文件中的 bootstrap-servers 属性。

接下来,我们创建了一个 producerConfigs 方法,用于设置 Kafka 生产者的配置。在这里,我们设置了 BOOTSTRAP_SERVERS_CONFIGKEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG 三个属性。

然后,我们创建了一个 producerFactory 方法,用于创建 Kafka 生产者工厂。在这里,我们使用了 DefaultKafkaProducerFactory 类,并传递了我们的配置。

最后,我们创建了一个 kafkaTemplate 方法,用于创建 KafkaTemplate 实例。在这里,我们使用了刚刚创建的生产者工厂作为参数,然后返回 KafkaTemplate 实例。

接下来,我们将创建一个 RESTful 端点,用于接收 POST 请求并将消息发送到 Kafka。在这里,我们将使用 @RestController 注解创建一个 RESTful 控制器:

@RestController
public class KafkaController {
   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;
   @PostMapping("/send")
   public void sendMessage(@RequestBody String message) {
       kafkaTemplate.send("my-topic", message);
   }
}

在上面的代码中,我们使用 @Autowired 注解将 KafkaTemplate 实例注入到 KafkaController 类中。然后,我们创建了一个 sendMessage 方法,用于发送消息到 Kafka。

在这里,我们使用 kafkaTemplate.send 方法发送消息到 my-topic 主题。send 方法返回一个 ListenableFuture 对象,用于异步处理结果。

步骤四:创建一个消费者

现在,我们将创建一个 Kafka 消费者,用于从 Kafka 服务器接收消息。在这里,我们将创建一个消费者组,并将其配置为从 my-topic 主题读取消息。

首先,我们将创建一个 KafkaConsumerConfig 类,用于配置 Kafka 消费者:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
   @Value("${spring.kafka.bootstrap-servers}")
   private String bootstrapServers;
   @Value("${spring.kafka.consumer.group-id}")
   private String groupId;
   @Bean
   public Map<String, Object> consumerConfigs() {
       Map<String, Object> props = new HashMap<>();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       return props;
   }
   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
       return new DefaultKafkaConsumerFactory<>(consumerConfigs());
   }
   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
       ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
       factory.setConsumerFactory(consumerFactory());
       return factory;
   }
}

在上面的代码中,我们使用 @Configuration 注解将 KafkaConsumerConfig 类声明为配置类,并使用 @EnableKafka 注解启用 Kafka。

然后,我们使用 @Value 注解注入配置文件中的 bootstrap-serversconsumer.group-id 属性。

接下来,我们创建了一个 consumerConfigs 方法,用于设置 Kafka 消费者的配置。在这里,我们设置了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIGAUTO_OFFSET_RESET_CONFIGKEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG 五个属性。

然后,我们创建了一个 consumerFactory 方法,用于创建 Kafka 消费者工厂。在这里,我们使用了 DefaultKafkaConsumerFactory 类,并传递了我们的配置。

最后,我们创建了一个 kafkaListenerContainerFactory 方法,用于创建一个 ConcurrentKafkaListenerContainerFactory 实例。在这里,我们将消费者工厂注入到 kafkaListenerContainerFactory 实例中。

接下来,我们将创建一个 Kafka 消费者类 KafkaConsumer,用于监听 my-topic 主题并接收消息:

@Service
public class KafkaConsumer {
   @KafkaListener(topics = "my-topic", groupId = "my-group-id")
   public void consume(String message) {
       System.out.println("Received message: " + message);
   }
}

在上面的代码中,我们使用 @KafkaListener 注解声明了一个消费者方法,用于接收从 my-topic 主题中读取的消息。在这里,我们将消费者组 ID 设置为 my-group-id

现在,我们已经完成了 Kafka 生产者和消费者的设置。我们可以使用 mvn spring-boot:run 命令启动应用程序,并使用 curl 命令发送 POST 请求到 http://localhost:8080/send 端点,以将消息发送到 Kafka。然后,我们可以在控制台上查看消费者接收到的消息。

这就是使用 Spring Boot 和 Kafka 的基本设置。我们可以根据需要进行更改和扩展,以满足特定的需求。

来源:https://juejin.cn/post/7207411012479074364

标签:Spring,Boot,Kafka,整合
0
投稿

猜你喜欢

  • java 代码块与静态代码块加载顺序

    2021-08-01 15:19:35
  • 通过与Java功能上的对比来学习Go语言

    2023-02-18 02:04:53
  • 并行Stream与Spring事务相遇会发生什么?

    2022-08-28 15:40:12
  • 详解Android App中ViewPager使用PagerAdapter的方法

    2021-12-06 06:11:52
  • Android ContentProvider查看/读取手机联系人实例

    2021-10-13 07:10:13
  • Java Resource路径整理总结

    2021-11-24 06:52:31
  • java实现抽奖功能解析

    2021-08-29 16:08:21
  • Android实现图片的裁剪(不调用系统功能)

    2021-05-25 23:26:41
  • java微信公众号开发案例

    2023-01-30 11:05:36
  • android主线程和子线程之间消息传递详解

    2021-11-06 05:06:23
  • Android TextView字体颜色设置方法小结

    2023-02-22 14:44:52
  • C#实现文件断点续传下载的方法

    2021-09-05 10:37:42
  • 简单实现Java版学生管理系统

    2022-06-22 15:16:19
  • C#中的EventHandler观察者模式详解

    2021-09-04 17:08:27
  • Servlet中/和/*的区别详解

    2022-07-11 03:21:33
  • java图形界面之布局设计

    2021-11-10 16:07:40
  • 浅谈shiro的SecurityManager类结构

    2022-08-25 14:13:32
  • 程序员最喜欢的ThreadLocal使用姿势

    2022-10-22 21:14:55
  • Android网络开发中GET与POST请求详解

    2022-05-28 23:23:53
  • springcloud微服务之Eureka配置详解

    2021-06-15 14:35:28
  • asp之家 软件编程 m.aspxhome.com