解决python3 pika之连接断开的问题

作者:moxiaomomo 时间:2021-09-28 18:40:09 

问题描述

在消费rabbitMQ队列时, 每次进入回调函数内需要进行一些比较耗时的操作;操作完成后给rabbitMQ server发送ack信号以dequeue本条消息。

问题就发生在发送ack操作时, 程序提示链接已被断开或socket error。

源码示例


#!/usr/bin
#coding: utf-8

import pika
import time

USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'

def callback(ch, method, properties, body):
print(body)
time.sleep(600)
ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
ch.basic_ack(delivery_tag = method.delivery_tag)

def test_main():
s_conn = pika.BlockingConnection(
 pika.ConnectionParameters('127.0.0.1',
  credentials=pika.PlainCredentials(USER, PWD)))
chan = s_conn.channel()
chan.queue_declare(queue=TEST_QUEUE)

chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
chan.basic_consume(callback, queue=TEST_QUEUE)
chan.start_consuming()

if __name__ == "__main__":
test_main()

运行一段时间后, 就会报错:


[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None
[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')

问题排查

猜测:pika客户端没有及时发送心跳,连接被server断开

一开始修改了heartbeat_interval参数值, 示例如下:


def test_main():
s_conn = pika.BlockingConnection(
 pika.ConnectionParameters('127.0.0.1',
  heartbeat_interval=10,
  socket_timeout=5,
  credentials=pika.PlainCredentials(USER, PWD)))
# ....

修改后运行依然报错,后来想想应该单线程被一直占用,pika无法发送心跳;

于是又加了个心跳线程, 示例如下:


#!/usr/bin
#coding: utf-8

import pika
import time
import logging
import threading

USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'

class Heartbeat(threading.Thread):
def __init__(self, connection):
 super(Heartbeat, self).__init__()
 self.lock = threading.Lock()
 self.connection = connection
 self.quitflag = False
 self.stopflag = True
 self.setDaemon(True)

def run(self):
 while not self.quitflag:
  time.sleep(10)
  self.lock.acquire()
  if self.stopflag :
   self.lock.release()
   continue
  try:
   self.connection.process_data_events()
  except Exception as ex:
   logging.warn("Error format: %s"%(str(ex)))
   self.lock.release()
   return
  self.lock.release()

def startHeartbeat(self):
 self.lock.acquire()
 if self.quitflag==True:
  self.lock.release()
  return
 self.stopflag=False
 self.lock.release()

def callback(ch, method, properties, body):
logging.info("recv_body:%s" % body)
time.sleep(600)
ch.basic_ack(delivery_tag = method.delivery_tag)

def test_main():
s_conn = pika.BlockingConnection(
 pika.ConnectionParameters('127.0.0.1',
  heartbeat_interval=10,
  socket_timeout=5,
  credentials=pika.PlainCredentials(USER, PWD)))
chan = s_conn.channel()
chan.queue_declare(queue=TEST_QUEUE)
chan.basic_consume(callback,
     queue=TEST_QUEUE)

heartbeat = Heartbeat(s_conn)
heartbeat.start()   #开启心跳线程
heartbeat.startHeartbeat()
chan.start_consuming()

if __name__ == "__main__":
test_main()

尝试运行,结果还是不行,不得不安静下来思考自己是不是想错了。

去看它的api,看到heartbeat_interval的解析:


:param int heartbeat_interval: How often to send heartbeats.
        Min between this value and server's proposal
        will be used. Use 0 to deactivate heartbeats
        and None to accept server's proposal.

按这样说法,应该还是没有把心跳值给设置好。上面的程序期望是10秒发一次心跳,但是理论上发送心跳的间隔会比10秒多一点。所以艾玛,我应该是把heartbeat_interval的作用搞错了, 它是指超过这个时间间隔不发心跳或不给server任何信息,server就会断开连接, 而不是说pika会按这个间隔来发心跳。 结果我把heartbeat_interval值设置高一点(比实际发送心跳/信息的间隔更长),比如上面设置成60秒,就正常运行了。

如果不指定heartbeat_interval, 它默认为None, 意味着按rabbitMQ server的配置来检测心跳是否正常。

如果设置heartbeat_interval=0, 意味着不检测心跳,server端将不会主动断开连接。

来源:https://blog.csdn.net/moxiaomomo/article/details/77414831

标签:python3,pika,连接,断开
0
投稿

猜你喜欢

  • go语言分布式id生成器及分布式锁介绍

    2024-01-30 21:20:43
  • TensorFlow实现自定义Op方式

    2021-11-01 15:56:38
  • Python实现的简单计算器功能详解

    2023-11-17 09:34:08
  • CSS文字排版终极指南

    2010-01-19 10:30:00
  • Pyecharts绘制可视化地球实现示例

    2021-03-18 17:18:48
  • 使用Python保存网页上的图片或者保存页面为截图

    2022-04-08 10:45:19
  • Vue组件的继承用法示例详解

    2024-05-29 22:44:22
  • Python下载网络小说实例代码

    2023-08-01 18:22:17
  • python深度学习tensorflow训练好的模型进行图像分类

    2023-02-20 20:40:37
  • 基于Python3.6中的OpenCV实现图片色彩空间的转换

    2022-05-20 14:03:13
  • 基于python实现聊天室程序

    2022-09-26 07:50:33
  • Vue中使用eslint和editorconfig方式

    2024-06-05 10:03:46
  • python调用百度语音识别实现大音频文件语音识别功能

    2023-11-29 00:59:53
  • 详解git reset 加不加 --hard的区别

    2022-04-03 06:45:13
  • ASP字符串大写转换成小写 ASP小写转换成大写 ucase lcase

    2011-03-31 10:59:00
  • Python 关于反射和类的特殊成员方法

    2021-10-16 19:30:13
  • 高并发状态下Replace Into造成的死锁问题解决

    2024-01-17 10:17:37
  • 有用的SQL语句(删除重复记录,收缩日志)

    2024-01-19 09:55:24
  • 一个完美网站的101项指标.第四部分.设计

    2008-02-29 22:22:00
  • Django serializer优化类视图的实现示例

    2021-11-23 23:16:46
  • asp之家 网络编程 m.aspxhome.com