图文并茂讲解RocketMQ消息类别
作者:一个双子座的Java攻城狮 时间:2023-06-11 07:59:41
1、同步消息
即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
生产者:
public class Producer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));
//同步消息发送
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
}
producer.shutdown();
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.23.127:9876");
consumer.subscribe("topic2","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
//System.out.println("收到消息:"+msg);
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
}
});
consumer.start();// 开启多线程 监控消息,持续运行
System.out.println("接收消息服务已运行");
}
}
测试:
2、异步消息
即时性较弱,但需要有回执的消息,例如订单中的某些信息
生产者:
public class Producer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
//异步消息发送
Message msg = new Message("topic2",("异步消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.send(msg, new SendCallback() {
//表示成功返回结果
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
//表示发送消息失败
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
}
//添加一个休眠操作,确保异步消息返回后能够输出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.23.127:9876");
consumer.subscribe("topic2","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
//System.out.println("收到消息:"+msg);
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
}
});
consumer.start();// 开启多线程 监控消息,持续运行
System.out.println("接收消息服务已运行");
}
}
测试:
3、单向消息
不需要有回执的消息,例如日志类消息
生产者:
public class Producer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
//单向消息
Message msg = new Message("topic2",("单向消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.sendOneway(msg);
}
//添加一个休眠操作,确保异步消息返回后能够输出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
消费者代码同上
测试:
总结 同步消息
SendResult result = producer.send(msg);
异步消息(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)
producer.send(msg, new SendCallback() {
//表示成功返回结果
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
//表示发送消息失败
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
单向消息
producer.sendOneway(msg);
来源:https://blog.csdn.net/weixin_64061088/article/details/128406813
标签:RocketMQ,消息类别,消息类型
0
投稿
猜你喜欢
深入了解java.util.Arrays的使用技巧
2023-01-10 18:01:32
Maven项目修改JDK版本全过程
2021-07-19 12:13:29
解析JAVA深度克隆与浅度克隆的区别详解
2023-11-02 10:57:28
Flutter系列重学Container示例详解
2023-07-19 00:46:10
TextView实现跑马灯效果 就这么简单!
2023-06-25 18:42:24
C#中缓存System.Web.Caching用法总结
2021-09-05 04:41:11
Android PowerManagerService省电模式策略控制
2023-11-25 02:46:53
详解SpringMVC常用注解功能及属性
2021-12-29 02:49:23
Java C++ 算法题解leetcode652寻找重复子树
2022-08-17 23:58:09
Android应用开发中模拟按下HOME键的效果(实现代码)
2023-03-09 00:13:14
.net的序列化与反序列化实例
2022-12-29 00:50:50
Android自定义弹窗提示效果
2022-05-13 12:00:14
Android操作系统的架构设计分析
2022-07-12 11:02:26
C#利用时间和随即字符串创建唯一的订单编号
2022-01-21 15:18:56
初识Spring Boot框架和快速入门
2022-10-17 00:58:52
android中图片加载到内存的实例代码
2023-05-23 06:04:35
java线程并发控制同步工具CountDownLatch
2022-09-02 12:18:06
C#多线程系列之资源池限制
2022-01-02 17:11:04
Android 自定义Button控件实现按钮点击变色
2022-12-04 18:18:46
Spring MVC 关于controller的字符编码问题
2023-06-17 09:52:52