基于可靠消息方案的分布式事务(四):接入Lottor服务
在上一篇文章中,通过Lottor Sample介绍了快速体验分布式事务Lottor。本文将会介绍如何将微服务中的生产方和消费方服务接入Lottor。
场景描述
- 生产方:User服务
- 消费方:Auth服务
- 事务管理方:Lottor Server
Lottor-Samples中的场景为:客户端调用User服务创建一个用户,用户服务的user表中增加了一条用户记录。除此之外,还会调用Auth服务创建该用户对应的角色和权限信息。
我们通过上面的请求流程图入手,介绍接入Lottor服务。当您启动好docker-compose中的组件时,会创建好两个服务对应的user和auth数据库。其中User和Auth服务所需要的初始化数据已经准备好,放在各自的classpath下,服务在启动时会自动初始化数据库,所需要的预置数据(如角色、权限信息)也放在sql文件中。
Lottor客户端API
Lottor Client中提供了一个ExternalNettyService
接口,用以发送三类消息到Lottor Server:
- 预提交消息
- 确认提交消息
- 消费完成消息
1public interface ExternalNettyService {
2
3 /**
4 * pre-commit msgs
5 *
6 * @param preCommitMsgs
7 */
8 public Boolean preSend(List<TransactionMsg> preCommitMsgs);
9
10 /**
11 * confirm msgs
12 *
13 * @param success
14 */
15 public void postSend(Boolean success, Object message);
16
17 /**
18 * msgs after consuming
19 *
20 * @param msg
21 * @param success
22 */
23 public void consumedSend(TransactionMsg msg, Boolean success);
24}
预发送#preSend
的入参为预提交的消息列表,一个生产者可能有对应的多个消费者;确认提交#postSend
的入参为生产方本地事务执行的状态,如果失败,第二个参数记录异常信息;#consumedSend
为消费方消费成功的发送的异步消息,第一个入参为其接收到的事务消息,第二个为消费的状态。
事务消息TransactionMsg
1public class TransactionMsg implements Serializable {
2 /**
3 * 用于消息的追溯
4 */
5 private String groupId;
6
7 /**
8 * 事务消息id
9 */
10 @NonNull
11 private String subTaskId;
12
13 /**
14 * 源服务,即调用发起方
15 */
16 private String source;
17
18 /**
19 * 目标方服务
20 */
21 private String target;
22
23 /**
24 * 执行的方法,适配成枚举
25 */
26 private String method;
27
28 /**
29 * 参数,即要传递的内容,可以为null
30 */
31 private Object args;
32
33 /**
34 * 创建时间
35 */
36 private Long createTime = Timestamp.valueOf(DateUtils.getCurrentDateTime()).getTime();
37
38 /**
39 * 操作结果信息
40 */
41 private String message;
42
43 /**
44 * 更新时间
45 */
46 private Long updateTime;
47
48 /**
49 * 是否消费,默认为否
50 *
51 * {@linkplain com.blueskykong.lottor.common.enums.ConsumedStatus}
52 */
53 private int consumed = ConsumedStatus.UNCONSUMED.getStatus();
54
55 ...
56}
在构建事务消息时,事务消息id、源服务、目标服务、目标方法和目标方法的传参args都是必不可少的。消费方消费完之后,将会设置consumed的状态,出现异常将会设置异常message信息。
生产方-User服务
创建用户时,需要创建对应的角色。生产方接入分为三步:
- 发送预提交消息
- 执行本地事务
- 发送确认提交的消息
引入依赖
首先,需要引入Lottor客户端的依赖:
1 <dependency>
2 <groupId>com.blueskykong</groupId>
3 <artifactId>lottor-starter</artifactId>
4 <version>2.0.0-SNAPSHOT</version>
5 </dependency>
发起调用
在UserService
中定义了创建用户的方法,我们需要在执行本地事务之前,构造事务消息并预发送到Lottor Server(对应流程图中的步骤1)。如果遇到预发送失败,则直接停止本地事务的执行。如果本地事务执行成功(对应步骤3),则发送confirm消息,否则发送回滚消息到Lottor Server(对应步骤4)。
1@Service
2public class UserServiceImpl implements UserService {
3
4 private static final Logger LOGGER = LoggerFactory.getLogger(UserServiceImpl.class);
5
6 //注入ExternalNettyService
7 @Autowired
8 private ExternalNettyService nettyService;
9
10 @Autowired
11 private UserMapper userMapper;
12
13 @Override
14 @Transactional
15 public Boolean createUser(UserEntity userEntity, StateEnum flag) {
16 UserRoleDTO userRoleDTO = new UserRoleDTO(RoleEnum.ADMIN, userEntity.getId());
17 //构造消费方的TransactionMsg
18 TransactionMsg transactionMsg = new TransactionMsg.Builder()
19 .setSource(ServiceNameEnum.TEST_USER.getServiceName())
20 .setTarget(ServiceNameEnum.TEST_AUTH.getServiceName())
21 .setMethod(MethodNameEnum.AUTH_ROLE.getMethod())
22 .setSubTaskId(IdWorkerUtils.getInstance().createUUID())
23 .setArgs(userRoleDTO)
24 .build();
25
26 if (flag == StateEnum.CONSUME_FAIL) {
27 userRoleDTO.setUserId(null);
28 transactionMsg.setArgs(userRoleDTO);
29 }
30
31 //发送预处理消息
32 if (!nettyService.preSend(Collections.singletonList(transactionMsg))) {
33 return false;//预发送失败,本地事务停止执行
34 }
35
36 //local transaction本地事务
37 try {
38 LOGGER.debug("执行本地事务!");
39 if (flag != StateEnum.PRODUCE_FAIL) {
40 userMapper.saveUser(userEntity);
41 } else {
42 userMapper.saveUserFailure(userEntity);
43 }
44 } catch (Exception e) {
45 //本地事务异常,发送回滚消息
46 nettyService.postSend(false, e.getMessage());
47 LOGGER.error("执行本地事务失败,cause is 【{}】", e.getLocalizedMessage());
48 return false;
49 }
50 //发送确认消息
51 nettyService.postSend(true, null);
52 return true;
53 }
54
55}
代码如上所示,实现不是很复杂。本地事务执行前,必然已经成功发送了预提交消息,当本地事务执行成功,Lottor Client将会记录本地事务执行的状态,避免异步发送的确认消息的丢失,便于后续的Lottor Server回查。
配置文件
1lottor:
2 enabled: true
3 core:
4 cache: true
5 cache-type: redis
6 tx-redis-config:
7 host-name: localhost
8 port: 6379
9 serializer: kryo
10 netty-serializer: kryo
11 tx-manager-id: lottor
12
13spring:
14 datasource:
15 url: jdbc:mysql://localhost:3306/user?autoReconnect=true&useSSL=false
16 continue-on-error: false
17 initialize: true
18 max-active: 50
19 max-idle: 10
20 max-wait: 10000
21 min-evictable-idle-time-millis: 60000
22 min-idle: 8
23 name: dbcp1
24 test-on-borrow: false
25 test-on-return: false
26 test-while-idle: false
27 time-between-eviction-runs-millis: 5000
28 username: root
29 password: _123456_
30 schema[0]: classpath:/user.sql
如上为User服务的部分配置文件,lottor.enabled: true
开启Lottor 客户端服务。cache 开启本地缓存记录。cache-type指定了本地事务记录的缓存方式,可以为redis或者MongoDB。serializer为序列化和反序列化方式。tx-manager-id为对应的Lottor Server的服务名。
Lottor Server
多个微服务的接入,对Lottor Server其实没什么侵入性。这里需要注意的是,TransactionMsg
中设置的source
和target
字段来源于lottor-common中的com.blueskykong.lottor.common.enums.ServiceNameEnum
:
1public enum ServiceNameEnum {
2 TEST_USER("user", "tx-user"),
3 TEST_AUTH("auth", "tx-auth");
4 //服务名
5 String serviceName;
6 //消息中间件的topic
7 String topic;
8
9 ...
10}
消息中间件的topic是在服务名的基础上,加上tx-
前缀。消费方在设置订阅的topic时,需要按照这样的规则命名。Lottor Server完成的步骤为上面流程图中的2(成功收到预提交消息)和5(发送事务消息到指定的消费方),除此之外,还会定时轮询异常状态的事务组和事务消息。
消费方-Auth服务
引入依赖
1 <dependency>
2 <groupId>com.blueskykong</groupId>
3 <artifactId>lottor-starter</artifactId>
4 <version>2.0.0-SNAPSHOT</version>
5 </dependency>
6
7 <dependency>
8 <groupId>org.springframework.cloud</groupId>
9 <artifactId>spring-cloud-stream</artifactId>
10 </dependency>
11 <dependency>
12 <groupId>org.springframework.cloud</groupId>
13 <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
14 </dependency>
引入了Lottor客户端starter,spring-cloud-stream用于消费方接收来自Lottor Server的事务消息。
topic监听
1@Component
2@EnableBinding({TestSink.class})
3public class ListenerStream extends InitStreamHandler {
4 private static final Logger LOGGER = LoggerFactory.getLogger(ListenerStream.class);
5
6 @Autowired
7 private RoleUserService roleUserService;
8
9 @Autowired
10 public ListenerStream(ExternalNettyService nettyService, ObjectSerializer objectSerializer) {
11 super(nettyService, objectSerializer);
12 }
13
14 @StreamListener(TestSink.INPUT)
15 public void processSMS(Message message) {
16 //解析接收到的TransactionMsg
17 process(init(message));
18 }
19
20 @Transactional
21 public void process(TransactionMsg message) {
22 try {
23 if (Objects.nonNull(message)) {
24 LOGGER.info("===============consume notification message: =======================" + message.toString());
25 if (StringUtils.isNotBlank(message.getMethod())) {
26 MethodNameEnum method = MethodNameEnum.fromString(message.getMethod());
27 LOGGER.info(message.getMethod());
28 //根据目标方法进行处理,因为一个服务可以对应多个生产方,有多个目标方法
29 switch (method) {
30 case AUTH_ROLE:
31 UserRoleDTO userRoleDTO = (UserRoleDTO) message.getArgs();
32 RoleEntity roleEntity = roleUserService.getRole(userRoleDTO.getRoleEnum().getName());
33 String roleId = "";
34 if (Objects.nonNull(roleEntity)) {
35 roleId = roleEntity.getId();
36 }
37 roleUserService.saveRoleUser(new UserRole(UUID.randomUUID().toString(), userRoleDTO.getUserId(), roleId));
38 LOGGER.info("matched case {}", MethodNameEnum.AUTH_ROLE);
39
40 break;
41 default:
42 LOGGER.warn("no matched consumer case!");
43 message.setMessage("no matched consumer case!");
44 nettyService.consumedSend(message, false);
45 return;
46 }
47 }
48 }
49 } catch (Exception e) {
50 //处理异常,发送消费失败的消息
51 LOGGER.error(e.getLocalizedMessage());
52 message.setMessage(e.getLocalizedMessage());
53 nettyService.consumedSend(message, false);
54 return;
55 }
56 //成功消费
57 nettyService.consumedSend(message, true);
58 return;
59 }
60}
消费方监听指定的topic(如上实现中,为test-input中指定的topic,spring-cloud-stream更加简便调用的接口),解析接收到的TransactionMsg。根据目标方法进行处理,因为一个服务可以对应多个生产方,有多个目标方法。执行本地事务时,Auth会根据TransactionMsg中提供的args作为入参执行指定的方法(对应步骤7),最后向Lottor Server发送消费的结果(对应步骤8)。
配置文件
1---
2spring:
3 cloud:
4 stream:
5 bindings:
6 test-input:
7 group: testGroup
8 content-type: application/x-java-object;type=com.blueskykong.lottor.common.entity.TransactionMsgAdapter
9 destination: tx-auth
10 binder: rabbit1
11 binders:
12 rabbit1:
13 type: rabbit
14 environment:
15 spring:
16 rabbitmq:
17 host: localhost
18 port: 5672
19 username: guest
20 password: guest
21 virtual-host: /
22
23---
24lottor:
25 enabled: true
26 core:
27 cache: true
28 cache-type: redis
29 tx-redis-config:
30 host-name: localhost
31 port: 6379
32 serializer: kryo
33 netty-serializer: kryo
34 tx-manager-id: lottor
配置和User服务的差别在于增加了spring-cloud-stream的配置,配置了rabbitmq的相关信息,监听的topic为tx-auth。
小结
本文主要通过User和Auth的示例服务讲解了如何接入Lottor客户端。生产方构造涉及的事务消息,首先预发送事务消息到Lottor Server,成功预提交之后便执行本地事务;本地事务执行完则异步发送确认消息(可能成功,也可能失败)。Lottor Server根据接收到的确认消息决定是否将对应的事务组消息发送到对应的消费方。Lottor Server还会定时轮询异常状态的事务组和事务消息,以防因为异步的确认消息发送失败。消费方收到事务消息之后,将会根据目标方法执行对应的处理操作,最后将消费结果异步回写到Lottor Server。
Lottor项目地址:https://github.com/keets2012/Lottor
- 【腾讯云的1001种玩法】十分钟轻松搞定云架构之三:更大的存储
- Socket学习总结系列(一) -- IM & Socket
- 【腾讯云的1001种玩法】十分钟轻松搞定云架构 之四:替你分心的负载均衡
- 【腾讯云的1001种玩法】十分钟搞定云架构 · 什么是Bucket、什么是Object
- 【腾讯云的1001种玩法】十分钟轻松搞定云架构 · 负载均衡的最佳实践
- 【黑客浅析】像黑客一样思考
- 【腾讯云的1001种玩法】 十分钟轻松搞定云架构 · 负载均衡的几种均衡模式
- ASP.NET Web API的Controller是如何被创建的?
- 【腾讯云的1001种玩法】十分钟轻松搞定云架构:COS的两种上传模式
- 物流行业迎变革,云计算是基础,大数据是关键
- Socket学习总结系列(二) -- CocoaAsyncSocket
- 比特币勒索病毒肆虐,腾讯云安全专家给你支招
- HTML5 直播协议之 WebSocket 和 MSE
- IoC在ASP.NET Web API中的应用
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Linux中没有rc.local文件的完美解决方法
- 详解linux lcd驱动编写
- Linux下安装telnet的方法
- Linux 安装二进制MySQL 及 破解MySQL密码的方法
- Linux创建进程达到65535的方法
- SSH 上传文件及文件夹到linux服务器的方法
- apache tika检测文件是否损坏的方法
- Linux下二进制编译安装MySql centos7的教程
- Linux 6 修改ssh默认远程端口号的操作步骤
- 基于python的Linux系统指定进程性能监控思路详解
- ubuntu下的虚拟环境中安装Django的操作方法
- 详解linux下umask的使用
- Linux下设置每天自动备份数据库的方法
- Linux常用命令之chmod修改文件权限777和754
- 解决CentOS 7升级Python到3.6.6后yum出错问题总结