[Python] 纯文本查看 复制代码 from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
import requests as r
import os
def calcDivisionalRange(fileSize, chuck=10):
step = fileSize//chuck
arr = list(range(0, fileSize, step))
result = []
for i in range(len(arr)-1):
s_pos, e_pos = arr[i], arr[i+1]-1
result.append([s_pos, e_pos])
result[-1][-1] = fileSize-1
return result
# 多线程下载
def threadDownload(saveName,url,s_pos, e_pos):
headers = {"Range": f"bytes={s_pos}-{e_pos}"}
res = r.get(url, headers=headers, stream=True)
with open(saveName, "rb+") as file:
file.seek(s_pos)
for chunk in res.iter_content(chunk_size=64*1024):
if chunk:
file.write(chunk)
#获取文件名
def getFileFromURL(url):
parsedURL = urlparse(url)
fileName = parsedURL.path.split('/')[-1]
return fileName
#这里是控制用户输入的地方
def main():
try :
url = input("Input the file's link $ ")
saveName = getFileFromURL(url)
res = r.head(url)
fileSize = int(res.headers['Content-Length'], 0)
divisional_ranges = calcDivisionalRange(fileSize)
# 先创建空文件
with open(saveName, "wb") as file:
pass
# 储存下载后的信息
with ThreadPoolExecutor() as p:
futures = []
for s_pos, e_pos in divisional_ranges:
print(s_pos, e_pos)
futures.append(p.submit(threadDownload, saveName, url, s_pos, e_pos))
# 等待所有任务执行完毕
as_completed(futures)
os.system(saveName)
except KeyError:
print("请求被网站拦截!")
if __name__ == "__main__":
main()
|