python爬取基于m3u8协议的ts文件并合并

作者:彼时思默 时间:2021-11-03 16:44:45 

前言

简单学习过网络爬虫,只是之前都是照着书上做并发,大概能理解,却还是无法自己用到自己项目中,这里自己研究实现一个网页嗅探HTML5播放控件中基于m3u8协议ts格式视频资源的项目,并未考虑过复杂情况,毕竟只是练练手.

源码


# coding=utf-8
import asyncio
import multiprocessing
import os
import re
import time
from math import floor
from multiprocessing import Manager
import aiohttp
import requests
from lxml import html
import threading
from src.my_lib import retry
from src.my_lib import time_statistics

class M3U8Download:
_path = "./resource\\" # 本地文件路径
_url_seed = None # 资源所在链接前缀
_target_url = {} # 资源任务目标字典
_mode = ""
_headers = {"User-agent": "Mozilla/5.0"} # 浏览器代理
_target_num = 100

def __init__(self):
self._ml = Manager().list() # 进程通信列表
if not os.path.exists(self._path): # 检测本地目录存在否
 os.makedirs(self._path)
exec_str = r'chcp 65001'
os.system(exec_str) # 先切换utf-8输出,防止控制台乱码

def sniffing(self, url):
self._url = url
print("开始嗅探...")
try:
 r = requests.get(self._url) # 访问嗅探网址,获取网页信息
except:
 print("嗅探失败,网址不正确")
 os.system("pause")
else:
 tree = html.fromstring(r.content)
 try:
 source_url = tree.xpath('//video//source/@src')[0] # 嗅探资源控制文件链接,这里只针对一个资源控制文件
 # self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 从资源控制文件链接解析域名
 except:
 print("嗅探失败,未发现资源")
 os.system("pause")
 else:
 self.analysis(source_url)

def analysis(self, source_url):
try:
 self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 从资源控制文件链接解析域名
 with requests.get(source_url) as r: # 访问资源控制文件,获得资源信息
 src = re.split("\n*#.+\n", r.text) # 解析资源信息
 for sub_src in src: # 将资源地址储存到任务字典
  if sub_src:
  self._target_url[sub_src] = self._url_seed + "/" + sub_src
except Exception as e:
 print("资源无法成功解析", e)
 os.system("pause")
else:
 self._target_num = len(self._target_url)
 print("sniffing success!!!,found", self._target_num, "url.")
 self._mode = input(
 "1:-> 单进程(Low B)\n2:-> 多进程+多线程(网速开始biubiu飞起!)\n3:-> 多进程+协程(最先进的并发!!!)\n")
 if self._mode == "1":
 for path, url in self._target_url.items():
  self._download(path, url)
 elif self._mode == "2" or self._mode == "3":
 self._multiprocessing()

def _multiprocessing(self, processing_num=4): # 多进程,多线程
target_list = {} # 进程任务字典,储存每个进程分配的任务
pool = multiprocessing.Pool(processes=processing_num) # 开启进程池
i = 0 # 任务分配标识
for path, url in self._target_url.items(): # 分配进程任务
 target_list[path] = url
 i += 1
 if i % 10 == 0 or i == len(self._target_url): # 每个进程分配十个任务
 if self._mode == "2":
  pool.apply_async(self._sub_multithreading, kwds=target_list) # 使用多线程驱动方法
 else:
  pool.apply_async(self._sub_coroutine, kwds=target_list) # 使用协程驱动方法
 target_list = {}
pool.close() # join函数等待所有子进程结束
pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool
while True:
 if self._judge_over():
 self._combine()
 break

def _sub_multithreading(self, **kwargs):
for path, url in kwargs.items(): # 根据进程任务开启线程
 t = threading.Thread(target=self._download, args=(path, url,))
 t.start()

@retry()
def _download(self, path, url): # 同步下载方法
with requests.get(url, headers=self._headers) as r:
 if r.status_code == 200:
 with open(self._path + path, "wb")as file:
  file.write(r.content)
 self._ml.append(0) # 每成功一个就往进程通信列表增加一个值
 percent = '%.2f' % (len(self._ml) / self._target_num * 100)
 print(len(self._ml), ": ", path, "->OK", "\tcomplete:", percent, "%") # 显示下载进度
 else:
 print(path, r.status_code, r.reason)

def _sub_coroutine(self, **kwargs):
tasks = []
for path, url in kwargs.items(): # 根据进程任务创建协程任务列表
 tasks.append(asyncio.ensure_future(self._async_download(path, url)))
loop = asyncio.get_event_loop() # 创建异步事件循环
loop.run_until_complete(asyncio.wait(tasks)) # 注册任务列表

async def _async_download(self, path, url): # 异步下载方法
async with aiohttp.ClientSession() as session:
 async with session.get(url, headers=self._headers) as resp:
 try:
  assert resp.status == 200, "E" # 断言状态码为200,否则抛异常,触发重试装饰器
  with open(self._path + path, "wb")as file:
  file.write(await resp.read())
 except Exception as e:
  print(e)
 else:
  self._ml.append(0) # 每成功一个就往进程通信列表增加一个值
  percent = '%.2f' % (len(self._ml) / self._target_num * 100)
  print(len(self._ml), ": ", path, "->OK", "\tcomplete:", percent, "%") # 显示下载进度

