跳转至

Python 多任务异步协程

异步协程概述

当程序执行 IO 操作时,线程会处于阻塞状态。协程的作用是:当某个任务进入 IO 等待时,程序可以切换到其他任务继续执行。从微观上看,协程是一个个任务之间的切换,切换条件一般是遇到 IO 操作;从宏观上看,则是多个任务在同时异步执行。所有协程都运行在同一个线程内。使用异步协程编写程序时,调用的函数也应该是异步函数,同步(阻塞)函数会阻塞整个事件循环,使异步失效。

基本使用

使用多任务异步协程时,一般需要配合使用的库主要有:asyncio、aiohttp、aiofiles。

常用写法:

Python
import asyncio


async def download(url, delay=6):
    """Simulate downloading ``url`` by sleeping without blocking the event loop.

    No network request is made; the sleep stands in for the time a real
    crawler request would take.

    Args:
        url: Address shown in the start/finish messages.
        delay: Seconds to sleep in place of the request. Defaults to 6.
    """
    print(f"{url}开始下载")
    # asyncio.sleep suspends only this coroutine, so other tasks keep running.
    await asyncio.sleep(delay)
    print(f"{url}下载完成")


async def main():
    """Run one simulated download per URL concurrently and wait for all of them.

    Raises:
        Exception: Whatever the first failing ``download`` task raised.
    """
    url_list = [
        "https://www.baidu.com",
        "https://google.cn",
        "https://bing.cn"
    ]
    # create_task schedules each coroutine on the running loop immediately,
    # so all downloads overlap instead of running one after another.
    tasks = [asyncio.create_task(download(url)) for url in url_list]
    # gather re-raises a task's exception here; asyncio.wait would only store
    # it on the task object, where it is easy to miss.
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    # Start the event loop and run the main coroutine
    asyncio.run(main())

代码示例:

Python
"""
1、请求目标网站获取m3u8链接
2、拿到第一个m3u8
3、根据第一个m3u8获取第二个m3u8
4、下载切片的视频
5、视频解密
6、合成视频
"""

import requests
import re
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES
import os

# Browser-like User-Agent passed to the requests.get calls in this script
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36"
}


# Find the first m3u8 link on the page, fetch it, and read the second m3u8 link from it
def reps_1(url_1):
    """Resolve the second-level m3u8 URL for the playback page ``url_1``.

    The page is searched for a ``player_aaaa`` block whose ``"url"`` field
    holds the first m3u8 link. That playlist is fetched and its last line is
    appended to the link's ``scheme://host`` prefix to form the second URL.

    Args:
        url_1: URL of the HTML playback page.

    Returns:
        A ``(m3u8_url_2, url_test)`` tuple: the second playlist URL and the
        ``scheme://host`` prefix of the first playlist link.

    Raises:
        requests.HTTPError: If either request returns an error status.
        ValueError: If the page has no ``player_aaaa`` link or the link has
            no recognisable ``scheme://host/`` prefix.
    """
    # A timeout keeps a stalled server from hanging the script forever.
    reps = requests.get(url_1, headers=headers, timeout=30)
    reps.raise_for_status()
    reps.encoding = "utf-8"
    obj = re.compile(r'player_aaaa.*?"url":"(.*?)","url_next"', re.S)
    link_match = obj.search(reps.text)
    if link_match is None:
        raise ValueError(f"no player_aaaa m3u8 link found in {url_1}")
    # The embedded link carries backslashes (presumably JSON-escaped "/");
    # drop them to get a usable URL.
    m3u8_url_1 = link_match.group(1).replace("\\", "")
    origin_match = re.search("(http.*?://.*?)/", m3u8_url_1)
    if origin_match is None:
        raise ValueError(f"cannot extract scheme and host from {m3u8_url_1!r}")
    url_test = origin_match.group(1)
    reps_two = requests.get(m3u8_url_1, headers=headers, timeout=30)
    reps_two.raise_for_status()
    # The last line of the first playlist is the path of the second one.
    m3u8_url_2 = url_test + reps_two.text.strip().split("\n")[-1]
    return m3u8_url_2, url_test


# Download the second m3u8 playlist
def reps_2(url_2):
    """Download the playlist at ``url_2`` and save it as ``m3u8.txt``.

    Args:
        url_2: URL of the second-level m3u8 playlist.

    Raises:
        requests.HTTPError: If the server returns an error status.
    """
    reps = requests.get(url_2, headers=headers, timeout=30)
    # Fail here instead of saving an HTTP error page as the playlist.
    reps.raise_for_status()
    with open("m3u8.txt", "wb") as f:
        f.write(reps.content)


# Download one video segment
async def download(session, url_ts, file_name):
    """Fetch one video segment and write it to ``vidos/<file_name>``.

    Args:
        session: Client session used for the request (an
            ``aiohttp.ClientSession`` when called from ``download_main``).
        url_ts: Full URL of the segment.
        file_name: Name of the file to create inside ``vidos/``.

    Raises:
        aiohttp.ClientResponseError: If the server returns an error status.
    """
    async with session.get(url_ts) as reps_3:
        # Reject error responses so their bodies are not saved as video data.
        reps_3.raise_for_status()
        # Read the body before creating the file, so a failed transfer does
        # not leave an empty segment on disk.
        data = await reps_3.content.read()
    async with aiofiles.open(f"vidos/{file_name}", "wb") as f:
        await f.write(data)
    print(url_ts, "ok")


