消息服务框架使用案例之--大文件上传(断点续传)功能
消息服务框架使用案例之--大文件上传(断点续传)功能
一、分块上传和断点续传原理
在我们的一个产品应用中,客户需要上传大量的文件到服务器,其中不乏很大的视频文件。虽然可以使用FTP这样成熟稳定的工具,但客户表示不会使用FTP工具,并且我们产品也觉得客户从我们软件在切换到FTP用户体验不好,如果做成后台脚本调用FTP上传那么进度信息很难呈现到我们软件上。最终,决定我们自己做文件上传功能。
大文件上传受限于服务器每次处理数据的能力,不能一次传输完成,所以分块上传是必然的了,由于上传时间可能较长,中途可能因为网络或者人为原因终止上传,所以还需要断点上传功能。
分块上传实际上是在客户端分块读取文件,然后在服务器分块写入文件,每次读写记录下读写的起始位置,也就是文件的偏移量,和要读写的数据长度。在上传过程中,每完成一个文件数据块的写入,就向客户端返回一次信息,客户端据此进行下一文件数据块的读取。 断点续传功能也比较好实现,就是上传过程中将文件在服务器写为临时文件,等全部写完了(文件上传完),将此临时文件重命名为正式文件即可,如果中途上传中断过,下次上传的时候根据当前临时文件大小,作为在客户端读取文件的偏移量,从此位置继续读取文件数据块,上传到服务器从此偏移量继续写入文件即可。
二、消息服务框架实现文件上传
假设我们将每一个文件数据块看做一份“消息”,那么文件上传本质上就是客户端和服务器两端频繁的消息交互而已。消息服务框架(MSF)是一个集成了服务容器和消息访问的框架,正好可以用来做文件上传应用。具体做法就是在服务端,编写一个“文件上传服务”,在客户端,编写一个调用上传服务的回调方法即可。
2.1,文件上传服务端
新建一个MSF服务类:
public class FilesService : ServiceBase
{
}
然后添加一个处理上传文件的方法:
/// <summary>
/// 批量上传文件(通过回调客户端的方式,支持断点续传)
/// </summary>
/// <param name="list">文件列表</param>
/// <returns></returns>
public UploadResult UploadFiles(List<UploadFileInfos> list)
{
int uploadCount = 0;
foreach (var uploadInfo in list)
{
string pathfile = string.Empty;
try
{
pathfile = this.MapServerPath(uploadInfo.FilePath);
if (!Directory.Exists(Path.GetDirectoryName(pathfile)))
{
Directory.CreateDirectory(Path.GetDirectoryName(pathfile));
}
if (File.Exists(pathfile))
{
FileInfo fi = new FileInfo(pathfile);
if (fi.Length == uploadInfo.Size && fi.LastWriteTime == uploadInfo.FileModifyTime)
{
Console.WriteLine("文件 {0} {1}", pathfile, "已上传,跳过");
continue;//文件已上传,跳过
}
else
{
fi.Delete();
}
}
//"断点"上传的文件
long offset = 0;
//上传的分部文件名称增加一个文件长度数字,避免下次客户端上传的时候,修改了内容。
//如果文件上传了一部分,的确修改了内容,那么原来上传的部分文件就丢弃了。
string partFile = pathfile + uploadInfo.Size + ".part";
if (File.Exists(partFile))
{
FileInfo fi = new FileInfo(partFile);
offset = fi.Length;
}
while (offset < uploadInfo.Size)
{
uploadInfo.Offset = offset;
uploadInfo.Length = MaxReadSize;
if (uploadInfo.Offset + uploadInfo.Length > uploadInfo.Size)
uploadInfo.Length = (int)(uploadInfo.Size - uploadInfo.Offset);
//回调客户端,通知上传文件块
var data = GetUploadFileData(uploadInfo);
if (data.Length == 0)
{
//如果有长度为零的文件表示客户读取文件失败,终止上传操作
throw new Exception("读取客户端文件失败(Length=0),终止上传操作");
}
if (data.Length != uploadInfo.Length)
throw new Exception("网络异常:上传的文件流数据块大小与预期的不一致");
//等待上次写完
resetEvent.WaitOne();
//异步写文件
System.Threading.Thread t = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(obj =>
{
WriteFileInfo wfi = (WriteFileInfo)obj;
CurWriteFile(wfi.FileName, wfi.WriteData, wfi.Offset);
}));
t.Start(new WriteFileInfo()
{
FileName = partFile,
WriteData = data,
Offset = offset
});
offset += uploadInfo.Length;
}
resetEvent.WaitOne();
//重命名到正常文件名
File.Move(partFile, pathfile);
System.IO.File.SetLastWriteTime(pathfile, uploadInfo.FileModifyTime);
uploadCount++;
resetEvent.Set();
}
catch (Exception ex)
{
resetEvent.Set();
return new UploadResult()
{
Success = false,
FilesCount = 0,
Message = ex.Message
};
}//end try
} //end for
return new UploadResult()
{
Success = true,
FilesCount = list.Count
};
}
在这个方法中,有一个重要方法, //回调客户端,通知上传文件块 var data = GetUploadFileData(uploadInfo);
它调用了MSF框架服务上下文的回调函数CallBackFunction,来读取客户端文件数据的,代码如下:
private byte[] GetUploadFileData(UploadFileInfos fileinfo)
{
return base.CurrentContext.CallBackFunction<UploadFileInfos, byte[]>(fileinfo);
}
另外,服务端写文件的方法CurWriteFile 实现如下:
/// <summary>
/// 将服务器端获取到的字节流写入文件
/// </summary>
/// <param name="pReadByte">流</param>
/// <param name="fileName">文件名</param>
/// <param name="offset">要写入文件的位置</param>
public void CurWriteFile(string fileName, byte[] pReadByte, long offset)
{
FileStream pFileStream = null;
try
{
pFileStream = new FileStream(fileName, FileMode.OpenOrCreate);
pFileStream.Seek(offset, SeekOrigin.Begin);
pFileStream.Write(pReadByte, 0, pReadByte.Length);
}
catch(Exception ex)
{
throw new Exception("写文件块失败,写入位置:"+offset+",文件名:"+fileName+",错误原因:"+ex.Message);
}
finally
{
if (pFileStream != null)
pFileStream.Close();
resetEvent.Set();
}
}
2.2,文件上传客户端
现在看文件上传客户端代码,如何提供服务端需要的文件读取回调函数:
ServiceRequest request = new ServiceRequest();
request.ServiceName = "FilesService";
request.MethodName = "UploadFiles";
request.Parameters = new object[] { infos };
Proxy srvProxy = new Proxy();
srvProxy.ServiceBaseUri = string.Format("net.tcp://{0}", serverHost);
srvProxy.ErrorMessage += srvProxy_ErrorMessage;
Task<UploadResult> result= srvProxy.RequestServiceAsync<UploadResult, UploadFileInfos, byte[]>(request,
uploadingInfo =>
{
//action委托方法显示进度给客户端
action(new UploadStateArg()
{
State = uploadingInfo.Offset + uploadingInfo.Length >= uploadingInfo.Size
? UploadState.Success: UploadState.Uploading,
ProgressFile = uploadingInfo.FilePath,
ProcessValue = Convert.ToInt32(uploadingInfo.Offset * 100 / uploadingInfo.Size),
TotalProcessValue = Convert.ToInt32((uploadingInfo.UploadIndex +1) * 100 / index)
});
Console.WriteLine(">>>Debug:Path:{0},FilePath:{1}",folder, uploadingInfo.FilePath);
var fullName = Path.IsPathRooted(folder)? folder + uploadingInfo.FilePath : uploadingInfo.FilePath;
Console.WriteLine(">>>服务器读取客户端文件:{0},偏移量:{1} 长度:{2}",
fullName, uploadingInfo.Offset, uploadingInfo.Length);
return ReadFileData(fullName, uploadingInfo.Offset, uploadingInfo.Length);
}
);
在上面的方法中, srvProxy.RequestServiceAsync泛型方法需要3个参数,第一个参数是服务的结果类型,第二个参数是提供给服务端回调方法(前面的base.CurrentContext.CallBackFunction方法)的参数,第三个参数是服务回调方法的结果。srvProxy.RequestServiceAsync 的回调方法的参数 uploadingInfo 是服务器推送过来的消息,里面包含了需要读取的文件信息,包括文件名,偏移量,读取长度等信息。
其中,客户端读取文件的方法 ReadFileData 实现如下:
/// <summary>
/// 读取文件返回字节流
/// </summary>
/// <param name="fileName">文件路径</param>
/// <param name="offset">要读取的文件流的位置</param>
/// <param name="length">要读取的文件块大小</param>
/// <returns></returns>
private byte[] ReadFileData(string fileName, long offset, int length)
{
FileStream pFileStream = null;
byte[] pReadByte = new byte[0];
try
{
pFileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read);
BinaryReader r = new BinaryReader(pFileStream);
r.BaseStream.Seek(offset, SeekOrigin.Begin);
pReadByte = r.ReadBytes(length);
return pReadByte;
}
catch
{
return pReadByte;
}
finally
{
if (pFileStream != null)
pFileStream.Close();
}
}
这样,在一次文件上传的“请求-响应”过程中,MSF的服务端进行了多次回调客户端的操作,客户端根据服务端推送过来的参数信息来精确的读取服务端需要的文件数据。一个支持断点续传的大文件上传服务,使用MSF框架就做好了。
三、其它
本文使用到的其它相关服务端对象的代码定义如下:
/// <summary>
/// 上传状态枚举
/// </summary>
public enum UploadState
{
/// <summary>
/// 上传成功
/// </summary>
Success,
/// <summary>
/// 上传中
/// </summary>
Uploading,
/// <summary>
/// 错误
/// </summary>
Error
}
/// <summary>
/// 上传状态参数
/// </summary>
public class UploadStateArg
{
/// <summary>
/// 上传状态
/// </summary>
public UploadState State { get; set; }
/// <summary>
/// 上传的文件名
/// </summary>
public string ProgressFile { get; set; }
/// <summary>
/// 处理的消息,如果出错,这里是错误消息
/// </summary>
public string Message { get; set; }
/// <summary>
/// 处理进度(百分比)
/// </summary>
public int ProcessValue { get; set; }
/// <summary>
/// 总体处理进度(百分比)
/// </summary>
public int TotalProcessValue { get; set; }
}
如果你不清楚如何使用MSF来实现本文的功能,请先阅读下面的文章:
“一切都是消息”--MSF(消息服务框架)入门简介
建议你读完相关的其它两篇文章:
“一切都是消息”--MSF(消息服务框架)之【请求-响应】模式
“一切都是消息”--MSF(消息服务框架)之【发布-订阅】模式
读完后,建议你再读读MSF的理论总结:
分布式系统的消息&服务模式简单总结
有关消息服务框架(MSF)更多的讨论,请加我们QQ群讨论,群号:18215717 ,加群口令:消息服务框架
- 还没开始学Python之前,你要知道,Python程序员一定会的十件事
- zabbix-server端与zabbix-agent端部署与监控
- 比特币是什么?比特币从何而来?
- Django内置Admin
- Python内置函数property()使用实例
- 区块链技术的历史发展的不同阶段
- 微信力推搜一搜,培养用户微信搜索习惯,搜一搜直接给红包
- datetime
- [转自JeffreyZhao]在LINQ to SQL中使用Translate方法以及修改查询用SQL
- Flask-SQLAlchemy
- 2017全球存储大会:存储设备面临三大挑战
- cobbler自动安装系统(Centos7.X)
- Python接口自动化-7-unittest
- cobbler自动安装系统(Centos7.X)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 蓝桥杯vip测试题系统-数组求和(解题思路以及解题代码,手画思路图虽然丑丑的)
- 蓝桥杯vip测试题-找零钱(解题思路以及解题代码)
- 剑指Office-二进制中1的个数
- 剑指Office-旋转数组的最小数
- Mysql调优你不知道这几点,就太可惜了
- Mysql快速导入数百万条数据,亲测有效
- [面试题06]从未到头打印链表(LeetCode-剑指Offer)
- SpringBoot集成RabbitMQ-三种模式的实现
- Centos7-Docker卸载旧的更新到新版本
- vue vuecli3 前端解决跨域问题
- 求求你,不要再纠结指针了(1) ——万能转化公式
- 求求你,不要再纠结指针了(2)——函数指针
- 用Python解决100个问题 | 倒计时
- 【转载】【ionic+angularjs】angularjs ui-router路由简介
- 实时性迷思(1) —— “快是优点么?”