spring事务入口及核心类-基于spring5.1.9版本

文章来源临窗旋墨   作者:秋风解语   发布时间:2020-07-03   阅读:3132   标签:源码,事务 分类:spring基础 专题:我读[不懂]源码

[toc]

spring事务入口及核心类-基于spring5.1.9版本

一 前置: 了解spirng及Aspect切面基本常识

因为spring事务实现是通过环绕增强的方式: 在目标方法执行之前开启事务,在目标方法执行之后提交或者回滚事务,

二 spring对事务的抽象

统一一致的事务抽象是Spring框架的一大优势,无论是全局事务还是本地事务,JTA、JDBC、Hibernate还是JPA,Spring都使用统一的编程模型,使得应用程序可以很容易地在全局事务与本地事务,或者不同的事务框架之间进行切换。

2.1 spring事务抽象核心类

PlatformTransactionManager接口 定义事务的操作行为(获取|提交|回滚事务)

依赖TransactionDefinitionTransactionStatus接口

  1. public interface PlatformTransactionManager {
  2. TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
  3. throws TransactionException;
  4. void commit(TransactionStatus status) throws TransactionException;
  5. void rollback(TransactionStatus status) throws TransactionException;
  6. }
TransactionDefinition 事务属性定义
  1. public interface TransactionDefinition {
  2. int PROPAGATION_REQUIRED = 0;
  3. int PROPAGATION_SUPPORTS = 1;
  4. int PROPAGATION_MANDATORY = 2;
  5. int PROPAGATION_REQUIRES_NEW = 3;
  6. int PROPAGATION_NOT_SUPPORTED = 4;
  7. int PROPAGATION_NEVER = 5;
  8. int PROPAGATION_NESTED = 6;
  9. int ISOLATION_DEFAULT = -1;
  10. int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;
  11. int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;
  12. int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;
  13. int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;
  14. int TIMEOUT_DEFAULT = -1;
  15. int getPropagationBehavior();
  16. int getIsolationLevel();
  17. int getTimeout();
  18. boolean isReadOnly();
  19. String getName();
  20. }
TransactionStatus:当前事务状态
  1. public interface TransactionStatus extends SavepointManager, Flushable {
  2. boolean isNewTransaction();
  3. boolean hasSavepoint();
  4. void setRollbackOnly();
  5. boolean isRollbackOnly();
  6. void flush();
  7. boolean isCompleted();
  8. }
  • PlatformTransactionManager:事务管理器
  • getTransaction方法:事务获取操作,根据事务属性定义,获取当前事务或者创建新事物;
  • commit方法:事务提交操作,注意这里所说的提交并非直接提交事务,而是根据当前事务状态执行提交或者回滚操作;
  • rollback方法:事务回滚操作,同样,也并非一定直接回滚事务,也有可能只是标记事务为只读,等待其他调用方执行回滚。
  • TransactionDefinition:事务属性定义
  • getPropagationBehavior方法:返回事务的传播属性,默认是PROPAGATION_REQUIRED
  • getIsolationLevel方法:返回事务隔离级别,事务隔离级别只有在创建新事务时才有效,也就是说只对应传播属性PROPAGATION_REQUIREDPROPAGATION_REQUIRES_NEW
  • getTimeout方法:返回事务超时时间,以秒为单位,同样只有在创建新事务时才有效;
  • isReadOnly方法:是否优化为只读事务,支持这项属性的事务管理器会将事务标记为只读,只读事务不允许有写操作,不支持只读属性的事务管理器需要忽略这项设置,这一点跟其他事务属性定义不同,针对其他不支持的属性设置,事务管理器应该抛出异常。
  • getName方法:返回事务名称,声明式事务中默认值为“类的完全限定名.方法名”。
  • TransactionStatus:当前事务状态
  • isNewTransaction方法:当前方法是否创建了新事务(区别于使用现有事务以及没有事务);
  • hasSavepoint方法:在嵌套事务场景中,判断当前事务是否包含保存点;
  • setRollbackOnlyisRollbackOnly方法:只读属性设置(主要用于标记事务,等待回滚)和查询;
  • flush方法:刷新底层会话中的修改到数据库,一般用于刷新如Hibernate/JPA的会话,是否生效由具体事务资源实现决定;
  • isCompleted方法:判断当前事务是否已完成(已提交或者已回滚)。

部分实现关系

giteeImg

