"""
1、请求目标网站获取m3u8链接
2、拿到第一个m3u8
3、根据第一个m3u8获取第二个m3u8
4、下载切片的视频
5、视频解密
6、合成视频
"""
import asyncio
import os
import re
import shutil
from urllib.parse import urljoin

import aiofiles
import aiohttp
import requests
from Crypto.Cipher import AES
# Browser-like request headers passed to the `requests.get` calls below.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/112.0.0.0 Safari/537.36"
    ),
}
def reps_1(url_1):
    """Resolve the second-level m3u8 playlist URL for a player page.

    Fetches the page at ``url_1``, extracts the first m3u8 URL from the
    ``player_aaaa`` data embedded in it, downloads that playlist and uses
    its last line as the URI of the second-level playlist.

    Args:
        url_1: URL of the HTML page that embeds the player configuration.

    Returns:
        A ``(m3u8_url_2, url_test)`` tuple: the absolute URL of the
        second-level playlist, and the ``scheme://host`` prefix of the
        first playlist URL.

    Raises:
        requests.HTTPError: If either HTTP request returns an error status.
        ValueError: If the page contains no player URL, the player URL has
            no ``scheme://host/`` prefix, or the first playlist is empty.
    """
    page = requests.get(url_1, headers=headers, timeout=30)
    page.raise_for_status()
    page.encoding = "utf-8"

    player = re.search(
        r'player_aaaa.*?"url":"(.*?)","url_next"', page.text, re.S
    )
    if player is None:
        raise ValueError(f"no player m3u8 URL found in {url_1!r}")
    # Drop every backslash; presumably the embedded data escapes "/" as "\/".
    m3u8_url_1 = player.group(1).replace("\\", "")

    origin = re.search(r"(http.*?://.*?)/", m3u8_url_1)
    if origin is None:
        raise ValueError(f"cannot derive scheme://host from {m3u8_url_1!r}")
    url_test = origin.group(1)

    first_playlist = requests.get(m3u8_url_1, headers=headers, timeout=30)
    first_playlist.raise_for_status()
    lines = first_playlist.text.strip().split("\n")
    nested_uri = lines[-1].strip()
    if not nested_uri:
        raise ValueError(f"first playlist {m3u8_url_1!r} is empty")

    # Resolve against the playlist URL so that absolute paths, relative
    # paths and full URLs are all handled; for an absolute path this equals
    # url_test + nested_uri.
    m3u8_url_2 = urljoin(m3u8_url_1, nested_uri)
    return m3u8_url_2, url_test
