MassTransit Get Started->
MassTransit:是一款.NET的分布式应用程序框架(开源、免费)。通过MassTransit,可以轻松创建利用基于消息的、松耦合异步通信的应用程序和服务,以提高可用性,可靠性和可伸缩性。
MassTransit本身定位轻量级的服务总线,并支持多种传输方式如:RabbitMQ、Azure Service Bus、ActiveMQ、Amazon SQS、Kafka、Azure Event Hub。消息异常处理:重试配置、重新交付、erro管道、死信管道。分布式事务处理:sagas、Courier。容器支持:.NETcore自身的、autofac、castle windsor等、调度支持:Quartz 、hangfire。更多功能参考官网文档。
MassTransit目前已经发布到了第7个版本了,7.0版本新增了对Kafka 的支持,构建仅支持.NET Standard 2.0...其他改动不大。MassTransit社区使用也是很活跃的,对于首次接触的,通过本篇文章(基于rabbitmq)帮你快速入门!
一个应用程序或服务可以使用两种不同的方法来生产消息,主要区别是sent需要指定具体的端点地址,而pub不需要,下面的代码会演示这两种方式。
- 发布事件(多个接收者)
- 发送命令(一个接收者)
发布事件(事件消息)
场景假设:在xx项目中,需要与第三方进行交互。比如:订单发货之后,把发货的信息的推送给第三方、把订单的状态变化也推送过去。我们分析下需求,系统要求在发货之后,需要做若干事情。可以解读为,发货这个动作已经发生了,需要做的事情不确定。这不是典型的发布订阅模式嘛!好了,那使用masstransit如何实现呢?
1.创建一个类库项目定义消息体,命名为contract
public interface OrderShipped
{
public Guid OrderId { get; set; }//订单号
}
2.创建一个api项目作为消息的生产方,命名为Delivery,然后安装nuget包:
Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
在Startup类的ConfigureServices中,添加以下配置
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, config) =>
{
config.Host("rabbitmq://localhost:5672", hostConfig =>
{
hostConfig.Username("Amq");//填写你的用户名
hostConfig.Password("mq123");//填写你的用户名
});
});
});
services.AddMassTransitHostedService();
在ValueController中,进行发布消息
readonly IPublishEndpoint _publishEndpoint;
public ValuesController(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
/// <summary>
/// 测试发布
/// </summary>
/// <returns></returns>
[HttpPost]
public async Task<ActionResult> OrderDelivery()
{
//其他
await _publishEndpoint.Publish<OrderShipped>(new
{
OrderId = Guid.NewGuid()
});
return Ok();
}
通过使用IPublishEndpoint实例的Publish方法,发布一个事件。
3.创建一个api项目作为消息的消费方,命名为Listener,然后安装nuget包:
Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
在Startup类的ConfigureServices中,添加以下配置
services.AddMassTransit(x =>
{
x.AddConsumer<DeliveryExpressNotifyConsumer>();
x.UsingRabbitMq((context, config) =>
{
config.Host("rabbitmq://localhost:5672", hostConfig =>
{
hostConfig.Username("Amq");
hostConfig.Password("mq123");
});
config.ReceiveEndpoint("Order.Shipped", e =>
{
e.ConfigureConsumer<DeliveryExpressNotifyConsumer>(context);
});
});
});
services.AddMassTransitHostedService();
实现接口IConsumer即可完成消费逻辑的过程。
public class DeliveryExpressNotifyConsumer : IConsumer<OrderShipped>
{
ILogger<DeliveryExpressNotifyConsumer> _logger;
public DeliveryExpressNotifyConsumer(ILogger<DeliveryExpressNotifyConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<OrderShipped> context)
{
_logger.LogInformation("订单id: {Value}已发货,通知第三方", context.Message.OrderId);
}
}
到此,消息生产方和消费方代码都已经实现了,运行一下,效果如下
发送消息(命令消息)
发送消息适用的场景,常常是一种命令,并且期望消息只被一个接收者或服务实例进行处理。masstransit使用发送消息和发布消息,在消息生产方不同之处,sent消息需要指定目标地址,使用ISendEndpoint的Send方法,消费者代码一样的配置。以下代码演示发送一个创建发货单的指令消息,比较简单直接贴出源码:
1.定义一个消息SubmitShippingOrder
public class SubmitShippingOrder
{
public Guid OrderId { get; set; }
}
2.sent消息
readonly IPublishEndpoint _publishEndpoint;
readonly ISendEndpointProvider _sendEndpointProvider;
public ValuesController(IPublishEndpoint publishEndpoint, ISendEndpointProvider sendEndpointProvider)
{
_publishEndpoint = publishEndpoint;
_sendEndpointProvider = sendEndpointProvider;
}
/// <summary>
/// 测试发送
/// </summary>
/// <returns></returns>
[HttpPost]
public async Task<ActionResult> Delivery()
{
var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("queue:Submit.Shipping.Order")); //获取发送端点
await endpoint.Send(new SubmitShippingOrder
{
OrderId = Guid.NewGuid()
});
return Ok();
}
3.消费
public class SubmitShippingOrderConsumer : IConsumer<SubmitShippingOrder>
{
ILogger<SubmitShippingOrderConsumer> _logger;
public SubmitShippingOrderConsumer(ILogger<SubmitShippingOrderConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<SubmitShippingOrder> context)
{
_logger.LogInformation("创建发货单,订单id: {Value}", context.Message.OrderId);
}
}
运行一下效果:
官网文档:
http://masstransit-project.com/
https://masstransit-project.com/advanced/courier/
- HLS Lesson16-数组优化:数组分割
- HLS Lesson15-for循环优化:其他方法
- 对自己的上网搜索记录进行爬虫是怎样一种体验
- HLS Lesson13-for循环优化:合并
- HLS Lesson12-for循环优化:基本性能指标
- HLS Lesson11-c/c++ testbench书写续2
- 【机器学习】实例详解机器学习如何解决问题
- 企业网站架构之Nginx+tomcat+memcached集群
- 企业级Docker Registry开源工具Harbor的介绍以及使用指南
- HLS Lesson8-基本操作
- Windows渗透测试工具:RedSnarf
- HLS Lesson7-复合数据类型
- matlab GUI基础3
- 【Python环境】《Python数据科学入门》试译 第一章 简介
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 这一次搞懂Spring自定义标签以及注解解析原理
- 这一次搞懂Spring的Bean实例化原理
- 单片机入门学习十三 STM32单片机学习十 通用定时器
- Spring IOC原理补充(循环依赖、Bean作用域等)
- 这一次搞懂Spring代理创建及AOP链式调用过程
- 这一次搞懂Spring事务注解的解析
- 这一次搞懂Spring事务是如何传播的
- 这一次搞懂SpringMVC原理
- 这一次搞懂Spring Web零xml配置原理以及父子容器关系
- 这一次搞懂SpringBoot核心原理(自动配置、事件驱动、Condition)
- 全网最深分析SpringBoot MVC自动配置失效的原因
- Mybatis源码初探——优雅精良的骨架
- 深入Mybatis源码——配置解析
- 深入Mybatis源码——执行流程
- Mybatis插件扩展以及与Spring整合原理