【C#】58. .Net中的并发集合——BlockingCollection
时间:2019-09-17
本文章向大家介绍【C#】58. .Net中的并发集合——BlockingCollection,主要包括【C#】58. .Net中的并发集合——BlockingCollection使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
这篇是并发集合中的最后一篇,介绍一下BlockingCollection。在工作中我还没有使用过,但是看上去应该是为了便捷使用并发集合而创建的类型。默认情况下,BlockingCollection使用的是ConcurrentQueue容器,当然我们也可以使用其他实现了IProducerConsumerCollection的类型来操作。 static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null) { var taskCollection = new BlockingCollection<CustomTask>(); if(collection != null) taskCollection= new BlockingCollection<CustomTask>(collection); var taskSource = Task.Run(() => TaskProducer(taskCollection)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = "Processor " + i; processors[i - 1] = Task.Run(() => TaskProcessor(taskCollection, processorId)); } await taskSource; await Task.WhenAll(processors); } static async Task TaskProducer(BlockingCollection<CustomTask> collection) { for (int i = 1; i <= 20; i++) { await Task.Delay(20); var workItem = new CustomTask { Id = i }; collection.Add(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } collection.CompleteAdding(); //完成工作 } static async Task TaskProcessor(BlockingCollection<CustomTask> collection, string name) { await GetRandomDelay(); foreach (CustomTask item in collection.GetConsumingEnumerable()) { Console.WriteLine("Task {0} has been processed by {1}", item.Id, name); await GetRandomDelay(); } } 首先调用默认的BlockingCollection: 然后我们传入一个ConcurrentStack实例 Console.WriteLine("Using a Stack inside of BlockingCollection"); Console.WriteLine(); Task t = RunProgram(new ConcurrentStack<CustomTask>()); t.Wait();
C# 并行编程 之 并发集合 (.Net Framework 4.0) 2015年05月08日 10:15:29 zy__ 阅读数 24909更多 分类专栏: C# 版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/wangzhiyu1980/article/details/45497907 此文为个人学习《C#并行编程高级教程》的笔记,总结并调试了一些文章中的代码示例。 在以后开发过程中可以加以运用。 对于并行任务,与其相关紧密的就是对一些共享资源,数据结构的并行访问。经常要做的就是对一些队列进行加锁-解锁,然后执行类似插入,删除等等互斥操作。 .NetFramework 4.0 中提供了一些封装好的支持并行操作数据容器,可以减少并行编程的复杂程度。 基本信息 .NetFramework中并行集合的名字空间: System.Collections.Concurrent 并行容器: ConcurrentQueue ConcurrentStack ConcurrentBag : 一个无序的数据结构集,当不需要考虑顺序时非常有用。 BlockingCollection : 与经典的阻塞队列数据结构类似 ConcurrentDictionary 这些集合在某种程度上使用了无锁技术(CAS Compare-and-Swap和内存屏障 Memory Barrier),与加互斥锁相比获得了性能的提升。但在串行程序中,最好不用这些集合,它们必然会影响性能。 关于CAS: http://www.tuicool.com/articles/zuui6z http://www.360doc.com/content/11/0914/16/7656248_148221200.shtml 关于内存屏障 http://en.wikipedia.org/wiki/Memory_barrier 用法与示例 ConcurrentQueue 其完全无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作。 Enqueue:在队尾插入元素 TryDequeue:尝试删除队头元素,并通过out参数返回 TryPeek:尝试将对头元素通过out参数返回,但不删除该元素。 程序示例: using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_1_concurrent_queue { class Program { internal static ConcurrentQueue<int> _TestQueue; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestQueue.Enqueue(i); } System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0; bool IsDequeuue = false; System.Console.WriteLine("ThreadWork2 run { "); for (; ; ) { IsDequeuue = _TestQueue.TryDequeue(out i); if (IsDequeuue) System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " ====="); if (i == 99) break; } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); _TestQueue = new ConcurrentQueue<int>(); Console.WriteLine("Sample 3-1 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 3-1 Main }"); Console.ReadKey(); } } } ConcurrentStack 其完全无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作。 Push:向栈顶插入元素 TryPop:从栈顶弹出元素,并且通过out 参数返回 TryPeek:返回栈顶元素,但不弹出。 程序示例: using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_2_concurrent_stack { class Program { internal static ConcurrentStack<int> _TestStack; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestStack.Push(i); } System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0; bool IsDequeuue = false; System.Console.WriteLine("ThreadWork2 run { "); for (; ; ) { IsDequeuue = _TestStack.TryPop(out i); if (IsDequeuue) System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====" + i); if (i == 99) break; } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); _TestStack = new ConcurrentStack<int>(); Console.WriteLine("Sample 4-1 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 4-1 Main }"); Console.ReadKey(); } } } 测试中一个有趣的现象: 虽然生产者已经在栈中插入值已经到了25,但消费者第一个出栈的居然是4,而不是25。很像是出错了。但仔细想想入栈,出栈和打印语句是两个部分,而且并不是原子操作,出现这种现象应该也算正常。 Sample 3-1 Main { Main t1 t2 started { Main t1 t2 started } Main wait t1 t2 end { ThreadWork1 run { ThreadWork1 producer: 0 ThreadWork2 run { ThreadWork1 producer: 1 ThreadWork1 producer: 2 ThreadWork1 producer: 3 ThreadWork1 producer: 4 ThreadWork1 producer: 5 ThreadWork1 producer: 6 ThreadWork1 producer: 7 ThreadWork1 producer: 8 ThreadWork1 producer: 9 ThreadWork1 producer: 10 ThreadWork1 producer: 11 ThreadWork1 producer: 12 ThreadWork1 producer: 13 ThreadWork1 producer: 14 ThreadWork1 producer: 15 ThreadWork1 producer: 16 ThreadWork1 producer: 17 ThreadWork1 producer: 18 ThreadWork1 producer: 19 ThreadWork1 producer: 20 ThreadWork1 producer: 21 ThreadWork1 producer: 22 ThreadWork1 producer: 23 ThreadWork1 producer: 24 ThreadWork1 producer: 25 ThreadWork2 consumer: 16 =====4 ThreadWork2 consumer: 625 =====25 ThreadWork2 consumer: 576 =====24 ThreadWork2 consumer: 529 =====23 ThreadWork1 producer: 26 ThreadWork1 producer: 27 ThreadWork1 producer: 28 ConcurrentBag 一个无序的集合,程序可以向其中插入元素,或删除元素。 在同一个线程中向集合插入,删除元素的效率很高。 Add:向集合中插入元素 TryTake:从集合中取出元素并删除 TryPeek:从集合中取出元素,但不删除该元素。 程序示例: using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_3_concurrent_bag { class Program { internal static ConcurrentBag<int> _TestBag; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestBag.Add(i); } System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0; int nCnt = 0; bool IsDequeuue = false; System.Console.WriteLine("ThreadWork2 run { "); for (;;) { IsDequeuue = _TestBag.TryTake(out i); if (IsDequeuue) { System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====" + i); nCnt++; } if (nCnt == 99) break; } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); _TestBag = new ConcurrentBag<int>(); Console.WriteLine("Sample 4-3 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 4-3 Main }"); Console.ReadKey(); } } } BlockingCollection 一个支持界限和阻塞的容器 Add :向容器中插入元素 TryTake:从容器中取出元素并删除 TryPeek:从容器中取出元素,但不删除。 CompleteAdding:告诉容器,添加元素完成。此时如果还想继续添加会发生异常。 IsCompleted:告诉消费线程,生产者线程还在继续运行中,任务还未完成。 示例程序: 程序中,消费者线程完全使用 while (!_TestBCollection.IsCompleted) 作为退出运行的判断条件。 在Worker1中,有两条语句被注释掉了,当i 为50时设置CompleteAdding,但当继续向其中插入元素时,系统抛出异常,提示无法再继续插入。 using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_4_concurrent_bag { class Program { internal static BlockingCollection<int> _TestBCollection; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestBCollection.Add(i); //if (i == 50) // _TestBCollection.CompleteAdding(); } _TestBCollection.CompleteAdding(); System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0; int nCnt = 0; bool IsDequeuue = false; System.Console.WriteLine("ThreadWork2 run { "); while (!_TestBCollection.IsCompleted) { IsDequeuue = _TestBCollection.TryTake(out i); if (IsDequeuue) { System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====" + i); nCnt++; } } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); _TestBCollection = new BlockingCollection<int>(); Console.WriteLine("Sample 4-4 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 4-4 Main }"); Console.ReadKey(); } } } 当然可以尝试在Work1中注释掉 CompleteAdding 语句,此时Work2陷入循环无法退出。 ConcurrentDictionary 对于读操作是完全无锁的,当很多线程要修改数据时,它会使用细粒度的锁。 AddOrUpdate:如果键不存在,方法会在容器中添加新的键和值,如果存在,则更新现有的键和值。 GetOrAdd:如果键不存在,方法会向容器中添加新的键和值,如果存在则返回现有的值,并不添加新值。 TryAdd:尝试在容器中添加新的键和值。 TryGetValue:尝试根据指定的键获得值。 TryRemove:尝试删除指定的键。 TryUpdate:有条件的更新当前键所对应的值。 GetEnumerator:返回一个能够遍历整个容器的枚举器。 程序示例: using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_5_concurrent_dictionary { class Program { internal static ConcurrentDictionary<int, int> _TestDictionary; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestDictionary.TryAdd(i, i); } System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0, nCnt = 0; int nValue = 0; bool IsOk = false; System.Console.WriteLine("ThreadWork2 run { "); while (nCnt < 100) { IsOk = _TestDictionary.TryGetValue(i, out nValue); if (IsOk) { System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====" + i); nValue = nValue * nValue; _TestDictionary.AddOrUpdate(i, nValue, (key, value) => { return value = nValue; }); nCnt++; i++; } } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); bool bIsNext = true; int nValue = 0; _TestDictionary = new ConcurrentDictionary<int, int>(); Console.WriteLine("Sample 4-5 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); foreach (var pair in _TestDictionary) { Console.WriteLine(pair.Key + " : " + pair.Value); } System.Collections.Generic.IEnumerator<System.Collections.Generic.KeyValuePair<int, int>> enumer = _TestDictionary.GetEnumerator(); while (bIsNext) { bIsNext = enumer.MoveNext(); Console.WriteLine("Key: " + enumer.Current.Key + " Value: " + enumer.Current.Value); _TestDictionary.TryRemove(enumer.Current.Key, out nValue); } Console.WriteLine("\n\nDictionary Count: " + _TestDictionary.Count); Console.WriteLine("Sample 4-5 Main }"); Console.ReadKey(); } } }
原文地址:https://www.cnblogs.com/kelelipeng/p/11532263.html
随机文章
- POJ 1012 Joseph
- 1344 走格子
- 如何利用配置中心规范构建PaaS服务配置
- Selenium2+python自动化8-SeleniumBuilder辅助定位元素
- HDU 1250 Hat's Fibonacci
- Scrapy在Ubuntu下的安装与配置
- Selenium2+python自动化20-引入unittest框架
- HDU 1002 A + B Problem II(高精度加法(C++/Java))
- POJ 1018 Communication System
- POJ 1017 Packets
- Codeforces 725B Food on the Plane
- Codefoces 723B Text Document Analysis
- Codefoces 723A The New Year: Meeting Friends
- ECJTUACM16 Winter vacation training #1 题解&源码
本站知识点必读
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- php的无刷新操作实现方法分析
- PHP实现创建一个RPC服务操作示例
- php 下 html5 XHR2 + FormData + File API 上传文件操作实例分析
- gearman管理工具GearmanManager的安装与php使用方法示例
- php 的多进程操作实践案例分析
- php 输出缓冲 Output Control用法实例详解
- PHP使用gearman进行异步的邮件或短信发送操作详解
- php多进程并发编程防止出现僵尸进程的方法分析
- php+ajax实现文件切割上传功能示例
- php操作redis数据库常见方法实例总结
- php使用pthreads v3多线程实现抓取新浪新闻信息操作示例
- PHP pthreads v3使用中的一些坑和注意点分析
- php ActiveMQ的安装与使用方法图文教程
- ThinkPHP5与单元测试PHPUnit使用详解
- php实现通过stomp协议连接ActiveMQ操作示例