什么是事务

事务:是数据库操作的最小单元,是作为单个逻辑工作单元执行的一系列操作;这些操作作为一个整体一起向系统提交,要么都执行,要么都不执行;事务是一组不可再分割的操作集合(工作逻辑单元)。

事务是四大特性

  1. 原子性(Atomicity):事务中所有操作是不可再分割的原子单元。事务中所有操作要么全部执行成功,要么全部执行失败。
  2. 一致性(Consistency):事务执行后,数据库状态与其他业务规则保持一致。如转账业务,无论事务执行成功与否,参与转账的连个账号余额之和应该是不变的。
  3. 隔离性(lsolation):隔离性是指在并发操作中,不同事务之间应该隔离开来,使每个并发中的事务不会相互干扰。
  4. 持久性(Durability):一旦事务提交成功,事务中所有的数据操作都必须被持久化到数据库中,即使提交事务后,数据库马上崩溃,在数据库重启时,也必须能保证通过某种机制恢复数据。

事务的隔离级别

数据库事务的隔离级别有4种,由低到高分别为Read uncommittedRead committedRepeatable readSerializable。而且,在事务的并发操作中可能会出现脏读,不可重复读,幻读、事务丢失。

并发操作中出现的问题

  • 脏读:(读取了未提交的新事物,然后被回滚了)
    事务A读取了事务B中尚未提交的数据。如果事务B回滚,则A读取使用了错误的数据。

  • 不可重复读:(读取了提交的新事务,指更新操作)
    不可重复读是指在对于数据库中的某个数据,一个事务范围内多次查询却返回了不同的数据值,这是由于在查询间隔,被另一个事务修改并提交了。

  • 幻读:(也是读取了提交的新事务,指增删操作)
    在事务A多次读取构成中,事务B对数据进行了新增操作,导致事务A多次读取的数据不一致。

  • 第一类事物丢失:(称为回滚丢失)

    对于第一类事物丢失,就是比如A和B同时在执行一个数据,然后B事物已经提交了,然后A事物回滚了,这样B事物的操作就因A事物回滚而丢失了。

  • 第二类事物丢失:(提交覆盖丢失)
    对于第二类事物丢失,也称为覆盖丢失,就是A和B一起执行一个数据,两个同时取到一个数据,然后B事物首先提交,但是A事物加下来又提交,这样就覆盖了B事物.

隔离级别

  • Read uncommitted
    读未提交,顾名思义,就是一个事务可以读取另一个未提交事务的数据。会产生脏读。
  • Read committed
    读提交,顾名思义,就是一个事务要等另一个事务提交后才能读取数据。会产生不可重复读。
  • Repeatable read
    重复读,就是在开始读取数据(事务开启)时,不再允许修改操作。可能会产生幻读。
  • Serializable
    Serializable是最高的事务隔离级别,在该级别下,事务串行化顺序执行,可以避免脏读、不可重复读与幻读。但是这种事务隔离级别效率低下,比较耗数据库性能,一般不使用。

Spring事务使用

基础配置,模拟一个转账业务

  1. 定义数据源

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Bean
    public DataSource dataSource(){
    BasicDataSource dataSource = new BasicDataSource();
    dataSource.setDriverClassName("com.mysql.jdbc.Driver");
    dataSource.setUrl("jdbc:mysql://localhost:3306/ssm?autoReconnect=true&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false");
    dataSource.setUsername("root");
    dataSource.setPassword("123456");
    return dataSource;
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
    return new JdbcTemplate(dataSource);
    }
  2. 定义数据库操作dao

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Repository
    public class AccountDao {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void out(String fromName, Integer money){
    String sql = "update account set money = money-? where name=?";
    jdbcTemplate.update(sql, money, fromName);
    }

    public void in(String toName, Integer money){
    String sql = "update account set money = money+? where name=?";
    jdbcTemplate.update(sql, money, toName);
    }
    }
  3. 定义service

    1
    2
    3
    4
    5
    6
    accountDao.out(fromName, money);
    int x = 10;
    if (x == 10){
    throw new RuntimeException("Sql Error");
    }
    accountDao.in(toName, money);

编程式事务

添加事务管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
//事务管理器
@Bean
public TransactionManager transactionManager(DataSource dataSource){
return new DataSourceTransactionManager(dataSource);
}

