Springboot 整合 RocketMQ 收发消息的配置过程

作者:liqiangbk 时间:2023-01-22 22:49:28 

Springboot 整合 RocketMQ 收发消息

创建springboot项目

pom.xml添加rocketmq-spring-boot-starter依赖。


<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-spring-boot-starter</artifactId>
   <version>2.1.0</version>
</dependency>

yml 配置

application.yml


rocketmq:
 name-server: 192.168.64.141:9876

application-demo1.yml

使用 demo1 profile 指定生产者组组名


rocketmq:
 producer:
   group: producer-demo1

application-demo2.yml

使用 demo2 profile 指定生产者组组名


rocketmq:
 producer:
   group: producer-demo2

测试

demo 1

  • 发送普通消息

  • 发送 Spring 的通用 Message 对象

  • 发送异步消息

  • 发送顺序消息

生产者


package cn.tedu.demo2.m1;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
   @Autowired
   private RocketMQTemplate t ;
   public  void send(){
       //发送同步消息
       t.convertAndSend("Topic1:TagA", "Hello world! ");
       //发送spring的Message
       Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();
       t.send("Topic1:TagA",message);
       //发送异步消息
       t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {
           @Override
           public void onSuccess(SendResult sendResult) {
               System.out.println("发送成功");
           }
           @Override
           public void onException(Throwable throwable) {
               System.out.println("发送失败");
           }
       });
       //发送顺序消息
       t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
       t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
       t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
   }
}

消费者


package cn.tedu.demo2.m1;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")
public class Consumer  implements RocketMQListener<String> {
   @Override
   public void onMessage(String s) {
       System.out.println("收到"+s);
   }
}

主类


package cn.tedu.demo2.m1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
   public static void main(String[] args) {
       SpringApplication.run(Main.class, args);
   }
}

测试类

需要放在 test 文件夹

激活 demo1 profile  @ActiveProfiles("demo1")


package cn.tedu.demo2.m1;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
   @Autowired
   private  Producer producer;
   @Test
   public void test1(){
       producer.send();
       try {
           Thread.sleep(5000);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

demo 2

发送事务消息

生产者


package cn.tedu.demo2.m2;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component

public class Producer {

@Autowired
   private RocketMQTemplate t;

public void send(){
       Message<String> message = MessageBuilder.withPayload("Hello world").build();
       //一旦发送消息,则执行 *
       t.sendMessageInTransaction("Topic2",message,null);
   }
   @RocketMQTransactionListener
   class Lis implements RocketMQLocalTransactionListener {
       @Override
       public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
           System.out.println("执行本地事务");
           return RocketMQLocalTransactionState.UNKNOWN;
       }

@Override
       public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
           System.out.println("执行事务回查");
           return RocketMQLocalTransactionState.COMMIT;
       }
   }

}

消费者


package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")
public class Consumer implements RocketMQListener<String> {
   @Override
   public void onMessage(String s) {
       System.out.println("收到"+s);
   }
}

主类


package cn.tedu.demo2.m2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
   public static void main(String[] args) {
       SpringApplication.run(Main.class, args);
   }
}

测试类


package cn.tedu.demo2.m2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo2")
public class Test2 {
   @Autowired
   private  Producer producer;
   @Test
   public void  test1(){
       producer.send();
       //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间
       try {
           Thread.sleep(30000);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

来源:https://www.cnblogs.com/liqbk/p/13677137.html

标签:Springboot,整合,RocketMQ,收发消息
0
投稿

猜你喜欢

  • java 字浮串提取方法汇集

    2023-11-24 14:43:16
  • 详解Java内存溢出的几种情况

    2023-11-13 19:46:18
  • Android复选框CheckBox与开关按钮Switch及单选按钮RadioButton使用示例详解

    2023-02-06 18:20:42
  • 基于C#调用c++Dll结构体数组指针的问题详解

    2021-12-10 23:16:41
  • Android仿美团外卖菜单界面

    2022-03-28 02:28:35
  • C# dataset存放多张表的实例

    2022-10-10 14:08:13
  • SpringBoot如何使用applicationContext.xml配置文件

    2022-11-15 08:18:53
  • Token登陆验证机制的原理及实现

    2022-07-08 03:28:19
  • Android中FontMetrics的几个属性全面讲解

    2023-11-14 14:57:20
  • Activiti流程引擎对象及配置原理解析

    2023-02-11 22:20:20
  • spring boot多数据源动态切换代码实例

    2022-03-11 00:37:09
  • android自定义view之模拟qq消息拖拽删除效果

    2023-01-29 11:48:34
  • visual studio 2019安装配置可编写c/c++语言的IDE环境

    2023-10-04 02:01:02
  • 基于静态Singleton模式的使用介绍

    2022-09-13 20:11:08
  • java中transient关键字用法分析

    2022-01-22 04:27:05
  • c# BackgroundWorker使用方法

    2021-05-27 00:49:12
  • C++日期类计算器的模拟实现举例详解

    2023-05-22 08:27:16
  • C#中反射和扩展方法如何运用

    2023-08-02 01:43:16
  • C#与PLC通讯的实现代码

    2021-10-29 13:34:39
  • C#实现网页画图功能

    2021-12-05 19:33:41
  • asp之家 软件编程 m.aspxhome.com