添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

业务处理过程,发现了以下问题,代码一是原代码能正常执行,代码二是经过迭代一次非正常执行代码

代码一:以下代码开启线程后,代码正常执行

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
@Transactional
public Long test() {
  // ......
  // 插入记录
  Long studentId = studentService.insert(student);
  // 异步线程
  writeStatisticsData(studentId);
  return studentId;
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........

代码二:以下代码开启线程后,代码不正常执行

@Transactional
public Long test() {
  // ......
  // 插入记录
  Long studentId = studentService.insert(student);
   // 异步线程
  writeStatisticsData(studentId);
  // 插入学生地址记录
  Long addressId = addressService.insert(address);
  return studentId;
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........

二、问题分析

这里使用了spring事务,显然需要考虑事务的隔离级别

2.1、mysql隔离级别

查看mysql隔离级别

SELECT @@tx_isolation;
READ-COMMITTED

读提交,即在事务A插入数据过程中,事务B在A提交之前读取A插入的数据读取不到,而B在A提交之后再去读就会读取到A插入的数据,也即Read Committed不能保证在一个事务中每次读都能读到相同的数据,因为在每次读数据之后其他并发事务可能会对刚才读到的数据进行修改。

2.2、问题原因分析

  • 代码一正常运行的原因

由于mysql事务的隔离级别是读提交,test方法在开启异步线程后,异步线程也开启了事务,同时以读者身份去读 test 方法中插入的 student 记录,但此时 test 方法已经提交了事务,所以可以读取到 student 记录(即在异步方法中可以读取到 student 记录),但此代码有风险,若事务提交的时间晚一点,异步线程也有可能读取不到 student 记录。

  • 代码二不能正常运行的原因

经过上面分析,很明显异步方法中不能读取到 student 记录,由于代码二在异步线程下面又执行了其他操作,延时了test方法中事务的提交,所以代码二不能正常运行。

三、解决问题方案

解决思路是在事务提交后再做其他的处理(如异步发消息处理等),这里还是从Spring执行事务的过程中入手,Spring事务的处理过程不再分析,这里直接看Spring事务增强器TransactionInterceptor的核心处理流程,源码如下:

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
  // 获取事务属性
  final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
  //加载配置中配置的TransactionManager
  final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  // 声明式事务的处理
  if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    Object retVal = null;
    //......
    retVal = invocation.proceedWithInvocation();
    //......
    commitTransactionAfterReturning(txInfo);
    return retVal;
  } else {
    // 编程式事务的处理......
  //......

这里主要看声明式事务的处理,因为编程式事务的处理及提交都是用户在编码中进行控制。在声明式事务处理中,当方法执行完后,会执行 commitTransactionAfterReturning 方法来进行提交事务,该方法在 TransactionAspectSupport 类中,源码如下:

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
  if (txInfo != null && txInfo.hasTransaction()) {
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());

再看 commit 方法,该方法在 AbstractPlatformTransactionManager 类中,源码如下:

public final void commit(TransactionStatus status) throws TransactionException {
    // 这里省略很多代码,如事务回滚......
		processCommit(defStatus);
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;
			try {
        prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
				boolean globalRollbackOnly = false;
				if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
					globalRollbackOnly = status.isGlobalRollbackOnly();
				if (status.hasSavepoint()) {
					status.releaseHeldSavepoint();
				} else if (status.isNewTransaction()) {
          // 提交事务
					doCommit(status);
				//......
			} catch (......) {
				// 事务异常处理......
			try {
        // 事务提交成功后的处理-----这里是重点
				triggerAfterCommit(status);
			} finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
		finally {
			cleanupAfterCompletion(status);
private void triggerAfterCommit(DefaultTransactionStatus status) {
  if (status.isNewSynchronization()) {
    TransactionSynchronizationUtils.triggerAfterCommit();

最终会走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中

public static void triggerAfterCommit() {
		invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
		if (synchronizations != null) {
      for (TransactionSynchronization synchronization : synchronizations) {
				synchronization.afterCommit();

上面会把缓存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按顺序来执行 afterCommit 方法,其中 TransactionSynchronization 以集合形式缓存在 TransactionSynchronizationManager 的 ThreadLocal 中。

3.1、方式一

经过上面分析,只需要代码中重新生成个 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解决方案,如下:

private void writeStatisticsData(Long studentId) {
  if(TransactionSynchronizationManager.isActualTransactionActive()) {
            // 当前存在事务
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
              @Override
              public void afterCommit() {
                executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
        } else {
            // 当前不存在事务
            executor.execute(() -> {Student student = studentService.findById(studentId);
                //........

3.2、方式二

使用 @TransactionalEventListener 结合 Spring事件监听机制,该注解自从Spring4.2版本开始有的,如下:

// 事件
public class StudentEvent extends ApplicationEvent {
    public StudentEvent(Long studentId) {
        super(studentId);
// 监听器
public class StudentEventListener{
  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  public void writeStatisticsData(StudentEvent studentEvent) {
    executor.execute(() -> {
      Student student = studentService.findById(studentEvent.getSource());
      //........
@Service
public class StudentService {
  // Spring4.2之后,ApplicationEventPublisher自动被注入到容器中,采用Autowired即可获取
  @Autowired
  private ApplicationEventPublisher applicationEventPublisher;
  @Transactional
  public Long test() {
    // ......
    // 插入记录
    Long studentId = studentService.insert(student);
    // 发布事件
    applicationEventPublisher.publishEvent(new StudentEvent(studentId));
    // 插入学生地址记录
    Long addressId = addressService.insert(address);
    return studentId;

Spring Bean在加载配置文件时,会使用 AnnotationDrivenBeanDefinitionParser 来解析 annotation-driven 标签,如下:

public class TxNamespaceHandler extends NamespaceHandlerSupport {
  //......
	@Override
	public void init() {
		registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
		registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
		registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
  @Override
	public BeanDefinition parse(Element element, ParserContext parserContext) {
    // 重点——将TransactionalEventListenerFactory加入到容器中
		registerTransactionalEventListenerFactory(parserContext);
		String mode = element.getAttribute("mode");
		if ("aspectj".equals(mode)) {
			// mode="aspectj"
			registerTransactionAspect(element, parserContext);
		else {
			// mode="proxy"
			AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
		return null;
  private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
		RootBeanDefinition def = new RootBeanDefinition();
		def.setBeanClass(TransactionalEventListenerFactory.class);
		parserContext.registerBeanComponent(new BeanComponentDefinition(def,
				TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
  //省略部分代码......
	@Override
	public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
		return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
   //省略部分代码......
	@Override
	public void onApplicationEvent(ApplicationEvent event) {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
      // 事务存在时,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存集合中
			TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
			TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
		} else if (this.annotation.fallbackExecution()) {
			//.......
			processEvent(event);
		} else {
			// 当前不存在事务什么也不做

上述 @TransactionalEventListener 本质上是一个 @EventListener,TransactionalEventListenerFactory类会将每一个扫描到的方法有TransactionalEventListener注解包装成ApplicationListenerMethodTransactionalAdapter对象,通过ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若当前存在事务,就会生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 结合 Spring事件监听机制,并使用到异步方式感觉有点别扭,这里是为了说明问题)。

四、使用案例

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5,
        new ThreadFactoryBuilder().setDaemon(false).setNamePrefix("execApiCache").build());
@Override
@Transactional(rollbackFor = Exception.class)
public ResultVO addApi(Api api, List<Header> headerList, List<Request> requestList, Response response, List<Script> scriptList, List<RespCodeMapping> respCodeMappingList) {
    // 数据库代码...
    // 异步代码
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            log.warn("afterCommit...");
            executorService.execute(() -> {
                // 异步业务
                execApiCache(api);
    return ResultUtil.buildSucc();

Ps:setDaemon(false) 注意这里守护线程标记必须设置为 false,否则主线程执行完,异步线程没执行完的话,异步线程会马上被中断、关闭,所以这里不能设置成守护(用户)线程。

首先我们要知道,数据库中的四基本特性(ACID)原子性。事务要么全执行要么全不执行。比如A打钱给B,大概分为两步走,A扣钱和B收钱, 打成功的话,钱从A的账户转移到B中,失败的话还是留在A的账户中,不会扣了A的钱而B没收到。一致性。指数据事务不能破坏关系数据的完整性以及业务逻辑上的一致性,比如A:500 B:300这个一致性状态 在A给B转了200后变成A:200 B:500的这样另一个一致性状... 批量异步请求获取数据方法 背景:有方法A,B,C,在我请求C的时候我需要方法A以及方法B的结果做的传参 现象:在页面一加载执行方法面分别写入三个方法,按顺序写入,将方法AB的结果放到setstate面,但是C执行的时候请求接口的参数并没有从ABset进去的获取值获取 解决方案: const dataIdPromise = getCorrelationData( params, editModeCorrelationViews 俩个service方法, 方法A中调用方法B。 方法A核心业务涉及多张表的数据操作,事务采用注解:@Transactional(rollbackFor = Exception.class)。 方法B比较耗时,为了不影响核心业务,方法B 用@Async注解,单独开启一个线程去异步执行。(方法B在另外一个类边,不能和A在同一个类)。 出现的问题方法A的事务还没提交方法B就执行了,导致方法B中查到的数据还是老数据。 当时想到的解决方案,方法A事务提交后再执行方法B。 对标注了事务注解的方法进行动态代理代理方法的前置处理是获取数据库连接,定义事务信息等,存储在 ThreadLocal 中开启事务执行方法逻辑提交 / 回滚事务清除事务信息。 一致性(consistency):数据库总是从一个一致性的状态转换到另一个一致性的状态。在事务开始前后,数据库的完整性约束没有被破坏。例如违反了唯一性,必须撤销事务,返回初始状态。 隔离性(isolation):每个读写事务的对象对其他事务的操作对象能相互分离,即:事务提交前对其他事务是不可见的,通常内部加锁实现。 持久性(durabil. 开发中偶尔会遇到sql已经执行,日志都打印出来了但是数据并没有任何变化,此时多半为事务没有提交,下面记录一下最近一段时间遇到的事务没有提交问题排查思路,下文默认可以本地debu复现问题,如果是无法debug的环境只是增加了获取相应数据的复杂度(无法debug),以及查看相应数据的复杂的(需要打日志才看得到)等等,原理并没有变化 首先查看执行sql的方法是否已经开启了事务 有时可能会因为事务切面没有切到,或者没有打注解等等原因导致事务没有生效,此时可以在执行sql的方法处打断点,然后执行 Transac 1,脏读就是一个事务在处理过程中读取了另一个事务提交数据。1,事务A在多次读取同一数据事务B在数据A多次读取数据的过程中对数据进行了更新并提交, 导致事务A前后读取数据不一致。1,可重复读的隔离级别解决了不可重复读的问题,保证了同一个事务查询到的数据都是开始时 的状态。 文章目录1 背景2 问题分析2.1 mysql隔离级别2.2 问题原因分析3 解决问题方案3.1 方式一3.2 方式二 业务处理过程,发现了以下问题,代码一是原代码能正常执行,代码二是经过迭代一次非正常执行代码 代码一:以下代码开启线程后,代码正常执行 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, Tim... 了解到一个事故,在MySQL数据库中,使用Spring管理的事务在开启以后没有在操作结束时提交或回滚,使得原有线程在后续执行数据库操作时可能继续使用原有事务,且不会提交,导致对数据库的修改在Java应用层认为是成功,但在数据库层最终是没有生效的,产生了比较严重的后果 1. 什么是线程?他和进程有什么区别?为什么使用多线程? 进程是程序运行和资源分配的基本单位,线程是CPU调度和分配的基本单位,它是程序执行的最小单元。一个程序至少有一个进程,一个进程至少有一个线程。进程在执行过程中拥有独立的内存单元(代码段,数据段和堆空间),而在一个进程中运行的多个线程并发运行,共享这些内... 事物的并发问题:1.脏读:脏读是指在一个事务处理过程中读取了另一个事务提交数据。2.不可重复读:事务A 多次读取同一数据事务B在事务A多次读取的过程中,对数据作了更新并提交,导致事务A多次读取统一数据时,结果因为本事务先后两次读到的数据结果不一致。3.幻读:可重复读的隔离级别解决了不可重复读的问题,保证了同一个事务,查询的结果都是事务开始时的状态(一致性)。MySQL默认的事务隔离级别为repeatable-read(可重复读)MySQL支持4种事务隔离级别。 不知道大家有没有想过数据库的事务隔离级别和@Transaction设置的隔离级别到底是什么关系? 数据库设置的高隔离级别,@Transaction设置低隔离级别,那么事务的隔离级别到底以谁的为主? 下面就让我们一起去用代码研究一下 首先我们mysql数据库的默认隔离级别是read-commit读已经提交; 那么我们先测试@Transactional(isolation = Isolation.READ_UNCOMMITTED) 读提交; 先上测试代码 @Service public clas