切换导航
临窗旋墨
首页
分类
归档
诗稿
留言板
关于
登录
gitee
github
QQ
spring事务入口及核心类-基于spring5.1.9版本
文章来源
:
作者:
秋风解语
发布时间:
2020-07-03
阅读:
1248
标签:
源码,事务
[toc] ## spring事务入口及核心类-基于spring5.1.9版本 ### 一 前置: 了解spirng及Aspect切面基本常识 因为spring事务实现是通过**环绕增强**的方式: 在目标方法执行之前开启事务,在目标方法执行之后提交或者回滚事务, ### 二 spring对事务的抽象 统一一致的事务抽象是Spring框架的一大优势,无论是全局事务还是本地事务,JTA、JDBC、Hibernate还是JPA,Spring都使用统一的编程模型,使得应用程序可以很容易地在全局事务与本地事务,或者不同的事务框架之间进行切换。 #### 2.1 spring事务抽象核心类 ##### `PlatformTransactionManager`接口 定义事务的操作行为(获取|提交|回滚事务) > 依赖`TransactionDefinition`和`TransactionStatus`接口 ```java public interface PlatformTransactionManager { TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException; void commit(TransactionStatus status) throws TransactionException; void rollback(TransactionStatus status) throws TransactionException; } ``` ##### `TransactionDefinition` 事务属性定义 ```java public interface TransactionDefinition { int PROPAGATION_REQUIRED = 0; int PROPAGATION_SUPPORTS = 1; int PROPAGATION_MANDATORY = 2; int PROPAGATION_REQUIRES_NEW = 3; int PROPAGATION_NOT_SUPPORTED = 4; int PROPAGATION_NEVER = 5; int PROPAGATION_NESTED = 6; int ISOLATION_DEFAULT = -1; int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED; int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED; int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ; int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE; int TIMEOUT_DEFAULT = -1; int getPropagationBehavior(); int getIsolationLevel(); int getTimeout(); boolean isReadOnly(); String getName(); } ``` ##### `TransactionStatus`:当前事务状态 ```java public interface TransactionStatus extends SavepointManager, Flushable { boolean isNewTransaction(); boolean hasSavepoint(); void setRollbackOnly(); boolean isRollbackOnly(); void flush(); boolean isCompleted(); } ``` - **`PlatformTransactionManager`:事务管理器** - `getTransaction`方法:事务获取操作,根据事务属性定义,获取当前事务或者创建新事物; - `commit`方法:事务提交操作,注意这里所说的提交并非直接提交事务,而是根据当前事务状态执行提交或者回滚操作; - `rollback`方法:事务回滚操作,同样,也并非一定直接回滚事务,也有可能只是标记事务为只读,等待其他调用方执行回滚。 - **`TransactionDefinition`:事务属性定义** - `getPropagationBehavior`方法:返回事务的传播属性,默认是`PROPAGATION_REQUIRED`; - `getIsolationLevel`方法:返回事务隔离级别,事务隔离级别只有在创建新事务时才有效,也就是说只对应传播属性`PROPAGATION_REQUIRED`和`PROPAGATION_REQUIRES_NEW`; - `getTimeout`方法:返回事务超时时间,以秒为单位,同样只有在创建新事务时才有效; - `isReadOnly`方法:是否优化为只读事务,支持这项属性的事务管理器会将事务标记为只读,只读事务不允许有写操作,不支持只读属性的事务管理器需要忽略这项设置,这一点跟其他事务属性定义不同,针对其他不支持的属性设置,事务管理器应该抛出异常。 - `getName`方法:返回事务名称,声明式事务中默认值为“类的完全限定名.方法名”。 - **`TransactionStatus`:当前事务状态** - `isNewTransaction`方法:当前方法是否创建了新事务(区别于使用现有事务以及没有事务); - `hasSavepoint`方法:在嵌套事务场景中,判断当前事务是否包含保存点; - `setRollbackOnly`和`isRollbackOnly`方法:只读属性设置(主要用于标记事务,等待回滚)和查询; - `flush`方法:刷新底层会话中的修改到数据库,一般用于刷新如Hibernate/JPA的会话,是否生效由具体事务资源实现决定; - `isCompleted`方法:判断当前事务是否已完成(已提交或者已回滚)。 #### 部分实现关系  `AbstractPlatformTransactionManager`抽象类实现了Spring事务的标准流程,其子类`DataSourceTransactionManager`是我们使用较多的JDBC单数据源事务管理器,而`JtaTransactionManager`是JTA(Java Transaction API)规范的实现类,另外两个则分别是JavaEE容器*WebLogic*和*WebSphere*的JTA事务管理器的具体实现。 #### spring事务拦截相关类 `TransactionAspectSupport` ```java public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean { //用于存储默认事务管理器的键。 private static final Object DEFAULT_TRANSACTION_MANAGER_KEY = new Object(); //currentTransactionStatus的持有者 private static final ThreadLocal<TransactionInfo> transactionInfoHolder = new NamedThreadLocal<>("Current aspect-driven transaction"); //获取当前事务信息 protected static TransactionInfo currentTransactionInfo() throws NoTransactionException { return transactionInfoHolder.get(); } // .............................. } ``` `TransactionInterceptor` ```java public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable { // ................................... 省略其他代码 public Object invoke(MethodInvocation invocation) throws Throwable { // 获取目标class Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); } protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); //查询目标方法事务属性、确定事务管理器、构造连接点标识(用于确认事务名称) final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // 获取事务 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal; try { //通过回调执行目标方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //清理当前线程事务信息 cleanupTransactionInfo(txInfo); } //目标方法执行成功,提交事务 commitTransactionAfterReturning(txInfo); return retVal; } else { final ThrowableHolder throwableHolder = new ThrowableHolder(); // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. 回调的事务执行处理 try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } } } // ................................... } ``` 事务拦截器`TransactionInterceptor`在`invoke`方法中,通过调用父类`TransactionAspectSupport`的`invokeWithinTransaction`方法进行事务处理,该方法支持声明式事务和编程式事务。 事务抽象的核心接口为`PlatformTransactionManager`,它负责管理事务行为,包括事务的获取、提交和回滚。在`invokeWithinTransaction`方法中,我们可以看到`createTransactionIfNecessary`、`commitTransactionAfterReturning`和`completeTransactionAfterThrowing`都是针对该接口编程,并不依赖于特定事务管理器,这里是对Spring事务抽象的实现。 #### spring事务同步 spring如何判断当前方法已经开启了事务? `AbstractPlatformTransactionManager`中 ```java public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction(); // 参数为null时构造默认值 ... if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } ... // 获取当前事务对象 protected abstract Object doGetTransaction() throws TransactionException; // 判断当前事务对象是否包含活跃事务 protected boolean isExistingTransaction(Object transaction) throws TransactionException { return false; } ``` 注意`getTransaction`方法是`final`的,无法被子类覆盖,保证了获取事务流程的一致和稳定。抽象方法`doGetTransaction`获取当前事务对象,方法`isExistingTransaction`判断当前事务对象是否存在活跃事务,具体逻辑由特定事务管理器实现,看下我们使用最多的`DataSourceTransactionManager`对应的实现: ```java // DataSourceTransactionManager.class @Override protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); txObject.setConnectionHolder(conHolder, false); return txObject; } @Override protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); } ``` 可以看到,获取当前事务对象时,使用了`TransactionSynchronizationManager#getResource`方法,类图如下:  `TransactionSynchronizationManager`通过`ThreadLocal`对象在当前线程记录了`resources`和`synchronizations`属性。`resources`是一个HashMap,用于记录当前参与事务的事务资源,方便进行事务同步,在`DataSourceTransactionManager`的例子中就是以`dataSource`作为key,保存了数据库连接,这样在同一个线程中,不同的方法调用就可以通过`dataSource`获取相同的数据库连接,从而保证所有操作在一个事务中进行。`synchronizations`属性是一个`TransactionSynchronization`对象的集合,`AbstractPlatformTransactionManager`类中定义了事务操作各个阶段的调用流程,以事务提交为例: ```java // AbstractPlatformTransactionManager.class private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); .... else if (status.isNewTransaction()) { // 记录日志 ... doCommit(status); } ... // 事务调用异常处理 ... try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } } ``` 很多*trigger*前缀的方法,这些方法用于在事务操作的各个阶段触发回调,从而可以精确控制在事务执行的不同阶段所要执行的操作,这些回调实际上都通过`TransactionSynchronizationUtils`来实现,它会遍历`TransactionSynchronizationManager#synchronizations`集合中的`TransactionSynchronization`对象,然后分别触发集合中各个元素对应方法的调用。例如: ```java TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { // do something after commit } }); ``` 这段代码就在当前线程的事务`synchronizations`属性中,添加了一个自定义同步类,如果当前存在事务,那么在事务管理器执行事务提交之后,就会触发`afterCommit`方法,可以通过这种方式在事务执行的不同阶段自定义一些操作。 #### 最后总结 个人觉得最中要的几个类 - **TransactionInterceptor**: 事务的入口类 - `PlatformTransactionManager`:事务管理器,定义事务的操作行为接口 - **两个类的连接点在方法:createTransactionIfNecessary 中的tm.getTransaction** ##### **事务的大体入口调用链:** - **TransactionInterceptor**#invoke - #invokeWithinTransaction 执行原来的方法,并织入事务 - #determineTransactionManager 获取指定的事务管理器 - #createTransactionIfNecessary : 创建事务 - TransactionStatus status = tm.getTransaction(txAttr); 调用指定事务管理器生成事务状态相关信息,; 由接口`PlatformTransactionManager`的实现类具体获取
发表评论
Copyright © 2020 许秋冬
皖ICP备20011253号