python 协程并发数控制

作者:梦想橡皮擦 时间:2023-09-20 01:07:10 

前言:

本篇博客要采集的站点:【看历史,通天下-历史剧网】

目标数据是该站点下的热门历史事件,列表页分页规则如下所示:

http://www.lishiju.net/hotevents/p0
http://www.lishiju.net/hotevents/p1
http://www.lishiju.net/hotevents/p2

首先我们通过普通的多线程,对该数据进行采集,由于本文主要目的是学习如何控制并发数,所以每页仅输出历史事件的标题内容。

普通的多线程代码:

import threading
import time
import requests
from bs4 import BeautifulSoup
class MyThread(threading.Thread):
   def __init__(self, url):
       threading.Thread.__init__(self)
       self.__url = url
   def run(self):
       res = requests.get(url=self.__url)
       soup = BeautifulSoup(res.text, 'html.parser')
       title_tags = soup.find_all(attrs={'class': 'item-title'})
       event_names = [item.a.text for item in title_tags]
       print(event_names)
       print("")
if __name__ == "__main__":
   start_time = time.perf_counter()
   threads = []
   for i in range(111):  # 创建了110个线程。
       threads.append(MyThread(url="http://www.lishiju.net/hotevents/p{}".format(i)))
   for t in threads:
       t.start()  # 启动了110个线程。
   for t in threads:
       t.join()  # 等待线程结束
   print("累计耗时:", time.perf_counter() - start_time)
   # 累计耗时: 1.537718624

上述代码同时开启所有线程,累计耗时 1.5 秒,程序采集结束。

多线程之信号量

python 信号量(Semaphore)用来控制线程并发数,信号量管理一个内置的计数器。 信号量对象每次调用其 acquire()方法时,信号量计数器执行 -1 操作,调用 release()方法,计数器执行 +1 操作,当计数器等于 0 时,acquire()方法会阻塞线程,一直等到其它线程调用 release()后,计数器重新 +1,线程的阻塞才会解除。

使用 threading.Semaphore()创建一个信号量对象。

修改上述并发代码:

import threading
import time
import requests
from bs4 import BeautifulSoup
class MyThread(threading.Thread):
   def __init__(self, url):
       threading.Thread.__init__(self)
       self.__url = url
   def run(self):
       if semaphore.acquire():  # 计数器 -1
           print("正在采集:", self.__url)
           res = requests.get(url=self.__url)
           soup = BeautifulSoup(res.text, 'html.parser')
           title_tags = soup.find_all(attrs={'class': 'item-title'})
           event_names = [item.a.text for item in title_tags]
           print(event_names)
           print("")
           semaphore.release()  # 计数器 +1
if __name__ == "__main__":
   semaphore = threading.Semaphore(5)  # 控制每次最多执行 5 个线程
   start_time = time.perf_counter()
   threads = []
   for i in range(111):  # 创建了110个线程。
       threads.append(MyThread(url="http://www.lishiju.net/hotevents/p{}".format(i)))
   for t in threads:
       t.start()  # 启动了110个线程。
   for t in threads:
       t.join()  # 等待线程结束
   print("累计耗时:", time.perf_counter() - start_time)
   # 累计耗时: 2.8005530640000003

当控制并发线程数量之后,累计耗时变多。

补充知识点之 GIL:

GIL是 python 里面的全局解释器锁(互斥锁),在同一进程,同一时间下,只能运行一个线程,这就导致了同一个进程下多个线程,只能实现并发而不能实现并行

需要注意 python 语言并没有全局解释锁,只是因为历史的原因,在 CPython解析器中,无法移除 GIL,所以使用 CPython解析器,是会受到互斥锁影响的。

还有一点是在编写爬虫程序时,多线程比单线程性能是有所提升的,因为遇到 I/O 阻塞会自动释放 GIL锁。

协程中使用信号量控制并发

下面将信号量管理并发数,应用到协程代码中,在正式编写前,使用协程写法重构上述代码。

import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(url):
   print("正在采集:", url)
   async with aiohttp.request('GET', url) as res:
       html = await res.text()
       soup = BeautifulSoup(html, 'html.parser')
       title_tags = soup.find_all(attrs={'class': 'item-title'})
       event_names = [item.a.text for item in title_tags]
       print(event_names)
async def main():
   tasks = [asyncio.ensure_future(get_title("http://www.lishiju.net/hotevents/p{}".format(i))) for i in range(111)]
   dones, pendings = await asyncio.wait(tasks)
   # for task in dones:
   #     print(len(task.result()))