// 事务管理器模板
@Bean
public TransactionTemplate transactionTemplate(TransactionManager transactionManager){
TransactionTemplate transactionTemplate = new TransactionTemplate();
transactionTemplate.setTransactionManager((DataSourceTransactionManager) transactionManager);
return transactionTemplate;
}

使用编程式事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Autowired
private TransactionTemplate transactionTemplate;

@Override
public void transfer(String fromName, String toName, Integer money) {
transactionTemplate.execute(status -> {
accountDao.out(fromName, money);
int x = 10;
if (x == 10){
throw new RuntimeException("Sql Error");
}
accountDao.in(toName, money);
return null;
});
}

代码块中的内容有一处失败,整体失败。

声明式事务

声明式事务不需要事务模板

配置类开启注解事务

1
2
3
4
5
6
@ComponentScan("com.lq.spring")
@EnableAspectJAutoProxy
@EnableTransactionManagement
public class Context {

}

添加事务管理器

1
2
3
4
5
//事务管理器
@Bean
public TransactionManager transactionManager(DataSource dataSource){
return new DataSourceTransactionManager(dataSource);
}

添加事务

1
2
3
4
5
6
7
8
9
@Transactional
public void transfer(String fromName, String toName, Integer money) {
accountDao.out(fromName, money);
int x = 10;
if (x == 10){
throw new RuntimeException("Sql Error");
}
accountDao.in(toName, money);
}

传播特性

物理事务和逻辑事务

物理事务:所谓物理事务指的就是Connection开启的事务

逻辑事务:在一个复杂的业务系统中,可能会调用多个service,每个service都有自己的事务(标注了@Transactional),此时我们需要根据事务传播方式(Propagation)来决定当前事务的行为(比如要挂起创建新事物,还是加入当前事务)。我们可以认为每个注解了@Transactional的方法都是一个逻辑事务,这些逻辑事务被Spring事务管理,Spring会根据事务传播方式来决定是否开启新事务

7个传播特性

当一个事务方法被另外一个事务方法调用时,这个事务方法应该如何执行。

Spring的7种事务隔离级别

事务 描述
PROPAGATION_REQUIRED 默认事务类型,如果没有,就新建一个事务;如果有,就加入当前事务。适合绝大多数情况。
PROPAGATION_REQUIRES_NEW 如果没有,就新建一个事务;如果有,就将当前事务挂起。
PROPAGATION_NESTED 如果没有,就新建一个事务;如果有,就在当前事务中嵌套其他事务(子事务回滚不影响父事务,但父事务回滚子事务需要回滚)。
PROPAGATION_SUPPORTS 如果没有,就以非事务方式执行;如果有,就使用当前事务。
PROPAGATION_NOT_SUPPORTED 如果没有,就以非事务方式执行;如果有,就将当前事务挂起即无论如何不支持事务。
PROPAGATION_NEVER 如果没有,就以非事务方式执行;如果有,就抛出异常。
PROPAGATION_MANDATORY 如果没有,就抛出异常;如果有,就使用当前事务。
  • 不需要事务:PROPAGATION_NEVERPROPAGATION_NOT_SUPPORTED
  • 可有可无的:PROPAGATION_SUPPORTS
  • 必须要要有事务的:PROPAGATION_REQUIRES_NEW(总是新建),PROPAGATION_REQUIRED(可新建,可加入),PROPAGATION_NESTED(嵌套事务),PROPAGATION_MANDATORY(抛出异常)

@EnableTransactionManagement注解

该注解导入了类TransactionManagementConfigurationSelector,该类实现了ImportSelector,.最终会执行selectImports为容器注入bean。

1
2
3
4
5
6
7
8
9
10
protected String[] selectImports(AdviceMode adviceMode) {
switch(adviceMode) {
case PROXY:
return new String[]{AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[]{this.determineTransactionAspectClass()};
default:
return null;
}
}

这里会注入AutoProxyRegistrarProxyTransactionManagementConfiguration

AutoProxyRegistrar

AutoProxyRegistrar实现了ImportBeanDefinitionRegistrar,回向容器注入需要的BeanDefiniton,最终调用了AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry),如果容器中没有名称为org.springframework.aop.config.internalAutoProxyCreatorBeanDefinition那么会注入InfrastructureAdvisorAutoProxyCreator

