分布式事务(五)源码详解

时间:2019-06-04
本文章向大家介绍分布式事务(五)源码详解,主要包括分布式事务(五)源码详解使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

引子

本节我们将会从上一节的”简单样例“入手:Spring Boot+Atomikos(TM)+Mybatis(ORM)+Mysql(DB),深入源码,看看这个分布式事务是怎么定义、执行的。

先来回忆一下第二节讲的JTA规范,如下图。Atomikos是什么角色?起到什么作用?

角色:

Atomikos根本上是一个事务管理器(TM)也就是JTA模型的核心,上图扇形的中心位置。

作用:

TM调用 【Resource Manager资源管理器】 的XAResource接口来实现事务操作。

TM依赖 【Application Server应用服务器】 的TransactionManager接口当然如果服务器不支持事务管理,自然也就只能使用第三方包,例如Atomikos

TM依赖 【Application应用程序】 设置事务边界、属性,application调用UserTransaction接口控制事务开始、提交、回滚。

一、bean定义

1.1 JtaTransactionManager

org.springframework.transaction.jta.JtaTransactionManager类是spring提供的分布式事务管理器。

JtaTransactionManager类图如下:

实现了接口如下:

  • PlatformTransactionManager :获取事务,提交事务,回滚事务
  • TransactionFactory:创建事务
  • InitializingBean:初始化bean

JtaTransactionManager实现了InitializingBean接口的afterPropertiesSet()方法,处于bean生命周期的容器初始化->实例化期->初始化中期,如下图:

下面我们看一下JtaTransactionManager在bean初始化中期InitializingBean接口的afterPropertiesSet()做了什么:

 1 /**
 2  * Initialize the UserTransaction as well as the TransactionManager handle.
 3  * @see #initUserTransactionAndTransactionManager()
 4  */
 5 @Override
 6 public void afterPropertiesSet() throws TransactionSystemException {
 7     initUserTransactionAndTransactionManager();
 8     checkUserTransactionAndTransactionManager();
 9     initTransactionSynchronizationRegistry();
10 }
  • 1.initUserTransactionAndTransactionManager:初始化UserTransactionTransactionManager接口。主要是如果没有定义的话,可以支持JNDI。

  • 2.checkUserTransactionAndTransactionManager:校验2个接口是否存在。UserTransaction必须定义,TransactionManager可以不定义。

      源码如下:

      

      对应控制台打印:

o.s.t.jta.JtaTransactionManager          : Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@614aeccc
o.s.t.jta.JtaTransactionManager          : Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@5116ac09
  • 3.initTransactionSynchronizationRegistry:初始化事务同步注册,这个不使用JNDI的话没啥用。

上一节分布式事务(三)简单样例中我们配置了JtaTransactionManagerConfig类,如下:

 1 package study.config.datasource;
 2 
 3 import com.atomikos.icatch.jta.UserTransactionImp;
 4 import com.atomikos.icatch.jta.UserTransactionManager;
 5 import org.springframework.context.annotation.Bean;
 6 import org.springframework.context.annotation.Configuration;
 7 import org.springframework.transaction.jta.JtaTransactionManager;
 8 
 9 import javax.transaction.UserTransaction;
10 
11 /**
12  * 事务管理器配置类
13  *
14  * @author denny
15  */
16 @Configuration
17 public class JtaTransactionManagerConfig {
18 
19     @Bean(name = "atomikosTransactionManager")
20     public JtaTransactionManager regTransactionManager() {
21         UserTransactionManager userTransactionManager = new UserTransactionManager();
22         UserTransaction userTransaction = new UserTransactionImp();
23         return new JtaTransactionManager(userTransaction, userTransactionManager);
24     }
25 }

 如上图,我们定义了一个name = "atomikosTransactionManager"的bean,具体类型为JtaTransactionManager。其中构造了2个实现类UserTransactionImp(javax.transaction.UserTransaction接口)、UserTransactionManager(javax.transaction.TransactionManager接口)。并用这2个实现类构造了一个JtaTransactionManager。

