python使用MQTT给硬件传输图片的实现方法

作者:Young5566 时间:2022-02-09 21:55:59 

最近因需要用python写一个微服务来用MQTT给硬件传输图片,其中python用的是flask框架,大概流程如下:

python使用MQTT给硬件传输图片的实现方法

协议为:

需要将图片数据封装成多个消息进行传输,每个消息传输的数据字节数为1400Byte。
消息(MQTT Payload) 格式:Web服务器-------->BASE:

python使用MQTT给硬件传输图片的实现方法

反馈:BASE---------> Web服务器:

python使用MQTT给硬件传输图片的实现方法

如果Web服务器发送完一个“数据传输消息”后,5S内没有收到MQTT“反馈消息”或者收到的反馈中显示“数据包不完整”,则重发该“数据传输消息”。

程序流程图

根据上面的协议,可以得到如下的流程图:

python使用MQTT给硬件传输图片的实现方法

代码如下:


# encoding:utf-8
from flask import Flask, jsonify
from flask_restful import Api, Resource, reqparse
from PIL import Image
from io import BytesIO
import requests
import os, logging, time
import paho.mqtt.client as mqtt
import struct
from flask_cors import *
# 日志配置信息
logging.basicConfig(
 level=logging.INFO,
 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s',
)
class Mqtt(object):
 def __init__(self, img_data, size):
   self.MQTTHOST = '*******'
   self.MQTTPORT = "******"
   # 订阅和发送的主题
   self.topic_from_base = 'mqttTestSub'
   self.topic_to_base = 'mqttTestPub'
   self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
   self.client = mqtt.Client(self.client_id)
   # 完成链接后的回掉函数
   self.client.on_connect = self.on_connect
   # 图片大小
   self.size = size
   # 用于跳出死循环,结束任务
   self.finished = None
   # 包的编号
   self.index = 0
   # 将收到的图片数据按大小分成列表
   self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)]
   # 记录发布后的数据,用于监控时延
   self.pub_time = 0
   self.header_to_base = 0xffffeeee
   self.header_from_base = 0xeeeeffff
   # 功能标识
   self.function_begin = 0x01
   self.function_doing = 0x02
   self.function_finished = 0x03
   # 包的完整和非完整状态
   self.whole_package = 0x01
   self.bad_package = 0x00
   # 头信息的格式,小端模式
   self.format_to_base = "<Lbhh"
   self.format_from_base = "<Lbhb"
   # 如果重发包时,用于检查是否重发第一个包
   self.first = True
   # 如果重发包时,用于检查是否重发最后一个包
   self.last = False
   self.begin_data = 'image.jpg;' + str(self.size)
 # 链接mqtt服务器函数
 def on_mqtt_connect(self):
   self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)
   self.client.loop_start()
 # 链接完成后的回调函数
 def on_connect(self, client, userdata, flags, rc):
   logging.info("+++ Connected with result code {} +++".format(str(rc)))
   self.client.subscribe(self.topic_from_base)
 # 订阅函数
 def subscribe(self):
   self.client.subscribe(self.topic_from_base, 1)
   # 消息到来处理函数
   self.client.on_message = self.on_message
 # 接收到信息后的回调函数
 def on_message(self, client, userdata, msg):
   # 如果接受第一个包则不需要重发第一个
   self.first = False
   # 将接受到的包进行解压,得到一个元组
   base_tuple = struct.unpack(self.format_from_base, msg.payload)
   logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))
   logging.info("+++ package_number is {}, package_status_from_base is {} +++"
          .format(base_tuple[2], base_tuple[3]))
   # 检查接受到信息的头部是否正确
   if base_tuple[0] == self.header_from_base:
     logging.info("+++ function_from_base is {} +++".format(base_tuple[1]))
     # 是否完成传输,如果完成则退出
     if base_tuple[1] == self.function_finished:
       logging.info("+++ finish work +++")
       self.finished = 1
       self.client.disconnect()
     else:
       # 是否是最后一个包
       if self.index == len(self.image_data_list) - 1:
         self.publish('finished', self.function_finished)
         self.last = True
         logging.info("+++ finished_data_to_base is finished+++")
       else:
         # 如果接收到的包不是 0x03则进行传送数据
         if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing:
           logging.info("+++ package_number is {}, package_status_from_base is {} +++"
                  .format(base_tuple[2],base_tuple[3]))
           # 如果数据的反馈中,包的状态是1则继续发下一个包
           if base_tuple[3] == self.whole_package:
             self.publish(self.index, self.function_doing)
             logging.info("+++ data_to_base is finished+++")
             self.index += 1
           # 如果数据的反馈中,包的状态是0则重发数据包
           elif base_tuple[3] == self.bad_package:
             re_package_number = base_tuple[2]
             self.publish(re_package_number-1, self.function_doing)
             logging.info("+++ re_data_to_base is finished+++")
           else:
             logging.info("+++ package_status_from_base is not 0 or 1 +++")
             self.client.disconnect()
         else:
           logging.info("+++ function_identifier is illegal +++")
           self.client.disconnect()
   else:
     logging.info("+++ header_from_base is illegal +++")
     self.client.disconnect()
 # 数据发送函数
 def publish(self, index, fuc):
   # 看是否是最后一个包
   if index == 'finished':
     length = 0
     package_number = 0
     data = b''
   else:
     length = len(self.image_data_list[index])
     package_number = index
     data = self.image_data_list[index]
   # 打包数据头信息
   buffer = struct.pack(
     self.format_to_base,
     self.header_to_base,
     fuc,
     package_number,
     length
   )
   to_base_data = buffer + data
   # mqtt发送
   self.client.publish(
     self.topic_to_base,
     to_base_data
   )
   self.pub_time = time.time()
 # 发送第一个包函数
 def publish_begin(self):
   buffer = struct.pack(
     self.format_to_base,
     self.header_to_base,
     self.function_begin,
     0,
     len(self.begin_data.encode('utf-8')),
   )
   begin_data = buffer + self.begin_data.encode('utf-8')
   self.client.publish(self.topic_to_base, begin_data)
 # 控制函数
 def control(self):
   self.on_mqtt_connect()
   self.publish_begin()
   begin_time = time.time()
   self.pub_time = time.time()
   self.subscribe()
   while True:
     time.sleep(1)
     # 超过5秒重传
     date = time.time() - self.pub_time
     if date > 5:
       # 是否重传第一个包
       if self.first == True:
         self.publish_begin()
         logging.info('+++ this is timeout first_data +++')
       # 是否重传最后一个包
       elif self.last == True:
         self.publish('finished', self.function_finished)
         logging.info('+++ this is timeout last_data +++')
       else:
         self.publish(self.index-1, self.function_doing)
         logging.info('+++ this is timeout middle_data +++')
     if self.finished == 1:
       logging.info('+++ all works is finished+++')
       break
   print(str(time.time()-begin_time) + 'begin_time - end_time')
