关于golang监听rabbitmq消息队列任务断线自动重连接的问题

作者:孙龙-程序员 时间:2024-04-25 13:21:03 

golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务

需求背景:

goalng常驻内存任务脚本监听rbmq执行任务

任务脚本由supervisor来管理

关于golang监听rabbitmq消息队列任务断线自动重连接的问题

当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态

关于golang监听rabbitmq消息队列任务断线自动重连接的问题

关于golang监听rabbitmq消息队列任务断线自动重连接的问题

假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了

如果是短时间的停止重启,supervisor是可以即时唤醒该程序。如果服务器长时间没有恢复正常运行,程序就会出现fatal进程启动失败的状态,此时可以通过告警来提醒开发人员

关于golang监听rabbitmq消息队列任务断线自动重连接的问题

如果以上告警能时时通知运维人员此问题可以略过了。今天讨论的是如果在长时间断开连接还能在服务器恢复正常情况下自动实现重连。

关于golang监听rabbitmq消息队列任务断线自动重连接的问题

代码实现一:

消费者:

package main
import (
   "fmt"
   "github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db
/*
返回值 error 为nil  则表示该消息消费成功
否则消息会进入ttl延时队列  重复尝试消费3次
3次后消息如果还是失败 消息就执行失败  进入告警 FailAction
*/
func (t *RecvPro) Consumer(dataByte []byte) error {
   //time.Sleep(500*time.Microsecond)
   //return errors.New("顶顶顶顶")
   fmt.Println(string(dataByte))
   //time.Sleep(1*time.Second)
   return nil
//消息已经消费3次 失败了 请进行处理
如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
   fmt.Println(err)
   fmt.Println("任务处理失败了,我要进入db日志库了")
   fmt.Println("任务处理失败了,发送钉钉消息通知主人")
func main() {
   t := &RecvPro{}
   //rabbitmq.Recv(rabbitmq.QueueExchange{
   //    "a_test_0001",
   //    "",
   //    "amqp://guest:guest@192.168.2.232:5672/",
   //},t,5)
   /*
       runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了
    */
   err := rabbitmq.Recv(rabbitmq.QueueExchange{
       "a_test_0001",
       "hello_go",
       "direct",
       "amqp://guest:guest@192.168.1.169:5672/",
   },t,4)
   if(err != nil){
       fmt.Println(err)
   }

rabbitmq代码

package rabbitmq

import (
   "errors"
   "strconv"
   "time"
   //"errors"
   "fmt"
   "github.com/streadway/amqp"
   "log"
)
// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel
// 定义生产者接口
type Producer interface {
   MsgContent() string
}
type RetryProducer interface {
// 定义接收者接口
type Receiver interface {
   Consumer([]byte)    error
   FailAction(error , []byte)  error
// 定义RabbitMQ对象
type RabbitMQ struct {
   connection *amqp.Connection
   Channel *amqp.Channel
   dns string
   QueueName   string            // 队列名称
   RoutingKey  string            // key名称
   ExchangeName string           // 交换机名称
   ExchangeType string           // 交换机类型
   producerList []Producer
   retryProducerList []RetryProducer
   receiverList []Receiver
// 定义队列交换机对象
type QueueExchange struct {
   QuName  string           // 队列名称
   RtKey   string           // key值
   ExName  string           // 交换机名称
   ExType  string           // 交换机类型
   Dns     string              //链接地址
// 链接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){
   mqConn, err = amqp.Dial(r.dns)
   r.connection = mqConn   // 赋值给RabbitMQ对象
   if err != nil {
       fmt.Printf("rbmq链接失败  :%s \n", err)
   }
   return
// 关闭mq链接
func (r *RabbitMQ)CloseMqConnect() (err error){
   err = r.connection.Close()
   if err != nil{
       fmt.Printf("关闭mq链接失败  :%s \n", err)
func (r *RabbitMQ)MqOpenChannel() (err error){
   mqConn := r.connection
   r.Channel, err = mqConn.Channel()
   //defer mqChan.Close()
       fmt.Printf("MQ打开管道失败:%s \n", err)
   return err
func (r *RabbitMQ)CloseMqChannel() (err error){
   r.Channel.Close()
// 创建一个新的操作对象
func NewMq(q QueueExchange) RabbitMQ {
   return RabbitMQ{
       QueueName:q.QuName,
       RoutingKey:q.RtKey,
       ExchangeName: q.ExName,
       ExchangeType: q.ExType,
       dns:q.Dns,
func (mq *RabbitMQ) sendMsg (body string) (err error)  {
   err = mq.MqOpenChannel()
   ch := mq.Channel
       log.Printf("Channel err  :%s \n", err)
   defer mq.Channel.Close()
   if mq.ExchangeName != "" {
       if mq.ExchangeType == ""{
           mq.ExchangeType = "direct"
       }
       err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
       if err != nil {
           log.Printf("ExchangeDeclare err  :%s \n", err)
   // 用于检查队列是否存在,已经存在不需要重复声明
   _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
       log.Printf("QueueDeclare err :%s \n", err)
   // 绑定任务
   if mq.RoutingKey != "" && mq.ExchangeName != "" {
       err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
           log.Printf("QueueBind err :%s \n", err)
   if mq.ExchangeName != "" && mq.RoutingKey != ""{
       err = mq.Channel.Publish(
           mq.ExchangeName,     // exchange
           mq.RoutingKey, // routing key
           false,  // mandatory
           false,  // immediate
           amqp.Publishing {
               ContentType: "text/plain",
               Body:        []byte(body),
           })
   }else{
           "",     // exchange
           mq.QueueName, // routing key
/*
发送延时消息
*/
func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){
   err =mq.MqOpenChannel()
           return
   if ttl <= 0{
       return errors.New("发送延时消息,ttl参数是必须的")
   table := make(map[string]interface{},3)
   table["x-dead-letter-routing-key"] = mq.RoutingKey
   table["x-dead-letter-exchange"] = mq.ExchangeName
   table["x-message-ttl"] = ttl*1000
   //fmt.Printf("%+v",table)
   //fmt.Printf("%+v",mq)
   ttlstring := strconv.FormatInt(ttl,10)
   queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
   routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
   _, err = ch.QueueDeclare(queueName, true, false, false, false, table)
       return
   if routingKey != "" && mq.ExchangeName != "" {
       err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)
   header := make(map[string]interface{},1)
   header["retry_nums"] = 0
   var ttl_exchange string
   var ttl_routkey string
   if(mq.ExchangeName != "" ){
       ttl_exchange = mq.ExchangeName
       ttl_exchange = ""
   if mq.RoutingKey != "" && mq.ExchangeName != ""{
       ttl_routkey = routingKey
       ttl_routkey = queueName
   err = mq.Channel.Publish(
       ttl_exchange,     // exchange
       ttl_routkey, // routing key
       false,  // mandatory
       false,  // immediate
       amqp.Publishing {
           ContentType: "text/plain",
           Body:        []byte(body),
           Headers:header,
       })
func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string)  {
   err :=mq.MqOpenChannel()
   //原始路由key
   oldRoutingKey := args[0]
   //原始交换机名
   oldExchangeName := args[1]
   table["x-dead-letter-routing-key"] = oldRoutingKey
   if oldExchangeName != "" {
       table["x-dead-letter-exchange"] = oldExchangeName
       mq.ExchangeName = ""
       table["x-dead-letter-exchange"] = ""
   table["x-message-ttl"] = int64(20000)
   _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)
   header["retry_nums"] = retry_nums + int32(1)
       ttl_routkey = mq.RoutingKey
       ttl_routkey = mq.QueueName
   //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)
       fmt.Printf("MQ任务发送失败:%s \n", err)
// 监听接收者接收任务 消费者
func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {
   // 获取消费通道,确保rabbitMQ一个一个发送消息
   err =  ch.Qos(1, 0, false)
   msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)
       log.Printf("Consume err :%s \n", err)
   for msg := range msgList {
       retry_nums,ok := msg.Headers["retry_nums"].(int32)
       if(!ok){
           retry_nums = int32(0)
       // 处理数据
       err := receiver.Consumer(msg.Body)
       if err!=nil {
           //消息处理失败 进入延时尝试机制
           if retry_nums < 3{
               fmt.Println(string(msg.Body))
               fmt.Printf("消息处理失败 消息开始进入尝试  ttl延时队列 \n")
               retry_msg(msg.Body,retry_nums,QueueExchange{
                       mq.QueueName,
                       mq.RoutingKey,
                       mq.ExchangeName,
                       mq.ExchangeType,
                       mq.dns,
                   })
           }else{
               //消息失败 入库db
               fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \n")
               receiver.FailAction(err,msg.Body)
           }
           err = msg.Ack(true)
           if err != nil {
               fmt.Printf("确认消息未完成异常:%s \n", err)
       }else {
           // 确认消息,必须为false
               fmt.Printf("消息消费ack失败 err :%s \n", err)
//消息处理失败之后 延时尝试
func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
   //原始队列名称 交换机名称
   oldQName := queueExchange.QuName
   oldExchangeName := queueExchange.ExName
   oldRoutingKey := queueExchange.RtKey
   if oldRoutingKey == "" || oldExchangeName == ""{
       oldRoutingKey = oldQName
   if queueExchange.QuName != "" {
       queueExchange.QuName = queueExchange.QuName + "_retry_3";
   if queueExchange.RtKey != "" {
       queueExchange.RtKey = queueExchange.RtKey + "_retry_3";
       queueExchange.RtKey = queueExchange.QuName + "_retry_3";
//fmt.Printf("%+v",queueExchange)
   mq := NewMq(queueExchange)
   _ = mq.MqConnect()
   defer func(){
       _ = mq.CloseMqConnect()
   }()
   //fmt.Printf("%+v",queueExchange)
   mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)
func Send(queueExchange QueueExchange,msg string) (err error){
   err = mq.MqConnect()
       mq.CloseMqConnect()
   err = mq.sendMsg(msg)
//发送延时消息
func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){
   err = mq.sendDelayMsg(msg,ttl)
runNums  开启并发执行任务数量
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
   //链接rabbitMQ
   if(err != nil){
   //rbmq断开链接后 协程退出释放信号
   taskQuit:= make(chan struct{}, 1)
   //尝试链接rbmq
   tryToLinkC := make(chan struct{}, 1)
   //开始执行任务
   for i:=1;i<=runNums;i++{
       go Recv2(mq,receiver,taskQuit);
   //如果rbmq断开连接后 尝试重新建立链接
   var tryToLink = func() {
       for {
           err = mq.MqConnect()
           if(err == nil){
               tryToLinkC <- struct{}{}
               break
           time.Sleep(time.Second * 10)
   for{
       select {
       case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接
            go tryToLink()
           <-tryToLinkC //建立链接成功后 重新开启协程执行任务
           fmt.Println("重新开启新的协程执行任务")
           go Recv2(mq,receiver,taskQuit);
       time.Sleep(time.Millisecond*100)
func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
       defer func() {
           fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")
           taskQuit <- struct{}{}
       }()
       // 验证链接是否正常
       err := mq.MqOpenChannel()
       if(err != nil){
       mq.ListenReceiver(receiver)
type retryPro struct {
   msgContent   string

实现重连方式很多,下面实现方式比较简单

关于golang监听rabbitmq消息队列任务断线自动重连接的问题

1.Recv方法创建ampq链接

2.启动协程开始执行任务

MqOpenChannel 打开一个channel通道处理amqp消息

拿到消息 处理任务

3,协程中捕获异常发送消息到taskQuit <- struct{}{}

4,主进程监听taskQuit管道 开始尝试重新链接amqp 直到链接成功

5,重新链接成功后启动新的协程处理任务

主要代码分析:

/*
runNums  开启并发执行任务数量
*/
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
   mq := NewMq(queueExchange)
   //链接rabbitMQ
   err = mq.MqConnect()
   if(err != nil){
       return
   }
   //rbmq断开链接后 协程退出释放信号
   taskQuit:= make(chan struct{}, 1)
   //尝试链接rbmq
   tryToLinkC := make(chan struct{}, 1)
   //开始执行任务
   for i:=1;i<=runNums;i++{
       go Recv2(mq,receiver,taskQuit);

//如果rbmq断开连接后 尝试重新建立链接
   var tryToLink = func() {
       for {
           err = mq.MqConnect()
           if(err == nil){
               tryToLinkC <- struct{}{}
               break
           }
           time.Sleep(time.Second * 10)
       }
   for{
       select {
       case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接
            go tryToLink()
           <-tryToLinkC //建立链接成功后 重新开启协程执行任务
           fmt.Println("重新开启新的协程执行任务")
           go Recv2(mq,receiver,taskQuit);
       time.Sleep(time.Millisecond*100)
}
func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
       defer func() {
           fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")
           taskQuit <- struct{}{}
           return
       }()
       // 验证链接是否正常
       err := mq.MqOpenChannel()
       if(err != nil){
       mq.ListenReceiver(receiver)

来源:https://www.cnblogs.com/sunlong88/p/15959476.html

标签:golang,rabbitmq,断线自动重连,消息队列
0
投稿

猜你喜欢

  • ASP XML操作类代码

    2011-03-08 10:47:00
  • 有时用户并不需要引导

    2009-07-17 18:48:00
  • 微信公众号接入ChatGPT机器人的方法

    2023-11-19 22:05:42
  • django美化后台django-suit的安装配置操作

    2021-12-19 23:13:25
  • python递归函数绘制分形树的方法

    2021-04-22 02:16:02
  • 如何利用Python识别图片中的文字详解

    2021-02-07 21:05:30
  • sqlserver数据库大型应用解决方案经验总结

    2024-01-26 13:38:52
  • Python编写万花尺图案实例

    2022-04-16 06:12:21
  • Python编译成.so文件进行加密后调用的实现

    2022-11-17 12:49:54
  • python验证码识别的示例代码

    2023-08-04 03:20:24
  • 一些相见恨晚的 JavaScript 技巧

    2024-04-18 10:10:53
  • MySQL处理重复数据的方法

    2024-01-24 04:08:27
  • MySQL数据库的触发器和事务

    2024-01-15 21:35:08
  • Python3中的真除和Floor除法用法分析

    2023-10-11 09:01:45
  • python进程类subprocess的一些操作方法例子

    2021-08-26 16:21:35
  • 利用Python实现某OA系统的自动定位功能

    2021-05-26 02:27:46
  • Pandas中的 transform()结合 groupby()用法示例详解

    2023-01-26 10:00:09
  • 想用户所想(感受亚马逊的设计)

    2007-08-26 17:09:00
  • 微信小程序实现日期格式化

    2023-07-20 20:28:32
  • Python 分享10个PyCharm技巧

    2021-11-18 11:03:41
  • asp之家 网络编程 m.aspxhome.com