Spring Cloud Bus中的事件的订阅与发布(一)
年前最后一篇文章,提前祝大家新年快乐!
下面进入正文。Spring Cloud Bus用轻量级的消息代理将分布式系统的节点连接起来。这可以用来广播状态的该表(比如配置的改变)或者其他关联的指令。一个关键的想法是,总线就像是一个分布式Actuator,用于Spring Boot应用程序的扩展,但它也可以用作应用程序之间的通信通道。Spring Cloud提供了AMQP 传输的代理和Kafka启动Starters,对具有相同的基本功能集的其他传输组件的支持,也在未来的规划中。
Spring Cloud Bus
Spring Cloud Bus
是在Spring Cloud Stream
的基础上进行的封装,对于指定主题的消息的发布与订阅是通过Spring Cloud Stream
的具体binder实现。因此引入的依赖可以是spring-cloud-starter-bus-amqp
和spring-cloud-starter-bus-kafka
其中的一种,分别对应于binder的两种实现。根据上一节的基础应用,我们总结出Spring Cloud Bus
的主要功能如下两点:
- 对指定主题
springCloudBus
的消息订阅与发布。 - 事件监听,包括刷新事件、环境变更事件、远端应用的ack事件以及本地服务端发送事件等。
下面我们以这两方面作为主线,进行Spring Cloud Bus
的源码分析。本文主要针对事件的订阅户发布。
事件的订阅与发布
事件驱动模型
这部分需要读者首先了解下Spring的事件驱动模型。我们在这边简单介绍下设计的主要概念,帮助大家易于理解后面的内容。
event-source
Spring的事件驱动模型由三部分组成:
- 事件:ApplicationEvent,继承自JDK的EventObject,所有事件将继承它,并通过source得到事件源。
- 事件发布者:ApplicationEventPublisher及ApplicationEventMulticaster接口,使用这个接口,我们的Service就拥有了发布事件的能力。
- 事件订阅者:ApplicationListener,继承自JDK的EventListener,所有监听器将继承它。
事件的定义
Spring的事件驱动模型的事件定义均继承自ApplicationEvent
,Spring Cloud Bus
中有多个事件类,这些事件类都继承了一个重要的抽象类RemoteApplicationEvent
,我们看一下事件类的类图:
bus-event
涉及的事件类有:代表了对特定事件确认的事件AckRemoteApplicationEvent
、环境变更的事件EnvironmentChangeRemoteApplicationEvent
、刷新事件RefreshRemoteApplicationEvent
、发送事件 SentApplicationEvent
、以及未知事件UnknownRemoteApplicationEvent
。下面我们分别看一下这些事件的定义。
抽象基类:RemoteApplicationEvent
通过上面的类图,我们知道RemoteApplicationEvent
是其他事件类的基类,定义了事件对象的公共属性。
1@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") //序列化时使用子类的名称作为type
2@JsonIgnoreProperties("source") //序列化时,忽略 source
3public abstract class RemoteApplicationEvent extends ApplicationEvent {
4 private static final Object TRANSIENT_SOURCE = new Object();
5 private final String originService;
6 private final String destinationService;
7 private final String id;
8
9 protected RemoteApplicationEvent(Object source, String originService,
10 String destinationService) {
11 super(source);
12 this.originService = originService;
13 if (destinationService == null) {
14 destinationService = "**";
15 }
16
17 if (!"**".equals(destinationService)) {
18 if (StringUtils.countOccurrencesOf(destinationService, ":") <= 1
19 && !StringUtils.endsWithIgnoreCase(destinationService, ":**")) {
20 //destination的所有实例
21 destinationService = destinationService + ":**";
22 }
23 }
24 this.destinationService = destinationService;
25 this.id = UUID.randomUUID().toString();
26 }
27 ...
28}
在RemoteApplicationEvent
中定义了主要的三个通用属性事件的来源originService、事件的目的服务destinationService和随机生成的全局id。通过其构造方法可知,destinationService可以使用通配符的形式{serviceId}:{appContextId},两个变量都省略的话,则通知到所有服务的所有实例。只省略appContextId时,则对应的destinationService为相应serviceId的所有实例。另外,注解@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
对应于序列化时,使用子类的名称作为type;而@JsonIgnoreProperties("source")
表示序列化时,忽略source属性,source定义在JDK中的EventObject
。
EnvironmentChangeRemoteApplicationEvent
用于动态更新服务实例的环境属性,我们在基础应用中更新cloud.version
属性时,关联到该事件。
1public class EnvironmentChangeRemoteApplicationEvent extends RemoteApplicationEvent {
2
3 private final Map<String, String> values;
4
5 public EnvironmentChangeRemoteApplicationEvent(Object source, String originService,
6 String destinationService, Map<String, String> values) {
7 super(source, originService, destinationService);
8 this.values = values;
9 }
10 ...
11}
可以看到,EnvironmentChangeRemoteApplicationEvent
事件类的实现很简单。定义了Map类型的成员变量,key对应于环境变量名,而value对应更新后的值。
RefreshRemoteApplicationEvent
刷新远端应用配置的事件,用于接收远端刷新的请求。
1public class RefreshRemoteApplicationEvent extends RemoteApplicationEvent {
2 public RefreshRemoteApplicationEvent(Object source, String originService,
3 String destinationService) {
4 super(source, originService, destinationService);
5 }
6}
继承自抽象事件类RemoteApplicationEvent
,没有特别的成员属性。
AckRemoteApplicationEvent
确认远端应用事件,该事件表示一个特定的RemoteApplicationEvent
事件被确认。
1public class AckRemoteApplicationEvent extends RemoteApplicationEvent {
2
3 private final String ackId;
4 private final String ackDestinationService;
5 private Class<? extends RemoteApplicationEvent> event;
6
7 public AckRemoteApplicationEvent(Object source, String originService,
8 String destinationService, String ackDestinationService, String ackId,
9 Class<? extends RemoteApplicationEvent> type) {
10 super(source, originService, destinationService);
11 this.ackDestinationService = ackDestinationService;
12 this.ackId = ackId;
13 this.event = type;
14 }
15 ...
16
17 public void setEventName(String eventName) {
18 try {
19 event = (Class<? extends RemoteApplicationEvent>) Class.forName(eventName);
20 } catch (ClassNotFoundException e) {
21 event = UnknownRemoteApplicationEvent.class;
22 }
23 }
24}
该事件类在RemoteApplicationEvent
基础上,定义了成员属性ackId、ackDestinationService和event。
ackId和ackDestinationService,分别表示确认的时间的id和对应的目标服务。event对应事件类型,确认事件能够确认的必然是RemoteApplicationEvent
的子类,因此event属性设值时需要进行检查,如果转换出现异常,则定义为未知的事件类型。这些事件可以被任何需要统计总线事件响应的应用程序来监听。 它们的行为与普通的远程应用程序事件相似,即如果目标服务与本地服务ID匹配,则应用程序会在其上下文中触发该事件。
SentApplicationEvent
发送应用事件,表示系统中的某个地方发送了一个远端事件。
1@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
2@JsonIgnoreProperties("source")
3public class SentApplicationEvent extends ApplicationEvent {
4
5 private static final Object TRANSIENT_SOURCE = new Object();
6 private final String originService;
7 private final String destinationService;
8 private final String id;
9 private Class<? extends RemoteApplicationEvent> type;
10
11 protected SentApplicationEvent() {
12 // for serialization libs like jackson
13 this(TRANSIENT_SOURCE, null, null, null, RemoteApplicationEvent.class);
14 }
15
16 public SentApplicationEvent(Object source, String originService,
17 String destinationService, String id,
18 Class<? extends RemoteApplicationEvent> type) {
19 super(source);
20 this.originService = originService;
21 this.type = type;
22 if (destinationService == null) {
23 destinationService = "*";
24 }
25 if (!destinationService.contains(":")) {
26 // All instances of the destination unless specifically requested
27 destinationService = destinationService + ":**";
28 }
29 this.destinationService = destinationService;
30 this.id = id;
31 }
32 ...
33}
可以看到该事件类继承自ApplicationEvent
,它本身并不是一个RemoteApplicationEvent
事件,所以不会通过总线发送,而是在本地生成(多为响应远端事件)。想要审计远端事件的应用可以监听该事件,并且所有的AckRemoteApplicationEvent
事件中的id来源于相应的SentApplicationEvent
中定义的id。在其定义的成员属性中,相比于远端应用事件多了一个事件类型type,该类型限定于RemoteApplicationEvent
的子类。
UnknownRemoteApplicationEvent
未知的远端应用事件,也是RemoteApplicationEvent
事件类的子类。该事件类与之前的SentApplicationEvent
、AckRemoteApplicationEvent
有关,当序列化时遇到事件的类型转换异常,则自动构造成一个未知的远端应用事件。
事件监听器以及消息的订阅与发布待后续更新。。
参考
Spring Cloud Bus-v1.3.3
- Log4Net使用心得
- nginx通过https方式反向代理多实例tomcat
- Linux系统下yum镜像源环境部署记录
- 特斯拉vs凯迪拉克vs奔驰:三大汽车自动驾驶系统比拼
- Centos下添加静态路由(临时和永久有效)的操作记录
- python如何保证输入键入数字
- 微信小程序自定义数据分析试水
- 挂载银行前置机Ukey到windows server2012虚拟机的操作记录
- 文件上传速度查询方法
- “AS3.0高级动画编程”学习:第三章等角投影(上)
- su: 无法设置用户ID: 资源暂时不可用
- NumPY学习笔记
- LVS负载均衡下session共享的实现方式-持久化连接
- Centos6.9下RabbitMQ集群部署记录
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- CSS垂直居中的七个方法
- 「译」更快的 async 函数和 promises
- Python面向对象基础
- ubuntu16.04配置samba解决linux的svn使用舒适问题
- Erlang学习笔记(1)
- 结合公司现状浅谈CMDB
- SQLAlchemy使用
- 常用Oracle SQL集锦
- Python、PyGame游戏项目
- windows 认证机制
- 谷歌地球引擎python文档(GEE_python_API)
- react基础
- 基于ASP.NET Core 3.x的端点路由(Endpoint Routing)实现控制器(Controller)和操作(Action)分离的接口服务
- 流量转发映射
- 什么情况用ArrayList or LinkedList呢?