Python实现简单多线程任务队列

作者:hebedich 时间:2022-07-29 13:21:43 

最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):


def gradient_descent():
 # the gradient descent code
 plotly.write(X, Y)

一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。

一种解决办法是每调用一次 plotly.write 函数就开启一个新的线程,但是这种方法感觉不是很好。 我不想用一个像 cerely(一种分布式任务队列)一样大而全的任务队列框架,因为框架对于我的这点需求来说太重了,并且我的绘图也并不需要 redis 来持久化数据。

那用什么办法解决呢?我在 python 中写了一个很小的任务队列,它可以在一个单独的线程中调用 plotly.write函数。下面是程序代码。


from threading import Thread
import Queue
import time

class TaskQueue(Queue.Queue):

首先我们继承 Queue.Queue 类。从 Queue.Queue 类可以继承 get 和 put 方法,以及队列的行为。


def __init__(self, num_workers=1):
 Queue.Queue.__init__(self)
 self.num_workers = num_workers
 self.start_workers()

初始化的时候,我们可以不用考虑工作线程的数量。


def add_task(self, task, *args, **kwargs):
 args = args or ()
 kwargs = kwargs or {}
 self.put((task, args, kwargs))

我们把 task, args, kwargs 以元组的形式存储在队列中。*args 可以传递数量不等的参数,**kwargs 可以传递命名参数。


def start_workers(self):
 for i in range(self.num_workers):
   t = Thread(target=self.worker)
   t.daemon = True
   t.start()

我们为每个 worker 创建一个线程,然后在后台删除。

下面是 worker 函数的代码:


def worker(self):
 while True:
   tupl = self.get()
   item, args, kwargs = self.get()
   item(*args, **kwargs)
   self.task_done()

worker 函数获取队列顶端的任务,并根据输入参数运行,除此之外,没有其他的功能。下面是队列的代码:

我们可以通过下面的代码测试:


def blokkah(*args, **kwargs):
 time.sleep(5)
 print “Blokkah mofo!”

q = TaskQueue(num_workers=5)

for item in range(1):
 q.add_task(blokkah)

q.join() # wait for all the tasks to finish.

print “All done!”

Blokkah 是我们要做的任务名称。队列已经缓存在内存中,并且没有执行很多任务。下面的步骤是把主队列当做单独的进程来运行,这样主程序退出以及执行数据库持久化时,队列任务不会停止运行。但是这个例子很好地展示了如何从一个很简单的小任务写成像工作队列这样复杂的程序。


def gradient_descent():
 # the gradient descent code
 queue.add_task(plotly.write, x=X, y=Y)

修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感兴趣的话,可以参考下面的代码。


from threading import Thread
import Queue
import time

class TaskQueue(Queue.Queue):

def __init__(self, num_workers=1):
Queue.Queue.__init__(self)
self.num_workers = num_workers
self.start_workers()

def add_task(self, task, *args, **kwargs):
args = args or ()
kwargs = kwargs or {}
self.put((task, args, kwargs))

def start_workers(self):
for i in range(self.num_workers):
t = Thread(target=self.worker)
t.daemon = True
t.start()

def worker(self):
while True:
tupl = self.get()
item, args, kwargs = self.get()
item(*args, **kwargs)
self.task_done()

def tests():
def blokkah(*args, **kwargs):
time.sleep(5)
print "Blokkah mofo!"

q = TaskQueue(num_workers=5)

for item in range(10):
q.add_task(blokkah)

q.join() # block until all tasks are done
print "All done!"

if __name__ == "__main__":
tests()
标签:Python,多线程,任务队列
0
投稿

猜你喜欢

  • 基于python的七种经典排序算法(推荐)

    2023-06-16 18:21:43
  • 通过SQL Server的位运算功能巧妙解决多选查询方法

    2024-01-22 01:21:26
  • MySQL中修改表结构时需要注意的一些地方

    2024-01-29 09:04:11
  • 将keras的h5模型转换为tensorflow的pb模型操作

    2021-06-05 15:14:06
  • window.print()局部打印三种方式(小结)

    2024-04-28 10:18:57
  • php 不能连接数据库 php error Can't connect to local MySQL server

    2023-11-07 10:43:50
  • 详解在Python程序中使用Cookie的教程

    2021-10-25 17:58:43
  • python3 http提交json参数并获取返回值的方法

    2023-09-26 12:30:06
  • Windows下安装python2.7及科学计算套装

    2023-05-28 13:35:19
  • python jinjia2的项目使用

    2021-03-16 04:57:21
  • TensorFlow人工智能学习张量及高阶操作示例详解

    2022-04-15 15:37:52
  • python标准库sys和OS的函数使用方法与实例详解

    2022-06-24 20:22:42
  • Go语言中的内存布局详解

    2024-05-21 10:19:55
  • sqlserver 日期比较、日期查询常用语句:月的第一天,季度的第一天等

    2010-08-01 18:58:00
  • Python 3.8新特征之asyncio REPL

    2023-10-08 02:59:58
  • Python手动或自动协程操作方法解析

    2023-06-30 11:38:41
  • windows10下python3.5 pip3安装图文教程

    2023-02-25 02:06:44
  • 微信小程序 详解下拉加载与上拉刷新实现方法

    2024-04-23 09:31:09
  • MySQL5.6下windows msi安装详细介绍

    2024-01-14 04:42:50
  • Express框架定制路由实例分析

    2024-05-11 10:16:42
  • asp之家 网络编程 m.aspxhome.com