Python多线程与同步机制浅析

作者:alwaysrun 时间:2021-10-31 03:22:51 

线程实现

Python中线程有两种方式:函数或者用类来包装线程对象。threading模块中包含了丰富的多线程支持功能:

  • threading.currentThread(): 返回当前线程;

  • threading.enumerate(): 返回包含正在运行的线程列表;

  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())等价。

Thread类

通过Thread类来处理线程,类中提供的一些方法:

  • run(): 用以表示线程执行的方法(可重载实现实际功能);

  • start(): 启动线程;

  • join([time]): 等待线程中止(或者超时);

  • isAlive(): 返回线程是否活动;

  • getName(): 返回线程名;

  • setName(): 设置线程名;

  • setDaemon(True):设置为后台进程(必须在start调用前设定)。

函数方式

通过Thread直接构造线程,然后通过start方法启动线程:

threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)

各参数说明:

  • group:指定线程隶属的线程组(当前忽略);

  • target:指定线程要调度的目标方法(即实现功能的函数);

  • args:传递给目标方法的参数(以元组的方式);

  • kwargs:传递给目标方法的参数(以字典的方式);

  • daemon:指定线程是否为后台线程。

def simpleRoutine(name, delay):
   print(f"routine {name} starting...")
   time.sleep(delay)
   print(f"routine {name} finished")
if __name__ == '__main__':
   thrOne = threading.Thread(target=simpleRoutine, args=("First", 1))
   thrTwo = threading.Thread(target=simpleRoutine, args=("Two", 2))
   thrOne.start()
   thrTwo.start()
   thrOne.join()
   thrTwo.join()

继承方式

直接继承Thread,创建一个新的子类(主要实现run方法):

class SimpleThread (threading.Thread):
   def __init__(self, name, delay):
       # threading.Thread.__init__(self)
       super().__init__()
       self.name = name
       self.delay = delay
   def run(self):
       print(f"thread {self.name} starting...")
       time.sleep(self.delay)
       print(f"thread {self.name} finished")
if __name__ == '__main__':
   thrOne = SimpleThread("First", 2)
   thrTwo = SimpleThread("Second", 2)
   thrOne.start()
   thrTwo.start()
   thrOne.join()
   thrTwo.join()

同步机制

当多个线程同时修改同一条数据时可能会出现脏数据;所以,就需要线程锁,即同一时刻只允许一个线程执行操作。

同步锁Lock

threading提供了Lock和RLock(可重入锁)两个类,它们都提供了如下两个方法来加锁和释放锁:

  • acquire(blocking=True, timeout=-1):加锁,其中 timeout 参数指定加锁多少秒。

  • release():释放锁。

两种使用锁的方式:

gCount = 0
def PlusOne(locker):
   global gCount
     with locker:
         gCount += 1、
def MinusOne(locker):
   global gCount
     if locker.acquire():
         gCount -= 1
         locker.release()

条件变量Condition

Condition对象内部维护了一个锁(构造时可传递一个Lock/RLock对象,否则内部会自行创建一个RLock)和一个waiting池:

  • 通过acquire获得Condition对象;

  • 当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程;

  • 当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。

Condition对象:

__init__(self,lock=None):Condition类总是与一个锁相关联(若不指定lock参数,会自动创建一个与之绑定的RLock对象);

acquire(timeout):调用关联锁的acquire()方法;

release():调用关联锁的release()方法

wait(timeout):线程挂起,直到收到一个notify通知或超时才会被唤醒;必须在已获得锁的前提下调用;

notify(n=1):唤醒waiting池中的n个正在等待的线程并通知它:

  • 收到通知的线程将自动调用acquire()方法尝试加锁;

  • 若waiting池中有多个线程,随机选择n个唤醒;

  • 必须在已获得锁的前提下调用,否则将引发错误。

notify_all():通知所有线程。

class Producer(threading.Thread):
   def __init__(self, cond, storage):
       threading.Thread.__init__(self)
       self.cond = cond
       self.storage = storage
   def run(self):
       label = 1
       while True:
           with self.cond:
               if len(self.storage) < 10:
                   self.storage.append(label)
                   print(f"<- Produce {label} product")
                   label += 1
                   self.cond.notify(2)
               else:
                   print(f"<- storage full: Has Produced {label - 1} product")
                   self.cond.notify_all()
                   self.cond.wait()
               time.sleep(0.4)
class Consumer(threading.Thread):
   def __init__(self, name, cond, storage):
       threading.Thread.__init__(self)
       self.name = name
       self.cond = cond
       self.storage = storage
   def run(self):
       while True:
           if self.cond.acquire():
               if len(self.storage) > 1:
                   pro = self.storage.pop(0)
                   print(f"-> {self.name} consumed {pro}")
                   self.cond.notify()
               else:
                   print(f"-> {self.name} storage empty: no product to consume")
                   self.cond.wait()
               self.cond.release()
               time.sleep(1)

信号量Semaphore

信号量对象内部维护一个计数器:

  • acquire(blocking=True,timeout=None)时减1,当计数为0就阻塞请求的线程;

  • release()时加1,当计数大于0恢复被阻塞的线程;