AbstractPlatformTransactionManager抽象类实现了Spring事务的标准流程,其子类DataSourceTransactionManager是我们使用较多的JDBC单数据源事务管理器,而JtaTransactionManager是JTA(Java Transaction API)规范的实现类,另外两个则分别是JavaEE容器WebLogicWebSphere的JTA事务管理器的具体实现。

spring事务拦截相关类

TransactionAspectSupport

  1. public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
  2. //用于存储默认事务管理器的键。
  3. private static final Object DEFAULT_TRANSACTION_MANAGER_KEY = new Object();
  4. //currentTransactionStatus的持有者
  5. private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
  6. new NamedThreadLocal<>("Current aspect-driven transaction");
  7. //获取当前事务信息
  8. protected static TransactionInfo currentTransactionInfo() throws NoTransactionException {
  9. return transactionInfoHolder.get();
  10. }
  11. // ..............................
  12. }

TransactionInterceptor

  1. public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
  2. // ................................... 省略其他代码
  3. public Object invoke(MethodInvocation invocation) throws Throwable {
  4. // 获取目标class
  5. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  6. // Adapt to TransactionAspectSupport's invokeWithinTransaction...
  7. return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
  8. }
  9. protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
  10. final InvocationCallback invocation) throws Throwable {
  11. // If the transaction attribute is null, the method is non-transactional.
  12. TransactionAttributeSource tas = getTransactionAttributeSource();
  13. //查询目标方法事务属性、确定事务管理器、构造连接点标识(用于确认事务名称)
  14. final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  15. final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  16. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  17. if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
  18. // 获取事务
  19. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
  20. Object retVal;
  21. try {
  22. //通过回调执行目标方法
  23. retVal = invocation.proceedWithInvocation();
  24. }
  25. catch (Throwable ex) {
  26. // target invocation exception
  27. completeTransactionAfterThrowing(txInfo, ex);
  28. throw ex;
  29. }
  30. finally {
  31. //清理当前线程事务信息
  32. cleanupTransactionInfo(txInfo);
  33. }
  34. //目标方法执行成功,提交事务
  35. commitTransactionAfterReturning(txInfo);
  36. return retVal;
  37. }
  38. else {
  39. final ThrowableHolder throwableHolder = new ThrowableHolder();
  40. // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. 回调的事务执行处理
  41. try {
  42. Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
  43. TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  44. try {
  45. return invocation.proceedWithInvocation();
  46. }
  47. catch (Throwable ex) {
  48. if (txAttr.rollbackOn(ex)) {
  49. // A RuntimeException: will lead to a rollback.
  50. if (ex instanceof RuntimeException) {
  51. throw (RuntimeException) ex;
  52. }
  53. else {
  54. throw new ThrowableHolderException(ex);
  55. }
  56. }
  57. else {
  58. // A normal return value: will lead to a commit.
  59. throwableHolder.throwable = ex;
  60. return null;
  61. }
  62. }
  63. finally {
  64. cleanupTransactionInfo(txInfo);
  65. }
  66. });
  67. // Check result state: It might indicate a Throwable to rethrow.
  68. if (throwableHolder.throwable != null) {
  69. throw throwableHolder.throwable;
  70. }
  71. return result;
  72. }
  73. catch (ThrowableHolderException ex) {
  74. throw ex.getCause();
  75. }
  76. catch (TransactionSystemException ex2) {
  77. if (throwableHolder.throwable != null) {
  78. logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
  79. ex2.initApplicationException(throwableHolder.throwable);
  80. }
  81. throw ex2;
  82. }
  83. catch (Throwable ex2) {
  84. if (throwableHolder.throwable != null) {
  85. logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
  86. }
  87. throw ex2;
  88. }
  89. }
  90. }
  91. // ...................................
  92. }

事务拦截器TransactionInterceptorinvoke方法中,通过调用父类TransactionAspectSupportinvokeWithinTransaction方法进行事务处理,该方法支持声明式事务和编程式事务。

事务抽象的核心接口为PlatformTransactionManager,它负责管理事务行为,包括事务的获取、提交和回滚。在invokeWithinTransaction方法中,我们可以看到createTransactionIfNecessarycommitTransactionAfterReturningcompleteTransactionAfterThrowing都是针对该接口编程,并不依赖于特定事务管理器,这里是对Spring事务抽象的实现。

spring事务同步

spring如何判断当前方法已经开启了事务?

