Springboot 整合 RocketMQ 收发消息的配置过程
作者:liqiangbk 时间:2023-01-22 22:49:28
Springboot 整合 RocketMQ 收发消息
创建springboot项目
pom.xml添加rocketmq-spring-boot-starter
依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
yml 配置
application.yml
rocketmq:
name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq:
producer:
group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq:
producer:
group: producer-demo2
测试
demo 1
发送普通消息
发送 Spring 的通用 Message 对象
发送异步消息
发送顺序消息
生产者
package cn.tedu.demo2.m1;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate t ;
public void send(){
//发送同步消息
t.convertAndSend("Topic1:TagA", "Hello world! ");
//发送spring的Message
Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();
t.send("Topic1:TagA",message);
//发送异步消息
t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
//发送顺序消息
t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
}
}
消费者
package cn.tedu.demo2.m1;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到"+s);
}
}
主类
package cn.tedu.demo2.m1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
测试类
需要放在 test 文件夹
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
@Autowired
private Producer producer;
@Test
public void test1(){
producer.send();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
demo 2
发送事务消息
生产者
package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate t;
public void send(){
Message<String> message = MessageBuilder.withPayload("Hello world").build();
//一旦发送消息,则执行 *
t.sendMessageInTransaction("Topic2",message,null);
}
@RocketMQTransactionListener
class Lis implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("执行事务回查");
return RocketMQLocalTransactionState.COMMIT;
}
}
}
消费者
package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到"+s);
}
}
主类
package cn.tedu.demo2.m2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
测试类
package cn.tedu.demo2.m2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo2")
public class Test2 {
@Autowired
private Producer producer;
@Test
public void test1(){
producer.send();
//为了能够收到消费者消费的数据,这里通过休眠模拟等待时间
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
来源:https://www.cnblogs.com/liqbk/p/13677137.html
标签:Springboot,整合,RocketMQ,收发消息
![](/images/zang.png)
![](/images/jiucuo.png)
猜你喜欢
java 字浮串提取方法汇集
2023-11-24 14:43:16
详解Java内存溢出的几种情况
2023-11-13 19:46:18
![](https://img.aspxhome.com/file/2023/5/129175_0s.png)
Android复选框CheckBox与开关按钮Switch及单选按钮RadioButton使用示例详解
2023-02-06 18:20:42
![](https://img.aspxhome.com/file/2023/4/109184_0s.png)
基于C#调用c++Dll结构体数组指针的问题详解
2021-12-10 23:16:41
Android仿美团外卖菜单界面
2022-03-28 02:28:35
C# dataset存放多张表的实例
2022-10-10 14:08:13
SpringBoot如何使用applicationContext.xml配置文件
2022-11-15 08:18:53
Token登陆验证机制的原理及实现
2022-07-08 03:28:19
![](https://img.aspxhome.com/file/2023/5/76225_0s.jpg)
Android中FontMetrics的几个属性全面讲解
2023-11-14 14:57:20
![](https://img.aspxhome.com/file/2023/5/108305_0s.png)
Activiti流程引擎对象及配置原理解析
2023-02-11 22:20:20
spring boot多数据源动态切换代码实例
2022-03-11 00:37:09
![](https://img.aspxhome.com/file/2023/7/71867_0s.png)
android自定义view之模拟qq消息拖拽删除效果
2023-01-29 11:48:34
![](https://img.aspxhome.com/file/2023/4/95014_0s.gif)
visual studio 2019安装配置可编写c/c++语言的IDE环境
2023-10-04 02:01:02
![](https://img.aspxhome.com/file/2023/4/78944_0s.png)
基于静态Singleton模式的使用介绍
2022-09-13 20:11:08
![](https://img.aspxhome.com/file/2023/5/89785_0s.png)
java中transient关键字用法分析
2022-01-22 04:27:05
c# BackgroundWorker使用方法
2021-05-27 00:49:12
![](https://img.aspxhome.com/file/2023/1/76411_0s.png)
C++日期类计算器的模拟实现举例详解
2023-05-22 08:27:16
C#中反射和扩展方法如何运用
2023-08-02 01:43:16
C#与PLC通讯的实现代码
2021-10-29 13:34:39
![](https://img.aspxhome.com/file/2023/0/111280_0s.png)
C#实现网页画图功能
2021-12-05 19:33:41
![](https://img.aspxhome.com/file/2023/4/98204_0s.jpg)