MassTransit Get Started->

时间:2022-07-25
本文章向大家介绍MassTransit Get Started->,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

MassTransit:是一款.NET的分布式应用程序框架(开源、免费)。通过MassTransit,可以轻松创建利用基于消息的、松耦合异步通信的应用程序和服务,以提高可用性,可靠性和可伸缩性。

MassTransit本身定位轻量级的服务总线,并支持多种传输方式如:RabbitMQ、Azure Service Bus、ActiveMQ、Amazon SQS、Kafka、Azure Event Hub。消息异常处理:重试配置、重新交付、erro管道、死信管道。分布式事务处理:sagas、Courier。容器支持:.NETcore自身的、autofac、castle windsor等、调度支持:Quartz 、hangfire。更多功能参考官网文档。

MassTransit目前已经发布到了第7个版本了,7.0版本新增了对Kafka 的支持,构建仅支持.NET Standard 2.0...其他改动不大。MassTransit社区使用也是很活跃的,对于首次接触的,通过本篇文章(基于rabbitmq)帮你快速入门!

一个应用程序或服务可以使用两种不同的方法来生产消息,主要区别是sent需要指定具体的端点地址,而pub不需要,下面的代码会演示这两种方式。

  • 发布事件(多个接收者)
  • 发送命令(一个接收者)

发布事件(事件消息)

场景假设:在xx项目中,需要与第三方进行交互。比如:订单发货之后,把发货的信息的推送给第三方、把订单的状态变化也推送过去。我们分析下需求,系统要求在发货之后,需要做若干事情。可以解读为,发货这个动作已经发生了,需要做的事情不确定。这不是典型的发布订阅模式嘛!好了,那使用masstransit如何实现呢?

1.创建一个类库项目定义消息体,命名为contract
   public interface OrderShipped
    {
        public Guid OrderId { get; set; }//订单号
    }
2.创建一个api项目作为消息的生产方,命名为Delivery,然后安装nuget包:
Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
在Startup类的ConfigureServices中,添加以下配置

            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, config) =>
                {

                    config.Host("rabbitmq://localhost:5672", hostConfig =>
                    {

                        hostConfig.Username("Amq");//填写你的用户名
                        hostConfig.Password("mq123");//填写你的用户名
                    });
                });
            });
            services.AddMassTransitHostedService();
在ValueController中,进行发布消息
        readonly IPublishEndpoint _publishEndpoint;
        public ValuesController(IPublishEndpoint publishEndpoint)
        {
            _publishEndpoint = publishEndpoint;
        }

        /// <summary>
        /// 测试发布
        /// </summary>
        /// <returns></returns>
        [HttpPost]
        public async Task<ActionResult> OrderDelivery()
        {
            //其他
            await _publishEndpoint.Publish<OrderShipped>(new
            {
                OrderId = Guid.NewGuid()
            });

            return Ok();
        }

通过使用IPublishEndpoint实例的Publish方法,发布一个事件。

3.创建一个api项目作为消息的消费方,命名为Listener,然后安装nuget包:
Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
在Startup类的ConfigureServices中,添加以下配置
            services.AddMassTransit(x =>
            {
                x.AddConsumer<DeliveryExpressNotifyConsumer>();
                x.UsingRabbitMq((context, config) =>
                {
                    config.Host("rabbitmq://localhost:5672", hostConfig =>
                    {
                        hostConfig.Username("Amq");
                        hostConfig.Password("mq123");
                    });

                    config.ReceiveEndpoint("Order.Shipped", e =>
                    {
                        e.ConfigureConsumer<DeliveryExpressNotifyConsumer>(context);
                    });

                });
            });

            services.AddMassTransitHostedService();

实现接口IConsumer即可完成消费逻辑的过程。

    public class DeliveryExpressNotifyConsumer : IConsumer<OrderShipped>
    {
        ILogger<DeliveryExpressNotifyConsumer> _logger;

        public DeliveryExpressNotifyConsumer(ILogger<DeliveryExpressNotifyConsumer> logger)
        {
            _logger = logger;
        }
        public async Task Consume(ConsumeContext<OrderShipped> context)
        {
            _logger.LogInformation("订单id: {Value}已发货,通知第三方", context.Message.OrderId);
        }
    }
到此,消息生产方和消费方代码都已经实现了,运行一下,效果如下

发送消息(命令消息)

发送消息适用的场景,常常是一种命令,并且期望消息只被一个接收者或服务实例进行处理。masstransit使用发送消息和发布消息,在消息生产方不同之处,sent消息需要指定目标地址,使用ISendEndpoint的Send方法,消费者代码一样的配置。以下代码演示发送一个创建发货单的指令消息,比较简单直接贴出源码:

1.定义一个消息SubmitShippingOrder
 public class SubmitShippingOrder
    {
        public Guid OrderId { get; set; }
    }
2.sent消息
        readonly IPublishEndpoint _publishEndpoint;
        readonly ISendEndpointProvider _sendEndpointProvider;
        public ValuesController(IPublishEndpoint publishEndpoint, ISendEndpointProvider sendEndpointProvider)
        {
            _publishEndpoint = publishEndpoint;
            _sendEndpointProvider = sendEndpointProvider;
        }
        
        /// <summary>
        /// 测试发送
        /// </summary>
        /// <returns></returns>
        [HttpPost]
        public async Task<ActionResult> Delivery()
        {
            var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("queue:Submit.Shipping.Order")); //获取发送端点
            await endpoint.Send(new SubmitShippingOrder
            {
                OrderId = Guid.NewGuid()
            });
            return Ok();
        }
3.消费
  public class SubmitShippingOrderConsumer : IConsumer<SubmitShippingOrder>
    {
        ILogger<SubmitShippingOrderConsumer> _logger;

        public SubmitShippingOrderConsumer(ILogger<SubmitShippingOrderConsumer> logger)
        {
            _logger = logger;
        }
        public async Task Consume(ConsumeContext<SubmitShippingOrder> context)
        {
            _logger.LogInformation("创建发货单,订单id: {Value}", context.Message.OrderId);
        }
    }

运行一下效果:

官网文档:

http://masstransit-project.com/

https://masstransit-project.com/advanced/courier/