spring-cloud-stream结合kafka使用详解
作者:KyleYaoKeepGoing 时间:2022-05-19 14:32:50
1.pom文件导入依赖
<!-- kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
2.application.yml文件配置
spring:
cloud:
stream:
kafka:
binder:
brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址
bindings:
xxx_output: // 通道名称
destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
// 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
xxx_input:
destination: xxx // 消息发往的目的地,对应topic
group: xxx // 对应kafka的group
3.创建消息发送者
@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道
@Service
public class MqService {
@Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
private MessageChannel oesWorkbenchChannel;
/**
* 发送一条kafka消息
*/
public boolean sendLifeData(Object object) {
return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
}
}
// 发布通道
public interface Source {
@Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel
}
4.创建消息监听者
@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {
@Resource
private FileService fileService;
@StreamListener(KafkaConstants.xxx_input) // 监听接受通道
public void receiveData(MoveMessage moveMessage) {
}
}
// 接受通道
public interface Sink {
@Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel
}
接下来就可以愉快的发送监听消息了
来源:https://blog.csdn.net/oWanShiKaiTouNan/article/details/108056417
标签:spring,cloud,stream,kafka
![](/images/zang.png)
![](/images/jiucuo.png)
猜你喜欢
SpringBoot深入分析讲解监听器模式上
2022-06-25 21:04:04
![](https://img.aspxhome.com/file/2023/3/62363_0s.png)
C#中OpenCvSharp 通过特征点匹配图片的方法
2023-07-14 08:10:55
![](https://img.aspxhome.com/file/2023/3/89743_0s.jpg)
关于LinkedList集合对元素进行增查删操作
2022-09-23 11:48:45
![](https://img.aspxhome.com/file/2023/5/66535_0s.png)
C# GDI+实现时钟表盘
2023-06-20 07:11:32
![](https://img.aspxhome.com/file/2023/7/66117_0s.jpg)
spring springMVC中常用注解解析
2023-09-14 20:45:46
Java环境下高德地图Api的使用方式
2022-06-13 06:43:59
![](https://img.aspxhome.com/file/2023/6/85826_0s.jpg)
C# 对PDF文档加密、解密(基于Spire.Cloud.SDK for .NET)
2021-11-23 05:37:26
![](https://img.aspxhome.com/file/2023/4/100174_0s.png)
Java使用动态规划算法思想解决背包问题
2022-12-02 03:53:49
![](https://img.aspxhome.com/file/2023/7/89497_0s.png)
Java对象类型的判断详解
2023-07-26 09:55:07
Spring框架通过工厂创建Bean的三种方式实现
2022-11-23 11:29:54
![](https://img.aspxhome.com/file/2023/7/70527_0s.png)
Android 使用 SharedPreferences 保存少量数据的实现代码
2023-07-03 01:00:11
![](https://img.aspxhome.com/file/2023/4/83534_0s.png)
java在网页上面抓取邮件地址的方法
2023-10-01 19:18:21
Android实现Service重启的方法
2021-10-01 09:25:25
Java实现批量下载(打包成zip)的实现
2022-04-02 01:53:05
SpringCloud Feign配置应用详细介绍
2023-07-14 04:23:03
![](https://img.aspxhome.com/file/2023/7/57497_0s.png)
Java Swing实现窗体添加背景图片的2种方法详解
2021-10-26 19:01:18
![](https://img.aspxhome.com/file/2023/1/62231_0s.jpg)
Java多线程Callable接口实现代码示例
2021-08-06 14:29:01
浅析Java中Apache BeanUtils和Spring BeanUtils的用法
2021-07-10 21:29:15
![](https://img.aspxhome.com/file/2023/4/63324_0s.jpg)
SpringBoot打包发布到linux上(centos 7)的步骤
2023-08-11 06:35:55
![](https://img.aspxhome.com/file/2023/5/57945_0s.png)
C#通过反射创建自定义泛型
2022-12-30 07:12:38