# Main coroutine that downloads all video segments
async def download_main(url_3):
    """Download every segment listed in ``m3u8.txt`` concurrently.

    Each playlist line that is not blank and contains no ``#`` is treated as
    a segment path: it is appended to ``url_3`` to form the request URL, and
    its last path component becomes the file name under ``vidos/``.

    Args:
        url_3: Prefix prepended to every segment path from the playlist.

    Raises:
        Exception: Whatever the first failing ``download`` task raised.
    """
    # download() writes into vidos/, which nothing else in the script creates.
    os.makedirs("vidos", exist_ok=True)
    tasks = []
    # Send the same User-Agent as the synchronous requests in this script.
    async with aiohttp.ClientSession(headers=headers) as session:
        async with aiofiles.open("m3u8.txt", "r", encoding="utf-8") as f:
            async for line in f:
                line = line.strip()
                # Skip playlist tag/comment lines and blank lines; a blank
                # line would otherwise become a request for the bare prefix.
                if not line or "#" in line:
                    continue
                file_name = line.rsplit("/", 1)[-1]
                url_ts = url_3 + line
                tasks.append(asyncio.create_task(download(session, url_ts, file_name)))
        # gather accepts an empty task list and re-raises download failures;
        # asyncio.wait raises ValueError on an empty list and hides failures.
        await asyncio.gather(*tasks)


# Decrypt one segment
async def decode(name, key):
    """Decrypt ``vidos/<name>`` with AES-CBC into ``vidos/temp_<name>``.

    Args:
        name: File name of the encrypted segment inside ``vidos/``.
        key: AES key as bytes; a ``str`` is encoded as UTF-8 first.
    """
    # The cipher needs a bytes-like key, but main() in this script passes the
    # decoded response text, so accept both.
    if isinstance(key, str):
        key = key.encode("utf-8")
    # NOTE(review): the IV is sixteen ASCII "0" characters (not zero bytes)
    # and no padding is stripped after decryption - confirm both against the
    # playlist's EXT-X-KEY attributes.
    aes = AES.new(key, IV=b"0000000000000000", mode=AES.MODE_CBC)
    async with aiofiles.open(f"vidos/{name}", "rb") as f1, \
            aiofiles.open(f"vidos/temp_{name}", "wb") as f2:
        bs = await f1.read()
        await f2.write(aes.decrypt(bs))
        print(f"{name}解密完成")


# Decrypt every downloaded segment
async def decode_main(keys):
    """Decrypt all segments listed in ``m3u8.txt`` concurrently.

    Segment names are taken from the playlist the same way the download step
    does: lines that are blank or contain ``#`` are skipped, and the last
    path component of each remaining line is the file name.

    Args:
        keys: AES key passed unchanged to ``decode`` for every segment.

    Raises:
        Exception: Whatever the first failing ``decode`` task raised.
    """
    tasks = []
    async with aiofiles.open("m3u8.txt", "r", encoding="utf-8") as f:
        async for line in f:
            line = line.strip()
            # Blank lines would otherwise yield an empty file name.
            if not line or "#" in line:
                continue
            name = line.rsplit("/", 1)[-1]
            print(name)
            tasks.append(asyncio.create_task(decode(name, keys)))
    # gather accepts an empty task list and re-raises decryption failures;
    # asyncio.wait raises ValueError on an empty list and hides failures.
    await asyncio.gather(*tasks)


def main():
    """Run the whole pipeline: resolve playlists, download, decrypt, merge.

    Steps:
        1. Resolve the second-level playlist URL and save it as ``m3u8.txt``.
        2. Download all segments into ``vidos/``.
        3. Fetch the AES key named by the playlist's ``EXT-X-KEY`` URI.
        4. Decrypt every segment to ``vidos/temp_<name>``.
        5. Concatenate the decrypted segments, in playlist order, into
           ``one.mp4``.

    Raises:
        requests.HTTPError: If the key request returns an error status.
        ValueError: If ``m3u8.txt`` has no ``EXT-X-KEY`` URI.
    """
    url = "http://adahuoyun.com/vodplay/29814-1-1.html"
    # Resolve the first m3u8 link and derive the second one from it
    m3u8_url_2, url_test = reps_1(url)
    # Save the second playlist as m3u8.txt
    reps_2(m3u8_url_2)
    # Download all segments with coroutines
    asyncio.run(download_main(url_test))
    # Locate the decryption key URI in the playlist
    with open("m3u8.txt", "r", encoding="utf-8") as f:
        key_match = re.search('EXT-X-KEY.*?URI="(.*?)"', f.read())
    if key_match is None:
        raise ValueError("m3u8.txt has no EXT-X-KEY URI; cannot fetch the decryption key")
    key_url = url_test + key_match.group(1)
    reps_key = requests.get(key_url, headers=headers, timeout=30)
    reps_key.raise_for_status()
    reps_key.encoding = "utf-8"
    print(reps_key.text)
    # Pass the raw bytes to the cipher: decoding the key as text can corrupt
    # a binary key, and the AES implementation expects bytes.
    key = reps_key.content
    # Decrypt all segments
    asyncio.run(decode_main(key))
    # Merge the decrypted segments in playlist order. The files are copied in
    # Python rather than through a shell "cat" command, because the names come
    # from the downloaded playlist and must not reach a shell unescaped.
    with open("m3u8.txt", "r", encoding="utf-8") as s, \
            open("one.mp4", "wb") as merged:
        for line in s:
            line = line.strip()
            if not line or "#" in line:
                continue
            name_test = line.rsplit("/", 1)[-1]
            with open(f"vidos/temp_{name_test}", "rb") as part:
                merged.write(part.read())


# Run the pipeline only when this file is executed as a script
if __name__ == '__main__':
    main()

性能提升

Python
1
2
3
4
5
6
# Install first with: pip3 install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# The rest of the code is the same as before