C#用RabbitMQ实现消息订阅与发布

作者:Alan.hsiang 时间:2022-09-05 16:23:40 

目录
  • Fanout交换机模型

  • RabbitMQ控制台操作

    • 新增两个队列

    • 绑定fanout交换机

  • 示例效果图

    • 核心代码

      • 消息发布

      • 消息订阅

    Fanout交换机模型

    扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

    C#用RabbitMQ实现消息订阅与发布

    RabbitMQ控制台操作

    新增两个队列

    在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:

    C#用RabbitMQ实现消息订阅与发布

    绑定fanout交换机

    将两个队列绑定到系统默认的fanout交换机,如下所示:

    C#用RabbitMQ实现消息订阅与发布

    示例效果图

    生产者,采用Fanout类型交换机发布消息,如下图所示:

    C#用RabbitMQ实现消息订阅与发布

     当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:

    C#用RabbitMQ实现消息订阅与发布

    当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:

    C#用RabbitMQ实现消息订阅与发布

    核心代码

    消息发布

    建立连接后,将通道声明类型为Fanout的交换机,如下所示:


    /// <summary>
       /// fanout类型交换机,发送消息
       /// </summary>
       public class RabbitMqFanoutSendHelper : RabbitMqHelper {
           /// <summary>
           /// 发送消息
           /// </summary>
           /// <param name="msg"></param>
           /// <returns></returns>
           public bool SendMsg(string msg)
           {
               try
               {
                   using (var conn = GetConnection("/Alan.hsiang"))
                   {
                       using (var channel = conn.CreateModel())
                       {
                           channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);

    var body = Encoding.UTF8.GetBytes(msg);

    channel.BasicPublish(exchange: "amq.fanout",
                                                routingKey: "",
                                                basicProperties: null,
                                                body: body);

    //Console.WriteLine(" [x] Sent {0}", message);
                       };
                   };
                   return true;
               }
               catch (Exception ex)
               {
                   throw ex;
               }
           }
       }

    消息订阅

    建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:


    /// <summary>
       /// 扇形交换机接收消息
       /// </summary>
       public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
       {
           public RabbitMqReceiveEventHandler OnReceiveEvent;

    private IConnection conn;

    private IModel channel;

    private EventingBasicConsumer consumer;

    public bool StartReceiveMsg(string queueName)
           {
               try
               {
                   conn = GetConnection("/Alan.hsiang");

    channel = conn.CreateModel();
                   channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
                   //此处随机取出交换机下的队列
                   //var queueName = channel.QueueDeclare().QueueName;
                   channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
                   consumer = new EventingBasicConsumer(channel);
                   consumer.Received += (model, ea) =>
                   {
                       var body = ea.Body.ToArray();
                       var message = Encoding.UTF8.GetString(body);
                       //Console.WriteLine(" [x] Received {0}", message);
                       if (OnReceiveEvent != null)
                       {
                           OnReceiveEvent(queueName+"::"+message);
                       }
                   };
                   channel.BasicConsume(queue: queueName,
                                           autoAck: true,
                                           consumer: consumer);
                   return true;
               }
               catch (Exception ex)
               {
                   throw ex;
               }
           }
       }

    作者:Alan.hsiang
    出处:http://www.cnblogs.com/hsiang/

    来源:https://www.cnblogs.com/hsiang/p/14771629.html

    标签:C#,RabbitMQ,消息订阅,消息发布
    0
    投稿

    猜你喜欢

  • Android仿直播类app赠送礼物功能

    2023-07-26 05:06:17
  • Minio与SpringBoot使用okhttp3问题解决

    2021-06-25 19:17:08
  • Android应用APP自动更新功能的代码实现

    2022-09-17 09:48:25
  • Spring bean对象实例化实现过程图解

    2023-01-02 07:32:13
  • Unity2019-2020 个人版官方免费激活详细方法

    2023-12-08 21:57:39
  • Android中GridView插件的使用方法

    2021-07-26 16:46:28
  • SpringBoot Nacos实现自动刷新

    2023-09-16 04:17:09
  • Java4Android开发教程(一)JDK安装与配置

    2022-02-04 22:33:12
  • IDEA 2020.1 版自动导入MAVEN依赖的方法(新版MAVEN无法自动导入/更新POM依赖、MAVEN设置自动更新、自动更新快捷键)

    2022-08-27 09:31:22
  • Android 系统语言切换监听和设置实例代码

    2021-08-06 16:18:25
  • Spring Bean的初始化和销毁实例详解

    2023-10-28 05:56:04
  • spring webflux自定义netty 参数解析

    2023-07-26 18:38:25
  • Java方法调用解析静态分派动态分派执行过程

    2023-05-03 04:32:40
  • Spring boot外部配置(配置中心化)详解

    2022-07-11 23:13:26
  • android 封装抓取网页信息的实例代码

    2021-11-28 09:40:32
  • C#实现根据字节数截取字符串并加上省略号的方法

    2021-10-24 22:24:17
  • 使用设计模式中的工厂方法模式进行C#编程的示例讲解

    2023-10-21 05:27:08
  • c# 线程定时器 System.Threading.Timer的使用

    2022-07-08 01:28:09
  • Flutter实现自定义搜索框AppBar的示例代码

    2021-10-26 02:37:54
  • Spring整合WebSocket应用示例(上)

    2023-05-05 10:09:21
  • asp之家 软件编程 m.aspxhome.com