def _combine(self): # 组合资源方法
try:
 print("开始组合资源...")
 identification = str(floor(time.time()))
 exec_str = r'copy /b "' + self._path + r'*.ts" "' + self._path + 'video' + identification + '.mp4"'
 os.system(exec_str) # 使用cmd命令将资源整合
 exec_str = r'del "' + self._path + r'*.ts"'
 os.system(exec_str) # 删除原来的文件
except:
 print("资源组合失败")
else:
 print("资源组合成功!")

def _judge_over(self): # 判断是否全部下载完成
if len(self._ml) == len(self._target_url):
 return True
return False

@time_statistics
def app():
multiprocessing.freeze_support()
url = input("输入嗅探网址:\n")
m3u8 = M3U8Download()
m3u8.sniffing(url)
# m3u8.analysis(url)

if __name__ == "__main__":
app()

这里是两个装饰器的实现:


import time

def time_statistics(fun):
def function_timer(*args, **kwargs):
t0 = time.time()
result = fun(*args, **kwargs)
t1 = time.time()
print("Total time running %s: %s seconds" % (fun.__name__, str(t1 - t0)))
return result

return function_timer

def retry(retries=3):
def _retry(fun):
def wrapper(*args, **kwargs):
 for _ in range(retries):
 try:
  return fun(*args, **kwargs)
 except Exception as e:
  print("@", fun.__name__, "->", e)

return wrapper

return _retry

打包成exe文件

使用PyInstaller -F download.py将程序打包成单个可执行文件.
这里需要注意一下,因为程序含有多进程,需要在执行前加一句multiprocessing.freeze_support(),不然程序会反复执行多进程前的功能.

关于协程

协程在Python3.5进化到了async await版本,用 async 标记异步方法,在异步方法里对耗时操作使用await标记.这里使用了一个进程驱动协程的方法,在进程池创建多个协程任务,使用asyncio.get_event_loop()创建协程事件循环,使用run_until_complete()注册协程任务,asyncio.wait()方法接收一个任务列表进行协程注册.

关于装饰器

装饰器源于闭包原理,这里使用了两种装饰器.

  • @time_statistics:统计耗时,装饰器自己无参型

  • @retry():设置重试次数,装饰器自己有参型

  • 按我理解是有参型是将无参型装饰器包含在内部,而调用是加()的,关于():

  • 不带括号时,调用的是这个函数本身

  • 带括号(此时必须传入需要的参数),调用的是函数的return结果

关于CMD控制台

程序会使用CMD命令来将下载的ts文件合并.
因为CMD默认使用GB2312编码,调用os.system()需要先切换成通用的UTF-8输出,否则系统信息会乱码.
而且使用cmd命令时参数最好加双引号,以避免特殊符号报错.

来源:https://blog.csdn.net/qq_37258787/article/details/80298084

标签:python,爬取,m3u8
0
投稿

猜你喜欢

  • python基础while循环及if判断的实例讲解

    2021-02-18 06:56:06
  • python设置环境变量的作用整理

    2022-09-01 17:08:55
  • python3将视频流保存为本地视频文件

    2023-07-26 12:14:45
  • Python使用dict.fromkeys()快速生成一个字典示例

    2022-05-10 08:13:23
  • Python内建类型int源码学习

    2023-07-02 19:22:15
  • 通过Python来使用七牛云存储的方法详解

    2022-09-13 19:56:36
  • sp_executesql 使用复杂的Unicode 表达式错误的解决方法

    2012-01-29 17:58:52
  • python使用正则表达式匹配txt特定字符串(有换行)

    2023-03-23 07:46:03
  • CSS网页设计时关于字体大小的设计

    2008-10-23 13:42:00
  • 透彻掌握ASP分页技术

    2009-03-09 18:26:00
  • Python进阶之全面解读高级特性之切片

    2023-08-06 21:28:00
  • oracle 重置序列从指定数字开始的方法详解

    2023-07-05 02:40:04
  • Python入门教程(八)PythonCasting用法

    2021-11-14 02:20:41
  • 常见SQL Server 2000漏洞及其相关利用

    2007-10-01 14:45:00
  • Python获取android设备cpu和内存占用情况

    2023-02-25 20:46:47
  • python实现逢七拍腿小游戏的思路详解

    2021-02-28 23:44:29
  • Python中操作文件之write()方法的使用教程

    2023-12-29 06:06:13
  • 玩转CSS3色彩[译]

    2010-01-13 13:02:00
  • 使用 JScript 创建 .exe 或 .dll 文件

    2011-06-04 15:37:00
  • 使用python刷访问量的示例代码

    2023-11-09 12:55:30
  • asp之家 网络编程 m.aspxhome.com