EventBus In eShop -- 解析微软微服务架构Demo(四)
引言
大家好像对分析源码厌倦了,说实在我也会厌倦,不过不看是无法分析其后面的东西,从易到难是一个必要的过程。
今天说下EventBus,前几天园里的大神已经把其解刨,我今天就借着大神的肩膀,分析下在eShop项目中EventBus的实现。
最近发觉转发文章不写出处的,特此加上链接:http://inday.cnblogs.com
解析源码
我们知道使用EventBus是为了解除Publisher和Subscriber之间的依赖性,这样我们的Publisher就不需要知道有多少Subscribers,只需要通过EventBus进行注册管理就好了,在eShop项目中,有一个这样的接口IEventBus(eShopOnContainerssrcBuildingBlocksEventBusEventBusAbstractions)
public interface IEventBus
{
void Subscribe<T, TH>(Func<TH> handler)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
void Publish(IntegrationEvent @event);
}
我们可以看到这个接口定义了EventBus所需的一些操作, 对比大神的EventBus,相关功能都是一致的,我们看下它的实现类:EventBusRabbitMQ,从名字上可以看出,这是一个通过RabbitMQ来进行管理的EventBus,我们可以看到它使用了IEventBusSubscriptionsManager进行订阅存储,也就是大神文中的:
private readonly ConcurrentDictionary<Type, List<Type>> _eventAndHandlerMapping;
微软在Demo中把其提取出了接口,把一些常用方法给提炼了出来,但是核心还是Dictionary<string, List<Delegate>>, 使用Dictionary进行Map映射。通过Subscribe和UnSubscribe进行订阅和取消,使用Publish方法进行发布操作。
public void Subscribe<T, TH>(Func<TH> handler)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = typeof(T).Name;
var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
if (!containsKey)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueBind(queue: _queueName,
exchange: BROKER_NAME,
routingKey: eventName);
}
}
_subsManager.AddSubscription<T, TH>(handler);
}
我们看到在订阅的时候,EventBus会检查下在Map中是否有相应的注册,如果没有的话首先回去RabbitMQ中创建一个新的channel进行绑定,随后在Map中进行注册映射。
UnSubscribe则直接从Map中取消映射,通过OnEventRemoved事件判断Map下此映射的subscriber是否为空,为空则从RabbitMQ中关闭channel。
在RabbitMQ的构造方法中,我们看到这样一个创建:CreateConsumerChannel(),这里创建了一个EventingBasicConsumer,当Queue中有新的消息时会通过ProcessEvent执行Map中注册的handler(subscribers),看图可能更清晰些:
在ProcessEvent方法中,回去Map中找寻subscribers,然后通过动态反射进行执行:
private async Task ProcessEvent(string eventName, string message)
{
if (_subsManager.HasSubscriptionsForEvent(eventName))
{
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
var handlers = _subsManager.GetHandlersForEvent(eventName);
foreach (var handlerfactory in handlers)
{
var handler = handlerfactory.DynamicInvoke();
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
}
}
}
微软通过简单的代码解耦了Publisher和Subscribers之间的依赖关系,我们引用大神的总结:
应用
在catalog.api中,微软出现了EventBus,我在上一篇中也提到了,这是我的一个疑惑,因为在catalog中并没有订阅操作,直接执行了Publish操作,原先以为是一个空操作,后来看了Basket.Api我才知道为何微软要用RabbitMQ。
使用RabbitMQ,我们不仅是从类之间的解耦,更可以跨项目,跨语言,跨平台的解耦,publisher仅仅需要把消息体(IntegrationEvent)传送到RabbitMQ,Consumer从Queue中获取消息体,然后推送到Subscribers执行相应的操作。我们看下Basket.Api.Startup.cs:
protected virtual void ConfigureEventBus(IApplicationBuilder app)
{
var catalogPriceHandler = app.ApplicationServices
.GetService<IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>>();
var orderStartedHandler = app.ApplicationServices
.GetService<IIntegrationEventHandler<OrderStartedIntegrationEvent>>();
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>
(() => app.ApplicationServices.GetRequiredService<ProductPriceChangedIntegrationEventHandler>());
eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>
(() => app.ApplicationServices.GetRequiredService<OrderStartedIntegrationEventHandler>());
}
在这个方法里,我们看到了Subscribe操作,想想之前的提问有点搞笑,不过研究明白了也不错,对吧!
总结
今天我们看了EventBus在Demo中的应用,总结一下。
1、EventBus可以很好的解耦订阅者和发布者之间的依赖
2、使用RabbitMQ能够跨项目、跨平台、跨语言的解耦订阅者和发布者
虽然在Demo中我们看到对订阅者的管理是通过Dictionary内存的方式,所以我们的Subscribe仅仅只在Basket.Api中看到,但微软是通过IEventBusSubscriptionsManager接口定义的,我们可以通过自己的需求来进行定制,可以做成分布式的,比如使用memcached。
写在最后
每个月到下旬就会比较忙,所以文章发布会比较慢,但我也会坚持学习完eShop的,为了学习,我建了个群,大家可以进来一起学习,有什么建议和问题都可以进来哦。
eShop虽好,但不建议大家放到生产环境,毕竟是一个Demo,而且目前还是ALPHA版本,用来学习是一个很好的教材,这就是一个大杂烩,学习中你会学到很多新的东西,大家如果看好core的发展,可以一起研究下。
QQ群:376248054
- Flash背景透明的代码
- Maven私服Nexus3.x环境构建操作记录
- Mapx用xml创建点图层
- 编程语言之间的百舸争流
- Mysql连接错误:Lost connection to Mysql server at 'waiting for initial communication packet'
- 适应现代变化的数据架构
- Linux下修改系统编码的操作记录
- 微信公众平
- linq to xml复习
- web cache server方案比较:varnish、squid、nginx
- Nginx虚拟目录alias和root目录
- Nginx的https配置记录以及http强制跳转到https的方法梳理
- VPC下访问FTP的问题
- 分析车辆雨刮频次计算降雨量 大数据服务天气预报
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- android studio实现简单的计算器功能
- Android小程序实现个人信息管理系统
- Flutter 使用Navigator进行局部跳转页面的方法
- Android小程序实现简易QQ界面
- Android小程序实现音乐播放列表
- 详解Android10的分区存储机制(Scoped Storage)适配教程
- Android自定义View实现可拖拽缩放的矩形框
- Android实现掷骰子效果
- Android实现侧滑菜单DrawerLayout
- Android :okhttp+Springmvc文件解析器实现android向服务器上传照片
- 纯小白都能看懂的《单个神经元》、《随机梯度下降》、《逻辑与》
- 使用Flutter开发的抖音国际版实例代码详解
- vue-cropper裁剪图片
- Android 开发使用PopupWindow实现弹出警告框的复用类示例
- python初学者笔记—关于 random 和 time 模块