这里的InfrastructureAdvisorAutoProxyCreator是是AbstractAutoProxyCreator的子类,实现了SmartInstantiationAwareBeanPostProcessor会在getEarlyBeanReference提前暴露代理对象或者在bean完成实例化初始化后调用postProcessAfterInitialization对原始的bean进行aop增强。也就是说这里会注入一个基于动态代理实现Aop增强的一个bean后置处理器

ProxyTransactionManagementConfiguration

类图:

image-20221218194357113
  1. 注入TransactionalEventListenerFactory

    1
    2
    3
    4
    5
    @Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
    return new TransactionalEventListenerFactory();
    }

    这一步是在其父类AbstractTransactionManagementConfiguration中完成的

    TransactionalEventListenerFactory在事务的多播器中会把标注TransactionalEventListener的方法包装成一个ApplicationListener来响应对应的事件。

  2. 注入TransactionInterceptor

    TransactionInterceptor实现了MethodInterceptor,是一个Advise一个基于方法拦截器的通知,在其中实现了事务的增强逻辑。在Spring AOP中我们分析过最后的aop代理对象执行方法的时候会调用到方法拦截器的invoke方法,TransactionInterceptor便是在其中实现了事务的增强。

  3. 注入BeanFactoryTransactionAttributeSourceAdvisor

    BeanFactoryTransactionAttributeSourceAdvisor是一个PointcutAdvisor,是一个Advisor,它使用Pointcut来判断当前对象的方法是否需要增强,使用Advice对方法进行增强。

  4. 注入TransactionAttributeSource

    这里注入的是AnnotationTransactionAttributeSource,主要负责从类上,方法上,获取@Transactional注解信息,比如抛出什么异常回滚,事务超时时间等。

我们基本上清楚了Spring 事务是如何实现的了,首先是通过BeanPostProcessorSpring IOC容器结合在一起,在bean实例化初始化后调用后置处理器(如果出现循环依赖那么调用的是提前暴露对象的getEarlyBeanReference)对bean进行Aop增强,在AbstractAutoProxyCreatorwrapIfNecessary方法中,会获取全部的Advisor,便会拿到注入的BeanFactoryTransactionAttributeSourceAdvisor,然后使用Pointcut进行过滤,然后通过ProxyFactory选择使用JDK动态代理,还是cglib动态代理。后续调用代理对象的方法会调用到,TransactionInterceptor中的invoke实现了事务增强的逻辑。接下来我们详细看看细节部分

Spring事务源码分析

BeanFactoryTransactionAttributeSourceAdvisor

实现了PointcutAdvisor接口,我们看下它的Pointcut到底是什么,它的Advice又是什么。

Pointcut

1
2
3
4
5
6
7
8
9
10
@Nullable
private TransactionAttributeSource transactionAttributeSource;

private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
@Override
@Nullable
protected TransactionAttributeSource getTransactionAttributeSource() {
return transactionAttributeSource;
}
};

这个类维护了一个TransactionAttributeSourcePointcut类型的pointcut

image-20221218201157611

它实现了StaticMethodMatcherPointcut,它是一个静态方法匹配的Pointcut,这里的静态意思是不会因为入参的参数不同而改变过滤结果。我们看下具体的实现逻辑。

