针对事件驱动架构的Spring Cloud Stream
今天我们要分享一个比较有意思的内容。就是如何通过spring cloud 的stream来改造一个微服务下事件驱动的框架。
为什么要改造?我们都知道事件驱动的微服务开发框架,一个非常重要的点就是每次的操作和状态转换都是一个事件。而现在的spring cloud stream对这样的频繁而不同类型的事件并不是很友好。本文希望通过改造让cloud stream变成一个对事件驱动的微服务开发更友好更方便的事件驱动框架。
准备工作
我们还是通过spring initializr来新建一个项目吧:
如上,我们引入了web、stream kafka依赖。
然后生成项目并下载,打开项目开始我们的改造之旅吧。
然后我们来看看现在的spring cloud的版本:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
现在的版本是Camden.SR6。而我们今天要演示的stream是最新版本,所以我们得把cloud版本修改为Brixton.SR7:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Brixton.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
你也许会在一些事件源框架中,比如Axon中,看到以下类似代码:
public class MyEventHandler {
@EventHandler
public void handle(CustomerCreatedEvent event) {
...
}
@EventHandler
public void handle(AccountCreatedEvent event) {
...
}
}
没错,这其实就是事件源框架中最终所呈现出来的入口最核心的样子。
现在我们对spring cloud stream进行改造,让它变成一个真正的或者说像Axon那样的一个事件源框架。
Cloud Stream 现有处理事件的做法
在开始真正的改造之前,我们还是先看看spring cloud stream 1.1.2(也就是cloud版本为Camden.SR中的stream版本) 中的消息处理的基本样子:
@StreamListener(Sink.INPUT)
public void handle(Foo foo){
...
}
没错,就是一个通过@StreamListener注解的handle方法,就可监听到消息流了。
然后我们看看使用最新的Brixton.SR7版本spring cloud stream的样子:
@EnableBinding
class MyEventHandler {
@StreamListener
target=Sink.INPUT,condition="payload.eventType=='CustomerCreatedEvent'")
public void handleCustomerEvent(@Payload Event event) {
// handle the message</span>
}
@StreamListener
target=Sink.INPUT,condition="payload.eventType=='AccountCreatedEvent'")
public void handleAccountEvent(@Payload Event event) {
// handle the message</span>
}
}
通过上面的代码,我们知道spring cloud stream可以支持配置一个condition的属性来让不同的事件类型路由到不同的handle方法中来处理。其中condition里边的表达式叫做SpEL,就是spring 表达式,通过返回true或false来表示是否匹配。
另外上面的支持是4天前才发布的。也许就是为了支持最近炒得火热的CQRS+ES而发布的。
之前@StreamListener的源码是这样的:
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
public @interface StreamListener {
/**
* The name of the binding target (e.g. channel) that the method subscribes to.
*/
String value() default "";
}
发现没?只有一个value方法属性。
而最新的已经增加了condition条件。这显然是为了支持事件驱动的微服务开发而支持的。
我们点进去看看StreamListener新增加了什么:
发现新增了两个方法属性,一个是target,一个是condition。
而且描述也变成了含有“事件驱动”字样。
ok,现在我们已经知道了spring cloud stream的基本用法和代码样子。
最新版的做法已经算是一种不错的改进了。不过,从编程的语法上,它也许并没有我们想要的那么清晰。当然这只是一种个人的喜好,抑或是我们希望把改造成像Axon那样。
自定义注解
这里我们希望把spring cloud stream改造成一个像Axon那样的风格。因为这也许对于CQRS + ES 框架来说是一种比较理想的开发入口。
像下面这样:
@EnableEventHandling
class MyEventHandler {
@EventHandler("CustomerCreatedEvent")
public void handleCustomerEvent(@Payload Event event) {
// handle the message
}
@EventHandler("AccountCreatedEvent")
public void handleAccountEvent(@Payload Event event) {
// handle the message
}
}
我们既然想要上面的样子,那么就得新定义上面的这两个注解。
我们首先来封装一个@EventHandler注解吧:
@StreamListener
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface EventHandler {
@AliasFor(annotation = StreamListener.class, attribute = "target")
String value() default "";
@AliasFor(annotation = StreamListener.class, attribute = "target")
String target() default Sink.INPUT;
@AliasFor(annotation = StreamListener.class, attribute = "condition")
String condition() default "";
}
我们把@StreamListener封装上面的注解内。
现在已经很接近了我们上面想要的样子了:
@EnableBinding
class MyEventHandler{
@EventHandler(condition="payload.eventType=='CustomerCreatedEvent'")
public void handleCustomerEvent(@Payload Event event) {
// handle the message
}
@EventHandler(condition="payload.eventType=='AccountCreatedEvent'")
public void handleAccountEvent(@Payload Event event) {
// handle the message
}
}
但,@EnableEventHandling这个注解还没有定义。现在我们来定义这个注解:
我们先来搞一个配置类(可横屏观看,排版效果更好):
@Configuration
public class EventHandlerConfig {
/*
* 用于允许spring cloud stream binder把事件路由到匹配的方法上的SpEL表达式
*/
private String eventHandlerSpelPattern = "payload.eventType=='%s'";
/**
* 在此bean中自定义processor,然后把eventType属性转成condition表达式
* @return
*/
@Bean(name = STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME)
public BeanPostProcessor streamListenerAnnotationBeanPostProcessor() {
return new StreamListenerAnnotationBeanPostProcessor() {
@Override
protected StreamListener postProcessAnnotation(StreamListener originalAnnotation, Method annotatedMethod) {
Map<String, Object> attributes = new HashMap<>(
AnnotationUtils.getAnnotationAttributes(originalAnnotation));
if (StringUtils.hasText(originalAnnotation.condition())) {
String spelExpression = String.format(eventHandlerSpelPattern, originalAnnotation.condition());
attributes.put("condition", spelExpression);
}
return AnnotationUtils.synthesizeAnnotation(attributes, StreamListener.class, annotatedMethod);
}
};
}
}
然后我们,再新建@EnableEventHandling注解:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EnableBinding
@Import({EventHandlerConfig.class})
public @interface EnableEventHandling {
}
上面的注解我们只是把刚才的那个配置类import即可。
你也许发现了,其实spring boot中的很多类似@EnableXXXX的注解其实都是一个框架预定义好的配置类,然后在@EnableXXXX的中通过@Import注解导入就好了。本质上是一个配置类。
最后我们再把@EventHandler注解修改一下,把condition修改成eventType作为condition的一个别名:
@StreamListener
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface EventHandler {
/**
* 方法所订阅的channel的名称
* @return 绑定目标的名称
*/
@AliasFor(annotation=StreamListener.class, attribute="condition")
String value() default "";
/**
* 方法所订阅对的channel的名称
* @return 绑定目标的名称
*/
@AliasFor(annotation=StreamListener.class, attribute="target")
String target() default Sink.INPUT;
/**
* 对 condition的封装
* @return SpEL 的表达式
*/
@AliasFor(annotation=StreamListener.class, attribute="condition")
String eventType() default "";
}
总结
通过上面一系列的spring算是“奇技淫巧”我们愣是把spring cloud stream改造成了一个CQRS和EventSourcing那样的事件驱动的全新框架。
@EnableEventHandling
class MyEventHandler{
@EventHandler("CustomerCreatedEvent")
public void handleCustomerEvent(@Payload Event event) {
// handle the message
}
@EventHandler("AccountCreatedEvent")
public void handleAccountEvent(@Payload Event event) {
// handle the message
}
}
上面改造的技术核心其实就是利用@EnableXxx的一贯做法,自定义注解。然后import一个configuration。然后configuration类中则实例化并注册一个
自定义BeanPostProcessor到context中。而这个自定义的BeanPostProcessor则是在postProcessAnnotation方法中拦截到使用@Import的当前注解@StreamListener,然后动态把要设置到转化后设置进去,从而实现了改造。
为什么要改造?我们都知道事件驱动的微服务开发框架,一个非常重要的点就是每次都操作和状态转换都是一个事件。而现在的spring cloud stream对这样的频繁而不同类型的事件并不是很友好。通过改造后,开发事件驱动的微服务就变得更加的方便和友好。
本文只是对Spring Cloud Stream的入口做了一个简单的封装,并没有大动任何内部代码。也许你并不喜欢这样的风格。你完全可以使用最新的那种基于SpEL的默认做法。
另外有关CQRS以及Event Sourcing的内容,你可以移步:微服务业务开发三个难题-拆分、事务、查询(上)、微服务业务开发三个难题-拆分、事务、查询(下)。
- Docker网络解决方案-Flannel部署记录
- Nginx的location配置规则梳理
- 统计代码行数的方法梳理
- 如何在不影响asp.net默认安全性的前提下使用ckeditor/fckeditor?
- Linux下防御DDOS攻击的操作梳理
- Android新手之旅(8) ListView的使用
- 更换Ubuntu源为国内源的操作记录
- Android新手之旅(8) ListView的使用
- CKEditor/CKFinder升级心得
- Docker容器学习梳理-Dockerfile构建镜像
- 再谈web开中几种经典的大文件上传组件
- Nginx负载均衡中后端节点服务器健康检查的操作梳理
- Linux系统下CPU使用(load average)梳理
- 基于组件的.NET技术(5)
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- Spring学习一、组件注册
- 复习 EL 表达式与 JSTL
- Spring 学习二、Bean生命周期相关注解
- 十分钟学会 HTML
- 聊一下会话跟踪技术
- 朝花夕拾之Matlab基础回顾:向量的点积、叉积、混合积
- 详解响应消息 response
- Kubernetes 1.19.0——deployment(3)
- Selenium-01-测试环境搭建使用
- Selenium-02-常用元素定位
- SpringBoot + Vue 前后端分离项目下载视频文件踩坑记录
- Selenium-03-常用方法
- 用Python里面的Xpath完成一个在线汇率转换器
- 详解请求消息 resquest
- Android中窗口Input事件接收