脚本思路:
1.创建一个文件夹,用来存放下载好的m3u8文件和下载好的ts文件
2.下载并打开m3u8文件,根据m3u8文件下载ts文件,这边设计了两种情况,a.ts的下载链接是完整的 b.ts的下载链接是需要拼接的
3.根据m3u8文件自动校验文件是否下载完整
4.由于很多时候ts文件的命名是没有规律的,所以再次打开m3u8文件,根据里面的顺序,以追加的形式写入到一个新的ts文件里
代码实现:
先创建好文件夹,这边使用了相对路径
def init():
if os.path.exists("./temp_data"):
return
else:
os.mkdir("./temp_data")
得到m3u8下载链接,获得m3u8文件名,这边假设是https://xxxxxxx126.net/nos/hls/2019/03/13/1214418271_9xxxxxxx32465d1f4c8_sd.m3u8,那么就设置“1214418271_9xxxxxxx32465d1f4c8_sd.m3u8”为文件名
url =str(input("输入m3u8文件url >"))
name = url.rsplit("/")[-1]
下载m3u8文件
def m3u8_files_download(url,name): #下载m3u8文件
resp = requests.get(url)
with open(f"temp_data/{name}.txt",mode="wb") as f:
f.write(resp.content)
resp.close()
给出第一个ts的下载链接,用户自己判断一下是需要拼接的,还是无需拼接的完整url
def get_type(name):
with open(f"temp_data/{name}.txt","r") as f:
for line in f:
if line.startswith("#"):
continue
else:
print("内容为:",line)
print("选择模式: 1.直接下载型 2.拼接型")
choice = input(">")
return str(choice)
写一个启动器,根据不同的选择,创建不同的任务,创建的任务为异步任务
async def starter(choice,name):
tasks=[]
async with aiohttp.ClientSession() as session:
if choice =="1":
with open(f"/temp_data/{name}.txt","r") as f:
for line in f:
if line.startswith("#"):
continue
else:
download_url = line.strip()
line = line.split("/")
file_name = str(line[-1]).strip() # 得下载的ts文件名
task = download_ts(file_name,download_url,session)
tasks.append(task)
print("文件下载中.....")
await asyncio.wait(tasks) # 等待任务执行结束
print("文件下载完成")
if choice=="2":
url = str(input("输入拼接的url>"))
with open(f"temp_data/{name}.txt","r") as f:
for line in f:
if line.startswith("#"):
continue
else:
line = line.strip()
file_name = line # 得下载的ts文件名
download_url = url+line
task = download_ts(file_name,download_url,session)
tasks.append(task)
print("文件下载中.....")
await asyncio.wait(tasks) # 等待任务执行结束
print("文件下载完成")
下载ts文件,用aiohttp来代理requests
async def aio_download_ts(download_url,line_name,session):
async with session.get(download_url,headers=header) as resp:
async with aiofiles.open(f"temp_data/{line_name}",mode="wb") as f:
await f.write(await resp.content.read())
print(f"文件{line_name}下载完成!!")
校验文件的完整性:依据m3u8文件,判断文件是否存在
def verification(name):
files=[]
with open(f"temp_data/{name}.txt","r") as f:
for line in f:
if line.startswith("#"):
continue
else:
line=line.strip()
if os.path.exists(f"temp_data/{line}"):
continue
else:
files.append(line)
print("以下文件缺失,请手动查看:",files)
合并文件,实现的方式时创建一个ts文件,依据m3u8文件里的文件顺序,依次将二进制文件写入到新的ts文件里
def merge_ts(file_name):
new_name = str(input("输入合并后的文件名>"))
with open(f"./{new_name}.ts", "ab+") as f:
with open(f"temp_data/{file_name}.txt","r") as f2:
for line in f2:
if line.startswith("#"):
continue
else:
line = line.strip().split("/")[-1].strip()
ts_name = line
try:
with open(f"temp_data/{ts_name}","rb") as f3:
f.write(f3.read())
except:
continue
最后再写一个主函数,执行这一切
def main():
init()
url =str(input("输入m3u8文件url >"))
name = url.rsplit("/")[-1]
m3u8_files_download(url,name)#下载m3u8文件
choice=get_type(name)
asyncio.run(starter(choice,name))
print("校验文件完整性")
verification(name)
print("是否合并文件? Y/N")
if str(input(">"))=="Y":
merge_ts(name)
else:
print("结束")
最终功能代码
import aiohttp
import aiofiles
import asyncio
import requests
import os
header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36"}
def merge_ts(file_name):
new_name = str(input("输入合并后的文件名>"))
with open(f"./{new_name}.ts", "ab+") as f:
with open(f"temp_data/{file_name}.txt","r") as f2:
for line in f2:
if line.startswith("#"):
continue
else:
line = line.strip().split("/")[-1].strip()
ts_name = line
try:
with open(f"temp_data/{ts_name}","rb") as f3:
f.write(f3.read())
except:
continue
async def aio_download_ts(download_url,line_name,session):
async with session.get(download_url,headers=header) as resp:
async with aiofiles.open(f"temp_data/{line_name}",mode="wb") as f:
await f.write(await resp.content.read())
print(f"文件{line_name}下载完成!!")
def m3u8_files_download(url,name): #下载m3u8文件
resp = requests.get(url)
with open(f"temp_data/{name}.txt",mode="wb") as f:
f.write(resp.content)
resp.close()
def get_type(name):
with open(f"temp_data/{name}.txt","r") as f:
for line in f:
if line.startswith("#"):
continue
else:
print("内容为:",line)
print("选择模式: 1.直接下载型 2.拼接型")
choice = input(">")
return str(choice)
def init():
if os.path.exists("./temp_data"):
return
else:
os.mkdir("./temp_data")
def verification(name):
files=[]
with open(f"temp_data/{name}.txt","r") as f:
for line in f:
if line.startswith("#"):
continue
else:
line=line.strip()
if os.path.exists(f"temp_data/{line}"):
continue
else:
files.append(line)
print("以下文件缺失,请手动查看:",files)
async def download_ts(file_name,download_url,session):
async with session.get(download_url,headers=header) as resp:
async with aiofiles.open(f"temp_data/{file_name}",mode="wb") as f:
await f.write(await resp.content.read())
async def starter(choice,name):
tasks=[]
async with aiohttp.ClientSession() as session:
if choice =="1":
with open(f"/temp_data/{name}.txt","r") as f:
for line in f:
if line.startswith("#"):
continue
else:
download_url = line.strip()
line = line.split("/")
file_name = str(line[-1]).strip() # 得下载的ts文件名
task = download_ts(file_name,download_url,session)
tasks.append(task)
print("文件下载中.....")
await asyncio.wait(tasks) # 等待任务执行结束
print("文件下载完成")
if choice=="2":
url = str(input("输入拼接的url>"))
with open(f"temp_data/{name}.txt","r") as f:
for line in f:
if line.startswith("#"):
continue
else:
line = line.strip()
file_name = line # 得下载的ts文件名
download_url = url+line
task = download_ts(file_name,download_url,session)
tasks.append(task)
print("文件下载中.....")
await asyncio.wait(tasks) # 等待人物执行结束
print("文件下载完成")
def main():
init()
url =str(input("输入m3u8文件url >"))
name = url.rsplit("/")[-1]
m3u8_files_download(url,name)#下载m3u8文件
choice=get_type(name)
asyncio.run(starter(choice,name))
print("校验文件完整性")
verification(name)
print("是否合并文件? Y/N")
if str(input(">"))=="Y":
merge_ts(name)
else:
print("结束")
main()
使用自欺欺人术,直接把ts文件后缀改成MP4,看着舒服点。
视频打开能正常观看,脚本完成
后记:关于脚本的使用
理论上把aiohttp,aiofiles,asyncio三个库安装好,复制粘贴应该就可以直接用,也可以把一些需要手工提供的量,在脚本中写死,以在不同的爬虫中使用。
ENDING..........