C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

作者:做自己518 时间:2023-12-06 10:45:34 

1:RabbitMQ是个啥?(专业术语参考自网络)

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库

2:使用RabbitMQ有啥好处?

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。

对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。

RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,

3:RabbitMq的安装以及环境搭建等:

网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!

3.1:运行容器的命令如下:

docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management

4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?

4.1什么时候使用MQ?

1)数据驱动的任务依赖

2)上游不关心多下游执行结果

3)异步返回执行时间长

4.2什么时候不使用MQ?

需要实时关注执行结果 (eg:同步调用)

5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)

6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

Code:


//简单生产端 ui调用者

using System;
namespace RabbitMqPublishDemo
{
 using MyRabbitMqService;
 using System.Runtime.CompilerServices;

class Program
 {
   static void Main(string[] args)
   {
       //就是简单的队列,生产者
       Console.WriteLine("====RabbitMqPublishDemo====");
       for (int i = 0; i < 500; i++)
       {
         ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
       }
       Console.WriteLine("生成完毕!");
       Console.ReadLine();
   }
 }
}

/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

using (IConnection conn = connectionFactory.CreateConnection())
 {
   using (IModel channel = conn.CreateModel())
   {
     channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
     var msgBody = Encoding.UTF8.GetBytes(msg);
     channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
   }
 }
}

//简单消费端
using System;

namespace RabbitMqConsumerDemo
{
 using MyRabbitMqService;
 using System.Runtime.InteropServices;

class Program
 {
   static void Main(string[] args)
   {
     Console.WriteLine("====RabbitMqConsumerDemo====");
     ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
     {
       Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
     });
     Console.ReadLine();
   }
 }
}

#region 简单生产者后端逻辑
   /// <summary>
   /// 简单消费者
   /// </summary>
   /// <param name="queueName">队列名称</param>
   /// <param name="isBasicNack">失败后是否自动放到队列</param>
   /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
   public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
   {
     Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
     IConnection conn = connectionFactory.CreateConnection();
     IModel channel = conn.CreateModel();
     channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
     var consumer = new EventingBasicConsumer(channel);
     consumer.Received += (sender, ea) =>
     {
       byte[] bymsg = ea.Body.ToArray();
       string msg = Encoding.UTF8.GetString(bymsg);
       if (handleMsgStr != null)
       {
         handleMsgStr.Invoke(msg);
       }
       else
       {
         Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
       }
     };
     channel.BasicConsume(queueName, autoAck: true, consumer);
   }
   #endregion

7:Work模式

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)


//简单生产端 ui调用者

using System;
namespace RabbitMqPublishDemo
{
 using MyRabbitMqService;
 using System.Runtime.CompilerServices;

class Program
 {
   static void Main(string[] args)
   {
       //就是简单的队列,生产者
       Console.WriteLine("====RabbitMqPublishDemo====");
       for (int i = 0; i < 500; i++)
       {
         ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
       }
       Console.WriteLine("生成完毕!");
       Console.ReadLine();
   }
 }
}

/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

using (IConnection conn = connectionFactory.CreateConnection())
 {
   using (IModel channel = conn.CreateModel())
   {
     channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
     var msgBody = Encoding.UTF8.GetBytes(msg);
     channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
   }
 }
}

//简单消费端
using System;

namespace RabbitMqConsumerDemo
{
 using MyRabbitMqService;
 using System.Runtime.InteropServices;

class Program
 {
   static void Main(string[] args)
   {
     Console.WriteLine("====RabbitMqConsumerDemo====");
     ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
     {
       Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
     });
     Console.ReadLine();
   }
 }
}

#region 简单生产者后端逻辑
   /// <summary>
   /// 简单消费者
   /// </summary>
   /// <param name="queueName">队列名称</param>
   /// <param name="isBasicNack">失败后是否自动放到队列</param>
   /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
   public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
   {
     Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
     IConnection conn = connectionFactory.CreateConnection();
     IModel channel = conn.CreateModel();
     channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
     var consumer = new EventingBasicConsumer(channel);
     consumer.Received += (sender, ea) =>
     {
       byte[] bymsg = ea.Body.ToArray();
       string msg = Encoding.UTF8.GetString(bymsg);
       if (handleMsgStr != null)
       {
         handleMsgStr.Invoke(msg);
       }
       else
       {
         Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
       }
     };
     channel.BasicConsume(queueName, autoAck: true, consumer);
   }
   #endregion

8:Fanout

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

Code:


//就如下的code, 多次生产,3个消费者都可以自动开始消费

//生产者
using System;
namespace RabbitMqPublishDemo
{
 using MyRabbitMqService;
 using System.Runtime.CompilerServices;
 class Program
 {
   static void Main(string[] args)
   {
     for (int i = 0; i < 500; i++)
     {
       ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}");
     }
     Console.WriteLine("工作队列模式 生成完毕......!");
     Console.ReadLine();
   }
 }
}

//生产者后端逻辑
public static void PublishWorkQueueModel(string queueName, string msg)
   {
     using (var connection = connectionFactory.CreateConnection())
     using (var channel = connection.CreateModel())
     {
       channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
       var body = Encoding.UTF8.GetBytes(msg);
       var properties = channel.CreateBasicProperties();
       properties.Persistent = true;

channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
       Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
     }
   }

