python中的生成器实现周期性报文发送功能

作者:Logintern09 时间:2023-02-07 23:15:12 

使用python中的生成器实现周期性发送列表中数值的报文发送功能。

功能开发背景:提取cantest工具采集到的现场报文数据,希望使用原始的现场数据模拟验证程序现有逻辑,需要开发一个工具能够自动按照报文发送周期依次发送采集到的报文数据中的一个数值。

功能开发需求:多个报文发送对象共用同一个报文发送线程,多个对象间的报文发送周期不同,多个对象间的总报文发送数据长度不同,能够允许报文发送过程中断及恢复某个对象的报文发送。

功能开发实现逻辑:在固定发送对象某个数值的基础程序版本上增加新的功能,考虑使用python中生成器实现周期性提取对象数值发送报文的功能。

目前只需要发送两个对象的报文数据,先定义两个使用yield生成器:

    def yield_item_value_1(self):
        item_value_list = self.item_value_dict[item_list[0]]
        for i in range(len(item_value_list)):
            yield item_value_list[i]

    def yield_item_value_2(self):
        item_value_list = self.item_value_dict[item_list[1]]
        for i in range(len(item_value_list)):
            yield item_value_list[i]

报文发送线程中的run()函数:

def run(self):
       # 实时更新item的被选状态
       self.get_checkbox_res_func()
       # 获取每个对象的实际物理值
       self.get_item_value_dict()
       self.item1_value_func = self.yield_item_value_1()
       self.item2_value_func = self.yield_item_value_2()
       while self.Flag:
           if any(msg_send_flag_dict.values()):
               # 每隔second秒执行func函数
               timer = Timer(0.01, self.tick_10ms_func)
               timer.start()
               self.send_working_msg(self.working_can_device, self.working_can_channel)
               timer.join()
           else:
               mes_info = "Goodbye *** 自动发送所有报文数据结束!!!"
               toastone = wx.MessageDialog(None, mes_info, "信息提示",
                                           wx.YES_DEFAULT | wx.ICON_QUESTION)
               if toastone.ShowModal() == wx.ID_YES:  # 如果点击了提示框的确定按钮
                   toastone.Destroy()  # 则关闭提示框
               break

报文周期性发送函数:

def send_working_msg(self, can_device, device_id):
       for idx in range(len(item_list)):
           if msg_send_flag_dict[item_list[idx]] == 1:
               msg_id_idx = msg_operation_list.index("报文ID") - 1
               msg_id = eval(str(self.operation_dict[item_list[idx]][msg_id_idx]).strip())
               # 获取报文发送帧类型
               msg_type_idx = msg_operation_list.index("帧类型") - 1
               msg_type = str(self.operation_dict[item_list[idx]][msg_type_idx])
               msg_type = 1 if msg_type == "扩展帧" else 0
               # 获取报文发送周期
               msg_cycle_idx = msg_operation_list.index("周期(ms)") - 1
               msg_cycle = int(self.operation_dict[item_list[idx]][msg_cycle_idx])
               send_cycle = msg_cycle / 10
               if msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] >= send_cycle:
                   # 开始喂值
                   if idx == 0:
                       try:
                           item_phyValue = next(self.item1_value_func)
                       except StopIteration:
                           msg_send_flag_dict[item_list[idx]] = 0
                           continue
                   else:
                       try:
                           item_phyValue = next(self.item2_value_func)
                       except StopIteration:
                           msg_send_flag_dict[item_list[idx]] = 0
                           continue
                   msg_data = self.get_item_msg(item_list[idx], item_phyValue)
                   if send_msg(msg_id, msg_type, msg_data, can_device, device_id, 0):
                       print("发送报文成功")
                       # print("msg_data", msg_data)
                       msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] = 0
                   else:
                       pass
                       # print("发送报文失败")
                       # mes_info = "发送报文失败"
                       # toastone = wx.MessageDialog(None, mes_info, "信息提示",
                       #                             wx.YES_DEFAULT | wx.ICON_QUESTION)
                       # if toastone.ShowModal() == wx.ID_YES:  # 如果点击了提示框的确定按钮
                       #     toastone.Destroy()  # 则关闭提示框

功能实现逻辑的待优化点:存在多个对象就需要定义多个存储报文数据的生成器。

上述功能实现逻辑优化如下:

    def set_yield_func(self):
        item_yield_func_dict = dict()
        for i in range(len(item_list)):
            item_yield_func_dict[item_list[i]] = self.yield_item_value(i)
        return item_yield_func_dict

    def yield_item_value(self, item_idx):
        item_value_list = self.item_value_dict[item_list[item_idx]]
        for i in range(len(item_value_list)):
            yield item_value_list[i]

报文发送线程的run()函数中调用这个存储对象报文发送数据生成器的字典item_yield_func_dict:

def run(self):
       # 实时更新item的被选状态
       self.get_checkbox_res_func()
       # 获取每个对象的实际物理值
       self.get_item_value_dict()
       self.item_yield_func_dict = self.set_yield_func()
       …………

从存储每个对象生成器的字典item_yield_func_dict中获取生成器对象:

try:
                       item_phyValue = next(self.item_yield_func_dict[item_list[idx]])
                   except StopIteration:
                       msg_send_flag_dict[item_list[idx]] = 0
                       continue

来源:https://blog.csdn.net/Logintern09/article/details/129356085

标签:python,周期,报文发送
0
投稿

猜你喜欢

  • 使用 TRUNCATE TABLE 删除所有行

    2008-04-24 19:20:00
  • 如何用Python提取10000份log中的产品信息

    2023-06-30 06:23:21
  • 如何实现优惠打折?

    2010-06-03 10:27:00
  • Python 自由定制表格的实现示例

    2023-11-11 16:54:41
  • ASP ajax分页教程一

    2011-04-10 10:51:00
  • Python使用Flask框架同时上传多个文件的方法

    2023-02-02 10:16:49
  • Python实现快速排序的方法详解

    2022-08-29 13:08:35
  • Django-xadmin+rule对象级权限的实现方式

    2023-02-20 17:08:08
  • python-yml文件读写与xml文件读写

    2022-06-16 06:43:50
  • Mysql自带profiling性能分析工具使用分享

    2024-01-14 17:48:24
  • python使用requests实现发送带文件请求功能

    2023-11-03 14:23:13
  • python滑块验证码的破解实现

    2023-11-27 11:46:17
  • Python的列表推导式你了解吗

    2022-08-13 05:30:41
  • JS中for,for...in,for...of和forEach的区别和用法实例

    2024-04-29 13:20:49
  • 前端之vue3使用WebSocket的详细步骤

    2024-04-30 10:28:54
  • 基于Python的图像阈值化分割(迭代法)

    2022-10-23 21:25:01
  • AES算法 asp源码

    2009-08-28 13:05:00
  • Python教程之全局变量用法

    2023-12-18 11:55:35
  • vue实现自定义组件挂载原型上

    2024-04-30 10:24:11
  • Django框架实现的简单分页功能示例

    2022-05-16 08:25:43
  • asp之家 网络编程 m.aspxhome.com