app = Flask(__name__)
api = Api(app)
CORS(app, supports_credentials=True)
# 接受参数
parser = reqparse.RequestParser()
parser.add_argument('url', help='mqttImage url', location='args', type=str)
class GetImage(Resource):
 # 得到参数并从图床下载到本地
 def get(self):
   args = parser.parse_args()
   url = args.get('url')
   response = requests.get(url)
   # 获取图片
   image = Image.open(BytesIO(response.content))
   # 存取图片
   add = os.path.join(os.path.abspath(''), 'image.jpg')
   image.save(add)
   # 得到图片大小
   size = os.path.getsize(add)
   f = open(add, 'rb')
   imageData = f.read()
   f.close()
   # 进行mqtt传输
   mqtt = Mqtt(imageData, size)
   mqtt.control()
   # 删除文件
   os.remove(add)
   logging.info('*** the result of control is {} ***'.format(1))
   return jsonify({
     "imageData": 1
   })
api.add_resource(GetImage, '/image')
if __name__ == '__main__':
 app.run(debug=True, host='0.0.0.0')

总结

以上所述是小编给大家介绍的python使用MQTT给硬件传输图片的实现方法,网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

来源:https://juejin.im/post/5ccc4bd1f265da037c7cf4a5

标签:python,mqtt,传输
0
投稿

猜你喜欢

  • Python切片用法实例教程

    2023-09-28 15:26:17
  • python修改全局变量可以不加global吗?

    2021-01-11 08:10:56
  • 自动化Nginx服务器的反向代理的配置方法

    2022-08-07 04:02:07
  • numpy自动生成数组详解

    2023-09-02 22:53:47
  • Python使用爬虫抓取美女图片并保存到本地的方法【测试可用】

    2023-11-04 19:44:17
  • JS页内查找关键词的高亮显示

    2013-07-18 21:13:54
  • python垃圾回收机制(GC)原理解析

    2023-01-05 10:57:46
  • ElementUI 的 Tree 组件的基本使用实战教程

    2024-04-26 17:40:46
  • Python使用自带的ConfigParser模块读写ini配置文件

    2022-04-01 00:07:01
  • 两行代码实现的QQ窗口抖动效果

    2008-09-06 12:34:00
  • 引用 js在IE与FF之间的区别详细解析

    2024-05-22 10:37:44
  • 如何用python开发Zeroc Ice应用

    2022-06-23 01:17:50
  • python+numpy实现的基本矩阵操作示例

    2023-07-16 13:52:37
  • 利用python汇总统计多张Excel

    2023-12-31 14:22:11
  • Node.js Koa2使用JWT进行鉴权的方法示例

    2024-05-02 17:05:45
  • IN&EXISTS与NOT IN&NOT EXISTS 的优化原则小结

    2024-01-16 15:31:59
  • python3 scrapy框架的执行流程

    2022-04-12 08:53:35
  • Django 日志配置按日期滚动的方法

    2021-02-02 08:27:01
  • GPU状态监测 nvidia-smi 命令的用法详解

    2022-08-28 20:57:37
  • Python约瑟夫生者死者小游戏实例讲解

    2023-02-12 02:14:25
  • asp之家 网络编程 m.aspxhome.com