大文件下载以及进度条展示和MD5校验

时间:2022-06-05
本文章向大家介绍大文件下载以及进度条展示和MD5校验,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

使用socket网络,上传一个视频,大小在3G左右

能够显示进度条,显示花费时间

下载使用TCP协议

server向client发送文件

新建文件server.py,代码如下:

import os
import json
import socket
import struct

filepath = r'E:BaiduYunDownload[电影天堂www.dy2018.com]移动迷宫3:死亡解药BD国英双语中英双字.mp4'

sk = socket.socket()
sk.bind(('127.0.0.1', 9000))
sk.listen()

conn, addr = sk.accept()
filename = os.path.basename(filepath)
filesize = os.path.getsize(filepath)
dic = {'filename': filename, 'filesize': filesize}
str_dic = json.dumps(dic).encode('utf-8')
len_dic = len(str_dic)
length = struct.pack('i', len_dic)
conn.send(length)  # dic的长度
conn.send(str_dic)  # dic
with open(filepath, 'rb') as f:  # 文件
    while filesize:
        content = f.read(4096)
        conn.send(content)
        filesize -= len(content)
        '''
        这里不能减等4096,因为文件,最后可能只有3字节。
        要根据读取的长度len(content),来计算才是合理的。
        '''
conn.close()

新建文件client.py,代码如下:

import json
import struct
import socket
import sys
import time
 
def processBar(num, total):  # 进度条
    rate = num / total
    rate_num = int(rate * 100)
    if rate_num == 100:
        r = 'r%s>%d%%n' % ('=' * rate_num, rate_num,)
    else:
        r = 'r%s>%d%%' % ('=' * rate_num, rate_num,)
    sys.stdout.write(r)
    sys.stdout.flush
 
start_time = time.time()  # 开始时间
 
sk = socket.socket()
sk.connect(('127.0.0.1',9000))
 
dic_len = sk.recv(4)
dic_len = struct.unpack('i',dic_len)[0]
dic = sk.recv(dic_len)
str_dic = dic.decode('utf-8')
dic = json.loads(str_dic)
with open(dic['filename'],'wb') as f:  # 使用wb更严谨一些,虽然可以使用ab
    content_size = 0
    while True:
        content = sk.recv(4096)
        f.write(content)  # 写入文件
        content_size += len(content)  # 接收大小
        processBar(content_size,dic['filesize'])  # 执行进度条函数
        if content_size == dic['filesize']:break  # 当接收的总大小等于文件大小时,终止循环
             
sk.close()  # 关闭连接
 
end_time = time.time()  # 结束时间
print('本次下载花费了{}秒'.format(end_time - start_time))

先执行server.py,再执行client.py,效果如下:

上面效果展示了100个等号,太长了,那么要缩减到1/3呢?

修改进度条函数

def processBar(num, total):  # 进度条
    rate = num / total
    rate_num = int(rate * 100)
    if rate_num == 100:
        r = 'r%s>%d%%n' % ('=' * int(rate_num / 3), rate_num,) # 控制等号输出数量,除以3,表示显示1/3
    else:
        r = 'r%s>%d%%' % ('=' * int(rate_num / 3), rate_num,)
    sys.stdout.write(r)
    sys.stdout.flush

再次执行,效果如下:

再来一个高级版,显示绿色的飞机

代码如下:

def processBar(num, total):  # 进度条
    rate = num / total
    rate_num = int(rate * 100)
    pretty = '✈'
    if rate_num == 100:
        r = 'r33[32m{}33[0m{}%n'.format(pretty * int(rate_num / 5), rate_num,)
    else:
        r = 'r33[32m{}33[0m{}%'.format(pretty * int(rate_num / 5), rate_num,)
    sys.stdout.write(r)
    sys.stdout.flush

再次执行,效果如下:

再来一个每秒换色

导入一个随机换色类

import random
 
 
class Prompt(object):  # 提示信息显示
    colour_dic = {
        'red': 31,
        'green': 32,
        'yellow': 33,
        'blue': 34,
        'purple_red': 35,
        'bluish_blue': 36,
        'white': 37,
    }
 
    def __init__(self):
        pass
 
    @staticmethod
    def display(msg, colour='white'):
        choice = Prompt.colour_dic.get(colour)
        # print(choice)
        if choice:
            info = "33[1;{};1m{}33[1;0m".format(choice, msg)
            return info
        else:
            return False
 
    def random_color(msg):  # 随机换色
        colour_list = []
        for i in Prompt.colour_dic:
            colour_list.append(i)
 
        length = len(colour_list) - 1  # 最大索引值
        index = random.randint(0, length)  # 随机数
 
        ret = Prompt.display(msg, colour_list[index])  # 随机颜色
        return ret

