
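Script outline:

1. Create a folder to hold the downloaded m3u8 file and the downloaded ts files.

2. Download and open the m3u8 file, then download the ts files it lists. Two cases are handled: (a) the ts download links are complete URLs; (b) the ts download links have to be joined onto a base URL.

3. Use the m3u8 file to check automatically whether every file was downloaded.

4. Because ts file names often follow no pattern, open the m3u8 file again and, following the order it lists, append each ts file to a single new ts file.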
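
Every step above walks the playlist the same way: lines starting with "#" are tags or comments, and the remaining lines are segment entries. A minimal sketch of that shared step, assuming a hypothetical helper named segment_entries and a made-up sample playlist (neither is part of the original script):

```python
def segment_entries(playlist_text):
    """Return the non-comment, non-blank lines of an m3u8 playlist, in order."""
    entries = []
    for raw_line in playlist_text.splitlines():
        line = raw_line.strip()
        # Tags and comments start with "#"; everything else is a segment entry.
        if not line or line.startswith("#"):
            continue
        entries.append(line)
    return entries


# Made-up playlist, for illustration only.
sample = """#EXTM3U
#EXT-X-TARGETDURATION:10
#EXTINF:10.0,
a9f3.ts
#EXTINF:10.0,
07bc.ts
#EXT-X-ENDLIST
"""
print(segment_entries(sample))  # ['a9f3.ts', '07bc.ts']
```

Keeping the order of the returned list matters, because step 4 relies on it to merge the segments correctly.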

Implementation:

First create the folder; a relative path is used here.

def init():
    # Create the working folder if it does not exist yet.
    os.makedirs("./temp_data", exist_ok=True)

Get the m3u8 download link and derive the m3u8 file name from it. Assuming the link is https://xxxxxxx126.net/nos/hls/2019/03/13/1214418271_9xxxxxxx32465d1f4c8_sd.m3u8, the file name is set to "1214418271_9xxxxxxx32465d1f4c8_sd.m3u8".

url = input("Enter the m3u8 file URL > ")
name = url.rsplit("/")[-1]
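
Splitting on "/" keeps any query string (for example a "?token=..." suffix) in the file name. A possible variant, assuming a hypothetical helper named playlist_name and an example.com URL invented for illustration, parses the URL first and takes only the last path component:

```python
import posixpath
from urllib.parse import urlsplit


def playlist_name(url):
    """Return the last path component of a URL, without query or fragment."""
    path = urlsplit(url).path
    return posixpath.basename(path)


# Hypothetical URL; the query string is dropped from the resulting name.
print(playlist_name("https://example.com/hls/2019/video_sd.m3u8?token=abc"))  # video_sd.m3u8
```

For URLs without a query string this gives the same result as the one-line split above.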

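Download the m3u8 file

def m3u8_files_download(url, name):   # download the m3u8 file
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()  # do not save an error page as the playlist
    with open(f"temp_data/{name}.txt", mode="wb") as f:
        f.write(resp.content)
    resp.close()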

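Print the first ts entry so the user can decide whether it needs to be joined onto a base URL or is already a complete URL.

def get_type(name):
    with open(f"temp_data/{name}.txt", "r") as f:
        for line in f:
            # Skip tags, comments, and blank lines.
            if line.startswith("#") or not line.strip():
                continue
            else:
                print("First entry:", line)
                print("Choose a mode: 1. direct download  2. URL joining")
                choice = input(">")
                return str(choice)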
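
The choice can often be avoided altogether. As a sketch, urljoin from the standard library returns an absolute entry unchanged and resolves a relative entry against the playlist URL. This assumes the site follows the usual HLS convention that relative entries are relative to the playlist's own URL; the helper name resolve_segment_url and the example.com addresses are hypothetical:

```python
from urllib.parse import urljoin


def resolve_segment_url(playlist_url, entry):
    """Turn a playlist entry into a full URL, whether it is absolute or relative."""
    return urljoin(playlist_url, entry)


playlist_url = "https://example.com/hls/video.m3u8"  # hypothetical
print(resolve_segment_url(playlist_url, "seg_001.ts"))
# https://example.com/hls/seg_001.ts
print(resolve_segment_url(playlist_url, "https://cdn.example.com/a/seg_001.ts"))
# https://cdn.example.com/a/seg_001.ts
```

If a site builds its segment URLs in some other way, the manual prompt above remains the fallback.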

Write a starter that creates different tasks depending on the choice. The tasks are asynchronous.

async def starter(choice, name):
    tasks = []
    async with aiohttp.ClientSession() as session:
        if choice == "1":
            # Relative path, matching where the m3u8 file was saved.
            with open(f"temp_data/{name}.txt", "r") as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#"):
                        continue
                    else:
                        download_url = line
                        file_name = line.split("/")[-1]  # name of the ts file to save
                        task = download_ts(file_name, download_url, session)
                        tasks.append(task)
                print("Downloading files.....")
                # gather accepts coroutines directly; asyncio.wait no longer does on Python 3.11+
                await asyncio.gather(*tasks)  # wait for all tasks to finish
                print("Download finished")
        if choice == "2":
            url = input("Enter the base URL to join > ")
            with open(f"temp_data/{name}.txt", "r") as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#"):
                        continue
                    else:
                        file_name = line.split("/")[-1]  # name of the ts file to save
                        download_url = url + line
                        task = download_ts(file_name, download_url, session)
                        tasks.append(task)
                print("Downloading files.....")
                await asyncio.gather(*tasks)  # wait for all tasks to finish
                print("Download finished")
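
The starter launches every segment download at once, which may be more than a server tolerates for a long playlist. One way to cap the number of simultaneous downloads is an asyncio.Semaphore. The sketch below is an assumption, not part of the original script: run_limited is a hypothetical helper, and fake_download stands in for the real download coroutine so the example runs without network access.

```python
import asyncio


async def run_limited(coroutines, limit=10):
    """Run coroutines concurrently, with at most `limit` in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(coro):
        # Each coroutine waits here until one of the `limit` slots is free.
        async with semaphore:
            return await coro

    # gather returns results in the same order as the input coroutines.
    return await asyncio.gather(*(guarded(c) for c in coroutines))


async def fake_download(index):
    # Stand-in for a real segment download.
    await asyncio.sleep(0.01)
    return index


results = asyncio.run(run_limited([fake_download(i) for i in range(5)], limit=2))
print(results)  # [0, 1, 2, 3, 4]
```

In the script, the list built in the starter could be passed to such a helper in place of the direct wait on all tasks.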

Download the ts files, using aiohttp in place of requests. The function name and argument order match the calls made in the starter.

async def download_ts(file_name, download_url, session):
    async with session.get(download_url, headers=header) as resp:
        async with aiofiles.open(f"temp_data/{file_name}", mode="wb") as f:
            await f.write(await resp.content.read())
        print(f"File {file_name} downloaded!!")
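
A single failed request currently leaves a segment missing or empty. A small retry wrapper is one way to soften that. The sketch below is hypothetical: with_retries is not part of the original script, it catches every exception for brevity, and the flaky coroutine only simulates a download that fails twice before succeeding.

```python
import asyncio


async def with_retries(make_coro, attempts=3, delay=0.5):
    """Await make_coro() up to `attempts` times (at least 1), pausing `delay` seconds between tries."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            # A coroutine object can only be awaited once, so build a fresh one per try.
            return await make_coro()
        except Exception as error:  # in real use, narrow this to network errors
            last_error = error
            if attempt < attempts:
                await asyncio.sleep(delay)
    raise last_error


calls = {"count": 0}


async def flaky():
    # Simulated download: fails on the first two calls, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated failure")
    return "ok"


print(asyncio.run(with_retries(flaky, attempts=3, delay=0)))  # ok
```

With the download function above, the call would look like with_retries(lambda: download_ts(file_name, download_url, session)).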

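Verify completeness: using the m3u8 file, check whether each listed file exists.

def verification(name):
    files = []
    with open(f"temp_data/{name}.txt", "r") as f:
        for line in f:
            if line.startswith("#"):
                continue
            else:
                # Files are saved under the last path component, so check that name.
                ts_name = line.strip().split("/")[-1]
                if not ts_name or os.path.exists(f"temp_data/{ts_name}"):
                    continue
                else:
                    files.append(ts_name)
        print("The following files are missing, please check manually:", files)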
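
An existence check cannot tell a complete segment from an empty file left behind by a failed download. A slightly stricter sketch, assuming a hypothetical helper named missing_segments that takes the playlist entries and the folder as arguments and also treats zero-byte files as missing:

```python
import os
import tempfile


def missing_segments(entries, folder):
    """Return the entries whose file (named by the last path component) is absent or empty."""
    missing = []
    for entry in entries:
        path = os.path.join(folder, entry.split("/")[-1])
        if not os.path.isfile(path) or os.path.getsize(path) == 0:
            missing.append(entry)
    return missing


with tempfile.TemporaryDirectory() as folder:
    # a.ts has content, b.ts was never downloaded, c.ts is an empty leftover.
    with open(os.path.join(folder, "a.ts"), "wb") as f:
        f.write(b"data")
    open(os.path.join(folder, "c.ts"), "wb").close()
    print(missing_segments(["a.ts", "https://example.com/hls/b.ts", "c.ts"], folder))
    # ['https://example.com/hls/b.ts', 'c.ts']
```

Returning the list, instead of only printing it, would also let the caller retry just the missing entries.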

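Merge the files. This is done by creating a new ts file and, following the file order in the m3u8 file, writing each ts file's binary content into it in turn.

def merge_ts(file_name):
    new_name = input("Enter the name of the merged file > ")
    # Append mode: an existing file with the same name is extended, not overwritten.
    with open(f"./{new_name}.ts", "ab+") as f:
        with open(f"temp_data/{file_name}.txt", "r") as f2:
            for line in f2:
                if line.startswith("#"):
                    continue
                else:
                    line = line.strip().split("/")[-1].strip()
                    ts_name = line
                    try:
                        with open(f"temp_data/{ts_name}", "rb") as f3:
                            f.write(f3.read())
                    except OSError:
                        continue  # skip segments that cannot be opened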
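
For large segments, reading each file fully into memory is avoidable. A sketch of the same merge using shutil.copyfileobj, which copies in chunks; merge_segments is a hypothetical helper that takes explicit paths and reports which ones it skipped, so it can be tried without a playlist:

```python
import os
import shutil
import tempfile


def merge_segments(segment_paths, output_path):
    """Concatenate files in the given order; return the paths skipped because they do not exist."""
    skipped = []
    with open(output_path, "wb") as merged:
        for path in segment_paths:
            try:
                with open(path, "rb") as segment:
                    shutil.copyfileobj(segment, merged)  # chunked copy
            except FileNotFoundError:
                skipped.append(path)
    return skipped


with tempfile.TemporaryDirectory() as folder:
    first = os.path.join(folder, "x9.ts")
    second = os.path.join(folder, "a1.ts")
    absent = os.path.join(folder, "gone.ts")
    with open(first, "wb") as f:
        f.write(b"AAA")
    with open(second, "wb") as f:
        f.write(b"BBB")
    output = os.path.join(folder, "merged.ts")
    skipped = merge_segments([first, absent, second], output)
    with open(output, "rb") as f:
        print(f.read())  # b'AAABBB'
    print(len(skipped))  # 1
```

Unlike the append mode used above, this variant opens the output with "wb", so running it twice does not duplicate the content.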

Finally, write a main function that runs all of the above.

def main():
    init()
    url = input("Enter the m3u8 file URL > ")
    name = url.rsplit("/")[-1]
    m3u8_files_download(url, name)  # download the m3u8 file
    choice = get_type(name)
    asyncio.run(starter(choice, name))
    print("Verifying file completeness")
    verification(name)
    print("Merge the files?   Y/N")
    if input(">").strip().upper() == "Y":
        merge_ts(name)
    else:
        print("Done")

Final working code

import aiohttp
import aiofiles
import asyncio
import requests
import os
header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36"}
def merge_ts(file_name):
    new_name = input("Enter the name of the merged file > ")
    # Append mode: an existing file with the same name is extended, not overwritten.
    with open(f"./{new_name}.ts", "ab+") as f:
        with open(f"temp_data/{file_name}.txt", "r") as f2:
            for line in f2:
                if line.startswith("#"):
                    continue
                else:
                    line = line.strip().split("/")[-1].strip()
                    ts_name = line
                    try:
                        with open(f"temp_data/{ts_name}", "rb") as f3:
                            f.write(f3.read())
                    except OSError:
                        continue  # skip segments that cannot be opened
def m3u8_files_download(url, name):   # download the m3u8 file
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()  # do not save an error page as the playlist
    with open(f"temp_data/{name}.txt", mode="wb") as f:
        f.write(resp.content)
    resp.close()
def get_type(name):
    with open(f"temp_data/{name}.txt", "r") as f:
        for line in f:
            # Skip tags, comments, and blank lines.
            if line.startswith("#") or not line.strip():
                continue
            else:
                print("First entry:", line)
                print("Choose a mode: 1. direct download  2. URL joining")
                choice = input(">")
                return str(choice)
def init():
    # Create the working folder if it does not exist yet.
    os.makedirs("./temp_data", exist_ok=True)
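def verification(name):
    files = []
    with open(f"temp_data/{name}.txt", "r") as f:
        for line in f:
            if line.startswith("#"):
                continue
            else:
                # Files are saved under the last path component, so check that name.
                ts_name = line.strip().split("/")[-1]
                if not ts_name or os.path.exists(f"temp_data/{ts_name}"):
                    continue
                else:
                    files.append(ts_name)
        print("The following files are missing, please check manually:", files)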
async def download_ts(file_name, download_url, session):
    async with session.get(download_url, headers=header) as resp:
        async with aiofiles.open(f"temp_data/{file_name}", mode="wb") as f:
            await f.write(await resp.content.read())
        print(f"File {file_name} downloaded!!")
async def starter(choice, name):
    tasks = []
    async with aiohttp.ClientSession() as session:
        if choice == "1":
            # Relative path, matching where the m3u8 file was saved.
            with open(f"temp_data/{name}.txt", "r") as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#"):
                        continue
                    else:
                        download_url = line
                        file_name = line.split("/")[-1]  # name of the ts file to save
                        task = download_ts(file_name, download_url, session)
                        tasks.append(task)
                print("Downloading files.....")
                # gather accepts coroutines directly; asyncio.wait no longer does on Python 3.11+
                await asyncio.gather(*tasks)  # wait for all tasks to finish
                print("Download finished")
        if choice == "2":
            url = input("Enter the base URL to join > ")
            with open(f"temp_data/{name}.txt", "r") as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#"):
                        continue
                    else:
                        file_name = line.split("/")[-1]  # name of the ts file to save
                        download_url = url + line
                        task = download_ts(file_name, download_url, session)
                        tasks.append(task)
                print("Downloading files.....")
                await asyncio.gather(*tasks)  # wait for all tasks to finish
                print("Download finished")
def main():
    init()
    url = input("Enter the m3u8 file URL > ")
    name = url.rsplit("/")[-1]
    m3u8_files_download(url, name)  # download the m3u8 file
    choice = get_type(name)
    asyncio.run(starter(choice, name))
    print("Verifying file completeness")
    verification(name)
    print("Merge the files?   Y/N")
    if input(">").strip().upper() == "Y":
        merge_ts(name)
    else:
        print("Done")

if __name__ == "__main__":
    main()

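As a bit of self-deception, simply change the ts file's extension to MP4; it looks nicer that way. Renaming converts nothing: the content is still an MPEG-TS stream, which many players open regardless of the extension.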
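
The rename itself can also be scripted. A minimal sketch with pathlib, assuming a hypothetical helper named rename_to_mp4 and a placeholder file created only for the demonstration:

```python
import tempfile
from pathlib import Path


def rename_to_mp4(ts_path):
    """Rename a .ts file to .mp4 in place and return the new path (the content is untouched)."""
    source = Path(ts_path)
    return source.rename(source.with_suffix(".mp4"))


with tempfile.TemporaryDirectory() as folder:
    merged = Path(folder) / "video.ts"
    merged.write_bytes(b"placeholder bytes")  # stands in for the merged video
    print(rename_to_mp4(merged).name)  # video.mp4
```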

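The video opens and plays normally, so the script is done.

Afterword: using the script

In theory, once the third-party libraries aiohttp, aiofiles, and requests are installed (asyncio ships with Python), the script should work after a simple copy and paste. The values that currently have to be entered by hand can also be hard-coded in the script, so that it can be reused in different crawlers.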

ENDING..........