//work消费端
using System;

namespace RabbitMqConsumerDemo
{
 using MyRabbitMqService;
 using System.Runtime.InteropServices;
 class Program
 {
   static void Main(string[] args)
   {
     Console.WriteLine("====Work模式开启了====");
     ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
     {
       Console.WriteLine($"work模式获取到消息{msg}");
     });
     Console.ReadLine();
   }
 }
}

//work后端逻辑
   public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
   {
     var connection = connectionFactory.CreateConnection();
     var channel = connection.CreateModel();

channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
     channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new EventingBasicConsumer(channel);
     Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");

consumer.Received += (sender, ea) =>
     {
       var body = ea.Body.ToArray();
       var message = Encoding.UTF8.GetString(body);
       if (handserMsg != null)
       {
         if (!string.IsNullOrEmpty(message))
         {
           handserMsg.Invoke(message);
         }
       }
       channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
     };
     channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
   }

9:Direct

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

Code:


//同一个消息会被多个订阅者消费

//发布者
using System;

namespace RabbitMqPublishDemo
{
 using MyRabbitMqService;
 using System.Runtime.CompilerServices;

class Program
 {
   static void Main(string[] args)
   {

#region 发布订阅模式,带上了exchange
     for (int i = 0; i < 500; i++)
     {
       ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}");
     }
     Console.WriteLine("发布ok!");
     #endregion
     Console.ReadLine();
   }
 }
}
//发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout
 public static void PublishExchangeModel(string exchangeName, string message)
   {
     using (var connection = connectionFactory.CreateConnection())
     using (var channel = connection.CreateModel())
     {
       channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
       var body = Encoding.UTF8.GetBytes(message);
       channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
       Console.WriteLine($" Sent {message}");
     }
   }

//订阅者
using System;
namespace RabbitMqConsumerDemo
{
 using MyRabbitMqService;
 using System.Runtime.InteropServices;
 class Program
 {
   static void Main(string[] args)
   {

#region 发布订阅模式 Exchange
     ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>
     {
       Console.WriteLine($"订阅到消息:{msg}");
     });
     #endregion
     Console.ReadLine();
   }
 }
}

//订阅者后端的逻辑
public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
   {
     var connection = connectionFactory.CreateConnection();
     var channel = connection.CreateModel();

channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉

var queueName = channel.QueueDeclare().QueueName;
     channel.QueueBind(queue: queueName,
              exchange: exchangeName,
              routingKey: "");

Console.WriteLine(" Waiting for msg....");

var consumer = new EventingBasicConsumer(channel);
     consumer.Received += (model, ea) =>
     {
       var body = ea.Body.ToArray();
       var message = Encoding.UTF8.GetString(body);
       if (handlerMsg != null)
       {
         if (!string.IsNullOrEmpty(message))
         {
           handlerMsg.Invoke(message);
         }
       }
       else
       {
         Console.WriteLine($"订阅到消息:{message}");
       }
     };
     channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
   }

来源:https://www.cnblogs.com/Fengge518/p/13826983.html

标签:C#,RabbitMq,队列
0
投稿

猜你喜欢

  • Android 使用 okhttp3和retrofit2 进行单文件和多文件上传

    2023-04-29 07:33:22
  • 详解Java8新特性Stream之list转map及问题解决

    2021-06-12 05:45:06
  • 关于C#中ajax跨域访问问题

    2023-03-04 21:28:04
  • Java实现经典游戏飞机大战-I的示例代码

    2023-07-30 15:45:22
  • C#(asp.net)多线程用法示例(可用于同时处理多个任务)

    2022-03-09 01:41:30
  • Android身份证号有效性校验工具类案例

    2022-02-02 21:07:01
  • Android ViewPager实现Banner循环播放

    2022-07-16 06:24:03
  • Java异常处理之try...catch...finally详解

    2023-09-17 05:38:24
  • 解析Android开发优化之:对Bitmap的内存优化详解

    2023-08-31 21:05:40
  • 如何在WorkManager中处理异步任务详解

    2021-09-12 14:05:29
  • java递归实现拼装多个api的结果操作方法

    2023-11-24 23:44:35
  • 老生常谈Scanner的基本用法

    2021-08-27 00:34:18
  • Android使用WebView实现截图分享功能

    2023-04-17 08:03:48
  • Android实现数字跳动效果的TextView方法示例

    2023-05-24 16:54:29
  • Scala异常处理的方法深入分析

    2022-01-09 19:50:35
  • SpringBoot整合Docker实现一次构建到处运行的操作方法

    2023-01-23 04:32:14
  • Servlet中/和/*的区别详解

    2022-07-11 03:21:33
  • 29个要点帮你完成java代码优化

    2022-11-06 05:16:26
  • Android Studio3.2中导出jar包的过程详解

    2021-10-14 07:06:05
  • JAVA NIO实现简单聊天室功能

    2023-05-01 10:32:49
  • asp之家 软件编程 m.aspxhome.com