修改client.py

from Prompt import Prompt
 
def processBar(num, total):  # 进度条
    rate = num / total
    rate_num = int(rate * 100)
    pretty = Prompt.random_color('✈')  # 随机换色
    if rate_num == 100:
        r = 'r{}{}%n'.format(pretty * int(rate_num / 5), rate_num,)
    else:
        r = 'r{}{}%'.format(pretty * int(rate_num / 5), rate_num,)
    sys.stdout.write(r)
    sys.stdout.flush

再次执行,效果如下:

增加MD5校验

server.py

import os
import json
import socket
import struct
import hashlib

sk = socket.socket()
sk.bind(('127.0.0.1', 9000))
sk.listen()

conn, addr = sk.accept()
filename = '[电影天堂www.dy2018.com]移动迷宫3:死亡解药BD国英双语中英双字.mp4'  # 文件名
absolute_path = os.path.join('E:BaiduYunDownload',filename)  # 文件绝对路径
buffer_size = 1024*1024  # 缓冲大小,这里表示1MB

md5obj = hashlib.md5()
with open(absolute_path, 'rb') as f:
    while True:
        content = f.read(buffer_size)  # 每次读取指定字节
        if content:
            md5obj.update(content)
        else:
            break  # 当内容为空时,终止循环

md5 = md5obj.hexdigest()
print(md5)  # 打印md5值

dic = {'filename':filename,
       'filename_md5':str(md5),'buffer_size':buffer_size,
       'filesize':os.path.getsize(absolute_path)}
str_dic = json.dumps(dic).encode('utf-8')
len_dic = len(str_dic)
length = struct.pack('i', len_dic)
conn.send(length)  # dic的长度
conn.send(str_dic)  # dic
with open(absolute_path, 'rb') as f:  # 文件
    while dic['filesize']:
        content = f.read(dic['buffer_size'])
        conn.send(content)
        dic['filesize'] -= len(content)
        '''
        这里不能减等4096,因为文件,最后可能只有3字节。
        要根据读取的长度len(content),来计算才是合理的。
        '''
conn.close()

client.py

import json
import struct
import socket
import sys
import time
import hashlib
import os
from Prompt import Prompt

def processBar(num, total):  # 进度条
    rate = num / total
    rate_num = int(rate * 100)
    pretty = Prompt.random_color('✈')
    if rate_num == 100:
        r = 'r{}{}%n'.format(pretty * int(rate_num / 5), rate_num,)
    else:
        r = 'r{}{}%'.format(pretty * int(rate_num / 5), rate_num,)
    sys.stdout.write(r)
    sys.stdout.flush

start_time = time.time()  # 开始时间

sk = socket.socket()
sk.connect(('127.0.0.1',9000))

dic_len = sk.recv(4)
dic_len = struct.unpack('i',dic_len)[0]
dic = sk.recv(dic_len)
str_dic = dic.decode('utf-8')
dic = json.loads(str_dic)

md5 = hashlib.md5()
with open(dic['filename'],'wb') as f:  # 使用wb更严谨一些,虽然可以使用ab
    content_size = 0
    while True:
        content = sk.recv(dic['buffer_size'])  # 接收指定大小
        f.write(content)  # 写入文件
        content_size += len(content)  # 接收大小
        md5.update(content)  # 摘要

        processBar(content_size,dic['filesize'])  # 执行进度条函数
        if content_size == dic['filesize']:break  # 当接收的总大小等于文件大小时,终止循环

    md5 = md5.hexdigest()
    print(md5)  # 打印md5值
    if dic['filename_md5'] == str(md5):
        print(Prompt.display('md5校验正确--下载成功','green'))
    else:
        print(Prompt.display('文件验证失败', 'red'))
        os.remove(dic['filename'])  # 删除文件

sk.close()  # 关闭连接

end_time = time.time()  # 结束时间
print('本次下载花费了{}秒'.format(end_time - start_time))

执行输出: