spring-cloud-stream结合kafka使用详解

作者:KyleYaoKeepGoing 时间:2022-05-19 14:32:50 

1.pom文件导入依赖


<!-- kafka -->
<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2.application.yml文件配置


spring:
cloud:
 stream:
  kafka:
   binder:
    brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址
  bindings:
   xxx_output: // 通道名称
    destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
    // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
   xxx_input:
    destination: xxx // 消息发往的目的地,对应topic
    group: xxx // 对应kafka的group

3.创建消息发送者


@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道
@Service
public class MqService {

@Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
 private MessageChannel oesWorkbenchChannel;

/**
  * 发送一条kafka消息
  */
 public boolean sendLifeData(Object object) {
   return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
 }
}

// 发布通道
public interface Source {
 @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
 MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel
}

4.创建消息监听者


@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {

@Resource
 private FileService fileService;

@StreamListener(KafkaConstants.xxx_input) // 监听接受通道
 public void receiveData(MoveMessage moveMessage) {
 }
}

// 接受通道
public interface Sink {
 @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
 SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel
}

接下来就可以愉快的发送监听消息了

来源:https://blog.csdn.net/oWanShiKaiTouNan/article/details/108056417

标签:spring,cloud,stream,kafka
0
投稿

猜你喜欢

  • SpringBoot深入分析讲解监听器模式上

    2022-06-25 21:04:04
  • C#中OpenCvSharp 通过特征点匹配图片的方法

    2023-07-14 08:10:55
  • 关于LinkedList集合对元素进行增查删操作

    2022-09-23 11:48:45
  • C# GDI+实现时钟表盘

    2023-06-20 07:11:32
  • spring springMVC中常用注解解析

    2023-09-14 20:45:46
  • Java环境下高德地图Api的使用方式

    2022-06-13 06:43:59
  • C# 对PDF文档加密、解密(基于Spire.Cloud.SDK for .NET)

    2021-11-23 05:37:26
  • Java使用动态规划算法思想解决背包问题

    2022-12-02 03:53:49
  • Java对象类型的判断详解

    2023-07-26 09:55:07
  • Spring框架通过工厂创建Bean的三种方式实现

    2022-11-23 11:29:54
  • Android 使用 SharedPreferences 保存少量数据的实现代码

    2023-07-03 01:00:11
  • java在网页上面抓取邮件地址的方法

    2023-10-01 19:18:21
  • Android实现Service重启的方法

    2021-10-01 09:25:25
  • Java实现批量下载(打包成zip)的实现

    2022-04-02 01:53:05
  • SpringCloud Feign配置应用详细介绍

    2023-07-14 04:23:03
  • Java Swing实现窗体添加背景图片的2种方法详解

    2021-10-26 19:01:18
  • Java多线程Callable接口实现代码示例

    2021-08-06 14:29:01
  • 浅析Java中Apache BeanUtils和Spring BeanUtils的用法

    2021-07-10 21:29:15
  • SpringBoot打包发布到linux上(centos 7)的步骤

    2023-08-11 06:35:55
  • C#通过反射创建自定义泛型

    2022-12-30 07:12:38
  • asp之家 软件编程 m.aspxhome.com