1.UserTransaction接口

提供给用户操控事务的:开启,提交,回滚等等。源码如下:

2 TransactionManager接口

源码如下:

相比UserTransactionTransactionManager接口多了接口的挂起、恢复、获取事务3个接口。这3个方法明显是留给系统自己调用的。

1.2 AtomikosDataSourceBean

Spring 为Atomikos定制了一个org.springframework.boot.jta.atomikos.AtomikosDataSourceBean,提供了bean生命周期的一些接口:

  1. BeanNameAware:设置bean名称
  2. InitializingBean:初始化bean
  3. DisposableBean:销毁bean

我们只需要定义这个bean即可轻松使得spring来维护。

com.atomikos.jdbc.AtomikosDataSourceBean类图如下:

其中核心接口:

DataSource接口:getConnection获取数据库连接

ConnectionPoolProperties接口:用于载入连接池的属性

二、源码剖析

2.1 自动配置类

老套路哈,spring boot就这么点花花肠子,既然使用@Transactional这种注解的方式,那么我们就从springboot 容器启动时的自动配置载入(spring boot容器启动详解)开始看。在/META-INF/spring.factories中配置文件中查找,如下图:

载入2个关于事务的自动配置类: 

org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration,
org.springframework.boot.autoconfigure.transaction.jta.JtaAutoConfiguration,

由于本文是分布式事务,故2个配置文件都生效了,我们先看JtaAutoConfiguration

2.2 JtaAutoConfiguration

 1 /**
 2  * {@link EnableAutoConfiguration Auto-configuration} for JTA.
 3  *
 4  * @author Josh Long
 5  * @author Phillip Webb
 6  * @since 1.2.0
 7  */
 8 @ConditionalOnClass(javax.transaction.Transaction.class)
 9 @ConditionalOnProperty(prefix = "spring.jta", value = "enabled", matchIfMissing = true)
10 @AutoConfigureBefore({ XADataSourceAutoConfiguration.class,
11         ActiveMQAutoConfiguration.class, ArtemisAutoConfiguration.class,
12         HibernateJpaAutoConfiguration.class })
13 @Import({ JndiJtaConfiguration.class, BitronixJtaConfiguration.class,
14         AtomikosJtaConfiguration.class, NarayanaJtaConfiguration.class })
15 @EnableConfigurationProperties(JtaProperties.class)
16 public class JtaAutoConfiguration {
17 
18 }

如上,JtaAutoConfiguration这个类竟然是个空壳,只有一堆注解,挑几个重要的讲一讲:

1.@ConditionalOnClass(javax.transaction.Transaction.class):代表类路径下存在javax.transaction.Transaction.class这个类,那么JtaAutoConfiguration生效。

2.@ConditionalOnProperty(prefix = "spring.jta", value = "enabled", matchIfMissing = true),自动开启spring.jta.enabled=true.

3.@Import({ JndiJtaConfiguration.class, BitronixJtaConfiguration.class, AtomikosJtaConfiguration.class, NarayanaJtaConfiguration.class }),又是spring套路哈,用来导入类。这里导入了4个配置类,可见支持4种第三方事务管理器。AtomikosJtaConfiguration.class自然就是Atomikos了。