1
2
3
4
5
@Override
public boolean matches(Method method, Class<?> targetClass) {
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

这里的getTransactionAttributeSource返回的是AnnotationTransactionAttributeSource实例,这里的逻辑是只要有TransactionAttributeSource并且可以拿到事务定义信息,那么就视为匹配,后面就会对方法进行增强。

AnnotationTransactionAttributeSource是如何获取事务定义信息的

getTransactionAttribute由其父类AbstractFallbackTransactionAttributeSource实现,其使用一个ConcurrentHashMap缓存方法和其对应的事务信息,如果缓存中没有那么会调用computeTransactionAttribute方法进行获取。computeTransactionAttribute方法会优先获取方法上面的事务注解信息,然后获取类类上面的注解信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
@Nullable
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
if (method.getDeclaringClass() == Object.class) {
return null;
}

// First, see if we have a cached value.
Object cacheKey = getCacheKey(method, targetClass);
TransactionAttribute cached = this.attributeCache.get(cacheKey);
if (cached != null) {
// Value will either be canonical value indicating there is no transaction attribute,
// or an actual transaction attribute.
if (cached == NULL_TRANSACTION_ATTRIBUTE) {
return null;
}
else {
return cached;
}
}
else {
// We need to work it out.
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
// Put it in the cache.
if (txAttr == null) {
this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
}
else {
String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
if (txAttr instanceof DefaultTransactionAttribute) {
((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
}
...
this.attributeCache.put(cacheKey, txAttr);
}
return txAttr;
}
}

computeTransactionAttribute的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Don't allow no-public methods as required.
// 是否只允许public方法,一般为true, 如果只允许public方法且当前方法不是public
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}

// The method may be on an interface, but we need attributes from the target class.
// If the target class is null, the method will be unchanged.
// 如果是CGLIB代理后的类,会一直找父类知道找到用户定义的类中的方法,并且是原始方法
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

// First try is the method in the target class.
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}

// Second try is the transaction attribute on the target class.
// 先从方法上面获取注解,其次拿类上面的
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}

if (specificMethod != method) {
// Fallback is to look at the original method.
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}

return null;
}

可以看到事务注解只能标注在public方法上面,如果是像mybatis这种生成接口动态代理类那么会拿到接口上面的注解信息。

获取注解信息findTransactionAttribute调用了子类的AnnotationTransactionAttributeSource#determineTransactionAttribute方法

1
2
3
4
5
6
7
8
9
10
@Nullable
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
for (TransactionAnnotationParser parser : this.annotationParsers) {
TransactionAttribute attr = parser.parseTransactionAnnotation(element);
if (attr != null) {
return attr;
}
}
return null;
}

这里涉及到一个新的类TransactionAnnotationParser,这是Spring根据AnnotatedElement获取注解的接口,可以扩展此接口实现自己的事务注解,并定制事务定义信息。这些事务注解解析器在构造方法中进行了定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
this.publicMethodsOnly = publicMethodsOnly;
if (jta12Present || ejb3Present) {
this.annotationParsers = new LinkedHashSet<>(4);
// spring的事务注解解析
this.annotationParsers.add(new SpringTransactionAnnotationParser());
if (jta12Present) {
this.annotationParsers.add(new JtaTransactionAnnotationParser());
}
if (ejb3Present) {
this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
}
}
else {
this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
}
}

可以看到Spring内置了JTA,ejb的事务注解处理器,但是使用的是LinkedHashSetSpringTransactionAnnotationParser会放在最前面,解析Spring的@Transactional注解,这也是我们最常用的事务注解,SpringTransactionAnnotationParser会拿到事务注解信息,然后把注解的属性内容包装成 RuleBasedTransactionAttribute

image-20221218204708731

RuleBasedTransactionAttributerollbackOn进行了扩展,配合SpringTransactionAnnotationParser会解析事务注解中的rollbackFor,rollbackForClassName,noRollbackFor,noRollbackForClassName属性来判断异常抛出时是否需要回滚事务。如果这个属性没有值的话,会调用父类的rollbackOn,最终只会在RuntimeExceptionError上面回滚。

BeanFactoryTransactionAttributeSourceAdvisor是如何获取到Advice

它实现了BeanFactoryAware,使用advice持有当前通知的引用,如果没有那么从容器中根据名称拿。在ProxyTransactionManagementConfiguration中直接设置advice为容器中的TransactionInterceptor,也就是说事务的增强逻辑定义在TransactionInterceptor中。

1
2
3
4
5
6
//AbstractBeanFactoryPointcutAdvisor
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
resetAdviceMonitor();
}

TransactionInterceptor

TransactionInterceptor是事务拦截器,其invoke方法会在代理对象被代理方法执行的时候被回调到

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
// 获取class 如果被CGLIB代理过会获取父类
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// 方法引用,业务逻辑代码
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

其中invokeWithinTransaction是spring事务原理的关键,此方法在其父类TransactionAspectSupport实现

利用TransactionAttributeSource获取方法或者类上的事务注解信息

1
2
//invokeWithinTransaction
TransactionAttributeSource tas = getTransactionAttributeSource();