if __name__ == '__main__':
   start_time = time.perf_counter()
   asyncio.run(main())
   print("代码运行时间为:", time.perf_counter() - start_time)
   # 代码运行时间为: 1.6422313430000002

代码一次性并发 110 个协程,耗时 1.6 秒执行完毕,接下来就对上述代码,增加信号量管理代码。

核心代码是 semaphore = asyncio.Semaphore(10),控制事件循环中并发的协程数量。

import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(semaphore, url):
   async with semaphore:
       print("正在采集:", url)
       async with aiohttp.request('GET', url) as res:
           html = await res.text()
           soup = BeautifulSoup(html, 'html.parser')
           title_tags = soup.find_all(attrs={'class': 'item-title'})
           event_names = [item.a.text for item in title_tags]
           print(event_names)
async def main():
   semaphore = asyncio.Semaphore(10)  # 控制每次最多执行 10 个线程
   tasks = [asyncio.ensure_future(get_title(semaphore, "http://www.lishiju.net/hotevents/p{}".format(i))) for i in
            range(111)]
   dones, pendings = await asyncio.wait(tasks)
   # for task in dones:
   #     print(len(task.result()))
if __name__ == '__main__':
   start_time = time.perf_counter()
   asyncio.run(main())
   print("代码运行时间为:", time.perf_counter() - start_time)
   # 代码运行时间为: 2.227831242

aiohttp 中 TCPConnector 连接池

既然上述代码已经用到了 aiohttp 模块,该模块下通过限制同时连接数,也可以控制线程并发数量,不过这个不是很好验证,所以从数据上进行验证,先设置控制并发数为 2,测试代码运行时间为 5.56 秒,然后修改并发数为 10,得到的时间为 1.4 秒,与协程信号量控制并发数得到的时间一致。所以使用 TCPConnector 连接池控制并发数也是有效的。

import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(session, url):
   async with session.get(url) as res:
       print("正在采集:", url)
       html = await res.text()
       soup = BeautifulSoup(html, 'html.parser')
       title_tags = soup.find_all(attrs={'class': 'item-title'})
       event_names = [item.a.text for item in title_tags]
       print(event_names)
async def main():
   connector = aiohttp.TCPConnector(limit=1)  # 限制同时连接数
   async with aiohttp.ClientSession(connector=connector) as session:
       tasks = [asyncio.ensure_future(get_title(session, "http://www.lishiju.net/hotevents/p{}".format(i))) for i in
                range(111)]
       await asyncio.wait(tasks)
if __name__ == '__main__':
   start_time = time.perf_counter()
   asyncio.run(main())
   print("代码运行时间为:", time.perf_counter() - start_time)

来源:https://juejin.cn/post/7068824518902906916

标签:python,协程,并发,控制
0
投稿

猜你喜欢

  • 使用pytorch提取卷积神经网络的特征图可视化

    2023-02-01 20:32:30
  • python热力图实现简单方法

    2023-10-28 06:14:41
  • pytorch forward两个参数实例

    2022-09-05 09:54:34
  • 分享一个简单的python读写文件脚本

    2022-11-21 20:03:00
  • 常见SQL Server 2000漏洞及其相关利用

    2007-10-01 14:45:00
  • python实现人脸签到系统

    2023-12-21 06:35:05
  • 简单了解python列表和元组的区别

    2022-02-11 17:14:43
  • Python利用imshow制作自定义渐变填充柱状图(colorbar)

    2023-07-14 00:27:57
  • Go语言中数组的基本用法演示

    2024-02-09 21:26:34
  • CSS滤镜示范(filter)附源代码(静态滤镜)

    2008-05-18 12:42:00
  • 10分钟学会Google Map API (二)

    2009-06-07 18:14:00
  • Python实现对图像加噪(高斯噪声 椒盐噪声)

    2023-06-15 03:08:34
  • 如何利用Python解析超大的json数据(GB级别)

    2023-03-22 12:08:21
  • python操作XML格式文件的一些常见方法

    2023-02-10 00:06:12
  • 基于golang的简单分布式延时队列服务的实现

    2024-05-08 10:44:03
  • ASP:一个网站空间多个域名访问

    2008-11-21 17:03:00
  • Python列表的深复制和浅复制示例详解

    2023-01-29 01:26:47
  • Python pygame项目实战英雄动画特效实现

    2021-02-22 21:07:50
  • Pandas数据离散化原理及实例解析

    2022-06-26 21:45:14
  • 加快Firefox 3.5启动速度的方法

    2009-07-16 15:22:00
  • asp之家 网络编程 m.aspxhome.com