def reps_2(url_2):
    """Download the second-level m3u8 playlist and save it as ``m3u8.txt``.

    Args:
        url_2: Absolute URL of the playlist to download.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    reps = requests.get(url_2, headers=headers, timeout=30)
    # Fail here rather than saving an HTTP error page as the playlist.
    reps.raise_for_status()
    with open("m3u8.txt", "wb") as f:
        f.write(reps.content)
async def download(session, url_ts, file_name):
    """Download one segment and store it as ``vidos/<file_name>``.

    Args:
        session: Client session whose ``get`` is used for the request
            (an ``aiohttp.ClientSession`` in this file).
        url_ts: URL of the segment to fetch.
        file_name: Name of the file to write inside the ``vidos`` directory.

    Raises:
        aiohttp.ClientResponseError: If the server answers with an error
            status.
    """
    async with session.get(url_ts) as reps_3:
        # Without this check an HTTP error body would be saved as a segment
        # and reported as "ok".
        reps_3.raise_for_status()
        # Read the body before touching the disk so a failed transfer does
        # not leave an empty file behind.
        payload = await reps_3.content.read()
    async with aiofiles.open(f"vidos/{file_name}", "wb") as f:
        await f.write(payload)
    print(url_ts, "ok")
async def download_main(url_3):
    """Download every segment listed in ``m3u8.txt`` concurrently.

    Every non-blank playlist line that contains no ``#`` is treated as a
    segment URI. The segment is saved under ``vidos/`` using the part of
    the URI after its last ``/`` as the file name.

    Args:
        url_3: Base URL (``scheme://host``) that segment URIs are resolved
            against.

    Failed downloads are reported on stdout; they do not abort the other
    downloads.
    """
    # download() writes into this directory but does not create it.
    os.makedirs("vidos", exist_ok=True)

    segment_uris = []
    async with aiofiles.open("m3u8.txt", "r", encoding="utf-8") as f:
        async for line in f:
            uri = line.strip()
            # A blank line would otherwise be "downloaded" from the bare
            # base URL into the directory path "vidos/".
            if not uri or "#" in uri:
                continue
            segment_uris.append(uri)

    if not segment_uris:
        return

    # Send the same browser-like headers as the synchronous requests.
    async with aiohttp.ClientSession(headers=headers) as session:
        results = await asyncio.gather(
            *(
                # urljoin equals url_3 + uri for absolute paths and also
                # copes with full URLs and paths without a leading slash.
                download(session, urljoin(url_3, uri), uri.rsplit("/", 1)[-1])
                for uri in segment_uris
            ),
            return_exceptions=True,
        )

    # Surface failures instead of leaving them unretrieved inside the tasks.
    for uri, result in zip(segment_uris, results):
        if isinstance(result, BaseException):
            print(uri, "download failed:", repr(result))
async def decode(name, key):
    """Decrypt ``vidos/<name>`` with AES-CBC into ``vidos/temp_<name>``.

    Args:
        name: File name of the downloaded segment inside ``vidos``.
        key: AES key. Bytes-like objects are used as-is; a ``str`` is
            encoded as UTF-8 first, which only reproduces the original key
            if the key bytes were valid UTF-8 text.

    Raises:
        ValueError: If the key length is not valid for AES or the segment
            length is not a multiple of the AES block size.
    """
    if isinstance(key, str):
        # PyCryptodome rejects str keys with a TypeError.
        key = key.encode("utf-8")

    # NOTE(review): the IV is sixteen ASCII "0" characters (not zero bytes)
    # and nothing reads an IV from the playlist - confirm this matches the
    # stream being decrypted.
    aes = AES.new(key, IV=b"0000000000000000", mode=AES.MODE_CBC)

    async with aiofiles.open(f"vidos/{name}", "rb") as f1:
        bs = await f1.read()
    # Decrypt before creating the output so a failure leaves no empty file.
    plain = aes.decrypt(bs)
    async with aiofiles.open(f"vidos/temp_{name}", "wb") as f2:
        await f2.write(plain)
    print(f"{name}解密完成")
async def decode_main(keys):
    """Decrypt every segment listed in ``m3u8.txt`` concurrently.

    Each playlist line without ``#`` names a segment; the part after the
    last ``/`` is the file name handed to ``decode``.

    Args:
        keys: AES key passed unchanged to ``decode`` for every segment.

    Failed decryptions are reported on stdout; they do not abort the
    remaining ones.
    """
    names = []
    async with aiofiles.open("m3u8.txt", "r", encoding="utf-8") as f:
        async for line in f:
            if "#" in line:
                continue
            name = line.strip().rsplit("/", 1)[-1]
            if not name:
                # Blank line: there is no segment file to decrypt.
                continue
            print(name)
            names.append(name)

    if not names:
        return

    results = await asyncio.gather(
        *(decode(name, keys) for name in names),
        return_exceptions=True,
    )
    # Surface failures (bad key, missing file, ...) instead of leaving them
    # unretrieved inside the tasks.
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            print(name, "decrypt failed:", repr(result))
def main():
    """Run the whole pipeline and write the merged video to ``one.mp4``.

    Steps: resolve the playlist URLs, save the second-level playlist as
    ``m3u8.txt``, download all segments, fetch the AES key named by the
    playlist's ``EXT-X-KEY`` tag and decrypt the segments, then concatenate
    them in playlist order.

    Raises:
        requests.HTTPError: If the key request returns an error status.
    """
    url = "http://adahuoyun.com/vodplay/29814-1-1.html"
    # Resolve the second-level playlist URL via the first playlist.
    m3u8_url_2, url_test = reps_1(url)
    # Save the second-level playlist as m3u8.txt.
    reps_2(m3u8_url_2)
    # Download all segments concurrently.
    asyncio.run(download_main(url_test))

    with open("m3u8.txt", "r", encoding="utf-8") as f:
        playlist = f.read()

    key_match = re.search(r'EXT-X-KEY.*?URI="(.*?)"', playlist)
    if key_match is not None:
        # For an absolute path this equals url_test + URI.
        key_url = urljoin(m3u8_url_2, key_match.group(1))
        reps_key = requests.get(key_url, headers=headers, timeout=30)
        reps_key.raise_for_status()
        # Use the raw bytes: decoding the key as text can corrupt it, and
        # PyCryptodome only accepts bytes-like keys.
        key = reps_key.content
        print(key)
        asyncio.run(decode_main(key))
        prefix = "vidos/temp_"
    else:
        # No key URI in the playlist: nothing to decrypt, merge the
        # downloaded files directly.
        prefix = "vidos/"

    lis = []
    for line in playlist.split("\n"):
        if "#" in line:
            continue
        name_test = line.strip().rsplit("/", 1)[-1]
        if name_test:
            lis.append(prefix + name_test)

    # Concatenate in Python rather than through a shell "cat": the file
    # names come from a remote playlist and must not reach a shell.
    with open("one.mp4", "wb") as merged:
        for path in lis:
            try:
                with open(path, "rb") as part:
                    shutil.copyfileobj(part, merged)
            except FileNotFoundError:
                # Same best-effort behaviour as "cat": report and continue.
                print(f"missing segment, skipped: {path}")


if __name__ == "__main__":
    main()