事务注解决定了事务增强的代码执行逻辑,这里使用的TransactionAttributeSource通常是是上文中我们提到的AnnotationTransactionAttributeSource,上文中我们直到,其内部存在map缓存,如果缓存没有那么调用TransactionAnnotationParser链式的进行解析,自然就调用到了SpringTransactionAnnotationParser,SpringTransactionAnnotationParser反射获取注解信息包装成TransactionAttribute对象

获取事务管理器PlatformTransactionManager

1
2
//invokeWithinTransaction
final TransactionManager tm = determineTransactionManager(txAttr);

PlatformTransactionManager是spring抽象出的事务管理器接口,主要包含下面三个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface PlatformTransactionManager extends TransactionManager {

// 根据指定的传播行为返回当前活动的事务或创建新事务
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;

// 根据事务状态提交事务
void commit(TransactionStatus status) throws TransactionException;

// 根据事务状态回滚事务
void rollback(TransactionStatus status) throws TransactionException;

}

determineTransactionManager方法决定使用什么事务管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// invokeWithinTransaction
@Nullable
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// Do not attempt to lookup tx manager if no tx attributes are set
// 没有事务定义信息,或没有beanFactory返回TransactionManager属性的值
if (txAttr == null || this.beanFactory == null) {
return getTransactionManager();
}

