Spring中事务的使用、抽象机制及模拟Spring事务实现
本文大纲如下:
编程式事务
Spring提供了两种编程式事务管理的方法
使用
TransactionTemplate
或者TransactionalOperator
.直接实现
TransactionManager
接口
如果是使用的是命令式编程,Spring推荐使用TransactionTemplate
来完成编程式事务管理,如果是响应式编程,那么使用TransactionalOperator
更加合适。
TransactionTemplate
使用示例(我这里直接用的官网提供的例子了)
public class SimpleService implements Service {
private final TransactionTemplate transactionTemplate;
// 使用构造对transactionTemplate进行初始化
// 需要提供一个transactionManager
public SimpleService(PlatformTransactionManager transactionManager) {
this.transactionTemplate = new TransactionTemplate(transactionManager);
}
public Object someServiceMethod() {
return transactionTemplate.execute(new TransactionCallback() {
public Object doInTransaction(TransactionStatus status) {
// 这里实现自己的相关业务逻辑
updateOperation1();
return resultOfUpdateOperation2();
}
});
}
}
在上面的例子中,我们显示的使用了TransactionTemplate
来完成事务管理,通过实现TransactionCallback
接口并在其doInTransaction
方法中完成了我们对业务的处理。我们可以大概看下TransactionTemplate
的execute
方法的实现:
public T execute(TransactionCallback action) throws TransactionException {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
}
else {
// 1.通过事务管理器开启事务
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
// 2.执行传入的业务逻辑
result = action.doInTransaction(status);
}
catch (RuntimeException | Error ex) {
// 3.出现异常,进行回滚
rollbackOnException(status, ex);
throw ex;
}
catch (Throwable ex) {
// 3.出现异常,进行回滚
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
}
// 4.正常执行完成的话,提交事务
this.transactionManager.commit(status);
return result;
}
}
这些方法具体的实现我们暂且不看,后续进行源码分析时都会详细介绍,之所以将这个代码贴出来是让大家更好的理解TransactionTemplate
的工作机制:实际上就是通过一个TransactionCallback
封装了业务逻辑,然后TransactionTemplate
会在事务的上下文中调用。
在上面的例子中doInTransaction
是有返回值的,而实际上有时候并不需要返回值,这种情况下,我们可以使用TransactionCallbackWithoutResult
提代TransactionCallback
。
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus status) {
updateOperation1();
updateOperation2();
}
});
❝实际上我们还可以通过TransactionTemplate
指定事务的属性,例如隔离级别、超时时间、传播行为等等
TransactionTemplate
是线程安全的,我们可以全局配置一个TransactionTemplate
,然后所有的类都共享这个TransactionTemplate
。但是,如果某个类需要特殊的事务配置,例如需要定制隔离级别,那么我们就有必要去创建不同的TransactionTemplate
。
❞
TransactionOperator
❝TransactionOperator适用于响应式编程的情况,这里就不做详细介绍了
❞
TransactionManager
实际上TransactionTemplate
内部也是使用TransactionManager
来完成事务管理的,我们之前也看过它的execute
方法的实现了,其实内部就是调用了TransactionManager
的方法,实际上就是分为这么几步
开启事务
执行业务逻辑
出现异常进行回滚
正常执行则提交事务
这里我还是直接用官网给出的例子
// 定义事务
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setName("SomeTxName");
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
// txManager,事务管理器
// 通过事务管理器开启一个事务
TransactionStatus status = txManager.getTransaction(def);
try {
// 完成自己的业务逻辑
}
catch (MyException ex) {
// 出现异常,进行回滚
txManager.rollback(status);
throw ex;
}
// 正常执行完成,提交事务
txManager.commit(status);
我们在后边的源码分析中其实重点分析的也就是TransactionManager
的源码。
申明式事务
在对编程式事务有一定了解之后我们会发现,编程式事务存在下面几个问题:
「我们的业务代码跟事务管理的代码混杂在一起」。
「每个需要事务管理的地方都需要写重复的代码」
如何解决呢?这就要用到申明式事务了,实现申明式事务一般有两种方式
基于XML配置
基于注解
申明式事务
事务的实现原理如下(图片来源于官网):
「实际上就是结合了APO自动代理跟事务相关API」。通过开启AOP自动代理并向容器中注册了事务需要的通知(Transaction Advisor),在Transaction Advisor
调用了事务相关API
,其实内部也是调用了TransactionManager
的方法。
基于XML配置这种方式就不讲了,笔者近两年时间没用过XML配置,我们主要就看看通过注解方式来实现申明式事务。主要涉及到两个核心注解
@EnableTransactionManagement
@Transactional
@EnableTransactionManagement
这个注解主要有两个作用,其一是,开启AOP自动代理,其二是,添加事务需要用到的通知(Transaction Advisor),如果你对AOP有一定了解的话那你应该知道一个Advisor
实际上就是一个绑定了切点(Pointcut)的通知(Advice),通过@EnableTransactionManagement
这个注解导入的Advisor
所绑定的切点就是通过@Transactional
来定义的。
申明式事务的例子我这里就省去了,我相信没几个人不会用吧.....
Spring对事务的抽象
Spring事务抽象的关键就是事务策略的概念,事务策略是通过TransactionManager
接口定义的。TransactionManager
本身只是一个标记接口,它有两个直接子接口
ReactiveTransactionManager
,这个接口主要用于在响应式编程模型下,不是我们要讨论的重点PlatformTransactionManager
,命令式编程模型下我们使用这个接口。
❝关于响应式跟命令式编程都可以单独写一篇文章了,本文重点不是讨论这两种编程模型,可以认为平常我们使用的都是命令式编程
❞
PlatformTransactionManager
PlatformTransactionManager接口定义
public interface PlatformTransactionManager extends TransactionManager {
// 开启事务
TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
// 提交事务
void commit(TransactionStatus status) throws TransactionException;
// 回滚事务
void rollback(TransactionStatus status) throws TransactionException;
}
PlatformTransactionManager
是命令式编程模型下Spring事务机制的中心接口,定义了完成一个事务必须的三个步骤,也就是说定义了事务实现的规范
开启事务
提交事务
回滚事务
通常来说,我们不会直接实现这个接口,而是通过继承AbstractPlatformTransactionManager
,这个类是一个抽象类,主要用作事务管理的模板,这个抽象类已经实现了事务的传播行为以及跟事务相关的同步管理。
回头看接口中定义的三个方法,首先是开启事务的方法,从方法签名上来看,其作用就是通过一个TransactionDefinition
来获取一个TransactionStatus
类型的对象。为了更好的理解Spring中事务的抽象我们有必要了解下这两个接口
TransactionDefinition
接口定义如下:
public interface TransactionDefinition {
// 定义了7中事务的传播机制
int PROPAGATION_REQUIRED = 0;
int PROPAGATION_SUPPORTS = 1;
int PROPAGATION_MANDATORY = 2;
int PROPAGATION_REQUIRES_NEW = 3;
int PROPAGATION_NOT_SUPPORTED = 4;
int PROPAGATION_NEVER = 5;
int PROPAGATION_NESTED = 6;
// 4种隔离级别,-1代表的是使用数据库默认的隔离级别
// 比如在MySQL下,使用的就是ISOLATION_REPEATABLE_READ(可重复读)
int ISOLATION_DEFAULT = -1;
int ISOLATION_READ_UNCOMMITTED = 1;
int ISOLATION_READ_COMMITTED = 2;
int ISOLATION_REPEATABLE_READ = 4;
int ISOLATION_SERIALIZABLE = 8;
// 事务的超时时间,默认不限制时间
int TIMEOUT_DEFAULT = -1;
// 提供了对上面三个属性的get方法
default int getPropagationBehavior() {
return PROPAGATION_REQUIRED;
}
default int getIsolationLevel() {
return ISOLATION_DEFAULT;
}
default int getTimeout() {
return TIMEOUT_DEFAULT;
}
// 事务是否是只读的,默认不是
default boolean isReadOnly() {
return false;
}
// 事务的名称
@Nullable
default String getName() {
return null;
}
// 返回一个只读的TransactionDefinition
// 只对属性提供了getter方法,所有属性都是接口中定义的默认值
static TransactionDefinition withDefaults() {
return StaticTransactionDefinition.INSTANCE;
}
}
从这个接口的名字上我们也能知道,它的主要完成了对事务定义的抽象,这些定义有些是数据库层面本身就有的,例如隔离级别、是否只读、超时时间、名称
。也有些是Spring赋予的,例如事务的传播机制
。Spring中一共定义了7种事务的传播机制
TransactionDefinition.PROPAGATION_REQUIRED:如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务。
TransactionDefinition.PROPAGATION_REQUIRES_NEW:创建一个新的事务,如果当前存在事务,则把当前事务挂起。
TransactionDefinition.PROPAGATION_SUPPORTS:如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行。
TransactionDefinition.PROPAGATION_NOT_SUPPORTED:以非事务方式运行,如果当前存在事务,则把当前事务挂起。
TransactionDefinition.PROPAGATION_NEVER:以非事务方式运行,如果当前存在事务,则抛出异常。
TransactionDefinition.PROPAGATION_MANDATORY:如果当前存在事务,则加入该事务;如果当前没有事务,则抛出异常。
TransactionDefinition.PROPAGATION_NESTED:如果当前存在事务,则创建一个事务作为当前事务的嵌套事务来运行;如果当前没有事务,则该取值等价于TransactionDefinition.PROPAGATION_REQUIRED。
关于事务的传播在源码分析的文章中我会重点介绍,现在大家留个印象即可。
我们在使用申明式事务的时候,会通过@Transactional
这个注解去申明某个方法需要进行事务管理,在@Transactional
中可以定义事务的属性,这些属性实际上就会被封装到一个TransactionDefinition
中,当然封装的时候肯定不是直接使用的接口,而是这个接口的一个实现类RuleBasedTransactionAttribute
。RuleBasedTransactionAttribute
,该类的继承关系如下:
DefaultTransactionDefinition
,实现了TransactionDefinition
,并为其中的定义的属性提供了默认值// 默认的传播机制为required,没有事务新建一个事务
// 有事务的话加入当前事务
private int propagationBehavior = PROPAGATION_REQUIRED;
// 隔离级别跟数据库默认的隔离级别一直
private int isolationLevel = ISOLATION_DEFAULT;
// 默认为-1,不设置超时时间
private int timeout = TIMEOUT_DEFAULT;
// 默认不是只读的
private boolean readOnly = false;TransactionAttribute
,扩展了``DefaultTransactionDefinition`,新增了两个事务的属性// 用于指定事务使用的事务管理器的名称
String getQualifier();
// 指定在出现哪种异常时才进行回滚
boolean rollbackOn(Throwable ex);DefaultTransactionAttribute
,继承了DefaultTransactionDefinition
,同时实现了TransactionAttribute
接口,定义了默认的回滚异常// 抛出RuntimeException/Error才进行回滚
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}RuleBasedTransactionAttribute
,@Transactional
注解的rollbackFor
等属性就会被封装到这个类中,允许程序员自己定义回滚的异常,如果没有指定回滚的异常,默认「抛出RuntimeException/Error才进行回滚」
TransactionStatus
TransactionExecution
,这个接口也是用于描述事务的状态,TransactionStatus
是在其上做的扩展,内部定义了以下几个方法// 判断当前事务是否是一个新的事务
// 不是一个新事务的话,那么需要加入到已经存在的事务中
boolean isNewTransaction();
// 事务是否被标记成RollbackOnly
// 如果被标记成了RollbackOnly,意味着事务只能被回滚
void setRollbackOnly();
boolean isRollbackOnly();
// 是否事务完成,回滚或提交都意味着事务完成了
boolean isCompleted();SavepointManager
,定义了管理保存点(Savepoint
)的方法,隔离级别为NESTED
时就是通过设置回滚点来实现的,内部定义了这么几个方法// 创建保存点
Object createSavepoint() throws TransactionException;
// 回滚到指定保存点
void rollbackToSavepoint(Object savepoint) throws TransactionException;
// 移除回滚点
void releaseSavepoint(Object savepoint) throws TransactionException;TransactionStatus
,继承了上面这些接口,额外提供了两个方法//用于判断当前事务是否设置了保存点
boolean hasSavepoint();
// 这个方法复写了父接口Flushable中的方法
// 主要用于刷新会话
// 对于Hibernate/jpa而言就是调用了其session/entityManager的flush方法
void flush();
TransactionDefinition
的主要作用是给出一份事务属性的定义,然后事务管理器根据给出的定义来创建事务,TransactionStatus
主要是用来描述创建后的事务的状态TransactionDefinition
跟TransactionStatus
有一定了解后,我们再回到PlatformTransactionManager
接口本身,PlatformTransactionManager
作为事务管理器的基础接口只是定义管理一个事务必须的三个方法:开启事务
,提交事务
,回滚事务
,接口仅仅是定义了规范而已,真正做事的还是要依赖它的实现类,所以我们来看看它的继承关系PlatformTransactionManager的实现类
AbstractPlatformTransactionManager
,Spring提供的一个事务管理的基类,提供了事务管理的模板,实现了Spring事务管理的一个标准流程判断当前是否已经存在一个事务 应用合适的事务传播行为 在必要的时候挂起/恢复事务 提交时检查事务是否被标记成为 rollback-only
在回滚时做适当的修改(是执行真实的回滚/还是将事务标记成 rollback-only
)触发注册的同步回调 在 AbstractPlatformTransactionManager
提供了四个常见的子类,其说明如下
Spring中事务的同步机制
资源的同步 行为的同步
SQL
(如果是单条的SQL实际上没有必要开启事务),为了保证事务所有的SQL
都能够使用一个数据库连接,这个时候我们需要将数据库连接跟事务进行同步,这个时候数据库连接就是跟这个事务同步的一个资源。TransactionSynchronizationManager
,这是一个抽象类,其中所有的方法都是静态的,并且所有的方法都是围绕它所申明的几个静态常量字段,如下:// 这就是同步的资源,Spring就是使用这个完成了连接的同步
private static final ThreadLocal
ThreadLocal
实现的,对于ThreadLocal
本文不做详细分析,如果对ThreadLocal
还不了解的同学也没有关系,对于本文而言你只需要知道ThreadLocal
能将资源跟当前线程绑定即可,例如ThreadLocal
这个属性就代表要将一个map绑定到当前线程,它提供了set跟get方法,分别用于将属性绑定到线程上以及获取线程上绑定的属性。synchronizations
之外其余的应该都很好理解,synchronizations
中绑定的是一个TransactionSynchronization
的集合,那么这个TransactionSynchronization
有什么用呢?我们来看看它的接口定义public interface TransactionSynchronization extends Flushable {
// 事务完成的状态
// 0 提交
// 1 回滚
// 2 异常状态,例如在事务执行时出现异常,然后回滚,回滚时又出现异常
// 就会被标记成状态2
int STATUS_COMMITTED = 0;
int STATUS_ROLLED_BACK = 1;
int STATUS_UNKNOWN = 2;
// 我们绑定的这些TransactionSynchronization需要跟事务同步
// 1.如果事务挂起,我们需要将其挂起
// 2.如果事务恢复,我们需要将其恢复
default void suspend() {
}
default void resume() {
}
@Override
default void flush() {
}
// 在事务执行过程中,提供的一些回调方法
default void beforeCommit(boolean readOnly) {
}
default void beforeCompletion() {
}
default void afterCommit() {
}
default void afterCompletion(int status) {
}
}
行为的同步
。模拟Spring事务的实现
jdbc
,在模拟之前我们先明确两点切点应该如何定义? 通知要实现什么功能?
JDBC
相关依赖即可,该项目仅仅是一个Spring环境下的Java项目,没有Web依赖,也不是SpringBoot项目,项目结构如下:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<groupId>com.dmz.frameworkgroupId>
<artifactId>mybatisartifactId>
<version>1.0-SNAPSHOTversion>
<dependencies>
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<version>8.0.15version>
dependency>
<dependency>
<groupId>org.springframeworkgroupId>
<artifactId>spring-contextartifactId>
<version>5.2.6.RELEASEversion>
dependency>
<dependency>
<groupId>org.springframeworkgroupId>
<artifactId>spring-aopartifactId>
<version>5.2.6.RELEASEversion>
dependency>
<dependency>
<groupId>org.aspectjgroupId>
<artifactId>aspectjweaverartifactId>
<version>1.9.5version>
dependency>
dependencies>
project>
// 开启AOP跟扫描组件即可
@EnableAspectJAutoProxy
@ComponentScan("com.dmz.mybatis.tx_demo")
public class Config {
}
public class TransactionUtil {
public static final ThreadLocalsynchronousConnection =
new ThreadLocal();
private TransactionUtil() {
}
public static Connection startTransaction() {
Connection connection = synchronousConnection.get();
if (connection == null) {
try {
// 这里替换成你自己的连接地址即可
connection = DriverManager
.getConnection("jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8", "root", "123");
synchronousConnection.set(connection);
connection.setAutoCommit(false);
} catch (SQLException e) {
e.printStackTrace();
}
}
return connection;
}
public static int execute(String sql, Object... args) {
Connection connection = startTransaction();
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
if (args != null) {
for (int i = 1; i < args.length + 1; i++) {
preparedStatement.setObject(i, args[i - 1]);
}
}
return preparedStatement.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
return 0;
}
public static void commit() {
try (Connection connection = synchronousConnection.get()) {
connection.commit();
synchronousConnection.remove();
} catch (SQLException e) {
e.printStackTrace();
}
}
public static void rollback() {
try (Connection connection = synchronousConnection.get()) {
connection.rollback();
synchronousConnection.remove();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
@Component
public class UserService {
public void saveUser() {
TransactionUtil.execute
("INSERT INTO `test`.`user`(`id`, `name`) VALUES (?, ?)", 100, "dmz");
// 测试回滚
// throw new RuntimeException();
}
}
@Component
@Aspect
public class TxAspect {
@Pointcut("execution(public * com.dmz.mybatis.tx_demo.service..*.*(..))")
private void pointcut() {
}
@Around("pointcut()")
public Object around(JoinPoint joinPoint) throws Throwable {
// 在方法执行前开启事务
TransactionUtil.startTransaction();
// 执行业务逻辑
Object proceed = null;
try {
ProceedingJoinPoint method = (ProceedingJoinPoint) joinPoint;
proceed = method.proceed();
} catch (Throwable throwable) {
// 出现异常进行回滚
TransactionUtil.rollback();
return proceed;
}
// 方法执行完成后提交事务
TransactionUtil.commit();
return proceed;
}
}
public class Main {
public static void main(String[] args) {
AnnotationConfigApplicationContext ac =
new AnnotationConfigApplicationContext(Config.class);
UserService userService = ac.getBean(UserService.class);
userService.saveUser();
}
}
总结
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️