AtomikosJtaConfiguration.class这个配置类

 1 @Configuration
 2 @EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class })
 3 @ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class })
 4 @ConditionalOnMissingBean(PlatformTransactionManager.class)
 5 class AtomikosJtaConfiguration {
 6 
 7     private final JtaProperties jtaProperties;
 8 
 9     private final TransactionManagerCustomizers transactionManagerCustomizers;
10 
11     AtomikosJtaConfiguration(JtaProperties jtaProperties,
12             ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
13         this.jtaProperties = jtaProperties;
14         this.transactionManagerCustomizers = transactionManagerCustomizers
15                 .getIfAvailable();
16     }
17 
18     @Bean(initMethod = "init", destroyMethod = "shutdownForce")
19     @ConditionalOnMissingBean(UserTransactionService.class)
20     public UserTransactionServiceImp userTransactionService(
21             AtomikosProperties atomikosProperties) {
22         Properties properties = new Properties();
23         if (StringUtils.hasText(this.jtaProperties.getTransactionManagerId())) {
24             properties.setProperty("com.atomikos.icatch.tm_unique_name",
25                     this.jtaProperties.getTransactionManagerId());
26         }
27         properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir());
28         properties.putAll(atomikosProperties.asProperties());
29         return new UserTransactionServiceImp(properties);
30     }
31 
32     private String getLogBaseDir() {
33         if (StringUtils.hasLength(this.jtaProperties.getLogDir())) {
34             return this.jtaProperties.getLogDir();
35         }
36         File home = new ApplicationHome().getDir();
37         return new File(home, "transaction-logs").getAbsolutePath();
38     }
39 
40     @Bean(initMethod = "init", destroyMethod = "close")
41     @ConditionalOnMissingBean
42     public UserTransactionManager atomikosTransactionManager(
43             UserTransactionService userTransactionService) throws Exception {
44         UserTransactionManager manager = new UserTransactionManager();
45         manager.setStartupTransactionService(false);
46         manager.setForceShutdown(true);
47         return manager;
48     }
49 
50     @Bean
51     @ConditionalOnMissingBean(XADataSourceWrapper.class)
52     public AtomikosXADataSourceWrapper xaDataSourceWrapper() {
53         return new AtomikosXADataSourceWrapper();
54     }
55 
56     @Bean
57     @ConditionalOnMissingBean
58     public static AtomikosDependsOnBeanFactoryPostProcessor atomikosDependsOnBeanFactoryPostProcessor() {
59         return new AtomikosDependsOnBeanFactoryPostProcessor();
60     }
61 
62     @Bean
63     public JtaTransactionManager transactionManager(UserTransaction userTransaction,
64             TransactionManager transactionManager) {
65         JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(
66                 userTransaction, transactionManager);
67         if (this.transactionManagerCustomizers != null) {
68             this.transactionManagerCustomizers.customize(jtaTransactionManager);
69         }
70         return jtaTransactionManager;
71     }
72 
73     @Configuration
74     @ConditionalOnClass(Message.class)
75     static class AtomikosJtaJmsConfiguration {
76 
77         @Bean
78         @ConditionalOnMissingBean(XAConnectionFactoryWrapper.class)
79         public AtomikosXAConnectionFactoryWrapper xaConnectionFactoryWrapper() {
80             return new AtomikosXAConnectionFactoryWrapper();
81         }
82 
83     }
84 
85 }

2.3 TransactionAutoConfiguration

这里和本地事务分析过程一致,就不再重复,飞机票spring事务详解(三)源码详解,一直看到第二节结束.这里只截个图:

最终源码调用具体事务管理器的PlatformTransactionManager接口的3个方法:

1 public interface PlatformTransactionManager {
2     // 获取事务状态
3     TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
4   // 事务提交
5     void commit(TransactionStatus status) throws TransactionException;
6   // 事务回滚
7     void rollback(TransactionStatus status) throws TransactionException;
8 }

三、核心源码

核心实现类图:

如上提所示,PlatformTransactionManager顶级接口定义了最核心的事务管理方法,下面一层是AbstractPlatformTransactionManager抽象类,实现了PlatformTransactionManager接口的方法并定义了一些抽象方法,供子类拓展。最下面一层是2个经典事务管理器:

1.DataSourceTransactionmanager: 即本地单资源事务管理器。

2.JtaTransactionManager: 即多资源事务管理器(又叫做分布式事务管理器),其实现了JTA规范,使用XA协议进行两阶段提交。

我们这里自然是JTA分布式环境,我们只需要从JtaTransactionManager这个实现类入手即可。

3.1 getTransaction获取事务