AbstractPlatformTransactionManager

  1. public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
  2. Object transaction = doGetTransaction();
  3. // 参数为null时构造默认值
  4. ...
  5. if (isExistingTransaction(transaction)) {
  6. // Existing transaction found -> check propagation behavior to find out how to behave.
  7. return handleExistingTransaction(definition, transaction, debugEnabled);
  8. }
  9. ...
  10. // 获取当前事务对象
  11. protected abstract Object doGetTransaction() throws TransactionException;
  12. // 判断当前事务对象是否包含活跃事务
  13. protected boolean isExistingTransaction(Object transaction) throws TransactionException {
  14. return false;
  15. }

注意getTransaction方法是final的,无法被子类覆盖,保证了获取事务流程的一致和稳定。抽象方法doGetTransaction获取当前事务对象,方法isExistingTransaction判断当前事务对象是否存在活跃事务,具体逻辑由特定事务管理器实现,看下我们使用最多的DataSourceTransactionManager对应的实现:

  1. // DataSourceTransactionManager.class
  2. @Override
  3. protected Object doGetTransaction() {
  4. DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  5. txObject.setSavepointAllowed(isNestedTransactionAllowed());
  6. ConnectionHolder conHolder =
  7. (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
  8. txObject.setConnectionHolder(conHolder, false);
  9. return txObject;
  10. }
  11. @Override
  12. protected boolean isExistingTransaction(Object transaction) {
  13. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  14. return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
  15. }

可以看到,获取当前事务对象时,使用了TransactionSynchronizationManager#getResource方法,类图如下:

![(https://pic2.zhimg.com/80/v2-102f07dd273c2a9e093ea4dec88c18c5_720w.png)

giteeImg

TransactionSynchronizationManager通过ThreadLocal对象在当前线程记录了resourcessynchronizations属性。resources是一个HashMap,用于记录当前参与事务的事务资源,方便进行事务同步,在DataSourceTransactionManager的例子中就是以dataSource作为key,保存了数据库连接,这样在同一个线程中,不同的方法调用就可以通过dataSource获取相同的数据库连接,从而保证所有操作在一个事务中进行。synchronizations属性是一个TransactionSynchronization对象的集合,AbstractPlatformTransactionManager类中定义了事务操作各个阶段的调用流程,以事务提交为例:

  1. // AbstractPlatformTransactionManager.class
  2. private void processCommit(DefaultTransactionStatus status) throws TransactionException {
  3. try {
  4. boolean beforeCompletionInvoked = false;
  5. try {
  6. prepareForCommit(status);
  7. triggerBeforeCommit(status);
  8. triggerBeforeCompletion(status);
  9. ....
  10. else if (status.isNewTransaction()) {
  11. // 记录日志
  12. ...
  13. doCommit(status);
  14. }
  15. ...
  16. // 事务调用异常处理
  17. ...
  18. try {
  19. triggerAfterCommit(status);
  20. }
  21. finally {
  22. triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
  23. }
  24. }
  25. }

很多trigger前缀的方法,这些方法用于在事务操作的各个阶段触发回调,从而可以精确控制在事务执行的不同阶段所要执行的操作,这些回调实际上都通过TransactionSynchronizationUtils来实现,它会遍历TransactionSynchronizationManager#synchronizations集合中的TransactionSynchronization对象,然后分别触发集合中各个元素对应方法的调用。例如:

  1. TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
  2. @Override
  3. public void afterCommit() {
  4. // do something after commit
  5. }
  6. });

这段代码就在当前线程的事务synchronizations属性中,添加了一个自定义同步类,如果当前存在事务,那么在事务管理器执行事务提交之后,就会触发afterCommit方法,可以通过这种方式在事务执行的不同阶段自定义一些操作。

最后总结

个人觉得最中要的几个类

  • TransactionInterceptor: 事务的入口类
  • PlatformTransactionManager:事务管理器,定义事务的操作行为接口
  • 两个类的连接点在方法:createTransactionIfNecessary 中的tm.getTransaction
事务的大体入口调用链:
  • TransactionInterceptor#invoke
    • invokeWithinTransaction 执行原来的方法,并织入事务

      • determineTransactionManager 获取指定的事务管理器

      • createTransactionIfNecessary : 创建事务

        • TransactionStatus status = tm.getTransaction(txAttr); 调用指定事务管理器生成事务状态相关信息,; 由接口PlatformTransactionManager的实现类具体获取

发表评论

目录