C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率
时间:2020-04-14
本文章向大家介绍C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率,主要包括C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率
原文:
一、引言
使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。本例使用多线程来创建多信道并绑定队列,达到多workers的目的。
二、示例
2.1、环境准备
在NuGet上安装RabbitMQ.Client。
2.2、工厂类
添加一个工厂类RabbitMQFactory:
/// <summary> /// 多路复用技术(Multiplexing)目的:为了避免创建多个TCP而造成系统资源的浪费和超载,从而有效地利用TCP连接。 /// </summary> public static class RabbitMQFactory { private static IConnection sharedConnection; private static int ChannelCount { get; set; } private static readonly object _locker = new object(); public static IConnection SharedConnection { get { if (ChannelCount >= 1000) { if (sharedConnection != null && sharedConnection.IsOpen) { sharedConnection.Close(); } sharedConnection = null; ChannelCount = 0; } if (sharedConnection == null) { lock (_locker) { if (sharedConnection == null) { sharedConnection = GetConnection(); ChannelCount++; } } } return sharedConnection; } } private static IConnection GetConnection() { var factory = new ConnectionFactory { HostName = "192.168.2.242", UserName = "hello", Password = "world", Port = AmqpTcpEndpoint.UseDefaultPort,//5672 VirtualHost = ConnectionFactory.DefaultVHost,//使用默认值:"/" Protocol = Protocols.DefaultProtocol, AutomaticRecoveryEnabled = true }; return factory.CreateConnection(); } }
2.3、主窗体
代码如下:
public partial class RabbitMQMultithreading : Form { public delegate void ListViewDelegate<T>(T obj); public RabbitMQMultithreading() { InitializeComponent(); } /// <summary> /// ShowMessage重载 /// </summary> /// <param name="msg"></param> private void ShowMessage(string msg) { if (InvokeRequired) { BeginInvoke(new ListViewDelegate<string>(ShowMessage), msg); } else { ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), msg }); lvwMsg.Items.Insert(0, item); } } /// <summary> /// ShowMessage重载 /// </summary> /// <param name="format"></param> /// <param name="args"></param> private void ShowMessage(string format, params object[] args) { if (InvokeRequired) { BeginInvoke(new MethodInvoker(delegate () { ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) }); lvwMsg.Items.Insert(0, item); })); } else { ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) }); lvwMsg.Items.Insert(0, item); } } /// <summary> /// 生产者 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void btnSend_Click(object sender, EventArgs e) { int messageCount = 100; var factory = new ConnectionFactory { HostName = "192.168.2.242", UserName = "hello", Password = "world", Port = AmqpTcpEndpoint.UseDefaultPort,//5672 VirtualHost = ConnectionFactory.DefaultVHost,//使用默认值:"/" Protocol = Protocols.DefaultProtocol, AutomaticRecoveryEnabled = true }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World"; var body = Encoding.UTF8.GetBytes(message); for (int i = 1; i <= messageCount; i++) { channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); ShowMessage($"Send {message}"); } } } } /// <summary> /// 消费者 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private async void btnReceive_Click(object sender, EventArgs e) { Random random = new Random(); int rallyNumber = random.Next(1, 1000); int channelCount = 0; await Task.Run(() => { try { int asyncCount = 10; List<Task<bool>> tasks = new List<Task<bool>>(); var connection = RabbitMQFactory.SharedConnection; for (int i = 1; i <= asyncCount; i++) { tasks.Add(Task.Factory.StartNew(() => MessageWorkItemCallback(connection, rallyNumber))); } Task.WaitAll(tasks.ToArray()); string syncResultMsg = $"集结号 {rallyNumber} 已吹起号角--" + $"本次开启信道成功数:{tasks.Count(s => s.Result == true)}," + $"本次开启信道失败数:{tasks.Count() - tasks.Count(s => s.Result == true)}" + $"累计开启信道成功数:{channelCount + tasks.Count(s => s.Result == true)}"; ShowMessage(syncResultMsg); } catch (Exception ex) { ShowMessage($"集结号 {rallyNumber} 消费异常:{ex.Message}"); } }); } /// <summary> /// 异步方法 /// </summary> /// <param name="state"></param> /// <param name="rallyNumber"></param> /// <returns></returns> private bool MessageWorkItemCallback(object state, int rallyNumber) { bool syncResult = false; IModel channel = null; try { IConnection connection = state as IConnection; //不能使用using (channel = connection.CreateModel())来创建信道,让RabbitMQ自动回收channel。 channel = connection.CreateModel(); channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Thread.Sleep(1000); ShowMessage($"集结号 {rallyNumber} Received {message}"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer); syncResult = true; } catch (Exception ex) { syncResult = false; ShowMessage(ex.Message); } return syncResult; } }
2.4、运行结果
多点几次消费者即可增加信道,提升消费能力。
原文地址:https://www.cnblogs.com/lonelyxmas/p/12698220.html
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 27.opengl高级光照-点光源阴影
- 28.opengl高级光照-法线贴图
- python抓取动态验证码,具体第几帧数的位置静态图片
- 在 Spring Security 中,我就想从子线程获取用户登录信息,怎么办?
- php自动加载
- 011.Nginx防盗链
- 012.Nginx负载均衡
- 013.Nginx动静分离
- 014.Nginx跨域配置
- 深入理解 FilterChainProxy【源码篇】
- matplotlib基础绘图命令之bar
- 使用 Github Actions 自动部署 Angular 应用到 Github Pages
- 路径中关于斜杠/和反斜杠 的区别
- Redis的高级特性与应用场景(二)
- Redis的高级特性与应用场景(一)