// 这一步获取事务注解上value的值,如果存在内容根据名称从beanFactory中获取合适的事务管理
String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) {
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
}
else if (StringUtils.hasText(this.transactionManagerBeanName)) {
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
}
else {
// 一般不存在 走这里
// 获取默认的事务管理器
TransactionManager defaultTransactionManager = getTransactionManager();
if (defaultTransactionManager == null) {
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
if (defaultTransactionManager == null) {
//从beanFactory中获取合适的事务管理
defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
this.transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}

这里可以看到Spring是支持@Transactional注解value指定特定的事务管理器,但是我们实际使用中通常是没有指定的,这里就会去bean工程中获取PlatformTransactionManager类型的bean,存在多个就会抛出异常了。这里拿到的一般是DataSourceTransactionManager

根据事务传播级别来决定是否由必要创建事务createTransactionIfNecessary

1
2
// invokeWithinTransaction
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

首先调用getTransaction方法获取一个事务,然后调用prepareTransactionInfo封装事务的处理配置信息并绑定到当前线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

// ...
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
else {
// log
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

其中getTransactionAbstractPlatformTransactionManager中是一个final的模板方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// getTransaction
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.

// 找到现有事务->检查传播行为以了解如何行为。
return handleExistingTransaction(def, transaction, debugEnabled);
}

// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}

// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
// log
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
// log
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
不存在事务
  1. 首先要求超时时长不能小于-1.-1表示的使用底层事务系统的默认超时,如果不支持超时,则使用无

    1
    2
    3
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
  2. 然后如果的传播级别是MANDATORY支持当前事务,如果不存在则抛出异常,也就是说MANDATORY要求外层调用方法是在一个具备事务的情况下进行的调用.

    1
    2
    3
    4
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    throw new IllegalTransactionStateException(
    "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
  3. 如果隔离级别是Required(支持当前事务,如果当前不存在事务那么创建一个新事务),RequireNew(创建一个新事务,如果存在事务则暂停当前事务),Nested(如果当前事务存在,则在嵌套事务中执行)那么会执行下面的逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    // 挂起当前事务,让事务和线程解绑
    SuspendedResourcesHolder suspendedResources = suspend(null);
    // log
    try {
    // 开启一个事务
    return startTransaction(def, transaction, debugEnabled, suspendedResources);
    }
    catch (RuntimeException | Error ex) {
    // 出现异常恢复被挂起的事务
    resume(null, suspendedResources);
    throw ex;
    }
    }
    private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
    boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(
    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    // 开启具体事务
    doBegin(transaction, definition);
    // 设置当前事务的一些属性记录在ThreadLocal中
    prepareSynchronization(status, definition);
    return status;
    }

    首先是挂起当前事务,由于当前不存在事务,其实是把当前存在TransactionSynchronization事务同步回调的接口信息保存在SuspendedResourcesHolder中。在DataSourceTransactionManagerdoBegin方法会获取对应的Connection,然后根据事务定义对Connection进行设置,比如如果是只读事务那么会执行SET TRANSACTION READ ONLY设置事务只读,设置超时时长,关闭自动提交等,并且把创建的事务信息绑定到resources ThreadLocal中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
    TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
    TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
    definition.getIsolationLevel() : null);
    TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
    TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
    TransactionSynchronizationManager.initSynchronization();
    }
    }
    public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
    throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    synchronizations.set(new LinkedHashSet<>());
    }

    prepareSynchronization负责把事务相关信息设置到ThreadLocal中,并且初始化TransactionSynchronizationLinkedHashSet,这样我们我们通过TransactionSynchronizationManager加入一些回调方法的时候不会抛NPE,之所以使用LinkedHashSetTransactionSynchronizationManager支持Ordered接口,@Order注解进行排序

  4. 如果是SUPPORTS(支持当前事务,如果不存在,则以非事务方式执行),NOT_SUPPORTED(不支持当前事务;始终以非事务方式执行),NEVER(不支持当前事务,如果当前事务存在,则抛出异常)那么不会开启事务,但是支持事务同步

    1
    2
    3
    4
    5
    6
    7
    8
    else {
    // Create "empty" transaction: no actual transaction, but potentially synchronization.
    if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    // log
    }
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
存在事务的情况
  1. 是如何判断是否存在事务的首先doGetTransaction方法会获取resources ThreadLocal中的事务信息,这个事务信息是在doBegin方法中绑定

    1
    2
    3
    4
    5
    6
    7
    8
    protected Object doGetTransaction() {
    DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
    txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
    // 从threadLocal中获取事务资源
    ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
    }

    然后调用isExistingTransaction来判断是否存在事务

    1
    2
    3
    4
    protected boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
    return txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive();
    }

    同样doBegin方法会设置transactionActive为true。
    最后如果存在事务那么会执行下面的handleExistingTransaction来根据事务传播级别来创建事务

  2. 首先如果传播级别是NEVER(不支持当前事务,如果当前事务存在,则抛出异常)当前存在事务,那么抛出异常

    1
    2
    3
    4
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    throw new IllegalTransactionStateException(
    "Existing transaction found for transaction marked with propagation 'never'");
    }
  3. 如果传播级别是NOT_SUPPORTED(不支持当前事务;始终以非事务方式执行)那么不会开启事务,但是支持事务同步,并且会挂起当前事务,其中prepareTransactionStatus会初始化事务同步set,并且把当前事务的信息包装到DefaultTransactionStatus并返回

    1
    2
    3
    4
    5
    6
    7
    8
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    // log
    Object suspendedResources = suspend(transaction);
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    // 会初始化事务同步的set集合
    return prepareTransactionStatus(
    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
  4. 如果传播级别是RequireNew(创建一个新事务,如果存在事务则暂停当前事务)会挂起当前事务,从ThreadLocal中解绑,然后开启新事务,并初始化TransactionSynchronization的set

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    // log
    // 挂起事务
    SuspendedResourcesHolder suspendedResources = suspend(transaction);
    try {
    // 开启事务
    return startTransaction(definition, transaction, debugEnabled, suspendedResources);
    }
    catch (RuntimeException | Error beginEx) {
    // 将之前的事务恢复到ThreadLocal
    resumeAfterBeginException(transaction, suspendedResources, beginEx);
    throw beginEx;
    }
    }
  5. 如果传播级别是Nested(如果当前事务存在,则在嵌套事务中执行),首先会判断是否允许嵌套事务,如果不允许那么抛出异常,通常DataSourceTransactionManager使用savepoint来实现嵌套事务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    // 判断是否允许嵌套事务
    if (!isNestedTransactionAllowed()) {
    // 抛出异常
    }
    // ..log
    // 使用savepoint来执行嵌套事务
    if (useSavepointForNestedTransaction()) {
    // Create savepoint within existing Spring-managed transaction,
    // through the SavepointManager API implemented by TransactionStatus.
    // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
    DefaultTransactionStatus status =
    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
    // 创建保存点
    status.createAndHoldSavepoint();
    return status;
    }
    else {
    // 这里是JTA使用XA实现分布式事务的逻辑
    // Nested transaction through nested begin and commit/rollback calls.
    // Usually only for JTA: Spring synchronization might get activated here
    // in case of a pre-existing JTA transaction.
    return startTransaction(definition, transaction, debugEnabled, null);
    }
    }

    调用Connection#setSavepoint方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // JdbcTransactionObjectSupport
    public Object createSavepoint() throws TransactionException {
    ConnectionHolder conHolder = this.getConnectionHolderForSavepoint();

    try {
    if (!conHolder.supportsSavepoints()) {
    throw new NestedTransactionNotSupportedException("Cannot create a nested transaction because savepoints are not supported by your JDBC driver");
    } else if (conHolder.isRollbackOnly()) {
    throw new CannotCreateTransactionException("Cannot create savepoint for transaction which is already marked as rollback-only");
    } else {
    return conHolder.createSavepoint();
    }
    } catch (SQLException var3) {
    throw new CannotCreateTransactionException("Could not create JDBC savepoint", var3);
    }
    }

    // ConnectionHolder
    public Savepoint createSavepoint() throws SQLException {
    ++this.savepointCounter;
    return this.getConnection().setSavepoint("SAVEPOINT_" + this.savepointCounter);
    }
  6. 最后如果是Required(支持当前事务,如果当前不存在事务那么创建一个新事务) SUPPORTS(支持当前事务;如果不存在,则以非事务方式执行) MANDATORY(支持当前事务;如果当前不存在事务,则抛出异常)

    1
    2
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);

    都不会产生新的事务。