threading中有Semaphore和BoundedSemaphore两个信号量;BoundedSemaphore限制了release的次数,任何时候计数器的值,都不不能大于初始值(release时会检测计数器的值,若大于等于初始值,则抛出ValueError异常)。

通过Semaphore维护生产(release一个)、消费(acquire一个)量:

# products = threading.Semaphore(0)
def produceOne(label, sem: threading.Semaphore):
   sem.release()
   print(f"{label} produce one")
def consumeOne(label, sem: threading.Semaphore):
   sem.acquire()
   print(f"{label} consume one")

通过BoundedSemaphore来控制并发数量(最多有Semaphore初始值数量的线程并发):

# runner = threading.BoundedSemaphore(3)
def runBound(name, sem: threading.BoundedSemaphore):
   with sem:
       print(f"{name} is running")
       time.sleep(1)
       print(f"{name} finished")

事件Event

事件对象内部有个标志字段,用于线程等待事件的发生:

  • isSet():返回event的状态值;

  • wait():状态为False时,一直阻塞;否则立即返回;

  • set(): 设置状态值为True,激活所有被阻塞的线程;

  • clear():恢复状态值为False。

多线程等待事件发生,然后开始执行:

def waiters(name, evt: threading.Event):
   evt.wait()
   print(f"{name} is running")
   time.sleep(1)
   print(f"{name} finished")
def starting(evt: threading.Event):
   evt.set()
   print("event is set")

屏障Barrier

屏障用于设定等待线程数量,当数量达到指定值时,开始执行:

threading.Barrier(parties, action=None, timeout=None)

屏障属性与方法:

  • wait(timeout=None):等待通过屏障;线程被阻塞,直到阻塞的数量达到parties时,被阻塞的线程被同时全部释放;

  • reset():重置屏障到默认的空状态;

  • abort():将障碍置为断开状态;导致等待的线程引发BrokenBarrierError异常;

  • partier():通过障碍所需的线程数;

  • n_waiting():当前在屏障中等待的线程数;

  • broken():如果屏障处于断开状态,则返回True。

def waitBarrier(name, barr: threading.Barrier):
   print(f"{name} waiting for open")
   try:
       barr.wait()
       print(f"{name} running")
       time.sleep(5)
   except threading.BrokenBarrierError:
       print(f"{name} exception")
   print(f"{name} finished")

GIL全局解释器锁

GIL(Global Interpreter Lock,全局解释器锁);cpython中,某个线程想要执行,必须先拿到GIL(可以把GIL看作是&ldquo;通行证&rdquo;)。每次释放GIL锁,线程都要进行锁竞争,切换线程,会消耗资源。

由于GIL锁的存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程),这就是为什么在多核CPU上,python的多线程效率并不高:

  • CPU密集型代码:由于计算工作多,会很快用完时间片,然后触发GIL的释放与再竞争;

  • IO密集型代码(文件处理、网络爬虫等):多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。

python在使用多线程的时候,调用的是c语言的原生线程:

  • 拿到公共数据

  • 申请GIL

  • python解释器调用os原生线程

  • os操作cpu执行运算

  • 当线程执行时间到后,就进行切换(context switch)

来源:https://blog.csdn.net/alwaysrun/article/details/127157924

标签:Python,多线程,同步
0
投稿

猜你喜欢

  • Python实现完全数的示例详解

    2021-11-21 20:09:30
  • php bugs代码审计基础详解

    2023-06-02 13:49:52
  • php中使用session_set_save_handler()函数把session保存到MySQL数据库实例

    2023-11-18 01:11:16
  • 解析Oracle数据库中的对象集合schema

    2023-07-22 20:42:34
  • 关于设计规范

    2008-06-02 13:10:00
  • 一个不错的javascript加密解密算法源码

    2010-03-28 13:12:00
  • 删除多余的属性 xmlns=""

    2010-08-24 18:41:00
  • 如何利用pyinstaller打包Python程序为exe可执行文件

    2023-11-08 08:01:39
  • SQL Server 数据库备份和还原认识和总结 (一)

    2012-10-07 10:52:54
  • 浅谈python在提示符下使用open打开文件失败的原因及解决方法

    2023-12-07 18:31:33
  • 浅议 Web 网页 Form 表单设计技巧

    2007-10-09 13:05:00
  • Django RestFramework 全局异常处理详解

    2023-12-15 03:16:42
  • SQL文本字段的数字排序问题

    2008-11-18 16:47:00
  • MySQL数据库的授权原则

    2008-12-29 13:39:00
  • Python分割单词和转换命名法的实现

    2023-11-24 00:06:16
  • 解决Dreamweaver不支持中文文件名方法

    2008-01-09 12:52:00
  • PDO::setAttribute讲解

    2023-06-05 18:04:23
  • 关于H1的用法探讨

    2008-03-18 12:55:00
  • 快速掌握如何使用SQL Server来过滤数据

    2009-01-15 13:27:00
  • MySQL百万级高并发网站实战攻略

    2009-03-25 15:49:00
  • asp之家 网络编程 m.aspxhome.com