C#用RabbitMQ实现消息订阅与发布
作者:Alan.hsiang 时间:2022-09-05 16:23:40
目录
Fanout交换机模型
RabbitMQ控制台操作
新增两个队列
绑定fanout交换机
示例效果图
核心代码
消息发布
消息订阅
Fanout交换机模型
扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
RabbitMQ控制台操作
新增两个队列
在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:
绑定fanout交换机
将两个队列绑定到系统默认的fanout交换机,如下所示:
示例效果图
生产者,采用Fanout类型交换机发布消息,如下图所示:
当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:
当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:
核心代码
消息发布
建立连接后,将通道声明类型为Fanout的交换机,如下所示:
/// <summary>
/// fanout类型交换机,发送消息
/// </summary>
public class RabbitMqFanoutSendHelper : RabbitMqHelper {
/// <summary>
/// 发送消息
/// </summary>
/// <param name="msg"></param>
/// <returns></returns>
public bool SendMsg(string msg)
{
try
{
using (var conn = GetConnection("/Alan.hsiang"))
{
using (var channel = conn.CreateModel())
{
channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "amq.fanout",
routingKey: "",
basicProperties: null,
body: body);
//Console.WriteLine(" [x] Sent {0}", message);
};
};
return true;
}
catch (Exception ex)
{
throw ex;
}
}
}
消息订阅
建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:
/// <summary>
/// 扇形交换机接收消息
/// </summary>
public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
{
public RabbitMqReceiveEventHandler OnReceiveEvent;
private IConnection conn;
private IModel channel;
private EventingBasicConsumer consumer;
public bool StartReceiveMsg(string queueName)
{
try
{
conn = GetConnection("/Alan.hsiang");
channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
//此处随机取出交换机下的队列
//var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
//Console.WriteLine(" [x] Received {0}", message);
if (OnReceiveEvent != null)
{
OnReceiveEvent(queueName+"::"+message);
}
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
return true;
}
catch (Exception ex)
{
throw ex;
}
}
}
作者:Alan.hsiang
出处:http://www.cnblogs.com/hsiang/
来源:https://www.cnblogs.com/hsiang/p/14771629.html
标签:C#,RabbitMQ,消息订阅,消息发布
![](/images/zang.png)
![](/images/jiucuo.png)
猜你喜欢
Android仿直播类app赠送礼物功能
2023-07-26 05:06:17
![](https://img.aspxhome.com/file/2023/3/137983_0s.gif)
Minio与SpringBoot使用okhttp3问题解决
2021-06-25 19:17:08
![](https://img.aspxhome.com/file/2023/1/83501_0s.jpg)
Android应用APP自动更新功能的代码实现
2022-09-17 09:48:25
![](https://img.aspxhome.com/file/2023/0/87290_0s.jpg)
Spring bean对象实例化实现过程图解
2023-01-02 07:32:13
![](https://img.aspxhome.com/file/2023/1/125501_0s.png)
Unity2019-2020 个人版官方免费激活详细方法
2023-12-08 21:57:39
![](https://img.aspxhome.com/file/2023/4/81364_0s.png)
Android中GridView插件的使用方法
2021-07-26 16:46:28
![](https://img.aspxhome.com/file/2023/6/138056_0s.png)
SpringBoot Nacos实现自动刷新
2023-09-16 04:17:09
Java4Android开发教程(一)JDK安装与配置
2022-02-04 22:33:12
![](https://img.aspxhome.com/file/2023/5/130415_0s.png)
IDEA 2020.1 版自动导入MAVEN依赖的方法(新版MAVEN无法自动导入/更新POM依赖、MAVEN设置自动更新、自动更新快捷键)
2022-08-27 09:31:22
![](https://img.aspxhome.com/file/2023/4/88504_0s.png)
Android 系统语言切换监听和设置实例代码
2021-08-06 16:18:25
Spring Bean的初始化和销毁实例详解
2023-10-28 05:56:04
spring webflux自定义netty 参数解析
2023-07-26 18:38:25
![](https://img.aspxhome.com/file/2023/1/57811_0s.jpg)
Java方法调用解析静态分派动态分派执行过程
2023-05-03 04:32:40
Spring boot外部配置(配置中心化)详解
2022-07-11 23:13:26
android 封装抓取网页信息的实例代码
2021-11-28 09:40:32
C#实现根据字节数截取字符串并加上省略号的方法
2021-10-24 22:24:17
使用设计模式中的工厂方法模式进行C#编程的示例讲解
2023-10-21 05:27:08
![](https://img.aspxhome.com/file/2023/0/96300_0s.png)
c# 线程定时器 System.Threading.Timer的使用
2022-07-08 01:28:09
![](https://img.aspxhome.com/file/2023/0/92890_0s.png)
Flutter实现自定义搜索框AppBar的示例代码
2021-10-26 02:37:54
![](https://img.aspxhome.com/file/2023/5/137195_0s.png)
Spring整合WebSocket应用示例(上)
2023-05-05 10:09:21