AbstractPlatformTransactionManager实现了getTransaction()方法如下:

 1 @Override
 2     public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
 3         Object transaction = doGetTransaction();
 4 
 5         // Cache debug flag to avoid repeated checks.
 6         boolean debugEnabled = logger.isDebugEnabled();
 7 
 8         if (definition == null) {
 9             // Use defaults if no transaction definition given.
10             definition = new DefaultTransactionDefinition();
11         }
12       // 如果当前已经存在事务
13         if (isExistingTransaction(transaction)) {
14             // 根据不同传播机制不同处理
15             return handleExistingTransaction(definition, transaction, debugEnabled);
16         }
17 
18         // 超时不能小于默认值
19         if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
20             throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
21         }
22 
23         // 当前不存在事务,传播机制=MANDATORY(支持当前事务,没事务报错),报错
24         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
25             throw new IllegalTransactionStateException(
26                     "No existing transaction found for transaction marked with propagation 'mandatory'");
27         }// 当前不存在事务,传播机制=REQUIRED/REQUIRED_NEW/NESTED,这三种情况,需要新开启事务,且加上事务同步
28         else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
29                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
30                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
31             SuspendedResourcesHolder suspendedResources = suspend(null);
32             if (debugEnabled) {
33                 logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
34             }
35             try {// 是否需要新开启同步// 开启// 开启
36                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
37                 DefaultTransactionStatus status = newTransactionStatus(
38                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
39                 doBegin(transaction, definition);// 开启新事务
40                 prepareSynchronization(status, definition);//预备同步
41                 return status;
42             }
43             catch (RuntimeException ex) {
44                 resume(null, suspendedResources);
45                 throw ex;
46             }
47             catch (Error err) {
48                 resume(null, suspendedResources);
49                 throw err;
50             }
51         }
52         else {
53             // 当前不存在事务当前不存在事务,且传播机制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,这三种情况,创建“空”事务:没有实际事务,但可能是同步。警告:定义了隔离级别,但并没有真实的事务初始化,隔离级别被忽略有隔离级别但是并没有定义实际的事务初始化,有隔离级别但是并没有定义实际的事务初始化,
54             if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
55                 logger.warn("Custom isolation level specified but no actual transaction initiated; " +
56                         "isolation level will effectively be ignored: " + definition);
57             }
58             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
59             return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
60         }
61     }

上图核心步骤就是:

  • 1.doGetTransaction():获取事务
  • 2.doBegin:准备工作

3.1.1 JtaTransactionManager的doGetTransaction()

其实也就是把UserTransaction封装成一个JtaTransactionObject返回。

 1     @Override
 2     protected Object doGetTransaction() {
 3         UserTransaction ut = getUserTransaction();
 4         if (ut == null) {
 5             throw new CannotCreateTransactionException("No JTA UserTransaction available - " +
 6                     "programmatic PlatformTransactionManager.getTransaction usage not supported");
 7         }
 8         if (!this.cacheUserTransaction) {
 9             ut = lookupUserTransaction(
10                     this.userTransactionName != null ? this.userTransactionName : DEFAULT_USER_TRANSACTION_NAME);
11         }
12         return doGetJtaTransaction(ut);
13     }
14 
15     /**
16      * Get a JTA transaction object for the given current UserTransaction.
17      * <p>Subclasses can override this to provide a JtaTransactionObject
18      * subclass, for example holding some additional JTA handle needed.
19      * @param ut the UserTransaction handle to use for the current transaction
20      * @return the JtaTransactionObject holding the UserTransaction
21      */
22     protected JtaTransactionObject doGetJtaTransaction(UserTransaction ut) {
23         return new JtaTransactionObject(ut);
24     }

3.1.2 JtaTransactionManager.doBegin

 1 @Override
 2     protected void doBegin(Object transaction, TransactionDefinition definition) {
 3         JtaTransactionObject txObject = (JtaTransactionObject) transaction;
 4         try {
 5             doJtaBegin(txObject, definition);
 6         }
 7         catch (NotSupportedException ex) {
 8             // assume nested transaction not supported
 9             throw new NestedTransactionNotSupportedException(
10                     "JTA implementation does not support nested transactions", ex);
11         }
12         catch (UnsupportedOperationException ex) {
13             // assume nested transaction not supported
14             throw new NestedTransactionNotSupportedException(
15                     "JTA implementation does not support nested transactions", ex);
16         }
17         catch (SystemException ex) {
18             throw new CannotCreateTransactionException("JTA failure on begin", ex);
19         }
20     }