prepareTransactionInfo包装事务信息并绑定到ThreadLocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// createTransactionIfNecessary
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {

// 包装成TransactionInfo
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
// log
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
}
else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
// log
}

// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
// 绑定当前事务信息到ThreadLocal, 并记录之前的事务信息
txInfo.bindToThread();
return txInfo;
}

private void bindToThread() {
// Expose current TransactionStatus, preserving any existing TransactionStatus
// for restoration after this transaction is complete.
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}

这里会使用oldTransactionInfo记录之前的事务信息,并且绑定当前事务信息到ThreadLocal上,这样A事务方法调用B事务方法的时,能像栈一样先进后出。在invokeWithinTransaction调用完业务方法后,会调用cleanupTransactionInfooldTransactionInfo重写设置到ThreadLocal中,这意味着B方法执行结束,回到了A方法的调用栈中。

回调业务逻辑InvocationCallback#proceedWithInvocation

1
2
// invokeWithinTransaction
retVal = invocation.proceedWithInvocation();

其实调用的是MethodInvocation#proceed,如果当前对象存在多层代理,比如先事务代理,再基于@AspectJ注解的方法调用时长统计,那么后面代理增强也会执行,具体逻辑在ReflectiveMethodInvocation#proceed方法中,如果还存在其他的拦截器链那么会继续执行拦截器中的逻辑,否则直接执行我们自己的业务逻辑代码。

completeTransactionAfterThrowing 业务逻辑出现异常时的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// invokeWithinTransaction
completeTransactionAfterThrowing(txInfo, ex);

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// log
// 如果当前抛出的异常需要回滚
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 调用事务管理器的回滚方法
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
// 向外抛出异常
catch (TransactionSystemException ex2) {
// log
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
// log
throw ex2;
}
}
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
// 当前异常不需要回滚
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}

这里rollbackOn取决于事务注解上面标注的在什么异常上回滚,在什么异常上不回滚,默认是在RuntimeExceptionError上面才会回滚。

事务是如何回滚的

1
2
3
4
5
6
7
8
9
10
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}

DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false); // 执行回滚策略
}

回滚策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;

try {
// 回调事务同步的beforeCompletion方法
triggerBeforeCompletion(status);

// 如果有保存点,一般是嵌套事务,回滚到保存点
if (status.hasSavepoint()) {
// log
status.rollbackToHeldSavepoint();
}
// 如果是一个独立的事务like回滚
else if (status.isNewTransaction()) {
// log
doRollback(status);
}
else {
// Participating in larger transaction
// 不是独立事务
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
// log
doSetRollbackOnly(status);
}
else {
// log
}
}
else {
// log
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}

triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}

