dotnet 使用 AsyncQueue 创建高性能内存生产者消费者队列
在本机相同进程中创建生产者消费者队列,可以解决很多线程安全以及高性能需求问题。本文告诉大家如何通过在 GitHub 完全开源的 AsyncWorkerCollection 库的 AsyncQueue 类创建在内存中的高性能低资源占用的生产者消费者队列
本文使用的 AsyncWorkerCollection 库在 GitHub 完全开源,请看 https://github.com/dotnet-campus/AsyncWorkerCollection/
这个库里面的所有代码都是在我团队实际项目经过约3年的测试,在大约 200 万台设备上运行过
本文使用的 AsyncQueue 是 AsyncWorkerCollection 库的其中一个类。通过 AsyncQueue 可以制作出一个支持多线程并发调用的生产者消费者队列,而 AsyncQueue 本身支持异步,可以配置线程池做到极低的资源占用
意义
为什么需要有生产者消费者队列?其实这个设计方法可以极大规避多个模块之间的性能差异,如下面例子
我有两个不同的模块,假定是 A 和 B 两个。业务逻辑要求让 A 模块执行完成的数据,进入到 B 模块。换句话说就是 B 模块的处理都需要依赖 A 模块的执行完成
但是现在存在的问题是 A 和 B 两个模块的执行速度有差异。如 A 模块是通过读取本机文件,而 B 模块是解析文件本身。或者说 A 模块是接收网络请求,而 B 模块是执行复杂的数据库逻辑
那么就会存在一个问题,能否让 A 和 B 独立执行,同时控制 A 和 B 两个模块的线程数量,以达到整体性能最佳?此时通过生产者消费者队列就能实现
按照如上描述,其实 A 模块就是生产者,生产出数据让 B 模块处理。而 B 模块就是消费者
如果 A 和 B 两个模块设计为独立执行,那么意味着可以让 A 和 B 两个模块的执行线程数量可以不匹配。如 A 的速度比较慢,此时分配给 A 更多的线程,或者说 B 执行比较慢分配给 B 更多的线程处理
在使用生产者消费者队列另一个意义在于可以做到资源的动态调配。如我有一个不重要的模块,这个模块需要处理一些杂务,而我不期望给这个模块投入太大的资源,但是我又期望在我应用空闲的时候可以将空闲资源投入处理。此时使用 生产者消费者队列 就能完成这个需求,所有模块将任务投入到生产者消费者队列里面,而平时只有很少的线程投入使用作为消费者处理这些任务。在应用空闲的时候,将更多线程投入到消费者处理里面处理
当然生产者消费者队列可以使用的业务将会很多,其他用途还请小伙伴自己摸索,或者百度一下
大部分的生产者消费者队列库都是设计为分布式的,支持多设备跨进程的,而这些库也就需要使用更多的资源。本文的 AsyncQueue 是在内存中创建,不会涉及到数据库等功能,只能在相同进程内使用。而因为没有跨进程和设备的功能,可以减少很多资源的时候,只需要一个简单的信号量锁就能完成
安装库
在使用之前的第一步就是安装 NuGet 库,本文的 AsyncWorkerCollection 库提供两个 NuGet 包,一个是 dll 引用,另一个是源代码引用,分别如下
- dotnetCampus.AsyncWorkerCollection
- dotnetCampus.AsyncWorkerCollection.Source
如果使用 SDK 版本的 csproj 可以在项目文件使用下面代码安装源代码版本
<ItemGroup>
<PackageReference Include="dotnetCampus.AsyncWorkerCollection.Source" Version="1.2.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
使用方法
本质上的 AsyncQueue 的设计就是一个队列,因此用法和队列相同,有一个叫入队的方法,还有一个叫出队的方法。入队的方法是给生产者使用的,将数据或任务加入到队列里面提供给消费者使用。而出队就是给消费者使用的,消费者通过出队获取数据或任务用来执行
假定有数据是 FooTask 类,在创建 AsyncQueue 对象就需要传入对应的数据或任务的定义
var asyncQueue = new AsyncQueue<FooTask>();
在生产者对应的代码里面使用 Enqueue 方法入队,这个方法是线程安全的,可以随意调用
asyncQueue.Enqueue(new FooTask());
在消费者对应的代码里面使用 DequeueAsync 方法出队,这个方法是线程安全的,在队列里面没有数据的时候将会通过 await 等待,让线程返回线程池。在有数据的时候此方法将会返回。每一条数据都只会返回一次,也就是如果有多个线程同时调用 DequeueAsync 方法不会存在返回同一条数据。换句话说有多少次 Enqueue 方法入队,就会有多少次 DequeueAsync 的返回
var fooTask = await asyncQueue.DequeueAsync();
使用方法就这么简单
那么如何用来做修改执行的线程的数量?如上面说的,本文的 AsyncQueue 是线程安全的,支持多个线程调用。因此如果修改调用 Enqueue 入队的线程数量就能修改生产者的线程数量。而修改等待 DequeueAsync 返回的线程数量也就能修改消费者的线程数量
也可以用来固定消费者的线程数量,用法很简单,就是预设置对应的消费者线程的数量就可以。如设置有三个线程进入循环等待 DequeueAsync 返回,那么消费者将最多使用三个线程执行。而在等待的时候通过 await 可以让线程返回线程池提升性能
有一点需要注意的是 AsyncQueue 是包含 Dispose 和 DisposeAsync 方法的,因为本质 AsyncQueue 使用了锁,需要手动释放。但主要需要调用的原因是让在业务完成之后,让没有手动释放的 DequeueAsync 方法可以释放,解决内存泄露问题。这部分原理请看 dotnet 使用 SemaphoreSlim 可能的内存泄露 大概的问题是在 AsyncQueue 对象不再使用的时候,如果有业务代码在 DequeueAsync 等待,那么这些业务代码引用的类将会存在内存泄露,不会被释放
在调用 Dispose 或 DisposeAsync 方法将可以释放 AsyncQueue 对象,在执行完成当前队列里面所有的数据之后,最后的 DequeueAsync 返回,解决内存泄露。而 Dispose 和 DisposeAsync 方法的不同在于,调用 DisposeAsync 方法相当于调用如下代码
asyncQueue.Dispose();
await asyncQueue.WaitForCurrentFinished();
例子
下面用一个例子让小伙伴了解使用的方法,和这个库的强大
假定有任务的定义如下
class FooTask
{
public void Do()
{
Console.WriteLine("DoTask");
}
}
在实际使用的时候,其实更多将会定义为数据本身,让消费者执行相同的逻辑处理数据。而定义为任务本身可以提升灵活度,也就是每个任务可以使用不同的逻辑,但是需要小心任务本身的线程安全问题。如在 WPF 中不应该使用非 UI 线程访问 UI 线程控件等
这个任务有 100 个从线程池拿到的线程在创建,加入队列
var random = new Random();
var asyncQueue = new AsyncQueue<FooTask>();
for (int i = 0; i < 100; i++)
{
Task.Run(async () =>
{
while (true)
{
asyncQueue.Enqueue(new FooTask());
await Task.Delay(random.Next(1000));
}
});
}
而有对应的 10 个从线程池拿到的线程在执行
for (int i = 0; i < 10; i++)
{
Task.Run(async () =>
{
while (true)
{
var fooTask = await asyncQueue.DequeueAsync();
fooTask.Do();
Console.WriteLine($"剩余 {asyncQueue.Count}");
await Task.Delay(random.Next(50));
}
});
}
执行代码可以看到在消费线程里面将会不断从队列里面拿到任务,然后调用任务
本文代码放在github欢迎小伙伴访问
而 AsyncQueue 不仅可以用在多线程并发调用,也可以支持在单线程玩出协程的方法。先调用 DequeueAsync 加上等待,此时将会在当前线程注册等待调用,接着在其他业务模块调用入队的方法,每次调用入队将会回到出队的异步方法里面
如在 WPF 中添加下面代码
private static async void DoTask(AsyncQueue<FooTask> asyncQueue)
{
while (true)
{
var task = await asyncQueue.DequeueAsync();
task.Do();
Console.WriteLine($"执行线程 {Thread.CurrentThread.ManagedThreadId}");
}
}
var random = new Random();
var asyncQueue = new AsyncQueue<FooTask>();
DoTask(asyncQueue);
for (int i = 0; i < 100; i++)
{
asyncQueue.Enqueue(new FooTask());
await Task.Delay(random.Next(1000));
}
可以看到里面代码全程没有创建线程也没有调用线程池,在入队之后将会在入队的线程调用到 await 出让才会让 DoTask 继续执行
如果代码不是在 WPF 中使用,而是在控制台就需要自己实现同步上下文,请看 C# dotnet 自己实现一个线程同步上下文
原理
其实这个 AsyncQueue 的本质就是使用 ConcurrentQueue 和 SemaphoreSlim 两个基础类创建的
关于 SemaphoreSlim 请看 C# dotnet 的锁 SemaphoreSlim 和队列
这个 SemaphoreSlim 锁的功能就是提供信号量,和异步等待的功能。信号量的用法就是设置多少次信号量就允许多少次使用信号量,这就是 AsyncQueue 可以让入队和出队的最大次数相等的原因
为什么是说最大次数而不是次数?原因是在于可以入队,但是没有线程调用 DequeueAsync 出队
在 DequeueAsync 方法底层调用的等待就是调用 SemaphoreSlim 的等待方法,如果没有信号量可以使用,那么这个等待将会等待到有信号量被设置。而等待是异步方法,也就是不会占用一个线程,此时占用的资源很小。当然用这个方法就需要小心 dotnet 使用 SemaphoreSlim 可能的内存泄露 这也就是在不使用 AsyncQueue 需要调用释放的原因
而 ConcurrentQueue 就是提供本身存放数据的类,这个类的设计就是线程安全的
因此通过 ConcurrentQueue 存放数据,而通过 SemaphoreSlim 通知出队,让 SemaphoreSlim 支持等待数据出队和让入队数量和出队最大数量相等
使用这两个类的配合就可以做到 AsyncQueue 的高性能低资源占用
本文使用的 AsyncWorkerCollection 库是完全开源基于 MIT 协议的库,欢迎小伙伴使用,在使用中遇到任何问题都欢迎在 GitHub 讨论
更多请看
本文会经常更新,请阅读原文: https://blog.lindexi.com/post/dotnet-%E4%BD%BF%E7%94%A8-AsyncQueue-%E5%88%9B%E5%BB%BA%E9%AB%98%E6%80%A7%E8%83%BD%E5%86%85%E5%AD%98%E7%94%9F%E4%BA%A7%E8%80%85%E6%B6%88%E8%B4%B9%E8%80%85%E9%98%9F%E5%88%97.html ,以避免陈旧错误知识的误导,同时有更好的阅读体验。
如果你想持续阅读我的最新博客,请点击 RSS 订阅,推荐使用RSS Stalker订阅博客,或者前往 CSDN 关注我的主页
- 【工具推荐】图像界的魔术师 ImageMagick
- 使用Metrics.NET 构建 ASP.NET MVC 应用程序的性能指标
- 如何设计完善的构建系统,为日常开发提速一倍
- 两年 100 期技术周报后,我收获了这四点
- 如何为技术博客设计一个推荐系统(中):基于 Google 搜索的半自动推荐
- 我是如何为技术博客设计一个推荐系统(上):统计与评分加权
- c#处理空白字符
- 后台优化:使用应用性能管理工具
- Disruptor-NET和内存栅栏
- 我们是如何将 Cordova 应用嵌入到 React Native 中
- ADO.NET的弹性连接控制[ADO.NET idle connection resiliency]
- ASP.Net MVC 5 in Xamarin Studio 5.2
- 自制基于 Snips 和 Snowboy 的智能音箱来保护你的隐私
- 从 React 将从 BSD 改 MIT 许可证,谈如何选择正确的开源许可
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- android studio3.3.1代码提示忽略大小写的设置
- Ascgen2可以把图片变成文字的小工具
- 解决android studio卡顿,提升studio运行速度的方法
- Android物理键盘事件解析
- AndroidQ(10)分区存储完美适配方法
- (全局快捷键工具)Power Keys彻底提升码字效率?
- android自定义view实现钟表效果
- 教你如何用OpenVZ限制虚拟机硬盘IO速度
- Android自定义控件实现短信验证码自动填充
- android studio 3.6.1升级后如何处理 flutter问题
- Android项目迁移到AndroidX的方法步骤
- Android中layer-list基本使用详解
- Android Studio中主题样式的使用方法详解
- android点击无效验证的解决方法
- Android Studio 3.5格式化布局代码时错位、错乱bug的解决