调用JtaTransactionManager.doJtaBegin:

1 protected void doJtaBegin(JtaTransactionObject txObject, TransactionDefinition definition)
2             throws NotSupportedException, SystemException {
3         
4         applyIsolationLevel(txObject, definition.getIsolationLevel());
5         int timeout = determineTimeout(definition);
6         applyTimeout(txObject, timeout);
7         txObject.getUserTransaction().begin();
8     }

UserTransactionImp.begin->TransactionManagerImp.begin

 1 public void begin ( int timeout ) throws NotSupportedException,
 2             SystemException
 3     {
 4         CompositeTransaction ct = null;
 5         ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null;
 6         
 7         ct = compositeTransactionManager.getCompositeTransaction();
 8         if ( ct != null && ct.getProperty (  JTA_PROPERTY_NAME ) == null ) {
 9             LOGGER.logWarning ( "JTA: temporarily suspending incompatible transaction: " + ct.getTid() +
10                     " (will be resumed after JTA transaction ends)" );
11             ct = compositeTransactionManager.suspend();
12             resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant ( ct );
13         }
14         
15         try {
16             ct = compositeTransactionManager.createCompositeTransaction ( ( ( long ) timeout ) * 1000 );
17             if ( resumeParticipant != null ) ct.addSubTxAwareParticipant ( resumeParticipant );
18             if ( ct.isRoot () && getDefaultSerial () )
19                 ct.getTransactionControl ().setSerial ();
20             ct.setProperty ( JTA_PROPERTY_NAME , "true" );
21         } catch ( SysException se ) {
22             String msg = "Error in begin()";
23             LOGGER.logWarning( msg , se );
24             throw new ExtendedSystemException ( msg , se
25                     .getErrors () );
26         }
27         recreateCompositeTransactionAsJtaTransaction(ct);
28     }

createCompositeTransaction创建混合事务

 1 public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException
 2     {
 3         Stack errors = new Stack();
 4         CompositeTransaction ct = null , ret = null;
 5         // 获取当前线程绑定的事务
 6         ct = getCurrentTx ();
       // 当前线程不存在事务
7 if ( ct == null ) {
// 创建组合事务
8 ret = service_.createCompositeTransaction ( timeout ); 9 if(LOGGER.isInfoEnabled()){ 10 LOGGER.logInfo("createCompositeTransaction ( " + timeout + " ): " 11 + "created new ROOT transaction with id " + ret.getTid ()); 12 }
        // 当前线程存在事务
13 } else { 14 if(LOGGER.isInfoEnabled()) LOGGER.logInfo("createCompositeTransaction ( " + timeout + " )");
          // 创建子事务
15 ret = ct.getTransactionControl ().createSubTransaction (); 16 17 } 18 Thread thread = Thread.currentThread ();
       // 绑定当前线程和事务的2个映射map
19 setThreadMappings ( ret, thread ); 20 21 return ret; 22 }

如果当前线程不存在事务,创建组合事务。如果当前线程存在事务,创建子事务。

调用TransactionServiceImp的createCompositeTransaction创建混合事务

 1 public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException
 2     {
 3         if ( !initialized_ ) throw new IllegalStateException ( "Not initialized" );
 4 
 5         if ( maxNumberOfActiveTransactions_ >= 0 && 
 6              tidToTransactionMap_.size () >= maxNumberOfActiveTransactions_ ) {
 7             throw new IllegalStateException ( "Max number of active transactions reached:" + maxNumberOfActiveTransactions_ );
 8         }
 9         
10         String tid = tidmgr_.get ();
11         Stack lineage = new Stack ();
12         //创建协调者
15         CoordinatorImp cc = createCC ( null, tid, true, false

原文地址:https://www.cnblogs.com/dennyzhangdd/p/10858414.html