执行回滚DataSourceTransactionManager是调用的Connection#rollback方法,这里首先会获取ThreadLocal中的TransactionSynchronization并且按照@OrderOrdered排序然后依次调用beforeCompletion,如果具备回滚点,那么直接回滚到保存点,如果是一个新事务,那么直接回滚,如果不是一个独立的事务,只是标记需要回滚,执行完这些后,还会回调triggerAfterCompletion,然后调用TransactionSynchronization#afterCompletion方法,然后调用cleanupAfterCompletion清理资源,并且恢复被挂起的线程。

commitTransactionAfterReturning提交事务

1
2
// invokeWithinTransaction
commitTransactionAfterReturning(txInfo);

具体逻辑在processCommit中进行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// processCommit
// 钩子方法
prepareForCommit(status);
// 回调事务同步器
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;

// 存在保存点,释放资源
if (status.hasSavepoint()) {
// log
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
// 独立事务直接提交
else if (status.isNewTransaction()) {
// log
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}

// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}

DataSourceTransactionManager提交事务是执行的是Connection#commit

同样完成只会还会调用triggerAfterCommit,triggerAfterCompletion并清理ThreadLocal中的内容,并且恢复被挂起的事务。

事务同步回调接口TransactionSynchronization

image-20220828192354727

其中beforeCommit,beforeCompletion,aterCommitafterCompletion,只有当前事务是一个独立事务的时候才会回调,而且如果提交或者回滚的时候出现异常,beforeCompletion,afterCompletion也会被调用,我们可以通过TransactionSynchronizationManager#registerSynchronization注册回调的逻辑。

image-20220828193252799

那么什么时候事务被视作时一个独立事务昵

  • 如果外部不存在一个事务,并且传播级别是REQUIRED,REQUIRES_NEW,NESTED
  • 如果外部存在一个事务,且传播级别为REQUIRES_NEW
  • 如果外部存在一个事务,且传播级别为嵌套事务,但是此时不是通过保存点来实现嵌套事务

事务失效的场景

  1. 方法内的自调用: Spring事务是基于AOP的, 只要使用代理对象调用某个方法时,Spring事务才能生效, 而在一个方法中调用使用this.xx()调用方法时,this并不是代理目对象,所以会导致事务失效。导致的方法上的注解@Transcational失效。

    • 解放办法1:把调用方法拆分到另外-个Bean中
    • 解决办法2:自己注入自己
    • 解决办法3: AopContext.currentProxy() + @EnableAspectUAutoProxy(exposeProxy=true)
  2. 方法是private: Spring事务 spring事务的实现AbstractFallbackTransactionAttributeSource类的computeTransactionAttribute方法中有个判断,如果目标方法不是public,则TransactionAttribute返回null,即不支持事务。

  3. 方法是final的:原因和private是一样的,因为spring事务底层实现使用了代理,aop,通过jdk的动态代理或者cglib,生成了代理类,在代理类中实现了事务功能,如果方法被final修饰,无法重写该方法,也就无法添加事务的功能了

  4. 多个的线程调用方法:当MybatisJdbcTemplate执行SQL时, 会从ThreadLocal中去获取数据库连接对象, 如果开启事务的线程和执行SQL的线程是同一个,那么就能
    拿到数据库连接对象,如果不是同一个线程,那就拿到不到数据库连接对象,这样,Mybatis或ldbcTemplate就会自 己去新建一个数据库连接用来执行SQL, 此数据库连
    接的autocommit为true,那么执行完SQL就会提交,后续再抛异常也就不能再回滚之前已经提交了的SQL了。

  5. 没加@Configuration注解:如果用SpringBoot基本没有这个问题, 但是如果用的Spring, 那么可能会有这个问题,这个问题的原因其实也是由于MybatisJdbcTemplate会从ThreadLocal中去获取数据库连接,但是ThreadLocal中存储的是一 个MAP, MAP的key为DataSource对象, value为连接对象, 而如果我们没有在AppConfig上添加@Configuration注解的话, 会导致MAP中存的DataSource对象和MybatisJdbcTemplate中的DataSource对象不相等, 从而也拿不到数据库连接,导致自己去创建数据库连接了。

  6. 异常被吃掉:如果Spring事务没有捕获到异常,那么也就不会回滚了,默认情况下Spring会捕获RuntimeExceptionError

  7. 类没有被Spring管理

  8. 数据库不支持事务

参考

https://www.cnblogs.com/cuzzz/p/16633523.html